Logo ROOT  
Reference Guide
BidirMMapPipe.h
Go to the documentation of this file.
1/** @file BidirMMapPipe.h
2 *
3 * header file for BidirMMapPipe, a class which forks off a child process and
4 * serves as communications channel between parent and child
5 *
6 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7 * @date 2013-07-07
8 */
9
10#ifndef BIDIRMMAPPIPE_H
11#define BIDIRMMAPPIPE_H
12
#include <list>
#include <string>
#include <vector>
#include <cstring>
#include <unistd.h>
#ifndef _WIN32
#include <pthread.h>
#endif
17
// helper macros that open and close the RooFit namespace; both are
// #undef'd again near the end of this header
18#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
19#define END_NAMESPACE_ROOFIT }
20
22
23/// namespace for implementation details of BidirMMapPipe
24namespace BidirMMapPipe_impl {
25 // forward declarations
26 class BidirMMapPipeException;
27 class Page;
28 class PagePool;
29 class Pages;
30
31 /** @brief class representing a chunk of pages
32 *
33 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
34 * @date 2013-07-24
35 *
36 * allocating pages from the OS happens in chunks in order to not exhaust
37 * the maximum allowed number of memory mappings per process; this class
38 * takes care of such a chunk
39 *
40 * a page chunk allows callers to obtain or release pages in groups of
41 * continuous pages of fixed size
42 */
43 class PageChunk {
44 public:
45 /// type of mmap support found
46 typedef enum {
47 Unknown, ///< don't know yet what'll work
48 Copy, ///< mmap doesn't work, have to copy back and forth
49 FileBacked, ///< mmapping a temp file works
50 DevZero, ///< mmapping /dev/zero works
51 Anonymous ///< anonymous mmap works
53
54 private:
55 static unsigned s_physpgsz; ///< system physical page size
56 static unsigned s_pagesize; ///< logical page size (run-time determined)
57 /// mmap variety that works on this system
59
60 /// convenience typedef
62
63 void* m_begin; ///< pointer to start of mmapped area
64 void* m_end; ///< pointer one behind end of mmapped area
65 // FIXME: cannot keep freelist inline - other end may need that
66 // data, and we'd end up overwriting the page header
67 std::list<void*> m_freelist; ///< free pages list
68 PagePool* m_parent; ///< parent page pool
69 unsigned m_nPgPerGrp; ///< number of pages per group
70 unsigned m_nUsedGrp; ///< number of used page groups
71
72 /// determine page size at run time
73 static unsigned getPageSize();
74
75 /// mmap pages, len is length of mmapped area in bytes
76 static void* dommap(unsigned len);
77 /// munmap pages p, len is length of mmapped area in bytes
78 static void domunmap(void* p, unsigned len);
79 /// forbid copying
81 /// forbid assignment
// NOTE(review): this operator is private but defined as a no-op, so a use
// from inside the class (or a friend) silently does nothing; declaring it
// without a definition would turn such a use into a link error instead.
82 PageChunk& operator=(const PageChunk&) { return *this; }
83 public:
84 /// return the logical page size
85 static unsigned pagesize() { return s_pagesize; }
86 /// return the physical page size of the system
87 static unsigned physPgSz() { return s_physpgsz; }
88 /// return the mmap variety found to work on this system
89 static MMapVariety mmapVariety() { return s_mmapworks; }
90
91 /// constructor
92 PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);
93
94 /// destructor
95 ~PageChunk();
96
97 /// return if p is contained in this PageChunk
98 bool contains(const Pages& p) const;
99
100 /// pop a group of pages off the free list
101 Pages pop();
102
103 /// push a group of pages onto the free list
104 void push(const Pages& p);
105
106 /// return length of chunk in bytes (distance from m_begin to m_end)
107 unsigned len() const
108 {
109 return reinterpret_cast<unsigned char*>(m_end) -
110 reinterpret_cast<unsigned char*>(m_begin);
111 }
112 /// return number of pages per page group
113 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
114
115 /// return true if no used page groups in this chunk
116 bool empty() const { return !m_nUsedGrp; }
117
118 /// return true if no free page groups in this chunk
119 bool full() const { return m_freelist.empty(); }
120
121 /// free all pages except for those pointed to by p
122 void zap(Pages& p);
123 };
124
125 /** @brief handle class for a number of Pages
126 *
127 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
128 * @date 2013-07-24
129 *
130 * the associated pages are continuous in memory
131 */
132 class Pages {
133 private:
134 /// implementation
135 typedef struct {
136 PageChunk *m_parent; ///< pointer to parent page chunk
137 Page* m_pages; ///< pointer to first page
138 unsigned m_refcnt; ///< reference counter
139 unsigned char m_npages; ///< length in pages (unsigned char, so at most 255)
140 } impl;
141 public:
142 /// default constructor: creates an empty handle (m_pimpl is null)
143 Pages() : m_pimpl(0) { }
144
145 /// destructor
146 ~Pages();
147
148 /** @brief copy constructor
149 *
150 * copy Pages handle to new object - old object loses ownership,
151 * and becomes a dangling handle
152 */
153 Pages(const Pages& other);
154
155 /** @brief assignment operator
156 *
157 * assign Pages handle to new object - old object loses ownership,
158 * and becomes a dangling handle
159 */
160 Pages& operator=(const Pages& other);
161
162 /// return page size
163 static unsigned pagesize();
164
165 /// return number of pages accessible
// NOTE(review): dereferences m_pimpl unconditionally; on a
// default-constructed handle (m_pimpl == 0) this is a null dereference.
166 unsigned npages() const { return m_pimpl->m_npages; }
167
168 /// return page number pgno
169 Page* page(unsigned pgno) const;
170
171 /// return page number pgno (same as page())
172 Page* operator[](unsigned pgno) const { return page(pgno); }
173
174 /// perform page to page number mapping
175 unsigned pageno(Page* p) const;
176
177 /// perform page to page number mapping (same as pageno())
178 unsigned operator[](Page* p) const { return pageno(p); }
179
180 /// swap with other's contents
181 void swap(Pages& other)
182 {
183 impl* tmp = other.m_pimpl;
184 other.m_pimpl = m_pimpl;
185 m_pimpl = tmp;
186 }
187
188 private:
189 /// page pool is our friend - it's allowed to construct Pages
191
192 /// pointer to implementation
194
195 /// constructor
196 Pages(PageChunk* parent, Page* pages, unsigned npg);
197 };
198}
199
200/** @brief BidirMMapPipe creates a bidirectional channel between the current
201 * process and a child it forks.
202 *
203 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
204 * @date 2013-07-07
205 *
206 * This class creates a bidirectional channel between this process and a child
207 * it creates with fork().
208 *
209 * The channel consists of a small shared pool of buffer memory mmapped into
210 * both process spaces, and two pipes to synchronise the exchange of data. The
211 * idea behind using the pipes at all is to have some primitive which we can
212 * block on without having to worry about atomic operations or polling, leaving
213 * these tasks to the OS. In case the anonymous mmap cannot be performed on the
214 * OS the code is running on (for whatever reason), the code falls back to
215 * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
216 * dynamically allocated buffer which is then transmitted through the pipe(s),
217 * a slightly slower alternative (because the data is copied more often).
218 *
219 * The channel supports five major operations: read(), write(), flush(),
220 * purge() and close(). Reading and writing may block until the required buffer
221 * space is available. Writes may queue up data to be sent to the other end
222 * until either enough pages are full, or the user calls flush which forces
223 * any unsent buffers to be sent to the other end. flush forces any data that
224 * is to be sent to be sent. purge discards any buffered data waiting to be
225 * read and/or sent. Closing the channel on the child returns zero, closing it
226 * on the parent returns the child's exit status.
227 *
228 * The class also provides operator<< and operator>> for C++-style I/O for
229 * basic data types (bool, char, short, int, long, long long, float, double
230 * and their unsigned counterparts). Data is transmitted binary (i.e. no
231 * formatting to strings like std::cout does). There are also overloads to
232 * support C-style zero terminated strings and std::string. In terms of
233 * performance, the former is to be preferred.
234 *
235 * If the caller needs to multiplex input and output to/from several pipes, the
236 * class provides the poll() method which allows to block until an event occurs
237 * on any of the polled pipes.
238 *
239 * After the BidirMMapPipe is closed, no further operations may be performed on
240 * that object, save for the destructor which may still be called.
241 *
242 * If the BidirMMapPipe has not properly been closed, the destructor will call
243 * close. However, the exit code of the child is lost in that case.
244 *
245 * Closing the object causes the mmapped memory to be unmapped and the two
246 * pipes to be closed. We also install an atexit handler in the process of
247 * creating BidirMMapPipes. This ensures that when the current process
248 * terminates, a SIGTERM signal is sent to the child processes created for all
249 * unclosed pipes to avoid leaving zombie processes in the OS's process table.
250 *
251 * BidirMMapPipe creation, closing and destruction are thread safe. If the
252 * BidirMMapPipe is used in more than one thread, the other operations have to
253 * be protected with a mutex (or something similar), though.
254 *
255 * End of file (other end closed its pipe, or died) is indicated with the eof()
256 * method; serious I/O errors set flags (bad(), fail()) and also throw
257 * exceptions. For normal read/write operations, they can be suppressed (i.e.
258 * error reporting only using flags) with a constructor argument.
259 *
260 * Technicalities:
261 * - there is a pool of mmapped pages, half the pages are allocated to the
262 * parent process, half to the child
263 * - when one side has accumulated enough data (or a flush forces dirty pages
264 * out to the other end), it sends these pages to the other end by writing a
265 * byte containing the page number into the pipe
266 * - the other end (which has the pages mmapped, too) reads the page number(s)
267 * and puts the corresponding pages on its busy list
268 * - as the other end reads, it frees busy pages, and eventually tries to put
269 * them on its free list; if a page belongs to the other end of the
270 * connection, it is sent back
271 * - lists of pages are sent across the pipe, not individual pages, in order
272 * to minimise the number of read/write operations needed
273 * - when mmap works properly, only one byte containing the page number of
274 * the page list head is sent back and forth; the contents of that page
275 * allow to access the rest of the page list sent, and page headers on the
276 * list tell the receiving end if the page is free or has to be added to the
277 * busy list
278 * - when mmap does not work, we transfer one byte to indicate the head of the
279 * page list sent, and for each page on the list of sent pages, the page
280 * header and the page payload is sent (if the page is free, we only
281 * transmit the page header, and we never transmit more payload than
282 * the page actually contains)
283 * - in the child, all open BidirMMapPipes but the current one are closed. this
284 * is done for two reasons: first, to conserve file descriptors and address
285 * space. second, if more than one process is meant to use such a
286 * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
287 * are hard to find and understand. it's much better to come up with a design
288 * which does not need pipes to be shared among more than two processes.
289 *
290 * Here is a trivial example of a parent and a child talking to each other over
291 * a BidirMMapPipe:
292 * @code
293 * #include <string>
294 * #include <iostream>
295 * #include <cstdlib>
296 *
297 * #include "BidirMMapPipe.h"
298 *
299 * int simplechild(BidirMMapPipe& pipe)
300 * {
301 * // child does an echo loop
302 * while (pipe.good() && !pipe.eof()) {
303 * // read a string
304 * std::string str;
305 * pipe >> str;
306 * if (!pipe) return -1;
307 * if (pipe.eof()) break;
308 * // check if parent wants us to shut down
309 * if (!str.empty()) {
310 * std::cout << "[CHILD] : read: " << str << std::endl;
311 * str = "... early in the morning?";
312 * }
313 * pipe << str << BidirMMapPipe::flush;
314 * if (str.empty()) break;
315 * if (!pipe) return -1;
316 * std::cout << "[CHILD] : wrote: " << str << std::endl;
317 * }
318 * // send shutdown request acknowledged
319 * pipe << "" << BidirMMapPipe::flush;
320 *
321 * pipe.close();
322 * return 0;
323 * }
324 *
325 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
326 * {
327 * BidirMMapPipe *p = new BidirMMapPipe();
328 * if (p->isChild()) {
329 * int retVal = childexec(*p);
330 * delete p;
331 * std::exit(retVal);
332 * }
333 * return p;
334 * }
335 *
336 * int main()
337 * {
338 * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
339 * std::endl;
340 * BidirMMapPipe* pipe = spawnChild(simplechild);
341 * for (int i = 0; i < 5; ++i) {
342 * std::string str("What shall we do with a drunken sailor...");
343 * *pipe << str << BidirMMapPipe::flush;
344 * if (!*pipe) return -1;
345 * std::cout << "[PARENT]: wrote: " << str << std::endl;
346 * *pipe >> str;
347 * if (!*pipe) return -1;
348 * std::cout << "[PARENT]: read: " << str << std::endl;
349 * }
350 * // ask child to shut down
351 * *pipe << "" << BidirMMapPipe::flush;
352 * // wait for it to see the shutdown request
353 * std::string s;
354 * *pipe >> s;
355 * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
356 * std::endl;
357 * delete pipe;
358 * return 0;
359 * }
360 * @endcode
361 *
362 * When designing your own protocols to use over the pipe, there are a few
363 * things to bear in mind:
364 * - Do as http does: When building a request, send all the options and
365 * properties of that request with the request itself in a single go (one
366 * flush). Then, the server has everything it needs, and hopefully, it'll
367 * shut up for a while and let the client do something useful in the
368 * meantime... The same goes when the server replies to the request: include
369 * everything there is to know about the result of the request in the reply.
370 * - The expensive operation should be the request that is made, all other
371 * operations should somehow be formulated as options or properties to that
372 * request.
373 * - Include a shutdown handshake in whatever protocol you send over the
374 * pipe. That way, you can shut things down in a controlled way. Otherwise,
375 * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
376 * one end closes its pipe while the other is still trying to read.
377 */
// NOTE: everything from here up to the matching "#endif //_WIN32" further
// down is only compiled on non-Windows platforms
379#ifndef _WIN32
380 public:
381 /// type used to represent sizes (e.g. byte counts in read() and write())
382 typedef std::size_t size_type;
383 /// convenience typedef for BidirMMapPipeException
/// state flag bits, modelled after (a subset of) the C++ iostream flags
enum {
    eofbit        = 1 << 0,              ///< end of file reached
    failbit       = 1 << 1,              ///< logical failure (e.g. pipe closed)
    rderrbit      = 1 << 2,              ///< read error
    wrerrbit      = 1 << 3,              ///< write error
    badbit        = rderrbit | wrerrbit, ///< general I/O error (read or write)
    exceptionsbit = 1 << 4               ///< error reporting with exceptions
};
394
395 /** @brief constructor (forks!)
396 *
397 * Creates a bidirectional communications channel between this process
398 * and a child the constructor forks. On return from the constructor,
399 * isParent() and isChild() can be used to tell the parent end from the
400 * child end of the pipe. In the child, all other open BidirMMapPipes
401 * are closed.
402 *
403 * @param useExceptions read()/write() error reporting also done using
404 * exceptions
405 * @param useSocketpair use a socketpair instead of a pair or pipes
406 *
407 * Normally, exceptions are thrown for all serious I/O errors (apart
408 * from end of file). Setting useExceptions to false will force the
409 * read() and write() methods to only report serious I/O errors using
410 * flags.
411 *
412 * When useSocketpair is true, use a pair of Unix domain sockets
413 * created using socketpair instead a pair of pipes. The advantage is
414 * that only one pair of file descriptors is needed instead of two
415 * pairs which are needed for the pipe pair. Performance should very
416 * similar on most platforms, especially if mmap works, since only
417 * very little data is sent through the pipe(s)/socketpair.
418 */
419 BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
420
421 /** @brief destructor
422 *
423 * closes this end of pipe
424 */
426
427 /** @brief return the current setting of the debug flag
428 *
429 * @returns an integer with the debug Setting
430 */
431 static int debugflag() { return s_debugflag; }
432
433 /** @brief set the debug flags
434 *
435 * @param flag debug flags (if zero, no messages are printed)
436 */
437 static void setDebugflag(int flag) { s_debugflag = flag; }
438
439 /** @brief read from pipe
440 *
441 * @param addr where to put read data
442 * @param sz size of data to read (in bytes)
443 * @returns size of data read, or 0 in case of end-of-file
444 *
445 * read may block until data from other end is available. It will
446 * return 0 if the other end closed the pipe.
447 */
448 size_type read(void* addr, size_type sz);
449
450 /** @brief wirte to pipe
451 *
452 * @param addr where to get data to write from
453 * @param sz size of data to write (in bytes)
454 * @returns size of data written, or 0 in case of end-of-file
455 *
456 * write may block until data can be written to other end (depends a
457 * bit on available buffer space). It will return 0 if the other end
458 * closed the pipe. The data is queued to be written on the next
459 * convenient occasion, or it can be forced out with flush().
460 */
461 size_type write(const void* addr, size_type sz);
462
463 /** @brief flush buffers with unwritten data
464 *
465 * This forces unwritten data to be written to the other end. The call
466 * will block until this has been done (or the attempt failed with an
467 * error).
468 */
469 void flush();
470
471 /** @brief purge buffered data waiting to be read and/or written
472 *
473 * Discards all internal buffers.
474 */
475 void purge();
476
477 /** @brief number of bytes that can be read without blocking
478 *
479 * @returns number of bytes that can be read without blocking
480 */
482
483 /** @brief number of bytes that can be written without blocking
484 *
485 * @returns number of bytes that can be written without blocking
486 */
488
489 /** @brief flush buffers, close pipe
490 *
491 * Flush buffers, discard unread data, closes the pipe. If the pipe is
492 * in the parent process, it waits for the child.
493 *
494 * @returns exit code of child process in parent, zero in child
495 */
496 int close();
497
498 /** @brief return PID of the process on the other end of the pipe
499 *
500 * @returns PID of the process running on the remote end
501 */
502 pid_t pidOtherEnd() const
503 { return isChild() ? m_parentPid : m_childPid; }
504
505 /// condition flags for poll
507 None = 0, ///< nothing special on this pipe
508 Readable = 1, ///< pipe has data for reading
509 Writable = 2, ///< pipe can be written to
510 ReadError = 4, ///< pipe error read end
511 WriteError = 8, ///< pipe error Write end
512 Error = ReadError | WriteError, ///< pipe error
513 ReadEndOfFile = 32, ///< read pipe in end-of-file state
514 WriteEndOfFile = 64,///< write pipe in end-of-file state
516 ReadInvalid = 64, ///< read end of pipe invalid
517 WriteInvalid = 128, ///< write end of pipe invalid
518 Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
519 };
520
521 /// for poll() interface
522 class PollEntry {
523 public:
524 BidirMMapPipe* pipe; ///< pipe of interest
525 unsigned events; ///< events of interest (or'ed bitmask)
526 unsigned revents; ///< events that happened (or'ed bitmask)
527 /// poll a pipe for all events
529 pipe(_pipe), events(None), revents(None) { }
530 /// poll a pipe for specified events
531 PollEntry(BidirMMapPipe* _pipe, int _events) :
532 pipe(_pipe), events(_events), revents(None) { }
533 };
534 /// convenience typedef for poll() interface
535 typedef std::vector<PollEntry> PollVector;
536
537 /** @brief poll a set of pipes for events (ready to read from, ready to
538 * write to, error)
539 *
540 * @param pipes set of pipes to check
541 * @param timeout timeout in milliseconds
542 * @returns positive number: number of pipes which have
543 * status changes, 0: timeout, or no pipes with
544 * status changed, -1 on error
545 *
546 * Timeout can be zero (check for specified events, and return), finite
547 * (wait at most timeout milliseconds before returning), or -1
548 * (infinite). The poll method returns when the timeout has elapsed,
549 * or if an event occurs on one of the pipes being polled, whichever
550 * happens earlier.
551 *
552 * Pipes is a vector of one or more PollEntries, which each list a pipe
553 * and events to poll for. If events is left empty (zero), all
554 * conditions are polled for, otherwise only the indicated ones. On
555 * return, the revents fields contain the events that occurred for each
556 * pipe; Error, EndOfFile and Invalid events are always reported,
557 * regardless of whether they were in the set of requested events.
558 *
559 * poll may block slightly longer than specified by timeout due to OS
560 * timer granularity and OS scheduling. Due to its implementation, the
561 * poll call can also return early if the remote end of the pipe sends
562 * a free page while polling (which is put on that pipe's freelist),
563 * while that pipe is polled for e.g. reading. The status of the pipe is
564 * indicated correctly in revents, and the caller can simply poll
565 * again. (The reason this is done this way is because it helps to
566 * replenish the pool of free pages and queue busy pages without
567 * blocking.)
568 *
569 * Here's a piece of example code waiting on two pipes; if they become
570 * readable they are read:
571 * @code
572 * #include <unistd.h>
573 * #include <cstdlib>
574 * #include <string>
575 * #include <sstream>
576 * #include <iostream>
577 *
578 * #include "BidirMMapPipe.h"
579 *
580 * // what to execute in the child
581 * int randomchild(BidirMMapPipe& pipe)
582 * {
583 * ::srand48(::getpid());
584 * for (int i = 0; i < 5; ++i) {
585 * // sleep a random time between 0 and .9 seconds
586 * ::usleep(int(1e6 * ::drand48()));
587 * std::ostringstream buf;
588 * buf << "child pid " << ::getpid() << " sends message " << i;
589 * std::cout << "[CHILD] : " << buf.str() << std::endl;
590 * pipe << buf.str() << BidirMMapPipe::flush;
591 * if (!pipe) return -1;
592 * if (pipe.eof()) break;
593 * }
594 * // tell parent we're done
595 * pipe << "" << BidirMMapPipe::flush;
596 * // wait for parent to acknowledge
597 * std::string s;
598 * pipe >> s;
599 * pipe.close();
600 * return 0;
601 * }
602 *
603 * // function to spawn a child
604 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
605 * {
606 * BidirMMapPipe *p = new BidirMMapPipe();
607 * if (p->isChild()) {
608 * int retVal = childexec(*p);
609 * delete p;
610 * std::exit(retVal);
611 * }
612 * return p;
613 * }
614 *
615 * int main()
616 * {
617 * typedef BidirMMapPipe::PollEntry PollEntry;
618 * // poll data structure
619 * BidirMMapPipe::PollVector pipes;
620 * pipes.reserve(3);
621 * // spawn children
622 * for (int i = 0; i < 3; ++i) {
623 * pipes.push_back(PollEntry(spawnChild(randomchild),
624 * BidirMMapPipe::Readable));
625 * }
626 * // while at least some children alive
627 * while (!pipes.empty()) {
628 * // poll, wait until status change (infinite timeout)
629 * int npipes = BidirMMapPipe::poll(pipes, -1);
630 * // scan for pipes with changed status
631 * for (std::vector<PollEntry>::iterator it = pipes.begin();
632 * npipes && pipes.end() != it; ) {
633 * if (!it->revents) {
634 * // unchanged, next one
635 * ++it;
636 * continue;
637 * }
638 * --npipes; // maybe we can stop early...
639 * // read from pipes which are readable
640 * if (it->revents & BidirMMapPipe::Readable) {
641 * std::string s;
642 * *(it->pipe) >> s;
643 * if (!s.empty()) {
644 * std::cout << "[PARENT]: Read from pipe " <<
645 * it->pipe << ": " << s << std::endl;
646 * ++it;
647 * continue;
648 * } else {
649 * // child is shutting down...
650 * *(it->pipe) << "" << BidirMMapPipe::flush;
651 * goto childcloses;
652 * }
653 * }
654 * // retire pipes with error or end-of-file condition
655 * if (it->revents & (BidirMMapPipe::Error |
656 * BidirMMapPipe::EndOfFile |
657 * BidirMMapPipe::Invalid)) {
658 * std::cout << "[PARENT]: Error on pipe " <<
659 * it->pipe << " revents " << it->revents <<
660 * std::endl;
661 * childcloses:
662 * int retVal = it->pipe->close();
663 * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
664 * if (retVal) return retVal;
665 * delete it->pipe;
666 * it = pipes.erase(it);
667 * continue;
668 * }
669 * }
670 * }
671 * return 0;
672 * }
673 * @endcode
674 */
675 static int poll(PollVector& pipes, int timeout);
676
677 /** @brief return if this end of the pipe is the parent end
678 *
679 * @returns true if parent end of pipe
680 */
681 bool isParent() const { return m_childPid; }
682
683 /** @brief return if this end of the pipe is the child end
684 *
685 * @returns true if child end of pipe
686 */
687 bool isChild() const { return !m_childPid; }
688
689 /** @brief if BidirMMapPipe uses a socketpair for communications
690 *
691 * @returns true if BidirMMapPipe uses a socketpair for communications
692 */
693 bool usesSocketpair() const { return m_inpipe == m_outpipe; }
694
695 /** @brief if BidirMMapPipe uses a pipe pair for communications
696 *
697 * @returns true if BidirMMapPipe uses a pipe pair for communications
698 */
699 bool usesPipepair() const { return m_inpipe != m_outpipe; }
700
701 /** @brief return flags (end of file, BidirMMapPipe closed, ...)
702 *
703 * @returns flags (end of file, BidirMMapPipe closed, ...)
704 */
705 int rdstate() const { return m_flags; }
706
707 /** @brief true if end-of-file
708 *
709 * @returns true if end-of-file
710 */
711 bool eof() const { return m_flags & eofbit; }
712
713 /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
714 *
715 * @returns true in case of grave logical error (I/O on closed pipe,...)
716 */
717 bool fail() const { return m_flags & failbit; }
718
719 /** @brief true on I/O error
720 *
721 * @returns true on I/O error
722 */
723 bool bad() const { return m_flags & badbit; }
724
725 /** @brief status of stream is good
726 *
727 * @returns true if pipe is good (no errors, eof, ...)
728 */
729 bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
730
731 /** @brief true if closed
732 *
733 * @returns true if stream is closed
734 */
735 bool closed() const { return m_flags & failbit; }
736
737 /** @brief return true if not serious error (fail/bad)
738 *
739 * @returns true if stream is does not have serious error (fail/bad)
740 *
741 * (if EOF, this is still true)
742 */
743 operator bool() const { return !fail() && !bad(); }
744
745 /** @brief return true if serious error (fail/bad)
746 *
747 * @returns true if stream has a serious error (fail/bad)
748 */
749 bool operator!() const { return fail() || bad(); }
750
751#ifdef STREAMOP
752#undef STREAMOP
753#endif
754#define STREAMOP(TYPE) \
755 BidirMMapPipe& operator<<(const TYPE& val) \
756 { write(&val, sizeof(TYPE)); return *this; } \
757 BidirMMapPipe& operator>>(TYPE& val) \
758 { read(&val, sizeof(TYPE)); return *this; }
759 STREAMOP(bool); ///< C++ style stream operators for bool
760 STREAMOP(char); ///< C++ style stream operators for char
761 STREAMOP(short); ///< C++ style stream operators for short
762 STREAMOP(int); ///< C++ style stream operators for int
763 STREAMOP(long); ///< C++ style stream operators for long
764 STREAMOP(long long); ///< C++ style stream operators for long long
765 STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
766 STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
767 STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
768 STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
769 STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
770 STREAMOP(float); ///< C++ style stream operators for float
771 STREAMOP(double); ///< C++ style stream operators for double
772#undef STREAMOP
773
774 /** @brief write a C-style string
775 *
776 * @param str C-style string
777 * @returns pipe written to
778 */
779 BidirMMapPipe& operator<<(const char* str);
780
781 /** @brief read a C-style string
782 *
783 * @param str pointer to string (space allocated with malloc!)
784 * @returns pipe read from
785 *
786 * since this is for C-style strings, we use malloc/realloc/free for
787 * strings. passing in a NULL pointer is valid here, and the routine
788 * will use realloc to allocate a chunk of memory of the right size.
789 */
790 BidirMMapPipe& operator>>(char* (&str));
791
792 /** @brief write a std::string object
793 *
794 * @param str string to write
795 * @returns pipe written to
796 */
797 BidirMMapPipe& operator<<(const std::string& str);
798
799 /** @brief read a std::string object
800 *
801 * @param str string to be read
802 * @returns pipe read from
803 */
804 BidirMMapPipe& operator>>(std::string& str);
805
806 /** @brief write raw pointer to T to other side
807 *
808 * NOTE: This will not write the pointee! Only the value of the
809 * pointer is transferred.
810 *
811 * @param tptr pointer to be written
812 * @returns pipe written to
813 */
814 template<class T> BidirMMapPipe& operator<<(const T* tptr)
815 { write(&tptr, sizeof(tptr)); return *this; }
816
817 /** @brief read raw pointer to T from other side
818 *
819 * NOTE: This will not read the pointee! Only the value of the
820 * pointer is transferred.
821 *
822 * @param tptr pointer to be read
823 * @returns pipe read from
824 */
825 template<class T> BidirMMapPipe& operator>>(T* &tptr)
826 { read(&tptr, sizeof(tptr)); return *this; }
827
828 /** @brief I/O manipulator support (insertion direction)
829 *
830 * @param manip manipulator function, called with *this
831 * @returns whatever the manipulator returns (normally this pipe)
832 *
833 * example:
834 * @code
835 * pipe << BidirMMapPipe::flush;
836 * @endcode
837 */
839 { return manip(*this); }
840
841 /** @brief I/O manipulator support (extraction direction)
842 *
843 * @param manip manipulator function, called with *this
844 * @returns whatever the manipulator returns (normally this pipe)
845 *
846 * example:
847 * @code
848 * pipe >> BidirMMapPipe::purge;
849 * @endcode
850 */
852 { return manip(*this); }
853
854 /// manipulator calling flush(), for usage a la "pipe << BidirMMapPipe::flush;"
855 static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
856 /// manipulator calling purge(), for usage a la "pipe >> BidirMMapPipe::purge;"
857 static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
858
859 private:
860 /// copy-construction forbidden
862 /// assignment forbidden
863 BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
864
865 /// page is our friend
867 /// convenience typedef for Page
869
/// tuning constants
enum {
    // with TotPages = 16 and 4k pages, parent and child each get 32k of
    // buffer space; raise the value (up to 256) if typical messages are
    // larger than that
    TotPages    = 16,                  ///< pages shared (child + parent)

