ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
Go to the documentation of this file.
1 /** @file BidirMMapPipe.cxx
2  *
3  * implementation of BidirMMapPipe, a class which forks off a child process
4  * and serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <>
7  * @date 2013-07-07
8  */
9 #ifndef _WIN32
10 #include <map>
11 #include <cerrno>
12 #include <limits>
13 #include <string>
14 #include <cstdlib>
15 #include <cstring>
16 #include <iostream>
17 #include <algorithm>
18 #include <exception>
20 #include <poll.h>
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #include <sys/wait.h>
30 #include <sys/socket.h>
32 #include "BidirMMapPipe.h"
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
39 /// namespace for implementation details of BidirMMapPipe
40 namespace BidirMMapPipe_impl {
/** @brief exception to throw if low-level OS calls go wrong
 *
 * @author Manuel Schiller <>
 * @date 2013-07-07
 *
 * The message returned by what() has the form "<msg>: <error text>", where
 * the error text is obtained from strerror_r for the given error code. The
 * text lives in a fixed-size buffer inside the object and is truncated to
 * fit (at most s_sz - 1 characters plus the terminating zero).
 */
class BidirMMapPipeException : public std::exception
{
    private:
        enum {
            s_sz = 256 ///< length of buffer
        };
        char m_buf[s_sz]; ///< buffer containing the error message

        /// for the POSIX version of strerror_r (returns an error code)
        static int dostrerror_r(int err, char* buf, std::size_t sz,
                int (*f)(int, char*, std::size_t))
        { return f(err, buf, sz); }
        /// for the GNU version of strerror_r (returns a pointer to the text)
        static int dostrerror_r(int, char*, std::size_t,
                char* (*f)(int, char*, std::size_t));
    public:
        /** @brief constructor taking a hint on the operation and an error code
         *
         * @param msg   short description of the failed operation; may be
         *              empty or a null pointer, in which case only the error
         *              text is stored
         * @param err   errno-style error code to translate
         */
        BidirMMapPipeException(const char* msg, int err);
        /// return a description of what went wrong (always zero-terminated)
        virtual const char* what() const throw() { return m_buf; }
};

BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
{
    // start from a well-defined empty string, so the buffer never exposes
    // indeterminate bytes if strerror_r fails to write anything
    std::fill(m_buf, m_buf + s_sz, char(0));
    // a null hint is treated like an empty one
    std::size_t msgsz = msg ? std::strlen(msg) : 0;
    if (msgsz) {
        msgsz = std::min(msgsz, std::size_t(s_sz));
        std::copy(msg, msg + msgsz, m_buf);
        if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
        if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
    }
    if (msgsz < s_sz) {
        // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
        // the right helper is picked by overload resolution on the type of
        // ::strerror_r
        dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
    }
    m_buf[s_sz - 1] = 0; // enforce zero-termination
}

/** @brief adapter for the GNU flavour of strerror_r
 *
 * The GNU version may return a pointer to a string that is not stored in
 * buf; in that case the text is copied into buf (truncating if needed).
 *
 * @param err   error code to translate
 * @param buf   destination buffer
 * @param sz    size of destination buffer in bytes
 * @param f     the GNU strerror_r function
 * @returns 0 on success, ERANGE if the text did not fit into buf
 */
int BidirMMapPipeException::dostrerror_r(int err, char* buf,
        std::size_t sz, char* (*f)(int, char*, std::size_t))
{
    // nothing can be stored in an empty buffer (and buf[0] must not be
    // touched in that case)
    if (!sz) return ERANGE;
    buf[0] = 0;
    char *tmp = f(err, buf, sz);
    if (tmp && tmp != buf) {
        std::strncpy(buf, tmp, sz);
        buf[sz - 1] = 0;
        if (std::strlen(tmp) > sz - 1) return ERANGE;
    }
    return 0;
}
98  /** @brief class representing the header structure in an mmapped page
99  *
100  * @author Manuel Schiller <>
101  * @date 2013-07-07
102  *
103  * contains a field to put pages into a linked list, a field for the size
104  * of the data being transmitted, and a field for the position until which
105  * the data has been read
106  */
107  class Page
108  {
109  private:
110  // use as small a data type as possible to maximise payload area
111  // of pages
112  short m_next; ///< next page in list (signed offset from this page, in units of the page size; 0 means "no next page")
113  unsigned short m_size; ///< size of payload (in bytes)
114  unsigned short m_pos; ///< index of next byte in payload area
115  /// copy construction forbidden
116  Page(const Page&) {}
117  /// assignment forbidden
 // NOTE(review): the body below dereferences a null pointer; it is private
 // and apparently only exists so that assignment cannot be used from
 // outside the class - it must never actually be called
118  Page& operator=(const Page&)
119  { return *reinterpret_cast<Page*>(0); }
120  public:
121  /// constructor: creates an empty page that is not linked to any other page
122  Page() : m_next(0), m_size(0), m_pos(0)
123  {
124  // check that short is big enough - must be done at runtime
125  // because the page size is not known until runtime
 // NOTE(review): no check is visible at this point in this listing -
 // confirm against the original file
128  }
129  /// set pointer to next page
130  void setNext(const Page* p);
131  /// return pointer to next page
132  Page* next() const;
133  /// return reference to size field
134  unsigned short& size() { return m_size; }
135  /// return size (of payload data)
136  unsigned size() const { return m_size; }
137  /// return reference to position field
138  unsigned short& pos() { return m_pos; }
139  /// return position
140  unsigned pos() const { return m_pos; }
141  /// return pointer to first byte in payload data area of page
 // (the payload starts directly behind the Page header)
142  inline unsigned char* begin() const
143  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
144  + sizeof(Page); }
145  /// return pointer one past the last byte of the page (start of page plus page size)
146  inline unsigned char* end() const
147  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
148  + PageChunk::pagesize(); }
149  /// return the capacity of the page (page size minus the size of the header)
150  static unsigned capacity()
151  { return PageChunk::pagesize() - sizeof(Page); }
152  /// true if page empty
153  bool empty() const { return !m_size; }
154  /// true if page holds any payload (partially or completely filled)
155  bool filled() const { return !empty(); }
156  /// free space left (to be written to)
157  unsigned free() const { return capacity() - m_size; }
158  /// bytes remaining to be read
159  unsigned remaining() const { return m_size - m_pos; }
160  /// true if page completely full
161  bool full() const { return !free(); }
162  };
164  void Page::setNext(const Page* p)
165  {
166  if (!p) {
167  m_next = 0;
168  } else {
169  const char* p1 = reinterpret_cast<char*>(this);
170  const char* p2 = reinterpret_cast<const char*>(p);
171  std::ptrdiff_t tmp = p2 - p1;
172  // difference must be divisible by page size
173  assert(!(tmp % PageChunk::pagesize()));
174  tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
175  m_next = tmp;
176  // no truncation when saving in a short
177  assert(m_next == tmp);
178  // final check: next() must return p
179  assert(next() == p);
180  }
181  }
183  Page* Page::next() const
184  {
185  if (!m_next) return 0;
186  char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
187  ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
188  return reinterpret_cast<Page*>(ptmp);
189  }
191  /** @brief class representing a page pool
192  *
193  * @author Manuel Schiller <>
194  * @date 2013-07-24
195  *
196  * pool of mmapped pages (on systems which support it, on all others, the
197  * functionality is emulated with dynamically allocated memory)
198  *
199  * in most operating systems there is a limit to how many mappings any one
200  * process is allowed to request; for this reason, we mmap a relatively
201  * large amount up front, and then carve off little pieces as we need them
202  */
203  class PagePool {
204  private:
205  /// convenience typedef
206  typedef BidirMMapPipeException Exception;
208  enum {
209  minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
210  maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
211  szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
212  };
213  /// a chunk of memory in the pool
214  typedef BidirMMapPipe_impl::PageChunk Chunk;
215  /// list of chunks
216  typedef std::list<Chunk*> ChunkList;
218  friend class BidirMMapPipe_impl::PageChunk;
219  public:
220  /// convenience typedef
 // NOTE(review): the typedef this comment belongs to is not visible in
 // this listing - confirm against the original file
222  /// constructor (nPagesPerGroup: number of pages in each group handed out by pop())
223  PagePool(unsigned nPagesPerGroup);
224  /// destructor
225  ~PagePool();
226  /// pop a free element out of the pool
227  Pages pop();
229  /// return page size of the system
230  static unsigned pagesize() { return PageChunk::pagesize(); }
231  /// return variety of mmap supported on the system
232  static MMapVariety mmapVariety()
233  { return PageChunk::mmapVariety(); }
235  /// return number of pages per group (ie. as returned by pop())
236  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
238  /// zap the pool (unmap all but Pages p)
239  void zap(Pages& p);
241  private:
242  /// list of chunks used by the pool
243  ChunkList m_chunks;
244  /// list of chunks used by the pool which are not full
245  ChunkList m_freelist;
246  /// chunk size map (histogram of chunk sizes)
 // NOTE(review): the array has (maxsz - minsz) / szincr entries and is
 // indexed with (sz - minsz) / szincr, so a size of maxsz itself has no
 // slot - confirm that sizes passed to updateCurSz() stay below maxsz
247  unsigned m_szmap[(maxsz - minsz) / szincr];
248  /// current chunk size
249  int m_cursz;
250  /// page group size
251  unsigned m_nPgPerGrp;
253  /// adjust _cursz to current largest block
254  void updateCurSz(int sz, int incr);
255  /// find size of next chunk to allocate (in a hopefully smart way)
256  int nextChunkSz() const;
257  /// put a chunk (back) on the list of chunks which are not full
258  void putOnFreeList(Chunk* chunk);
259  /// release an empty chunk (remove it from both lists and delete it)
260  void release(Chunk* chunk);
261  };
263  Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
264  m_pimpl(new impl)
265  {
266  assert(npg < 256);
267  m_pimpl->m_parent = parent;
268  m_pimpl->m_pages = pages;
269  m_pimpl->m_refcnt = 1;
270  m_pimpl->m_npages = npg;
271  /// initialise pages
272  for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
273  }
279  {
280  if (m_pimpl && !--(m_pimpl->m_refcnt)) {
281  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
282  delete m_pimpl;
283  }
284  }
286  Pages::Pages(const Pages& other) :
287  m_pimpl(other.m_pimpl)
288  { ++(m_pimpl->m_refcnt); }
290  Pages& Pages::operator=(const Pages& other)
291  {
292  if (&other == this) return *this;
293  if (--(m_pimpl->m_refcnt)) {
294  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
295  delete m_pimpl;
296  }
297  m_pimpl = other.m_pimpl;
298  ++(m_pimpl->m_refcnt);
299  return *this;
300  }
302  unsigned Pages::pagesize() { return PageChunk::pagesize(); }
304  Page* Pages::page(unsigned pgno) const
305  {
306  assert(pgno < m_pimpl->m_npages);
307  unsigned char* pptr =
308  reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
309  pptr += pgno * pagesize();
310  return reinterpret_cast<Page*>(pptr);
311  }
313  unsigned Pages::pageno(Page* p) const
314  {
315  const unsigned char* pptr =
316  reinterpret_cast<const unsigned char*>(p);
317  const unsigned char* bptr =
318  reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
319  assert(0 == ((pptr - bptr) % pagesize()));
320  const unsigned nr = (pptr - bptr) / pagesize();
321  assert(nr < m_pimpl->m_npages);
322  return nr;
323  }
326  {
327  // find out page size of system
328  long pgsz = sysconf(_SC_PAGESIZE);
329  if (-1 == pgsz) throw Exception("sysconf", errno);
330  if (pgsz > 512 && pgsz > long(sizeof(Page)))
331  return pgsz;
333  // in case of failure or implausible value, use a safe default: 4k
334  // page size, and do not try to mmap
335  s_mmapworks = Copy;
336  return 1 << 12;
337  }
339  PageChunk::PageChunk(PagePool* parent,
340  unsigned length, unsigned nPgPerGroup) :
341  m_begin(dommap(length)),
342  m_end(reinterpret_cast<void*>(
343  reinterpret_cast<unsigned char*>(m_begin) + length)),
344  m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
345  {
346  // ok, push groups of pages onto freelist here
347  unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
348  unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
349  while (p < pend) {
350  m_freelist.push_back(reinterpret_cast<void*>(p));
351  p += nPgPerGroup * PagePool::pagesize();
352  }
353  }
356  {
357  if (m_parent) assert(empty());
358  if (m_begin) domunmap(m_begin, len());
359  }
361  bool PageChunk::contains(const Pages& p) const
362  { return p.m_pimpl->m_parent == this; }
365  {
366  assert(!m_freelist.empty());
367  void* p = m_freelist.front();
368  m_freelist.pop_front();
369  ++m_nUsedGrp;
370  return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
371  }
373  void PageChunk::push(const Pages& p)
374  {
375  assert(contains(p));
376  bool wasempty = m_freelist.empty();
377  m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
378  --m_nUsedGrp;
379  if (m_parent) {
380  // notify parent if we need to be put on the free list again
381  if (wasempty) m_parent->putOnFreeList(this);
382  // notify parent if we're empty
383  if (empty()) return m_parent->release(this);
384  }
385  }
// Obtain len bytes of buffer memory, trying the flavours listed in the
// comment below in turn (anonymous mmap, mmap of /dev/zero, mmap of an
// unlinked temporary file, plain malloc).
//
// len must be a non-zero multiple of s_pagesize (asserted). The return value
// is the address of the buffer; failing OS calls are reported by throwing
// Exception. The first time a flavour is used, a one-off message is printed
// on std::cerr (guarded by the static flag msgprinted).
//
// NOTE(review): several lines of this function are absent from this listing
// (e.g. the mmap argument lists and the bodies of the "else" branches), so
// the notes below only describe what is visible here.
387  void* PageChunk::dommap(unsigned len)
388  {
389  assert(len && 0 == (len % s_pagesize));
390  // ok, the idea here is to try the different methods of mmapping, and
391  // choose the first one that works. we have four flavours:
392  // 1 - anonymous mmap (best)
393  // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
394  // bit more tedious to set up, since you need to open/close a
395  // device file)
396  // 3 - mmap of a temporary file (very tedious to set up - need to
397  // create a temporary file, delete it, make the underlying storage
398  // large enough, then mmap the fd and close it)
399  // 4 - if all those fail, we malloc the buffers, and copy the data
400  // through the OS (then we're no better than normal pipes)
401  static bool msgprinted = false;
 // each flavour is tried if it is the known-good one, or if nothing is
 // known yet (s_mmapworks == Unknown)
402  if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
403 #if defined(MAP_ANONYMOUS)
404 #undef MYANONFLAG
406 #elif defined(MAP_ANON)
407 #undef MYANONFLAG
409 #else
410 #undef MYANONFLAG
411 #endif
412 #ifdef MYANONFLAG
413  void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
415  if (MAP_FAILED == retVal) {
 // only fatal if anonymous mmap was already known to work; otherwise
 // fall through to the next flavour
416  if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
417  } else {
420  if (!msgprinted) {
421  std::cerr << " INFO: In " << __func__ << " (" <<
422  __FILE__ << ", line " << __LINE__ <<
423  "): anonymous mmapping works, excellent!" <<
424  std::endl;
425  msgprinted = true;
426  }
427  return retVal;
428  }
429 #endif
430 #undef MYANONFLAG
431  }
432  if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
433  // ok, no anonymous mappings supported directly, so try to map
434  // /dev/zero which has much the same effect on many systems
435  int fd = ::open("/dev/zero", O_RDWR);
436  if (-1 == fd)
437  throw Exception("open /dev/zero", errno);
438  void* retVal = ::mmap(0, len,
440  if (MAP_FAILED == retVal) {
 // save errno, since close() may overwrite it
441  int errsv = errno;
442  ::close(fd);
443  if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
 // NOTE(review): if mmap failed while s_mmapworks was still Unknown,
 // control appears to continue with the second ::close(fd) below on a
 // descriptor that was already closed above, instead of moving on to
 // the next flavour - confirm against the original file
444  } else {
447  }
448  if (-1 == ::close(fd))
449  throw Exception("close /dev/zero", errno);
450  if (!msgprinted) {
451  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
452  ", line " << __LINE__ << "): mmapping /dev/zero works, "
453  "very good!" << std::endl;
454  msgprinted = true;
455  }
456  return retVal;
457  }
458  if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
 // mkstemp() replaces the XXXXXX suffix with a unique name
