Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
parallelMergeServer.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_net
3/// This script shows how to make a simple iterative server that
4/// can accept connections while handling currently open connections.
5/// Compare this script to hserv.C that blocks on accept.
6/// In this script a server socket is created and added to a monitor.
7/// A monitor object is used to monitor connection requests on
8/// the server socket. After accepting the connection
9/// the new socket is added to the monitor and immediately ready
10/// for use. Once more than 100 clients are connected, the server socket
11/// is removed from the monitor and closed. The monitor continues
12/// monitoring the sockets.
13///
14/// To run this demo do the following:
15/// - Open three windows
16/// - Start ROOT in all three windows
17/// - Execute in the first window: .x parallelMergeServer.C
18/// - Execute in the second and third windows: .x parallelMergeClient.C
19///
20/// \macro_code
21///
22/// \author Fons Rademakers
23
24#include "TMessage.h"
25#include "TBenchmark.h"
26#include "TSocket.h"
27#include "TH2.h"
28#include "TTree.h"
29#include "TMemFile.h"
30#include "TRandom.h"
31#include "TError.h"
32#include "TFileMerger.h"
33
34#include "TServerSocket.h"
35#include "TPad.h"
36#include "TCanvas.h"
37#include "TMonitor.h"
38
39#include "TFileCacheWrite.h"
40#include "TSystem.h"
41#include "THashTable.h"
42
43#include "TMath.h"
44#include "TTimeStamp.h"
45
46const int kIncremental = 0;
47const int kReplaceImmediately = 1;
48const int kReplaceWait = 2;
49
50#include "TKey.h"
52{
53
54 if (dir==nullptr) return kFALSE;
55
57 TKey *key;
58 while( (key = (TKey*)nextkey()) ) {
61 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
62 if (!subdir) {
63 subdir = (TDirectory *)key->ReadObj();
64 }
66 return kTRUE;
67 }
68 } else {
69 if (nullptr != cl->GetResetAfterMerge()) {
70 return kTRUE;
71 }
72 }
73 }
74 return kFALSE;
75}
76
78{
79 if (dir==nullptr) return;
80
82 TKey *key;
83 while( (key = (TKey*)nextkey()) ) {
86 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
87 if (!subdir) {
88 subdir = (TDirectory *)key->ReadObj();
89 }
91 } else {
93 if (withReset) {
94 todelete = (nullptr != cl->GetResetAfterMerge());
95 } else {
96 todelete = (nullptr == cl->GetResetAfterMerge());
97 }
98 if (todelete) {
99 key->Delete();
100 dir->GetListOfKeys()->Remove(key);
101 delete key;
102 }
103 }
104 }
105}
106
108{
109 if (destination==nullptr || source==nullptr) return;
110
111 TIter nextkey(source->GetListOfKeys());
112 TKey *key;
113 while( (key = (TKey*)nextkey()) ) {
115 if (cl->InheritsFrom(TDirectory::Class())) {
116 TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
117 if (!source_subdir) {
118 source_subdir = (TDirectory *)key->ReadObj();
119 }
120 TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
121 if (!destination_subdir) {
122 destination_subdir = destination->mkdir(key->GetName());
123 }
125 } else {
126 TKey *oldkey = destination->GetKey(key->GetName());
127 if (oldkey) {
128 oldkey->Delete();
129 delete oldkey;
130 }
131 TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
132 destination->GetFile()->SumBuffer(newkey->GetObjlen());
133 newkey->WriteFile(0);
134 if (destination->GetFile()->TestBit(TFile::kWriteError)) {
135 return;
136 }
137 }
138 }
139 destination->SaveSelf();
140}
141
142struct ClientInfo
143{
144 TFile *fFile; // This object does *not* own the file, it will be owned by the owner of the ClientInfo.
149
150 ClientInfo() : fFile(nullptr), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
151 ClientInfo(const char *filename, UInt_t clientId) : fFile(nullptr), fContactsCount(0), fTimeSincePrevContact(0) {
152 fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
153 }
154
155 void Set(TFile *file)
156 {
157 // Register the new file as coming from this client.
158 if (file != fFile) {
159 // We need to keep any of the keys from the previous file that
160 // are not in the new file.
161 if (fFile) {
162 R__MigrateKey(fFile,file);
163 // delete the previous memory file (if any)
164 delete file;
165 } else {
166 fFile = file;
167 }
168 }
170 fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
173 }
174};
175
176struct ParallelFileMerger : public TObject
177{
178 typedef std::vector<ClientInfo> ClientColl_t;
179
180 TString fFilename;
185 TFileMerger fMerger;
186
187 ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
188 {
189 // Constructor: (re)create the merger output file, optionally with a 32 MB write cache.
190
191 fMerger.SetPrintLevel(0);
192 fMerger.OutputFile(filename,"RECREATE");
193 if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
194 }
195
196 ~ParallelFileMerger() override
197 {
198 // Destructor.
199
200 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
201 fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
202 }
204 iter != fClients.end();
205 ++iter)
206 {
207 delete iter->fFile;
208 }
209 }
210
211 ULong_t Hash() const override
212 {
213 // Return hash value for this object.
214 return fFilename.Hash();
215 }
216
217 const char *GetName() const override
218 {
219 // Return the name of the object which is the name of the output file.
220 return fFilename;
221 }
222
224 {
225 // Initial merge of the input to copy the resettable objects (e.g. TTree) into the output
226 // and remove them from the input file.
227
228 fMerger.AddFile(input);
229
231
233 return result;
234 }
235
236 Bool_t Merge()
237 {
238 // Merge the current inputs into the output file.
239
240 R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove objects that can *not* be incrementally merged and will *not* be reset by the client code.
241 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
242 fMerger.AddFile(fClients[f].fFile);
243 }
245
246 // Remove any 'resettable' objects (like TTree) from the input files so that they will not
247 // be re-merged. Keep only the objects that always need to be re-merged (histograms).
248 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
249 if (fClients[f].fFile) {
251 } else {
252 // We back up the file (probably due to memory constraint)
253 TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
254 R__DeleteObject(file,kTRUE); // Remove objects that can be incrementally merged and will be reset by the client code.
255 file->Write();
256 delete file;
257 }
258 }
261 fClientsContact.Clear();
262
263 return result;
264 }
265
267 {
268 // Return true, if there is any data that has not been merged.
269
270 return fClientsContact.CountBits() > 0;
271 }
272
274 {
275 // Return true if enough clients have reported.
276
277 if (fClients.empty()) {
278 return kFALSE;
279 }
280
281 // Calculate average and rms of the time between the last 2 contacts.
282 Double_t sum = 0;
283 Double_t sum2 = 0;
284 for(unsigned int c = 0 ; c < fClients.size(); ++c) {
285 sum += fClients[c].fTimeSincePrevContact;
286 sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
287 }
288 Double_t avg = sum / fClients.size();
289 Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
290 Double_t target = avg + 2*sigma;
292 if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
293// Float_t cut = clientThreshold * fClients.size();
294// if (!(fClientsContact.CountBits() > cut )) {
295// for(unsigned int c = 0 ; c < fClients.size(); ++c) {
296// fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
297// }
298// fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
299// }
300 return kTRUE;
301 }
302 Float_t cut = clientThreshold * fClients.size();
303 return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
304 }
305
307 {
308 // Register that a client has sent a file.
309
311 fClientsContact.SetBitNumber(clientId);
312 if (fClients.size() < clientId+1) {
313 fClients.push_back( ClientInfo(fFilename,clientId) );
314 }
315 fClients[clientId].Set(file);
316 }
317
319};
320
321void parallelMergeServer(bool cache = false) {
322 // Open a server socket looking for connections on a named service or
323 // on a specified port.
324 //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
325 TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
326 if (!ss->IsValid()) {
327 return;
328 }
329
330 TMonitor *mon = new TMonitor;
331
332 mon->Add(ss);
333
336
338
339 enum StatusKind {
341 kProtocol = 1,
342
344 };
345
346 printf("fastMergeServerHist ready to accept connections\n");
347 while (true) {
348 TMessage *mess;
349 TSocket *s;
350
351 // NOTE: this needs to be updated to handle the case where the client
352 // dies.
353 s = mon->Select();
354
355 if (s->IsA() == TServerSocket::Class()) {
356 if (clientCount > 100) {
357 printf("only accept 100 clients connections\n");
358 mon->Remove(ss);
359 ss->Close();
360 } else {
361 TSocket *client = ((TServerSocket *)s)->Accept();
364 ++clientCount;
365 ++clientIndex;
366 mon->Add(client);
367 printf("Accept %d connections\n",clientCount);
368 }
369 continue;
370 }
371
372 s->Recv(mess);
373
374 if (mess==nullptr) {
375 Error("fastMergeServer","The client did not send a message\n");
376 } else if (mess->What() == kMESS_STRING) {
377 char str[64];
378 mess->ReadString(str, 64);
379 printf("Client %d: %s\n", clientCount, str);
380 mon->Remove(s);
381 printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
382 s->GetBytesSent());
383 s->Close();
384 --clientCount;
385 if (mon->GetActive() == 0 || clientCount == 0) {
386 printf("No more active clients... stopping\n");
387 break;
388 }
389 } else if (mess->What() == kMESS_ANY) {
390
394 mess->ReadInt(clientId);
395 mess->ReadTString(filename);
396 mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
397
398 // Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
399
400 TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
401 mess->SetBufferOffset(mess->Length()+length);
402
403 const Float_t clientThreshold = 0.75; // Controls how often the histograms are merged: here as soon as more than 75% of the clients have reported.
404
406 if (!info) {
407 info = new ParallelFileMerger(filename,cache);
408 mergers.Add(info);
409 }
410
411 if (R__NeedInitialMerge(transient)) {
412 info->InitialMerge(transient);
413 }
414 info->RegisterClient(clientId,transient);
415 if (info->NeedMerge(clientThreshold)) {
416 // Enough clients reported.
417 Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
418 info->Merge();
419 }
420 transient = nullptr;
421 } else if (mess->What() == kMESS_OBJECT) {
422 printf("got object of class: %s\n", mess->GetClass()->GetName());
423 } else {
424 printf("*** Unexpected message ***\n");
425 }
426
427 delete mess;
428 }
429
430 TIter next(&mergers);
432 while ( (info = (ParallelFileMerger*)next()) ) {
433 if (info->NeedFinalMerge())
434 {
435 info->Merge();
436 }
437 }
438
439 mergers.Delete();
440 delete mon;
441 delete ss;
442}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
bool Bool_t
Definition RtypesCore.h:63
int Int_t
Definition RtypesCore.h:45
unsigned long ULong_t
Definition RtypesCore.h:55
unsigned int UInt_t
Definition RtypesCore.h:46
float Float_t
Definition RtypesCore.h:57
constexpr Bool_t kFALSE
Definition RtypesCore.h:94
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:69
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
#define ClassDefOverride(name, id)
Definition Rtypes.h:346
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t target
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
UInt_t Hash(const TString &s)
Definition TString.h:494
R__EXTERN TSystem * gSystem
Definition TSystem.h:561
const_iterator begin() const
const_iterator end() const
Container of bits.
Definition TBits.h:26
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition TClass.h:84
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Definition TClass.cxx:7589
Bool_t InheritsFrom(const char *cl) const override
Return kTRUE if this class inherits from a class with name "classname".
Definition TClass.cxx:4971
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition TClass.cxx:3069
Describe directory structure in memory.
Definition TDirectory.h:45
static TClass * Class()
virtual TList * GetList() const
Definition TDirectory.h:222
virtual TList * GetListOfKeys() const
Definition TDirectory.h:223
A cache when writing files over the network.
This class provides file copy and merging services.
Definition TFileMerger.h:30
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
Definition TFileMerger.h:94
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
Definition TFileMerger.h:90
@ kIncremental
Merge the input file with the content of the output file (if already existing).
Definition TFileMerger.h:73
@ kResetable
Only the objects with a MergeAfterReset member function.
Definition TFileMerger.h:74
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:79
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4088
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsiz=0) override
Write memory objects to this file.
Definition TFile.cxx:2435
@ kWriteError
Definition TFile.h:194
THashTable implements a hash table to store TObject's.
Definition THashTable.h:35
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
void Delete(Option_t *option="") override
Delete an object from the file.
Definition TKey.cxx:538
virtual const char * GetClassName() const
Definition TKey.h:75
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition TKey.cxx:758
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
static TClass * Class()
UInt_t GetBytesRecv() const
Definition TSocket.h:120
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
UInt_t GetBytesSent() const
Definition TSocket.h:119
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
TClass * IsA() const override
Definition TSocket.h:171
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:139
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition TString.cxx:677
virtual int GetPid()
Get process id.
Definition TSystem.cxx:707
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition TTimeStamp.h:45
const Double_t sigma
std::ostream & Info()
Definition hadd.cxx:163
Double_t Sqrt(Double_t x)
Returns the square root of x.
Definition TMath.h:666
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2345