Logo ROOT   master
Reference Guide
BidirMMapPipe.h
Go to the documentation of this file.
1 /** @file BidirMMapPipe.h
2  *
3  * header file for BidirMMapPipe, a class which forks off a child process and
4  * serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 
10 #ifndef BIDIRMMAPPIPE_H
11 #define BIDIRMMAPPIPE_H
12 
13 #include <list>
14 #include <vector>
15 #include <cstring>
16 #include <unistd.h>
17 
18 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
19 #define END_NAMESPACE_ROOFIT }
20 
22 
23 /// namespace for implementation details of BidirMMapPipe
namespace BidirMMapPipe_impl {
    // forward declarations
    class BidirMMapPipeException;
    class Page;
    class PagePool;
    class Pages;

    /** @brief class representing a chunk of pages
     *
     * @author Manuel Schiller <manuel.schiller@nikhef.nl>
     * @date 2013-07-24
     *
     * allocating pages from the OS happens in chunks in order to not exhaust
     * the maximum allowed number of memory mappings per process; this class
     * takes care of such a chunk
     *
     * a page chunk allows callers to obtain or release pages in groups of
     * continuous pages of fixed size
     */
    class PageChunk {
        public:
            /// type of mmap support found
            typedef enum {
                Unknown,    ///< don't know yet what'll work
                Copy,       ///< mmap doesn't work, have to copy back and forth
                FileBacked, ///< mmapping a temp file works
                DevZero,    ///< mmapping /dev/zero works
                Anonymous   ///< anonymous mmap works
            } MMapVariety;

        private:
            static unsigned s_physpgsz; ///< system physical page size
            static unsigned s_pagesize; ///< logical page size (run-time determined)
            /// mmap variety that works on this system
            static MMapVariety s_mmapworks;

            /// convenience typedef
            typedef BidirMMapPipeException Exception;

            void* m_begin; ///< pointer to start of mmapped area
            void* m_end;   ///< pointer one behind end of mmapped area
            // FIXME: cannot keep freelist inline - other end may need that
            //        data, and we'd end up overwriting the page header
            std::list<void*> m_freelist; ///< free pages list
            PagePool* m_parent;          ///< parent page pool
            unsigned m_nPgPerGrp;        ///< number of pages per group
            unsigned m_nUsedGrp;         ///< number of used page groups

            /// determine page size at run time
            static unsigned getPageSize();

            /// mmap pages, len is length of mmapped area in bytes
            static void* dommap(unsigned len);
            /// munmap pages p, len is length of mmapped area in bytes
            static void domunmap(void* p, unsigned len);
            /// forbid copying (private, so only members could ever call it)
            PageChunk(const PageChunk&) {}
            /// forbid assignment (private, so only members could ever call it)
            PageChunk& operator=(const PageChunk&) { return *this; }
        public:
            /// return the logical page size
            static unsigned pagesize() { return s_pagesize; }
            /// return the physical page size of the system
            static unsigned physPgSz() { return s_physpgsz; }
            /// return mmap variety support found
            static MMapVariety mmapVariety() { return s_mmapworks; }

            /// constructor
            PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);

            /// destructor
            ~PageChunk();

            /// return if p is contained in this PageChunk
            bool contains(const Pages& p) const;

            /// pop a group of pages off the free list
            Pages pop();

            /// push a group of pages onto the free list
            void push(const Pages& p);

            /// return length of chunk in bytes (distance from m_begin to m_end)
            unsigned len() const
            {
                // byte-wise pointer difference; void* converts to a byte
                // pointer with a plain static_cast
                return static_cast<unsigned>(
                        static_cast<unsigned char*>(m_end) -
                        static_cast<unsigned char*>(m_begin));
            }
            /// return number of pages per page group
            unsigned nPagesPerGroup() const { return m_nPgPerGrp; }

            /// return true if no used page groups in this chunk
            bool empty() const { return !m_nUsedGrp; }

            /// return true if no free page groups in this chunk
            bool full() const { return m_freelist.empty(); }

            /// free all pages except for those pointed to by p
            void zap(Pages& p);
    };

    /** @brief handle class for a number of Pages
     *
     * @author Manuel Schiller <manuel.schiller@nikhef.nl>
     * @date 2013-07-24
     *
     * the associated pages are continuous in memory
     */
    class Pages {
        private:
            /// implementation
            typedef struct {
                PageChunk *m_parent;    ///< pointer to parent pool
                Page* m_pages;          ///< pointer to first page
                unsigned m_refcnt;      ///< reference counter
                unsigned char m_npages; ///< length in pages
            } impl;
        public:
            /// default constructor (creates a handle without implementation)
            Pages() : m_pimpl(0) { }

            /// destructor
            ~Pages();

            /** @brief copy constructor
             *
             * copy Pages handle to new object - old object loses ownership,
             * and becomes a dangling handle
             */
            Pages(const Pages& other);

            /** @brief assignment operator
             *
             * assign Pages handle to new object - old object loses ownership,
             * and becomes a dangling handle
             */
            Pages& operator=(const Pages& other);

            /// return page size
            static unsigned pagesize();

            /// return number of pages accessible (handle must not be empty)
            unsigned npages() const { return m_pimpl->m_npages; }

            /// return page number pageno
            Page* page(unsigned pgno) const;

            /// return page number pageno
            Page* operator[](unsigned pgno) const { return page(pgno); }

            /// perform page to page number mapping
            unsigned pageno(Page* p) const;

            /// perform page to page number mapping
            unsigned operator[](Page* p) const { return pageno(p); }

            /// swap with other's contents
            void swap(Pages& other)
            {
                impl* tmp = other.m_pimpl;
                other.m_pimpl = m_pimpl;
                m_pimpl = tmp;
            }

        private:
            /// PageChunk is our friend - it's allowed to construct Pages
            friend class BidirMMapPipe_impl::PageChunk;

            /// pointer to implementation
            impl* m_pimpl;

            /// constructor
            Pages(PageChunk* parent, Page* pages, unsigned npg);
    };
}
199 
200 /** @brief BidirMMapPipe creates a bidirectional channel between the current
201  * process and a child it forks.
202  *
203  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
204  * @date 2013-07-07
205  *
206  * This class creates a bidirectional channel between this process and a child
207  * it creates with fork().
208  *
209  * The channel is comprised of a small shared pool of buffer memory mmapped into
210  * both process spaces, and two pipes to synchronise the exchange of data. The
211  * idea behind using the pipes at all is to have some primitive which we can
212  * block on without having to worry about atomic operations or polling, leaving
213  * these tasks to the OS. In case the anonymous mmap cannot be performed on the
214  * OS the code is running on (for whatever reason), the code falls back to
215  * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
216  * dynamically allocated buffer which is then transmitted through the pipe(s),
217  * a slightly slower alternative (because the data is copied more often).
218  *
219  * The channel supports five major operations: read(), write(), flush(),
220  * purge() and close(). Reading and writing may block until the required buffer
221  * space is available. Writes may queue up data to be sent to the other end
222  * until either enough pages are full, or the user calls flush which forces
223  * any unsent buffers to be sent to the other end. flush forces any data that
224  * is to be sent to be sent. purge discards any buffered data waiting to be
225  * read and/or sent. Closing the channel on the child returns zero, closing it
226  * on the parent returns the child's exit status.
227  *
228  * The class also provides operator<< and operator>> for C++-style I/O for
229  * basic data types (bool, char, short, int, long, long long, float, double
230  * and their unsigned counterparts). Data is transmitted binary (i.e. no
231  * formatting to strings like std::cout does). There are also overloads to
232  * support C-style zero terminated strings and std::string. In terms of
233  * performance, the former is to be preferred.
234  *
235  * If the caller needs to multiplex input and output to/from several pipes, the
236  * class provides the poll() method which allows to block until an event occurs
237  * on any of the polled pipes.
238  *
239  * After the BidirMMapPipe is closed, no further operations may be performed on
240  * that object, save for the destructor which may still be called.
241  *
242  * If the BidirMMapPipe has not properly been closed, the destructor will call
243  * close. However, the exit code of the child is lost in that case.
244  *
245  * Closing the object causes the mmapped memory to be unmapped and the two
246  * pipes to be closed. We also install an atexit handler in the process of
247  * creating BidirMMapPipes. This ensures that when the current process
248  * terminates, a SIGTERM signal is sent to the child processes created for all
249  * unclosed pipes to avoid leaving zombie processes in the OS's process table.
250  *
251  * BidirMMapPipe creation, closing and destruction are thread safe. If the
252  * BidirMMapPipe is used in more than one thread, the other operations have to
253  * be protected with a mutex (or something similar), though.
254  *
255  * End of file (other end closed its pipe, or died) is indicated with the eof()
256  * method, serious I/O errors set flags (bad(), fail()), and also throw
257  * exceptions. For normal read/write operations, they can be suppressed (i.e.
258  * error reporting only using flags) with a constructor argument.
259  *
260  * Technicalities:
261  * - there is a pool of mmapped pages, half the pages are allocated to the
262  * parent process, half to the child
263  * - when one side has accumulated enough data (or a flush forces dirty pages
264  * out to the other end), it sends these pages to the other end by writing a
265  * byte containing the page number into the pipe
266  * - the other end (which has the pages mmapped, too) reads the page number(s)
267  * and puts the corresponding pages on its busy list
268  * - as the other end reads, it frees busy pages, and eventually tries to put
269  * them on its free list; if a page belongs to the other end of the
270  * connection, it is sent back
271  * - lists of pages are sent across the pipe, not individual pages, in order
272  * to minimise the number of read/write operations needed
273  * - when mmap works properly, only one byte containing the page number of
274  * the page list head is sent back and forth; the contents of that page
275  * allow to access the rest of the page list sent, and page headers on the
276  * list tell the receiving end if the page is free or has to be added to the
277  * busy list
278  * - when mmap does not work, we transfer one byte to indicate the head of the
279  * page list sent, and for each page on the list of sent pages, the page
280  * header and the page payload is sent (if the page is free, we only
281  * transmit the page header, and we never transmit more payload than
282  * the page actually contains)
283  * - in the child, all open BidirMMapPipes but the current one are closed. this
284  * is done for two reasons: first, to conserve file descriptors and address
285  * space. second, if more than one process is meant to use such a
286  * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
287  * are hard to find and understand. it's much better to come up with a design
288  * which does not need pipes to be shared among more than two processes.
289  *
290  * Here is a trivial example of a parent and a child talking to each other over
291  * a BidirMMapPipe:
292  * @code
293  * #include <string>
294  * #include <iostream>
295  * #include <cstdlib>
296  *
297  * #include "BidirMMapPipe.h"
298  *
299  * int simplechild(BidirMMapPipe& pipe)
300  * {
301  * // child does an echo loop
302  * while (pipe.good() && !pipe.eof()) {
303  * // read a string
304  * std::string str;
305  * pipe >> str;
306  * if (!pipe) return -1;
307  * if (pipe.eof()) break;
308  * // check if parent wants us to shut down
309  * if (!str.empty()) {
310  * std::cout << "[CHILD] : read: " << str << std::endl;
311  * str = "... early in the morning?";
312  * }
313  * pipe << str << BidirMMapPipe::flush;
314  * if (str.empty()) break;
315  * if (!pipe) return -1;
316  * std::cout << "[CHILD] : wrote: " << str << std::endl;
317  * }
318  * // send shutdown request acknowledged
319  * pipe << "" << BidirMMapPipe::flush;
320  *
321  * pipe.close();
322  * return 0;
323  * }
324  *
325  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
326  * {
327  * BidirMMapPipe *p = new BidirMMapPipe();
328  * if (p->isChild()) {
329  * int retVal = childexec(*p);
330  * delete p;
331  * std::exit(retVal);
332  * }
333  * return p;
334  * }
335  *
336  * int main()
337  * {
338  * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
339  * std::endl;
340  * BidirMMapPipe* pipe = spawnChild(simplechild);
341  * for (int i = 0; i < 5; ++i) {
342  * std::string str("What shall we do with a drunken sailor...");
343  * *pipe << str << BidirMMapPipe::flush;
344  * if (!*pipe) return -1;
345  * std::cout << "[PARENT]: wrote: " << str << std::endl;
346  * *pipe >> str;
347  * if (!*pipe) return -1;
348  * std::cout << "[PARENT]: read: " << str << std::endl;
349  * }
350  * // ask child to shut down
351  * *pipe << "" << BidirMMapPipe::flush;
352  * // wait for it to see the shutdown request
353  * std::string s;
354  * *pipe >> s;
355  * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
356  * std::endl;
357  * delete pipe;
358  * return 0;
359  * }
360  * @endcode
361  *
362  * When designing your own protocols to use over the pipe, there are a few
363  * things to bear in mind:
364  * - Do as http does: When building a request, send all the options and
365  * properties of that request with the request itself in a single go (one
366  * flush). Then, the server has everything it needs, and hopefully, it'll
367  * shut up for a while and to let the client do something useful in the
368  * meantime... The same goes when the server replies to the request: include
369  * everything there is to know about the result of the request in the reply.
370  * - The expensive operation should be the request that is made, all other
371  * operations should somehow be formulated as options or properties to that
372  * request.
373  * - Include a shutdown handshake in whatever protocol you send over the
374  * pipe. That way, you can shut things down in a controlled way. Otherwise,
375  * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
376  * one end closes its pipe while the other is still trying to read.
377  */
379 #ifndef _WIN32
380  public:
381  /// type used to represent sizes
382  typedef std::size_t size_type;
383  /// convenience typedef for BidirMMapPipeException
384  typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
/// status flag bits, modelled on a subset of the C++ iostream state flags
enum {
    eofbit        = 1 << 0,              ///< end of file reached
    failbit       = 1 << 1,              ///< logical failure (e.g. pipe closed)
    rderrbit      = 1 << 2,              ///< read error
    wrerrbit      = 1 << 3,              ///< write error
    badbit        = rderrbit | wrerrbit, ///< general I/O error
    exceptionsbit = 1 << 4               ///< error reporting with exceptions
};
394 
395  /** @brief constructor (forks!)
396  *
397  * Creates a bidirectional communications channel between this process
398  * and a child the constructor forks. On return from the constructor,
399  * isParent() and isChild() can be used to tell the parent end from the
400  * child end of the pipe. In the child, all other open BidirMMapPipes
401  * are closed.
402  *
403  * @param useExceptions read()/write() error reporting also done using
404  * exceptions
405  * @param useSocketpair use a socketpair instead of a pair of pipes
406  *
407  * Normally, exceptions are thrown for all serious I/O errors (apart
408  * from end of file). Setting useExceptions to false will force the
409  * read() and write() methods to only report serious I/O errors using
410  * flags.
411  *
412  * When useSocketpair is true, use a pair of Unix domain sockets
413  * created using socketpair instead of a pair of pipes. The advantage is
414  * that only one pair of file descriptors is needed instead of two
415  * pairs which are needed for the pipe pair. Performance should be very
416  * similar on most platforms, especially if mmap works, since only
417  * very little data is sent through the pipe(s)/socketpair.
418  */
419  BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
420 
421  /** @brief destructor
422  *
423  * closes this end of pipe
424  */
425  ~BidirMMapPipe();
426 
427  /** @brief return the current setting of the debug flag
428  *
429  * @returns an integer with the debug setting
430  */
431  static int debugflag() { return s_debugflag; }
432 
433  /** @brief set the debug flags
434  *
435  * @param flag debug flags (if zero, no messages are printed)
436  */
437  static void setDebugflag(int flag) { s_debugflag = flag; }
438 
439  /** @brief read from pipe
440  *
441  * @param addr where to put read data
442  * @param sz size of data to read (in bytes)
443  * @returns size of data read, or 0 in case of end-of-file
444  *
445  * read may block until data from other end is available. It will
446  * return 0 if the other end closed the pipe.
447  */
448  size_type read(void* addr, size_type sz);
449 
450  /** @brief write to pipe
451  *
452  * @param addr where to get data to write from
453  * @param sz size of data to write (in bytes)
454  * @returns size of data written, or 0 in case of end-of-file
455  *
456  * write may block until data can be written to other end (depends a
457  * bit on available buffer space). It will return 0 if the other end
458  * closed the pipe. The data is queued to be written on the next
459  * convenient occasion, or it can be forced out with flush().
460  */
461  size_type write(const void* addr, size_type sz);
462 
463  /** @brief flush buffers with unwritten data
464  *
465  * This forces unwritten data to be written to the other end. The call
466  * will block until this has been done (or the attempt failed with an
467  * error).
468  */
469  void flush();
470 
471  /** @brief purge buffered data waiting to be read and/or written
472  *
473  * Discards all internal buffers.
474  */
475  void purge();
476 
477  /** @brief number of bytes that can be read without blocking
478  *
479  * @returns number of bytes that can be read without blocking
480  */
482 
483  /** @brief number of bytes that can be written without blocking
484  *
485  * @returns number of bytes that can be written without blocking
486  */
488 
489  /** @brief flush buffers, close pipe
490  *
491  * Flush buffers, discard unread data, closes the pipe. If the pipe is
492  * in the parent process, it waits for the child.
493  *
494  * @returns exit code of child process in parent, zero in child
495  */
496  int close();
497 
498  /** @brief return PID of the process on the other end of the pipe
499  *
500  * @returns PID of the process running on the remote end
501  */
502  pid_t pidOtherEnd() const
503  { return isChild() ? m_parentPid : m_childPid; }
504 
/** @brief condition flags for poll
 *
 * Every basic condition occupies its own bit so that conditions can be
 * or'ed together in PollEntry::events / PollEntry::revents and still be
 * told apart. The end-of-file bits use 16 and 32; previously
 * WriteEndOfFile shared the value 64 with ReadInvalid, which made a
 * write-side end-of-file indistinguishable from an invalid read end and
 * let the EndOfFile and Invalid masks overlap.
 */
