Logo ROOT  
Reference Guide
parallelMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can accept connections while handling currently open connections.

Compare this script to hserv.C, which blocks on accept. In this script a server socket is created and added to a monitor. The monitor object watches for connection requests on the server socket. After a connection is accepted, the new socket is added to the monitor and is immediately ready for use. Once the connection limit is reached (the code below stops accepting new clients after 100 connections), the server socket is removed from the monitor and closed. The monitor continues monitoring the remaining sockets.

To run this demo do the following:

  • Open three windows
  • Start ROOT in all three windows
  • Execute in the first window: .x parallelMergeServer.C
  • Execute in the second and third windows: .x parallelMergeClient.C
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
#include "TSystem.h"
#include "THashTable.h"
#include "TMath.h"
#include "TTimeStamp.h"
const int kIncremental = 0;
const int kReplaceImmediately = 1;
const int kReplaceWait = 2;
#include "TKey.h"
// Return kTRUE if `dir` (or, through the recursive call below, one of its
// sub-directories) holds a key whose class reports a non-null
// GetResetAfterMerge(); return kFALSE otherwise, including when `dir` is null.
// NOTE(review): this listing appears to have lost lines (braces, the
// declaration of `cl`, the test that selects the directory branch) -- confirm
// against the original parallelMergeServer.C before relying on the details.
static Bool_t R__NeedInitialMerge(TDirectory *dir)
// A null directory has nothing to merge.
if (dir==0) return kFALSE;
// Walk every key recorded in the directory.
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
// Look for an in-memory object with the key's name first ...
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
// ... and fall back to reading it through the key when none is found.
subdir = (TDirectory *)key->ReadObj();
// Recurse: one qualifying key anywhere below is enough.
if (R__NeedInitialMerge(subdir)) {
return kTRUE;
} else {
// `cl` is presumably the TClass of the key's object (declaration not visible
// here); a non-null reset-after-merge function triggers the initial merge.
if (0 != cl->GetResetAfterMerge()) {
return kTRUE;
// No qualifying key was found.
return kFALSE;
// Delete keys from `dir`, selected by whether their class has a
// reset-after-merge function:
//   withReset == kTRUE  : delete keys whose class has GetResetAfterMerge() != 0
//   withReset == kFALSE : delete keys whose class has GetResetAfterMerge() == 0
// A null `dir` is ignored.
// NOTE(review): this listing appears to have lost lines (braces, the
// declaration of `cl`, the handling of sub-directories) -- confirm against the
// original file.
static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
if (dir==0) return;
// Walk every key recorded in the directory.
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
// Look for an in-memory object with the key's name, reading it through the
// key when none is found.
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
subdir = (TDirectory *)key->ReadObj();
} else {
Bool_t todelete = kFALSE;
if (withReset) {
// Select the classes that provide a reset-after-merge function.
todelete = (0 != cl->GetResetAfterMerge());
} else {
// Select the classes that do not provide one.
todelete = (0 == cl->GetResetAfterMerge());
if (todelete) {
delete key;
// Copy the keys found in `source` into `destination`. For a name that maps to
// a directory, the matching sub-directory of `destination` is looked up and
// created with mkdir() when absent; otherwise an existing key of the same name
// in `destination` is deleted and a new TKey is built from the source key.
// Nothing is done when either argument is null.
// NOTE(review): this listing appears to have lost lines (braces, the recursive
// call on sub-directories, the body of the write-error check) -- confirm
// against the original file.
static void R__MigrateKey(TDirectory *destination, TDirectory *source)
if (destination==0 || source==0) return;
// Walk every key recorded in the source directory.
TIter nextkey(source->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
// Look for an in-memory object with the key's name, reading it through the
// key when none is found.
TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
if (!source_subdir) {
source_subdir = (TDirectory *)key->ReadObj();
// Find the directory of the same name on the destination side, or create it.
TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
if (!destination_subdir) {
destination_subdir = destination->mkdir(key->GetName());
} else {
// Drop any key of the same name already present in the destination before
// the copy below is made.
TKey *oldkey = destination->GetKey(key->GetName());
if (oldkey) {
delete oldkey;
TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the files are from the same client.
// Check whether the destination file has flagged a write error after the copy.
if (destination->GetFile()->TestBit(TFile::kWriteError)) {
// Book-keeping record for one client: the file it last sent, a local file
// name, how many times it has made contact, and timing of its last contacts.
// NOTE(review): this listing appears to have lost lines (braces, the body of
// the second constructor, the declaration of `now`) -- confirm against the
// original file.
struct ClientInfo
TFile *fFile; // This object does *not* own the file; it will be owned by the owner of the ClientInfo.
TString fLocalName; // Passed to TFile::Open() by ParallelFileMerger::Merge() when fFile is null.
UInt_t fContactsCount; // Printed as the number of times the client reported.
TTimeStamp fLastContact; // Time stamp recorded by the latest call to Set().
Double_t fTimeSincePrevContact; // Interval between the two most recent calls to Set(), from AsDouble().
// Construct an empty record: no file, empty local name, zero counters.
ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
// Construct a record from the output file name and the client id (the body
// is not visible in this listing; presumably it builds fLocalName).
ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
void Set(TFile *file)
// Register the new file as coming from this client.
if (file != fFile) {
// We need to keep any of the keys from the previous file that
// are not in the new file.
if (fFile) {
// A file is already registered: the statement below deletes the incoming
// `file`, not the previously stored fFile, which stays registered.
// NOTE(review): presumably the keys are migrated between the two files just
// before this point -- that call is not visible in this listing.
delete file;
} else {
// First contact: simply remember the file.
fFile = file;
// Update the contact timing; `now` is presumably a TTimeStamp taken just
// above (its declaration is not visible in this listing).
fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
fLastContact = now;
// Merger for one output file: keeps the list of clients contributing to the
// file, a TFileMerger that performs the merge, and the information used to
// decide when the next merge should happen. Instances are hashed and named by
// the output file name (see Hash() and GetName()).
// NOTE(review): this listing appears to have lost lines (braces, the
// destructor header, most of InitialMerge/Merge/RegisterClient, and the
// declarations of `result`, `sum` and `now`) -- confirm against the original
// file.
struct ParallelFileMerger : public TObject
typedef std::vector<ClientInfo> ClientColl_t;
TString fFilename; // Name of the output file; also the object's name and hash source.
TBits fClientsContact; // Bit set queried with CountBits(); presumably one bit per client that has reported since the last merge.
UInt_t fNClientsContact; // Reset to 0 by Merge(); presumably the number of contacts since the last merge.
ClientColl_t fClients; // One ClientInfo per known client, indexed by position.
TTimeStamp fLastMerge; // Time stamp set at the end of Merge().
TFileMerger fMerger; // Performs the merging into the output file.
ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
// Constructor: `filename` is the output file name; when `writeCache` is true a
// TFileCacheWrite is attached to the merger's output file with
// 32*1024*1024 (32 MiB) as its second argument.
if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
// Destructor: report how often each client made contact, then delete the
// files still referenced by the client records.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
for( ClientColl_t::iterator iter = fClients.begin();
iter != fClients.end();
delete iter->fFile;
ULong_t Hash() const
// Return hash value for this object (the hash of the output file name).
return fFilename.Hash();
const char *GetName() const
// Return the name of the object, which is the name of the output file.
return fFilename;
Bool_t InitialMerge(TFile *input)
// Initial merge of the input to copy the resettable objects (TTree) into the output
// and remove them from the input file.
return result;
Bool_t Merge()
// Merge the current inputs into the output file.
R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove objects that can *not* be incrementally merged and will *not* be reset by the client code.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
// Remove any 'resettable' object (like TTree) from the input file so that they will not
// be re-merged. Keep only the objects that always need to be re-merged (Histograms).
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
if (fClients[f].fFile) {
} else {
// The file was backed up on disk (probably due to memory constraints), so
// it is reopened from its local name in UPDATE mode.
TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
R__DeleteObject(file,kTRUE); // Remove objects that can be incrementally merged and will be reset by the client code.
delete file;
// Record when this merge happened and restart the contact count.
fLastMerge = TTimeStamp();
fNClientsContact = 0;
return result;
Bool_t NeedFinalMerge()
// Return true, if there is any data that has not been merged.
return fClientsContact.CountBits() > 0;
Bool_t NeedMerge(Float_t clientThreshold)
// Return true, if enough clients have reported.
// `clientThreshold` is multiplied by the number of known clients to obtain
// the cut used at the end of the function.
if (fClients.size()==0) {
return kFALSE;
// Calculate average and rms of the time between the last 2 contacts.
Double_t sum2 = 0;
for(unsigned int c = 0 ; c < fClients.size(); ++c) {
sum += fClients[c].fTimeSincePrevContact;
sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
Double_t avg = sum / fClients.size();
// NOTE(review): sum2/N - avg*avg may come out slightly negative through
// floating-point rounding, in which case the square root would not be a
// valid number -- confirm whether a clamp to 0 is wanted.
Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
// Merge when the time since the last merge exceeds average + 2 sigma of the
// clients' contact intervals.
Double_t target = avg + 2*sigma;
if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
// Float_t cut = clientThreshold * fClients.size();
// if (!(fClientsContact.CountBits() > cut )) {
// for(unsigned int c = 0 ; c < fClients.size(); ++c) {
// fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
// }
// fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
// }
return kTRUE;
// Otherwise merge when more than `cut` bits are set in fClientsContact, or
// when fNClientsContact exceeds twice the cut.
Float_t cut = clientThreshold * fClients.size();
return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
void RegisterClient(UInt_t clientId, TFile *file)
// Register that a client has sent a file.
// A new ClientInfo is appended when the collection is too short to hold
// index `clientId`.
if (fClients.size() < clientId+1) {
fClients.push_back( ClientInfo(fFilename,clientId) );
// Entry point of the macro: run a merge server that accepts client
// connections on a server socket, receives messages from the clients, and
// hands the received in-memory files to one ParallelFileMerger per output
// file name. `cache` is forwarded to the ParallelFileMerger constructor as
// its writeCache argument.
// NOTE(review): this listing appears to have lost lines (braces, the
// declaration of `s`, the receive call that fills `mess`, several statements
// inside the branches) -- confirm against the original file.
void parallelMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
if (!ss->IsValid()) {
TMonitor *mon = new TMonitor;
UInt_t clientCount = 0;
UInt_t clientIndex = 0;
// One ParallelFileMerger per output file name, looked up by name.
THashTable mergers;
// Message kinds used in the handshake sent to each new client. Note that
// kProtocol and kProtocolVersion share the value 1.
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
printf("fastMergeServerHist ready to accept connections\n");
while (1) {
TMessage *mess;
// NOTE: this needs to be updated to handle the case where the client
// dies.
s = mon->Select();
// An event on the server socket itself means a new connection request.
if (s->IsA() == TServerSocket::Class()) {
// NOTE(review): the test is `> 100` while the message says 100 clients;
// confirm whether 101 connections are intended to be accepted.
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
// Handshake: send the client its index, then the protocol version.
client->Send(clientIndex, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
printf("Accept %d connections\n",clientCount);
if (mess==0) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
// String message: read at most 64 bytes into a local buffer and print it.
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
} else if (mess->What() == kMESS_ANY) {
// Generic message: carries the content of a file to merge.
Long64_t length;
TString filename;
Int_t clientId;
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
// Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
// Wrap the remaining `length` bytes of the message buffer in a memory file.
TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
const Float_t clientThreshold = 0.75; // Controls how often the histograms are merged: the fraction of clients (here 0.75) used by NeedMerge() to compute its cut.
// Find the merger for this output file, creating it on first use.
ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
if (!info) {
info = new ParallelFileMerger(filename,cache);
if (R__NeedInitialMerge(transient)) {
if (info->NeedMerge(clientThreshold)) {
// Enough clients reported.
// NOTE(review): fClients.size() is a size_t printed with %ld -- confirm the
// format is acceptable on all target platforms.
Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
transient = 0;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
delete mess;
// After the loop: visit every merger and check whether it still holds data
// that has not been merged.
TIter next(&mergers);
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
// Release the monitor and the server socket.
delete mon;
delete ss;
void Class()
Definition: Class.C:29
Definition: MessageTypes.h:34
Definition: MessageTypes.h:31
Definition: MessageTypes.h:35
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
int Int_t
Definition: RtypesCore.h:45
unsigned int UInt_t
Definition: RtypesCore.h:46
const Bool_t kFALSE
Definition: RtypesCore.h:92
unsigned long ULong_t
Definition: RtypesCore.h:55
bool Bool_t
Definition: RtypesCore.h:63
double Double_t
Definition: RtypesCore.h:59
long long Long64_t
Definition: RtypesCore.h:73
float Float_t
Definition: RtypesCore.h:57
const Bool_t kTRUE
Definition: RtypesCore.h:91
#define ClassDef(name, id)
Definition: Rtypes.h:325
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition: TError.cxx:220
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition: TError.cxx:187
R__EXTERN TSystem * gSystem
Definition: TSystem.h:559
Container of bits.
Definition: TBits.h:26
void Clear(Option_t *option="")
Clear the value.
Definition: TBits.cxx:84
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
Definition: TBits.cxx:118
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:205
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
Definition: TBufferFile.h:399
void ReadLong64(Long64_t &l) override
Definition: TBufferFile.h:426
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:93
Int_t Length() const
Definition: TBuffer.h:100
char * Buffer() const
Definition: TBuffer.h:96
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition: TClass.h:80
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Definition: TClass.cxx:7413
Bool_t InheritsFrom(const char *cl) const
Return kTRUE if this class inherits from a class with name "classname".
Definition: TClass.cxx:4851
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2957
Describe directory structure in memory.
Definition: TDirectory.h:45
virtual TList * GetList() const
Definition: TDirectory.h:176
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
Definition: TDirectory.cxx:407
virtual TFile * GetFile() const
Definition: TDirectory.h:174
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:175
virtual void SaveSelf(Bool_t=kFALSE)
Definition: TDirectory.h:208
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:177
A cache when writing files over the network.
This class provides file copy and merging services.
Definition: TFileMerger.h:30
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
Definition: TFileMerger.h:92
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
Definition: TFileMerger.h:88
@ kIncremental
Merge the input file with the content of the output file (if already exising).
Definition: TFileMerger.h:71
@ kResetable
Only the objects with a MergeAfterReset member function.
Definition: TFileMerger.h:72
@ kAllIncremental
Merge incrementally all type of objects.
Definition: TFileMerger.h:77
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:54
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
Definition: TFile.cxx:2333
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3997
@ kWriteError
Definition: TFile.h:186
THashTable implements a hash table to store TObject's.
Definition: THashTable.h:35
void Add(TObject *obj)
Add object to the hash table.
Definition: THashTable.cxx:92
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashTable.cxx:238
void Delete(Option_t *option="")
Remove all objects from the table AND delete all heap based objects.
Definition: THashTable.cxx:220
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:28
virtual void Delete(Option_t *option="")
Delete an object from the file.
Definition: TKey.cxx:538
Int_t GetObjlen() const
Definition: TKey.h:88
virtual const char * GetClassName() const
Definition: TKey.h:76
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=0)
Write the encoded object supported by this key.
Definition: TKey.cxx:1450
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition: TKey.cxx:750
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:822
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:578
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
UInt_t What() const
Definition: TMessage.h:75
TClass * GetClass() const
Definition: TMessage.h:71
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
Mother of all ROOT objects.
Definition: TObject.h:37
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:359
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:187
virtual ULong_t Hash() const
Return hash value for this object.
Definition: TObject.cxx:435
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:389
virtual Bool_t IsValid() const
Definition: TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
Basic string class.
Definition: TString.h:136
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition: TString.cxx:658
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2309
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:708
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:71
Double_t AsDouble() const
Definition: TTimeStamp.h:138
const Double_t sigma
static const std::string transient("transient")
static constexpr double s
Double_t Sqrt(Double_t x)
Definition: TMath.h:691
Definition: file.py:1
static uint64_t sum(uint64_t i)
Definition: Factory.cxx:2345
Fons Rademakers

Definition in file parallelMergeServer.C.