459  char name[] = "/tmp/BidirMMapPipe-XXXXXX";
460  int fd;
461  // open temp file
462  if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
463  // remove it, but keep fd open
464  if (-1 == ::unlink(name)) {
465  int errsv = errno;
466  ::close(fd);
467  throw Exception("unlink", errsv);
468  }
469  // make it the right size: lseek
470  if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
471  int errsv = errno;
472  ::close(fd);
473  throw Exception("lseek", errsv);
474  }
475  // make it the right size: write a byte
 // (the value written does not matter, the first byte of name is used)
476  if (1 != ::write(fd, name, 1)) {
477  int errsv = errno;
478  ::close(fd);
479  throw Exception("write", errsv);
480  }
481  // do mmap
482  void* retVal = ::mmap(0, len,
484  if (MAP_FAILED == retVal) {
485  int errsv = errno;
486  ::close(fd);
487  if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
 // NOTE(review): same pattern as in the /dev/zero branch - after a
 // failed mmap with s_mmapworks still Unknown, fd seems to be closed a
 // second time below - confirm against the original file
488  } else {
491  }
492  if (-1 == ::close(fd)) {
493  int errsv = errno;
 // do not leave the mapping behind if we are about to throw
494  ::munmap(retVal, len);
495  throw Exception("close", errsv);
496  }
497  if (!msgprinted) {
498  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
499  ", line " << __LINE__ << "): mmapping temporary files "
500  "works, good!" << std::endl;
501  msgprinted = true;
502  }
503  return retVal;
504  }
505  if (Copy == s_mmapworks || Unknown == s_mmapworks) {
506  // fallback solution: mmap does not work on this OS (or does not
507  // work for what we want to use it), so use a normal buffer of
508  // memory instead, and collect data in that buffer - this needs an
509  // additional write/read to/from the pipe(s), but there you go...
510  if (!msgprinted) {
511  std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
512  ", line " << __LINE__ << "): anonymous mmapping of "
513  "shared buffers failed, falling back to read/write on "
514  " pipes!" << std::endl;
515  msgprinted = true;
516  }
 // remember the decision, so that domunmap() uses free() for this buffer
517  s_mmapworks = Copy;
518  void* retVal = std::malloc(len);
519  if (!retVal) throw Exception("malloc", errno);
520  return retVal;
521  }
522  // should never get here
523  assert(false);
524  return 0;
525  }
// Give back a buffer of len bytes at addr: munmap() is used unless
// s_mmapworks is Copy, in which case the buffer is handed to free().
// A null addr is ignored; len must be a non-zero multiple of s_pagesize
// (asserted). A failing munmap() is reported by throwing Exception.
527  void PageChunk::domunmap(void* addr, unsigned len)
528  {
529  assert(len && 0 == (len % s_pagesize));
530  if (addr) {
 // NOTE(review): one line of the original appears to be absent from this
 // listing at this point - confirm against the original file
532  if (Copy != s_mmapworks) {
533  if (-1 == ::munmap(addr, len))
534  throw Exception("munmap", errno);
535  } else {
536  std::free(addr);
537  }
538  }
539  }
542  {
543  // try to mprotect the other bits of the pool with no access...
544  // we'd really like a version of mremap here that can unmap all the
545  // other pages in the chunk, but that does not exist, so we protect
546  // the other pages in this chunk such that they may neither be read,
547  // written nor executed, only the pages we're interested in for
548  // communications stay readable and writable
549  //
550  // if an OS does not support changing the protection of a part of an
551  // mmapped area, the mprotect calls below should just fail and not
552  // change any protection, so we're a little less safe against
553  // corruption, but everything should still work
554  if (Copy != s_mmapworks) {
555  unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
556  unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
557  unsigned char* p2 = p1 + p.npages() * pagesize();
558  unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
559  if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
560  if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
561  }
562  m_parent = 0;
563  m_freelist.clear();
564  m_nUsedGrp = 1;
565  p.m_pimpl->m_parent = 0;
566  m_begin = m_end = 0;
567  // commit suicide
568  delete this;
569  }
571  PagePool::PagePool(unsigned nPgPerGroup) :
572  m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
573  { std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0); }
575  PagePool::~PagePool()
576  {
577  m_freelist.clear();
578  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
579  delete *it;
580  m_chunks.clear();
581  }
583  void PagePool::zap(Pages& p)
584  {
585  // unmap all pages but those pointed to by p
586  m_freelist.clear();
587  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
588  if ((*it)->contains(p)) {
589  (*it)->zap(p);
590  } else {
591  delete *it;
592  }
593  }
594  m_chunks.clear();
595  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
596  m_cursz = minsz;
597  }
599  Pages PagePool::pop()
600  {
601  if (m_freelist.empty()) {
602  // allocate and register new chunk and put it on the freelist
603  const int sz = nextChunkSz();
604  Chunk *c = new Chunk(this,
605  sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
606  m_chunks.push_front(c);
607  m_freelist.push_back(c);
608  updateCurSz(sz, +1);
609  }
610  // get free element from first chunk on _freelist
611  Chunk* c = m_freelist.front();
612  Pages p(c->pop());
613  // full chunks are removed from _freelist
614  if (c->full()) m_freelist.pop_front();
615  return p;
616  }
618  void PagePool::release(PageChunk* chunk)
619  {
620  assert(chunk->empty());
621  // find chunk on freelist and remove
622  ChunkList::iterator it = std::find(
623  m_freelist.begin(), m_freelist.end(), chunk);
624  if (m_freelist.end() == it)
625  throw Exception("PagePool::release(PageChunk*)", EINVAL);
626  m_freelist.erase(it);
627  // find chunk in m_chunks and remove
628  it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
629  if (m_chunks.end() == it)
630  throw Exception("PagePool::release(PageChunk*)", EINVAL);
631  m_chunks.erase(it);
632  const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
633  delete chunk;
634  updateCurSz(sz, -1);
635  }
637  void PagePool::putOnFreeList(PageChunk* chunk)
638  {
639  assert(!chunk->full());
640  m_freelist.push_back(chunk);
641  }
643  void PagePool::updateCurSz(int sz, int incr)
644  {
645  m_szmap[(sz - minsz) / szincr] += incr;
646  m_cursz = minsz;
647  for (int i = (maxsz - minsz) / szincr; i--; ) {
648  if (m_szmap[i]) {
649  m_cursz += i * szincr;
650  break;
651  }
652  }
653  }
655  int PagePool::nextChunkSz() const
656  {
657  // no chunks with space available, figure out chunk size
658  int sz = m_cursz;
659  if (m_chunks.empty()) {
660  // if we start allocating chunks, we start from minsz
661  sz = minsz;
662  } else {
663  if (minsz >= sz) {
664  // minimal sized chunks are always grown
665  sz = minsz + szincr;
666  } else {
667  if (1 != m_chunks.size()) {
668  // if we have more than one completely filled chunk, grow
669  sz += szincr;
670  } else {
671  // just one chunk left, try shrinking chunk size
672  sz -= szincr;
673  }
674  }
675  }
676  // clamp size to allowed range
677  if (sz > maxsz) sz = maxsz;
678  if (sz < minsz) sz = minsz;
679  return sz;
680  }
681 }
683 // static BidirMMapPipe members
// mutex that is locked around accesses to s_openpipes
684 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
// list of the pipes which are currently open
685 std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
// pool of pages; created on demand by pagepool(), 0 while there is none
686 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
// reference count for s_pagepool (the pool is deleted when it drops to zero)
687 unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
689 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
690 {
691  if (!s_pagepool)
692  s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
693  return *s_pagepool;
694 }
697 {
698  pthread_mutex_lock(&s_openpipesmutex);
699  while (!s_openpipes.empty()) {
700  BidirMMapPipe *p = s_openpipes.front();
701  pthread_mutex_unlock(&s_openpipesmutex);
702  if (p->m_childPid) kill(p->m_childPid, SIGTERM);
703  p->doClose(true, true);
704  pthread_mutex_lock(&s_openpipesmutex);
705  }
706  pthread_mutex_unlock(&s_openpipesmutex);
707 }
710  m_pages(pagepool().pop())
711 {
712  // free pages again
714  if (!s_pagepoolrefcnt) {
715  delete s_pagepool;
716  s_pagepool = 0;
717  }
718 }
// Create a pipe: take a group of pages from the page pool, set up either a
// pair of pipes or a socketpair, fork a child process, and exchange a
// one-byte handshake with the other end ('P' from the parent, 'C' from the
// child).
//
// useExceptions: if true, exceptionsbit is set in m_flags
// useSocketpair: if true, a single socketpair is used instead of two pipes
//
// failbit stays set in m_flags until everything has succeeded. If an
// Exception is thrown on the way, the child is sent SIGTERM (if it exists),
// the descriptors still recorded in fds are closed, and the exception is
// passed on to the caller.
//
// NOTE(review): a few lines of this function are absent from this listing;
// the notes below only describe what is visible here.
720 BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
721  m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
722  m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
723  m_parentPid(::getpid())
725 {
727  assert(TotPages && 0 == (TotPages & 1) && TotPages <= 256);
 // fds[0..1]: first pipe, or the socketpair; fds[2..3]: second pipe
 // (left at -1 when a socketpair is used)
728  int fds[4] = { -1, -1, -1, -1 };
729  int myerrno;
730  static bool firstcall = true;
731  if (useExceptions) m_flags |= exceptionsbit;
733  try {
734  if (firstcall) {
735  firstcall = false;
736  // register a cleanup handler to make sure all BidirMMapPipes are torn
737  // down, and child processes are sent a SIGTERM
738  if (0 != atexit(BidirMMapPipe::teardownall))
739  throw Exception("atexit", errno);
740  }
742  // build free lists
 // (chain all pages, then cut the chain after the first PagesPerEnd pages)
743  for (unsigned i = 1; i < TotPages; ++i)
744  m_pages[i - 1]->setNext(m_pages[i]);
745  m_pages[PagesPerEnd - 1]->setNext(0);
746  if (!useSocketpair) {
747  // create pipes
748  if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
749  if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
750  } else {
751  if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
752  throw Exception("socketpair", errno);
753  }
754  // fork the child
 // (the list of open pipes stays locked across fork(); both branches below
 // unlock it again)
755  pthread_mutex_lock(&s_openpipesmutex);
756  char c;
757  switch ((m_childPid = ::fork())) {
758  case -1: // error in fork()
759  myerrno = errno;
760  pthread_mutex_unlock(&s_openpipesmutex);
761  m_childPid = 0;
762  throw Exception("fork", myerrno);
763  case 0: // child
764  // put the ends in the right place
765  if (-1 != fds[2]) {
766  // pair of pipes
767  if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
768  myerrno = errno;
769  pthread_mutex_unlock(&s_openpipesmutex);
770  throw Exception("close", myerrno);
771  }
772  fds[0] = fds[3] = -1;
773  m_outpipe = fds[1];
774  m_inpipe = fds[2];
775  } else {
776  // socket pair
777  if (-1 == ::close(fds[0])) {
778  myerrno = errno;
779  pthread_mutex_unlock(&s_openpipesmutex);
780  throw Exception("close", myerrno);
781  }
782  fds[0] = -1;
783  m_inpipe = m_outpipe = fds[1];
784  }
785  // close other pipes our parent may have open - we have no business
786  // reading from/writing to those...
 // (each pipe is taken off the list before it is closed; doClose is told
 // that the lock is already held)
787  for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
788  s_openpipes.end() != it; ) {
789  BidirMMapPipe* p = *it;
790  it = s_openpipes.erase(it);
791  p->doClose(true, true);
792  }
 // drop everything in the pool except our own pages, then the pool itself
793  pagepool().zap(m_pages);
794  s_pagepoolrefcnt = 0;
795  delete s_pagepool;
796  s_pagepool = 0;
797  s_openpipes.push_front(this);
798  pthread_mutex_unlock(&s_openpipesmutex);
799  // ok, put our pages on freelist
 // NOTE(review): the statement this comment refers to is not visible in
 // this listing - confirm against the original file
801  // handshake with other end (to make sure it's alive)...
802  c = 'C'; // ...hild
803  if (1 != xferraw(m_outpipe, &c, 1, ::write))
804  throw Exception("handshake: xferraw write", EPIPE);
805  if (1 != xferraw(m_inpipe, &c, 1, ::read))
806  throw Exception("handshake: xferraw read", EPIPE);
807  if ('P' != c) throw Exception("handshake", EPIPE);
808  break;
809  default: // parent
810  // put the ends in the right place
811  if (-1 != fds[2]) {
812  // pair of pipes
813  if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
814  myerrno = errno;
815  pthread_mutex_unlock(&s_openpipesmutex);
816  throw Exception("close", myerrno);
817  }
818  fds[1] = fds[2] = -1;
819  m_outpipe = fds[3];
820  m_inpipe = fds[0];
821  } else {
822  // socketpair
823  if (-1 == ::close(fds[1])) {
824  myerrno = errno;
825  pthread_mutex_unlock(&s_openpipesmutex);
826  throw Exception("close", myerrno);
827  }
828  fds[1] = -1;
829  m_inpipe = m_outpipe = fds[0];
830  }
831  // put on list of open pipes (so we can kill child processes
832  // if things go wrong)
833  s_openpipes.push_front(this);
834  pthread_mutex_unlock(&s_openpipesmutex);
835  // ok, put our pages on freelist
836  m_freelist = m_pages[0u];
837  // handshake with other end (to make sure it's alive)...
838  c = 'P'; // ...arent
839  if (1 != xferraw(m_outpipe, &c, 1, ::write))
840  throw Exception("handshake: xferraw write", EPIPE);
841  if (1 != xferraw(m_inpipe, &c, 1, ::read))
842  throw Exception("handshake: xferraw read", EPIPE);
843  if ('C' != c) throw Exception("handshake", EPIPE);
844  break;
845  }
846  // mark file descriptors for close on exec (we do not want to leak the
847  // connection to anything we happen to exec)
 // NOTE(review): F_GETFD hands back the flags as the return value of
 // fcntl(); the result is only compared with -1 here, so fdflags still has
 // its previous value when FD_CLOEXEC is or-ed in - check against the
 // fcntl specification
