ROOT  6.06/09
Reference Guide
BidirMMapPipe.h
Go to the documentation of this file.
1 /** @file BidirMMapPipe.h
2  *
3  * header file for BidirMMapPipe, a class which forks off a child process and
4  * serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 
10 #ifndef BIDIRMMAPPIPE_H
11 #define BIDIRMMAPPIPE_H
12 
13 #include <list>
14 #include <vector>
15 #include <cassert>
16 #include <cstring>
17 #include <unistd.h>
18 
19 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
20 #define END_NAMESPACE_ROOFIT }
21 
23 
24 /// namespace for implementation details of BidirMMapPipe
25 namespace BidirMMapPipe_impl {
26  // forward declarations
27  class BidirMMapPipeException;
28  class Page;
29  class PagePool;
30  class Pages;
31 
32  /** @brief class representing a chunk of pages
33  *
34  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
35  * @date 2013-07-24
36  *
37  * allocating pages from the OS happens in chunks in order to not exhaust
38  * the maximum allowed number of memory mappings per process; this class
39  * takes care of such a chunk
40  *
41  * a page chunk allows callers to obtain or release pages in groups of
42  * continuous pages of fixed size
43  */
44  class PageChunk {
45  public:
46  /// type of mmap support found
47  typedef enum {
48  Unknown, ///< don't know yet what'll work
49  Copy, ///< mmap doesn't work, have to copy back and forth
50  FileBacked, ///< mmapping a temp file works
51  DevZero, ///< mmapping /dev/zero works
52  Anonymous ///< anonymous mmap works
53  } MMapVariety;
54 
55  private:
56  static unsigned s_pagesize; ///< system page size (run-time determined)
57  /// mmap variety that works on this system
59 
60  /// convenience typedef
61  typedef BidirMMapPipeException Exception;
62 
63  void* m_begin; ///< pointer to start of mmapped area
64  void* m_end; ///< pointer one behind end of mmapped area
65  // FIXME: cannot keep freelist inline - other end may need that
66  // data, and we'd end up overwriting the page header
67  std::list<void*> m_freelist; ///< free pages list
68  PagePool* m_parent; ///< parent page pool
69  unsigned m_nPgPerGrp; ///< number of pages per group
70  unsigned m_nUsedGrp; ///< number of used page groups
71 
72  /// determine page size at run time
73  static unsigned getPageSize();
74 
75  /// mmap pages, len is length of mmapped area in bytes
76  static void* dommap(unsigned len);
77  /// munmap pages p, len is length of mmapped area in bytes
78  static void domunmap(void* p, unsigned len);
79  /// forbid copying
80  PageChunk(const PageChunk&) {}
81  /// forbid assignment
82  PageChunk& operator=(const PageChunk&) { return *this; }
83  public:
84  /// return the page size of the system
85  static unsigned pagesize() { return s_pagesize; }
86  /// return mmap variety support found
87  static MMapVariety mmapVariety() { return s_mmapworks; }
88 
89  /// constructor
90  PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);
91 
92  /// destructor
93  ~PageChunk();
94 
95  /// return if p is contained in this PageChunk
96  bool contains(const Pages& p) const;
97 
98  /// pop a group of pages off the free list
99  Pages pop();
100 
101  /// push a group of pages onto the free list
102  void push(const Pages& p);
103 
104  /// return length of chunk
105  unsigned len() const
106  {
107  return reinterpret_cast<unsigned char*>(m_end) -
108  reinterpret_cast<unsigned char*>(m_begin);
109  }
110  /// return number of pages per page group
111  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
112 
113  /// return true if no used page groups in this chunk
114  bool empty() const { return !m_nUsedGrp; }
115 
116  /// return true if no free page groups in this chunk
117  bool full() const { return m_freelist.empty(); }
118 
119  /// free all pages except for those pointed to by p
120  void zap(Pages& p);
121  };
122 
123  /** @brief handle class for a number of Pages
124  *
125  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
126  * @date 2013-07-24
127  *
128  * the associated pages are continuous in memory
129  */
130  class Pages {
131  private:
132  /// implementation
133  typedef struct {
134  PageChunk *m_parent; ///< pointer to parent pool
135  Page* m_pages; ///< pointer to first page
136  unsigned m_refcnt; ///< reference counter
137  unsigned char m_npages; ///< length in pages
138  } impl;
139  public:
140  /// default constructor
141  Pages() : m_pimpl(0) { }
142 
143  /// destructor
144  ~Pages();
145 
146  /** @brief copy constructor
147  *
148  * copy Pages handle to new object - old object loses ownership,
149  * and becomes a dangling handle
150  */
151  Pages(const Pages& other);
152 
153  /** @brief assignment operator
154  *
155  * assign Pages handle to new object - old object loses ownership,
156  * and becomes a dangling handle
157  */
158  Pages& operator=(const Pages& other);
159 
160  /// return page size
161  static unsigned pagesize();
162 
163  /// return number of pages accessible
164  unsigned npages() const { return m_pimpl->m_npages; }
165 
166  /// return page number pageno
167  Page* page(unsigned pgno) const;
168 
169  /// return page number pageno
170  Page* operator[](unsigned pgno) const { return page(pgno); }
171 
172  /// perform page to page number mapping
173  unsigned pageno(Page* p) const;
174 
175  /// perform page to page number mapping
176  unsigned operator[](Page* p) const { return pageno(p); }
177 
178  /// swap with other's contents
179  void swap(Pages& other)
180  {
181  impl* tmp = other.m_pimpl;
182  other.m_pimpl = m_pimpl;
183  m_pimpl = tmp;
184  }
185 
186  private:
187  /// page pool is our friend - it's allowed to construct Pages
189 
190  /// pointer to implementation
192 
193  /// constructor
194  Pages(PageChunk* parent, Page* pages, unsigned npg);
195  };
196 }
197 
198 /** @brief BidirMMapPipe creates a bidirectional channel between the current
199  * process and a child it forks.
200  *
201  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
202  * @date 2013-07-07
203  *
204  * This class creates a bidirectional channel between this process and a child
205  * it creates with fork().
206  *
207  * The channel is comprised of a small shared pool of buffer memory mmapped into
208  * both process spaces, and two pipes to synchronise the exchange of data. The
209  * idea behind using the pipes at all is to have some primitive which we can
210  * block on without having to worry about atomic operations or polling, leaving
211  * these tasks to the OS. In case the anonymous mmap cannot be performed on the
212  * OS the code is running on (for whatever reason), the code falls back to
213  * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
214  * dynamically allocated buffer which is then transmitted through the pipe(s),
215  * a slightly slower alternative (because the data is copied more often).
216  *
217  * The channel supports five major operations: read(), write(), flush(),
218  * purge() and close(). Reading and writing may block until the required buffer
219  * space is available. Writes may queue up data to be sent to the other end
220  * until either enough pages are full, or the user calls flush which forces
221  * any unsent buffers to be sent to the other end. flush forces any data that
222  * is to be sent to be sent. purge discards any buffered data waiting to be
223  * read and/or sent. Closing the channel on the child returns zero, closing it
224  * on the parent returns the child's exit status.
225  *
226  * The class also provides operator<< and operator>> for C++-style I/O for
227  * basic data types (bool, char, short, int, long, long long, float, double
228  * and their unsigned counterparts). Data is transmitted binary (i.e. no
229  * formatting to strings like std::cout does). There are also overloads to
230  * support C-style zero terminated strings and std::string. In terms of
231  * performance, the former is to be preferred.
232  *
233  * If the caller needs to multiplex input and output to/from several pipes, the
234  * class provides the poll() method which allows to block until an event occurs
235  * on any of the polled pipes.
236  *
237  * After the BidirMMapPipe is closed, no further operations may be performed on
238  * that object, save for the destructor which may still be called.
239  *
240  * If the BidirMMapPipe has not properly been closed, the destructor will call
241  * close. However, the exit code of the child is lost in that case.
242  *
243  * Closing the object causes the mmapped memory to be unmapped and the two
244  * pipes to be closed. We also install an atexit handler in the process of
245  * creating BidirMMapPipes. This ensures that when the current process
246  * terminates, a SIGTERM signal is sent to the child processes created for all
247  * unclosed pipes to avoid leaving zombie processes in the OS's process table.
248  *
249  * BidirMMapPipe creation, closing and destruction are thread safe. If the
250  * BidirMMapPipe is used in more than one thread, the other operations have to
251  * be protected with a mutex (or something similar), though.
252  *
253  * End of file (other end closed its pipe, or died) is indicated with the eof()
254  * method, serious I/O errors set flags (bad(), fail()), and also throw
255  * exceptions. For normal read/write operations, they can be suppressed (i.e.
256  * error reporting only using flags) with a constructor argument.
257  *
258  * Technicalities:
259  * - there is a pool of mmapped pages, half the pages are allocated to the
260  * parent process, half to the child
261  * - when one side has accumulated enough data (or a flush forces dirty pages
262  * out to the other end), it sends these pages to the other end by writing a
263  * byte containing the page number into the pipe
264  * - the other end (which has the pages mmapped, too) reads the page number(s)
265  * and puts the corresponding pages on its busy list
266  * - as the other end reads, it frees busy pages, and eventually tries to put
267  * them on its free list; if a page belongs to the other end of the
268  * connection, it is sent back
269  * - lists of pages are sent across the pipe, not individual pages, in order
270  * to minimise the number of read/write operations needed
271  * - when mmap works properly, only one byte containing the page number of
272  * the page list head is sent back and forth; the contents of that page
273  * allow to access the rest of the page list sent, and page headers on the
274  * list tell the receiving end if the page is free or has to be added to the
275  * busy list
276  * - when mmap does not work, we transfer one byte to indicate the head of the
277  * page list sent, and for each page on the list of sent pages, the page
278  * header and the page payload is sent (if the page is free, we only
279  * transmit the page header, and we never transmit more payload than
280  * the page actually contains)
281  * - in the child, all open BidirMMapPipes but the current one are closed. this
282  * is done for two reasons: first, to conserve file descriptors and address
283  * space. second, if more than one process is meant to use such a
284  * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
285  * are hard to find and understand. it's much better to come up with a design
286  * which does not need pipes to be shared among more than two processes.
287  *
288  * Here is a trivial example of a parent and a child talking to each other over
289  * a BidirMMapPipe:
290  * @code
291  * #include <string>
292  * #include <iostream>
293  * #include <cstdlib>
294  *
295  * #include "BidirMMapPipe.h"
296  *
297  * int simplechild(BidirMMapPipe& pipe)
298  * {
299  * // child does an echo loop
300  * while (pipe.good() && !pipe.eof()) {
301  * // read a string
302  * std::string str;
303  * pipe >> str;
304  * if (!pipe) return -1;
305  * if (pipe.eof()) break;
306  * // check if parent wants us to shut down
307  * if (!str.empty()) {
308  * std::cout << "[CHILD] : read: " << str << std::endl;
309  * str = "... early in the morning?";
310  * }
311  * pipe << str << BidirMMapPipe::flush;
312  * if (str.empty()) break;
313  * if (!pipe) return -1;
314  * std::cout << "[CHILD] : wrote: " << str << std::endl;
315  * }
316  * // send shutdown request acknowledged
317  * pipe << "" << BidirMMapPipe::flush;
318  *
319  * pipe.close();
320  * return 0;
321  * }
322  *
323  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
324  * {
325  * BidirMMapPipe *p = new BidirMMapPipe();
326  * if (p->isChild()) {
327  * int retVal = childexec(*p);
328  * delete p;
329  * std::exit(retVal);
330  * }
331  * return p;
332  * }
333  *
334  * int main()
335  * {
336  * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
337  * std::endl;
338  * BidirMMapPipe* pipe = spawnChild(simplechild);
339  * for (int i = 0; i < 5; ++i) {
340  * std::string str("What shall we do with a drunken sailor...");
341  * *pipe << str << BidirMMapPipe::flush;
342  * if (!*pipe) return -1;
343  * std::cout << "[PARENT]: wrote: " << str << std::endl;
344  * *pipe >> str;
345  * if (!*pipe) return -1;
346  * std::cout << "[PARENT]: read: " << str << std::endl;
347  * }
348  * // ask child to shut down
349  * *pipe << "" << BidirMMapPipe::flush;
350  * // wait for it to see the shutdown request
351  * std::string s;
352  * *pipe >> s;
353  * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
354  * std::endl;
355  * delete pipe;
356  * return 0;
357  * }
358  * @endcode
359  *
360  * When designing your own protocols to use over the pipe, there are a few
361  * things to bear in mind:
362  * - Do as http does: When building a request, send all the options and
363  * properties of that request with the request itself in a single go (one
364  * flush). Then, the server has everything it needs, and hopefully, it'll
365  * shut up for a while and let the client do something useful in the
366  * meantime... The same goes when the server replies to the request: include
367  * everything there is to know about the result of the request in the reply.
368  * - The expensive operation should be the request that is made, all other
369  * operations should somehow be formulated as options or properties to that
370  * request.
371  * - Include a shutdown handshake in whatever protocol you send over the
372  * pipe. That way, you can shut things down in a controlled way. Otherwise,
373  * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
374  * one end closes its pipe while the other is still trying to read.
375  */
377 #ifndef _WIN32
378  public:
379  /// type used to represent sizes
381  /// convenience typedef for BidirMMapPipeException
382  typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
/// state flag bits, modelled on (and partially compatible with) C++ iostreams
enum {
    eofbit        = 1 << 0,              ///< end of file reached
    failbit       = 1 << 1,              ///< logical failure (e.g. pipe closed)
    rderrbit      = 1 << 2,              ///< read error
    wrerrbit      = 1 << 3,              ///< write error
    badbit        = rderrbit | wrerrbit, ///< general I/O error (read or write)
    exceptionsbit = 1 << 4               ///< error reporting with exceptions
};
392 
393  /** @brief constructor (forks!)
394  *
395  * Creates a bidirectional communications channel between this process
396  * and a child the constructor forks. On return from the constructor,
397  * isParent() and isChild() can be used to tell the parent end from the
398  * child end of the pipe. In the child, all other open BidirMMapPipes
399  * are closed.
400  *
401  * @param useExceptions read()/write() error reporting also done using
402  * exceptions
403  * @param useSocketpair use a socketpair instead of a pair of pipes
404  *
405  * Normally, exceptions are thrown for all serious I/O errors (apart
406  * from end of file). Setting useExceptions to false will force the
407  * read() and write() methods to only report serious I/O errors using
408  * flags.
409  *
410  * When useSocketpair is true, use a pair of Unix domain sockets
411  * created using socketpair instead of a pair of pipes. The advantage is
412  * that only one pair of file descriptors is needed instead of two
413  * pairs which are needed for the pipe pair. Performance should be very
414  * similar on most platforms, especially if mmap works, since only
415  * very little data is sent through the pipe(s)/socketpair.
416  */
417  BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
418 
419  /** @brief destructor
420  *
421  * closes this end of pipe
422  */
423  ~BidirMMapPipe();
424 
425  /** @brief return the current setting of the debug flag
426  *
427  * @returns an integer with the debug Setting
428  */
429  static int debugflag() { return s_debugflag; }
430 
431  /** @brief set the debug flags
432  *
433  * @param flag debug flags (if zero, no messages are printed)
434  */
435  static void setDebugflag(int flag) { s_debugflag = flag; }
436 
437  /** @brief read from pipe
438  *
439  * @param addr where to put read data
440  * @param sz size of data to read (in bytes)
441  * @returns size of data read, or 0 in case of end-of-file
442  *
443  * read may block until data from other end is available. It will
444  * return 0 if the other end closed the pipe.
445  */
446  size_type read(void* addr, size_type sz);
447 
448  /** @brief write to pipe
449  *
450  * @param addr where to get data to write from
451  * @param sz size of data to write (in bytes)
452  * @returns size of data written, or 0 in case of end-of-file
453  *
454  * write may block until data can be written to other end (depends a
455  * bit on available buffer space). It will return 0 if the other end
456  * closed the pipe. The data is queued to be written on the next
457  * convenient occasion, or it can be forced out with flush().
458  */
459  size_type write(const void* addr, size_type sz);
460 
461  /** @brief flush buffers with unwritten data
462  *
463  * This forces unwritten data to be written to the other end. The call
464  * will block until this has been done (or the attempt failed with an
465  * error).
466  */
467  void flush();
468 
469  /** @brief purge buffered data waiting to be read and/or written
470  *
471  * Discards all internal buffers.
472  */
473  void purge();
474 
475  /** @brief number of bytes that can be read without blocking
476  *
477  * @returns number of bytes that can be read without blocking
478  */
479  size_type bytesReadableNonBlocking();
480 
481  /** @brief number of bytes that can be written without blocking
482  *
483  * @returns number of bytes that can be written without blocking
484  */
485  size_type bytesWritableNonBlocking();
486 
487  /** @brief flush buffers, close pipe
488  *
489  * Flush buffers, discard unread data, closes the pipe. If the pipe is
490  * in the parent process, it waits for the child.
491  *
492  * @returns exit code of child process in parent, zero in child
493  */
494  int close();
495 
496  /** @brief return PID of the process on the other end of the pipe
497  *
498  * @returns PID of the process running on the remote end
499  */
500  pid_t pidOtherEnd() const
501  { return isChild() ? m_parentPid : m_childPid; }
502 
/** @brief condition flags for poll
 *
 * Every basic condition occupies its own bit, so that conditions can be
 * or'ed together in PollEntry::events and told apart in PollEntry::revents.
 * (WriteEndOfFile and ReadInvalid used to share the value 64, which made
 * "write end at end-of-file" indistinguishable from "read end invalid" and
 * made the EndOfFile and Invalid masks overlap.)
 */
