35 while( (key = (
TKey*)nextkey()) ) {
60 while( (key = (
TKey*)nextkey()) ) {
86 if (destination==0 || source==0)
return;
90 while( (key = (
TKey*)nextkey()) ) {
98 if (!destination_subdir) {
99 destination_subdir = destination->
mkdir(key->GetName());
103 TKey *oldkey = destination->
GetKey(key->GetName());
108 TKey *newkey =
new TKey(destination,*key,0 );
127 ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
128 ClientInfo(
const char *filename,
UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
129 fLocalName.Form(
"%s-%d-%d",filename,clientId,
gSystem->
GetPid());
132 void Set(
TFile *file)
147 fTimeSincePrevContact = now.
AsDouble() - fLastContact.AsDouble();
153 struct ParallelFileMerger :
public TObject
155 typedef std::vector<ClientInfo> ClientColl_t;
158 TBits fClientsContact;
160 ClientColl_t fClients;
164 ParallelFileMerger(
const char *filename,
Bool_t writeCache =
kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(
kFALSE,
kTRUE)
168 fMerger.SetPrintLevel(0);
169 fMerger.OutputFile(filename,
"RECREATE");
170 if (writeCache)
new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
173 ~ParallelFileMerger()
177 for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
178 fprintf(stderr,
"Client %d reported %u times\n",
f,fClients[
f].fContactsCount);
180 for( ClientColl_t::iterator
iter = fClients.begin();
181 iter != fClients.end();
191 return fFilename.Hash();
205 fMerger.AddFile(input);
218 for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
219 fMerger.AddFile(fClients[
f].fFile);
225 for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
226 if (fClients[
f].fFile) {
237 fNClientsContact = 0;
238 fClientsContact.Clear();
247 return fClientsContact.CountBits() > 0;
254 if (fClients.size()==0) {
261 for(
unsigned int c = 0 ;
c < fClients.size(); ++
c) {
262 sum += fClients[
c].fTimeSincePrevContact;
263 sum2 += fClients[
c].fTimeSincePrevContact*fClients[
c].fTimeSincePrevContact;
265 Double_t avg = sum / fClients.size();
269 if ( (now.
AsDouble() - fLastMerge.AsDouble()) > target) {
279 Float_t cut = clientThreshold * fClients.size();
280 return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
288 fClientsContact.SetBitNumber(clientId);
289 if (fClients.size() < clientId+1) {
290 fClients.push_back( ClientInfo(fFilename,clientId) );
292 fClients[clientId].Set(file);
335 kStartConnection = 0,
341 printf(
"fastMergeServerHist ready to accept connections\n");
351 if (clientCount > 100) {
352 printf(
"only accept 100 clients connections\n");
357 client->
Send(clientIndex, kStartConnection);
358 client->
Send(kProtocolVersion, kProtocol);
362 printf(
"Accept %d connections\n",clientCount);
370 Error(
"fastMergeServer",
"The client did not send a message\n");
374 printf(
"Client %d: %s\n", clientCount, str);
376 printf(
"Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->
GetBytesRecv(),
380 if (mon->
GetActive() == 0 || clientCount == 0) {
381 printf(
"No more active clients... stopping\n");
398 const Float_t clientThreshold = 0.75;
400 ParallelFileMerger *info = (ParallelFileMerger*)mergers.
FindObject(filename);
402 info =
new ParallelFileMerger(filename,cache);
407 info->InitialMerge(
transient);
409 info->RegisterClient(clientId,
transient);
410 if (info->NeedMerge(clientThreshold)) {
412 Info(
"fastMergeServerHist",
"Merging input from %ld clients (%d)",info->fClients.size(),clientId);
419 printf(
"*** Unexpected message ***\n");
426 ParallelFileMerger *info;
427 while ( (info = (ParallelFileMerger*)
next()) ) {
428 if (info->NeedFinalMerge())
Incrementally merge all types of objects.
void SetBufferOffset(Int_t offset=0)
void parallelMergeServer(bool cache=false)
virtual int GetPid()
Get process id.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
virtual TList * GetListOfKeys() const
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
virtual ULong_t Hash() const
Return hash value for this object.
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
virtual void ReadTString(TString &s)
Read TString from TBuffer.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual TList * GetList() const
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
static const char * filename()
static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory and return a pointer to the created directory.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
A TMemFile is like a normal TFile except that it reads and writes only from memory.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
UInt_t GetBytesSent() const
UInt_t GetBytesRecv() const
virtual TKey * GetKey(const char *, Short_t=9999) const
virtual void ReadLong64(Long64_t &l)
const int kReplaceImmediately
THashTable implements a hash table to store TObject's.
virtual void ReadInt(Int_t &i)
static void R__MigrateKey(TDirectory *destination, TDirectory *source)
#define ClassDef(name, id)
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
std::map< std::string, std::string >::const_iterator iter
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=0)
Write the encoded object supported by this key.
void Info(const char *location, const char *msgfmt,...)
Merge the input file with the content of the output file (if already existing).
Book space in a file, create I/O buffers, to fill them, (un)compress them.
virtual void Delete(Option_t *option="")
Delete an object from the file.
void Error(const char *location, const char *msgfmt,...)
static Bool_t R__NeedInitialMerge(TDirectory *dir)
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual TFile * GetFile() const
virtual void Close(Option_t *opt="")
Close the socket.
Double_t length(const TVector2 &v)
R__EXTERN TSystem * gSystem
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file.
This class provides file copy and merging services.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Bool_t TestBit(UInt_t f) const
virtual const char * GetName() const
Returns name of object.
The ROOT global object gROOT contains a list of all defined classes.
virtual void SaveSelf(Bool_t=kFALSE)
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual const char * GetName() const
Returns name of object.
Describe directory structure in memory.
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
The TTimeStamp encapsulates seconds and ns since EPOCH.
void Add(TObject *obj)
Add object to the hash table.
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Mother of all ROOT objects.
Double_t AsDouble() const
void Delete(Option_t *option="")
Remove all objects from the table AND delete all heap based objects.
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
TObject * FindObject(const char *name) const
Find object using its name.
Bool_t InheritsFrom(const char *cl) const
Return kTRUE if this class inherits from a class with name "classname".
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Double_t Sqrt(Double_t x)
TClass * GetClass() const
Only the objects with a MergeAfterReset member function.
A cache when writing files over the network.