    PagesPerEnd = TotPages / 2,        ///< pages per pipe end

    // a flush is forced once FlushThresh pages are filled; three quarters
    // of the pages available to one end has been found to work well
    FlushThresh = PagesPerEnd * 3 / 4  ///< flush threshold
};
883
884 // per-class members
885 static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
886 /// list of open BidirMMapPipes
887 static std::list<BidirMMapPipe*> s_openpipes;
888 /// pool of mmapped pages
890 /// page pool reference counter
891 static unsigned s_pagepoolrefcnt;
892 /// debug flags (zero: no debug messages; see setDebugflag())
893 static int s_debugflag;
894
895 /// return page pool
897
898 // per-instance members
900 Page* m_busylist; ///< linked list: busy pages (data to be read)
901 Page* m_freelist; ///< linked list: free pages
902 Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
903 int m_inpipe; ///< pipe end from which data may be read (same descriptor as m_outpipe when a socketpair is used)
904 int m_outpipe; ///< pipe end to which data may be written
905 int m_flags; ///< state flags (eofbit, failbit, rderrbit, wrerrbit, ...; see rdstate())
906 pid_t m_childPid; ///< pid of the child (zero if we're child)
907 pid_t m_parentPid; ///< pid of the parent
908
909 /// cleanup routine - at exit, we want our children to get a SIGTERM...
910 static void teardownall(void);
911
912 /// return length of a page list
913 static unsigned lenPageList(const Page* list);
914
915 /** "feed" the busy and free lists with a list of pages
916 *
917 * @param plist linked list of pages
918 *
919 * goes through plist, puts free pages from plist onto the freelist
920 * (or sends them to the remote end if they belong there), and puts
921 * non-empty pages on plist onto the busy list
922 */
923 void feedPageLists(Page* plist);
924
925 /// put on dirty pages list
926 void markPageDirty(Page* p);
927
928 /// transfer bytes through the pipe (reading, writing, may block)
929 static size_type xferraw(int fd, void* addr, size_type len,
930 ssize_t (*xferfn)(int, void*, std::size_t));
931 /// transfer bytes through the pipe (reading, writing, may block)
932 static size_type xferraw(int fd, void* addr, const size_type len,
933 ssize_t (*xferfn)(int, const void*, std::size_t))
934 {
935 return xferraw(fd, addr, len,
936 reinterpret_cast<ssize_t (*)(
937 int, void*, std::size_t)>(xferfn));
938 }
939
940 /** @brief send page(s) to the other end (may block)
941 *
942 * @param plist linked list of pages to send
943 *
944 * the implementation gathers the different write(s) whereever
945 * possible; if mmap works, this results in a single write to transfer
946 * the list of pages sent, if we need to copy things through the pipe,
947 * we have one write to transfer which pages are sent, and then one
948 * write per page.
949 */
950 void sendpages(Page* plist);
951
952 /** @brief receive a pages from the other end (may block), queue them
953 *
954 * @returns number of pages received
955 *
956 * this is an application-level scatter read, which gets the list of
957 * pages to read from the pipe. if mmap works, it needs only one read
958 * call (to get the head of the list of pages transferred). if we need
959 * to copy pages through the pipe, we need to add one read for each
960 * empty page, and two reads for each non-empty page.
961 */
962 unsigned recvpages();
963
964 /** @brief receive pages from other end (non-blocking)
965 *
966 * @returns number of pages received
967 *
968 * like recvpages(), but does not block if nothing is available for
969 * reading
970 */
971 unsigned recvpages_nonblock();
972
973 /// get a busy page to read data from (may block)
974 Page* busypage();
975 /// get a dirty page to write data to (may block)
976 Page* dirtypage();
977
978 /// close the pipe (no flush if forced)
979 int doClose(bool force, bool holdlock = false);
980 /// perform the flush
981 void doFlush(bool forcePartialPages = true);
982#endif //_WIN32
983};
984
986
987#undef BEGIN_NAMESPACE_ROOFIT
988#undef END_NAMESPACE_ROOFIT
989
990#endif // BIDIRMMAPPIPE_H
991
992// vim: ft=cpp:sw=4:tw=78:et
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:19
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:18
for poll() interface
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
unsigned revents
events that happened (or'ed bitmask)
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
exception to throw if low-level OS calls go wrong
class representing a chunk of pages
Definition: BidirMMapPipe.h:43
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
PageChunk & operator=(const PageChunk &)
forbid assignment
Definition: BidirMMapPipe.h:82
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
unsigned len() const
return length of chunk
unsigned nPagesPerGroup() const
return number of pages per page group
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:87
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:55
bool full() const
return true if no free page groups in this chunk
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:46
@ Copy
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:48
@ Unknown
don't know yet what'll work
Definition: BidirMMapPipe.h:47
@ DevZero
mmapping /dev/zero works
Definition: BidirMMapPipe.h:50
@ Anonymous
anonymous mmap works
Definition: BidirMMapPipe.h:51
@ FileBacked
mmapping a temp file works
Definition: BidirMMapPipe.h:49
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:56
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:89
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:85
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
class representing a page pool
class representing the header structure in an mmapped page
handle class for a number of Pages
unsigned operator[](Page *p) const
perform page to page number mapping
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Page * operator[](unsigned pgno) const
return page number pgno
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pgno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
STREAMOP(short)
C++ style stream operators for short.
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
void purge()
purge buffered data waiting to be read and/or written
STREAMOP(unsigned long long)
C++ style stream operators for unsigned long long.
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
STREAMOP(float)
C++ style stream operators for float.
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
BidirMMapPipe_impl::Page Page
convenience typedef for Page
STREAMOP(long long)
C++ style stream operators for long long.
STREAMOP(long)
C++ style stream operators for long.
BidirMMapPipe_impl::Pages m_pages
mmapped pages
bool good() const
status of stream is good
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
Page * m_dirtylist
linked list: dirty pages (data to be sent)
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
pid_t m_parentPid
pid of the parent
STREAMOP(bool)
C++ style stream operators for bool.
unsigned recvpages()
receive pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
STREAMOP(double)
C++ style stream operators for double.
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
STREAMOP(unsigned long)
C++ style stream operators for unsigned long.
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
@ wrerrbit
write error
@ badbit
general I/O error
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
@ rderrbit
read error
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
STREAMOP(int)
C++ style stream operators for int.
STREAMOP(unsigned short)
C++ style stream operators for unsigned short.
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
bool bad() const
true on I/O error
PollFlags
condition flags for poll
@ WriteInvalid
write end of pipe invalid
@ Invalid
invalid pipe
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Error
pipe error
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
@ EndOfFile
end of file
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
Page * dirtypage()
get a dirty page to write data to (may block)
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
STREAMOP(char)
C++ style stream operators for char.
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
STREAMOP(unsigned char)
C++ style stream operators for unsigned char.
size_type read(void *addr, size_type sz)
read from pipe
STREAMOP(unsigned int)
C++ style stream operators for unsigned int.
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
bool operator!() const
return true if serious error (fail/bad)
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
bool fail() const
logical failure (e.g. pipe closed)
static unsigned s_pagepoolrefcnt
page pool reference counter
static void setDebugflag(int flag)
set the debug flags
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
namespace for implementation details of BidirMMapPipe
double T(double x)
Definition: ChebyshevPol.h:34
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool