Logo ROOT   6.10/09
Reference Guide
BidirMMapPipe.cxx
Go to the documentation of this file.
1 /** @file BidirMMapPipe.cxx
2  *
3  * implementation of BidirMMapPipe, a class which forks off a child process
4  * and serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 #ifndef _WIN32
10 #include <map>
11 #include <cerrno>
12 #include <limits>
13 #include <string>
14 #include <cstdlib>
15 #include <cstring>
16 #include <iostream>
17 #include <algorithm>
18 #include <exception>
19 
20 #include <poll.h>
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #include <sys/wait.h>
30 #include <sys/socket.h>
31 
32 #include "BidirMMapPipe.h"
33 
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35 #define END_NAMESPACE_ROOFIT }
36 
38 
39 /// namespace for implementation details of BidirMMapPipe
40 namespace BidirMMapPipe_impl {
/** @brief exception to throw if low-level OS calls go wrong
 *
 * The description returned by what() is assembled once, at construction,
 * into a fixed-size member buffer as "<msg>: <strerror(err)>"; it is
 * truncated to the buffer size and always zero-terminated.
 *
 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
 * @date 2013-07-07
 */
class BidirMMapPipeException : public std::exception
{
private:
    enum {
        s_sz = 256 ///< length of buffer
    };
    char m_buf[s_sz]; ///< buffer containing the error message

