Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
parallelMergeServer.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_net
3/// This script shows how to make a simple iterative server that
4/// can accept connections while handling currently open connections.
5/// Compare this script to hserv.C that blocks on accept.
6/// In this script a server socket is created and added to a monitor.
7/// A monitor object is used to monitor connection requests on
8/// the server socket. After accepting the connection
9/// the new socket is added to the monitor and immediately ready
10/// for use. Once more than 100 clients are connected, the server socket
11/// is removed from the monitor and closed. The monitor continues
12/// monitoring the remaining sockets.
13///
14/// To run this demo do the following:
15/// - Open three windows
16/// - Start ROOT in all three windows
17/// - Execute in the first window: .x parallelMergeServer.C
18/// - Execute in the second and third windows: .x parallelMergeClient.C("<socket path printed by the server>")
19///
20/// \macro_code
21///
22/// \author Fons Rademakers
23
24#include "TMessage.h"
25#include "TBenchmark.h"
26#include "TSocket.h"
27#include "TH2.h"
28#include "TTree.h"
29#include "TMemFile.h"
30#include "TRandom.h"
31#include "TError.h"
32#include "TFileMerger.h"
33
34#include "TServerSocket.h"
35#include "TPad.h"
36#include "TCanvas.h"
37#include "TMonitor.h"
38
39#include "TFileCacheWrite.h"
40#include "TSystem.h"
41#include "THashTable.h"
42
43#include "TMath.h"
44#include "TTimeStamp.h"
45
// File-level integer constants naming three modes (0, 1, 2).
// NOTE(review): none of these is referenced in the visible lines of this listing — confirm whether they are still used.
46const int kIncremental = 0;
47const int kReplaceImmediately = 1;
48const int kReplaceWait = 2;
49
50#include "TKey.h"
// Walks the keys of `dir` and returns kTRUE as soon as a key whose class provides a
// reset-after-merge function (TClass::GetResetAfterMerge() != nullptr) is found.
// Returns kFALSE when `dir` is null or when no such key is found.
// NOTE(review): the signature line and original lines 59-60 and 65 are missing from this
// listing. The call site `R__NeedInitialMerge(transient)` suggests this is
// R__NeedInitialMerge(TDirectory *dir), that `cl` is the TClass of the current key, and that
// the missing line 65 recurses into `subdir` — confirm against the full file.
52{
53
54 if (dir==nullptr) return kFALSE;
55
56 TIter nextkey(dir->GetListOfKeys());
57 TKey *key;
58 while( (key = (TKey*)nextkey()) ) {
// Use the directory object already attached to `dir` if there is one; otherwise read it from its key.
61 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
62 if (!subdir) {
63 subdir = (TDirectory *)key->ReadObj();
64 }
66 return kTRUE;
67 }
68 } else {
// Non-directory key: a class with a reset-after-merge function triggers the kTRUE result.
69 if (nullptr != cl->GetResetAfterMerge()) {
70 return kTRUE;
71 }
72 }
73 }
74 return kFALSE;
75}
76
// Deletes a subset of the keys of `dir`, selected by `withReset`:
//  - withReset true:  keys whose class HAS a reset-after-merge function;
//  - withReset false: keys whose class has NO reset-after-merge function.
// A selected key is deleted from the file (key->Delete()), removed from the directory's
// key list and then destroyed. Does nothing when `dir` is null.
// NOTE(review): the signature line and original lines 84-85, 90 and 92 are missing from this
// listing. Call sites such as `R__DeleteObject(file,kTRUE)` suggest the signature is
// (TDirectory *dir, Bool_t withReset); `cl` and `todelete` are declared on the missing
// lines, and line 90 presumably recurses into `subdir` — confirm against the full file.
78{
79 if (dir==nullptr) return;
80
81 TIter nextkey(dir->GetListOfKeys());
82 TKey *key;
83 while( (key = (TKey*)nextkey()) ) {
// Use the directory object already attached to `dir` if there is one; otherwise read it from its key.
86 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
87 if (!subdir) {
88 subdir = (TDirectory *)key->ReadObj();
89 }
91 } else {
93 if (withReset) {
94 todelete = (nullptr != cl->GetResetAfterMerge());
95 } else {
96 todelete = (nullptr == cl->GetResetAfterMerge());
97 }
98 if (todelete) {
// Delete the object from the file, unlink the key from the directory, then free the key itself.
99 key->Delete();
100 dir->GetListOfKeys()->Remove(key);
101 delete key;
102 }
103 }
104 }
105}
106
// Copies the keys of `source` into `destination`.
//  - Directory keys: the matching sub-directory is looked up in `destination` and created
//    with mkdir() when absent.
//  - Other keys: any key of the same name in `destination` is deleted first, then a new
//    TKey is constructed in `destination` from the source key and written out.
// Returns immediately (without calling SaveSelf) if the destination file has its
// kWriteError bit set after a write; otherwise ends with destination->SaveSelf().
// Does nothing when either argument is null.
// NOTE(review): the signature line and original lines 114 and 124 are missing from this
// listing. The call `R__MigrateKey(fFile,file)` suggests the parameter order is
// (destination, source); `cl` is declared on line 114, and line 124 presumably recurses
// into the two sub-directories — confirm against the full file.
108{
109 if (destination==nullptr || source==nullptr) return;
110
111 TIter nextkey(source->GetListOfKeys());
112 TKey *key;
113 while( (key = (TKey*)nextkey()) ) {
115 if (cl->InheritsFrom(TDirectory::Class())) {
// Use the source sub-directory already in memory if there is one; otherwise read it from its key.
116 TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
117 if (!source_subdir) {
118 source_subdir = (TDirectory *)key->ReadObj();
119 }
// Make sure a sub-directory of the same name exists on the destination side.
120 TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
121 if (!destination_subdir) {
122 destination_subdir = destination->mkdir(key->GetName());
123 }
125 } else {
// Replace semantics: drop any existing key with the same name before copying.
126 TKey *oldkey = destination->GetKey(key->GetName());
127 if (oldkey) {
128 oldkey->Delete();
129 delete oldkey;
130 }
131 TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the files are from the same client, hence a pidoffset of 0.
132 destination->GetFile()->SumBuffer(newkey->GetObjlen());
133 newkey->WriteFile(0);
// Abort the migration on a write error; note that SaveSelf() below is then skipped.
134 if (destination->GetFile()->TestBit(TFile::kWriteError)) {
135 return;
136 }
137 }
138 }
139 destination->SaveSelf();
140}
141
// Per-client bookkeeping: the file currently held for the client plus contact statistics.
// NOTE(review): original lines 145-148 are missing from this listing; they presumably declare
// fLocalName, fContactsCount, fLastContact and fTimeSincePrevContact, which are used below.
142struct ClientInfo
143{
144 TFile *fFile; // This object does *not* own the file; it is owned by the owner of the ClientInfo.
149
150 ClientInfo() : fFile(nullptr), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
// Builds the local name as "<filename>-<clientId>-<pid of this process>".
151 ClientInfo(const char *filename, UInt_t clientId) : fFile(nullptr), fContactsCount(0), fTimeSincePrevContact(0) {
152 fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
153 }
154
155 void Set(TFile *file)
156 {
157 // Register the new file as coming from this client.
158 if (file != fFile) {
159 // When a file is already held for this client, R__MigrateKey(fFile,file) is called
160 // (presumably copying the keys of the incoming file into the held one — confirm the parameter order).
161 if (fFile) {
162 R__MigrateKey(fFile,file);
163 // The incoming file is deleted here; fFile is kept.
164 delete file;
165 } else {
// First file from this client: simply keep the pointer.
166 fFile = file;
167 }
168 }
// NOTE(review): original lines 169 and 171-172 are missing from this listing; `now` is declared there
// and the contact count / last-contact timestamp are presumably updated there.
170 fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
173 }
174};
175
// Merges the files sent by several clients into one output file, using a TFileMerger.
// It is a TObject whose name and hash come from the output file name (see GetName()/Hash()).
// NOTE(review): several original lines are missing from this listing (181-182, 184, 223, 229-232,
// 244, 259-260, 266, 273, 291, 306, 310, 318). They presumably declare fClientsContact,
// fNClientsContact and fLastMerge, and hold the headers of InitialMerge, NeedFinalMerge, NeedMerge
// and RegisterClient plus the PartialMerge calls — confirm against the full file.
176struct ParallelFileMerger : public TObject
177{
178 typedef std::vector<ClientInfo> ClientColl_t;
179
180 TString fFilename;
183 ClientColl_t fClients;
185 TFileMerger fMerger;
186
187 ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
188 {
189 // Constructor: opens the merger's output file `filename` in "RECREATE" mode and, when
// writeCache is set, creates a TFileCacheWrite on it with 32*1024*1024 as second argument
// (presumably the buffer size in bytes — confirm).
190
191 fMerger.SetPrintLevel(0);
192 fMerger.OutputFile(filename,"RECREATE");
193 if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
194 }
195
196 ~ParallelFileMerger() override
197 {
198 // Destructor: prints each client's contact count to stderr, then deletes every client's file.
199
200 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
201 fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
202 }
203 for( ClientColl_t::iterator iter = fClients.begin();
204 iter != fClients.end();
205 ++iter)
206 {
207 delete iter->fFile;
208 }
209 }
210
211 ULong_t Hash() const override
212 {
213 // Return hash value for this object.
214 return fFilename.Hash();
215 }
216
217 const char *GetName() const override
218 {
219 // Return the name of the object which is the name of the output file.
220 return fFilename;
221 }
222
224 {
225 // Initial merge of the input to copy the resettable objects (TTree) into the output
226 // and remove them from the input file.
// NOTE(review): the method header and the lines computing `result` are missing from this listing.
227
228 fMerger.AddFile(input);
229
231
233 return result;
234 }
235
236 Bool_t Merge()
237 {
238 // Merge the current inputs into the output file.
239
240 R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove objects that can *not* be incrementally merged and will *not* be reset by the client code.
241 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
242 fMerger.AddFile(fClients[f].fFile);
243 }
// NOTE(review): original line 244 (missing here) presumably performs the merge and sets `result`.
245
246 // Remove any 'resettable' object (like TTree) from the input file so that they will not
247 // be re-merged. Keep only the objects that always need to be re-merged (Histograms).
248 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
249 if (fClients[f].fFile) {
250 R__DeleteObject(fClients[f].fFile,kTRUE);
251 } else {
252 // No in-memory file for this client: reopen the file named fLocalName (backed up, probably due to memory constraints).
// NOTE(review): the result of TFile::Open is not checked before file->Write().
253 TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
254 R__DeleteObject(file,kTRUE); // Remove objects that can be incrementally merged and will be reset by the client code.
255 file->Write();
256 delete file;
257 }
258 }
// Forget which clients have reported since the last merge.
261 fClientsContact.Clear();
262
263 return result;
264 }
265
267 {
268 // Return true, if there is any data that has not been merged.
269
270 return fClientsContact.CountBits() > 0;
271 }
272
274 {
275 // Return true, if enough clients have reported, or if the time since the last merge
// exceeds (average + 2 sigma) of the clients' time between their last two contacts.
276
277 if (fClients.empty()) {
278 return kFALSE;
279 }
280
281 // Calculate average and rms of the time between the last 2 contacts.
282 Double_t sum = 0;
283 Double_t sum2 = 0;
284 for(unsigned int c = 0 ; c < fClients.size(); ++c) {
285 sum += fClients[c].fTimeSincePrevContact;
286 sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
287 }
288 Double_t avg = sum / fClients.size();
289 Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
290 Double_t target = avg + 2*sigma;
// NOTE(review): `now` is declared on original line 291, which is missing from this listing.
292 if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
293// Float_t cut = clientThreshold * fClients.size();
294// if (!(fClientsContact.CountBits() > cut )) {
295// for(unsigned int c = 0 ; c < fClients.size(); ++c) {
296// fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
297// }
298// fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
299// }
300 return kTRUE;
301 }
// Otherwise merge when more than clientThreshold * (number of clients) distinct clients have
// reported, or when fNClientsContact exceeds twice that value.
302 Float_t cut = clientThreshold * fClients.size();
303 return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
304 }
305
307 {
308 // Register that a client has sent a file.
309
311 fClientsContact.SetBitNumber(clientId);
// NOTE(review): only one entry is appended, so fClients[clientId] below is valid only if
// clientId <= the previous size of fClients (ids arriving without gaps) — confirm with the caller.
312 if (fClients.size() < clientId+1) {
313 fClients.push_back( ClientInfo(fFilename,clientId) );
314 }
315 fClients[clientId].Set(file);
316 }
317
319};
320
// Entry point of the merge server. It creates a server socket at a temporary path, then loops
// on a TMonitor: connection requests are accepted, string messages end a client's session,
// and kMESS_ANY messages carry a file that is handed to the ParallelFileMerger for its file
// name. When the loop ends, pending data is merged and everything is cleaned up.
// `cache` is forwarded to the ParallelFileMerger constructor (its writeCache argument).
// NOTE(review): several original lines are missing from this listing (329, 338, 346-348, 350,
// 353, 356, 375-376, 404-406, 418, 444). They declare `dummy`, `ss`, `clientCount`,
// `clientIndex`, `mergers`, `clientId`, `filename`, `length` and `info` — confirm against the full file.
321void parallelMergeServer(bool cache = false) {
322 // Open a server socket looking for connections on a named service
323 TString socketPath = "rootserv."; // prefix for temporary file in the temp folder
324 // Get a unique, temporary file name for the socket. We remove and close the file
325 // immediately in order to reopen it as a socket. There is a race here: between
326 // the removal and the creation of the socket, the file could have been recreated.
327 // But it is unlikely (due to the random letters in the name) and harmless: the socket
328 // cannot be created in this case.
330 if (!dummy) {
331 Error("fastMergeServer", "Cannot create temporary file for socket\n");
332 return;
333 }
334
335 std::string strSocketPath(socketPath.View());
336 remove(strSocketPath.c_str());
337 fclose(dummy);
339 if (!ss->IsValid()) {
340 return;
341 }
342
343 TMonitor *mon = new TMonitor;
344
345 mon->Add(ss);
346
349
351
352 enum StatusKind {
354 kProtocol = 1,
355
357 };
358
359 printf("fastMergeServerHist ready to accept connections on %s\n", strSocketPath.c_str());
360 while (true) {
361 TMessage *mess;
362 TSocket *s;
363
364 // NOTE: this needs to be updated to handle the case where the client
365 // dies.
// NOTE(review): the socket returned by Select() is dereferenced without a null/error check.
366 s = mon->Select();
367
// Activity on the server socket itself means a new connection request.
368 if (s->IsA() == TServerSocket::Class()) {
369 if (clientCount > 100) {
// Too many clients: stop listening for new connections.
370 printf("only accept 100 clients connections\n");
371 mon->Remove(ss);
372 ss->Close();
373 } else {
374 TSocket *client = ((TServerSocket *)s)->Accept();
377 ++clientCount;
378 ++clientIndex;
379 mon->Add(client);
380 printf("Accept %d connections\n",clientCount);
381 }
382 continue;
383 }
384
385 s->Recv(mess);
386
387 if (mess==nullptr) {
388 Error("fastMergeServer","The client did not send a message\n");
389 } else if (mess->What() == kMESS_STRING) {
// A string message ends the client's session: print it, stop monitoring the socket and close it.
390 char str[64];
391 mess->ReadString(str, 64);
392 printf("Client %d: %s\n", clientCount, str);
393 mon->Remove(s);
394 printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
395 s->GetBytesSent());
396 s->Close();
397 --clientCount;
// Leave the main loop once no socket is active or the last client is gone.
398 if (mon->GetActive() == 0 || clientCount == 0) {
399 printf("No more active clients... stopping\n");
400 break;
401 }
402 } else if (mess->What() == kMESS_ANY) {
403
// Message layout as read here: client id (int), file name (TString), payload length (Long64_t),
// followed by `length` bytes of file content.
407 mess->ReadInt(clientId);
408 mess->ReadTString(filename);
409 mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
410
411 // Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
412
// Build an in-memory file from the bytes that follow the header, then advance the read offset past them.
413 TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
414 mess->SetBufferOffset(mess->Length()+length);
415
416 const Float_t clientThreshold = 0.75; // Controls how often the histograms are merged: see NeedMerge(), which compares the number of reporting clients against 0.75 * (number of clients).
417
// NOTE(review): `info` is declared on original line 418 (missing here), presumably by looking up
// `filename` in `mergers`; a new merger is created and registered when none is found.
419 if (!info) {
420 info = new ParallelFileMerger(filename,cache);
421 mergers.Add(info);
422 }
423
424 if (R__NeedInitialMerge(transient)) {
425 info->InitialMerge(transient);
426 }
427 info->RegisterClient(clientId,transient);
428 if (info->NeedMerge(clientThreshold)) {
429 // Enough clients reported.
// NOTE(review): "%ld" is used for fClients.size(), which is a size_t — confirm the format specifier.
430 Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
431 info->Merge();
432 }
// The pointer was handed to RegisterClient above; clear the local copy.
433 transient = nullptr;
434 } else if (mess->What() == kMESS_OBJECT) {
435 printf("got object of class: %s\n", mess->GetClass()->GetName());
436 } else {
437 printf("*** Unexpected message ***\n");
438 }
439
440 delete mess;
441 }
442
// Final pass: merge whatever data is still pending in each merger.
443 TIter next(&mergers);
445 while ( (info = (ParallelFileMerger*)next()) ) {
446 if (info->NeedFinalMerge())
447 {
448 info->Merge();
449 }
450 }
451
// Clean up: delete the mergers and the monitor, remove the socket path, delete the server socket.
452 mergers.Delete();
453 delete mon;
454 remove(strSocketPath.c_str());
455 delete ss;
456}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
bool Bool_t
Definition RtypesCore.h:63
int Int_t
Definition RtypesCore.h:45
unsigned long ULong_t
Definition RtypesCore.h:55
unsigned int UInt_t
Definition RtypesCore.h:46
float Float_t
Definition RtypesCore.h:57
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
constexpr Bool_t kTRUE
Definition RtypesCore.h:100
#define ClassDefOverride(name, id)
Definition Rtypes.h:341
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t target
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
UInt_t Hash(const TString &s)
Definition TString.h:494
R__EXTERN TSystem * gSystem
Definition TSystem.h:566
const_iterator begin() const
const_iterator end() const
Container of bits.
Definition TBits.h:26
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition TClass.h:81
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Definition TClass.cxx:7457
Bool_t InheritsFrom(const char *cl) const override
Return kTRUE if this class inherits from a class with name "classname".
Definition TClass.cxx:4887
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition TClass.cxx:2968
Describe directory structure in memory.
Definition TDirectory.h:45
static TClass * Class()
A cache when writing files over the network.
This class provides file copy and merging services.
Definition TFileMerger.h:30
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
Definition TFileMerger.h:92
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
Definition TFileMerger.h:88
@ kIncremental
Merge the input file with the content of the output file (if already existing).
Definition TFileMerger.h:71
@ kResetable
Only the objects with a MergeAfterReset member function.
Definition TFileMerger.h:72
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:77
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4094
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsiz=0) override
Write memory objects to this file.
Definition TFile.cxx:2441
@ kWriteError
Definition TFile.h:194
THashTable implements a hash table to store TObject's.
Definition THashTable.h:35
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
void Delete(Option_t *option="") override
Delete an object from the file.
Definition TKey.cxx:542
virtual const char * GetClassName() const
Definition TKey.h:75
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition TKey.cxx:762
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
static TClass * Class()
UInt_t GetBytesRecv() const
Definition TSocket.h:120
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
UInt_t GetBytesSent() const
Definition TSocket.h:119
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
TClass * IsA() const override
Definition TSocket.h:171
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:139
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition TString.cxx:677
virtual FILE * TempFileName(TString &base, const char *dir=nullptr, const char *suffix=nullptr)
Create a secure temporary file by appending a unique 6 letter string to base.
Definition TSystem.cxx:1499
virtual int GetPid()
Get process id.
Definition TSystem.cxx:707
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition TTimeStamp.h:45
const Double_t sigma
Double_t Sqrt(Double_t x)
Returns the square root of x.
Definition TMath.h:662
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2345