ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
BidirMMapPipe.cxx
Go to the documentation of this file.
1 /** @file BidirMMapPipe.cxx
2  *
3  * implementation of BidirMMapPipe, a class which forks off a child process
4  * and serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 #ifndef _WIN32
10 #include <map>
11 #include <cerrno>
12 #include <limits>
13 #include <string>
14 #include <cstdlib>
15 #include <cstring>
16 #include <iostream>
17 #include <algorithm>
18 #include <exception>
19 
20 #include <poll.h>
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #include <sys/wait.h>
30 #include <sys/socket.h>
31 
32 #include "BidirMMapPipe.h"
33 
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35 #define END_NAMESPACE_ROOFIT }
36 
38 
39 /// namespace for implementation details of BidirMMapPipe
40 namespace BidirMMapPipe_impl {
/** @brief exception to throw if low-level OS calls go wrong
 *
 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
 * @date 2013-07-07
 *
 * The message has the form "msg: <system error text for err>"; it is kept
 * in a fixed-size member buffer and truncated if it does not fit.
 */
class BidirMMapPipeException : public std::exception
{
    private:
        enum {
            s_sz = 256 ///< length of buffer
        };
        char m_buf[s_sz]; ///< buffer containing the error message

        /// for the POSIX version of strerror_r (returns an error code)
        static int dostrerror_r(int err, char* buf, std::size_t sz,
                int (*f)(int, char*, std::size_t))
        { return f(err, buf, sz); }
        /// for the GNU version of strerror_r (returns the message text,
        /// which may or may not have been written into buf)
        static int dostrerror_r(int, char*, std::size_t,
                char* (*f)(int, char*, std::size_t));
    public:
        /// constructor taking error code, hint on operation (msg)
        ///
        /// @param msg  short description of the failed operation; an empty
        ///             or null msg yields just the system error text
        /// @param err  errno-style error code
        BidirMMapPipeException(const char* msg, int err);
        /// return a description of what went wrong
        virtual const char* what() const throw() { return m_buf; }
};

BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
{
    // treat a null msg like an empty one (strlen(0) is undefined behaviour)
    std::size_t msgsz = msg ? std::strlen(msg) : 0;
    if (msgsz) {
        msgsz = std::min(msgsz, std::size_t(s_sz));
        std::copy(msg, msg + msgsz, m_buf);
        if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
        if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
    }
    if (msgsz < s_sz) {
        // terminate what we have so far: should strerror_r leave the
        // buffer untouched, what() then returns "msg: " instead of
        // uninitialised bytes
        m_buf[msgsz] = 0;
        // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
        // have to sort it out with overloads
        dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
    }
    m_buf[s_sz - 1] = 0; // enforce zero-termination
}