enum PollFlags {
    None = 0,               ///< nothing special on this pipe
    Readable = 1,           ///< pipe has data for reading
    Writable = 2,           ///< pipe can be written to
    ReadError = 4,          ///< pipe error, read end
    WriteError = 8,         ///< pipe error, write end
    Error = ReadError | WriteError, ///< pipe error
    ReadEndOfFile = 16,     ///< read pipe in end-of-file state
    WriteEndOfFile = 32,    ///< write pipe in end-of-file state
    EndOfFile = ReadEndOfFile | WriteEndOfFile, ///< end of file
    ReadInvalid = 64,       ///< read end of pipe invalid
    WriteInvalid = 128,     ///< write end of pipe invalid
    Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
};
518 
519  /// for poll() interface
520  class PollEntry {
521  public:
522  BidirMMapPipe* pipe; ///< pipe of interest
523  unsigned events; ///< events of interest (or'ed bitmask)
524  unsigned revents; ///< events that happened (or'ed bitmask)
525  /// poll a pipe for all events
527  pipe(_pipe), events(None), revents(None) { }
528  /// poll a pipe for specified events
529  PollEntry(BidirMMapPipe* _pipe, int _events) :
530  pipe(_pipe), events(_events), revents(None) { }
531  };
532  /// convenience typedef for poll() interface
533  typedef std::vector<PollEntry> PollVector;
534 
535  /** @brief poll a set of pipes for events (ready to read from, ready to
536  * write to, error)
537  *
538  * @param pipes set of pipes to check
539  * @param timeout timeout in milliseconds
540  * @returns positive number: number of pipes which have
541  * status changes, 0: timeout, or no pipes with
542  * status changed, -1 on error
543  *
544  * Timeout can be zero (check for specified events, and return), finite
545  * (wait at most timeout milliseconds before returning), or -1
546  * (infinite). The poll method returns when the timeout has elapsed,
547  * or if an event occurs on one of the pipes being polled, whichever
548  * happens earlier.
549  *
550  * Pipes is a vector of one or more PollEntries, which each list a pipe
551  * and events to poll for. If events is left empty (zero), all
552  * conditions are polled for, otherwise only the indicated ones. On
553  * return, the revents fields contain the events that occurred for each
554  * pipe; Error, EndOfFile or Invalid events are always set,
555  * regardless of whether they were in the set of requested events.
556  *
557  * poll may block slightly longer than specified by timeout due to OS
558  * timer granularity and OS scheduling. Due to its implementation, the
559  * poll call can also return early if the remote end of the pipe sends
560  * a free page while polling (which is put on that pipe's freelist),
561  * while that pipe is polled for e.g. reading. The status of the pipe is
562  * indicated correctly in revents, and the caller can simply poll
563  * again. (The reason this is done this way is because it helps to
564  * replenish the pool of free pages and queue busy pages without
565  * blocking.)
566  *
567  * Here's a piece of example code waiting on two pipes; if they become
568  * readable they are read:
569  * @code
570  * #include <unistd.h>
571  * #include <cstdlib>
572  * #include <string>
573  * #include <sstream>
574  * #include <iostream>
575  *
576  * #include "BidirMMapPipe.h"
577  *
578  * // what to execute in the child
579  * int randomchild(BidirMMapPipe& pipe)
580  * {
581  * ::srand48(::getpid());
582  * for (int i = 0; i < 5; ++i) {
583  * // sleep a random time between 0 and .9 seconds
584  * ::usleep(int(1e6 * ::drand48()));
585  * std::ostringstream buf;
586  * buf << "child pid " << ::getpid() << " sends message " << i;
587  * std::cout << "[CHILD] : " << buf.str() << std::endl;
588  * pipe << buf.str() << BidirMMapPipe::flush;
589  * if (!pipe) return -1;
590  * if (pipe.eof()) break;
591  * }
592  * // tell parent we're done
593  * pipe << "" << BidirMMapPipe::flush;
594  * // wait for parent to acknowledge
595  * std::string s;
596  * pipe >> s;
597  * pipe.close();
598  * return 0;
599  * }
600  *
601  * // function to spawn a child
602  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
603  * {
604  * BidirMMapPipe *p = new BidirMMapPipe();
605  * if (p->isChild()) {
606  * int retVal = childexec(*p);
607  * delete p;
608  * std::exit(retVal);
609  * }
610  * return p;
611  * }
612  *
613  * int main()
614  * {
615  * typedef BidirMMapPipe::PollEntry PollEntry;
616  * // poll data structure
617  * BidirMMapPipe::PollVector pipes;
618  * pipes.reserve(3);
619  * // spawn children
620  * for (int i = 0; i < 3; ++i) {
621  * pipes.push_back(PollEntry(spawnChild(randomchild),
622  * BidirMMapPipe::Readable));
623  * }
624  * // while at least some children alive
625  * while (!pipes.empty()) {
626  * // poll, wait until status change (infinite timeout)
627  * int npipes = BidirMMapPipe::poll(pipes, -1);
628  * // scan for pipes with changed status
629  * for (std::vector<PollEntry>::iterator it = pipes.begin();
630  * npipes && pipes.end() != it; ) {
631  * if (!it->revents) {
632  * // unchanged, next one
633  * ++it;
634  * continue;
635  * }
636  * --npipes; // maybe we can stop early...
637  * // read from pipes which are readable
638  * if (it->revents & BidirMMapPipe::Readable) {
639  * std::string s;
640  * *(it->pipe) >> s;
641  * if (!s.empty()) {
642  * std::cout << "[PARENT]: Read from pipe " <<
643  * it->pipe << ": " << s << std::endl;
644  * ++it;
645  * continue;
646  * } else {
647  * // child is shutting down...
648  * *(it->pipe) << "" << BidirMMapPipe::flush;
649  * goto childcloses;
650  * }
651  * }
652  * // retire pipes with error or end-of-file condition
653  * if (it->revents & (BidirMMapPipe::Error |
654  * BidirMMapPipe::EndOfFile |
655  * BidirMMapPipe::Invalid)) {
656  * std::cout << "[PARENT]: Error on pipe " <<
657  * it->pipe << " revents " << it->revents <<
658  * std::endl;
659  * childcloses:
660  * int retVal = it->pipe->close();
661  * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
662  * if (retVal) return retVal;
663  * delete it->pipe;
664  * it = pipes.erase(it);
665  * continue;
666  * }
667  * }
668  * }
669  * return 0;
670  * }
671  * @endcode
672  */
673  static int poll(PollVector& pipes, int timeout);
674 
675  /** @brief return if this end of the pipe is the parent end
676  *
677  * @returns true if parent end of pipe
678  */
679  bool isParent() const { return m_childPid; }
680 
681  /** @brief return if this end of the pipe is the child end
682  *
683  * @returns true if child end of pipe
684  */
685  bool isChild() const { return !m_childPid; }
686 
687  /** @brief if BidirMMapPipe uses a socketpair for communications
688  *
689  * @returns true if BidirMMapPipe uses a socketpair for communications
690  */
691  bool usesSocketpair() const { return m_inpipe == m_outpipe; }
692 
693  /** @brief if BidirMMapPipe uses a pipe pair for communications
694  *
695  * @returns true if BidirMMapPipe uses a pipe pair for communications
696  */
697  bool usesPipepair() const { return m_inpipe != m_outpipe; }
698 
699  /** @brief return flags (end of file, BidirMMapPipe closed, ...)
700  *
701  * @returns flags (end of file, BidirMMapPipe closed, ...)
702  */
703  int rdstate() const { return m_flags; }
704 
705  /** @brief true if end-of-file
706  *
707  * @returns true if end-of-file
708  */
709  bool eof() const { return m_flags & eofbit; }
710 
711  /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
712  *
713  * @returns true in case of grave logical error (I/O on closed pipe,...)
714  */
715  bool fail() const { return m_flags & failbit; }
716 
717  /** @brief true on I/O error
718  *
719  * @returns true on I/O error
720  */
721  bool bad() const { return m_flags & badbit; }
722 
723  /** @brief status of stream is good
724  *
725  * @returns true if pipe is good (no errors, eof, ...)
726  */
727  bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
728 
729  /** @brief true if closed
730  *
731  * @returns true if stream is closed
732  */
733  bool closed() const { return m_flags & failbit; }
734 
735  /** @brief return true if not serious error (fail/bad)
736  *
737  * @returns true if stream does not have a serious error (fail/bad)
738  *
739  * (if EOF, this is still true)
740  */
741  operator bool() const { return !fail() && !bad(); }
742 
743  /** @brief return true if serious error (fail/bad)
744  *
745  * @returns true if stream has a serious error (fail/bad)
746  */
747  bool operator!() const { return fail() || bad(); }
748 
749 #ifdef STREAMOP
750 #undef STREAMOP
751 #endif
752 #define STREAMOP(TYPE) \
753  BidirMMapPipe& operator<<(const TYPE& val) \
754  { write(&val, sizeof(TYPE)); return *this; } \
755  BidirMMapPipe& operator>>(TYPE& val) \
756  { read(&val, sizeof(TYPE)); return *this; }
757  STREAMOP(bool); ///< C++ style stream operators for bool
758  STREAMOP(char); ///< C++ style stream operators for char
759  STREAMOP(short); ///< C++ style stream operators for short
760  STREAMOP(int); ///< C++ style stream operators for int
761  STREAMOP(long); ///< C++ style stream operators for long
762  STREAMOP(long long); ///< C++ style stream operators for long long
763  STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
764  STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
765  STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
766  STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
767  STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
768  STREAMOP(float); ///< C++ style stream operators for float
769  STREAMOP(double); ///< C++ style stream operators for double
770 #undef STREAMOP
771 
772  /** @brief write a C-style string
773  *
774  * @param str C-style string
775  * @returns pipe written to
776  */
777  BidirMMapPipe& operator<<(const char* str);
778 
779  /** @brief read a C-style string
780  *
781  * @param str pointer to string (space allocated with malloc!)
782  * @returns pipe read from
783  *
784  * since this is for C-style strings, we use malloc/realloc/free for
785  * strings. passing in a NULL pointer is valid here, and the routine
786  * will use realloc to allocate a chunk of memory of the right size.
787  */
788  BidirMMapPipe& operator>>(char* (&str));
789 
790  /** @brief write a std::string object
791  *
792  * @param str string to write
793  * @returns pipe written to
794  */
795  BidirMMapPipe& operator<<(const std::string& str);
796 
797  /** @brief read a std::string object
798  *
799  * @param str string to be read
800  * @returns pipe read from
801  */
802  BidirMMapPipe& operator>>(std::string& str);
803 
804  /** @brief write raw pointer to T to other side
805  *
806  * NOTE: This will not write the pointee! Only the value of the
807  * pointer is transferred.
808  *
809  * @param tptr pointer to be written
810  * @returns pipe written to
811  */
812  template<class T> BidirMMapPipe& operator<<(const T* tptr)
813  { write(&tptr, sizeof(tptr)); return *this; }
814 
815  /** @brief read raw pointer to T from other side
816  *
817  * NOTE: This will not read the pointee! Only the value of the
818  * pointer is transferred.
819  *
820  * @param tptr pointer to be read
821  * @returns pipe read from
822  */
823  template<class T> BidirMMapPipe& operator>>(T* &tptr)
824  { read(&tptr, sizeof(tptr)); return *this; }
825 
826  /** @brief I/O manipulator support
827  *
828  * @param f manipulator
829  * @returns pipe with manipulator applied
830  *
831  * example:
832  * @code
833  * pipe << BidirMMapPipe::flush;
834  * @endcode
835  */
837  { return manip(*this); }
838 
839  /** @brief I/O manipulator support
840  *
841  * @param f manipulator
842  * @returns pipe with manipulator applied
843  *
844  * example:
845  * @code
846  * pipe >> BidirMMapPipe::purge;
847  * @endcode
848  */
850  { return manip(*this); }
851 
852  /// for usage a la "pipe << flush;"
853  static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
854  /// for usage a la "pipe << purge;"
855  static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
856 
857  private:
858  /// copy-construction forbidden
860  /// assignment forbidden
861  BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
862 
863  /// page is our friend
865  /// convenience typedef for Page
867 
/// tuning constants
enum {
    // with 4k pages, TotPages = 16 gives the parent and the child a 32k
    // buffer each; raise the value if your typical message is larger
    // than that (the upper limit is 256)
    TotPages = 16, ///< pages shared (child + parent)