    /// for the POSIX version of strerror_r (returns an int error code)
    static int dostrerror_r(int err, char* buf, std::size_t sz,
            int (*f)(int, char*, std::size_t))
    { return f(err, buf, sz); }
    /// for the GNU version of strerror_r (returns a pointer to the message)
    static int dostrerror_r(int, char*, std::size_t,
            char* (*f)(int, char*, std::size_t));
public:
    /// constructor taking error code, hint on operation (msg)
    BidirMMapPipeException(const char* msg, int err);
    /// return a description of what went wrong
    const char* what() const noexcept override { return m_buf; }
};
67 
68  BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
69  {
70  std::size_t msgsz = std::strlen(msg);
71  if (msgsz) {
72  msgsz = std::min(msgsz, std::size_t(s_sz));
73  std::copy(msg, msg + msgsz, m_buf);
74  if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
75  if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
76  }
77  if (msgsz < s_sz) {
78  // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
79  // have to sort it out with overloads
80  dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
81  }
82  m_buf[s_sz - 1] = 0; // enforce zero-termination
83  }
84 
85  int BidirMMapPipeException::dostrerror_r(int err, char* buf,
86  std::size_t sz, char* (*f)(int, char*, std::size_t))
87  {
88  buf[0] = 0;
89  char *tmp = f(err, buf, sz);
90  if (tmp && tmp != buf) {
91  std::strncpy(buf, tmp, sz);
92  buf[sz - 1] = 0;
93  if (std::strlen(tmp) > sz - 1) return ERANGE;
94  }
95  return 0;
96  }
97 
98  /** @brief class representing the header structure in an mmapped page
99  *
100  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
101  * @date 2013-07-07
102  *
103  * contains a field to put pages into a linked list, a field for the size
104  * of the data being transmitted, and a field for the position until which
105  * the data has been read
106  */
107  class Page
108  {
109  private:
110  // use as small a data type as possible to maximise payload area
111  // of pages
112  short m_next; ///< next page in list (in pagesizes)
113  unsigned short m_size; ///< size of payload (in bytes)
114  unsigned short m_pos; ///< index of next byte in payload area
115  /// copy construction forbidden
116  Page(const Page&) {}
117  /// assigment forbidden
118  Page& operator=(const Page&) = delete;
119  public:
120  /// constructor
121  Page() : m_next(0), m_size(0), m_pos(0)
122  {
123  // check that short is big enough - must be done at runtime
124  // because the page size is not known until runtime
125  assert(std::numeric_limits<unsigned short>::max() >=
127  }
128  /// set pointer to next page
129  void setNext(const Page* p);
130  /// return pointer to next page
131  Page* next() const;
132  /// return reference to size field
133  unsigned short& size() { return m_size; }
134  /// return size (of payload data)
135  unsigned size() const { return m_size; }
136  /// return reference to position field
137  unsigned short& pos() { return m_pos; }
138  /// return position
139  unsigned pos() const { return m_pos; }
140  /// return pointer to first byte in payload data area of page
141  inline unsigned char* begin() const
142  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
143  + sizeof(Page); }
144  /// return pointer to first byte in payload data area of page
145  inline unsigned char* end() const
146  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
147  + PageChunk::pagesize(); }
148  /// return the capacity of the page
149  static unsigned capacity()
150  { return PageChunk::pagesize() - sizeof(Page); }
151  /// true if page empty
152  bool empty() const { return !m_size; }
153  /// true if page partially filled
154  bool filled() const { return !empty(); }
155  /// free space left (to be written to)
156  unsigned free() const { return capacity() - m_size; }
157  /// bytes remaining to be read
158  unsigned remaining() const { return m_size - m_pos; }
159  /// true if page completely full
160  bool full() const { return !free(); }
161  };
162 
163  void Page::setNext(const Page* p)
164  {
165  if (!p) {
166  m_next = 0;
167  } else {
168  const char* p1 = reinterpret_cast<char*>(this);
169  const char* p2 = reinterpret_cast<const char*>(p);
170  std::ptrdiff_t tmp = p2 - p1;
171  // difference must be divisible by page size
172  assert(!(tmp % PageChunk::pagesize()));
173  tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
174  m_next = tmp;
175  // no truncation when saving in a short
176  assert(m_next == tmp);
177  // final check: next() must return p
178  assert(next() == p);
179  }
180  }
181 
182  Page* Page::next() const
183  {
184  if (!m_next) return 0;
185  char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
186  ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
187  return reinterpret_cast<Page*>(ptmp);
188  }
189 
190  /** @brief class representing a page pool
191  *
192  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
193  * @date 2013-07-24
194  *
195  * pool of mmapped pages (on systems which support it, on all others, the
196  * functionality is emulated with dynamically allocated memory)
197  *
198  * in most operating systems there is a limit to how many mappings any one
199  * process is allowed to request; for this reason, we mmap a relatively
200  * large amount up front, and then carve off little pieces as we need them
201  *
202  * Moreover, some systems have too large a physical page size in their MMU
203  * for the code to handle (we want offsets and lengths to fit into 16
204  * bits), so we carve such big physical pages into smaller logical Pages
205  * if needed. The largest logical page size is currently 16 KiB.
206  */
207  class PagePool {
208  private:
209  /// convenience typedef
210  typedef BidirMMapPipeException Exception;
211 
212  enum {
213  minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
214  maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
215  szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
216  };
217  /// a chunk of memory in the pool
218  typedef BidirMMapPipe_impl::PageChunk Chunk;
219  /// list of chunks
220  typedef std::list<Chunk*> ChunkList;
221 
222  friend class BidirMMapPipe_impl::PageChunk;
223  public:
224  /// convenience typedef
226  /// constructor
227  PagePool(unsigned nPagesPerGroup);
228  /// destructor
229  ~PagePool();
230  /// pop a free element out of the pool
231  Pages pop();
232 
233  /// return (logical) page size of the system
234  static unsigned pagesize() { return PageChunk::pagesize(); }
235  /// return variety of mmap supported on the system
236  static MMapVariety mmapVariety()
237  { return PageChunk::mmapVariety(); }
238 
239  /// return number of pages per group (ie. as returned by pop())
240  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
241 
242  /// zap the pool (unmap all but Pages p)
243  void zap(Pages& p);
244 
245  private:
246  /// list of chunks used by the pool
247  ChunkList m_chunks;
248  /// list of chunks used by the pool which are not full
249  ChunkList m_freelist;
250  /// chunk size map (histogram of chunk sizes)
251  unsigned m_szmap[(maxsz - minsz) / szincr];
252  /// current chunk size
253  int m_cursz;
254  /// page group size
255  unsigned m_nPgPerGrp;
256 
257  /// adjust _cursz to current largest block
258  void updateCurSz(int sz, int incr);
259  /// find size of next chunk to allocate (in a hopefully smart way)
260  int nextChunkSz() const;
261  /// release a chunk
262  void putOnFreeList(Chunk* chunk);
263  /// release a chunk
264  void release(Chunk* chunk);
265  };
266 
267  Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
268  m_pimpl(new impl)
269  {
270  assert(npg < 256);
271  m_pimpl->m_parent = parent;
272  m_pimpl->m_pages = pages;
273  m_pimpl->m_refcnt = 1;
274  m_pimpl->m_npages = npg;
275  /// initialise pages
276  for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
277  }
278 
/// logical page size: the physical page size, capped at 16 KiB so that
/// payload sizes and offsets fit into the unsigned short fields of Page
/// NOTE(review): relies on s_physpgsz being initialised earlier in this
/// translation unit (its definition is not visible here) - confirm
unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
282 
284  {
285  if (m_pimpl && !--(m_pimpl->m_refcnt)) {
286  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
287  delete m_pimpl;
288  }
289  }
290 
291  Pages::Pages(const Pages& other) :
292  m_pimpl(other.m_pimpl)
293  { ++(m_pimpl->m_refcnt); }
294 
295  Pages& Pages::operator=(const Pages& other)
296  {
297  if (&other == this) return *this;
298  if (--(m_pimpl->m_refcnt)) {
299  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
300  delete m_pimpl;
301  }
302  m_pimpl = other.m_pimpl;
303  ++(m_pimpl->m_refcnt);
304  return *this;
305  }
306 
307  unsigned Pages::pagesize() { return PageChunk::pagesize(); }
308 
309  Page* Pages::page(unsigned pgno) const
310  {
311  assert(pgno < m_pimpl->m_npages);
312  unsigned char* pptr =
313  reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
314  pptr += pgno * pagesize();
315  return reinterpret_cast<Page*>(pptr);
316  }
317 
318  unsigned Pages::pageno(Page* p) const
319  {
320  const unsigned char* pptr =
321  reinterpret_cast<const unsigned char*>(p);
322  const unsigned char* bptr =
323  reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
324  assert(0 == ((pptr - bptr) % pagesize()));
325  const unsigned nr = (pptr - bptr) / pagesize();
326  assert(nr < m_pimpl->m_npages);
327  return nr;
328  }
329 
331  {
332  // find out page size of system
333  long pgsz = sysconf(_SC_PAGESIZE);
334  if (-1 == pgsz) throw Exception("sysconf", errno);
335  if (pgsz > 512 && pgsz > long(sizeof(Page)))
336  return pgsz;
337 
338  // in case of failure or implausible value, use a safe default: 4k
339  // page size, and do not try to mmap
340  s_mmapworks = Copy;
341  return 1 << 12;
342  }
343 
344  PageChunk::PageChunk(PagePool* parent,
345  unsigned length, unsigned nPgPerGroup) :
346  m_begin(dommap(length)),
347  m_end(reinterpret_cast<void*>(
348  reinterpret_cast<unsigned char*>(m_begin) + length)),
349  m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
350  {
351  // ok, push groups of pages onto freelist here
352  unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
353  unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
354  while (p < pend) {
355  m_freelist.push_back(reinterpret_cast<void*>(p));
356  p += nPgPerGroup * PagePool::pagesize();
357  }
358  }
359 
361  {
362  if (m_parent) assert(empty());
363  if (m_begin) domunmap(m_begin, len());
364  }
365 
366  bool PageChunk::contains(const Pages& p) const
367  { return p.m_pimpl->m_parent == this; }
368 
370  {
371  assert(!m_freelist.empty());
372  void* p = m_freelist.front();
373  m_freelist.pop_front();
374  ++m_nUsedGrp;
375  return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
376  }
377 
378  void PageChunk::push(const Pages& p)
379  {
380  assert(contains(p));
381  bool wasempty = m_freelist.empty();
382  m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
383  --m_nUsedGrp;
384  if (m_parent) {
385  // notify parent if we need to be put on the free list again
386  if (wasempty) m_parent->putOnFreeList(this);
387  // notify parent if we're empty
388  if (empty()) return m_parent->release(this);
389  }
390  }
391 
392  void* PageChunk::dommap(unsigned len)
393  {
394  assert(len && 0 == (len % s_physpgsz));
395  // ok, the idea here is to try the different methods of mmapping, and
396  // choose the first one that works. we have four flavours:
397  // 1 - anonymous mmap (best)
398  // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
399  // bit more tedious to set up, since you need to open/close a
400  // device file)
401  // 3 - mmap of a temporary file (very tedious to set up - need to
402  // create a temporary file, delete it, make the underlying storage
403  // large enough, then mmap the fd and close it)
404  // 4 - if all those fail, we malloc the buffers, and copy the data
405  // through the OS (then we're no better than normal pipes)
406  static bool msgprinted = false;
407  if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
408 #if defined(MAP_ANONYMOUS)
409 #undef MYANONFLAG
410 #define MYANONFLAG MAP_ANONYMOUS
411 #elif defined(MAP_ANON)
412 #undef MYANONFLAG
413 #define MYANONFLAG MAP_ANON
414 #else
415 #undef MYANONFLAG
416 #endif
417 #ifdef MYANONFLAG
418  void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
419  MYANONFLAG | MAP_SHARED, -1, 0);
420  if (MAP_FAILED == retVal) {
421  if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
422  } else {
423  assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
425  if (BidirMMapPipe::debugflag() && !msgprinted) {
426  std::cerr << " INFO: In " << __func__ << " (" <<
427  __FILE__ << ", line " << __LINE__ <<
428  "): anonymous mmapping works, excellent!" <<
429  std::endl;
430  msgprinted = true;
431  }
432  return retVal;
433  }
434 #endif
435 #undef MYANONFLAG
436  }
437  if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
438  // ok, no anonymous mappings supported directly, so try to map
439  // /dev/zero which has much the same effect on many systems
440  int fd = ::open("/dev/zero", O_RDWR);
441  if (-1 == fd)
442  throw Exception("open /dev/zero", errno);
443  void* retVal = ::mmap(0, len,
444  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
445  if (MAP_FAILED == retVal) {
446  int errsv = errno;
447  ::close(fd);
448  if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
449  } else {
450  assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
452  }
453  if (-1 == ::close(fd))
454  throw Exception("close /dev/zero", errno);
455  if (BidirMMapPipe::debugflag() && !msgprinted) {
456  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
457  ", line " << __LINE__ << "): mmapping /dev/zero works, "
458  "very good!" << std::endl;
459  msgprinted = true;
460  }
461  return retVal;
462  }
463  if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
464  char name[] = "/tmp/BidirMMapPipe-XXXXXX";
465  int fd;
466  // open temp file
467  if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
468  // remove it, but keep fd open
469  if (-1 == ::unlink(name)) {
470  int errsv = errno;
471  ::close(fd);
472  throw Exception("unlink", errsv);
473  }
474  // make it the right size: lseek
475  if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
476  int errsv = errno;
477  ::close(fd);
478  throw Exception("lseek", errsv);
479  }
480  // make it the right size: write a byte
481  if (1 != ::write(fd, name, 1)) {
482  int errsv = errno;
483  ::close(fd);
484  throw Exception("write", errsv);
485  }
486  // do mmap
487  void* retVal = ::mmap(0, len,
488  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
489  if (MAP_FAILED == retVal) {
490  int errsv = errno;
491  ::close(fd);
492  if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
493  } else {
494  assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
496  }
497  if (-1 == ::close(fd)) {
498  int errsv = errno;
499  ::munmap(retVal, len);
500  throw Exception("close", errsv);
501  }
502  if (BidirMMapPipe::debugflag() && !msgprinted) {
503  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
504  ", line " << __LINE__ << "): mmapping temporary files "
505  "works, good!" << std::endl;
506  msgprinted = true;
507  }
508  return retVal;
509  }
510  if (Copy == s_mmapworks || Unknown == s_mmapworks) {
511  // fallback solution: mmap does not work on this OS (or does not
512  // work for what we want to use it), so use a normal buffer of
513  // memory instead, and collect data in that buffer - this needs an
514  // additional write/read to/from the pipe(s), but there you go...
515  if (BidirMMapPipe::debugflag() && !msgprinted) {
516  std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
517  ", line " << __LINE__ << "): anonymous mmapping of "
518  "shared buffers failed, falling back to read/write on "
519  " pipes!" << std::endl;
520  msgprinted = true;
521  }
522  s_mmapworks = Copy;
523  void* retVal = std::malloc(len);
524  if (!retVal) throw Exception("malloc", errno);
525  return retVal;
526  }
527  // should never get here
528  assert(false);
529  return 0;
530  }
531 
532  void PageChunk::domunmap(void* addr, unsigned len)
533  {
534  assert(len && 0 == (len % s_physpgsz));
535  if (addr) {
536  assert(Unknown != s_mmapworks);
537  if (Copy != s_mmapworks) {
538  if (-1 == ::munmap(addr, len))
539  throw Exception("munmap", errno);
540  } else {
541  std::free(addr);
542  }
543  }
544  }
545 
547  {
548  // try to mprotect the other bits of the pool with no access...
549  // we'd really like a version of mremap here that can unmap all the
550  // other pages in the chunk, but that does not exist, so we protect
551  // the other pages in this chunk such that they may neither be read,
552  // written nor executed, only the pages we're interested in for
553  // communications stay readable and writable
554  //
555  // if an OS does not support changing the protection of a part of an
556  // mmapped area, the mprotect calls below should just fail and not
557  // change any protection, so we're a little less safe against
558  // corruption, but everything should still work
559  if (Copy != s_mmapworks) {
560  unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
561  unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
562  unsigned char* p2 = p1 + p.npages() * s_physpgsz;
563  unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
564  if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
565  if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
566  }
567  m_parent = 0;
568  m_freelist.clear();
569  m_nUsedGrp = 1;
570  p.m_pimpl->m_parent = 0;
571  m_begin = m_end = 0;
572  // commit suicide
573  delete this;
574  }
575 
576  PagePool::PagePool(unsigned nPgPerGroup) :
577  m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
578  {
579  // if logical and physical page size differ, we may have to adjust
580  // m_nPgPerGrp to make things fit
582  const unsigned mult =
584  const unsigned desired = nPgPerGroup * PageChunk::pagesize();
585  // round up to to next physical page boundary
586  const unsigned actual = mult *
587  (desired / mult + bool(desired % mult));
588  const unsigned newPgPerGrp = actual / PageChunk::pagesize();
589  if (BidirMMapPipe::debugflag()) {
590  std::cerr << " INFO: In " << __func__ << " (" <<
591  __FILE__ << ", line " << __LINE__ <<
592  "): physical page size " << PageChunk::physPgSz() <<
593  ", subdividing into logical pages of size " <<
594  PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
595  m_nPgPerGrp << " -> " << newPgPerGrp <<
596  std::endl;
597  }
598  assert(newPgPerGrp >= m_nPgPerGrp);
599  m_nPgPerGrp = newPgPerGrp;
600  }
601  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
602  }
603 
604  PagePool::~PagePool()
605  {
606  m_freelist.clear();
607  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
608  delete *it;
609  m_chunks.clear();
610  }
611 
612  void PagePool::zap(Pages& p)
613  {
614  // unmap all pages but those pointed to by p
615  m_freelist.clear();
616  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
617  if ((*it)->contains(p)) {
618  (*it)->zap(p);
619  } else {
620  delete *it;
621  }
622  }
623  m_chunks.clear();
624  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
625  m_cursz = minsz;
626  }
627 
628  Pages PagePool::pop()
629  {
630  if (m_freelist.empty()) {
631  // allocate and register new chunk and put it on the freelist
632  const int sz = nextChunkSz();
633  Chunk *c = new Chunk(this,
634  sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
635  m_chunks.push_front(c);
636  m_freelist.push_back(c);
637  updateCurSz(sz, +1);
638  }
639  // get free element from first chunk on _freelist
640  Chunk* c = m_freelist.front();
641  Pages p(c->pop());
642  // full chunks are removed from _freelist
643  if (c->full()) m_freelist.pop_front();
644  return p;
645  }
646 
647  void PagePool::release(PageChunk* chunk)
648  {
649  assert(chunk->empty());
650  // find chunk on freelist and remove
651  ChunkList::iterator it = std::find(
652  m_freelist.begin(), m_freelist.end(), chunk);
653  if (m_freelist.end() == it)
654  throw Exception("PagePool::release(PageChunk*)", EINVAL);
655  m_freelist.erase(it);
656  // find chunk in m_chunks and remove
657  it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
658  if (m_chunks.end() == it)
659  throw Exception("PagePool::release(PageChunk*)", EINVAL);
660  m_chunks.erase(it);
661  const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
662  delete chunk;
663  updateCurSz(sz, -1);
664  }
665 
666  void PagePool::putOnFreeList(PageChunk* chunk)
667  {
668  assert(!chunk->full());
669  m_freelist.push_back(chunk);
670  }
671 
672  void PagePool::updateCurSz(int sz, int incr)
673  {
674  m_szmap[(sz - minsz) / szincr] += incr;
675  m_cursz = minsz;
676  for (int i = (maxsz - minsz) / szincr; i--; ) {
677  if (m_szmap[i]) {
678  m_cursz += i * szincr;
679  break;
680  }
681  }
682  }
683 
684  int PagePool::nextChunkSz() const
685  {
686  // no chunks with space available, figure out chunk size
687  int sz = m_cursz;
688  if (m_chunks.empty()) {
689  // if we start allocating chunks, we start from minsz
690  sz = minsz;
691  } else {
692  if (minsz >= sz) {
693  // minimal sized chunks are always grown
694  sz = minsz + szincr;
695  } else {
696  if (1 != m_chunks.size()) {
697  // if we have more than one completely filled chunk, grow
698  sz += szincr;
699  } else {
700  // just one chunk left, try shrinking chunk size
701  sz -= szincr;
702  }
703  }
704  }
705  // clamp size to allowed range
706  if (sz > maxsz) sz = maxsz;
707  if (sz < minsz) sz = minsz;
708  return sz;
709  }
710 }
711 
// static BidirMMapPipe members
/// protects s_openpipes; the constructor also holds it across fork()
pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
/// pipes currently open in this process (closed again e.g. in a freshly
/// forked child, which has no business using its parent's pipes)
std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
/// shared page pool; created lazily by pagepool()
BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
718 
719 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
720 {
721  if (!s_pagepool)
722  s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
723  return *s_pagepool;
724 }
725 
727 {
728  pthread_mutex_lock(&s_openpipesmutex);
729  while (!s_openpipes.empty()) {
730  BidirMMapPipe *p = s_openpipes.front();
731  pthread_mutex_unlock(&s_openpipesmutex);
732  if (p->m_childPid) kill(p->m_childPid, SIGTERM);
733  p->doClose(true, true);
734  pthread_mutex_lock(&s_openpipesmutex);
735  }
736  pthread_mutex_unlock(&s_openpipesmutex);
737 }
738 
740  m_pages(pagepool().pop())
741 {
742  // free pages again
744  if (!s_pagepoolrefcnt) {
745  delete s_pagepool;
746  s_pagepool = 0;
747  }
748 }
749 
750 BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
753  m_parentPid(::getpid())
754 
755 {
757  assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
758  int fds[4] = { -1, -1, -1, -1 };
759  int myerrno;
760  static bool firstcall = true;
761  if (useExceptions) m_flags |= exceptionsbit;
762 
763  try {
764  if (firstcall) {
765  firstcall = false;
766  // register a cleanup handler to make sure all BidirMMapPipes are torn
767  // down, and child processes are sent a SIGTERM
768  if (0 != atexit(BidirMMapPipe::teardownall))
769  throw Exception("atexit", errno);
770  }
771 
772  // build free lists
773  for (unsigned i = 1; i < TotPages; ++i)
774  m_pages[i - 1]->setNext(m_pages[i]);
775  m_pages[PagesPerEnd - 1]->setNext(0);
776  if (!useSocketpair) {
777  // create pipes
778  if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
779  if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
780  } else {
781  if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
782  throw Exception("socketpair", errno);
783  }
784  // fork the child
785  pthread_mutex_lock(&s_openpipesmutex);
786  char c;
787  switch ((m_childPid = ::fork())) {
788  case -1: // error in fork()
789  myerrno = errno;
790  pthread_mutex_unlock(&s_openpipesmutex);
791  m_childPid = 0;
792  throw Exception("fork", myerrno);
793  case 0: // child
794  // put the ends in the right place
795  if (-1 != fds[2]) {
796  // pair of pipes
797  if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
798  myerrno = errno;
799  pthread_mutex_unlock(&s_openpipesmutex);
800  throw Exception("close", myerrno);
801  }
802  fds[0] = fds[3] = -1;
803  m_outpipe = fds[1];
804  m_inpipe = fds[2];
805  } else {
806  // socket pair
807  if (-1 == ::close(fds[0])) {
808  myerrno = errno;
809  pthread_mutex_unlock(&s_openpipesmutex);
810  throw Exception("close", myerrno);
811  }
812  fds[0] = -1;
813  m_inpipe = m_outpipe = fds[1];
814  }
815  // close other pipes our parent may have open - we have no business
816  // reading from/writing to those...
817  for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
818  s_openpipes.end() != it; ) {
819  BidirMMapPipe* p = *it;
820  it = s_openpipes.erase(it);
821  p->doClose(true, true);
822  }
823  pagepool().zap(m_pages);
824  s_pagepoolrefcnt = 0;
825  delete s_pagepool;
826  s_pagepool = 0;
827  s_openpipes.push_front(this);
828  pthread_mutex_unlock(&s_openpipesmutex);
829  // ok, put our pages on freelist
831  // handshare with other end (to make sure it's alive)...
832  c = 'C'; // ...hild
833  if (1 != xferraw(m_outpipe, &c, 1, ::write))
834  throw Exception("handshake: xferraw write", EPIPE);
835  if (1 != xferraw(m_inpipe, &c, 1, ::read))
836  throw Exception("handshake: xferraw read", EPIPE);
837  if ('P' != c) throw Exception("handshake", EPIPE);
838  break;
839  default: // parent
840  // put the ends in the right place
841  if (-1 != fds[2]) {
842  // pair of pipes
843  if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
844  myerrno = errno;
845  pthread_mutex_unlock(&s_openpipesmutex);
846  throw Exception("close", myerrno);
847  }
848  fds[1] = fds[2] = -1;
849  m_outpipe = fds[3];
850  m_inpipe = fds[0];
851  } else {
852  // socketpair
853  if (-1 == ::close(fds[1])) {
854  myerrno = errno;
855  pthread_mutex_unlock(&s_openpipesmutex);
856  throw Exception("close", myerrno);
857  }
858  fds[1] = -1;
859  m_inpipe = m_outpipe = fds[0];
860  }
861  // put on list of open pipes (so we can kill child processes
862  // if things go wrong)
863  s_openpipes.push_front(this);
864  pthread_mutex_unlock(&s_openpipesmutex);
865  // ok, put our pages on freelist
866  m_freelist = m_pages[0u];
867  // handshare with other end (to make sure it's alive)...
868  c = 'P'; // ...arent
869  if (1 != xferraw(m_outpipe, &c, 1, ::write))
870  throw Exception("handshake: xferraw write", EPIPE);
871  if (1 != xferraw(m_inpipe, &c, 1, ::read))
872  throw Exception("handshake: xferraw read", EPIPE);
873  if ('C' != c) throw Exception("handshake", EPIPE);
874  break;
875  }
876  // mark file descriptors for close on exec (we do not want to leak the
877  // connection to anything we happen to exec)
878  int fdflags = 0;
879  if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
880  throw Exception("fcntl", errno);
881  fdflags |= FD_CLOEXEC;
882  if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
883  throw Exception("fcntl", errno);
884  if (m_inpipe != m_outpipe) {
885  if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
886  throw Exception("fcntl", errno);
887  fdflags |= FD_CLOEXEC;
888  if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
889  throw Exception("fcntl", errno);
890  }
891  // ok, finally, clear the failbit
892  m_flags &= ~failbit;
893  // all done
894  } catch (const BidirMMapPipe::Exception& e) {
895  if (0 != m_childPid) kill(m_childPid, SIGTERM);
896  for (int i = 0; i < 4; ++i)
897  if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
898  {
899  // free resources associated with mmapped pages
901  }
902  if (!--s_pagepoolrefcnt) {
903  delete s_pagepool;
904  s_pagepool = 0;
905  }
906  throw e;
907  }
908 }
909 
911 {
912  assert(!(m_flags & failbit));
913  return doClose(false);
914 }
915 
916 int BidirMMapPipe::doClose(bool force, bool holdlock)
917 {
918  if (m_flags & failbit) return 0;
919  // flush data to be written
920  if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
921  // shut down the write direction (no more writes from our side)
922  if (m_inpipe == m_outpipe) {
923  if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
924  throw Exception("shutdown", errno);
925  m_outpipe = -1;
926  } else {
927  if (-1 != m_outpipe && -1 == ::close(m_outpipe))
928  if (!force) throw Exception("close", errno);
929  m_outpipe = -1;
930  }
931  // shut down the write direction (no more writes from our side)
932  // drain anything the other end might still want to send
933  if (!force && -1 != m_inpipe) {
934  // **************** THIS IS EXTREMELY UGLY: ****************
935  // POLLHUP is not set reliably on pipe/socket shutdown on all
936  // platforms, unfortunately, so we poll for readability here until
937  // the other end closes, too
938  //
939  // the read loop below ensures that the other end sees the POLLIN that
940  // is set on shutdown instead, and goes ahead to close its end
941  //
942  // if we don't do this, and close straight away, the other end
943  // will catch a SIGPIPE or similar, and we don't want that
944  int err;
945  struct pollfd fds;
946  fds.fd = m_inpipe;
947  fds.events = POLLIN;
948  fds.revents = 0;
949  do {
950  while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
951  if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
952  if (fds.revents & POLLIN) {
953  char c;
954  if (1 > ::read(m_inpipe, &c, 1)) break;
955  }
956  }
957  } while (0 > err && EINTR == errno);
958  // ignore all other poll errors
959  }
960  // close read end
961  if (-1 != m_inpipe && -1 == ::close(m_inpipe))
962  if (!force) throw Exception("close", errno);
963  m_inpipe = -1;
964  // unmap memory
965  try {
967  if (!--s_pagepoolrefcnt) {
968  delete s_pagepool;
969  s_pagepool = 0;
970  }
971  } catch (const std::exception& e) {
972  if (!force) throw e;
973  }
975  // wait for child process
976  int retVal = 0;
977  if (isParent()) {
978  int tmp;
979  do {
980  tmp = waitpid(m_childPid, &retVal, 0);
981  } while (-1 == tmp && EINTR == errno);
982  if (-1 == tmp)
983  if (!force) throw Exception("waitpid", errno);
984  m_childPid = 0;
985  }
986  // remove from list of open pipes
987  if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
988  std::list<BidirMMapPipe*>::iterator it = std::find(
989  s_openpipes.begin(), s_openpipes.end(), this);
990  if (s_openpipes.end() != it) s_openpipes.erase(it);
991  if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
992  m_flags |= failbit;
993  return retVal;
994 }
995 
// Non-forced close of this end of the pipe.
// NOTE(review): this appears to be the destructor body (its signature line
// is not part of this listing). doClose(false) throws Exception when e.g.
// close() or waitpid() fail; since C++11 destructors are implicitly
// noexcept, such an exception would end in std::terminate - confirm that
// this is intended, or close with force/try-catch here.
997 { doClose(false); }
998 
1000  int fd, void* addr, size_type len,
1001  ssize_t (*xferfn)(int, void*, std::size_t))
1002 {
1003  size_type xferred = 0;
1004  unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1005  while (len) {
1006  ssize_t tmp = xferfn(fd, buf, len);
1007  if (tmp > 0) {
1008  xferred += tmp;
1009  len -= tmp;
1010  buf += tmp;
1011  continue;
1012  } else if (0 == tmp) {
1013  // check for end-of-file on pipe
1014  break;
1015  } else if (-1 == tmp) {
1016  // ok some error occurred, so figure out if we want to retry of throw
1017  switch (errno) {
1018  default:
1019  // if anything was transferred, return number of bytes
1020  // transferred so far, we can start throwing on the next
1021  // transfer...
1022  if (xferred) return xferred;
1023  // else throw
1024  throw Exception("xferraw", errno);
1025  case EAGAIN: // fallthrough intended
1026 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1027  case EWOULDBLOCK: // fallthrough intended
1028 #endif
1029  std::cerr << " ERROR: In " << __func__ << " (" <<
1030  __FILE__ << ", line " << __LINE__ <<
1031  "): expect transfer to block!" << std::endl;
1032  case EINTR:
1033  break;
1034  }
1035  continue;
1036  } else {
1037  throw Exception("xferraw: unexpected return value from read/write",
1038  errno);
1039  }
1040  }
1041  return xferred;
1042 }
1043 
1045 {
1046  if (plist) {
1047  unsigned char pg = m_pages[plist];
1048  if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1051  // ok, have to copy pages through pipe
1052  for (Page* p = plist; p; p = p->next()) {
1053  if (sizeof(Page) + p->size() !=
1054  xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1055  ::write)) {
1056  throw Exception("sendpages: short write", EPIPE);
1057  }
1058  }
1059  }
1060  } else {
1061  throw Exception("sendpages: short write", EPIPE);
1062  }
1063  } else { assert(plist); }
1064 }
1065 
1067 {
1068  unsigned char pg;
1069  unsigned retVal = 0;
1070  Page *plisthead = 0, *plisttail = 0;
1071  if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1072  plisthead = plisttail = m_pages[pg];
1073  // ok, have number of pages
1076  // ok, need to copy pages through pipe
1077  for (; plisttail; ++retVal) {
1078  Page* p = plisttail;
1079  if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1080  ::read)) {
1081  plisttail = p->next();
1082  if (!p->size()) continue;
1083  // break in case of read error
1084  if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1085  ::read)) break;
1086  }
1087  }
1088  } else {
1089  retVal = lenPageList(plisthead);
1090  }
1091  }
1092  // put list of pages we just received into correct lists (busy/free)
1093  if (plisthead) feedPageLists(plisthead);
1094  // ok, retVal contains the number of pages read, so put them on the
1095  // correct lists
1096  return retVal;
1097 }
1098 
1100 {
1101  struct pollfd fds;
1102  fds.fd = m_inpipe;
1103  fds.events = POLLIN;
1104  fds.revents = 0;
1105  unsigned retVal = 0;
1106  do {
1107  int rc = ::poll(&fds, 1, 0);
1108  if (0 > rc) {
1109  if (EINTR == errno) continue;
1110  break;
1111  }
1112  if (1 == retVal && fds.revents & POLLIN &&
1113  !(fds.revents & (POLLNVAL | POLLERR))) {
1114  // ok, we can read without blocking, so the other end has
1115  // something for us
1116  return recvpages();
1117  } else {
1118  break;
1119  }
1120  } while (true);
1121  return retVal;
1122 }
1123 
1125 {
1126  unsigned n = 0;
1127  for ( ; p; p = p->next()) ++n;
1128  return n;
1129 }
1130 
1132 {
    // Sort the pages of plist (a singly linked list, consumed here):
    //  - pages holding data (size() != 0) are appended to the busy list
    //    with their read position reset to 0;
    //  - empty pages owned by this end are pushed onto the free list;
    //  - empty pages owned by the other end are sent back to it, together
    //    with any full pages at the head of the dirty list, provided a
    //    zero-timeout poll shows no error/hangup on the pipe(s).
    // Throws Exception if that poll fails with anything other than EINTR.
1133  assert(plist);
1134  // get end of busy list
1135  Page *blend = m_busylist;
1136  while (blend && blend->next()) blend = blend->next();
1137  // ok, might have to send free pages to other end, and (if we do have to
1138  // send something to the other end) while we're at it, send any dirty
1139  // pages which are completely full, too
1140  Page *sendlisthead = 0, *sendlisttail = 0;
1141  // loop over plist
1142  while (plist) {
1143  Page* p = plist;
1144  plist = p->next();
1145  p->setNext(0);
1146  if (p->size()) {
1147  // busy page...
1148  p->pos() = 0;
1149  // put at end of busy list
1150  if (blend) blend->setNext(p);
1151  else m_busylist = p;
1152  blend = p;
1153  } else {
1154  // free page...
1155  // Very simple algorithm: once we're done with a page, we send it back
1156  // where it came from. If it's from our end, we put it on the free list, if
1157  // it's from the other end, we send it back.
    // (page indices below PagesPerEnd belong to the parent, the rest to
    // the child)
1158  if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1159  (isChild() && m_pages[p] < PagesPerEnd)) {
1160  // page "belongs" to other end
1161  if (!sendlisthead) sendlisthead = p;
1162  if (sendlisttail) sendlisttail->setNext(p);
1163  sendlisttail = p;
1164  } else {
1165  // add page to freelist
1166  p->setNext(m_freelist);
1167  m_freelist = p;
1168  }
1169  }
1170  }
1171  // check if we have to send stuff to the other end
1172  if (sendlisthead) {
1173  // go through our list of dirty pages, and see what we can
1174  // send along
1175  Page* dp;
1176  while ((dp = m_dirtylist) && dp->full()) {
1177  Page* p = dp;
1178  // move head of dirty list
1179  m_dirtylist = p->next();
1180  // queue for sending
1181  p->setNext(0);
    // sendlisthead != 0 here, so sendlisttail is non-null as well
1182  sendlisttail->setNext(p);
1183  sendlisttail = p;
1184  }
1185  // poll if the other end is still alive - this needs that we first
1186  // close the write pipe of the other end when the remote end of the
1187  // connection is shutting down in doClose; we'll see that because we
1188  // get a POLLHUP on our inpipe
1189  const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1190  struct pollfd fds[2];
1191  fds[0].fd = m_outpipe;
1192  fds[0].events = fds[0].revents = 0;
1193  if (m_outpipe != m_inpipe) {
1194  fds[1].fd = m_inpipe;
1195  fds[1].events = fds[1].revents = 0;
1196  } else {
1197  fds[0].events |= POLLIN;
1198  }
    // zero-timeout (non-blocking) poll, retried on EINTR
1199  int retVal = 0;
1200  do {
1201  retVal = ::poll(fds, nfds, 0);
1202  if (0 > retVal && EINTR == errno)
1203  continue;
1204  break;
1205  } while (true);
1206  if (0 <= retVal) {
1207  bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1208  if (m_outpipe != m_inpipe) {
1209  ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1210  } else {
    // single shared descriptor: consume pending incoming pages first;
    // note recvpages() hands them back to feedPageLists() (recursion)
1211  if (ok && fds[0].revents & POLLIN) {
1212  unsigned ret = recvpages();
1213  if (!ret) ok = false;
1214  }
1215  }
1216 
1217  if (ok) sendpages(sendlisthead);
1218  // (if the pipe is dead already, we don't care that we leak the
1219  // contents of the pages on the send list here, so that is why
1220  // there's no else clause here)
1221  } else {
1222  throw Exception("feedPageLists: poll", errno);
1223  }
1224  }
1225 }
1226 
1228 {
1229  assert(p);
1230  assert(p == m_freelist);
1231  // remove from freelist
1232  m_freelist = p->next();
1233  p->setNext(0);
1234  // append to dirty list
1235  Page* dl = m_dirtylist;
1236  while (dl && dl->next()) dl = dl->next();
1237  if (dl) dl->setNext(p);
1238  else m_dirtylist = p;
1239 }
1240 
1242 {
1243  // queue any pages available for reading we can without blocking
1245  Page* p;
1246  // if there are no busy pages, try to get them from the other end,
1247  // block if we have to...
1248  while (!(p = m_busylist)) if (!recvpages()) return 0;
1249  return p;
1250 }
1251 
1253 {
1254  // queue any pages available for reading we can without blocking
1256  Page* p = m_dirtylist;
1257  // go to end of dirty list
1258  if (p) while (p->next()) p = p->next();
1259  if (!p || p->full()) {
1260  // need to append free page, so get one
1261  while (!(p = m_freelist)) if (!recvpages()) return 0;
1262  markPageDirty(p);
1263  }
1264  return p;
1265 }
1266 
1268 { return doFlush(true); }
1269 
1270 void BidirMMapPipe::doFlush(bool forcePartialPages)
1271 {
1272  assert(!(m_flags & failbit));
1273  // build a list of pages to flush
1274  Page *flushlisthead = 0, *flushlisttail = 0;
1275  while (m_dirtylist) {
1276  Page* p = m_dirtylist;
1277  if (!forcePartialPages && !p->full()) break;
1278  // remove dirty page from dirty list
1279  m_dirtylist = p->next();
1280  p->setNext(0);
1281  // and send it to other end
1282  if (!flushlisthead) flushlisthead = p;
1283  if (flushlisttail) flushlisttail->setNext(p);
1284  flushlisttail = p;
1285  }
1286  if (flushlisthead) sendpages(flushlisthead);
1287 }
1288 
1290 {
1291  assert(!(m_flags & failbit));
1292  // join busy and dirty lists
1293  {
1294  Page *l = m_busylist;
1295  while (l && l->next()) l = l->next();
1296  if (l) l->setNext(m_dirtylist);
1297  else m_busylist = m_dirtylist;
1298  }
1299  // empty busy and dirty pages
1300  for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1301  // put them on the free list
1303  m_busylist = m_dirtylist = 0;
1304 }
1305 
1307 {
1308  // queue all pages waiting for consumption in the pipe before we give an
1309  // answer
1311  size_type retVal = 0;
1312  for (Page* p = m_busylist; p; p = p->next())
1313  retVal += p->size() - p->pos();
1314  return retVal;
1315 }
1316 
1318 {
1319  // queue all pages waiting for consumption in the pipe before we give an
1320  // answer
1322  // check if we could write to the pipe without blocking (we need to know
1323  // because we might need to check if flushing of dirty pages would block)
1324  bool couldwrite = false;
1325  {
1326  struct pollfd fds;
1327  fds.fd = m_outpipe;
1328  fds.events = POLLOUT;
1329  fds.revents = 0;
1330  int retVal = 0;
1331  do {
1332  retVal = ::poll(&fds, 1, 0);
1333  if (0 > retVal) {
1334  if (EINTR == errno) continue;
1335  throw Exception("bytesWritableNonBlocking: poll", errno);
1336  }
1337  if (1 == retVal && fds.revents & POLLOUT &&
1338  !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1339  couldwrite = true;
1340  break;
1341  } while (true);
1342  }
1343  // ok, start counting bytes
1344  size_type retVal = 0;
1345  unsigned npages = 0;
1346  // go through the dirty list
1347  for (Page* p = m_dirtylist; p; p = p->next()) {
1348  ++npages;
1349  // if page only partially filled
1350  if (!p->full())
1351  retVal += p->free();
1352  if (npages >= FlushThresh && !couldwrite) break;
1353  }
1354  // go through the free list
1355  for (Page* p = m_freelist; p && (!m_dirtylist ||
1356  npages < FlushThresh || couldwrite); p = p->next()) {
1357  ++npages;
1358  retVal += Page::capacity();
1359  }
1360  return retVal;
1361 }
1362 
1364 {
1365  assert(!(m_flags & failbit));
1366  size_type nread = 0;
1367  unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1368  try {
1369  while (sz) {
1370  // find next page to read from
1371  Page* p = busypage();
1372  if (!p) {
1373  m_flags |= eofbit;
1374  return nread;
1375  }
1376  unsigned char* pp = p->begin() + p->pos();
1377  size_type csz = std::min(size_type(p->remaining()), sz);
1378  std::copy(pp, pp + csz, ap);
1379  nread += csz;
1380  ap += csz;
1381  sz -= csz;
1382  p->pos() += csz;
1383  assert(p->size() >= p->pos());
1384  if (p->size() == p->pos()) {
1385  // if no unread data remains, page is free
1386  m_busylist = p->next();
1387  p->setNext(0);
1388  p->size() = 0;
1389  feedPageLists(p);
1390  }
1391  }
1392  } catch (const Exception& e) {
1393  m_flags |= rderrbit;
1394  if (m_flags & exceptionsbit) throw e;
1395  }
1396  return nread;
1397 }
1398 
1400 {
1401  assert(!(m_flags & failbit));
1402  size_type written = 0;
1403  const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1404  try {
1405  while (sz) {
1406  // find next page to write to
1407  Page* p = dirtypage();
1408  if (!p) {
1409  m_flags |= eofbit;
1410  return written;
1411  }
1412  unsigned char* pp = p->begin() + p->size();
1413  size_type csz = std::min(size_type(p->free()), sz);
1414  std::copy(ap, ap + csz, pp);
1415  written += csz;
1416  ap += csz;
1417  p->size() += csz;
1418  sz -= csz;
1419  assert(p->capacity() >= p->size());
1420  if (p->full()) {
1421  // if page is full, see if we're above the flush threshold of
1422  // 3/4 of our pages
1424  doFlush(false);
1425  }
1426  }
1427  } catch (const Exception& e) {
1428  m_flags |= wrerrbit;
1429  if (m_flags & exceptionsbit) throw e;
1430  }
1431  return written;
1432 }
1433 
1435 {
1436  // go through pipes, and change flags where we already know without really
1437  // polling - stuff where we don't need poll to wait for its timeout in the
1438  // OS...
1439  bool canskiptimeout = false;
1440  std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1441  std::vector<unsigned>::iterator mit = masks.begin();
1442  for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1443  ++it, ++mit) {
1444  PollEntry& pe = *it;
1445  pe.revents = None;
1446  // null pipe pointer or closed pipe is invalid
1447  if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1448  // check for error
1449  if (pe.pipe->bad()) pe.revents |= Error;
1450  // check for end of file
1451  if (pe.pipe->eof()) pe.revents |= EndOfFile;
1452  // check if readable
1453  if (pe.events & Readable) {
1454  *mit |= Readable;
1455  if (pe.pipe->m_busylist) pe.revents |= Readable;
1456  }
1457  // check if writable
1458  if (pe.events & Writable) {
1459  *mit |= Writable;
1460  if (pe.pipe->m_freelist) {
1461  pe.revents |= Writable;
1462  } else {
1463  Page *dl = pe.pipe->m_dirtylist;
1464  while (dl && dl->next()) dl = dl->next();
1465  if (dl && dl->pos() < Page::capacity())
1466  pe.revents |= Writable;
1467  }
1468  }
1469  if (pe.revents) canskiptimeout = true;
1470  }
1471  // set up the data structures required for the poll syscall
1472  std::vector<pollfd> fds;
1473  fds.reserve(2 * pipes.size());
1474  std::map<int, PollEntry*> fds2pipes;
1475  for (PollVector::const_iterator it = pipes.begin();
1476  pipes.end() != it; ++it) {
1477  const PollEntry& pe = *it;
1478  struct pollfd tmp;
1479  fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1480  const_cast<PollEntry*>(&pe)));
1481  tmp.events = tmp.revents = 0;
1482  // we always poll for readability; this allows us to queue pages
1483  // early
1484  tmp.events |= POLLIN;
1485  if (pe.pipe->m_outpipe != tmp.fd) {
1486  // ok, it's a pair of pipes
1487  fds.push_back(tmp);
1488  fds2pipes.insert(std::make_pair(
1489  unsigned(tmp.fd = pe.pipe->m_outpipe),
1490  const_cast<PollEntry*>(&pe)));
1491  tmp.events = 0;
1492 
1493  }
1494  if (pe.events & Writable) tmp.events |= POLLOUT;
1495  fds.push_back(tmp);
1496  }
1497  // poll
1498  int retVal = 0;
1499  do {
1500  retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1501  if (0 > retVal) {
1502  if (EINTR == errno) continue;
1503  throw Exception("poll", errno);
1504  }
1505  break;
1506  } while (true);
1507  // fds may have changed state, so update...
1508  for (std::vector<pollfd>::iterator it = fds.begin();
1509  fds.end() != it; ++it) {
1510  pollfd& fe = *it;
1511  //if (!fe.revents) continue;
1512  --retVal;
1513  PollEntry& pe = *fds2pipes[fe.fd];
1514 oncemore:
1515  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1516  pe.revents |= ReadInvalid;
1517  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1518  pe.revents |= WriteInvalid;
1519  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1520  pe.revents |= ReadError;
1521  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1522  pe.revents |= WriteError;
1523  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1524  pe.revents |= ReadEndOfFile;
1525  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1526  pe.revents |= WriteEndOfFile;
1527  if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1528  !(fe.revents & (POLLNVAL | POLLERR))) {
1529  // ok, there is at least one page for us to receive from the
1530  // other end
1531  if (0 == pe.pipe->recvpages()) continue;
1532  // more pages there?
1533  do {
1534  int tmp = ::poll(&fe, 1, 0);
1535  if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1536  if (0 > tmp) {
1537  if (EINTR == errno) continue;
1538  throw Exception("poll", errno);
1539  }
1540  break;
1541  } while (true);
1542  }
1543  if (pe.pipe->m_busylist) pe.revents |= Readable;
1544  if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1545  if (pe.pipe->m_freelist) {
1546  pe.revents |= Writable;
1547  } else {
1548  Page *dl = pe.pipe->m_dirtylist;
1549  while (dl && dl->next()) dl = dl->next();
1550  if (dl && dl->pos() < Page::capacity())
1551  pe.revents |= Writable;
1552  }
1553  }
1554  }
1555  // apply correct masks, and count pipes with pending events
1556  int npipes = 0;
1557  mit = masks.begin();
1558  for (PollVector::iterator it = pipes.begin();
1559  pipes.end() != it; ++it, ++mit)
1560  if ((it->revents &= *mit)) ++npipes;
1561  return npipes;
1562 }
1563 
1565 {
1566  size_t sz = std::strlen(str);
1567  *this << sz;
1568  if (sz) write(str, sz);
1569  return *this;
1570 }
1571 
1573 {
1574  size_t sz = 0;
1575  *this >> sz;
1576  if (good() && !eof()) {
1577  str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1578  if (!str) throw Exception("realloc", errno);
1579  if (sz) read(str, sz);
1580  str[sz] = 0;
1581  }
1582  return *this;
1583 }
1584 
1586 {
1587  size_t sz = str.size();
1588  *this << sz;
1589  write(str.data(), sz);
1590  return *this;
1591 }
1592 
1594 {
1595  str.clear();
1596  size_t sz = 0;
1597  *this >> sz;
1598  if (good() && !eof()) {
1599  str.reserve(sz);
1600  for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1601  }
1602  return *this;
1603 }
1604 
1606 
1607 #ifdef TEST_BIDIRMMAPPIPE
1608 using namespace RooFit;
1609 
1610 int simplechild(BidirMMapPipe& pipe)
1611 {
1612  // child does an echo loop
1613  while (pipe.good() && !pipe.eof()) {
1614  // read a string
1615  std::string str;
1616  pipe >> str;
1617  if (!pipe) return -1;
1618  if (pipe.eof()) break;
1619  if (!str.empty()) {
1620  std::cout << "[CHILD] : read: " << str << std::endl;
1621  str = "... early in the morning?";
1622  }
1623  pipe << str << BidirMMapPipe::flush;
1624  // did our parent tell us to shut down?
1625  if (str.empty()) break;
1626  if (!pipe) return -1;
1627  if (pipe.eof()) break;
1628  std::cout << "[CHILD] : wrote: " << str << std::endl;
1629  }
1630  pipe.close();
1631  return 0;
1632 }
1633 
1634 #include <sstream>
1635 int randomchild(BidirMMapPipe& pipe)
1636 {
1637  // child sends out something at random intervals
1638  ::srand48(::getpid());
1639  {
1640  // wait for parent's go ahead signal
1641  std::string s;
1642  pipe >> s;
1643  }
1644  // no shutdown sequence needed on this side - we're producing the data,
1645  // and the parent can just read until we're done (when it'll get EOF)
1646  for (int i = 0; i < 5; ++i) {
1647  // sleep a random time between 0 and .9 seconds
1648  ::usleep(int(1e6 * ::drand48()));
1649  std::ostringstream buf;
1650  buf << "child pid " << ::getpid() << " sends message " << i;
1651  std::string str = buf.str();
1652  std::cout << "[CHILD] : " << str << std::endl;
1653  pipe << str << BidirMMapPipe::flush;
1654  if (!pipe) return -1;
1655  if (pipe.eof()) break;
1656  }
1657  // tell parent we're shutting down
1658  pipe << "" << BidirMMapPipe::flush;
1659  // wait for parent to acknowledge
1660  std::string s;
1661  pipe >> s;
1662  pipe.close();
1663  return 0;
1664 }
1665 
1666 int benchchildrtt(BidirMMapPipe& pipe)
1667 {
1668  // child does the equivalent of listening for pings and sending the
1669  // packet back
1670  char* str = 0;
1671  while (pipe && !pipe.eof()) {
1672  pipe >> str;
1673  if (!pipe) {
1674  std::free(str);
1675  pipe.close();
1676  return -1;
1677  }
1678  if (pipe.eof()) break;
1679  pipe << str << BidirMMapPipe::flush;
1680  // if we have just completed the shutdown handshake, we break here
1681  if (!std::strlen(str)) break;
1682  }
1683  std::free(str);
1684  pipe.close();
1685  return 0;
1686 }
1687 
1688 int benchchildsink(BidirMMapPipe& pipe)
1689 {
1690  // child behaves like a sink
1691  char* str = 0;
1692  while (pipe && !pipe.eof()) {
1693  pipe >> str;
1694  if (!std::strlen(str)) break;
1695  }
1696  pipe << "" << BidirMMapPipe::flush;
1697  std::free(str);
1698  pipe.close();
1699  return 0;
1700 }
1701 
1702 int benchchildsource(BidirMMapPipe& pipe)
1703 {
1704  // child behaves like a source
1705  char* str = 0;
1706  for (unsigned i = 0; i <= 24; ++i) {
1707  str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1708  std::memset(str, '4', 1 << i);
1709  str[1 << i] = 0;
1710  for (unsigned j = 0; j < 1 << 7; ++j) {
1711  pipe << str;
1712  if (!pipe || pipe.eof()) {
1713  std::free(str);
1714  pipe.close();
1715  return -1;
1716  }
1717  }
1718  // tell parent we're done with this block size
1719  pipe << "" << BidirMMapPipe::flush;
1720  }
1721  // tell parent to shut down
1722  pipe << "" << BidirMMapPipe::flush;
1723  std::free(str);
1724  pipe.close();
1725  return 0;
1726 }
1727 
1728 BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1729 {
1730  // create a pipe with the given child at the remote end
1731  BidirMMapPipe *p = new BidirMMapPipe();
1732  if (p->isChild()) {
1733  int retVal = childexec(*p);
1734  delete p;
1735  std::exit(retVal);
1736  }
1737  return p;
1738 }
1739 
1740 #include <sys/time.h>
1741 #include <iomanip>
1742 int main()
1743 {
1744  // simple echo loop test
1745  {
1746  std::cout << "[PARENT]: simple challenge-response test, "
1747  "one child:" << std::endl;
1748  BidirMMapPipe* pipe = spawnChild(simplechild);
1749  for (int i = 0; i < 5; ++i) {
1750  std::string str("What shall we do with a drunken sailor...");
1751  *pipe << str << BidirMMapPipe::flush;
1752  if (!*pipe) return -1;
1753  std::cout << "[PARENT]: wrote: " << str << std::endl;
1754  *pipe >> str;
1755  if (!*pipe) return -1;
1756  std::cout << "[PARENT]: read: " << str << std::endl;
1757  }
1758  // send shutdown string
1759  *pipe << "" << BidirMMapPipe::flush;
1760  // wait for shutdown handshake
1761  std::string s;
1762  *pipe >> s;
1763  int retVal = pipe->close();
1764  std::cout << "[PARENT]: exit status of child: " << retVal <<
1765  std::endl;
1766  if (retVal) return retVal;
1767  delete pipe;
1768  }
1769  // simple poll test - children send 5 results in random intervals
1770  {
1771  unsigned nch = 20;
1772  std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1773  " children:" << std::endl;
1775  // poll data structure
1777  pipes.reserve(nch);
1778  // spawn children
1779  for (unsigned i = 0; i < nch; ++i) {
1780  std::cout << "[PARENT]: spawning child " << i << std::endl;
1781  pipes.push_back(PollEntry(spawnChild(randomchild),
1783  }
1784  // wake children up
1785  std::cout << "[PARENT]: waking up children" << std::endl;
1786  for (unsigned i = 0; i < nch; ++i)
1787  *pipes[i].pipe << "" << BidirMMapPipe::flush;
1788  std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1789  // while at least some children alive
1790  while (!pipes.empty()) {
1791  // poll, wait until status change (infinite timeout)
1792  int npipes = BidirMMapPipe::poll(pipes, -1);
1793  // scan for pipes with changed status
1794  for (std::vector<PollEntry>::iterator it = pipes.begin();
1795  npipes && pipes.end() != it; ) {
1796  if (!it->revents) {
1797  // unchanged, next one
1798  ++it;
1799  continue;
1800  }
1801  --npipes; // maybe we can stop early...
1802  // read from pipes which are readable
1803  if (it->revents & BidirMMapPipe::Readable) {
1804  std::string s;
1805  *(it->pipe) >> s;
1806  if (!s.empty()) {
1807  std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1808  ": " << s << std::endl;
1809  ++it;
1810  continue;
1811  } else {
1812  // child is shutting down...
1813  *(it->pipe) << "" << BidirMMapPipe::flush;
1814  goto childcloses;
1815  }
1816  }
1817  // retire pipes with error or end-of-file condition
1818  if (it->revents & (BidirMMapPipe::Error |
1821  std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1822  " revents" <<
1823  ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1824  ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1825  ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1826  ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1827  ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1828  ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1829  ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1830  ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1831  std::endl;
1832 childcloses:
1833  int retVal = it->pipe->close();
1834  std::cout << "[PARENT]: child exit status: " <<
1835  retVal << ", number of children still alive: " <<
1836  (pipes.size() - 1) << std::endl;
1837  if (retVal) return retVal;
1838  delete it->pipe;
1839  it = pipes.erase(it);
1840  continue;
1841  }
1842  }
1843  }
1844  }
1845  // little benchmark - round trip time
1846  {
1847  std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1848  for (unsigned i = 0; i <= 24; ++i) {
1849  char *s = new char[1 + (1 << i)];
1850  std::memset(s, 'A', 1 << i);
1851  s[1 << i] = 0;
1852  const unsigned n = 1 << 7;
1853  double avg = 0., min = 1e42, max = -1e42;
1854  BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1855  for (unsigned j = n; j--; ) {
1856  struct timeval t1;
1857  ::gettimeofday(&t1, 0);
1858  *pipe << s << BidirMMapPipe::flush;
1859  if (!*pipe || pipe->eof()) break;
1860  *pipe >> s;
1861  if (!*pipe || pipe->eof()) break;
1862  struct timeval t2;
1863  ::gettimeofday(&t2, 0);
1864  t2.tv_sec -= t1.tv_sec;
1865  t2.tv_usec -= t1.tv_usec;
1866  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1867  if (dt < min) min = dt;
1868  if (dt > max) max = dt;
1869  avg += dt;
1870  }
1871  // send a shutdown string
1872  *pipe << "" << BidirMMapPipe::flush;
1873  // get child's shutdown ok
1874  *pipe >> s;
1875  avg /= double(n);
1876  avg *= 1e6; min *= 1e6; max *= 1e6;
1877  int retVal = pipe->close();
1878  if (retVal) {
1879  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1880  return retVal;
1881  }
1882  delete pipe;
1883  // there is a factor 2 in the formula for the transfer rate below,
1884  // because we transfer data of twice the size of the block - once
1885  // to the child, and once for the return trip
1886  std::cout << "block size " << std::setw(9) << (1 << i) <<
1887  " avg " << std::setw(7) << avg << " us min " <<
1888  std::setw(7) << min << " us max " << std::setw(7) << max <<
1889  "us speed " << std::setw(9) <<
1890  2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1891  " MB/s" << std::endl;
1892  delete[] s;
1893  }
1894  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1895  }
1896  // little benchmark - child as sink
1897  {
1898  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1899  for (unsigned i = 0; i <= 24; ++i) {
1900  char *s = new char[1 + (1 << i)];
1901  std::memset(s, 'A', 1 << i);
1902  s[1 << i] = 0;
1903  const unsigned n = 1 << 7;
1904  double avg = 0., min = 1e42, max = -1e42;
1905  BidirMMapPipe *pipe = spawnChild(benchchildsink);
1906  for (unsigned j = n; j--; ) {
1907  struct timeval t1;
1908  ::gettimeofday(&t1, 0);
1909  // streaming mode - we do not flush here
1910  *pipe << s;
1911  if (!*pipe || pipe->eof()) break;
1912  struct timeval t2;
1913  ::gettimeofday(&t2, 0);
1914  t2.tv_sec -= t1.tv_sec;
1915  t2.tv_usec -= t1.tv_usec;
1916  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1917  if (dt < min) min = dt;
1918  if (dt > max) max = dt;
1919  avg += dt;
1920  }
1921  // send a shutdown string
1922  *pipe << "" << BidirMMapPipe::flush;
1923  // get child's shutdown ok
1924  *pipe >> s;
1925  avg /= double(n);
1926  avg *= 1e6; min *= 1e6; max *= 1e6;
1927  int retVal = pipe->close();
1928  if (retVal) {
1929  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1930  return retVal;
1931  }
1932  delete pipe;
1933  std::cout << "block size " << std::setw(9) << (1 << i) <<
1934  " avg " << std::setw(7) << avg << " us min " <<
1935  std::setw(7) << min << " us max " << std::setw(7) << max <<
1936  "us speed " << std::setw(9) <<
1937  (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1938  " MB/s" << std::endl;
1939  delete[] s;
1940  }
1941  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1942  }
1943  // little benchmark - child as source
1944  {
1945  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1946  char *s = 0;
1947  double avg = 0., min = 1e42, max = -1e42;
1948  unsigned n = 0, bsz = 0;
1949  BidirMMapPipe *pipe = spawnChild(benchchildsource);
1950  while (*pipe && !pipe->eof()) {
1951  struct timeval t1;
1952  ::gettimeofday(&t1, 0);
1953  // streaming mode - we do not flush here
1954  *pipe >> s;
1955  if (!*pipe || pipe->eof()) break;
1956  struct timeval t2;
1957  ::gettimeofday(&t2, 0);
1958  t2.tv_sec -= t1.tv_sec;
1959  t2.tv_usec -= t1.tv_usec;
1960  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1961  if (std::strlen(s)) {
1962  ++n;
1963  if (dt < min) min = dt;
1964  if (dt > max) max = dt;
1965  avg += dt;
1966  bsz = std::strlen(s);
1967  } else {
1968  if (!n) break;
1969  // next block size
1970  avg /= double(n);
1971  avg *= 1e6; min *= 1e6; max *= 1e6;
1972 
1973  std::cout << "block size " << std::setw(9) << bsz <<
1974  " avg " << std::setw(7) << avg << " us min " <<
1975  std::setw(7) << min << " us max " << std::setw(7) <<
1976  max << "us speed " << std::setw(9) <<
1977  (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1978  " MB/s" << std::endl;
1979  n = 0;
1980  avg = 0.;
1981  min = 1e42;
1982  max = -1e42;
1983  }
1984  }
1985  int retVal = pipe->close();
1986  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1987  if (retVal) return retVal;
1988  delete pipe;
1989  std::free(s);
1990  }
1991  return 0;
1992 }
1993 #endif // TEST_BIDIRMMAPPIPE
1994 #endif // _WIN32
1995 
1996 // vim: ft=cpp:sw=4:tw=78:et
std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:69
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
pipe error Write end
static double p3(double t, double a, double b, double c, double d)
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
bool empty() const
return true if no used page groups in this chunk
read pipe in end-of-file state
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:81
end of file reached
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
Definition: Exception.C:4
unsigned m_refcnt
reference counter
bool bad() const
true on I/O error
Page * m_freelist
linked list: free pages
write pipe in end-of-file state
fill
Definition: fit1_py.py:6
don't know yet what'll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:62
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
bool isChild() const
return if this end of the pipe is the child end
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:59
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:71
#define malloc
Definition: civetweb.c:818
pid_t m_childPid
pid of the child (zero if we're the child)
pipe error read end
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
Page * page(unsigned pgno) const
return page number pgno
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
size_type read(void *addr, size_type sz)
read from pipe
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:90
void flush()
flush buffers with unwritten data
logical failure (e.g. pipe closed)
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
void sendpages(Page *plist)
send page(s) to the other end (may block)
static double p2(double t, double a, double b, double c)
pipe can be written to
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:88
Page * busypage()
get a busy page to read data from (may block)
unsigned nPagesPerGroup() const
return number of pages per page group
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:56
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
#define realloc
Definition: civetweb.c:820
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
static int debugflag()
return the current setting of the debug flag
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:70
bool eof() const
true if end-of-file
static unsigned lenPageList(const Page *list)
return length of a page list
Pages pop()
pop a group of pages off the free list
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:64
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
#define END_NAMESPACE_ROOFIT
TLine * l
Definition: textangle.C:4
bool isParent() const
return if this end of the pipe is the parent end
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static double p1(double t, double a, double b)
pid_t m_parentPid
pid of the parent
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
static int s_debugflag
debug flag
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
unsigned pageno(Page *p) const
perform page to page number mapping
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
void zap(Pages &p)
free all pages except for those pointed to by p
void Copy(void *source, void *dest)
double f(double x)
for poll() interface
unsigned recvpages()
receive pages from the other end (may block), queue them
#define free
Definition: civetweb.c:821
pages per pipe end
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
bool contains(const Pages &p) const
return if p is contained in this PageChunk
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:68
unsigned len() const
return length of chunk
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:65
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
error reporting with exceptions
unsigned npages() const
return number of pages accessible
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
pages shared (child + parent)
static unsigned long masks[]
Definition: gifencode.c:211
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:57
bool full() const
return true if no free page groups in this chunk
pipe has data for reading
bool good() const
status of stream is good
nothing special on this pipe
bool closed() const
true if closed
const Int_t n
Definition: legend1.C:16
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:86
int main(int argc, char **argv)
#define BEGIN_NAMESPACE_ROOFIT