848  int fdflags = 0;
849  if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
850  throw Exception("fcntl", errno);
851  fdflags |= FD_CLOEXEC;
852  if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
853  throw Exception("fcntl", errno);
854  if (m_inpipe != m_outpipe) {
855  if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
856  throw Exception("fcntl", errno);
857  fdflags |= FD_CLOEXEC;
858  if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
859  throw Exception("fcntl", errno);
860  }
861  // ok, finally, clear the failbit
862  m_flags &= ~failbit;
863  // all done
864  } catch (const BidirMMapPipe::Exception& e) {
865  if (0 != m_childPid) kill(m_childPid, SIGTERM);
 // close whatever is still recorded in fds (entries that were closed
 // successfully above have been reset to -1; descriptor 0 is never closed
 // here)
866  for (int i = 0; i < 4; ++i)
867  if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
868  {
869  // free resources associated with mmapped pages
 // NOTE(review): the statement(s) doing this are not visible in this
 // listing - confirm against the original file
871  }
872  if (!--s_pagepoolrefcnt) {
873  delete s_pagepool;
874  s_pagepool = 0;
875  }
 // NOTE(review): "throw e;" throws a copy of e; a plain "throw;" would
 // re-throw the original exception object
876  throw e;
877  }
878 }
881 {
882  assert(!(m_flags & failbit));
883  return doClose(false);
884 }
// Close this end of the pipe.
//
// force:    if true, pending data is not flushed, the write direction is not
//           shut down gracefully, input is not drained, and failures of
//           close()/waitpid() do not throw
// holdlock: if true, s_openpipesmutex is not locked here when the pipe is
//           removed from s_openpipes (presumably because the caller already
//           holds it - confirm against the callers)
//
// Returns 0 if failbit is already set on entry; otherwise returns retVal,
// which is the status filled in by waitpid() if isParent() is true, and 0
// if not. failbit is set before returning.
886 int BidirMMapPipe::doClose(bool force, bool holdlock)
887 {
888  if (m_flags & failbit) return 0;
889  // flush data to be written
890  if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
891  // shut down the write direction (no more writes from our side)
892  if (m_inpipe == m_outpipe) {
 // one descriptor for both directions: only the write half may go for
 // now, the descriptor itself is closed further down as m_inpipe
893  if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
894  throw Exception("shutdown", errno);
895  m_outpipe = -1;
896  } else {
897  if (-1 != m_outpipe && -1 == ::close(m_outpipe))
898  if (!force) throw Exception("close", errno);
899  m_outpipe = -1;
900  }
901  // now take care of the read direction:
902  // drain anything the other end might still want to send
903  if (!force && -1 != m_inpipe) {
904  // **************** THIS IS EXTREMELY UGLY: ****************
905  // POLLHUP is not set reliably on pipe/socket shutdown on all
906  // platforms, unfortunately, so we poll for readability here until
907  // the other end closes, too
908  //
909  // the read loop below ensures that the other end sees the POLLIN that
910  // is set on shutdown instead, and goes ahead to close its end
911  //
912  // if we don't do this, and close straight away, the other end
913  // will catch a SIGPIPE or similar, and we don't want that
914  int err;
915  struct pollfd fds;
916  fds.fd = m_inpipe;
 // NOTE(review): the left-hand side of the next line is missing in this
 // listing (presumably the events field of fds) - confirm against the
 // original file
917 = POLLIN;
918  fds.revents = 0;
 // inner loop: read and discard single bytes until poll reports an error
 // or hang-up, or read yields nothing; outer loop: start over if poll was
 // interrupted by a signal
919  do {
920  while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
921  if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
922  if (fds.revents & POLLIN) {
923  char c;
924  if (1 > ::read(m_inpipe, &c, 1)) break;
925  }
926  }
927  } while (0 > err && EINTR == errno);
928  // ignore all other poll errors
929  }
930  // close read end
931  if (-1 != m_inpipe && -1 == ::close(m_inpipe))
932  if (!force) throw Exception("close", errno);
933  m_inpipe = -1;
934  // unmap memory
935  try {
 // NOTE(review): one line of the original appears to be absent from this
 // listing at this point - confirm against the original file
937  if (!--s_pagepoolrefcnt) {
938  delete s_pagepool;
939  s_pagepool = 0;
940  }
941  } catch (const std::exception& e) {
 // NOTE(review): "throw e;" throws a copy with static type std::exception,
 // so the dynamic type of the caught exception is lost; a plain "throw;"
 // would keep it
942  if (!force) throw e;
943  }
945  // wait for child process
946  int retVal = 0;
947  if (isParent()) {
948  int tmp;
 // retry if waitpid is interrupted by a signal
949  do {
950  tmp = waitpid(m_childPid, &retVal, 0);
951  } while (-1 == tmp && EINTR == errno);
952  if (-1 == tmp)
953  if (!force) throw Exception("waitpid", errno);
954  m_childPid = 0;
955  }
956  // remove from list of open pipes
957  if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
958  std::list<BidirMMapPipe*>::iterator it = std::find(
959  s_openpipes.begin(), s_openpipes.end(), this);
960  if (s_openpipes.end() != it) s_openpipes.erase(it);
961  if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
 // mark the pipe as closed, so that a second call returns immediately
