Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TParallelMergingFile.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Philippe Canal October 2011.
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TParallelMergingFile //
15// //
16// Specialization of TMemFile to connect to a parallel file merger. //
17// Upon a call to UploadAndReset, the content already written to the //
18// file is uploaded to the server and the objects implementing the function //
19// ResetAfterMerge (like TTree) are reset. //
20// The parallel file merger will then collate the information coming //
21// from this client and any other client into the file described by //
22// the filename of this object. //
23// //
24//////////////////////////////////////////////////////////////////////////
25
27#include "TSocket.h"
28#include "TArrayC.h"
29
30////////////////////////////////////////////////////////////////////////////////
31/// Constructor.
32/// We do not yet open any connection to the server. This will be done at the
33/// time the first upload is requested.
34
36 const char *ftitle /* = "" */, Int_t compress /* = 1 */) :
38{
39 TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
40 if (serverurl.Length()) {
41 serverurl.ReplaceAll("pmerge=","pmerge://");
43 }
44}
45
46////////////////////////////////////////////////////////////////////////////////
47/// Destructor.
48
50{
51 // We need to call Close, right here so that it is executed _before_
52 // the data member of TParallelMergingFile are destructed.
53 Close();
54}
55
56////////////////////////////////////////////////////////////////////////////////
57
59{
61 if (fSocket) {
62 if (0==fSocket->Send("Finished")) { // tell server we are finished
63 Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
64 }
65 fSocket->Close();
66 fSocket.reset();
67 }
68}
69
70/// Attempts to connect to the server. Has no effect if the file is already connected.
71/// Returns true if the file is connected after calling this function (including if it was connected already).
72/// Note: normally it's not necessary to explicitly call this as it will be called automatically by Write().
74{
75 if (fSocket)
76 return kTRUE;
77
78 const char *path = fServerLocation.GetFile();
79 if (path && strlen(path) > 0 && path[0] == '/') {
80 // UNIX domain socket
81 fSocket.reset(new TSocket(path));
82 if (!fSocket->IsValid()) {
83 Error("UploadAndReset", "Could not contact the server %s\n", path);
84 fSocket.reset();
85 return kFALSE;
86 }
87 } else {
88 // TCP socket
89 const char *host = fServerLocation.GetHost();
91 if (host == 0 || host[0] == '\0') {
92 host = "localhost";
93 }
94 if (port <= 0) {
95 port = 1095;
96 }
97 fSocket.reset(new TSocket(host, port));
98 if (!fSocket->IsValid()) {
99 Error("UploadAndReset", "Could not contact the server %s:%d\n", host, port);
100 fSocket.reset();
101 return kFALSE;
102 }
103 }
104 // Wait till we get the start message
105 // server tells us who we are
106 Int_t kind;
107 Int_t n = fSocket->Recv(fServerIdx, kind);
108
109 if (n < 0 && kind != 0 /* kStartConnection */)
110 {
111 Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
112 fSocket.reset();
113 return kTRUE;
114 }
115 n = fSocket->Recv(fServerVersion, kind);
116 if (n < 0 && kind != 1 /* kProtocol */)
117 {
118 Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
119 } else {
120 Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
121 }
123
124 return kTRUE;
125}
126
127////////////////////////////////////////////////////////////////////////////////
128/// Upload the current file data to the merging server.
129/// Reset the file and return true in case of success.
130
132{
134
135 fMessage.Reset(kMESS_ANY); // re-use TMessage object
140
141 if (int error = fSocket->Send(fMessage); error <= 0) {
142 Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
143 fSocket.reset();
144 return kFALSE;
145 }
146
147 // Record the StreamerInfo we sent over.
149 if (!fClassSent) {
150 fClassSent.reset(new TArrayC(isize));
151 } else {
152 if (isize > fClassSent->GetSize()) {
153 fClassSent->Set(isize);
154 }
155 }
156 for(Int_t c = 0; c < isize; ++c) {
157 if (fClassIndex->fArray[c]) {
158 fClassSent->fArray[c] = 1;
159 }
160 }
162
163 return kTRUE;
164}
165
166////////////////////////////////////////////////////////////////////////////////
167/// Write memory objects to this file and upload them to the parallel merge server.
168/// Then reset all the resettable objects (those with a ResetAfterMerge routine,
169/// like TTree).
170///
171/// Loop on all objects in memory (including subdirectories).
172/// A new key is created in the KEYS linked list for each object.
173/// The list of keys is then saved on the file (via WriteKeys)
174/// as a single data record.
175/// For values of opt see TObject::Write().
176/// The directory header info is rewritten on the directory header record.
177/// The linked list of FREE segments is written.
178/// The file header is written (bytes 1->fBEGIN).
179
181{
182 std::size_t prevSize = GetBytesWritten();
183 auto nbytes = TMemFile::Write(0,opt,bufsize);
184 std::size_t newSize = GetBytesWritten();
185 // NOTE: we don't rely on nbytes > 0 to do UploadAndReset() because nbytes may be 0 if the file
186 // only contains non-TObject objects (as they are not written by TMemFile::Write).
187 if (newSize > prevSize) {
189 }
190 return nbytes;
191}
192
193////////////////////////////////////////////////////////////////////////////////
194/// One can not save a const TDirectory object.
195
197{
198 Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
199 return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
200}
201
202////////////////////////////////////////////////////////////////////////////////
203/// Write the list of TStreamerInfo as a single object in this file
204/// The class Streamer description for all classes written to this file
205/// is saved. See class TStreamerInfo.
206
208{
209 if (!fWritable) return;
210 if (!fClassIndex) return;
211 //no need to update the index if no new classes added to the file
212 if (fClassIndex->fArray[0] == 0) return;
213
214 // clear fClassIndex for anything we already sent.
215 if (fClassSent) {
217 Int_t ssize = fClassSent->GetSize();
218 for(Int_t c = 0; c < isize && c < ssize; ++c) {
219 if (fClassSent->fArray[c]) {
220 fClassIndex->fArray[c] = 0;
221 }
222 }
223 }
224
226}
@ kMESS_ANY
#define c(i)
Definition RSha256.hxx:101
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
const char Option_t
Option string (const char)
Definition RtypesCore.h:80
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:241
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:208
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:267
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Array of chars or bytes (8 bits per element).
Definition TArrayC.h:27
Char_t * fArray
Definition TArrayC.h:30
Int_t GetSize() const
Definition TArray.h:47
void WriteLong64(Long64_t l) override
void WriteTString(const TString &s) override
Write TString to TBuffer.
void WriteInt(Int_t i) override
Bool_t fWritable
True if directory is writable.
virtual void Close(Option_t *option="")
Delete all objects from memory and directory structure itself.
virtual Int_t Write(const char *=nullptr, Int_t=0, Int_t=0) override
Write this object to the current directory.
Definition TDirectory.h:265
TArrayC * fClassIndex
!Index of TStreamerInfo classes written to this file
Definition TFile.h:172
TUrl fUrl
!URL of file
Definition TFile.h:188
virtual Long64_t GetEND() const
Definition TFile.h:309
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
Definition TFile.cxx:3468
virtual Long64_t GetBytesWritten() const
Return the total number of bytes written so far to the file.
Definition TFile.cxx:4250
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:27
virtual Long64_t CopyTo(void *to, Long64_t maxsize) const
Copy the binary representation of the TMemFile into the memory area starting at 'to' and of length at...
Definition TMemFile.cxx:254
void ResetAfterMerge(TFileMergeInfo *) override
Wipe all the data from the permanent buffer but keep, the in-memory object alive.
Definition TMemFile.cxx:339
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:120
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:183
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
std::unique_ptr< TArrayC > fClassSent
std::unique_ptr< TSocket > fSocket
Bool_t OpenConnection()
Attempts to connect to the server.
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsize=0) override
Write memory objects to this file and upload them to the parallel merge server.
TParallelMergingFile(const char *filename, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Constructor.
void WriteStreamerInfo() override
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
void Close(Option_t *option="") override
Close a file.
Bool_t UploadAndReset()
Upload the current file data to the merging server.
Basic string class.
Definition TString.h:138
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetFile() const
Definition TUrl.h:69
const char * GetHost() const
Definition TUrl.h:67
const char * GetOptions() const
Definition TUrl.h:71
Int_t GetPort() const
Definition TUrl.h:78
const Int_t n
Definition legend1.C:16