enum PollFlags {
    None = 0,           ///< nothing special on this pipe
    Readable = 1,       ///< pipe has data for reading
    Writable = 2,       ///< pipe can be written to
    ReadError = 4,      ///< pipe error read end
    WriteError = 8,     ///< pipe error Write end
    Error = ReadError | WriteError, ///< pipe error
    ReadEndOfFile = 16, ///< read pipe in end-of-file state
    WriteEndOfFile = 32,///< write pipe in end-of-file state
    EndOfFile = ReadEndOfFile | WriteEndOfFile, ///< end of file
    ReadInvalid = 64,   ///< read end of pipe invalid
    WriteInvalid = 128, ///< write end of pipe invalid
    Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
};
520 
521  /// for poll() interface
522  class PollEntry {
523  public:
524  BidirMMapPipe* pipe; ///< pipe of interest
525  unsigned events; ///< events of interest (or'ed bitmask)
526  unsigned revents; ///< events that happened (or'ed bitmask)
527  /// poll a pipe for all events
529  pipe(_pipe), events(None), revents(None) { }
530  /// poll a pipe for specified events
531  PollEntry(BidirMMapPipe* _pipe, int _events) :
532  pipe(_pipe), events(_events), revents(None) { }
533  };
534  /// convenience typedef for poll() interface
535  typedef std::vector<PollEntry> PollVector;
536 
537  /** @brief poll a set of pipes for events (ready to read from, ready to
538  * write to, error)
539  *
540  * @param pipes set of pipes to check
541  * @param timeout timeout in milliseconds
542  * @returns positive number: number of pipes which have
543  * status changes, 0: timeout, or no pipes with
544  * status changed, -1 on error
545  *
546  * Timeout can be zero (check for specified events, and return), finite
547  * (wait at most timeout milliseconds before returning), or -1
548  * (infinite). The poll method returns when the timeout has elapsed,
549  * or if an event occurs on one of the pipes being polled, whichever
550  * happens earlier.
551  *
552  * Pipes is a vector of one or more PollEntries, which each list a pipe
553  * and events to poll for. If events is left empty (zero), all
554  * conditions are polled for, otherwise only the indicated ones. On
555  * return, the revents fields contain the events that occurred for each
556  * pipe; the Error, EndOfFile or Invalid events are always set,
557  * regardless of whether they were in the set of requested events.
558  *
559  * poll may block slightly longer than specified by timeout due to OS
560  * timer granularity and OS scheduling. Due to its implementation, the
561  * poll call can also return early if the remote end of the pipe sends
562  * a free page while polling (which is put on that pipe's freelist),
563  * while that pipe is polled for e.g Reading. The status of the pipe is
564  * indicated correctly in revents, and the caller can simply poll
565  * again. (The reason this is done this way is because it helps to
566  * replenish the pool of free pages and queue busy pages without
567  * blocking.)
568  *
569  * Here's a piece of example code waiting on two pipes; if they become
570  * readable they are read:
571  * @code
572  * #include <unistd.h>
573  * #include <cstdlib>
574  * #include <string>
575  * #include <sstream>
576  * #include <iostream>
577  *
578  * #include "BidirMMapPipe.h"
579  *
580  * // what to execute in the child
581  * int randomchild(BidirMMapPipe& pipe)
582  * {
583  * ::srand48(::getpid());
584  * for (int i = 0; i < 5; ++i) {
585  * // sleep a random time between 0 and .9 seconds
586  * ::usleep(int(1e6 * ::drand48()));
587  * std::ostringstream buf;
588  * buf << "child pid " << ::getpid() << " sends message " << i;
589  * std::cout << "[CHILD] : " << buf.str() << std::endl;
590  * pipe << buf.str() << BidirMMapPipe::flush;
591  * if (!pipe) return -1;
592  * if (pipe.eof()) break;
593  * }
594  * // tell parent we're done
595  * pipe << "" << BidirMMapPipe::flush;
596  * // wait for parent to acknowledge
597  * std::string s;
598  * pipe >> s;
599  * pipe.close();
600  * return 0;
601  * }
602  *
603  * // function to spawn a child
604  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
605  * {
606  * BidirMMapPipe *p = new BidirMMapPipe();
607  * if (p->isChild()) {
608  * int retVal = childexec(*p);
609  * delete p;
610  * std::exit(retVal);
611  * }
612  * return p;
613  * }
614  *
615  * int main()
616  * {
617  * typedef BidirMMapPipe::PollEntry PollEntry;
618  * // poll data structure
619  * BidirMMapPipe::PollVector pipes;
620  * pipes.reserve(3);
621  * // spawn children
622  * for (int i = 0; i < 3; ++i) {
623  * pipes.push_back(PollEntry(spawnChild(randomchild),
624  * BidirMMapPipe::Readable));
625  * }
626  * // while at least some children alive
627  * while (!pipes.empty()) {
628  * // poll, wait until status change (infinite timeout)
629  * int npipes = BidirMMapPipe::poll(pipes, -1);
630  * // scan for pipes with changed status
631  * for (std::vector<PollEntry>::iterator it = pipes.begin();
632  * npipes && pipes.end() != it; ) {
633  * if (!it->revents) {
634  * // unchanged, next one
635  * ++it;
636  * continue;
637  * }
638  * --npipes; // maybe we can stop early...
639  * // read from pipes which are readable
640  * if (it->revents & BidirMMapPipe::Readable) {
641  * std::string s;
642  * *(it->pipe) >> s;
643  * if (!s.empty()) {
644  * std::cout << "[PARENT]: Read from pipe " <<
645  * it->pipe << ": " << s << std::endl;
646  * ++it;
647  * continue;
648  * } else {
649  * // child is shutting down...
650  * *(it->pipe) << "" << BidirMMapPipe::flush;
651  * goto childcloses;
652  * }
653  * }
654  * // retire pipes with error or end-of-file condition
655  * if (it->revents & (BidirMMapPipe::Error |
656  * BidirMMapPipe::EndOfFile |
657  * BidirMMapPipe::Invalid)) {
658  * std::cout << "[PARENT]: Error on pipe " <<
659  * it->pipe << " revents " << it->revents <<
660  * std::endl;
661  * childcloses:
662  * int retVal = it->pipe->close();
663  * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
664  * if (retVal) return retVal;
665  * delete it->pipe;
666  * it = pipes.erase(it);
667  * continue;
668  * }
669  * }
670  * }
671  * return 0;
672  * }
673  * @endcode
674  */
675  static int poll(PollVector& pipes, int timeout);
676 
677  /** @brief return if this end of the pipe is the parent end
678  *
679  * @returns true if parent end of pipe
680  */
681  bool isParent() const { return m_childPid; }
682 
683  /** @brief return if this end of the pipe is the child end
684  *
685  * @returns true if child end of pipe
686  */
687  bool isChild() const { return !m_childPid; }
688 
689  /** @brief if BidirMMapPipe uses a socketpair for communications
690  *
691  * @returns true if BidirMMapPipe uses a socketpair for communications
692  */
693  bool usesSocketpair() const { return m_inpipe == m_outpipe; }
694 
695  /** @brief if BidirMMapPipe uses a pipe pair for communications
696  *
697  * @returns true if BidirMMapPipe uses a pipe pair for communications
698  */
699  bool usesPipepair() const { return m_inpipe != m_outpipe; }
700 
701  /** @brief return flags (end of file, BidirMMapPipe closed, ...)
702  *
703  * @returns flags (end of file, BidirMMapPipe closed, ...)
704  */
705  int rdstate() const { return m_flags; }
706 
707  /** @brief true if end-of-file
708  *
709  * @returns true if end-of-file
710  */
711  bool eof() const { return m_flags & eofbit; }
712 
713  /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
714  *
715  * @returns true in case of grave logical error (I/O on closed pipe,...)
716  */
717  bool fail() const { return m_flags & failbit; }
718 
719  /** @brief true on I/O error
720  *
721  * @returns true on I/O error
722  */
723  bool bad() const { return m_flags & badbit; }
724 
725  /** @brief status of stream is good
726  *
727  * @returns true if pipe is good (no errors, eof, ...)
728  */
729  bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
730 
731  /** @brief true if closed
732  *
733  * @returns true if stream is closed
734  */
735  bool closed() const { return m_flags & failbit; }
736 
737  /** @brief return true if not serious error (fail/bad)
738  *
739  * @returns true if stream is does not have serious error (fail/bad)
740  *
741  * (if EOF, this is still true)
742  */
743  operator bool() const { return !fail() && !bad(); }
744 
745  /** @brief return true if serious error (fail/bad)
746  *
747  * @returns true if stream has a serious error (fail/bad)
748  */
749  bool operator!() const { return fail() || bad(); }
750 
751 #ifdef STREAMOP
752 #undef STREAMOP
753 #endif
754 #define STREAMOP(TYPE) \
755  BidirMMapPipe& operator<<(const TYPE& val) \
756  { write(&val, sizeof(TYPE)); return *this; } \
757  BidirMMapPipe& operator>>(TYPE& val) \
758  { read(&val, sizeof(TYPE)); return *this; }
759  STREAMOP(bool); ///< C++ style stream operators for bool
760  STREAMOP(char); ///< C++ style stream operators for char
761  STREAMOP(short); ///< C++ style stream operators for short
762  STREAMOP(int); ///< C++ style stream operators for int
763  STREAMOP(long); ///< C++ style stream operators for long
764  STREAMOP(long long); ///< C++ style stream operators for long long
765  STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
766  STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
767  STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
768  STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
769  STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
770  STREAMOP(float); ///< C++ style stream operators for float
771  STREAMOP(double); ///< C++ style stream operators for double
772 #undef STREAMOP
773 
774  /** @brief write a C-style string
775  *
776  * @param str C-style string
777  * @returns pipe written to
778  */
779  BidirMMapPipe& operator<<(const char* str);
780 
781  /** @brief read a C-style string
782  *
783  * @param str pointer to string (space allocated with malloc!)
784  * @returns pipe read from
785  *
786  * since this is for C-style strings, we use malloc/realloc/free for
787  * strings. passing in a NULL pointer is valid here, and the routine
788  * will use realloc to allocate a chunk of memory of the right size.
789  */
790  BidirMMapPipe& operator>>(char* (&str));
791 
792  /** @brief write a std::string object
793  *
794  * @param str string to write
795  * @returns pipe written to
796  */
797  BidirMMapPipe& operator<<(const std::string& str);
798 
799  /** @brief read a std::string object
800  *
801  * @param str string to be read
802  * @returns pipe read from
803  */
804  BidirMMapPipe& operator>>(std::string& str);
805 
806  /** @brief write raw pointer to T to other side
807  *
808  * NOTE: This will not write the pointee! Only the value of the
809  * pointer is transferred.
810  *
811  * @param tptr pointer to be written
812  * @returns pipe written to
813  */
814  template<class T> BidirMMapPipe& operator<<(const T* tptr)
815  { write(&tptr, sizeof(tptr)); return *this; }
816 
817  /** @brief read raw pointer to T from other side
818  *
819  * NOTE: This will not read the pointee! Only the value of the
820  * pointer is transferred.
821  *
822  * @param tptr pointer to be read
823  * @returns pipe read from
824  */
825  template<class T> BidirMMapPipe& operator>>(T* &tptr)
826  { read(&tptr, sizeof(tptr)); return *this; }
827 
828  /** @brief I/O manipulator support
829  *
830  * @param manip manipulator
831  * @returns pipe with manipulator applied
832  *
833  * example:
834  * @code
835  * pipe << BidirMMapPipe::flush;
836  * @endcode
837  */
839  { return manip(*this); }
840 
841  /** @brief I/O manipulator support
842  *
843  * @param manip manipulator
844  * @returns pipe with manipulator applied
845  *
846  * example:
847  * @code
848  * pipe >> BidirMMapPipe::purge;
849  * @endcode
850  */
852  { return manip(*this); }
853 
854  /// for usage a la "pipe << flush;"
855  static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
856  /// for usage a la "pipe << purge;"
857  static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
858 
859  private:
860  /// copy-construction forbidden
862  /// assignment forbidden
863  BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
864 
865  /// page is our friend
867  /// convenience typedef for Page
869 
/// tuning constants
enum {
    /// pages shared (child + parent)
    ///
    /// TotPages = 16 will give 32k buffers at 4k page size for both
    /// parent and child; if your average message to send is larger
    /// than this, consider raising the value (max 256)
    TotPages = 16,

