Logo ROOT  
Reference Guide
fastMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can receive TMemFile objects from multiple clients and merge them into a single file without blocking.

Note: This server assumes that the client will reset the histogram after each upload to simplify the merging.

This server can accept connections while handling currently open connections. Compare this script to hserv.C, which blocks on accept. In this script a server socket is created and added to a monitor. A monitor object is used to monitor connection requests on the server socket. After accepting a connection, the new socket is added to the monitor and is immediately ready for use. Once the client limit (100 connections in this script) is reached, the server socket is removed from the monitor and closed. The monitor continues monitoring the remaining client sockets.

To run this demo do the following:

  • Open three windows
  • Start ROOT in all three windows
  • Execute in the first window: .x fastMergeServer.C
  • Execute in the second and third windows: .x treeClient.C
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
void fastMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(9090, kTRUE);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
merger.SetPrintLevel(0);
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
if (cache) new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
while (1) {
TMessage *mess;
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientCount, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
++clientCount;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==0) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
Long64_t length;
TString filename;
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadTString(filename);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
Info("fastMergeServer","Receive input from client %d for %s",clientId,filename.Data());
delete transient;
transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length);
mess->SetBufferOffset(mess->Length()+length);
merger.OutputFile(filename,"UPDATE");
merger.AddAdoptFile(transient);
merger.PartialMerge(TFileMerger::kAllIncremental);
transient = 0;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
}
void Class()
Definition: Class.C:29
@ kMESS_STRING
Definition: MessageTypes.h:34
@ kMESS_ANY
Definition: MessageTypes.h:31
@ kMESS_OBJECT
Definition: MessageTypes.h:35
int Int_t
Definition: RtypesCore.h:43
unsigned int UInt_t
Definition: RtypesCore.h:44
const Bool_t kFALSE
Definition: RtypesCore.h:90
long long Long64_t
Definition: RtypesCore.h:71
const Bool_t kTRUE
Definition: RtypesCore.h:89
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
Definition: TBufferFile.h:398
void ReadLong64(Long64_t &l) override
Definition: TBufferFile.h:425
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:92
Int_t Length() const
Definition: TBuffer.h:99
char * Buffer() const
Definition: TBuffer.h:95
A cache when writing files over the network.
This class provides file copy and merging services.
Definition: TFileMerger.h:32
@ kAllIncremental
Merge incrementally all type of objects.
Definition: TFileMerger.h:79
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
UInt_t What() const
Definition: TMessage.h:75
TClass * GetClass() const
Definition: TMessage.h:71
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:389
virtual Bool_t IsValid() const
Definition: TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
Basic string class.
Definition: TString.h:131
const char * Data() const
Definition: TString.h:364
static const std::string transient("transient")
static constexpr double s
Author
Fons Rademakers

Definition in file fastMergeServer.C.