23#include <sys/socket.h>
88 using Py_IsInitialized_type =
int (*)(
void);
89 using PyGILState_Ensure_type =
void* (*)(
void);
90 using PyGILState_Release_type =
void (*)(
void*);
91 void* fPyGILState_STATE =
nullptr;
92 template<
class FPTYPE>
93 FPTYPE GetSymT(
const char*
name) {
return (FPTYPE) dlsym(
nullptr,
name);}
97 auto Py_IsInitialized = GetSymT<Py_IsInitialized_type>(
"Py_IsInitialized");
98 if (!Py_IsInitialized || !Py_IsInitialized())
return;
99 auto PyGILState_Ensure = GetSymT<PyGILState_Ensure_type>(
"PyGILState_Ensure");
100 if (PyGILState_Ensure) fPyGILState_STATE = PyGILState_Ensure();
105 auto PyGILState_Release = GetSymT<PyGILState_Release_type>(
"PyGILState_Release");
106 if (fPyGILState_STATE && PyGILState_Release) PyGILState_Release(fPyGILState_STATE);
130 std::string basePath =
"/tmp/ROOTMP-";
135 unsigned nWorker = 0;
138 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
140 Error(
"TMPClient::Fork",
"[E][C] Could not create socketpair. Error n. . Now retrying.\n%d", errno);
147 ROOT::Internal::TGILRAII tgilraai;
158 if (
s &&
s->IsValid()) {
162 Error(
"TMPClient::Fork",
"[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
179 if (signalHandlers && signalHandlers->
GetSize() > 0)
187 for (
auto h : *fileHandlers) {
197 while (lofact && (lofact->GetSize() > 0)) {
199 lofact.reset(
nullptr);
206 while (lofdeact && (lofdeact->GetSize() > 0)) {
208 lofdeact.reset(
nullptr);
227 server.
Init(sockets[1], nWorker);
263 if (count == nMessages)
269 Error(
"TMPClient:Broadcast",
"[E] Could not send message to server\n");
316 waitpid(pid,
nullptr, 0);
336 unsigned code = msg.first;
338 const char *str = ReadBuffer<const char*>(msg.second.get());
341 Error(
"TMPClient::HandleMPCode",
"[I][C] message received: %s\n", str);
343 Error(
"TMPClient::HandleMPCode",
"[E][C] error message received: %s\n", str);
346 Error(
"TMPClient::HandleMPCode",
"[I][C] shutdown notice received from %s\n", str);
349 Error(
"TMPClient::HandleMPCode",
"[W][C] unknown code received. code=%d\n", code);
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
R__EXTERN TGuiFactory * gBatchGuiFactory
R__EXTERN TGuiFactory * gGuiFactory
typedef void((*Func_t)())
R__EXTERN TSystem * gSystem
R__EXTERN TVirtualX * gGXBatch
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
unsigned fNWorkers
The number of workers that should be spawned upon forking.
TMPClient(unsigned nWorkers=0)
Class constructor.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
~TMPClient()
Class destructor.
TMonitor fMon
This object manages the sockets and detect socket events via TMonitor::Select.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
void Remove(TSocket *s)
Remove a certain socket from the monitor.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process.
void DeActivate(TSocket *s)
DeActivate a certain socket.
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
virtual void RemoveAll()
Remove all sockets from the monitor.
virtual void ActivateAll()
Activate all de-activated sockets.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfActives() const
Returns a list with all active sockets.
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Sequenceable collection abstract base class.
virtual TObject * First() const =0
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
virtual TSeqCollection * GetListOfFileHandlers() const
virtual TSeqCollection * GetListOfSignalHandlers() const
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
@ kMessage
Generic message.
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kShutdownNotice
Used by the workers to notify client of shutdown.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
static constexpr double s