This script shows how to make a simple iterative server that can accept connections while handling currently open connections.
Compare this script to hserv.C that blocks on accept. In this script a server socket is created and added to a monitor. A monitor object is used to monitor connection requests on the server socket. After accepting the connection the new socket is added to the monitor and immediately ready for use. Once two connections are accepted the server socket is removed from the monitor and closed. The monitor continues monitoring the sockets.
const int kIncremental = 0;
const int kReplaceImmediately = 1;
const int kReplaceWait = 2;
{
if (dir==
nullptr)
return kFALSE;
while( (key = (
TKey*)nextkey()) ) {
if (!subdir) {
}
if (R__NeedInitialMerge(subdir)) {
}
} else {
}
}
}
}
{
if (dir==nullptr) return;
while( (key = (
TKey*)nextkey()) ) {
if (!subdir) {
}
R__DeleteObject(subdir,withReset);
} else {
if (withReset) {
} else {
}
if (todelete) {
delete key;
}
}
}
}
{
if (destination==nullptr || source==nullptr) return;
while( (key = (
TKey*)nextkey()) ) {
if (!source_subdir) {
}
if (!destination_subdir) {
}
R__MigrateKey(destination,source);
} else {
if (oldkey) {
delete oldkey;
}
TKey *newkey =
new TKey(destination,*key,0 );
return;
}
}
}
}
struct ClientInfo
{
TFile *fFile;
TString fLocalName;
TTimeStamp fLastContact;
ClientInfo() : fFile(nullptr), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
ClientInfo(
const char *
filename,
UInt_t clientId) : fFile(nullptr), fContactsCount(0), fTimeSincePrevContact(0) {
}
void Set(TFile *file)
{
if (file != fFile) {
if (fFile) {
R__MigrateKey(fFile,file);
delete file;
} else {
fFile = file;
}
}
TTimeStamp now;
fLastContact = now;
++fContactsCount;
}
};
struct ParallelFileMerger :
public TObject
{
typedef std::vector<ClientInfo> ClientColl_t;
TString fFilename;
TBits fClientsContact;
ClientColl_t fClients;
TTimeStamp fLastMerge;
TFileMerger fMerger;
{
if (writeCache)
new TFileCacheWrite(fMerger.
GetOutputFile(),32*1024*1024);
}
~ParallelFileMerger() override
{
for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
fprintf(stderr,
"Client %d reported %u times\n",
f,fClients[
f].fContactsCount);
}
for( ClientColl_t::iterator iter = fClients.begin();
iter != fClients.end();
++iter)
{
delete iter->fFile;
}
}
{
}
const char *
GetName()
const override
{
return fFilename;
}
{
}
{
for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
}
for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
R__DeleteObject(fClients[
f].fFile,
kTRUE);
} else {
R__DeleteObject(file,
kTRUE);
delete file;
}
}
fLastMerge = TTimeStamp();
fNClientsContact = 0;
}
{
}
{
if (fClients.empty()) {
}
for(
unsigned int c = 0 ;
c < fClients.size(); ++
c) {
sum += fClients[
c].fTimeSincePrevContact;
sum2 += fClients[
c].fTimeSincePrevContact*fClients[
c].fTimeSincePrevContact;
}
TTimeStamp now;
}
Float_t cut = clientThreshold * fClients.size();
return fClientsContact.
CountBits() > cut || fNClientsContact > 2*cut;
}
void RegisterClient(
UInt_t clientId, TFile *file)
{
++fNClientsContact;
if (fClients.size() < clientId+1) {
fClients.push_back( ClientInfo(fFilename,clientId) );
}
fClients[clientId].Set(file);
}
};
void parallelMergeServer(bool cache = false) {
FILE *dummy =
gSystem->TempFileName(socketPath);
if (!dummy) {
Error(
"fastMergeServer",
"Cannot create temporary file for socket\n");
return;
}
std::string strSocketPath(socketPath.
View());
remove(strSocketPath.c_str());
fclose(dummy);
return;
}
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
printf("fastMergeServerHist ready to accept connections on %s\n", strSocketPath.c_str());
while (true) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
} else {
client->
Send(clientIndex, kStartConnection);
client->
Send(kProtocolVersion, kProtocol);
++clientCount;
++clientIndex;
printf("Accept %d connections\n",clientCount);
}
continue;
}
if (mess==nullptr) {
Error(
"fastMergeServer",
"The client did not send a message\n");
char str[64];
printf("Client %d: %s\n", clientCount, str);
printf(
"Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->
GetBytesRecv(),
--clientCount;
if (mon->
GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
const Float_t clientThreshold = 0.75;
if (!info) {
info =
new ParallelFileMerger(
filename,cache);
}
if (R__NeedInitialMerge(transient)) {
info->InitialMerge(transient);
}
info->RegisterClient(clientId,transient);
if (info->NeedMerge(clientThreshold)) {
Info(
"fastMergeServerHist",
"Merging input from %ld clients (%d)",info->fClients.size(),clientId);
info->Merge();
}
transient = nullptr;
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
{
info->Merge();
}
}
delete mon;
remove(strSocketPath.c_str());
delete ss;
}
bool Bool_t
Boolean (0=false, 1=true) (bool).
int Int_t
Signed integer 4 bytes (int).
unsigned long ULong_t
Unsigned long integer 4 bytes (unsigned long). Size depends on architecture.
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
float Float_t
Float 4 bytes (float).
double Double_t
Double 8 bytes.
long long Long64_t
Portable signed long integer 8 bytes.
#define ClassDefOverride(name, id)
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t target
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
void Clear(Option_t *option="") override
Clear the value.
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
TClass instances represent classes, structs and namespaces in the ROOT type system.
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Bool_t InheritsFrom(const char *cl) const override
Return kTRUE if this class inherits from a class with name "classname".
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Describe directory structure in memory.
virtual TList * GetList() const
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
virtual TFile * GetFile() const
virtual TKey * GetKey(const char *, Short_t=9999) const
virtual void SaveSelf(Bool_t=kFALSE)
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
@ kIncremental
Merge the input file with the content of the output file (if already existing).
@ kResetable
Only the objects with a MergeAfterReset member function.
@ kAllIncremental
Merge incrementally all type of objects.
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsize=0) override
Write this object to the current directory.
void SumBuffer(Int_t bufsize)
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
THashTable implements a hash table to store TObject's.
void Add(TObject *obj) override
Add object to the hash table.
TObject * FindObject(const char *name) const override
Find object using its name.
void Delete(Option_t *option="") override
Remove all objects from the table AND delete all heap based objects.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
void Delete(Option_t *option="") override
Delete an object from the file.
virtual const char * GetClassName() const
virtual TObject * ReadObj()
To read a TObject* from the file.
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=nullptr)
Write the encoded object supported by this key.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
TObject * Remove(TObject *obj) override
Remove object from the list.
A TMemFile is like a normal TFile except that it reads and writes only from memory.
TClass * GetClass() const
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
Bool_t TestBit(UInt_t f) const
virtual const char * GetName() const
Returns name of object.
virtual ULong_t Hash() const
Return hash value for this object.
This class implements server sockets.
This class implements client sockets.
UInt_t GetBytesRecv() const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
UInt_t GetBytesSent() const
virtual void Close(Option_t *opt="")
Close the socket.
TClass * IsA() const override
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
std::string_view View() const
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Double_t AsDouble() const
Double_t Sqrt(Double_t x)
Returns the square root of x.
static uint64_t sum(uint64_t i)