    PagesPerEnd = TotPages / 2, ///< pages per pipe end

    // a flush is forced as soon as FlushThresh pages have been filled;
    // three quarters of the pages of one end seems to work quite well
    FlushThresh = (PagesPerEnd * 3) / 4 ///< flush threshold
};
881 
882  // per-class members
883  static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
884  /// list of open BidirMMapPipes
885  static std::list<BidirMMapPipe*> s_openpipes;
886  /// pool of mmapped pages
887  static BidirMMapPipe_impl::PagePool* s_pagepool;
888  /// page pool reference counter
889  static unsigned s_pagepoolrefcnt;
890  /// debug flag
891  static int s_debugflag;
892 
893  /// return page pool
894  static BidirMMapPipe_impl::PagePool& pagepool();
895 
896  // per-instance members
897  BidirMMapPipe_impl::Pages m_pages; ///< mmapped pages
898  Page* m_busylist; ///< linked list: busy pages (data to be read)
899  Page* m_freelist; ///< linked list: free pages
900  Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
901  int m_inpipe; ///< pipe end from which data may be read
902  int m_outpipe; ///< pipe end to which data may be written
903  int m_flags; ///< flags (e.g. end of file)
904  pid_t m_childPid; ///< pid of the child (zero if we're child)
905  pid_t m_parentPid; ///< pid of the parent
906 
907  /// cleanup routine - at exit, we want our children to get a SIGTERM...
908  static void teardownall(void);
909 
910  /// return length of a page list
911  static unsigned lenPageList(const Page* list);
912 
913  /** "feed" the busy and free lists with a list of pages
914  *
915  * @param plist linked list of pages
916  *
917  * goes through plist, puts free pages from plist onto the freelist
918  * (or sends them to the remote end if they belong there), and puts
919  * non-empty pages on plist onto the busy list
920  */
921  void feedPageLists(Page* plist);
922 
923  /// put on dirty pages list
924  void markPageDirty(Page* p);
925 
926  /// transfer bytes through the pipe via xferfn, which takes a non-const buffer (reading, writing, may block)
927  static size_type xferraw(int fd, void* addr, size_type len,
928  ssize_t (*xferfn)(int, void*, std::size_t));
929  /// overload for an xferfn taking const void*: casts xferfn to the non-const signature and forwards to the other xferraw (may block)
930  static size_type xferraw(int fd, void* addr, const size_type len,
931  ssize_t (*xferfn)(int, const void*, std::size_t))
932  {
933  return xferraw(fd, addr, len,
934  reinterpret_cast<ssize_t (*)(
935  int, void*, std::size_t)>(xferfn));
936  }
937 
938  /** @brief send page(s) to the other end (may block)
939  *
940  * @param plist linked list of pages to send
941  *
942  * the implementation gathers the different write(s) wherever
943  * possible; if mmap works, this results in a single write to transfer
944  * the list of pages sent, if we need to copy things through the pipe,
945  * we have one write to transfer which pages are sent, and then one
946  * write per page.
947  */
948  void sendpages(Page* plist);
949 
950  /** @brief receive pages from the other end (may block), queue them
951  *
952  * @returns number of pages received
953  *
954  * this is an application-level scatter read, which gets the list of
955  * pages to read from the pipe. if mmap works, it needs only one read
956  * call (to get the head of the list of pages transferred). if we need
957  * to copy pages through the pipe, we need to add one read for each
958  * empty page, and two reads for each non-empty page.
959  */
960  unsigned recvpages();
961 
962  /** @brief receive pages from other end (non-blocking)
963  *
964  * @returns number of pages received
965  *
966  * like recvpages(), but does not block if nothing is available for
967  * reading
968  */
969  unsigned recvpages_nonblock();
970 
971  /// get a busy page to read data from (may block)
972  Page* busypage();
973  /// get a dirty page to write data to (may block)
974  Page* dirtypage();
975 
976  /// close the pipe (no flush if forced)
977  int doClose(bool force, bool holdlock = false);
978  /// perform the flush
979  void doFlush(bool forcePartialPages = true);
980 #endif //_WIN32
981 };
982 
984 
985 #undef BEGIN_NAMESPACE_ROOFIT
986 #undef END_NAMESPACE_ROOFIT
987 
988 #endif // BIDIRMMAPPIPE_H
989 
990 // vim: ft=cpp:sw=4:tw=78
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
std::size_t size_type
type used to represent sizes
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:19
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
BidirMMapPipe * pipe
pipe of interest
pipe error write end
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
bool eof() const
true if end-of-file
error reporting with exceptions
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
read pipe in end-of-file state
pages per pipe end
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
impl * m_pimpl
pointer to implementation
double T(double x)
Definition: ChebyshevPol.h:34
unsigned m_refcnt
reference counter
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
write pipe in end-of-file state
unsigned npages() const
return number of pages accessible
don't know yet what'll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
general I/O error
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
size_t
Definition: TBuffer.cxx:28
pid_t m_childPid
pid of the child (zero if we're child)
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
pipe error read end
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
unsigned nPagesPerGroup() const
return number of pages per page group
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
bool good() const
status of stream is good
size_type read(void *addr, size_type sz)
read from pipe
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:87
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
PollFlags
condition flags for poll
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:20
void sendpages(Page *plist)
send page(s) to the other end (may block)
unsigned len() const
return length of chunk
Page * page(unsigned pgno) const
return page number pageno
pipe can be written to
Page * busypage()
get a busy page to read data from (may block)
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
Page * operator[](unsigned pgno) const
return page number pageno
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
static int debugflag()
return the current setting of the debug flag
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
static unsigned lenPageList(const Page *list)
return length of a page list
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
bool fail() const
logical failure (e.g. pipe closed)
end of file reached
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
Pages pop()
pop a group of pages off the free list
static void setDebugflag(int flag)
set the debug flags
logical failure (e.g. pipe closed)
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return if p is contained in this PageChunk
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
bool full() const
return true if no free page groups in this chunk
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
pid_t m_parentPid
pid of the parent
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
static int s_debugflag
debug flag
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
bool isParent() const
return if this end of the pipe is the parent end
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
friend class BidirMMapPipe_impl::Page
page is our friend
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
pages shared (child + parent)
bool empty() const
return true if no used page groups in this chunk
bool closed() const
true if closed
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
PageChunk & operator=(const PageChunk &)
forbid assignment
Definition: BidirMMapPipe.h:82
void zap(Pages &p)
free all pages except for those pointed to by p
unsigned operator[](Page *p) const
perform page to page number mapping
for poll() interface
unsigned recvpages()
receive pages from the other end (may block), queue them
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
STREAMOP(bool)
C++ style stream operators for bool.
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
system page size (run-time determined)
Definition: BidirMMapPipe.h:56
pipe has data for reading
nothing special on this pipe
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
int m_flags
flags (e.g. end of file)
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the page size of the system
Definition: BidirMMapPipe.h:85
unsigned pageno(Page *p) const
perform page to page number mapping
bool operator!() const
return true if serious error (fail/bad)