fastMergeServer.C: This script shows how to make a simple iterative server that
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
#include "TFileCacheWrite.h"
void fastMergeServer(bool cache = false) {
// This script shows how to make a simple iterative server that
// can receive TMemFile from multiple clients and merge them into
// a single file without block.
//
// Note: This server assumes that the client will reset the histogram
// after each upload to simplify the merging.
//
// This server can accept connections while handling currently open connections.
// Compare this script to hserv.C that blocks on accept.
// In this script a server socket is created and added to a monitor.
// A monitor object is used to monitor connection requests on
// the server socket. After accepting the connection
// the new socket is added to the monitor and immediately ready
// for use. Once two connections are accepted the server socket
// is removed from the monitor and closed. The monitor continues
// monitoring the sockets.
//
// To run this demo do the following:
// - Open three windows
// - Start ROOT in all three windows
// - Execute in the first window: .x fastMergerServer.C
// - Execute in the second and third windows: .x treeClient.C
//Author: Fons Rademakers
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(9090, kTRUE);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
TMemFile *transient = 0;
TFileMerger merger(kFALSE,kFALSE);
merger.SetPrintLevel(0);
enum StatusKind {
kStartConnection = 0
};
if (cache) new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
while (1) {
TMessage *mess;
TSocket *s;
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientCount, kStartConnection);
++clientCount;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==0) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
Long64_t length;
TString filename;
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadTString(filename);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
Info("fastMergeServer","Receive input from client %d for %s",clientId,filename.Data());
delete transient;
transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length);
mess->SetBufferOffset(mess->Length()+length);
merger.OutputFile(filename);
merger.AddAdoptFile(transient);
merger.PartialMerge(TFileMerger::kAllIncremental);
transient = 0;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
}