Logo ROOT   6.08/07
Reference Guide
TMPWorker.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "MPCode.h"
13 #include "MPSendRecv.h"
14 #include "TEnv.h"
15 #include "TError.h"
16 #include "TMPWorker.h"
17 #include "TSystem.h"
18 #include <memory> //unique_ptr
19 #include <string>
20 
21 #include <iostream>
22 
23 //////////////////////////////////////////////////////////////////////////
24 ///
25 /// \class TMPWorker
26 ///
27 /// This class works in conjunction with TMPClient, reacting to messages
28 /// received from it as specified by the Notify and HandleInput methods.
29 /// When TMPClient::Fork is called, a TMPWorker instance is passed to it
30 /// which will take control of the ROOT session in the children processes.
31 ///
32 /// After forking, every time a message is sent or broadcast to the workers,
33 /// TMPWorker::Notify is called and the message is retrieved.
34 /// Messages exchanged between TMPClient and TMPWorker should be sent with
35 /// the MPSend() standalone function.\n
36 /// If the code of the message received is 1000 or above (i.e. it is an MPCode)
37 /// the qualified TMPWorker::HandleInput method is called, that takes care
38 /// of handling the most generic type of messages. Otherwise the unqualified
39 /// (possibly overridden) version of HandleInput is called, allowing classes
40 /// that inherit from TMPWorker to manage their own protocol.\n
41 /// An application's worker class should inherit from TMPWorker and implement
42 /// a HandleInput method that overrides TMPWorker's.\n
43 ///
44 //////////////////////////////////////////////////////////////////////////
45 
46 
47 
48 //////////////////////////////////////////////////////////////////////////
49 /// Class constructors.
50 /// Note that this does not set variables like fPid or fS (worker's socket).\n
51 /// These operations are handled by the Init method, which is called after
52 /// forking.\n
53 /// This separation is in place because the instantiation of a worker
54 /// must be done once _before_ forking, while the initialization of the
55 /// members must be done _after_ forking by each of the children processes.
// NOTE(review): listing line 56 (the signature, `TMPWorker()` per the reference index) is missing from this extract.
// Default construction: no files, no tree name, no tree, zero workers/entries.
57  : fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr),
58  fNWorkers(0), fMaxNEntries(0),
59  fProcessedEntries(0), fS(), fPid(0), fNWorker(0), // socket, pid and worker number are filled in later by Init
60  fTreeCache(0), fTreeCacheIsLearning(kFALSE),
61  fUseTreeCache(kTRUE), fCacheSize(-1) // starting values; Setup() below adjusts both from gEnv
62 {
63  Setup(); // read the MultiProc.* settings
64 }
65 
// Stores the list of file names and the tree name to be processed, together
// with the number of workers and the maximum number of entries.
// fTree and fFile start out null; socket, pid and worker number are set by Init.
66 TMPWorker::TMPWorker(const std::vector<std::string>& fileNames,
67  const std::string& treeName,
68  unsigned nWorkers, ULong64_t maxEntries)
69  : fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
70  fNWorkers(nWorkers), fMaxNEntries(maxEntries),
71  fProcessedEntries(0), fS(), fPid(0), fNWorker(0),
// NOTE(review): listing lines 72-73 are missing from this extract; the initializer list above ends
// with a trailing comma, so the remaining initializers (presumably the tree-cache members, as in the
// default constructor) were dropped — confirm against the real file.
74 {
75  Setup(); // read the MultiProc.* settings
76 }
77 
// Stores a pointer to an already existing tree instead of file/tree names,
// together with the number of workers and the maximum number of entries.
// The tree pointer is only stored here; fFile starts out null.
78 TMPWorker::TMPWorker(TTree *tree, unsigned nWorkers, ULong64_t maxEntries)
79  : fFileNames(), fTreeName(), fTree(tree), fFile(nullptr),
80  fNWorkers(nWorkers), fMaxNEntries(maxEntries),
81  fProcessedEntries(0), fS(), fPid(0), fNWorker(0),
// NOTE(review): listing lines 82-83 are missing from this extract; the initializer list above ends
// with a trailing comma, so the remaining initializers (presumably the tree-cache members, as in the
// default constructor) were dropped — confirm against the real file.
84 {
85  Setup(); // read the MultiProc.* settings
86 }
87 
// NOTE(review): listing line 88 (the signature, `virtual ~TMPWorker()` per the reference index) is missing from this extract.
89 {
90  // Properly close the open file, if any
91  CloseFile(); // deletes fFile when it is set and resets the pointer
92 }
93 
94 //////////////////////////////////////////////////////////////////////////
95 /// Auxiliary method for common initializations
// NOTE(review): listing line 96 (the signature, `void Setup()` per the reference index) is missing from this extract.
// Reads the tree-cache configuration from gEnv; called by every constructor.
97 {
98  Int_t uc = gEnv->GetValue("MultiProc.UseTreeCache", 0); // 0 is the fallback passed to GetValue
99  if (uc != 1) fUseTreeCache = kFALSE; // only a value of exactly 1 leaves the tree cache enabled
100  fCacheSize = gEnv->GetValue("MultiProc.CacheSize", -1); // -1 fallback; SetupTreeCache replaces a negative size with the tree's own
101 }
102 
103 //////////////////////////////////////////////////////////////////////////
104 /// This method is called by children processes right after forking.
105 /// Initialization of worker properties that must be delayed until after
106 /// forking must be done here.\n
107 /// For example, Init saves the pid into fPid and wraps the file descriptor
108 /// into the worker's socket (fS).\n
109 /// Make sure these operations are performed also by overriding implementations,
110 /// e.g. by calling TMPWorker::Init explicitly.
111 void TMPWorker::Init(int fd, unsigned workerN)
112 {
// fd:      file descriptor that gets wrapped into this worker's socket (fS)
// workerN: ordinal number of this worker, stored in fNWorker
113  fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
114  fPid = getpid(); // pid of the calling process
115  fNWorker = workerN;
116  fId = "W" + std::to_string(GetNWorker()) + "|P" + std::to_string(GetPid()); // "W<worker number>|P<pid>", used as prefix of the replies sent to the client
117 }
118 
119 
// NOTE(review): listing line 120 (the signature, `void Run()` per the reference index) is missing from this extract.
// Message loop of the worker: receive a message from the socket and dispatch it on its code.
121 {
122  while(true) { // no break/return in this body: the loop ends only when the process exits
123  MPCodeBufPair msg = MPRecv(fS.get()); // pair of (code, optional buffer)
124  if (msg.first == MPCode::kRecvError) { // reading from the socket failed
125  Error("TMPWorker::Run", "Lost connection to client\n");
126  gSystem->Exit(0); // NOTE(review): exit status is 0 although the connection was lost — confirm this is intended
127  }
128 
129  if (msg.first < 1000)
130  HandleInput(msg); //codes below 1000: unqualified call, so an overriding HandleInput is used
131  else
132  TMPWorker::HandleInput(msg); //codes >= 1000: qualified call, always this class' implementation
133  }
134 }
135 
136 
137 //////////////////////////////////////////////////////////////////////////
138 /// Handle a message with an EMPCode.
139 /// This method is called upon receiving a message with a code >= 1000 (i.e.
140 /// EMPCode). It handles the most generic types of messages.\n
141 /// Classes inheriting from TMPWorker should implement their own HandleInput
142 /// function, that should be able to handle codes specific to that application.\n
143 /// The appropriate version of the HandleInput method (TMPWorker's or the
144 /// overriding version) is automatically called depending on the message code.
// NOTE(review): listing line 145 (the signature, `virtual void HandleInput(MPCodeBufPair &msg)` per the reference index) is missing from this extract.
146 {
147  unsigned code = msg.first; // only the code is inspected here; msg.second is not used
148 
149  std::string reply = fId; // every reply starts with this worker's identifier
150  if (code == MPCode::kMessage) {
151  //general message: nothing to do, just acknowledge it
152  reply += ": ok";
153  MPSend(fS.get(), MPCode::kMessage, reply.data());
154  } else if (code == MPCode::kError) {
155  //general error: nothing to do here, answer "ko" (note: sent with code kMessage, not kError)
156  reply += ": ko";
157  MPSend(fS.get(), MPCode::kMessage, reply.data());
158  } else if (code == MPCode::kShutdownOrder || code == MPCode::kFatalError) {
159  //client is asking this worker to shut down, or the client itself is dying: notify it and exit
160  MPSend(fS.get(), MPCode::kShutdownNotice, reply.data());
161  gSystem->Exit(0);
162  } else {
163  reply += ": unknown code received. code=" + std::to_string(code); // any other code is reported back as an error
164  MPSend(fS.get(), MPCode::kError, reply.data());
165  }
166 }
167 
168 
169 //////////////////////////////////////////////////////////////////////////
170 /// Handle file closing.
171 
// NOTE(review): listing line 172 (the signature, `void CloseFile()` per the reference index) is missing from this extract.
// Deletes fFile, if set, and resets the pointer; does nothing otherwise.
173 {
174  // Avoid destroying the cache; must be placed before deleting the trees
175  if (fFile) {
176  if (fTree) fFile->SetCacheRead(0, fTree); // set the file's read cache for fTree to null before the file is deleted
177  delete fFile ;
178  fFile = 0; // so that a second call (e.g. from the destructor) is a no-op
179  }
180 }
181 
182 //////////////////////////////////////////////////////////////////////////
183 /// Handle file opening.
184 
185 TFile *TMPWorker::OpenFile(const std::string& fileName)
186 {
// Opens fileName with TFile::Open.
// Returns the TFile pointer on success, nullptr if the file could not be
// opened or the resulting TFile is a zombie.
187 
188  TFile *fp = TFile::Open(fileName.c_str());
189  if (fp == nullptr || fp->IsZombie()) {
190  std::stringstream ss;
191  ss << "could not open file " << fileName;
192  std::string errmsg = ss.str();
// NOTE(review): listing line 193 is missing from this extract; as shown errmsg is never used
// (presumably it is handed to SendError — confirm against the real file).
// NOTE(review): on the visible path a non-null zombie fp is not deleted before returning — confirm.
194  return nullptr;
195  }
196 
197  return fp;
198 }
199 
200 
201 //////////////////////////////////////////////////////////////////////////
202 /// Retrieve a tree from an open file.
203 
// NOTE(review): listing line 204 (the signature, `TTree * RetrieveTree(TFile *fp)` per the reference index) is missing from this extract.
// Returns the tree found in fp, or nullptr if none was found.
205 {
206  //retrieve the TTree with the specified name from file
207  //we are not the owner of the TTree object, the file is!
208  TTree *tree = nullptr;
209  if(fTreeName == "") {
210  // no tree name given: scan the keys of the file for a TTree or TNtuple
211  // (re-adapted from TEventIter.cxx)
212  if (fp->GetListOfKeys()) {
213  for(auto k : *fp->GetListOfKeys()) {
214  TKey *key = static_cast<TKey*>(k);
215  if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
216  tree = static_cast<TTree*>(fp->Get(key->GetName())); // NOTE(review): no break, so the LAST matching key wins although this was described as "the first TTree" — confirm intent
217  }
218  }
219  } else {
220  tree = static_cast<TTree*>(fp->Get(fTreeName.c_str())); // unchecked cast: assumes the object stored under that name is a TTree — TODO confirm
221  }
222  if (tree == nullptr) {
223  std::stringstream ss;
224  ss << "cannot find tree with name " << fTreeName << " in file " << fp->GetName(); // fTreeName is empty here when the key scan above found nothing
225  std::string errmsg = ss.str();
// NOTE(review): listing line 226 is missing from this extract; as shown errmsg is never used
// (presumably it is handed to SendError — confirm against the real file).
227  return nullptr;
228  }
229 
230  return tree;
231 }
232 
233 //////////////////////////////////////////////////////////////////////////
234 /// Tree cache handling
235 
// NOTE(review): listing line 236 (the signature, `void SetupTreeCache(TTree *tree)` per the reference index) is missing from this extract.
// Enables (or reuses) the read cache for tree when fUseTreeCache is set, disables it otherwise.
237 {
238  if (fUseTreeCache) {
239  TFile *curfile = tree->GetCurrentFile();
240  if (curfile) {
241  if (!fTreeCache) {
// no cache remembered yet: size the tree's cache and keep the pointer the file reports for this tree
242  tree->SetCacheSize(fCacheSize);
243  fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
244  if (fCacheSize < 0) fCacheSize = tree->GetCacheSize(); // negative means "not configured": record the size the tree reports
245  } else {
// NOTE(review): listing line 246 is missing from this extract.
// a cache is already remembered: attach it to the current file for this tree and refresh its branches
247  curfile->SetCacheRead(fTreeCache, tree);
248  fTreeCache->UpdateBranches(tree);
249  }
250  if (fTreeCache) {
// NOTE(review): listing lines 251-252 are missing from this extract; as shown the message below is
// printed whenever a cache exists (presumably the missing lines test the learning state — confirm).
253  Info("SetupTreeCache","the tree cache is in learning phase");
254  }
255  } else {
256  Warning("SetupTreeCache", "default tree does not have a file attached: corruption? Tree cache untouched");
257  }
258  } else {
259  // Disable the cache
260  tree->SetCacheSize(0);
261  }
262 }
263 
264 
265 //////////////////////////////////////////////////////////////////////////
266 /// Error sender
267 
268 void TMPWorker::SendError(const std::string& errmsg, unsigned int errcode)
269 {
// errmsg:  text of the error; it is prefixed with this worker's identifier (fId)
// errcode: message code used for the send (the reference index lists MPCode::kError as its default)
270  std::string reply = fId + ": " + errmsg;
271  MPSend(GetSocket(), errcode, reply.data()); // the return value of MPSend is not checked
272 }
pid_t GetPid()
Definition: TMPWorker.h:46
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Bool_t fTreeCacheIsLearning
Definition: TMPWorker.h:76
virtual void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TMPWorker.cxx:145
pid_t fPid
the PID of the process in which this worker is running
Definition: TMPWorker.h:70
TFile * OpenFile(const std::string &fileName)
Handle file opening.
Definition: TMPWorker.cxx:185
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=0, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
Definition: TFile.cxx:2197
unsigned fNWorker
the ordinal number of this worker (0 to nWorkers-1)
Definition: TMPWorker.h:71
virtual const char * GetClassName() const
Definition: TKey.h:77
A specialized TFileCacheRead object for a TTree.
Definition: TTreeCache.h:34
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:50
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
int Int_t
Definition: RtypesCore.h:41
Error message.
Definition: MPCode.h:30
const Bool_t kFALSE
Definition: Rtypes.h:92
Error while reading from the socket.
Definition: MPCode.h:34
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition: TMPWorker.h:57
std::string fTreeName
the name of the tree to be processed
Definition: TMPWorker.h:52
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:268
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3907
unsigned GetNWorker() const
Definition: TMPWorker.h:47
virtual Long64_t GetCacheSize() const
Definition: TTree.h:384
void Info(const char *location, const char *msgfmt,...)
Bool_t fUseTreeCache
Definition: TMPWorker.h:77
virtual void ResetCache()
This will simply clear the cache.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:30
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
Definition: TMPWorker.h:53
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition: TMPWorker.h:56
Used by the workers to notify client of shutdown.
Definition: MPCode.h:33
void Error(const char *location, const char *msgfmt,...)
void SetupTreeCache(TTree *tree)
Tree cache handling.
Definition: TMPWorker.cxx:236
std::unique_ptr< TSocket > fS
This worker's socket. The unique_ptr makes sure resources are released.
Definition: TMPWorker.h:69
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:31
void CloseFile()
Handle file closing.
Definition: TMPWorker.cxx:172
TFile * fFile
last open file
Definition: TMPWorker.h:54
Long64_t fCacheSize
Definition: TMPWorker.h:78
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:496
TMPWorker()
Class constructors.
Definition: TMPWorker.cxx:56
TTreeCache * fTreeCache
Definition: TMPWorker.h:75
unsigned fNWorkers
the number of workers spawned
Definition: TMPWorker.h:55
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition: TTree.cxx:5053
TSocket * GetSocket()
Definition: TMPWorker.h:45
void Warning(const char *location, const char *msgfmt,...)
void Run()
Definition: TMPWorker.cxx:120
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
Tell the client there was an error while processing.
Definition: PoolUtils.h:44
Bool_t IsZombie() const
Definition: TObject.h:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
unsigned long long ULong64_t
Definition: RtypesCore.h:70
virtual ~TMPWorker()
Definition: TMPWorker.cxx:88
virtual Int_t SetCacheSize(Long64_t cachesize=-1)
Set maximum size of the file cache .
Definition: TTree.cxx:8048
std::vector< std::string > fFileNames
the files to be processed by all workers
Definition: TMPWorker.h:51
virtual Bool_t IsLearning() const
Definition: TTreeCache.h:91
std::string fId
identifier string in the form W<nwrk>|P<proc id>
Definition: TMPWorker.h:50
#define nullptr
Definition: Rtypes.h:87
virtual TList * GetListOfKeys() const
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
Definition: TSystem.cxx:721
Generic message.
Definition: MPCode.h:29
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:111
Definition: tree.py:1
TFileCacheRead * GetCacheRead(TObject *tree=0) const
Return a pointer to the current read cache.
Definition: TFile.cxx:1202
A TTree object has a header with a name and a title.
Definition: TTree.h:98
const Bool_t kTRUE
Definition: Rtypes.h:91
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
void Setup()
Auxiliary method for common initializations.
Definition: TMPWorker.cxx:96
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
Definition: TMPWorker.cxx:204