Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TParallelMergingFile.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Philippe Canal October 2011.
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TParallelMergingFile //
15// //
16// Specialization of TMemFile to connect to a parallel file merger. //
17// Upon a call to UploadAndReset, the content already written to the //
18// file is uploaded to the server and the objects implementing //
19// ResetAfterMerge (like TTree) are reset. //
20// The parallel file merger will then collate the information coming //
21// from this client and any other client into the file described by //
22// the filename of this object. //
23// //
24//////////////////////////////////////////////////////////////////////////
25
27#include "TSocket.h"
28#include "TArrayC.h"
29
30////////////////////////////////////////////////////////////////////////////////
31/// Constructor.
32/// We do no yet open any connection to the server. This will be done at the
33/// time the first upload will be requested.
34
35TParallelMergingFile::TParallelMergingFile(const char *filename, Option_t *option /* = "" */,
36 const char *ftitle /* = "" */, Int_t compress /* = 1 */) :
37 TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
38{
39 TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
40 if (serverurl.Length()) {
41 serverurl.ReplaceAll("pmerge=","pmerge://");
42 fServerLocation = TUrl(serverurl);
43 }
44}
45
46////////////////////////////////////////////////////////////////////////////////
47/// Destructor.
48
50{
51 // We need to call Close, right here so that it is executed _before_
52 // the data member of TParallelMergingFile are destructed.
53 Close();
54 delete fClassSent;
55}
56
57////////////////////////////////////////////////////////////////////////////////
58
60{
61 TMemFile::Close(option);
62 if (fSocket) {
63 if (0==fSocket->Send("Finished")) { // tell server we are finished
64 Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
65 }
66 fSocket->Close();
67 delete fSocket;
68 }
69 fSocket = 0;
70}
71
72////////////////////////////////////////////////////////////////////////////////
73/// Upload the current file data to the merging server.
74/// Reset the file and return true in case of success.
75
77{
78 // Open connection to server
79 if (fSocket == 0) {
80 const char *host = fServerLocation.GetHost();
82 if (host == 0 || host[0] == '\0') {
83 host = "localhost";
84 }
85 if (port <= 0) {
86 port = 1095;
87 }
88 fSocket = new TSocket(host,port);
89 if (!fSocket->IsValid()) {
90 Error("UploadAndReset","Could not contact the server %s:%d\n",host,port);
91 delete fSocket;
92 fSocket = 0;
93 return kFALSE;
94 }
95 // Wait till we get the start message
96 // server tells us who we are
97 Int_t kind;
98 Int_t n = fSocket->Recv(fServerIdx, kind);
99
100 if (n < 0 && kind != 0 /* kStartConnection */)
101 {
102 Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
103 delete fSocket;
104 fSocket = 0;
105 return kTRUE;
106 }
107 n = fSocket->Recv(fServerVersion, kind);
108 if (n < 0 && kind != 1 /* kProtocol */)
109 {
110 Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
111 } else {
112 Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
113 }
115 }
116
117 fMessage.Reset(kMESS_ANY); // re-use TMessage object
122
123 // FIXME: CXX17: Use init-statement in if to declare `error` variable
124 int error;
125 if ((error = fSocket->Send(fMessage)) <= 0) {
126 Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
127 delete fSocket;
128 fSocket = 0;
129 return kFALSE;
130 }
131
132 // Record the StreamerInfo we sent over.
133 Int_t isize = fClassIndex->GetSize();
134 if (!fClassSent) {
135 fClassSent = new TArrayC(isize);
136 } else {
137 if (isize > fClassSent->GetSize()) {
138 fClassSent->Set(isize);
139 }
140 }
141 for(Int_t c = 0; c < isize; ++c) {
142 if (fClassIndex->fArray[c]) {
143 fClassSent->fArray[c] = 1;
144 }
145 }
147
148 return kTRUE;
149}
150
151////////////////////////////////////////////////////////////////////////////////
152/// Write memory objects to this file and upload them to the parallel merge server.
153/// Then reset all the resetable object (those with a ResetAfterMerge routine,
154/// like TTree).
155///
156/// Loop on all objects in memory (including subdirectories).
157/// A new key is created in the KEYS linked list for each object.
158/// The list of keys is then saved on the file (via WriteKeys)
159/// as a single data record.
160/// For values of opt see TObject::Write().
161/// The directory header info is rewritten on the directory header record.
162/// The linked list of FREE segments is written.
163/// The file header is written (bytes 1->fBEGIN).
164
166{
167 Int_t nbytes = TMemFile::Write(0,opt,bufsiz);
168 if (nbytes) {
170 }
171 return nbytes;
172}
173
174////////////////////////////////////////////////////////////////////////////////
175/// One can not save a const TDirectory object.
176
177Int_t TParallelMergingFile::Write(const char *n, Int_t opt, Int_t bufsize) const
178{
179 Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
180 return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
181}
182
183////////////////////////////////////////////////////////////////////////////////
184/// Write the list of TStreamerInfo as a single object in this file
185/// The class Streamer description for all classes written to this file
186/// is saved. See class TStreamerInfo.
187
189{
190 if (!fWritable) return;
191 if (!fClassIndex) return;
192 //no need to update the index if no new classes added to the file
193 if (fClassIndex->fArray[0] == 0) return;
194
195 // clear fClassIndex for anything we already sent.
196 if (fClassSent) {
197 Int_t isize = fClassIndex->GetSize();
198 Int_t ssize = fClassSent->GetSize();
199 for(Int_t c = 0; c < isize && c < ssize; ++c) {
200 if (fClassSent->fArray[c]) {
201 fClassIndex->fArray[c] = 0;
202 }
203 }
204 }
205
207}
@ kMESS_ANY
@ kMESS_OBJECT
#define c(i)
Definition RSha256.hxx:101
const Bool_t kFALSE
Definition RtypesCore.h:92
const Bool_t kTRUE
Definition RtypesCore.h:91
const char Option_t
Definition RtypesCore.h:66
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:220
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:187
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:231
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:245
Array of chars or bytes (8 bits per element).
Definition TArrayC.h:27
void Set(Int_t n)
Set size of this array to n chars.
Definition TArrayC.cxx:105
Char_t * fArray
Definition TArrayC.h:30
Int_t GetSize() const
Definition TArray.h:47
void WriteLong64(Long64_t l) override
void WriteTString(const TString &s) override
Write TString to TBuffer.
void WriteInt(Int_t i) override
Bool_t fWritable
True if directory is writable.
virtual void Close(Option_t *option="")
Delete all objects from memory and directory structure itself.
virtual Int_t Write(const char *=nullptr, Int_t=0, Int_t=0) override
Write this object to the current directory.
Definition TDirectory.h:217
TArrayC * fClassIndex
!Index of TStreamerInfo classes written to this file
Definition TFile.h:95
TUrl fUrl
!URL of file
Definition TFile.h:111
virtual Long64_t GetEND() const
Definition TFile.h:223
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
Definition TFile.cxx:3706
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
virtual Long64_t CopyTo(void *to, Long64_t maxsize) const
Copy the binary representation of the TMemFile into the memory area starting at 'to' and of length at...
Definition TMemFile.cxx:255
void ResetAfterMerge(TFileMergeInfo *) override
Wipe all the data from the permanent buffer but keep, the in-memory object alive.
Definition TMemFile.cxx:320
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:116
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:179
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file and upload them to the parallel merge server.
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
TParallelMergingFile(const char *filename, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Constructor.
virtual void Close(Option_t *option="")
Close a file.
Bool_t UploadAndReset()
Upload the current file data to the merging server.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:136
Ssiz_t Length() const
Definition TString.h:410
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition TString.h:692
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetHost() const
Definition TUrl.h:67
const char * GetOptions() const
Definition TUrl.h:71
Int_t GetPort() const
Definition TUrl.h:78
const Int_t n
Definition legend1.C:16