    /// pages per pipe end (half of the shared pages)
    PagesPerEnd = TotPages / 2,

    /// flush threshold
    ///
    /// if FlushThresh pages are filled, the code forces a flush; 3/4
    /// of the pages available seems to work quite well
    FlushThresh = (PagesPerEnd * 3) / 4
};
883 
884  // per-class members
885  static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
886  /// list of open BidirMMapPipes
887  static std::list<BidirMMapPipe*> s_openpipes;
888  /// pool of mmapped pages
889  static BidirMMapPipe_impl::PagePool* s_pagepool;
890  /// page pool reference counter
891  static unsigned s_pagepoolrefcnt;
892  /// debug flag
893  static int s_debugflag;
894 
895  /// return page pool
896  static BidirMMapPipe_impl::PagePool& pagepool();
897 
898  // per-instance members
899  BidirMMapPipe_impl::Pages m_pages; ///< mmapped pages
900  Page* m_busylist; ///< linked list: busy pages (data to be read)
901  Page* m_freelist; ///< linked list: free pages
902  Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
903  int m_inpipe; ///< pipe end from which data may be read
904  int m_outpipe; ///< pipe end to which data may be written
905  int m_flags; ///< flags (e.g. end of file)
906  pid_t m_childPid; ///< pid of the child (zero if we're child)
907  pid_t m_parentPid; ///< pid of the parent
908 
909  /// cleanup routine - at exit, we want our children to get a SIGTERM...
910  static void teardownall(void);
911 
912  /// return length of a page list
913  static unsigned lenPageList(const Page* list);
914 
915  /** "feed" the busy and free lists with a list of pages
916  *
917  * @param plist linked list of pages
918  *
919  * goes through plist, puts free pages from plist onto the freelist
920  * (or sends them to the remote end if they belong there), and puts
921  * non-empty pages on plist onto the busy list
922  */
923  void feedPageLists(Page* plist);
924 
925  /// put on dirty pages list
926  void markPageDirty(Page* p);
927 
928  /// transfer bytes through the pipe (reading, writing, may block)
929  static size_type xferraw(int fd, void* addr, size_type len,
930  ssize_t (*xferfn)(int, void*, std::size_t));
931  /// transfer bytes through the pipe: overload for transfer functions whose buffer parameter is const void*; forwards to the void* overload
932  static size_type xferraw(int fd, void* addr, const size_type len,
933  ssize_t (*xferfn)(int, const void*, std::size_t))
934  {
935  return xferraw(fd, addr, len,
936  reinterpret_cast<ssize_t (*)( // NOTE(review): converts the function pointer type only; presumably the other overload calls xferfn through this type, which relies on void* and const void* arguments being passed identically - confirm
937  int, void*, std::size_t)>(xferfn));
938  }
939 
940  /** @brief send page(s) to the other end (may block)
941  *
942  * @param plist linked list of pages to send
943  *
944  * the implementation gathers the different write(s) wherever
945  * possible; if mmap works, this results in a single write to transfer
946  * the list of pages sent, if we need to copy things through the pipe,
947  * we have one write to transfer which pages are sent, and then one
948  * write per page.
949  */
950  void sendpages(Page* plist);
951 
952  /** @brief receive pages from the other end (may block), queue them
953  *
954  * @returns number of pages received
955  *
956  * this is an application-level scatter read, which gets the list of
957  * pages to read from the pipe. if mmap works, it needs only one read
958  * call (to get the head of the list of pages transferred). if we need
959  * to copy pages through the pipe, we need to add one read for each
960  * empty page, and two reads for each non-empty page.
961  */
962  unsigned recvpages();
963 
964  /** @brief receive pages from other end (non-blocking)
965  *
966  * @returns number of pages received
967  *
968  * like recvpages(), but does not block if nothing is available for
969  * reading
970  */
971  unsigned recvpages_nonblock();
972 
973  /// get a busy page to read data from (may block)
974  Page* busypage();
975  /// get a dirty page to write data to (may block)
976  Page* dirtypage();
977 
978  /// close the pipe (no flush if forced)
979  int doClose(bool force, bool holdlock = false);
980  /// perform the flush
981  void doFlush(bool forcePartialPages = true);
982 #endif //_WIN32
983 };
984 
986 
987 #undef BEGIN_NAMESPACE_ROOFIT
988 #undef END_NAMESPACE_ROOFIT
989 
990 #endif // BIDIRMMAPPIPE_H
991 
992 // vim: ft=cpp:sw=4:tw=78:et
std::size_t size_type
type used to represent sizes
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:18
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
unsigned char m_npages
length in pages
pages shared (child + parent)
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
BidirMMapPipe * pipe
pipe of interest
pipe error Write end
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
end of file reached
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
bool empty() const
return true if no used page groups in this chunk
read pipe in end-of-file state
handle class for a number of Pages
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
impl * m_pimpl
pointer to implementation
double T(double x)
Definition: ChebyshevPol.h:34
unsigned m_refcnt
reference counter
bool bad() const
true on I/O error
Page * m_freelist
linked list: free pages
write pipe in end-of-file state
don't know yet what'll work
Definition: BidirMMapPipe.h:47
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
bool isChild() const
return if this end of the pipe is the child end
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
bool fail() const
logical failure (e.g. pipe closed)
pid_t m_childPid
pid of the child (zero if we're child)
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
pipe error read end
void markPageDirty(Page *p)
put on dirty pages list
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
~BidirMMapPipe()
destructor
Page * page(unsigned pgno) const
return page number pageno
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:43
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
size_type read(void *addr, size_type sz)
read from pipe
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:89
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
PollFlags
condition flags for poll
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:19
void sendpages(Page *plist)
send page(s) to the other end (may block)
pipe can be written to
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:87
Page * busypage()
get a busy page to read data from (may block)
unsigned nPagesPerGroup() const
return number of pages per page group
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:55
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
general I/O error
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
static int debugflag()
return the current setting of the debug flag
error reporting with exceptions
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
bool eof() const
true if end-of-file
static unsigned lenPageList(const Page *list)
return length of a page list
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
Pages pop()
pop a group of pages off the free list
static void setDebugflag(int flag)
set the debug flags
pages per pipe end
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
bool isParent() const
return if this end of the pipe is the parent end
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
pid_t m_parentPid
pid of the parent
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
static int s_debugflag
debug flag
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
friend class BidirMMapPipe_impl::Page
page is our friend
logical failure (e.g. pipe closed)
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
Page * operator[](unsigned pgno) const
return page number pageno
unsigned pageno(Page *p) const
perform page to page number mapping
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:46
PageChunk & operator=(const PageChunk &)
forbid assignment
Definition: BidirMMapPipe.h:82
void zap(Pages &p)
free all pages except for those pointed to by p
for poll() interface
unsigned recvpages()
receive pages from the other end (may block), queue them
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
bool contains(const Pages &p) const
return if p is contained in this PageChunk
STREAMOP(bool)
C++ style stream operators for bool.
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
unsigned len() const
return length of chunk
bool operator!() const
return true if serious error (fail/bad)
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
unsigned npages() const
return number of pages accessible
unsigned operator[](Page *p) const
perform page to page number mapping
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:56
bool full() const
return true if no free page groups in this chunk
pipe has data for reading
bool good() const
status of stream is good
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
nothing special on this pipe
bool closed() const
true if closed
int m_flags
flags (e.g. end of file)
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:48
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:85