int BidirMMapPipeException::dostrerror_r(int err, char* buf,
        std::size_t sz, char* (*f)(int, char*, std::size_t))
{
    buf[0] = 0;
    // the GNU version may return a pointer to some other (static) string
    // instead of filling buf - copy and truncate it into buf in that case
    char *tmp = f(err, buf, sz);
    if (tmp && tmp != buf) {
        std::strncpy(buf, tmp, sz);
        buf[sz - 1] = 0;
        if (std::strlen(tmp) > sz - 1) return ERANGE;
    }
    return 0;
}
97 
98  /** @brief class representing the header structure in an mmapped page
99  *
100  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
101  * @date 2013-07-07
102  *
103  * contains a field to put pages into a linked list, a field for the size
104  * of the data being transmitted, and a field for the position until which
105  * the data has been read
 *
 * the header sits at the very start of its page; the payload area runs
 * from begin() (just after the header) to end() (the end of the page)
106  */
107  class Page
108  {
109  private:
110  // use as small a data type as possible to maximise payload area
111  // of pages
112  short m_next; ///< offset of next page in list relative to this page (in pagesizes; 0: end of list)
113  unsigned short m_size; ///< size of payload (in bytes)
114  unsigned short m_pos; ///< index of next byte in payload area
115  /// copy construction forbidden
116  Page(const Page&) {}
117  /// assignment forbidden
118  Page& operator=(const Page&)
119  { return *reinterpret_cast<Page*>(0); }
// NOTE(review): dereferencing a null pointer is undefined behaviour; as
// copying is meant to be forbidden anyway, declaring these two private
// members without a definition (or "= delete" in C++11) would suffice.
120  public:
121  /// constructor
122  Page() : m_next(0), m_size(0), m_pos(0)
123  {
124  // check that short is big enough - must be done at runtime
125  // because the page size is not known until runtime
// NOTE(review): listing lines 126-127 (presumably that check) are missing
// from this copy.
128  }
129  /// set pointer to next page
130  void setNext(const Page* p);
131  /// return pointer to next page
132  Page* next() const;
133  /// return reference to size field
134  unsigned short& size() { return m_size; }
135  /// return size (of payload data)
136  unsigned size() const { return m_size; }
137  /// return reference to position field
138  unsigned short& pos() { return m_pos; }
139  /// return position
140  unsigned pos() const { return m_pos; }
141  /// return pointer to first byte in payload data area of page
142  inline unsigned char* begin() const
143  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
144  + sizeof(Page); }
145  /// return pointer one past the last byte of the page (end of payload area)
146  inline unsigned char* end() const
147  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
148  + PageChunk::pagesize(); }
149  /// return the capacity of the page (payload bytes: page size minus header)
150  static unsigned capacity()
151  { return PageChunk::pagesize() - sizeof(Page); }
152  /// true if page empty
153  bool empty() const { return !m_size; }
154  /// true if page is not empty (holds payload data)
155  bool filled() const { return !empty(); }
156  /// free space left (to be written to)
157  unsigned free() const { return capacity() - m_size; }
158  /// bytes remaining to be read
159  unsigned remaining() const { return m_size - m_pos; }
160  /// true if page completely full
161  bool full() const { return !free(); }
162  };
163 
164  void Page::setNext(const Page* p)
165  {
166  if (!p) {
167  m_next = 0;
168  } else {
169  const char* p1 = reinterpret_cast<char*>(this);
170  const char* p2 = reinterpret_cast<const char*>(p);
171  std::ptrdiff_t tmp = p2 - p1;
172  // difference must be divisible by page size
173  assert(!(tmp % PageChunk::pagesize()));
174  tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
175  m_next = tmp;
176  // no truncation when saving in a short
177  assert(m_next == tmp);
178  // final check: next() must return p
179  assert(next() == p);
180  }
181  }
182 
183  Page* Page::next() const
184  {
185  if (!m_next) return 0;
186  char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
187  ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
188  return reinterpret_cast<Page*>(ptmp);
189  }
190 
191  /** @brief class representing a page pool
192  *
193  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
194  * @date 2013-07-24
195  *
196  * pool of mmapped pages (on systems which support it, on all others, the
197  * functionality is emulated with dynamically allocated memory)
198  *
199  * in most operating systems there is a limit to how many mappings any one
200  * process is allowed to request; for this reason, we mmap a relatively
201  * large amount up front, and then carve off little pieces as we need them
 *
 * pages are handed out in groups of nPagesPerGroup pages (see pop()); a
 * chunk of size class sz holds sz such groups
202  */
203  class PagePool {
204  private:
205  /// convenience typedef
206  typedef BidirMMapPipeException Exception;
207 
208  enum {
209  minsz = 7, ///< minimum chunk size (in page groups)
210  maxsz = 20, ///< upper bound for the chunk size (in page groups)
211  szincr = 1 ///< size class increment (size class k holds minsz + k * szincr groups)
212  };
213  /// a chunk of memory in the pool
214  typedef BidirMMapPipe_impl::PageChunk Chunk;
215  /// list of chunks
216  typedef std::list<Chunk*> ChunkList;
217 
218  friend class BidirMMapPipe_impl::PageChunk;
219  public:
220  /// convenience typedef
// NOTE(review): listing line 221 (the typedef documented above, presumably
// for Pages, which pop() and zap() use) is missing from this copy.
222  /// constructor
223  PagePool(unsigned nPagesPerGroup);
224  /// destructor
225  ~PagePool();
226  /// pop a free element out of the pool
227  Pages pop();
228 
229  /// return page size of the system
230  static unsigned pagesize() { return PageChunk::pagesize(); }
231  /// return variety of mmap supported on the system
232  static MMapVariety mmapVariety()
233  { return PageChunk::mmapVariety(); }
234 
235  /// return number of pages per group (ie. as returned by pop())
236  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
237 
238  /// zap the pool (unmap all but Pages p)
239  void zap(Pages& p);
240 
241  private:
242  /// list of chunks used by the pool
243  ChunkList m_chunks;
244  /// list of chunks used by the pool which are not full
245  ChunkList m_freelist;
246  /// chunk size histogram: number of chunks per size class
247  unsigned m_szmap[(maxsz - minsz) / szincr];
// NOTE(review): this has one entry per size class minsz, minsz + szincr,
// ..., maxsz - szincr; updateCurSz() indexes it with (sz - minsz) / szincr,
// so a chunk of exactly maxsz groups would write past its end - make sure
// nextChunkSz() never returns maxsz.
248  /// currently largest size class in use (minsz if none)
249  int m_cursz;
250  /// page group size
251  unsigned m_nPgPerGrp;
252 
253  /// count incr chunks of size sz and recompute m_cursz
254  void updateCurSz(int sz, int incr);
255  /// find size of next chunk to allocate (in a hopefully smart way)
256  int nextChunkSz() const;
257  /// put a chunk which has free page groups again back on the free list
258  void putOnFreeList(Chunk* chunk);
259  /// unregister and delete an empty chunk
260  void release(Chunk* chunk);
261  };
262 
263  Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
264  m_pimpl(new impl)
265  {
266  assert(npg < 256);
267  m_pimpl->m_parent = parent;
268  m_pimpl->m_pages = pages;
269  m_pimpl->m_refcnt = 1;
270  m_pimpl->m_npages = npg;
271  /// initialise pages
272  for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
273  }
274 
277 
// NOTE(review): the signature line of this definition (listing line 278,
// presumably Pages::~Pages()) is missing from this copy.
279  {
// drop one reference; the last owner gives the page group back to the
// chunk it came from (if it still has one) and frees the shared state
280  if (m_pimpl && !--(m_pimpl->m_refcnt)) {
281  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
282  delete m_pimpl;
283  }
284  }
285 
286  Pages::Pages(const Pages& other) :
287  m_pimpl(other.m_pimpl)
288  { ++(m_pimpl->m_refcnt); }
289 
290  Pages& Pages::operator=(const Pages& other)
291  {
292  if (&other == this) return *this;
293  if (--(m_pimpl->m_refcnt)) {
294  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
295  delete m_pimpl;
296  }
297  m_pimpl = other.m_pimpl;
298  ++(m_pimpl->m_refcnt);
299  return *this;
300  }
301 
302  unsigned Pages::pagesize() { return PageChunk::pagesize(); }
303 
304  Page* Pages::page(unsigned pgno) const
305  {
306  assert(pgno < m_pimpl->m_npages);
307  unsigned char* pptr =
308  reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
309  pptr += pgno * pagesize();
310  return reinterpret_cast<Page*>(pptr);
311  }
312 
313  unsigned Pages::pageno(Page* p) const
314  {
315  const unsigned char* pptr =
316  reinterpret_cast<const unsigned char*>(p);
317  const unsigned char* bptr =
318  reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
319  assert(0 == ((pptr - bptr) % pagesize()));
320  const unsigned nr = (pptr - bptr) / pagesize();
321  assert(nr < m_pimpl->m_npages);
322  return nr;
323  }
324 
// NOTE(review): the signature line of this definition (listing line 325) is
// missing from this copy; it returns the system page size used by PageChunk.
326  {
327  // find out page size of system
328  long pgsz = sysconf(_SC_PAGESIZE);
329  if (-1 == pgsz) throw Exception("sysconf", errno);
330  if (pgsz > 512 && pgsz > long(sizeof(Page)))
331  return pgsz;
332 
333  // in case of an implausible value (a failing sysconf throws above), use
334  // a safe default: 4k page size, and do not try to mmap (Copy mode)
335  s_mmapworks = Copy;
336  return 1 << 12;
337  }
338 
339  PageChunk::PageChunk(PagePool* parent,
340  unsigned length, unsigned nPgPerGroup) :
341  m_begin(dommap(length)),
342  m_end(reinterpret_cast<void*>(
343  reinterpret_cast<unsigned char*>(m_begin) + length)),
344  m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
345  {
346  // ok, push groups of pages onto freelist here
347  unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
348  unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
349  while (p < pend) {
350  m_freelist.push_back(reinterpret_cast<void*>(p));
351  p += nPgPerGroup * PagePool::pagesize();
352  }
353  }
354 
// NOTE(review): the signature line of this definition (listing line 355,
// presumably PageChunk::~PageChunk()) is missing from this copy.
356  {
// a chunk still attached to a pool must have no page groups in use; the
// chunk-zapping code clears m_parent and m_begin, so a zapped chunk skips
// both the check and the unmapping
357  if (m_parent) assert(empty());
358  if (m_begin) domunmap(m_begin, len());
// NOTE(review): domunmap() can throw; an exception escaping a destructor
// terminates the program in C++11 and later.
359  }
360 
361  bool PageChunk::contains(const Pages& p) const
362  { return p.m_pimpl->m_parent == this; }
363 
// NOTE(review): the signature line of this definition (listing line 364) is
// missing from this copy; it presumably is PageChunk::pop(), which
// PagePool::pop() calls.
365  {
// hand out the first free page group; the returned Pages handle names this
// chunk as its parent, so the group is pushed back here once its last
// reference goes away
366  assert(!m_freelist.empty());
367  void* p = m_freelist.front();
368  m_freelist.pop_front();
369  ++m_nUsedGrp;
370  return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
371  }
372 
373  void PageChunk::push(const Pages& p)
374  {
375  assert(contains(p));
376  bool wasempty = m_freelist.empty();
377  m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
378  --m_nUsedGrp;
379  if (m_parent) {
380  // notify parent if we need to be put on the free list again
381  if (wasempty) m_parent->putOnFreeList(this);
382  // notify parent if we're empty
383  if (empty()) return m_parent->release(this);
384  }
385  }
386 
/// obtain len bytes (a multiple of the page size) of memory to be shared
/// with a forked child: tries anonymous mmap, mmap of /dev/zero, mmap of a
/// temporary file and finally malloc (Copy mode), as selected by s_mmapworks
/// (Unknown tries them in that order)
387  void* PageChunk::dommap(unsigned len)
388  {
389  assert(len && 0 == (len % s_pagesize));
390  // ok, the idea here is to try the different methods of mmapping, and
391  // choose the first one that works. we have four flavours:
392  // 1 - anonymous mmap (best)
393  // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
394  // bit more tedious to set up, since you need to open/close a
395  // device file)
396  // 3 - mmap of a temporary file (very tedious to set up - need to
397  // create a temporary file, delete it, make the underlying storage
398  // large enough, then mmap the fd and close it)
399  // 4 - if all those fail, we malloc the buffers, and copy the data
400  // through the OS (then we're no better than normal pipes)
401  static bool msgprinted = false;
402  if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
403 #if defined(MAP_ANONYMOUS)
404 #undef MYANONFLAG
405 #define MYANONFLAG MAP_ANONYMOUS
406 #elif defined(MAP_ANON)
407 #undef MYANONFLAG
408 #define MYANONFLAG MAP_ANON
409 #else
410 #undef MYANONFLAG
411 #endif
412 #ifdef MYANONFLAG
413  void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
414  MYANONFLAG | MAP_SHARED, -1, 0);
415  if (MAP_FAILED == retVal) {
416  if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
417  } else {
// NOTE(review): listing lines 418-419 are missing from this copy.
420  if (!msgprinted) {
421  std::cerr << " INFO: In " << __func__ << " (" <<
422  __FILE__ << ", line " << __LINE__ <<
423  "): anonymous mmapping works, excellent!" <<
424  std::endl;
425  msgprinted = true;
426  }
427  return retVal;
428  }
429 #endif
430 #undef MYANONFLAG
431  }
432  if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
433  // ok, no anonymous mappings supported directly, so try to map
434  // /dev/zero which has much the same effect on many systems
435  int fd = ::open("/dev/zero", O_RDWR);
436  if (-1 == fd)
437  throw Exception("open /dev/zero", errno);
438  void* retVal = ::mmap(0, len,
439  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
440  if (MAP_FAILED == retVal) {
441  int errsv = errno;
442  ::close(fd);
// NOTE(review): in Unknown mode execution continues after this branch and
// closes fd a second time below; that close fails (EBADF) and throws
// "close /dev/zero" instead of falling through to the next method.
443  if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
444  } else {
// NOTE(review): listing lines 445-446 are missing from this copy.
447  }
448  if (-1 == ::close(fd))
449  throw Exception("close /dev/zero", errno);
450  if (!msgprinted) {
451  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
452  ", line " << __LINE__ << "): mmapping /dev/zero works, "
453  "very good!" << std::endl;
454  msgprinted = true;
455  }
456  return retVal;
457  }
458  if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
459  char name[] = "/tmp/BidirMMapPipe-XXXXXX";
460  int fd;
461  // open temp file
462  if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
463  // remove it, but keep fd open
464  if (-1 == ::unlink(name)) {
465  int errsv = errno;
466  ::close(fd);
467  throw Exception("unlink", errsv);
468  }
469  // make it the right size: lseek
470  if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
471  int errsv = errno;
472  ::close(fd);
473  throw Exception("lseek", errsv);
474  }
475  // make it the right size: write a byte
476  if (1 != ::write(fd, name, 1)) {
477  int errsv = errno;
478  ::close(fd);
479  throw Exception("write", errsv);
480  }
481  // do mmap
482  void* retVal = ::mmap(0, len,
483  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
484  if (MAP_FAILED == retVal) {
485  int errsv = errno;
486  ::close(fd);
// NOTE(review): as in the /dev/zero case, Unknown mode then closes fd again
// below (EBADF), calls munmap(MAP_FAILED, len) and throws "close", so the
// malloc fallback is never reached from here.
487  if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
488  } else {
// NOTE(review): listing lines 489-490 are missing from this copy.
491  }
492  if (-1 == ::close(fd)) {
493  int errsv = errno;
494  ::munmap(retVal, len);
495  throw Exception("close", errsv);
496  }
497  if (!msgprinted) {
498  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
499  ", line " << __LINE__ << "): mmapping temporary files "
500  "works, good!" << std::endl;
501  msgprinted = true;
502  }
503  return retVal;
504  }
505  if (Copy == s_mmapworks || Unknown == s_mmapworks) {
506  // fallback solution: mmap does not work on this OS (or does not
507  // work for what we want to use it), so use a normal buffer of
508  // memory instead, and collect data in that buffer - this needs an
509  // additional write/read to/from the pipe(s), but there you go...
510  if (!msgprinted) {
511  std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
512  ", line " << __LINE__ << "): anonymous mmapping of "
513  "shared buffers failed, falling back to read/write on "
514  " pipes!" << std::endl;
515  msgprinted = true;
516  }
517  s_mmapworks = Copy;
518  void* retVal = std::malloc(len);
519  if (!retVal) throw Exception("malloc", errno);
520  return retVal;
521  }
522  // should never get here
523  assert(false);
524  return 0;
525  }
526 
/// release memory obtained from dommap(): munmap() it, or free() it in Copy
/// mode (where dommap() falls back to malloc()); a null addr is ignored
527  void PageChunk::domunmap(void* addr, unsigned len)
528  {
529  assert(len && 0 == (len % s_pagesize));
530  if (addr) {
// NOTE(review): listing line 531 is missing from this copy.
532  if (Copy != s_mmapworks) {
533  if (-1 == ::munmap(addr, len))
534  throw Exception("munmap", errno);
535  } else {
536  std::free(addr);
537  }
538  }
539  }
540 
// NOTE(review): the signature line of this definition (listing line 541) is
// missing from this copy; judging by the call (*it)->zap(p) in
// PagePool::zap(), this is PageChunk::zap(Pages& p). It keeps only the pages
// of p accessible, detaches chunk and p from the pool, and deletes the chunk
// object without unmapping the memory.
542  {
543  // try to mprotect the other bits of the pool with no access...
544  // we'd really like a version of mremap here that can unmap all the
545  // other pages in the chunk, but that does not exist, so we protect
546  // the other pages in this chunk such that they may neither be read,
547  // written nor executed, only the pages we're interested in for
548  // communications stay readable and writable
549  //
550  // if an OS does not support changing the protection of a part of an
551  // mmapped area, the mprotect calls below should just fail and not
552  // change any protection, so we're a little less safe against
553  // corruption, but everything should still work
554  if (Copy != s_mmapworks) {
555  unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
556  unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
557  unsigned char* p2 = p1 + p.npages() * pagesize();
558  unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
559  if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
560  if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
561  }
// detach from the pool (the destructor then skips its empty() check) and
// from p (so p's pages are never pushed back to this chunk); m_begin = 0
// makes the destructor skip domunmap(), so p's pages stay mapped
562  m_parent = 0;
563  m_freelist.clear();
564  m_nUsedGrp = 1;
565  p.m_pimpl->m_parent = 0;
566  m_begin = m_end = 0;
567  // commit suicide
568  delete this;
569  }
570 
571  PagePool::PagePool(unsigned nPgPerGroup) :
572  m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
573  { std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0); }
574 
575  PagePool::~PagePool()
576  {
577  m_freelist.clear();
578  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
579  delete *it;
580  m_chunks.clear();
581  }
582 
583  void PagePool::zap(Pages& p)
584  {
585  // unmap all pages but those pointed to by p
586  m_freelist.clear();
587  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
588  if ((*it)->contains(p)) {
589  (*it)->zap(p);
590  } else {
591  delete *it;
592  }
593  }
594  m_chunks.clear();
595  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
596  m_cursz = minsz;
597  }
598 
599  Pages PagePool::pop()
600  {
601  if (m_freelist.empty()) {
602  // allocate and register new chunk and put it on the freelist
603  const int sz = nextChunkSz();
604  Chunk *c = new Chunk(this,
605  sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
606  m_chunks.push_front(c);
607  m_freelist.push_back(c);
608  updateCurSz(sz, +1);
609  }
610  // get free element from first chunk on _freelist
611  Chunk* c = m_freelist.front();
612  Pages p(c->pop());
613  // full chunks are removed from _freelist
614  if (c->full()) m_freelist.pop_front();
615  return p;
616  }
617 
618  void PagePool::release(PageChunk* chunk)
619  {
620  assert(chunk->empty());
621  // find chunk on freelist and remove
622  ChunkList::iterator it = std::find(
623  m_freelist.begin(), m_freelist.end(), chunk);
624  if (m_freelist.end() == it)
625  throw Exception("PagePool::release(PageChunk*)", EINVAL);
626  m_freelist.erase(it);
627  // find chunk in m_chunks and remove
628  it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
629  if (m_chunks.end() == it)
630  throw Exception("PagePool::release(PageChunk*)", EINVAL);
631  m_chunks.erase(it);
632  const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
633  delete chunk;
634  updateCurSz(sz, -1);
635  }
636 
637  void PagePool::putOnFreeList(PageChunk* chunk)
638  {
639  assert(!chunk->full());
640  m_freelist.push_back(chunk);
641  }
642 
643  void PagePool::updateCurSz(int sz, int incr)
644  {
645  m_szmap[(sz - minsz) / szincr] += incr;
646  m_cursz = minsz;
647  for (int i = (maxsz - minsz) / szincr; i--; ) {
648  if (m_szmap[i]) {
649  m_cursz += i * szincr;
650  break;
651  }
652  }
653  }
654 
655  int PagePool::nextChunkSz() const
656  {
657  // no chunks with space available, figure out chunk size
658  int sz = m_cursz;
659  if (m_chunks.empty()) {
660  // if we start allocating chunks, we start from minsz
661  sz = minsz;
662  } else {
663  if (minsz >= sz) {
664  // minimal sized chunks are always grown
665  sz = minsz + szincr;
666  } else {
667  if (1 != m_chunks.size()) {
668  // if we have more than one completely filled chunk, grow
669  sz += szincr;
670  } else {
671  // just one chunk left, try shrinking chunk size
672  sz -= szincr;
673  }
674  }
675  }
676  // clamp size to allowed range
677  if (sz > maxsz) sz = maxsz;
678  if (sz < minsz) sz = minsz;
679  return sz;
680  }
681 }
682 
683 // static BidirMMapPipe members
684 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
685 std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
686 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
687 unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
688 
689 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
690 {
691  if (!s_pagepool)
692  s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
693  return *s_pagepool;
694 }
695 
// NOTE(review): the signature line of this definition (listing line 696) is
// missing from this copy; presumably it is BidirMMapPipe::teardownall, which
// the constructor registers with atexit(). It sends SIGTERM to the child of
// every pipe still on s_openpipes and force-closes the pipe.
697 {
698  pthread_mutex_lock(&s_openpipesmutex);
699  while (!s_openpipes.empty()) {
700  BidirMMapPipe *p = s_openpipes.front();
701  pthread_mutex_unlock(&s_openpipesmutex);
702  if (p->m_childPid) kill(p->m_childPid, SIGTERM);
// NOTE(review): doClose(..., holdlock = true) assumes the caller holds
// s_openpipesmutex, but it was released just above, so s_openpipes is
// modified without the lock here. Also, doClose() returns early without
// removing p from s_openpipes when p's failbit is set - such an entry would
// make this loop spin forever.
703  p->doClose(true, true);
704  pthread_mutex_lock(&s_openpipesmutex);
705  }
706  pthread_mutex_unlock(&s_openpipesmutex);
707 }
708 
// NOTE(review): listing lines 709 (the start of this constructor's
// signature) and 713 are missing from this copy, so what happens to the
// popped page group before the check below cannot be seen here.
710  m_pages(pagepool().pop())
711 {
712  // free pages again
// delete the shared pool if its reference count is zero
714  if (!s_pagepoolrefcnt) {
715  delete s_pagepool;
716  s_pagepool = 0;
717  }
718 }
719 
/// fork a child process and connect parent and child with a pair of pipes,
/// or with a socketpair if useSocketpair is true; useExceptions sets
/// exceptionsbit in m_flags
///
/// On failure, the child (if already forked, from the parent) is sent
/// SIGTERM, the descriptors still recorded in fds are closed, the pool's
/// reference count is dropped, and the exception is rethrown.
720 BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
721  m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
722  m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
723  m_parentPid(::getpid())
724 
725 {
// NOTE(review): listing line 726 is missing from this copy.
727  assert(TotPages && 0 == (TotPages & 1) && TotPages <= 256);
728  int fds[4] = { -1, -1, -1, -1 };
729  int myerrno;
730  static bool firstcall = true;
731  if (useExceptions) m_flags |= exceptionsbit;
732 
733  try {
734  if (firstcall) {
735  firstcall = false;
736  // register a cleanup handler to make sure all BidirMMapPipes are torn
737  // down, and child processes are sent a SIGTERM
738  if (0 != atexit(BidirMMapPipe::teardownall))
739  throw Exception("atexit", errno);
740  }
741 
742  // build free lists
// chain all TotPages pages, then cut the chain after page PagesPerEnd - 1:
// this gives one list starting at page 0 (used by the parent below) and one
// starting at page PagesPerEnd (presumably the child's - see line 800)
743  for (unsigned i = 1; i < TotPages; ++i)
744  m_pages[i - 1]->setNext(m_pages[i]);
745  m_pages[PagesPerEnd - 1]->setNext(0);
746  if (!useSocketpair) {
747  // create pipes
748  if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
749  if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
750  } else {
751  if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
752  throw Exception("socketpair", errno);
753  }
754  // fork the child
755  pthread_mutex_lock(&s_openpipesmutex);
756  char c;
757  switch ((m_childPid = ::fork())) {
758  case -1: // error in fork()
759  myerrno = errno;
760  pthread_mutex_unlock(&s_openpipesmutex);
761  m_childPid = 0;
762  throw Exception("fork", myerrno);
763  case 0: // child
764  // put the ends in the right place
765  if (-1 != fds[2]) {
766  // pair of pipes
767  if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
768  myerrno = errno;
769  pthread_mutex_unlock(&s_openpipesmutex);
770  throw Exception("close", myerrno);
771  }
772  fds[0] = fds[3] = -1;
773  m_outpipe = fds[1];
774  m_inpipe = fds[2];
775  } else {
776  // socket pair
777  if (-1 == ::close(fds[0])) {
778  myerrno = errno;
779  pthread_mutex_unlock(&s_openpipesmutex);
780  throw Exception("close", myerrno);
781  }
782  fds[0] = -1;
783  m_inpipe = m_outpipe = fds[1];
784  }
785  // close other pipes our parent may have open - we have no business
786  // reading from/writing to those...
787  for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
788  s_openpipes.end() != it; ) {
789  BidirMMapPipe* p = *it;
790  it = s_openpipes.erase(it);
791  p->doClose(true, true);
792  }
793  pagepool().zap(m_pages);
794  s_pagepoolrefcnt = 0;
795  delete s_pagepool;
796  s_pagepool = 0;
797  s_openpipes.push_front(this);
798  pthread_mutex_unlock(&s_openpipesmutex);
799  // ok, put our pages on freelist
// NOTE(review): listing line 800 (the child's freelist assignment) is
// missing from this copy.
801  // handshake with other end (to make sure it's alive)...
802  c = 'C'; // ...hild
803  if (1 != xferraw(m_outpipe, &c, 1, ::write))
804  throw Exception("handshake: xferraw write", EPIPE);
805  if (1 != xferraw(m_inpipe, &c, 1, ::read))
806  throw Exception("handshake: xferraw read", EPIPE);
807  if ('P' != c) throw Exception("handshake", EPIPE);
808  break;
809  default: // parent
810  // put the ends in the right place
811  if (-1 != fds[2]) {
812  // pair of pipes
813  if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
814  myerrno = errno;
815  pthread_mutex_unlock(&s_openpipesmutex);
816  throw Exception("close", myerrno);
817  }
818  fds[1] = fds[2] = -1;
819  m_outpipe = fds[3];
820  m_inpipe = fds[0];
821  } else {
822  // socketpair
823  if (-1 == ::close(fds[1])) {
824  myerrno = errno;
825  pthread_mutex_unlock(&s_openpipesmutex);
826  throw Exception("close", myerrno);
827  }
828  fds[1] = -1;
829  m_inpipe = m_outpipe = fds[0];
830  }
831  // put on list of open pipes (so we can kill child processes
832  // if things go wrong)
833  s_openpipes.push_front(this);
834  pthread_mutex_unlock(&s_openpipesmutex);
835  // ok, put our pages on freelist
836  m_freelist = m_pages[0u];
837  // handshake with other end (to make sure it's alive)...
838  c = 'P'; // ...arent
839  if (1 != xferraw(m_outpipe, &c, 1, ::write))
840  throw Exception("handshake: xferraw write", EPIPE);
841  if (1 != xferraw(m_inpipe, &c, 1, ::read))
842  throw Exception("handshake: xferraw read", EPIPE);
843  if ('C' != c) throw Exception("handshake", EPIPE);
844  break;
845  }
846  // mark file descriptors for close on exec (we do not want to leak the
847  // connection to anything we happen to exec)
// NOTE(review): F_GETFD returns the descriptor flags as fcntl()'s return
// value and takes no third argument, so fdflags stays 0 and the flags read
// are discarded; harmless while FD_CLOEXEC is the only descriptor flag, but
// "int fdflags = ::fcntl(fd, F_GETFD)" is the correct form.
848  int fdflags = 0;
849  if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
850  throw Exception("fcntl", errno);
851  fdflags |= FD_CLOEXEC;
852  if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
853  throw Exception("fcntl", errno);
854  if (m_inpipe != m_outpipe) {
855  if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
856  throw Exception("fcntl", errno);
857  fdflags |= FD_CLOEXEC;
858  if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
859  throw Exception("fcntl", errno);
860  }
861  // ok, finally, clear the failbit
862  m_flags &= ~failbit;
863  // all done
864  } catch (const BidirMMapPipe::Exception& e) {
// NOTE(review): 'this' may already be on s_openpipes (pushed before the
// handshake); unless listing line 870 (missing from this copy) removes it,
// a pipe whose handshake or fcntl fails stays registered there with
// failbit set.
865  if (0 != m_childPid) kill(m_childPid, SIGTERM);
866  for (int i = 0; i < 4; ++i)
867  if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
868  {
869  // free resources associated with mmapped pages
871  }
872  if (!--s_pagepoolrefcnt) {
873  delete s_pagepool;
874  s_pagepool = 0;
875  }
// NOTE(review): "throw;" would rethrow the original exception object
// instead of a copy of e.
876  throw e;
877  }
878 }
878 }
879 
// NOTE(review): the signature line of this definition (listing line 880) is
// missing from this copy; presumably BidirMMapPipe::close(). It requires an
// open pipe (failbit clear) and returns doClose(false)'s result, i.e. the
// child's waitpid() status when called in the parent.
881 {
882  assert(!(m_flags & failbit));
883  return doClose(false);
884 }
885 
/// close the pipe: flush, shut down the write direction, drain the read
/// direction until the other end closes too, release the shared pages, reap
/// the child (in the parent) and remove this pipe from s_openpipes
///
/// @param force     if true, skip flushing, shutdown and draining, and
///                  ignore errors from close, page pool teardown and waitpid
/// @param holdlock  if true, s_openpipesmutex is not locked/unlocked here
///                  (the caller is expected to hold it)
/// @returns         in the parent, the status reported by waitpid(); 0
///                  otherwise, and 0 if the pipe is already closed (failbit)
886 int BidirMMapPipe::doClose(bool force, bool holdlock)
887 {
888  if (m_flags & failbit) return 0;
889  // flush data to be written
890  if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
891  // shut down the write direction (no more writes from our side)
892  if (m_inpipe == m_outpipe) {
893  if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
894  throw Exception("shutdown", errno);
895  m_outpipe = -1;
896  } else {
897  if (-1 != m_outpipe && -1 == ::close(m_outpipe))
898  if (!force) throw Exception("close", errno);
899  m_outpipe = -1;
900  }
901  // wait for the other end to shut down its write direction, too:
902  // drain anything the other end might still want to send
903  if (!force && -1 != m_inpipe) {
904  // **************** THIS IS EXTREMELY UGLY: ****************
905  // POLLHUP is not set reliably on pipe/socket shutdown on all
906  // platforms, unfortunately, so we poll for readability here until
907  // the other end closes, too
908  //
909  // the read loop below ensures that the other end sees the POLLIN that
910  // is set on shutdown instead, and goes ahead to close its end
911  //
912  // if we don't do this, and close straight away, the other end
913  // will catch a SIGPIPE or similar, and we don't want that
914  int err;
915  struct pollfd fds;
916  fds.fd = m_inpipe;
917  fds.events = POLLIN;
918  fds.revents = 0;
919  do {
920  while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
921  if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
922  if (fds.revents & POLLIN) {
923  char c;
924  if (1 > ::read(m_inpipe, &c, 1)) break;
925  }
926  }
927  } while (0 > err && EINTR == errno);
928  // ignore all other poll errors
929  }
930  // close read end
931  if (-1 != m_inpipe && -1 == ::close(m_inpipe))
932  if (!force) throw Exception("close", errno);
933  m_inpipe = -1;
934  // unmap memory
935  try {
// NOTE(review): listing line 936 is missing from this copy.
937  if (!--s_pagepoolrefcnt) {
938  delete s_pagepool;
939  s_pagepool = 0;
940  }
941  } catch (const std::exception& e) {
// NOTE(review): "throw e" throws a copy of the static type std::exception,
// slicing off the actual exception type and its what() message; "throw;"
// would rethrow the original exception.
942  if (!force) throw e;
943  }
// NOTE(review): listing line 944 is missing from this copy.
945  // wait for child process
946  int retVal = 0;
947  if (isParent()) {
948  int tmp;
949  do {
950  tmp = waitpid(m_childPid, &retVal, 0);
951  } while (-1 == tmp && EINTR == errno);
952  if (-1 == tmp)
953  if (!force) throw Exception("waitpid", errno);
954  m_childPid = 0;
955  }
956  // remove from list of open pipes
957  if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
958  std::list<BidirMMapPipe*>::iterator it = std::find(
959  s_openpipes.begin(), s_openpipes.end(), this);
960  if (s_openpipes.end() != it) s_openpipes.erase(it);
961  if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
962  m_flags |= failbit;
963  return retVal;
964 }
965 
// NOTE(review): the signature line of this definition (listing line 966,
// presumably the BidirMMapPipe destructor) is missing from this copy.
// doClose(false) can throw (e.g. on close/shutdown/waitpid failure), and an
// exception escaping a destructor terminates the program in C++11 and later.
967 { doClose(false); }
968 
// NOTE(review): listing line 969 (the start of this signature, presumably
// return type and name BidirMMapPipe::xferraw) is missing from this copy.
// Transfers len bytes between fd and addr using xferfn (read or write),
// resuming after partial transfers and retrying on EINTR/EAGAIN. Returns
// the number of bytes transferred, which is less than len only at end of
// file (xferfn returned 0) or when an error occurs after some bytes were
// already transferred; an error before any transfer throws.
970  int fd, void* addr, size_type len,
971  ssize_t (*xferfn)(int, void*, std::size_t))
972 {
973  size_type xferred = 0;
974  unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
975  while (len) {
976  ssize_t tmp = xferfn(fd, buf, len);
977  if (tmp > 0) {
978  xferred += tmp;
979  len -= tmp;
980  buf += tmp;
981  continue;
982  } else if (0 == tmp) {
983  // check for end-of-file on pipe
984  break;
985  } else if (-1 == tmp) {
986  // ok some error occurred, so figure out if we want to retry or throw
987  switch (errno) {
988  default:
989  // if anything was transferred, return number of bytes
990  // transferred so far, we can start throwing on the next
991  // transfer...
992  if (xferred) return xferred;
993  // else throw
994  throw Exception("xferraw", errno);
995  case EAGAIN: // fallthrough intended
996 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
997  case EWOULDBLOCK: // fallthrough intended
998 #endif
// NOTE(review): the error is printed and the transfer is retried at once
// (falling through to EINTR), so a non-blocking fd busy-loops here.
999  std::cerr << " ERROR: In " << __func__ << " (" <<
1000  __FILE__ << ", line " << __LINE__ <<
1001  "): expect transfer to block!" << std::endl;
1002  case EINTR:
1003  break;
1004  }
1005  continue;
1006  } else {
// unreachable for POSIX read()/write(), which return -1 on error
1007  throw Exception("xferraw: unexpected return value from read/write",
1008  errno);
1009  }
1010  }
1011  return xferred;
1012 }
1013 
1015 {
1016  if (plist) {
1017  unsigned char pg = m_pages[plist];
1018  if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1021  // ok, have to copy pages through pipe
1022  for (Page* p = plist; p; p = p->next()) {
1023  if (sizeof(Page) + p->size() !=
1024  xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1025  ::write)) {
1026  throw Exception("sendpages: short write", EPIPE);
1027  }
1028  }
1029  }
1030  } else {
1031  throw Exception("sendpages: short write", EPIPE);
1032  }
1033  } else { assert(plist); }
1034 }
1035 
1037 {
1038  unsigned char pg;
1039  unsigned retVal = 0;
1040  Page *plisthead = 0, *plisttail = 0;
1041  if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1042  plisthead = plisttail = m_pages[pg];
1043  // ok, have number of pages
1046  // ok, need to copy pages through pipe
1047  for (; plisttail; ++retVal) {
1048  Page* p = plisttail;
1049  if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1050  ::read)) {
1051  plisttail = p->next();
1052  if (!p->size()) continue;
1053  // break in case of read error
1054  if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1055  ::read)) break;
1056  }
1057  }
1058  } else {
1059  retVal = lenPageList(plisthead);
1060  }
1061  }
1062  // put list of pages we just received into correct lists (busy/free)
1063  if (plisthead) feedPageLists(plisthead);
1064  // ok, retVal contains the number of pages read, so put them on the
1065  // correct lists
1066  return retVal;
1067 }
1068 
1070 {
1071  struct pollfd fds;
1072  fds.fd = m_inpipe;
1073  fds.events = POLLIN;
1074  fds.revents = 0;
1075  unsigned retVal = 0;
1076  do {
1077  int rc = ::poll(&fds, 1, 0);
1078  if (0 > rc) {
1079  if (EINTR == errno) continue;
1080  break;
1081  }
1082  if (1 == retVal && fds.revents & POLLIN &&
1083  !(fds.revents & (POLLNVAL | POLLERR))) {
1084  // ok, we can read without blocking, so the other end has
1085  // something for us
1086  return recvpages();
1087  } else {
1088  break;
1089  }
1090  } while (true);
1091  return retVal;
1092 }
1093 
1095 {
1096  unsigned n = 0;
1097  for ( ; p; p = p->next()) ++n;
1098  return n;
1099 }
1100 
1102 {
1103  assert(plist);
1104  // get end of busy list
1105  Page *blend = m_busylist;
1106  while (blend && blend->next()) blend = blend->next();
1107  // ok, might have to send free pages to other end, and (if we do have to
1108  // send something to the other end) while we're at it, send any dirty
1109  // pages which are completely full, too
1110  Page *sendlisthead = 0, *sendlisttail = 0;
1111  // loop over plist
1112  while (plist) {
1113  Page* p = plist;
1114  plist = p->next();
1115  p->setNext(0);
1116  if (p->size()) {
1117  // busy page...
1118  p->pos() = 0;
1119  // put at end of busy list
1120  if (blend) blend->setNext(p);
1121  else m_busylist = p;
1122  blend = p;
1123  } else {
1124  // free page...
1125  // Very simple algorithm: once we're done with a page, we send it back
1126  // where it came from. If it's from our end, we put it on the free list, if
1127  // it's from the other end, we send it back.
1128  if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1129  (isChild() && m_pages[p] < PagesPerEnd)) {
1130  // page "belongs" to other end
1131  if (!sendlisthead) sendlisthead = p;
1132  if (sendlisttail) sendlisttail->setNext(p);
1133  sendlisttail = p;
1134  } else {
1135  // add page to freelist
1136  p->setNext(m_freelist);
1137  m_freelist = p;
1138  }
1139  }
1140  }
1141  // check if we have to send stuff to the other end
1142  if (sendlisthead) {
1143  // go through our list of dirty pages, and see what we can
1144  // send along
1145  Page* dp;
1146  while ((dp = m_dirtylist) && dp->full()) {
1147  Page* p = dp;
1148  // move head of dirty list
1149  m_dirtylist = p->next();
1150  // queue for sending
1151  p->setNext(0);
1152  sendlisttail->setNext(p);
1153  sendlisttail = p;
1154  }
1155  // poll if the other end is still alive - this needs that we first
1156  // close the write pipe of the other end when the remote end of the
1157  // connection is shutting down in doClose; we'll see that because we
1158  // get a POLLHUP on our inpipe
1159  const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1160  struct pollfd fds[2];
1161  fds[0].fd = m_outpipe;
1162  fds[0].events = fds[0].revents = 0;
1163  if (m_outpipe != m_inpipe) {
1164  fds[1].fd = m_inpipe;
1165  fds[1].events = fds[1].revents = 0;
1166  } else {
1167  fds[0].events |= POLLIN;
1168  }
1169  int retVal = 0;
1170  do {
1171  retVal = ::poll(fds, nfds, 0);
1172  if (0 > retVal && EINTR == errno)
1173  continue;
1174  break;
1175  } while (true);
1176  if (0 <= retVal) {
1177  bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1178  if (m_outpipe != m_inpipe) {
1179  ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1180  } else {
1181  if (ok && fds[0].revents & POLLIN) {
1182  unsigned ret = recvpages();
1183  if (!ret) ok = false;
1184  }
1185  }
1186 
1187  if (ok) sendpages(sendlisthead);
1188  // (if the pipe is dead already, we don't care that we leak the
1189  // contents of the pages on the send list here, so that is why
1190  // there's no else clause here)
1191  } else {
1192  throw Exception("feedPageLists: poll", errno);
1193  }
1194  }
1195 }
1196 
1198 {
1199  assert(p);
1200  assert(p == m_freelist);
1201  // remove from freelist
1202  m_freelist = p->next();
1203  p->setNext(0);
1204  // append to dirty list
1205  Page* dl = m_dirtylist;
1206  while (dl && dl->next()) dl = dl->next();
1207  if (dl) dl->setNext(p);
1208  else m_dirtylist = p;
1209 }
1210 
1212 {
1213  // queue any pages available for reading we can without blocking
1215  Page* p;
1216  // if there are no busy pages, try to get them from the other end,
1217  // block if we have to...
1218  while (!(p = m_busylist)) if (!recvpages()) return 0;
1219  return p;
1220 }
1221 
1223 {
1224  // queue any pages available for reading we can without blocking
1226  Page* p = m_dirtylist;
1227  // go to end of dirty list
1228  if (p) while (p->next()) p = p->next();
1229  if (!p || p->full()) {
1230  // need to append free page, so get one
1231  while (!(p = m_freelist)) if (!recvpages()) return 0;
1232  markPageDirty(p);
1233  }
1234  return p;
1235 }
1236 
1238 { return doFlush(true); }
1239 
1240 void BidirMMapPipe::doFlush(bool forcePartialPages)
1241 {
1242  assert(!(m_flags & failbit));
1243  // build a list of pages to flush
1244  Page *flushlisthead = 0, *flushlisttail = 0;
1245  while (m_dirtylist) {
1246  Page* p = m_dirtylist;
1247  if (!forcePartialPages && !p->full()) break;
1248  // remove dirty page from dirty list
1249  m_dirtylist = p->next();
1250  p->setNext(0);
1251  // and send it to other end
1252  if (!flushlisthead) flushlisthead = p;
1253  if (flushlisttail) flushlisttail->setNext(p);
1254  flushlisttail = p;
1255  }
1256  if (flushlisthead) sendpages(flushlisthead);
1257 }
1258 
1260 {
1261  assert(!(m_flags & failbit));
1262  // join busy and dirty lists
1263  {
1264  Page *l = m_busylist;
1265  while (l && l->next()) l = l->next();
1266  if (l) l->setNext(m_dirtylist);
1267  else m_busylist = m_dirtylist;
1268  }
1269  // empty busy and dirty pages
1270  for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1271  // put them on the free list
1273  m_busylist = m_dirtylist = 0;
1274 }
1275 
1277 {
1278  // queue all pages waiting for consumption in the pipe before we give an
1279  // answer
1281  size_type retVal = 0;
1282  for (Page* p = m_busylist; p; p = p->next())
1283  retVal += p->size() - p->pos();
1284  return retVal;
1285 }
1286 
1288 {
1289  // queue all pages waiting for consumption in the pipe before we give an
1290  // answer
1292  // check if we could write to the pipe without blocking (we need to know
1293  // because we might need to check if flushing of dirty pages would block)
1294  bool couldwrite = false;
1295  {
1296  struct pollfd fds;
1297  fds.fd = m_outpipe;
1298  fds.events = POLLOUT;
1299  fds.revents = 0;
1300  int retVal = 0;
1301  do {
1302  retVal = ::poll(&fds, 1, 0);
1303  if (0 > retVal) {
1304  if (EINTR == errno) continue;
1305  throw Exception("bytesWritableNonBlocking: poll", errno);
1306  }
1307  if (1 == retVal && fds.revents & POLLOUT &&
1308  !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1309  couldwrite = true;
1310  break;
1311  } while (true);
1312  }
1313  // ok, start counting bytes
1314  size_type retVal = 0;
1315  unsigned npages = 0;
1316  // go through the dirty list
1317  for (Page* p = m_dirtylist; p; p = p->next()) {
1318  ++npages;
1319  // if page only partially filled
1320  if (!p->full())
1321  retVal += p->free();
1322  if (npages >= FlushThresh && !couldwrite) break;
1323  }
1324  // go through the free list
1325  for (Page* p = m_freelist; p && (!m_dirtylist ||
1326  npages < FlushThresh || couldwrite); p = p->next()) {
1327  ++npages;
1328  retVal += Page::capacity();
1329  }
1330  return retVal;
1331 }
1332 
1334 {
1335  assert(!(m_flags & failbit));
1336  size_type nread = 0;
1337  unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1338  try {
1339  while (sz) {
1340  // find next page to read from
1341  Page* p = busypage();
1342  if (!p) {
1343  m_flags |= eofbit;
1344  return nread;
1345  }
1346  unsigned char* pp = p->begin() + p->pos();
1347  size_type csz = std::min(size_type(p->remaining()), sz);
1348  std::copy(pp, pp + csz, ap);
1349  nread += csz;
1350  ap += csz;
1351  sz -= csz;
1352  p->pos() += csz;
1353  assert(p->size() >= p->pos());
1354  if (p->size() == p->pos()) {
1355  // if no unread data remains, page is free
1356  m_busylist = p->next();
1357  p->setNext(0);
1358  p->size() = 0;
1359  feedPageLists(p);
1360  }
1361  }
1362  } catch (const Exception& e) {
1363  m_flags |= rderrbit;
1364  if (m_flags & exceptionsbit) throw e;
1365  }
1366  return nread;
1367 }
1368 
1370 {
1371  assert(!(m_flags & failbit));
1372  size_type written = 0;
1373  const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1374  try {
1375  while (sz) {
1376  // find next page to write to
1377  Page* p = dirtypage();
1378  if (!p) {
1379  m_flags |= eofbit;
1380  return written;
1381  }
1382  unsigned char* pp = p->begin() + p->size();
1383  size_type csz = std::min(size_type(p->free()), sz);
1384  std::copy(ap, ap + csz, pp);
1385  written += csz;
1386  ap += csz;
1387  p->size() += csz;
1388  sz -= csz;
1389  assert(p->capacity() >= p->size());
1390  if (p->full()) {
1391  // if page is full, see if we're above the flush threshold of
1392  // 3/4 of our pages
1394  doFlush(false);
1395  }
1396  }
1397  } catch (const Exception& e) {
1398  m_flags |= wrerrbit;
1399  if (m_flags & exceptionsbit) throw e;
1400  }
1401  return written;
1402 }
1403 
1405 {
1406  // go through pipes, and change flags where we already know without really
1407  // polling - stuff where we don't need poll to wait for its timeout in the
1408  // OS...
1409  bool canskiptimeout = false;
1410  std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1411  std::vector<unsigned>::iterator mit = masks.begin();
1412  for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1413  ++it, ++mit) {
1414  PollEntry& pe = *it;
1415  pe.revents = None;
1416  // null pipe pointer or closed pipe is invalid
1417  if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1418  // check for error
1419  if (pe.pipe->bad()) pe.revents |= Error;
1420  // check for end of file
1421  if (pe.pipe->eof()) pe.revents |= EndOfFile;
1422  // check if readable
1423  if (pe.events & Readable) {
1424  *mit |= Readable;
1425  if (pe.pipe->m_busylist) pe.revents |= Readable;
1426  }
1427  // check if writable
1428  if (pe.events & Writable) {
1429  *mit |= Writable;
1430  if (pe.pipe->m_freelist) {
1431  pe.revents |= Writable;
1432  } else {
1433  Page *dl = pe.pipe->m_dirtylist;
1434  while (dl && dl->next()) dl = dl->next();
1435  if (dl && dl->pos() < Page::capacity())
1436  pe.revents |= Writable;
1437  }
1438  }
1439  if (pe.revents) canskiptimeout = true;
1440  }
1441  // set up the data structures required for the poll syscall
1442  std::vector<pollfd> fds;
1443  fds.reserve(2 * pipes.size());
1444  std::map<int, PollEntry*> fds2pipes;
1445  for (PollVector::const_iterator it = pipes.begin();
1446  pipes.end() != it; ++it) {
1447  const PollEntry& pe = *it;
1448  struct pollfd tmp;
1449  fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1450  const_cast<PollEntry*>(&pe)));
1451  tmp.events = tmp.revents = 0;
1452  // we always poll for readability; this allows us to queue pages
1453  // early
1454  tmp.events |= POLLIN;
1455  if (pe.pipe->m_outpipe != tmp.fd) {
1456  // ok, it's a pair of pipes
1457  fds.push_back(tmp);
1458  fds2pipes.insert(std::make_pair(
1459  unsigned(tmp.fd = pe.pipe->m_outpipe),
1460  const_cast<PollEntry*>(&pe)));
1461  tmp.events = 0;
1462 
1463  }
1464  if (pe.events & Writable) tmp.events |= POLLOUT;
1465  fds.push_back(tmp);
1466  }
1467  // poll
1468  int retVal = 0;
1469  do {
1470  retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1471  if (0 > retVal) {
1472  if (EINTR == errno) continue;
1473  throw Exception("poll", errno);
1474  }
1475  break;
1476  } while (true);
1477  // fds may have changed state, so update...
1478  for (std::vector<pollfd>::iterator it = fds.begin();
1479  fds.end() != it; ++it) {
1480  pollfd& fe = *it;
1481  //if (!fe.revents) continue;
1482  --retVal;
1483  PollEntry& pe = *fds2pipes[fe.fd];
1484 oncemore:
1485  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1486  pe.revents |= ReadInvalid;
1487  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1488  pe.revents |= WriteInvalid;
1489  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1490  pe.revents |= ReadError;
1491  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1492  pe.revents |= WriteError;
1493  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1494  pe.revents |= ReadEndOfFile;
1495  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1496  pe.revents |= WriteEndOfFile;
1497  if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1498  !(fe.revents & (POLLNVAL | POLLERR))) {
1499  // ok, there is at least one page for us to receive from the
1500  // other end
1501  if (0 == pe.pipe->recvpages()) continue;
1502  // more pages there?
1503  do {
1504  int tmp = ::poll(&fe, 1, 0);
1505  if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1506  if (0 > tmp) {
1507  if (EINTR == errno) continue;
1508  throw Exception("poll", errno);
1509  }
1510  break;
1511  } while (true);
1512  }
1513  if (pe.pipe->m_busylist) pe.revents |= Readable;
1514  if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1515  if (pe.pipe->m_freelist) {
1516  pe.revents |= Writable;
1517  } else {
1518  Page *dl = pe.pipe->m_dirtylist;
1519  while (dl && dl->next()) dl = dl->next();
1520  if (dl && dl->pos() < Page::capacity())
1521  pe.revents |= Writable;
1522  }
1523  }
1524  }
1525  // apply correct masks, and count pipes with pending events
1526  int npipes = 0;
1527  mit = masks.begin();
1528  for (PollVector::iterator it = pipes.begin();
1529  pipes.end() != it; ++it, ++mit)
1530  if ((it->revents &= *mit)) ++npipes;
1531  return npipes;
1532 }
1533 
1535 {
1536  size_t sz = std::strlen(str);
1537  *this << sz;
1538  if (sz) write(str, sz);
1539  return *this;
1540 }
1541 
1543 {
1544  size_t sz = 0;
1545  *this >> sz;
1546  if (good() && !eof()) {
1547  str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1548  if (!str) throw Exception("realloc", errno);
1549  if (sz) read(str, sz);
1550  str[sz] = 0;
1551  }
1552  return *this;
1553 }
1554 
1556 {
1557  size_t sz = str.size();
1558  *this << sz;
1559  write(str.data(), sz);
1560  return *this;
1561 }
1562 
1564 {
1565  str.clear();
1566  size_t sz = 0;
1567  *this >> sz;
1568  if (good() && !eof()) {
1569  str.reserve(sz);
1570  for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1571  }
1572  return *this;
1573 }
1574 
1576 
1577 #ifdef TEST_BIDIRMMAPPIPE
1578 using namespace RooFit;
1579 
1580 int simplechild(BidirMMapPipe& pipe)
1581 {
1582  // child does an echo loop
1583  while (pipe.good() && !pipe.eof()) {
1584  // read a string
1585  std::string str;
1586  pipe >> str;
1587  if (!pipe) return -1;
1588  if (pipe.eof()) break;
1589  if (!str.empty()) {
1590  std::cout << "[CHILD] : read: " << str << std::endl;
1591  str = "... early in the morning?";
1592  }
1593  pipe << str << BidirMMapPipe::flush;
1594  // did our parent tell us to shut down?
1595  if (str.empty()) break;
1596  if (!pipe) return -1;
1597  if (pipe.eof()) break;
1598  std::cout << "[CHILD] : wrote: " << str << std::endl;
1599  }
1600  pipe.close();
1601  return 0;
1602 }
1603 
1604 #include <sstream>
1605 int randomchild(BidirMMapPipe& pipe)
1606 {
1607  // child sends out something at random intervals
1608  ::srand48(::getpid());
1609  {
1610  // wait for parent's go ahead signal
1611  std::string s;
1612  pipe >> s;
1613  }
1614  // no shutdown sequence needed on this side - we're producing the data,
1615  // and the parent can just read until we're done (when it'll get EOF)
1616  for (int i = 0; i < 5; ++i) {
1617  // sleep a random time between 0 and .9 seconds
1618  ::usleep(int(1e6 * ::drand48()));
1619  std::ostringstream buf;
1620  buf << "child pid " << ::getpid() << " sends message " << i;
1621  std::string str = buf.str();
1622  std::cout << "[CHILD] : " << str << std::endl;
1623  pipe << str << BidirMMapPipe::flush;
1624  if (!pipe) return -1;
1625  if (pipe.eof()) break;
1626  }
1627  // tell parent we're shutting down
1628  pipe << "" << BidirMMapPipe::flush;
1629  // wait for parent to acknowledge
1630  std::string s;
1631  pipe >> s;
1632  pipe.close();
1633  return 0;
1634 }
1635 
1636 int benchchildrtt(BidirMMapPipe& pipe)
1637 {
1638  // child does the equivalent of listening for pings and sending the
1639  // packet back
1640  char* str = 0;
1641  while (pipe && !pipe.eof()) {
1642  pipe >> str;
1643  if (!pipe) {
1644  std::free(str);
1645  pipe.close();
1646  return -1;
1647  }
1648  if (pipe.eof()) break;
1649  pipe << str << BidirMMapPipe::flush;
1650  // if we have just completed the shutdown handshake, we break here
1651  if (!std::strlen(str)) break;
1652  }
1653  std::free(str);
1654  pipe.close();
1655  return 0;
1656 }
1657 
1658 int benchchildsink(BidirMMapPipe& pipe)
1659 {
1660  // child behaves like a sink
1661  char* str = 0;
1662  while (pipe && !pipe.eof()) {
1663  pipe >> str;
1664  if (!std::strlen(str)) break;
1665  }
1666  pipe << "" << BidirMMapPipe::flush;
1667  std::free(str);
1668  pipe.close();
1669  return 0;
1670 }
1671 
1672 int benchchildsource(BidirMMapPipe& pipe)
1673 {
1674  // child behaves like a source
1675  char* str = 0;
1676  for (unsigned i = 0; i <= 24; ++i) {
1677  str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1678  std::memset(str, '4', 1 << i);
1679  str[1 << i] = 0;
1680  for (unsigned j = 0; j < 1 << 7; ++j) {
1681  pipe << str;
1682  if (!pipe || pipe.eof()) {
1683  std::free(str);
1684  pipe.close();
1685  return -1;
1686  }
1687  }
1688  // tell parent we're done with this block size
1689  pipe << "" << BidirMMapPipe::flush;
1690  }
1691  // tell parent to shut down
1692  pipe << "" << BidirMMapPipe::flush;
1693  std::free(str);
1694  pipe.close();
1695  return 0;
1696 }
1697 
1698 BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1699 {
1700  // create a pipe with the given child at the remote end
1701  BidirMMapPipe *p = new BidirMMapPipe();
1702  if (p->isChild()) {
1703  int retVal = childexec(*p);
1704  delete p;
1705  std::exit(retVal);
1706  }
1707  return p;
1708 }
1709 
1710 #include <sys/time.h>
1711 #include <iomanip>
1712 int main()
1713 {
1714  // simple echo loop test
1715  {
1716  std::cout << "[PARENT]: simple challenge-response test, "
1717  "one child:" << std::endl;
1718  BidirMMapPipe* pipe = spawnChild(simplechild);
1719  for (int i = 0; i < 5; ++i) {
1720  std::string str("What shall we do with a drunken sailor...");
1721  *pipe << str << BidirMMapPipe::flush;
1722  if (!*pipe) return -1;
1723  std::cout << "[PARENT]: wrote: " << str << std::endl;
1724  *pipe >> str;
1725  if (!*pipe) return -1;
1726  std::cout << "[PARENT]: read: " << str << std::endl;
1727  }
1728  // send shutdown string
1729  *pipe << "" << BidirMMapPipe::flush;
1730  // wait for shutdown handshake
1731  std::string s;
1732  *pipe >> s;
1733  int retVal = pipe->close();
1734  std::cout << "[PARENT]: exit status of child: " << retVal <<
1735  std::endl;
1736  if (retVal) return retVal;
1737  delete pipe;
1738  }
1739  // simple poll test - children send 5 results in random intervals
1740  {
1741  unsigned nch = 20;
1742  std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1743  " children:" << std::endl;
1744  typedef BidirMMapPipe::PollEntry PollEntry;
1745  // poll data structure
1747  pipes.reserve(nch);
1748  // spawn children
1749  for (unsigned i = 0; i < nch; ++i) {
1750  std::cout << "[PARENT]: spawning child " << i << std::endl;
1751  pipes.push_back(PollEntry(spawnChild(randomchild),
1753  }
1754  // wake children up
1755  std::cout << "[PARENT]: waking up children" << std::endl;
1756  for (unsigned i = 0; i < nch; ++i)
1757  *pipes[i].pipe << "" << BidirMMapPipe::flush;
1758  std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1759  // while at least some children alive
1760  while (!pipes.empty()) {
1761  // poll, wait until status change (infinite timeout)
1762  int npipes = BidirMMapPipe::poll(pipes, -1);
1763  // scan for pipes with changed status
1764  for (std::vector<PollEntry>::iterator it = pipes.begin();
1765  npipes && pipes.end() != it; ) {
1766  if (!it->revents) {
1767  // unchanged, next one
1768  ++it;
1769  continue;
1770  }
1771  --npipes; // maybe we can stop early...
1772  // read from pipes which are readable
1773  if (it->revents & BidirMMapPipe::Readable) {
1774  std::string s;
1775  *(it->pipe) >> s;
1776  if (!s.empty()) {
1777  std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1778  ": " << s << std::endl;
1779  ++it;
1780  continue;
1781  } else {
1782  // child is shutting down...
1783  *(it->pipe) << "" << BidirMMapPipe::flush;
1784  goto childcloses;
1785  }
1786  }
1787  // retire pipes with error or end-of-file condition
1788  if (it->revents & (BidirMMapPipe::Error |
1791  std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1792  " revents" <<
1793  ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1794  ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1795  ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1796  ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1797  ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1798  ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1799  ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1800  ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1801  std::endl;
1802 childcloses:
1803  int retVal = it->pipe->close();
1804  std::cout << "[PARENT]: child exit status: " <<
1805  retVal << ", number of children still alive: " <<
1806  (pipes.size() - 1) << std::endl;
1807  if (retVal) return retVal;
1808  delete it->pipe;
1809  it = pipes.erase(it);
1810  continue;
1811  }
1812  }
1813  }
1814  }
1815  // little benchmark - round trip time
1816  {
1817  std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1818  for (unsigned i = 0; i <= 24; ++i) {
1819  char *s = new char[1 + (1 << i)];
1820  std::memset(s, 'A', 1 << i);
1821  s[1 << i] = 0;
1822  const unsigned n = 1 << 7;
1823  double avg = 0., min = 1e42, max = -1e42;
1824  BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1825  for (unsigned j = n; j--; ) {
1826  struct timeval t1;
1827  ::gettimeofday(&t1, 0);
1828  *pipe << s << BidirMMapPipe::flush;
1829  if (!*pipe || pipe->eof()) break;
1830  *pipe >> s;
1831  if (!*pipe || pipe->eof()) break;
1832  struct timeval t2;
1833  ::gettimeofday(&t2, 0);
1834  t2.tv_sec -= t1.tv_sec;
1835  t2.tv_usec -= t1.tv_usec;
1836  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1837  if (dt < min) min = dt;
1838  if (dt > max) max = dt;
1839  avg += dt;
1840  }
1841  // send a shutdown string
1842  *pipe << "" << BidirMMapPipe::flush;
1843  // get child's shutdown ok
1844  *pipe >> s;
1845  avg /= double(n);
1846  avg *= 1e6; min *= 1e6; max *= 1e6;
1847  int retVal = pipe->close();
1848  if (retVal) {
1849  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1850  return retVal;
1851  }
1852  delete pipe;
1853  // there is a factor 2 in the formula for the transfer rate below,
1854  // because we transfer data of twice the size of the block - once
1855  // to the child, and once for the return trip
1856  std::cout << "block size " << std::setw(9) << (1 << i) <<
1857  " avg " << std::setw(7) << avg << " us min " <<
1858  std::setw(7) << min << " us max " << std::setw(7) << max <<
1859  "us speed " << std::setw(9) <<
1860  2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1861  " MB/s" << std::endl;
1862  delete[] s;
1863  }
1864  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1865  }
1866  // little benchmark - child as sink
1867  {
1868  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1869  for (unsigned i = 0; i <= 24; ++i) {
1870  char *s = new char[1 + (1 << i)];
1871  std::memset(s, 'A', 1 << i);
1872  s[1 << i] = 0;
1873  const unsigned n = 1 << 7;
1874  double avg = 0., min = 1e42, max = -1e42;
1875  BidirMMapPipe *pipe = spawnChild(benchchildsink);
1876  for (unsigned j = n; j--; ) {
1877  struct timeval t1;
1878  ::gettimeofday(&t1, 0);
1879  // streaming mode - we do not flush here
1880  *pipe << s;
1881  if (!*pipe || pipe->eof()) break;
1882  struct timeval t2;
1883  ::gettimeofday(&t2, 0);
1884  t2.tv_sec -= t1.tv_sec;
1885  t2.tv_usec -= t1.tv_usec;
1886  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1887  if (dt < min) min = dt;
1888  if (dt > max) max = dt;
1889  avg += dt;
1890  }
1891  // send a shutdown string
1892  *pipe << "" << BidirMMapPipe::flush;
1893  // get child's shutdown ok
1894  *pipe >> s;
1895  avg /= double(n);
1896  avg *= 1e6; min *= 1e6; max *= 1e6;
1897  int retVal = pipe->close();
1898  if (retVal) {
1899  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1900  return retVal;
1901  }
1902  delete pipe;
1903  std::cout << "block size " << std::setw(9) << (1 << i) <<
1904  " avg " << std::setw(7) << avg << " us min " <<
1905  std::setw(7) << min << " us max " << std::setw(7) << max <<
1906  "us speed " << std::setw(9) <<
1907  (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1908  " MB/s" << std::endl;
1909  delete[] s;
1910  }
1911  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1912  }
1913  // little benchmark - child as source
1914  {
1915  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1916  char *s = 0;
1917  double avg = 0., min = 1e42, max = -1e42;
1918  unsigned n = 0, bsz = 0;
1919  BidirMMapPipe *pipe = spawnChild(benchchildsource);
1920  while (*pipe && !pipe->eof()) {
1921  struct timeval t1;
1922  ::gettimeofday(&t1, 0);
1923  // streaming mode - we do not flush here
1924  *pipe >> s;
1925  if (!*pipe || pipe->eof()) break;
1926  struct timeval t2;
1927  ::gettimeofday(&t2, 0);
1928  t2.tv_sec -= t1.tv_sec;
1929  t2.tv_usec -= t1.tv_usec;
1930  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1931  if (std::strlen(s)) {
1932  ++n;
1933  if (dt < min) min = dt;
1934  if (dt > max) max = dt;
1935  avg += dt;
1936  bsz = std::strlen(s);
1937  } else {
1938  if (!n) break;
1939  // next block size
1940  avg /= double(n);
1941  avg *= 1e6; min *= 1e6; max *= 1e6;
1942 
1943  std::cout << "block size " << std::setw(9) << bsz <<
1944  " avg " << std::setw(7) << avg << " us min " <<
1945  std::setw(7) << min << " us max " << std::setw(7) <<
1946  max << "us speed " << std::setw(9) <<
1947  (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1948  " MB/s" << std::endl;
1949  n = 0;
1950  avg = 0.;
1951  min = 1e42;
1952  max = -1e42;
1953  }
1954  }
1955  int retVal = pipe->close();
1956  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1957  if (retVal) return retVal;
1958  delete pipe;
1959  std::free(s);
1960  }
1961  return 0;
1962 }
1963 #endif // TEST_BIDIRMMAPPIPE
1964 #endif // _WIN32
1965 
1966 // vim: ft=cpp:sw=4:tw=78
std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
pipe error Write end
bool eof() const
true if end-of-file
static Vc_ALWAYS_INLINE int_v min(const int_v &x, const int_v &y)
Definition: vector.h:433
static double p3(double t, double a, double b, double c, double d)
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
read pipe in end-of-file state
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
return c
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
Definition: Exception.C:4
unsigned m_refcnt
reference counter
#define assert(cond)
Definition: unittest.h:542
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
write pipe in end-of-file state
unsigned npages() const
return number of pages accessible
don't know yet what'll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
unsigned events
events of interest (or'ed bitmask)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
size_t
Definition: TBuffer.cxx:28
pid_t m_childPid
pid of the child (zero if we're child)
TLatex * t1
Definition: textangle.C:20
pipe error read end
TFile * f
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
unsigned nPagesPerGroup() const
return number of pages per page group
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
bool good() const
status of stream is good
size_type read(void *addr, size_type sz)
read from pipe
pages shared (child + parent)
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:87
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
void sendpages(Page *plist)
send page(s) to the other end (may block)
static double p2(double t, double a, double b, double c)
unsigned len() const
return length of chunk
Page * page(unsigned pgno) const
return page number pageno
pipe can be written to
Page * busypage()
get a busy page to read data from (may block)
if(pyself &&pyself!=Py_None)
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
static const char * what
Definition: stlLoader.cc:6
static unsigned lenPageList(const Page *list)
return length of a page list
Pages pop()
pop a group of pages off the free list
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
tuple main
Definition: hsum.py:20
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return if p is contained in this PageChunk
logical failure (e.g. pipe closed)
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
#define END_NAMESPACE_ROOFIT
error reporting with exceptions
TLine * l
Definition: textangle.C:4
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static double p1(double t, double a, double b)
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
bool isParent() const
return if this end of the pipe is the parent end
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
tuple free
Definition: fildir.py:30
TText * t2
Definition: rootenv.C:28
#define blend
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
bool empty() const
return true if no used page groups in this chunk
bool closed() const
true if closed
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
void zap(Pages &p)
free all pages except for those pointed to by p
for poll() interface
unsigned recvpages()
receive a pages from the other end (may block), queue them
void fill()
Definition: utils.cpp:314
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
end of file reached
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
static Vc_ALWAYS_INLINE int_v max(const int_v &x, const int_v &y)
Definition: vector.h:440
#define name(a, b)
Definition: linkTestLib0.cpp:5
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static unsigned long masks[]
Definition: gifencode.c:211
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
system page size (run-time determined)
Definition: BidirMMapPipe.h:56
pipe has data for reading
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
Definition: memory.h:67
nothing special on this pipe
const Int_t n
Definition: legend1.C:16
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the page size of the system
Definition: BidirMMapPipe.h:85
pages per pipe end
unsigned pageno(Page *p) const
perform page to page number mapping
#define BEGIN_NAMESPACE_ROOFIT