962  m_flags |= failbit;
963  return retVal;
964 }
967 { doClose(false); }
970  int fd, void* addr, size_type len,
971  ssize_t (*xferfn)(int, void*, std::size_t))
972 {
973  size_type xferred = 0;
974  unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
975  while (len) {
976  ssize_t tmp = xferfn(fd, buf, len);
977  if (tmp > 0) {
978  xferred += tmp;
979  len -= tmp;
980  buf += tmp;
981  continue;
982  } else if (0 == tmp) {
983  // check for end-of-file on pipe
984  break;
985  } else if (-1 == tmp) {
986  // ok some error occurred, so figure out if we want to retry of throw
987  switch (errno) {
988  default:
989  // if anything was transferred, return number of bytes
990  // transferred so far, we can start throwing on the next
991  // transfer...
992  if (xferred) return xferred;
993  // else throw
994  throw Exception("xferraw", errno);
995  case EAGAIN: // fallthrough intended
997  case EWOULDBLOCK: // fallthrough intended
998 #endif
999  std::cerr << " ERROR: In " << __func__ << " (" <<
1000  __FILE__ << ", line " << __LINE__ <<
1001  "): expect transfer to block!" << std::endl;
1002  case EINTR:
1003  break;
1004  }
1005  continue;
1006  } else {
1007  throw Exception("xferraw: unexpected return value from read/write",
1008  errno);
1009  }
1010  }
1011  return xferred;
1012 }
1015 {
1016  if (plist) {
1017  unsigned char pg = m_pages[plist];
1018  if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1021  // ok, have to copy pages through pipe
1022  for (Page* p = plist; p; p = p->next()) {
1023  if (sizeof(Page) + p->size() !=
1024  xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1025  ::write)) {
1026  throw Exception("sendpages: short write", EPIPE);
1027  }
1028  }
1029  }
1030  } else {
1031  throw Exception("sendpages: short write", EPIPE);
1032  }
1033  } else { assert(plist); }
1034 }
// Receive pages from the other end (may block) and queue them.
// Reads one byte from m_inpipe - the index of the first page of the list -
// then obtains the pages, hands the list to feedPageLists() and returns the
// number of pages received (0 if the index byte could not be read).
// NOTE(review): the function header (listing line 1036) and listing lines
// 1044-1045 are missing from this extract. The "} else {" on listing line
// 1058 shows that the copy loop was guarded by a lost condition (presumably
// "pages must be copied through the pipe"); restore from upstream.
1037 {
1038  unsigned char pg;
1039  unsigned retVal = 0;
1040  Page *plisthead = 0, *plisttail = 0;
1041  if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
// NOTE(review): pg comes from the other process and is used as an index
// without a range check
1042  plisthead = plisttail = m_pages[pg];
1043  // ok, have number of pages
1046  // ok, need to copy pages through pipe
1047  for (; plisttail; ++retVal) {
1048  Page* p = plisttail;
// the page header is overwritten by the incoming data, so next() and size()
// below refer to what the sender wrote
// NOTE(review): if this header read comes up short (e.g. xferraw returns 0
// on end-of-file) plisttail is never advanced: the loop keeps spinning and
// keeps incrementing retVal. A "break" on the failure path looks necessary.
1049  if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1050  ::read)) {
1051  plisttail = p->next();
1052  if (!p->size()) continue;
1053  // break in case of read error
1054  if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1055  ::read)) break;
1056  }
1057  }
1058  } else {
// pages did not have to be copied: just count the list
1059  retVal = lenPageList(plisthead);
1060  }
1061  }
1062  // put list of pages we just received into correct lists (busy/free)
1063  if (plisthead) feedPageLists(plisthead);
1064  // ok, retVal contains the number of pages read, so put them on the
1065  // correct lists
1066  return retVal;
1067 }
1070 {
1071  struct pollfd fds;
1072  fds.fd = m_inpipe;
1073 = POLLIN;
1074  fds.revents = 0;
1075  unsigned retVal = 0;
1076  do {
1077  int rc = ::poll(&fds, 1, 0);
1078  if (0 > rc) {
1079  if (EINTR == errno) continue;
1080  break;
1081  }
1082  if (1 == retVal && fds.revents & POLLIN &&
1083  !(fds.revents & (POLLNVAL | POLLERR))) {
1084  // ok, we can read without blocking, so the other end has
1085  // something for us
1086  return recvpages();
1087  } else {
1088  break;
1089  }
1090  } while (true);
1091  return retVal;
1092 }
1095 {
1096  unsigned n = 0;
1097  for ( ; p; p = p->next()) ++n;
1098  return n;
1099 }
// Sort the pages of the list plist onto the right lists:
//  - pages with data (size() != 0) are appended to the busy list,
//  - empty pages owned by this end are pushed onto the free list,
//  - empty pages owned by the other end are sent back with sendpages(),
//    together with any completely full pages at the head of the dirty list.
// Throws Exception if the liveness poll fails.
// NOTE(review): the function header (listing line 1101) is missing from this
// extract; listing line 1186 is missing as well (possibly just blank).
1102 {
1103  assert(plist);
1104  // get end of busy list
1105  Page *blend = m_busylist;
1106  while (blend && blend->next()) blend = blend->next();
1107  // ok, might have to send free pages to other end, and (if we do have to
1108  // send something to the other end) while we're at it, send any dirty
1109  // pages which are completely full, too
1110  Page *sendlisthead = 0, *sendlisttail = 0;
1111  // loop over plist
1112  while (plist) {
1113  Page* p = plist;
1114  plist = p->next();
1115  p->setNext(0);
1116  if (p->size()) {
1117  // busy page...
1118  p->pos() = 0;
1119  // put at end of busy list
1120  if (blend) blend->setNext(p);
1121  else m_busylist = p;
1122  blend = p;
1123  } else {
1124  // free page...
1125  // Very simple algorithm: once we're done with a page, we send it back
1126  // where it came from. If it's from our end, we put it on the free list, if
1127  // it's from the other end, we send it back.
// ownership is decided by page index: indices below PagesPerEnd are treated
// as the parent's, the rest as the child's
1128  if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1129  (isChild() && m_pages[p] < PagesPerEnd)) {
1130  // page "belongs" to other end
1131  if (!sendlisthead) sendlisthead = p;
1132  if (sendlisttail) sendlisttail->setNext(p);
1133  sendlisttail = p;
1134  } else {
1135  // add page to freelist
1136  p->setNext(m_freelist);
1137  m_freelist = p;
1138  }
1139  }
1140  }
1141  // check if we have to send stuff to the other end
1142  if (sendlisthead) {
1143  // go through our list of dirty pages, and see what we can
1144  // send along
1145  Page* dp;
1146  while ((dp = m_dirtylist) && dp->full()) {
1147  Page* p = dp;
1148  // move head of dirty list
1149  m_dirtylist = p->next();
1150  // queue for sending
1151  p->setNext(0);
// sendlisttail is non-null here because sendlisthead is
1152  sendlisttail->setNext(p);
1153  sendlisttail = p;
1154  }
1155  // poll if the other end is still alive - this needs that we first
1156  // close the write pipe of the other end when the remote end of the
1157  // connection is shutting down in doClose; we'll see that because we
1158  // get a POLLHUP on our inpipe
1159  const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1160  struct pollfd fds[2];
1161  fds[0].fd = m_outpipe;
// no events are requested: only the error/hangup bits in revents are wanted
1162  fds[0].events = fds[0].revents = 0;
1163  if (m_outpipe != m_inpipe) {
1164  fds[1].fd = m_inpipe;
1165  fds[1].events = fds[1].revents = 0;
1166  } else {
// single bidirectional descriptor: also ask whether data is pending
1167  fds[0].events |= POLLIN;
1168  }
1169  int retVal = 0;
1170  do {
1171  retVal = ::poll(fds, nfds, 0);
1172  if (0 > retVal && EINTR == errno)
1173  continue;
1174  break;
1175  } while (true);
1176  if (0 <= retVal) {
1177  bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1178  if (m_outpipe != m_inpipe) {
1179  ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1180  } else {
1181  if (ok && fds[0].revents & POLLIN) {
// NOTE(review): recvpages() ends by calling feedPageLists() itself, so this
// function is re-entered before the pending send list has been sent
1182  unsigned ret = recvpages();
1183  if (!ret) ok = false;
1184  }
1185  }
1187  if (ok) sendpages(sendlisthead);
1188  // (if the pipe is dead already, we don't care that we leak the
1189  // contents of the pages on the send list here, so that is why
1190  // there's no else clause here)
1191  } else {
1192  throw Exception("feedPageLists: poll", errno);
1193  }
1194  }
1195 }
1198 {
1199  assert(p);
1200  assert(p == m_freelist);
1201  // remove from freelist
1202  m_freelist = p->next();
1203  p->setNext(0);
1204  // append to dirty list
1205  Page* dl = m_dirtylist;
1206  while (dl && dl->next()) dl = dl->next();
1207  if (dl) dl->setNext(p);
1208  else m_dirtylist = p;
1209 }
// Return the head of the busy list, i.e. the next page holding unread data.
// Blocks in recvpages() while the list is empty; returns 0 once recvpages()
// delivers nothing.
// NOTE(review): the function header (listing line 1211) and listing line 1214
// are missing from this extract. The orphaned comment below suggests the
// lost statement is a non-blocking receive (presumably a call to
// recvpages_nonblock()); restore from upstream.
1212 {
1213  // queue any pages available for reading we can without blocking
1215  Page* p;
1216  // if there are no busy pages, try to get them from the other end,
1217  // block if we have to...
1218  while (!(p = m_busylist)) if (!recvpages()) return 0;
1219  return p;
1220 }
// Return a page that can take more data: the last page of the dirty list if
// it is not full, otherwise the head of the free list, which is moved to the
// dirty list first. Blocks in recvpages() while no free page is available;
// returns 0 once recvpages() delivers nothing.
// NOTE(review): the function header (listing line 1222) and listing line 1225
// are missing from this extract. The orphaned comment below suggests the
// lost statement is a non-blocking receive (presumably a call to
// recvpages_nonblock()); restore from upstream.
1223 {
1224  // queue any pages available for reading we can without blocking
1226  Page* p = m_dirtylist;
1227  // go to end of dirty list
1228  if (p) while (p->next()) p = p->next();
1229  if (!p || p->full()) {
1230  // need to append free page, so get one
1231  while (!(p = m_freelist)) if (!recvpages()) return 0;
1232  markPageDirty(p);
1233  }
1234  return p;
1235 }
1238 { return doFlush(true); }
1240 void BidirMMapPipe::doFlush(bool forcePartialPages)
1241 {
1242  assert(!(m_flags & failbit));
1243  // build a list of pages to flush
1244  Page *flushlisthead = 0, *flushlisttail = 0;
1245  while (m_dirtylist) {
1246  Page* p = m_dirtylist;
1247  if (!forcePartialPages && !p->full()) break;
1248  // remove dirty page from dirty list
1249  m_dirtylist = p->next();
1250  p->setNext(0);
1251  // and send it to other end
1252  if (!flushlisthead) flushlisthead = p;
1253  if (flushlisttail) flushlisttail->setNext(p);
1254  flushlisttail = p;
1255  }
1256  if (flushlisthead) sendpages(flushlisthead);
1257 }
// Discard buffered data waiting to be read and/or written: the dirty list is
// appended to the busy list, every page on the joined list gets size 0, and
// both list heads are reset.
// NOTE(review): the function header (listing line 1259) and listing line 1272
// are missing from this extract. As shown, the emptied pages are dropped
// without being put anywhere; the comment on listing line 1271 indicates the
// lost statement returns them to the free list (presumably via
// feedPageLists(m_busylist)); restore from upstream.
1260 {
1261  assert(!(m_flags & failbit));
1262  // join busy and dirty lists
1263  {
1264  Page *l = m_busylist;
1265  while (l && l->next()) l = l->next();
1266  if (l) l->setNext(m_dirtylist);
1267  else m_busylist = m_dirtylist;
1268  }
1269  // empty busy and dirty pages
1270  for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1271  // put them on the free list
1273  m_busylist = m_dirtylist = 0;
1274 }
// Return the number of bytes that can be read without blocking: the sum of
// size() - pos() over all pages on the busy list.
// NOTE(review): the function header (listing line 1276) and listing line 1280
// are missing from this extract; the orphaned comment below suggests the
// lost statement queues pending pages first (presumably a call to
// recvpages_nonblock()); restore from upstream.
1277 {
1278  // queue all pages waiting for consumption in the pipe before we give an
1279  // answer
1281  size_type retVal = 0;
1282  for (Page* p = m_busylist; p; p = p->next())
1283  retVal += p->size() - p->pos();
1284  return retVal;
1285 }
// Return the number of bytes that can be written without blocking: free
// space in partially filled dirty pages plus the capacity of free pages.
// Counting stops at FlushThresh pages if the out pipe cannot currently be
// written to.
// Throws Exception if poll fails for a reason other than EINTR.
// NOTE(review): the function header (listing line 1287) and listing line 1291
// are missing from this extract (the latter presumably a call to
// recvpages_nonblock()), and listing line 1298 lost its left-hand side
// (presumably "fds.events"); restore from upstream.
1288 {
1289  // queue all pages waiting for consumption in the pipe before we give an
1290  // answer
1292  // check if we could write to the pipe without blocking (we need to know
1293  // because we might need to check if flushing of dirty pages would block)
1294  bool couldwrite = false;
1295  {
1296  struct pollfd fds;
1297  fds.fd = m_outpipe;
1298 = POLLOUT;
1299  fds.revents = 0;
1300  int retVal = 0;
1301  do {
1302  retVal = ::poll(&fds, 1, 0);
1303  if (0 > retVal) {
1304  if (EINTR == errno) continue;
1305  throw Exception("bytesWritableNonBlocking: poll", errno);
1306  }
1307  if (1 == retVal && fds.revents & POLLOUT &&
1308  !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1309  couldwrite = true;
1310  break;
1311  } while (true);
1312  }
1313  // ok, start counting bytes
1314  size_type retVal = 0;
1315  unsigned npages = 0;
1316  // go through the dirty list
1317  for (Page* p = m_dirtylist; p; p = p->next()) {
1318  ++npages;
1319  // if page only partially filled
1320  if (!p->full())
1321  retVal += p->free();
1322  if (npages >= FlushThresh && !couldwrite) break;
1323  }
1324  // go through the free list
// free pages count in full while there are no dirty pages, while fewer than
// FlushThresh pages have been counted, or while the pipe is writable
1325  for (Page* p = m_freelist; p && (!m_dirtylist ||
1326  npages < FlushThresh || couldwrite); p = p->next()) {
1327  ++npages;
1328  retVal += Page::capacity();
1329  }
1330  return retVal;
1331 }
1334 {
1335  assert(!(m_flags & failbit));
1336  size_type nread = 0;
1337  unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1338  try {
1339  while (sz) {
1340  // find next page to read from
1341  Page* p = busypage();
1342  if (!p) {
1343  m_flags |= eofbit;
1344  return nread;
1345  }
1346  unsigned char* pp = p->begin() + p->pos();
1347  size_type csz = std::min(size_type(p->remaining()), sz);
1348  std::copy(pp, pp + csz, ap);
1349  nread += csz;
1350  ap += csz;
1351  sz -= csz;
1352  p->pos() += csz;
1353  assert(p->size() >= p->pos());
1354  if (p->size() == p->pos()) {
1355  // if no unread data remains, page is free
1356  m_busylist = p->next();
1357  p->setNext(0);
1358  p->size() = 0;
1359  feedPageLists(p);
1360  }
1361  }
1362  } catch (const Exception& e) {
1363  m_flags |= rderrbit;
1364  if (m_flags & exceptionsbit) throw e;
1365  }
1366  return nread;
1367 }
// Write sz bytes from addr to the pipe (may block). Data is appended to the
// page returned by dirtypage(); returns the number of bytes stored, short if
// dirtypage() yields no page (eofbit is set) or an Exception cut the
// transfer. An Exception sets wrerrbit and is passed on only if exceptionsbit
// is set.
// NOTE(review): the function header (listing line 1369) and listing line 1393
// are missing from this extract. The comment on listing lines 1391-1392
// announces a flush-threshold test that is not visible, so as shown
// doFlush(false) runs whenever a page fills up; restore the lost condition
// from upstream.
1370 {
1371  assert(!(m_flags & failbit));
1372  size_type written = 0;
1373  const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1374  try {
1375  while (sz) {
1376  // find next page to write to
1377  Page* p = dirtypage();
1378  if (!p) {
1379  m_flags |= eofbit;
1380  return written;
1381  }
// append behind the data already stored in the page
1382  unsigned char* pp = p->begin() + p->size();
1383  size_type csz = std::min(size_type(p->free()), sz);
1384  std::copy(ap, ap + csz, pp);
1385  written += csz;
1386  ap += csz;
1387  p->size() += csz;
1388  sz -= csz;
1389  assert(p->capacity() >= p->size());
1390  if (p->full()) {
1391  // if page is full, see if we're above the flush threshold of
1392  // 3/4 of our pages
1394  doFlush(false);
1395  }
1396  }
1397  } catch (const Exception& e) {
1398  m_flags |= wrerrbit;
// NOTE(review): "throw e;" throws a copy; a plain "throw;" would rethrow the
// original exception object
1399  if (m_flags & exceptionsbit) throw e;
1400  }
1401  return written;
1402 }
// Poll a set of pipes for events. For every entry of pipes, revents is
// filled in; the return value is the number of entries whose revents is
// non-zero after masking. The call works in four steps: (1) set flags that
// are known without a system call, (2) build the pollfd array, (3) call
// ::poll and translate the results, receiving pending pages on the way,
// (4) mask out Readable/Writable where they were not asked for.
// Throws Exception if ::poll fails for a reason other than EINTR.
// NOTE(review): the function header (listing line 1404) is missing from this
// extract, listing line 1462 is missing (possibly blank), and listing lines
// 1423, 1428, 1451, 1454, 1461 and 1464 each lost an operand - presumably
// "pe.events" in the conditions and "tmp.events" in the assignments; restore
// from upstream.
1405 {
1406  // go through pipes, and change flags where we already know without really
1407  // polling - stuff where we don't need poll to wait for its timeout in the
1408  // OS...
1409  bool canskiptimeout = false;
// per-entry mask applied at the very end; Readable/Writable are only let
// through if the second and third tests below add them
1410  std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1411  std::vector<unsigned>::iterator mit = masks.begin();
1412  for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1413  ++it, ++mit) {
1414  PollEntry& pe = *it;
1415  pe.revents = None;
1416  // null pipe pointer or closed pipe is invalid
// NOTE(review): a null pe.pipe is flagged here but still dereferenced by the
// statements below and by the later loops
1417  if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1418  // check for error
1419  if (pe.pipe->bad()) pe.revents |= Error;
1420  // check for end of file
1421  if (pe.pipe->eof()) pe.revents |= EndOfFile;
1422  // check if readable
1423  if ( & Readable) {
1424  *mit |= Readable;
1425  if (pe.pipe->m_busylist) pe.revents |= Readable;
1426  }
1427  // check if writable
1428  if ( & Writable) {
1429  *mit |= Writable;
1430  if (pe.pipe->m_freelist) {
1431  pe.revents |= Writable;
1432  } else {
1433  Page *dl = pe.pipe->m_dirtylist;
1434  while (dl && dl->next()) dl = dl->next();
// NOTE(review): elsewhere in this file the fill level of a dirty page is
// size()/full() and pos() is the read offset - verify that pos() is the
// intended quantity here (same test again on listing line 1520)
1435  if (dl && dl->pos() < Page::capacity())
1436  pe.revents |= Writable;
1437  }
1438  }
// something is already known for this entry: do not wait in ::poll
1439  if (pe.revents) canskiptimeout = true;
1440  }
1441  // set up the data structures required for the poll syscall
1442  std::vector<pollfd> fds;
1443  fds.reserve(2 * pipes.size());
// maps each polled descriptor back to the entry it belongs to
1444  std::map<int, PollEntry*> fds2pipes;
1445  for (PollVector::const_iterator it = pipes.begin();
1446  pipes.end() != it; ++it) {
1447  const PollEntry& pe = *it;
1448  struct pollfd tmp;
1449  fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1450  const_cast<PollEntry*>(&pe)));
1451 = tmp.revents = 0;
1452  // we always poll for readability; this allows us to queue pages
1453  // early
1454 |= POLLIN;
1455  if (pe.pipe->m_outpipe != tmp.fd) {
1456  // ok, it's a pair of pipes
1457  fds.push_back(tmp);
1458  fds2pipes.insert(std::make_pair(
1459  unsigned(tmp.fd = pe.pipe->m_outpipe),
1460  const_cast<PollEntry*>(&pe)));
1461 = 0;
1463  }
1464  if ( & Writable) |= POLLOUT;
1465  fds.push_back(tmp);
1466  }
1467  // poll
1468  int retVal = 0;
1469  do {
// NOTE(review): &fds[0] is evaluated even when pipes (and hence fds) is empty
1470  retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1471  if (0 > retVal) {
1472  if (EINTR == errno) continue;
1473  throw Exception("poll", errno);
1474  }
1475  break;
1476  } while (true);
1477  // fds may have changed state, so update...
1478  for (std::vector<pollfd>::iterator it = fds.begin();
1479  fds.end() != it; ++it) {
1480  pollfd& fe = *it;
1481  //if (!fe.revents) continue;
// retVal is decremented but not read again in the visible code
1482  --retVal;
1483  PollEntry& pe = *fds2pipes[fe.fd];
// re-entered via goto after more data turned up on the same descriptor
1484 oncemore:
1485  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1486  pe.revents |= ReadInvalid;
1487  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1488  pe.revents |= WriteInvalid;
1489  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1490  pe.revents |= ReadError;
1491  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1492  pe.revents |= WriteError;
1493  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1494  pe.revents |= ReadEndOfFile;
1495  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1496  pe.revents |= WriteEndOfFile;
1497  if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1498  !(fe.revents & (POLLNVAL | POLLERR))) {
1499  // ok, there is at least one page for us to receive from the
1500  // other end
// "continue" moves on to the next descriptor, skipping the Readable/Writable
// updates below for this one
1501  if (0 == pe.pipe->recvpages()) continue;
1502  // more pages there?
1503  do {
1504  int tmp = ::poll(&fe, 1, 0);
1505  if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1506  if (0 > tmp) {
1507  if (EINTR == errno) continue;
1508  throw Exception("poll", errno);
1509  }
1510  break;
1511  } while (true);
1512  }
1513  if (pe.pipe->m_busylist) pe.revents |= Readable;
1514  if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1515  if (pe.pipe->m_freelist) {
1516  pe.revents |= Writable;
1517  } else {
1518  Page *dl = pe.pipe->m_dirtylist;
1519  while (dl && dl->next()) dl = dl->next();
1520  if (dl && dl->pos() < Page::capacity())
1521  pe.revents |= Writable;
1522  }
1523  }
1524  }
1525  // apply correct masks, and count pipes with pending events
1526  int npipes = 0;
1527  mit = masks.begin();
1528  for (PollVector::iterator it = pipes.begin();
1529  pipes.end() != it; ++it, ++mit)
1530  if ((it->revents &= *mit)) ++npipes;
1531  return npipes;
1532 }
1535 {
1536  size_t sz = std::strlen(str);
1537  *this << sz;
1538  if (sz) write(str, sz);
1539  return *this;
1540 }
1543 {
1544  size_t sz = 0;
1545  *this >> sz;
1546  if (good() && !eof()) {
1547  str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1548  if (!str) throw Exception("realloc", errno);
1549  if (sz) read(str, sz);
1550  str[sz] = 0;
1551  }
1552  return *this;
1553 }
1556 {
1557  size_t sz = str.size();
1558  *this << sz;
1559  write(, sz);
1560  return *this;
1561 }
1564 {
1565  str.clear();
1566  size_t sz = 0;
1567  *this >> sz;
1568  if (good() && !eof()) {
1569  str.reserve(sz);
1570  for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1571  }
1572  return *this;
1573 }
1578 using namespace RooFit;
1580 int simplechild(BidirMMapPipe& pipe)
1581 {
1582  // child does an echo loop
1583  while (pipe.good() && !pipe.eof()) {
1584  // read a string
1585  std::string str;
1586  pipe >> str;
1587  if (!pipe) return -1;
1588  if (pipe.eof()) break;
1589  if (!str.empty()) {
1590  std::cout << "[CHILD] : read: " << str << std::endl;
1591  str = "... early in the morning?";
1592  }
1593  pipe << str << BidirMMapPipe::flush;
1594  // did our parent tell us to shut down?
1595  if (str.empty()) break;
1596  if (!pipe) return -1;
1597  if (pipe.eof()) break;
1598  std::cout << "[CHILD] : wrote: " << str << std::endl;
1599  }
1600  pipe.close();
1601  return 0;
1602 }
1604 #include <sstream>
1605 int randomchild(BidirMMapPipe& pipe)
1606 {
1607  // child sends out something at random intervals
1608  ::srand48(::getpid());
1609  {
1610  // wait for parent's go ahead signal
1611  std::string s;
1612  pipe >> s;
1613  }
1614  // no shutdown sequence needed on this side - we're producing the data,
1615  // and the parent can just read until we're done (when it'll get EOF)
1616  for (int i = 0; i < 5; ++i) {
1617  // sleep a random time between 0 and .9 seconds
1618  ::usleep(int(1e6 * ::drand48()));
1619  std::ostringstream buf;
1620  buf << "child pid " << ::getpid() << " sends message " << i;
1621  std::string str = buf.str();
1622  std::cout << "[CHILD] : " << str << std::endl;
1623  pipe << str << BidirMMapPipe::flush;
1624  if (!pipe) return -1;
1625  if (pipe.eof()) break;
1626  }
1627  // tell parent we're shutting down
1628  pipe << "" << BidirMMapPipe::flush;
1629  // wait for parent to acknowledge
1630  std::string s;
1631  pipe >> s;
1632  pipe.close();
1633  return 0;
1634 }
1636 int benchchildrtt(BidirMMapPipe& pipe)
1637 {
1638  // child does the equivalent of listening for pings and sending the
1639  // packet back
1640  char* str = 0;
1641  while (pipe && !pipe.eof()) {
1642  pipe >> str;
1643  if (!pipe) {
1644  std::free(str);
1645  pipe.close();
1646  return -1;
1647  }
1648  if (pipe.eof()) break;
1649  pipe << str << BidirMMapPipe::flush;
1650  // if we have just completed the shutdown handshake, we break here
1651  if (!std::strlen(str)) break;
1652  }
1653  std::free(str);
1654  pipe.close();
1655  return 0;
1656 }
1658 int benchchildsink(BidirMMapPipe& pipe)
1659 {
1660  // child behaves like a sink
1661  char* str = 0;
1662  while (pipe && !pipe.eof()) {
1663  pipe >> str;
1664  if (!std::strlen(str)) break;
1665  }
1666  pipe << "" << BidirMMapPipe::flush;
1667  std::free(str);
1668  pipe.close();
1669  return 0;
1670 }
1672 int benchchildsource(BidirMMapPipe& pipe)
1673 {
1674  // child behaves like a source
1675  char* str = 0;
1676  for (unsigned i = 0; i <= 24; ++i) {
1677  str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1678  std::memset(str, '4', 1 << i);
1679  str[1 << i] = 0;
1680  for (unsigned j = 0; j < 1 << 7; ++j) {
1681  pipe << str;
1682  if (!pipe || pipe.eof()) {
1683  std::free(str);
1684  pipe.close();
1685  return -1;
1686  }
1687  }
1688  // tell parent we're done with this block size
1689  pipe << "" << BidirMMapPipe::flush;
1690  }
1691  // tell parent to shut down
1692  pipe << "" << BidirMMapPipe::flush;
1693  std::free(str);
1694  pipe.close();
1695  return 0;
1696 }
1698 BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1699 {
1700  // create a pipe with the given child at the remote end
1701  BidirMMapPipe *p = new BidirMMapPipe();
1702  if (p->isChild()) {
1703  int retVal = childexec(*p);
1704  delete p;
1705  std::exit(retVal);
1706  }
1707  return p;
1708 }
1710 #include <sys/time.h>
1711 #include <iomanip>
1712 int main()
1713 {
1714  // simple echo loop test
1715  {
1716  std::cout << "[PARENT]: simple challenge-response test, "
1717  "one child:" << std::endl;
1718  BidirMMapPipe* pipe = spawnChild(simplechild);
1719  for (int i = 0; i < 5; ++i) {
1720  std::string str("What shall we do with a drunken sailor...");
1721  *pipe << str << BidirMMapPipe::flush;
1722  if (!*pipe) return -1;
1723  std::cout << "[PARENT]: wrote: " << str << std::endl;
1724  *pipe >> str;
1725  if (!*pipe) return -1;
1726  std::cout << "[PARENT]: read: " << str << std::endl;
1727  }
1728  // send shutdown string
1729  *pipe << "" << BidirMMapPipe::flush;
1730  // wait for shutdown handshake
1731  std::string s;
1732  *pipe >> s;
1733  int retVal = pipe->close();
1734  std::cout << "[PARENT]: exit status of child: " << retVal <<
1735  std::endl;
1736  if (retVal) return retVal;
1737  delete pipe;
1738  }
// Poll test: spawns nch children running randomchild, wakes them up, then
// loops over BidirMMapPipe::poll() until every child has shut down and its
// pipe has been removed from the vector.
// NOTE(review): this extract lost listing line 1746 (the declaration of
// "pipes" - presumably a BidirMMapPipe::PollVector), line 1752 (the second
// argument of the PollEntry constructor and the closing parentheses) and
// lines 1789-1790 (the rest of the flag mask and the opening brace of the
// "retire" branch); restore from upstream.
1739  // simple poll test - children send 5 results in random intervals
1740  {
1741  unsigned nch = 20;
1742  std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1743  " children:" << std::endl;
1744  typedef BidirMMapPipe::PollEntry PollEntry;
1745  // poll data structure
1747  pipes.reserve(nch);
1748  // spawn children
1749  for (unsigned i = 0; i < nch; ++i) {
1750  std::cout << "[PARENT]: spawning child " << i << std::endl;
1751  pipes.push_back(PollEntry(spawnChild(randomchild),
1753  }
1754  // wake children up
1755  std::cout << "[PARENT]: waking up children" << std::endl;
1756  for (unsigned i = 0; i < nch; ++i)
1757  *pipes[i].pipe << "" << BidirMMapPipe::flush;
1758  std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1759  // while at least some children alive
1760  while (!pipes.empty()) {
1761  // poll, wait until status change (infinite timeout)
1762  int npipes = BidirMMapPipe::poll(pipes, -1);
1763  // scan for pipes with changed status
// the iterator is advanced inside the body: erase() below returns the next
// position
1764  for (std::vector<PollEntry>::iterator it = pipes.begin();
1765  npipes && pipes.end() != it; ) {
1766  if (!it->revents) {
1767  // unchanged, next one
1768  ++it;
1769  continue;
1770  }
1771  --npipes; // maybe we can stop early...
1772  // read from pipes which are readable
1773  if (it->revents & BidirMMapPipe::Readable) {
1774  std::string s;
1775  *(it->pipe) >> s;
1776  if (!s.empty()) {
1777  std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1778  ": " << s << std::endl;
1779  ++it;
1780  continue;
1781  } else {
1782  // child is shutting down...
// acknowledge with an empty string, then retire the pipe via the shared
// cleanup code at the label below
1783  *(it->pipe) << "" << BidirMMapPipe::flush;
1784  goto childcloses;
1785  }
1786  }
1787  // retire pipes with error or end-of-file condition
// NOTE(review): an entry whose revents is non-zero but matches neither
// Readable nor the flags tested here is not advanced; it is visited again
// (decrementing npipes each time) until npipes reaches zero
1788  if (it->revents & (BidirMMapPipe::Error |
1791  std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1792  " revents" <<
1793  ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1794  ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1795  ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1796  ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1797  ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1798  ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1799  ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1800  ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1801  std::endl;
1802 childcloses:
1803  int retVal = it->pipe->close();
1804  std::cout << "[PARENT]: child exit status: " <<
1805  retVal << ", number of children still alive: " <<
1806  (pipes.size() - 1) << std::endl;
1807  if (retVal) return retVal;
1808  delete it->pipe;
1809  it = pipes.erase(it);
1810  continue;
1811  }
1812  }
1813  }
1814  }
1815  // little benchmark - round trip time
1816  {
1817  std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1818  for (unsigned i = 0; i <= 24; ++i) {
1819  char *s = new char[1 + (1 << i)];
1820  std::memset(s, 'A', 1 << i);
1821  s[1 << i] = 0;
1822  const unsigned n = 1 << 7;
1823  double avg = 0., min = 1e42, max = -1e42;
1824  BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1825  for (unsigned j = n; j--; ) {
1826  struct timeval t1;
1827  ::gettimeofday(&t1, 0);
1828  *pipe << s << BidirMMapPipe::flush;
1829  if (!*pipe || pipe->eof()) break;
1830  *pipe >> s;
1831  if (!*pipe || pipe->eof()) break;
1832  struct timeval t2;
1833  ::gettimeofday(&t2, 0);
1834  t2.tv_sec -= t1.tv_sec;
1835  t2.tv_usec -= t1.tv_usec;
1836  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1837  if (dt < min) min = dt;
1838  if (dt > max) max = dt;
1839  avg += dt;
1840  }
1841  // send a shutdown string
1842  *pipe << "" << BidirMMapPipe::flush;
1843  // get child's shutdown ok
1844  *pipe >> s;
1845  avg /= double(n);
1846  avg *= 1e6; min *= 1e6; max *= 1e6;
1847  int retVal = pipe->close();
1848  if (retVal) {
1849  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1850  return retVal;
1851  }
1852  delete pipe;
1853  // there is a factor 2 in the formula for the transfer rate below,
1854  // because we transfer data of twice the size of the block - once
1855  // to the child, and once for the return trip
1856  std::cout << "block size " << std::setw(9) << (1 << i) <<
1857  " avg " << std::setw(7) << avg << " us min " <<
1858  std::setw(7) << min << " us max " << std::setw(7) << max <<
1859  "us speed " << std::setw(9) <<
1860  2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1861  " MB/s" << std::endl;
1862  delete[] s;
1863  }
1864  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1865  }
1866  // little benchmark - child as sink
1867  {
1868  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1869  for (unsigned i = 0; i <= 24; ++i) {
1870  char *s = new char[1 + (1 << i)];
1871  std::memset(s, 'A', 1 << i);
1872  s[1 << i] = 0;
1873  const unsigned n = 1 << 7;
1874  double avg = 0., min = 1e42, max = -1e42;
1875  BidirMMapPipe *pipe = spawnChild(benchchildsink);
1876  for (unsigned j = n; j--; ) {
1877  struct timeval t1;
1878  ::gettimeofday(&t1, 0);
1879  // streaming mode - we do not flush here
1880  *pipe << s;
1881  if (!*pipe || pipe->eof()) break;
1882  struct timeval t2;
1883  ::gettimeofday(&t2, 0);
1884  t2.tv_sec -= t1.tv_sec;
1885  t2.tv_usec -= t1.tv_usec;
1886  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1887  if (dt < min) min = dt;
1888  if (dt > max) max = dt;
1889  avg += dt;
1890  }
1891  // send a shutdown string
1892  *pipe << "" << BidirMMapPipe::flush;
1893  // get child's shutdown ok
1894  *pipe >> s;
1895  avg /= double(n);
1896  avg *= 1e6; min *= 1e6; max *= 1e6;
1897  int retVal = pipe->close();
1898  if (retVal) {
1899  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1900  return retVal;
1901  }
1902  delete pipe;
1903  std::cout << "block size " << std::setw(9) << (1 << i) <<
1904  " avg " << std::setw(7) << avg << " us min " <<
1905  std::setw(7) << min << " us max " << std::setw(7) << max <<
1906  "us speed " << std::setw(9) <<
1907  (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1908  " MB/s" << std::endl;
1909  delete[] s;
1910  }
1911  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1912  }
1913  // little benchmark - child as source
1914  {
1915  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1916  char *s = 0;
1917  double avg = 0., min = 1e42, max = -1e42;
1918  unsigned n = 0, bsz = 0;
1919  BidirMMapPipe *pipe = spawnChild(benchchildsource);
1920  while (*pipe && !pipe->eof()) {
1921  struct timeval t1;
1922  ::gettimeofday(&t1, 0);
1923  // streaming mode - we do not flush here
1924  *pipe >> s;
1925  if (!*pipe || pipe->eof()) break;
1926  struct timeval t2;
1927  ::gettimeofday(&t2, 0);
1928  t2.tv_sec -= t1.tv_sec;
1929  t2.tv_usec -= t1.tv_usec;
1930  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1931  if (std::strlen(s)) {
1932  ++n;
1933  if (dt < min) min = dt;
1934  if (dt > max) max = dt;
1935  avg += dt;
1936  bsz = std::strlen(s);
1937  } else {
1938  if (!n) break;
1939  // next block size
1940  avg /= double(n);
1941  avg *= 1e6; min *= 1e6; max *= 1e6;
1943  std::cout << "block size " << std::setw(9) << bsz <<
1944  " avg " << std::setw(7) << avg << " us min " <<
1945  std::setw(7) << min << " us max " << std::setw(7) <<
1946  max << "us speed " << std::setw(9) <<
1947  (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1948  " MB/s" << std::endl;
1949  n = 0;
1950  avg = 0.;
1951  min = 1e42;
1952  max = -1e42;
1953  }
1954  }
1955  int retVal = pipe->close();
1956  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1957  if (retVal) return retVal;
1958  delete pipe;
1959  std::free(s);
1960  }
1961  return 0;
1962 }
1964 #endif // _WIN32
1966 // vim: ft=cpp:sw=4:tw=78
std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
pipe error Write end
bool eof() const
true if end-of-file
static Vc_ALWAYS_INLINE int_v min(const int_v &x, const int_v &y)
Definition: vector.h:433
static double p3(double t, double a, double b, double c, double d)
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
read pipe in end-of-file state
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
return c
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
Definition: Exception.C:4
unsigned m_refcnt
reference counter
#define assert(cond)
Definition: unittest.h:542
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
write pipe in end-of-file state
unsigned npages() const
return number of pages accessible
don't know yet what'll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
unsigned events
events of interest (or'ed bitmask)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
Definition: TBuffer.cxx:28
pid_t m_childPid
pid of the child (zero if we're child)
TLatex * t1
Definition: textangle.C:20
pipe error read end
TFile * f
void markPageDirty(Page *p)
put on dirty pages list
unsigned nPagesPerGroup() const
return number of pages per page group
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
bool good() const
status of stream is good
size_type read(void *addr, size_type sz)
read from pipe
pages shared (child + parent)
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:87
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
void sendpages(Page *plist)
send page(s) to the other end (may block)
static double p2(double t, double a, double b, double c)
unsigned len() const
return length of chunk
Page * page(unsigned pgno) const
return page number pageno
pipe can be written to
Page * busypage()
get a busy page to read data from (may block)
if(pyself &&pyself!=Py_None)
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
default constructor
int close()
flush buffers, close pipe
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
static const char * what
static unsigned lenPageList(const Page *list)
return length of a page list
Pages pop()
pop a group of pages off the free list
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
tuple main
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return whether p is contained in this PageChunk
logical failure (e.g. pipe closed)
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
error reporting with exceptions
TLine * l
Definition: textangle.C:4
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static double p1(double t, double a, double b)
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
bool isParent() const
return if this end of the pipe is the parent end
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
tuple free
TText * t2
Definition: rootenv.C:28
#define blend
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
bool empty() const
return true if no used page groups in this chunk
bool closed() const
true if closed
type of mmap support found
Definition: BidirMMapPipe.h:47
void zap(Pages &p)
free all pages except for those pointed to by p
for poll() interface
unsigned recvpages()
receive pages from the other end (may block) and queue them
void fill()
Definition: utils.cpp:314
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
end of file reached
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
static Vc_ALWAYS_INLINE int_v max(const int_v &x, const int_v &y)
Definition: vector.h:440
#define name(a, b)
Definition: linkTestLib0.cpp:5
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static unsigned long masks[]
Definition: gifencode.c:211
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
system page size (run-time determined)
Definition: BidirMMapPipe.h:56
pipe has data for reading
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
Definition: memory.h:67
nothing special on this pipe
const Int_t n
Definition: legend1.C:16
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the page size of the system
Definition: BidirMMapPipe.h:85
pages per pipe end
unsigned pageno(Page *p) const
perform page to page number mapping