Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
fastMergeServer.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_net
3/// This script shows how to make a simple iterative server that
4/// can receive TMemFile from multiple clients and merge them into
5/// a single file without block.
6///
7/// Note: This server assumes that the client will reset the histogram
8/// after each upload to simplify the merging.
9///
10/// This server can accept connections while handling currently open connections.
11/// Compare this script to hserv.C that blocks on accept.
12/// In this script a server socket is created and added to a monitor.
13/// A monitor object is used to monitor connection requests on
14/// the server socket. After accepting the connection
15/// the new socket is added to the monitor and immediately ready
16/// for use. Once two connections are accepted the server socket
17/// is removed from the monitor and closed. The monitor continues
18/// monitoring the sockets.
19///
20/// To run this demo do the following:
21/// - Open three windows
22/// - Start ROOT in all three windows
23/// - Execute in the first window: .x fastMergerServer.C
24/// - Execute in the second and third windows: .x treeClient.C
25///
26/// \macro_code
27///
28/// \author Fons Rademakers
29
30#include "TMessage.h"
31#include "TBenchmark.h"
32#include "TSocket.h"
33#include "TH2.h"
34#include "TTree.h"
35#include "TMemFile.h"
36#include "TRandom.h"
37#include "TError.h"
38#include "TFileMerger.h"
39
40#include "TServerSocket.h"
41#include "TPad.h"
42#include "TCanvas.h"
43#include "TMonitor.h"
44
45#include "TFileCacheWrite.h"
46
47void fastMergeServer(bool cache = false) {
48 // Open a server socket looking for connections on a named service or
49 // on a specified port.
50 //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
51 TServerSocket *ss = new TServerSocket(9090, kTRUE);
52 if (!ss->IsValid()) {
53 return;
54 }
55
56 TMonitor *mon = new TMonitor;
57
58 mon->Add(ss);
59
60 UInt_t clientCount = 0;
61 TMemFile *transient = nullptr;
62
64 merger.SetPrintLevel(0);
65
66 enum StatusKind {
67 kStartConnection = 0,
68 kProtocol = 1,
69
70 kProtocolVersion = 1
71 };
72 if (cache) new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
73 while (true) {
74 TMessage *mess;
75 TSocket *s;
76
77 s = mon->Select();
78
79 if (s->IsA() == TServerSocket::Class()) {
80 if (clientCount > 100) {
81 printf("only accept 100 clients connections\n");
82 mon->Remove(ss);
83 ss->Close();
84 } else {
85 TSocket *client = ((TServerSocket *)s)->Accept();
86 client->Send(clientCount, kStartConnection);
87 client->Send(kProtocolVersion, kProtocol);
88 ++clientCount;
89 mon->Add(client);
90 printf("Accept %d connections\n",clientCount);
91 }
92 continue;
93 }
94
95 s->Recv(mess);
96
97 if (mess==nullptr) {
98 Error("fastMergeServer","The client did not send a message\n");
99 } else if (mess->What() == kMESS_STRING) {
100 char str[64];
101 mess->ReadString(str, 64);
102 printf("Client %d: %s\n", clientCount, str);
103 mon->Remove(s);
104 printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
105 s->GetBytesSent());
106 s->Close();
107 --clientCount;
108 if (mon->GetActive() == 0 || clientCount == 0) {
109 printf("No more active clients... stopping\n");
110 break;
111 }
112 } else if (mess->What() == kMESS_ANY) {
113
116 Int_t clientId;
117 mess->ReadInt(clientId);
118 mess->ReadTString(filename);
119 mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
120
121 Info("fastMergeServer","Receive input from client %d for %s",clientId,filename.Data());
122
123 delete transient;
124 transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length);
125 mess->SetBufferOffset(mess->Length()+length);
126 merger.OutputFile(filename,"UPDATE");
127 merger.AddAdoptFile(transient);
128
129 merger.PartialMerge(TFileMerger::kAllIncremental);
130 transient = nullptr;
131 } else if (mess->What() == kMESS_OBJECT) {
132 printf("got object of class: %s\n", mess->GetClass()->GetName());
133 } else {
134 printf("*** Unexpected message ***\n");
135 }
136
137 delete mess;
138 }
139}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
int Int_t
Definition RtypesCore.h:45
unsigned int UInt_t
Definition RtypesCore.h:46
constexpr Bool_t kFALSE
Definition RtypesCore.h:94
long long Long64_t
Definition RtypesCore.h:69
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
A cache when writing files over the network.
This class provides file copy and merging services.
Definition TFileMerger.h:30
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:79
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
UInt_t What() const
Definition TMessage.h:75
TClass * GetClass() const
Definition TMessage.h:71
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition TMonitor.cxx:214
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
static TClass * Class()
UInt_t GetBytesRecv() const
Definition TSocket.h:120
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
UInt_t GetBytesSent() const
Definition TSocket.h:119
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
TClass * IsA() const override
Definition TSocket.h:171
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:139