Logo ROOT   master
Reference Guide
BidirMMapPipe.cxx
Go to the documentation of this file.
1 /** @file BidirMMapPipe.cxx
2  *
3  * implementation of BidirMMapPipe, a class which forks off a child process
4  * and serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 #ifndef _WIN32
10 #include <map>
11 #include <cerrno>
12 #include <limits>
13 #include <string>
14 #include <cstdlib>
15 #include <cstring>
16 #include <cassert>
17 #include <iostream>
18 #include <algorithm>
19 #include <exception>
20 
21 #include <poll.h>
22 #include <fcntl.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <unistd.h>
26 #include <stdlib.h>
27 #include <pthread.h>
28 #include <sys/mman.h>
29 #include <sys/stat.h>
30 #include <sys/wait.h>
31 #include <sys/socket.h>
32 
33 #include "BidirMMapPipe.h"
34 
35 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
36 #define END_NAMESPACE_ROOFIT }
37 
39 
40 /// namespace for implementation details of BidirMMapPipe
41 namespace BidirMMapPipe_impl {
42  /** @brief exception to throw if low-level OS calls go wrong
43  *
44  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
45  * @date 2013-07-07
46  */
class BidirMMapPipeException : public std::exception
{
    private:
        enum {
            s_sz = 256 ///< length of buffer
        };
        char m_buf[s_sz]; ///< buffer containing the error message

        /// for the POSIX version of strerror_r
        static int dostrerror_r(int err, char* buf, std::size_t sz,
                int (*f)(int, char*, std::size_t))
        { return f(err, buf, sz); }
        /// for the GNU version of strerror_r
        static int dostrerror_r(int, char*, std::size_t,
                char* (*f)(int, char*, std::size_t));
    public:
        /** @brief constructor taking a hint on the failed operation and an
         *         error code
         *
         * @param msg   short description of the failed operation; may be
         *              empty or null, in which case only the error text is
         *              stored
         * @param err   errno-style error code, translated with strerror_r
         *
         * The resulting message has the form "msg: <error text>" and is
         * truncated (always zero-terminated) to fit the internal buffer.
         */
        BidirMMapPipeException(const char* msg, int err);
        /// return a description of what went wrong
        const char* what() const noexcept override { return m_buf; }
};

BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
{
    m_buf[0] = 0;
    // tolerate a null hint instead of handing it to strlen (UB)
    std::size_t msgsz = msg ? std::strlen(msg) : 0;
    if (msgsz) {
        msgsz = std::min(msgsz, std::size_t(s_sz));
        std::copy(msg, msg + msgsz, m_buf);
        if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
        if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
    }
    if (msgsz < s_sz) {
        // pre-terminate: POSIX does not specify the buffer contents when
        // strerror_r fails, so make sure no uninitialised bytes can end up
        // in the message
        m_buf[msgsz] = 0;
        // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
        // have to sort it out with overloads
        dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
    }
    m_buf[s_sz - 1] = 0; // enforce zero-termination
}

/// GNU strerror_r may return a pointer to a static string instead of
/// filling buf; copy it into buf in that case, returning ERANGE if it had
/// to be truncated
int BidirMMapPipeException::dostrerror_r(int err, char* buf,
        std::size_t sz, char* (*f)(int, char*, std::size_t))
{
    buf[0] = 0;
    char *tmp = f(err, buf, sz);
    if (tmp && tmp != buf) {
        std::strncpy(buf, tmp, sz);
        buf[sz - 1] = 0;
        if (std::strlen(tmp) > sz - 1) return ERANGE;
    }
    return 0;
}
98 
99  /** @brief class representing the header structure in an mmapped page
100  *
101  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
102  * @date 2013-07-07
103  *
104  * contains a field to put pages into a linked list, a field for the size
105  * of the data being transmitted, and a field for the position until which
106  * the data has been read
107  */
// Header stored at the start of every logical page: a relative link to the
// next page (counted in units of PageChunk::pagesize(), 0 meaning "none"),
// the number of payload bytes held, and the read position inside the
// payload. The payload area follows the header and runs to the page end.
108  class Page
109  {
110  private:
111  // use as small a data type as possible to maximise payload area
112  // of pages
113  short m_next; ///< next page in list (in pagesizes)
114  unsigned short m_size; ///< size of payload (in bytes)
115  unsigned short m_pos; ///< index of next byte in payload area
116  /// copy construction forbidden
// NOTE(review): unlike operator= below, this is defined (with an empty body
// that leaves every member uninitialised) instead of deleted; "= delete"
// would state the intent and reject accidental copies at compile time.
117  Page(const Page&) {}
118  /// assignment forbidden
119  Page& operator=(const Page&) = delete;
120  public:
121  /// constructor
122  Page() : m_next(0), m_size(0), m_pos(0)
123  {
124  // check that short is big enough - must be done at runtime
125  // because the page size is not known until runtime
// NOTE(review): the continuation of this assert (original line 127) is not
// visible here; presumably it compares against PageChunk::pagesize() -
// confirm against the full file.
126  assert(std::numeric_limits<unsigned short>::max() >=
128  }
129  /// set pointer to next page
130  void setNext(const Page* p);
131  /// return pointer to next page
132  Page* next() const;
133  /// return reference to size field
134  unsigned short& size() { return m_size; }
135  /// return size (of payload data)
136  unsigned size() const { return m_size; }
137  /// return reference to position field
138  unsigned short& pos() { return m_pos; }
139  /// return position
140  unsigned pos() const { return m_pos; }
141  /// return pointer to first byte in payload data area of page
142  inline unsigned char* begin() const
143  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
144  + sizeof(Page); }
145  /// return pointer one past the last byte of the page (this + PageChunk::pagesize())
146  inline unsigned char* end() const
147  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
148  + PageChunk::pagesize(); }
149  /// return the capacity of the page (page size minus the header)
150  static unsigned capacity()
151  { return PageChunk::pagesize() - sizeof(Page); }
152  /// true if page empty
153  bool empty() const { return !m_size; }
154  /// true if page holds any payload (partially or completely filled)
155  bool filled() const { return !empty(); }
156  /// free space left (to be written to)
157  unsigned free() const { return capacity() - m_size; }
158  /// bytes remaining to be read
159  unsigned remaining() const { return m_size - m_pos; }
160  /// true if page completely full
161  bool full() const { return !free(); }
162  };
163 
164  void Page::setNext(const Page* p)
165  {
166  if (!p) {
167  m_next = 0;
168  } else {
169  const char* p1 = reinterpret_cast<char*>(this);
170  const char* p2 = reinterpret_cast<const char*>(p);
171  std::ptrdiff_t tmp = p2 - p1;
172  // difference must be divisible by page size
173  assert(!(tmp % PageChunk::pagesize()));
174  tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
175  m_next = tmp;
176  // no truncation when saving in a short
177  assert(m_next == tmp);
178  // final check: next() must return p
179  assert(next() == p);
180  }
181  }
182 
183  Page* Page::next() const
184  {
185  if (!m_next) return 0;
186  char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
187  ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
188  return reinterpret_cast<Page*>(ptmp);
189  }
190 
191  /** @brief class representing a page pool
192  *
193  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
194  * @date 2013-07-24
195  *
196  * pool of mmapped pages (on systems which support it, on all others, the
197  * functionality is emulated with dynamically allocated memory)
198  *
199  * in most operating systems there is a limit to how many mappings any one
200  * process is allowed to request; for this reason, we mmap a relatively
201  * large amount up front, and then carve off little pieces as we need them
202  *
203  * Moreover, some systems have too large a physical page size in their MMU
204  * for the code to handle (we want offsets and lengths to fit into 16
205  * bits), so we carve such big physical pages into smaller logical Pages
206  * if needed. The largest logical page size is currently 16 KiB.
207  */
// Pool of page groups carved out of larger mmapped chunks (see the comment
// above); chunks that still have free groups are tracked in m_freelist.
// NOTE(review): original line 226 (after "convenience typedef") is not
// visible here; presumably it declares the Pages type used below - confirm.
208  class PagePool {
209  private:
210  /// convenience typedef
211  typedef BidirMMapPipeException Exception;
212 
// NOTE(review): pop() allocates chunks of sz * m_nPgPerGrp * pagesize()
// bytes, i.e. linear in the size class sz, not 1 << sz bytes as the
// comments on the enumerators below suggest.
213  enum {
214  minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
215  maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
216  szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
217  };
218  /// a chunk of memory in the pool
219  typedef BidirMMapPipe_impl::PageChunk Chunk;
220  /// list of chunks
221  typedef std::list<Chunk*> ChunkList;
222 
223  friend class BidirMMapPipe_impl::PageChunk;
224  public:
225  /// convenience typedef
227  /// constructor
228  PagePool(unsigned nPagesPerGroup);
229  /// destructor
230  ~PagePool();
231  /// pop a free element out of the pool
232  Pages pop();
233 
234  /// return (logical) page size of the system
235  static unsigned pagesize() { return PageChunk::pagesize(); }
236  /// return variety of mmap supported on the system
237  static MMapVariety mmapVariety()
238  { return PageChunk::mmapVariety(); }
239 
240  /// return number of pages per group (ie. as returned by pop())
241  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
242 
243  /// zap the pool (unmap all but Pages p)
244  void zap(Pages& p);
245 
246  private:
247  /// list of chunks used by the pool
248  ChunkList m_chunks;
249  /// list of chunks used by the pool which are not full
250  ChunkList m_freelist;
251  /// chunk size map (histogram of chunk sizes)
// NOTE(review): this has (maxsz - minsz) / szincr slots, i.e. room for size
// classes minsz .. maxsz - szincr only; a chunk of size class maxsz would be
// counted one past the end of the array by updateCurSz().
252  unsigned m_szmap[(maxsz - minsz) / szincr];
253  /// current chunk size
254  int m_cursz;
255  /// page group size
256  unsigned m_nPgPerGrp;
257 
258  /// adjust m_cursz to current largest block
259  void updateCurSz(int sz, int incr);
260  /// find size of next chunk to allocate (in a hopefully smart way)
261  int nextChunkSz() const;
262  /// put a chunk that has free groups again back on the free list
263  void putOnFreeList(Chunk* chunk);
264  /// release a chunk
265  void release(Chunk* chunk);
266  };
267 
268  Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
269  m_pimpl(new impl)
270  {
271  assert(npg < 256);
272  m_pimpl->m_parent = parent;
273  m_pimpl->m_pages = pages;
274  m_pimpl->m_refcnt = 1;
275  m_pimpl->m_npages = npg;
276  /// initialise pages
277  for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
278  }
279 
281  unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
283 
// Body of what appears to be Pages::~Pages() (its signature line is not
// visible in this view): drops this handle's reference and, if it was the
// last one, returns the pages to the owning chunk (when there is one) and
// frees the shared impl.
285  {
286  if (m_pimpl && !--(m_pimpl->m_refcnt)) {
287  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
288  delete m_pimpl;
289  }
290  }
291 
292  Pages::Pages(const Pages& other) :
293  m_pimpl(other.m_pimpl)
294  { ++(m_pimpl->m_refcnt); }
295 
296  Pages& Pages::operator=(const Pages& other)
297  {
298  if (&other == this) return *this;
299  if (--(m_pimpl->m_refcnt)) {
300  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
301  delete m_pimpl;
302  }
303  m_pimpl = other.m_pimpl;
304  ++(m_pimpl->m_refcnt);
305  return *this;
306  }
307 
308  unsigned Pages::pagesize() { return PageChunk::pagesize(); }
309 
310  Page* Pages::page(unsigned pgno) const
311  {
312  assert(pgno < m_pimpl->m_npages);
313  unsigned char* pptr =
314  reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
315  pptr += pgno * pagesize();
316  return reinterpret_cast<Page*>(pptr);
317  }
318 
319  unsigned Pages::pageno(Page* p) const
320  {
321  const unsigned char* pptr =
322  reinterpret_cast<const unsigned char*>(p);
323  const unsigned char* bptr =
324  reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
325  assert(0 == ((pptr - bptr) % pagesize()));
326  const unsigned nr = (pptr - bptr) / pagesize();
327  assert(nr < m_pimpl->m_npages);
328  return nr;
329  }
330 
// Body of what appears to be the physical page-size probe used by
// PageChunk (its signature line is not visible in this view). Returns
// sysconf(_SC_PAGESIZE) when that is larger than 512 bytes and larger than
// sizeof(Page); otherwise switches s_mmapworks to Copy and returns 4096.
// NOTE(review): a -1 result from sysconf throws rather than taking the
// fallback that the comment below describes for "failure".
332  {
333  // find out page size of system
334  long pgsz = sysconf(_SC_PAGESIZE);
335  if (-1 == pgsz) throw Exception("sysconf", errno);
336  if (pgsz > 512 && pgsz > long(sizeof(Page)))
337  return pgsz;
338 
339  // in case of failure or implausible value, use a safe default: 4k
340  // page size, and do not try to mmap
341  s_mmapworks = Copy;
342  return 1 << 12;
343  }
344 
345  PageChunk::PageChunk(PagePool* parent,
346  unsigned length, unsigned nPgPerGroup) :
347  m_begin(dommap(length)),
348  m_end(reinterpret_cast<void*>(
349  reinterpret_cast<unsigned char*>(m_begin) + length)),
350  m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
351  {
352  // ok, push groups of pages onto freelist here
353  unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
354  unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
355  while (p < pend) {
356  m_freelist.push_back(reinterpret_cast<void*>(p));
357  p += nPgPerGroup * PagePool::pagesize();
358  }
359  }
360 
// Body of what appears to be PageChunk's destructor (its signature line is
// not visible in this view): a chunk still attached to a pool must have no
// groups in use, and the memory is released via domunmap() unless m_begin
// was reset beforehand (zap() does this).
// NOTE(review): domunmap() throws on munmap failure; if this is indeed the
// destructor (implicitly noexcept), such an exception calls std::terminate.
362  {
363  if (m_parent) assert(empty());
364  if (m_begin) domunmap(m_begin, len());
365  }
366 
367  bool PageChunk::contains(const Pages& p) const
368  { return p.m_pimpl->m_parent == this; }
369 
// Body of what appears to be PageChunk::pop() (its signature line is not
// visible in this view): takes the first group off this chunk's free list,
// counts it as used and wraps it in a Pages handle of m_nPgPerGrp pages
// owned by this chunk. The free list must not be empty.
371  {
372  assert(!m_freelist.empty());
373  void* p = m_freelist.front();
374  m_freelist.pop_front();
375  ++m_nUsedGrp;
376  return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
377  }
378 
379  void PageChunk::push(const Pages& p)
380  {
381  assert(contains(p));
382  bool wasempty = m_freelist.empty();
383  m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
384  --m_nUsedGrp;
385  if (m_parent) {
386  // notify parent if we need to be put on the free list again
387  if (wasempty) m_parent->putOnFreeList(this);
388  // notify parent if we're empty
389  if (empty()) return m_parent->release(this);
390  }
391  }
392 
// Obtain len bytes (a multiple of the physical page size) of memory shared
// with future child processes, trying in turn anonymous mmap, mmap of
// /dev/zero, mmap of an unlinked temporary file, and finally plain malloc.
// While s_mmapworks is Unknown a failing mmap falls through to the next
// method; once a method is known to work, its failure throws.
// NOTE(review): original lines 425, 452 and 496 are not visible in this
// view; presumably they record the working variety in s_mmapworks - confirm.
393  void* PageChunk::dommap(unsigned len)
394  {
395  assert(len && 0 == (len % s_physpgsz));
396  // ok, the idea here is to try the different methods of mmapping, and
397  // choose the first one that works. we have four flavours:
398  // 1 - anonymous mmap (best)
399  // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
400  // bit more tedious to set up, since you need to open/close a
401  // device file)
402  // 3 - mmap of a temporary file (very tedious to set up - need to
403  // create a temporary file, delete it, make the underlying storage
404  // large enough, then mmap the fd and close it)
405  // 4 - if all those fail, we malloc the buffers, and copy the data
406  // through the OS (then we're no better than normal pipes)
407  static bool msgprinted = false;
408  if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
409 #if defined(MAP_ANONYMOUS)
410 #undef MYANONFLAG
411 #define MYANONFLAG MAP_ANONYMOUS
412 #elif defined(MAP_ANON)
413 #undef MYANONFLAG
414 #define MYANONFLAG MAP_ANON
415 #else
416 #undef MYANONFLAG
417 #endif
418 #ifdef MYANONFLAG
419  void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
420  MYANONFLAG | MAP_SHARED, -1, 0);
421  if (MAP_FAILED == retVal) {
422  if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
423  } else {
424  assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
426  if (BidirMMapPipe::debugflag() && !msgprinted) {
427  std::cerr << " INFO: In " << __func__ << " (" <<
428  __FILE__ << ", line " << __LINE__ <<
429  "): anonymous mmapping works, excellent!" <<
430  std::endl;
431  msgprinted = true;
432  }
433  return retVal;
434  }
435 #endif
436 #undef MYANONFLAG
437  }
438  if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
439  // ok, no anonymous mappings supported directly, so try to map
440  // /dev/zero which has much the same effect on many systems
// NOTE(review): a failing open() throws even while probing (Unknown), so
// the file-backed and malloc fallbacks are never reached in that case.
441  int fd = ::open("/dev/zero", O_RDWR);
442  if (-1 == fd)
443  throw Exception("open /dev/zero", errno);
444  void* retVal = ::mmap(0, len,
445  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
446  if (MAP_FAILED == retVal) {
447  int errsv = errno;
448  ::close(fd);
449  if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
450  } else {
451  assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
453  }
// NOTE(review): when mmap failed while probing (Unknown), fd was already
// closed above, yet control reaches this second close(fd): it fails with
// EBADF (or closes an unrelated, reused descriptor) and throws - or returns
// MAP_FAILED as success - instead of trying the next method.
454  if (-1 == ::close(fd))
455  throw Exception("close /dev/zero", errno);
456  if (BidirMMapPipe::debugflag() && !msgprinted) {
457  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
458  ", line " << __LINE__ << "): mmapping /dev/zero works, "
459  "very good!" << std::endl;
460  msgprinted = true;
461  }
462  return retVal;
463  }
464  if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
465  char name[] = "/tmp/BidirMMapPipe-XXXXXX";
466  int fd;
467  // open temp file
468  if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
469  // remove it, but keep fd open
470  if (-1 == ::unlink(name)) {
471  int errsv = errno;
472  ::close(fd);
473  throw Exception("unlink", errsv);
474  }
475  // make it the right size: lseek
476  if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
477  int errsv = errno;
478  ::close(fd);
479  throw Exception("lseek", errsv);
480  }
481  // make it the right size: write a byte
482  if (1 != ::write(fd, name, 1)) {
483  int errsv = errno;
484  ::close(fd);
485  throw Exception("write", errsv);
486  }
487  // do mmap
488  void* retVal = ::mmap(0, len,
489  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
490  if (MAP_FAILED == retVal) {
491  int errsv = errno;
492  ::close(fd);
493  if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
494  } else {
495  assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
497  }
// NOTE(review): same double-close problem as the /dev/zero branch: after a
// failed mmap while probing, fd is closed again here, which then calls
// munmap(MAP_FAILED, len) and throws instead of falling back to malloc.
498  if (-1 == ::close(fd)) {
499  int errsv = errno;
500  ::munmap(retVal, len);
501  throw Exception("close", errsv);
502  }
503  if (BidirMMapPipe::debugflag() && !msgprinted) {
504  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
505  ", line " << __LINE__ << "): mmapping temporary files "
506  "works, good!" << std::endl;
507  msgprinted = true;
508  }
509  return retVal;
510  }
511  if (Copy == s_mmapworks || Unknown == s_mmapworks) {
512  // fallback solution: mmap does not work on this OS (or does not
513  // work for what we want to use it), so use a normal buffer of
514  // memory instead, and collect data in that buffer - this needs an
515  // additional write/read to/from the pipe(s), but there you go...
516  if (BidirMMapPipe::debugflag() && !msgprinted) {
517  std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
518  ", line " << __LINE__ << "): anonymous mmapping of "
519  "shared buffers failed, falling back to read/write on "
520  " pipes!" << std::endl;
521  msgprinted = true;
522  }
523  s_mmapworks = Copy;
524  void* retVal = std::malloc(len);
525  if (!retVal) throw Exception("malloc", errno);
526  return retVal;
527  }
528  // should never get here
529  assert(false);
530  return 0;
531  }
532 
533  void PageChunk::domunmap(void* addr, unsigned len)
534  {
535  assert(len && 0 == (len % s_physpgsz));
536  if (addr) {
537  assert(Unknown != s_mmapworks);
538  if (Copy != s_mmapworks) {
539  if (-1 == ::munmap(addr, len))
540  throw Exception("munmap", errno);
541  } else {
542  std::free(addr);
543  }
544  }
545  }
546 
// Body of what appears to be PageChunk::zap(Pages& p) (its signature line is
// not visible in this view): makes every page of this chunk outside the
// group p inaccessible (best effort, via mprotect), detaches chunk and p
// from the pool, and deletes this object. m_begin/m_end are reset first so
// the destructor does not unmap: the mapping deliberately stays alive
// because p's pages live in it.
548  {
549  // try to mprotect the other bits of the pool with no access...
550  // we'd really like a version of mremap here that can unmap all the
551  // other pages in the chunk, but that does not exist, so we protect
552  // the other pages in this chunk such that they may neither be read,
553  // written nor executed, only the pages we're interested in for
554  // communications stay readable and writable
555  //
556  // if an OS does not support changing the protection of a part of an
557  // mmapped area, the mprotect calls below should just fail and not
558  // change any protection, so we're a little less safe against
559  // corruption, but everything should still work
560  if (Copy != s_mmapworks) {
561  unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
562  unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
// NOTE(review): p.npages() counts logical pages (Pages::page() steps by
// PageChunk::pagesize()), so multiplying by s_physpgsz overshoots the end
// of the group whenever the physical page size exceeds the 16 KiB logical
// one (p2 may even lie beyond p3); "* s_pagesize" looks intended - confirm.
563  unsigned char* p2 = p1 + p.npages() * s_physpgsz;
564  unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
565  if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
566  if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
567  }
568  m_parent = 0;
569  m_freelist.clear();
570  m_nUsedGrp = 1;
571  p.m_pimpl->m_parent = 0;
572  m_begin = m_end = 0;
573  // commit suicide
574  delete this;
575  }
576 
// Create an empty pool that hands out groups of nPgPerGroup logical pages.
// The code below adjusts m_nPgPerGrp so that, per its comment, a group's
// byte size is rounded up to a physical page boundary (the result may only
// grow), and clears the chunk-size histogram.
// NOTE(review): original lines 582 and 584 are not visible in this view;
// presumably they open the branch closed at line 601 and finish the
// initializer of mult - confirm against the full file.
577  PagePool::PagePool(unsigned nPgPerGroup) :
578  m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
579  {
580  // if logical and physical page size differ, we may have to adjust
581  // m_nPgPerGrp to make things fit
583  const unsigned mult =
585  const unsigned desired = nPgPerGroup * PageChunk::pagesize();
586  // round up to the next physical page boundary
587  const unsigned actual = mult *
588  (desired / mult + bool(desired % mult));
589  const unsigned newPgPerGrp = actual / PageChunk::pagesize();
590  if (BidirMMapPipe::debugflag()) {
591  std::cerr << " INFO: In " << __func__ << " (" <<
592  __FILE__ << ", line " << __LINE__ <<
593  "): physical page size " << PageChunk::physPgSz() <<
594  ", subdividing into logical pages of size " <<
595  PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
596  m_nPgPerGrp << " -> " << newPgPerGrp <<
597  std::endl;
598  }
599  assert(newPgPerGrp >= m_nPgPerGrp);
600  m_nPgPerGrp = newPgPerGrp;
601  }
602  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
603  }
604 
605  PagePool::~PagePool()
606  {
607  m_freelist.clear();
608  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
609  delete *it;
610  m_chunks.clear();
611  }
612 
613  void PagePool::zap(Pages& p)
614  {
615  // unmap all pages but those pointed to by p
616  m_freelist.clear();
617  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
618  if ((*it)->contains(p)) {
619  (*it)->zap(p);
620  } else {
621  delete *it;
622  }
623  }
624  m_chunks.clear();
625  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
626  m_cursz = minsz;
627  }
628 
629  Pages PagePool::pop()
630  {
631  if (m_freelist.empty()) {
632  // allocate and register new chunk and put it on the freelist
633  const int sz = nextChunkSz();
634  Chunk *c = new Chunk(this,
635  sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
636  m_chunks.push_front(c);
637  m_freelist.push_back(c);
638  updateCurSz(sz, +1);
639  }
640  // get free element from first chunk on _freelist
641  Chunk* c = m_freelist.front();
642  Pages p(c->pop());
643  // full chunks are removed from _freelist
644  if (c->full()) m_freelist.pop_front();
645  return p;
646  }
647 
648  void PagePool::release(PageChunk* chunk)
649  {
650  assert(chunk->empty());
651  // find chunk on freelist and remove
652  ChunkList::iterator it = std::find(
653  m_freelist.begin(), m_freelist.end(), chunk);
654  if (m_freelist.end() == it)
655  throw Exception("PagePool::release(PageChunk*)", EINVAL);
656  m_freelist.erase(it);
657  // find chunk in m_chunks and remove
658  it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
659  if (m_chunks.end() == it)
660  throw Exception("PagePool::release(PageChunk*)", EINVAL);
661  m_chunks.erase(it);
662  const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
663  delete chunk;
664  updateCurSz(sz, -1);
665  }
666 
667  void PagePool::putOnFreeList(PageChunk* chunk)
668  {
669  assert(!chunk->full());
670  m_freelist.push_back(chunk);
671  }
672 
673  void PagePool::updateCurSz(int sz, int incr)
674  {
675  m_szmap[(sz - minsz) / szincr] += incr;
676  m_cursz = minsz;
677  for (int i = (maxsz - minsz) / szincr; i--; ) {
678  if (m_szmap[i]) {
679  m_cursz += i * szincr;
680  break;
681  }
682  }
683  }
684 
685  int PagePool::nextChunkSz() const
686  {
687  // no chunks with space available, figure out chunk size
688  int sz = m_cursz;
689  if (m_chunks.empty()) {
690  // if we start allocating chunks, we start from minsz
691  sz = minsz;
692  } else {
693  if (minsz >= sz) {
694  // minimal sized chunks are always grown
695  sz = minsz + szincr;
696  } else {
697  if (1 != m_chunks.size()) {
698  // if we have more than one completely filled chunk, grow
699  sz += szincr;
700  } else {
701  // just one chunk left, try shrinking chunk size
702  sz -= szincr;
703  }
704  }
705  }
706  // clamp size to allowed range
707  if (sz > maxsz) sz = maxsz;
708  if (sz < minsz) sz = minsz;
709  return sz;
710  }
711 }
712 
713 // static BidirMMapPipe members
714 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
715 std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
716 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
719 
720 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
721 {
722  if (!s_pagepool)
723  s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
724  return *s_pagepool;
725 }
726 
// Body of what appears to be BidirMMapPipe::teardownall() (its signature
// line is not visible in this view; the constructor registers teardownall
// with atexit). Repeatedly takes the first pipe on s_openpipes, sends
// SIGTERM to its child (if any) and force-closes it until the list is
// empty; the mutex is released around each doClose call.
// NOTE(review): doClose(true, true) is told the caller holds
// s_openpipesmutex, but it was unlocked just before the call, so doClose
// edits s_openpipes without the lock. The loop also relies on doClose
// removing p from the list, which doClose skips when p has failbit set
// (early return) - such an entry would make this loop spin forever.
728 {
729  pthread_mutex_lock(&s_openpipesmutex);
730  while (!s_openpipes.empty()) {
731  BidirMMapPipe *p = s_openpipes.front();
732  pthread_mutex_unlock(&s_openpipesmutex);
733  if (p->m_childPid) kill(p->m_childPid, SIGTERM);
734  p->doClose(true, true);
735  pthread_mutex_lock(&s_openpipesmutex);
736  }
737  pthread_mutex_unlock(&s_openpipesmutex);
738 }
739 
// Tail of a constructor whose signature line is not visible in this view
// (it initialises m_pages from pagepool().pop()). Original line 744 is also
// missing, so what happens to the popped pages cannot be confirmed here;
// afterwards, if s_pagepoolrefcnt is zero, the shared pool is deleted.
741  m_pages(pagepool().pop())
742 {
743  // free pages again
745  if (!s_pagepoolrefcnt) {
746  delete s_pagepool;
747  s_pagepool = 0;
748  }
749 }
750 
// Construct a pipe end pair and fork a child process. Steps:
// - take a page group from the shared pool;
// - create two pipes (or one AF_UNIX socketpair if useSocketpair);
// - fork and wire up the descriptor ends in parent and child;
// - exchange a one-byte handshake ('P' from the parent, 'C' from the child);
// - mark the descriptors close-on-exec.
// In the child, every other pipe inherited from the parent is force-closed
// and the page pool is zapped down to this pipe's pages. useExceptions sets
// exceptionsbit. On BidirMMapPipe::Exception the child (if already forked)
// is sent SIGTERM, descriptors still recorded in fds[] are closed, the pool
// reference is dropped and the exception is rethrown.
// NOTE(review): original lines 757, 831 and 901 are not visible in this
// view; the surrounding comments suggest pool reference counting and
// free-list setup - confirm against the full file.
751 BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
752  m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
753  m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
754  m_parentPid(::getpid())
755 
756 {
758  assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
759  int fds[4] = { -1, -1, -1, -1 };
760  int myerrno;
761  static bool firstcall = true;
762  if (useExceptions) m_flags |= exceptionsbit;
763 
764  try {
765  if (firstcall) {
766  firstcall = false;
767  // register a cleanup handler to make sure all BidirMMapPipes are torn
768  // down, and child processes are sent a SIGTERM
769  if (0 != atexit(BidirMMapPipe::teardownall))
770  throw Exception("atexit", errno);
771  }
772 
773  // build free lists
// chain all TotPages pages, then cut the chain after the first PagesPerEnd
// pages, giving two separate lists (presumably one per end - confirm)
774  for (unsigned i = 1; i < TotPages; ++i)
775  m_pages[i - 1]->setNext(m_pages[i]);
776  m_pages[PagesPerEnd - 1]->setNext(0);
777  if (!useSocketpair) {
778  // create pipes
779  if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
780  if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
781  } else {
782  if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
783  throw Exception("socketpair", errno);
784  }
785  // fork the child
786  pthread_mutex_lock(&s_openpipesmutex);
787  char c;
788  switch ((m_childPid = ::fork())) {
789  case -1: // error in fork()
790  myerrno = errno;
791  pthread_mutex_unlock(&s_openpipesmutex);
792  m_childPid = 0;
793  throw Exception("fork", myerrno);
794  case 0: // child
795  // put the ends in the right place
796  if (-1 != fds[2]) {
797  // pair of pipes
798  if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
799  myerrno = errno;
800  pthread_mutex_unlock(&s_openpipesmutex);
801  throw Exception("close", myerrno);
802  }
803  fds[0] = fds[3] = -1;
804  m_outpipe = fds[1];
805  m_inpipe = fds[2];
806  } else {
807  // socket pair
808  if (-1 == ::close(fds[0])) {
809  myerrno = errno;
810  pthread_mutex_unlock(&s_openpipesmutex);
811  throw Exception("close", myerrno);
812  }
813  fds[0] = -1;
814  m_inpipe = m_outpipe = fds[1];
815  }
816  // close other pipes our parent may have open - we have no business
817  // reading from/writing to those...
818  for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
819  s_openpipes.end() != it; ) {
820  BidirMMapPipe* p = *it;
821  it = s_openpipes.erase(it);
822  p->doClose(true, true);
823  }
824  pagepool().zap(m_pages);
825  s_pagepoolrefcnt = 0;
826  delete s_pagepool;
827  s_pagepool = 0;
828  s_openpipes.push_front(this);
829  pthread_mutex_unlock(&s_openpipesmutex);
830  // ok, put our pages on freelist
832  // handshake with other end (to make sure it's alive)...
833  c = 'C'; // ...hild
834  if (1 != xferraw(m_outpipe, &c, 1, ::write))
835  throw Exception("handshake: xferraw write", EPIPE);
836  if (1 != xferraw(m_inpipe, &c, 1, ::read))
837  throw Exception("handshake: xferraw read", EPIPE);
838  if ('P' != c) throw Exception("handshake", EPIPE);
839  break;
840  default: // parent
841  // put the ends in the right place
842  if (-1 != fds[2]) {
843  // pair of pipes
844  if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
845  myerrno = errno;
846  pthread_mutex_unlock(&s_openpipesmutex);
847  throw Exception("close", myerrno);
848  }
849  fds[1] = fds[2] = -1;
850  m_outpipe = fds[3];
851  m_inpipe = fds[0];
852  } else {
853  // socketpair
854  if (-1 == ::close(fds[1])) {
855  myerrno = errno;
856  pthread_mutex_unlock(&s_openpipesmutex);
857  throw Exception("close", myerrno);
858  }
859  fds[1] = -1;
860  m_inpipe = m_outpipe = fds[0];
861  }
862  // put on list of open pipes (so we can kill child processes
863  // if things go wrong)
864  s_openpipes.push_front(this);
865  pthread_mutex_unlock(&s_openpipesmutex);
866  // ok, put our pages on freelist
867  m_freelist = m_pages[0u];
868  // handshake with other end (to make sure it's alive)...
869  c = 'P'; // ...arent
870  if (1 != xferraw(m_outpipe, &c, 1, ::write))
871  throw Exception("handshake: xferraw write", EPIPE);
872  if (1 != xferraw(m_inpipe, &c, 1, ::read))
873  throw Exception("handshake: xferraw read", EPIPE);
874  if ('C' != c) throw Exception("handshake", EPIPE);
875  break;
876  }
877  // mark file descriptors for close on exec (we do not want to leak the
878  // connection to anything we happen to exec)
// NOTE(review): F_GETFD takes no third argument and returns the descriptor
// flags as the function result; fdflags is therefore never filled in and
// F_SETFD writes just FD_CLOEXEC. Harmless while FD_CLOEXEC is the only
// descriptor flag, but "fdflags = ::fcntl(fd, F_GETFD)" is the correct use.
879  int fdflags = 0;
880  if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
881  throw Exception("fcntl", errno);
882  fdflags |= FD_CLOEXEC;
883  if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
884  throw Exception("fcntl", errno);
885  if (m_inpipe != m_outpipe) {
886  if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
887  throw Exception("fcntl", errno);
888  fdflags |= FD_CLOEXEC;
889  if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
890  throw Exception("fcntl", errno);
891  }
892  // ok, finally, clear the failbit
893  m_flags &= ~failbit;
894  // all done
// NOTE(review): as far as visible here, this handler does not remove `this`
// from s_openpipes. A throw after the push_front above (handshake, fcntl)
// leaves a pointer to the half-constructed, about-to-die object on the
// list. Only BidirMMapPipe::Exception is caught; other exceptions skip
// this cleanup.
895  } catch (BidirMMapPipe::Exception&) {
896  if (0 != m_childPid) kill(m_childPid, SIGTERM);
897  for (int i = 0; i < 4; ++i)
898  if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
899  {
900  // free resources associated with mmapped pages
902  }
903  if (!--s_pagepoolrefcnt) {
904  delete s_pagepool;
905  s_pagepool = 0;
906  }
907  throw;
908  }
909 }
910 
// Body of what appears to be BidirMMapPipe::close() (its signature line is
// not visible in this view): requires the pipe to be open (failbit clear)
// and performs a non-forced doClose(), returning its result.
912 {
913  assert(!(m_flags & failbit));
914  return doClose(false);
915 }
916 
// Tear down this end of the pipe. Returns 0 immediately if failbit is set
// (already closed or never successfully constructed). Otherwise:
// - unless force is set, flush pending data;
// - shut down (socketpair) or close (pipe pair) the write side;
// - unless force is set, drain the read side until the peer closes, then
//   close it;
// - drop this pipe's pool reference, deleting the pool when it reaches 0;
// - in the parent, reap the child with waitpid;
// - remove this pipe from s_openpipes, locking s_openpipesmutex unless
//   holdlock is set, and set failbit.
// Returns the raw status word filled in by waitpid (not the exit code);
// the child gets 0. With force set, close/waitpid/pool errors are ignored
// instead of thrown.
// NOTE(review): original lines 967 and 975 are not visible in this view.
917 int BidirMMapPipe::doClose(bool force, bool holdlock)
918 {
919  if (m_flags & failbit) return 0;
920  // flush data to be written
921  if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
922  // shut down the write direction (no more writes from our side)
923  if (m_inpipe == m_outpipe) {
924  if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
925  throw Exception("shutdown", errno);
926  m_outpipe = -1;
927  } else {
928  if (-1 != m_outpipe && -1 == ::close(m_outpipe))
929  if (!force) throw Exception("close", errno);
930  m_outpipe = -1;
931  }
932  // read direction: before closing it, wait for the other end to finish
933  // drain anything the other end might still want to send
934  if (!force && -1 != m_inpipe) {
935  // **************** THIS IS EXTREMELY UGLY: ****************
936  // POLLHUP is not set reliably on pipe/socket shutdown on all
937  // platforms, unfortunately, so we poll for readability here until
938  // the other end closes, too
939  //
940  // the read loop below ensures that the other end sees the POLLIN that
941  // is set on shutdown instead, and goes ahead to close its end
942  //
943  // if we don't do this, and close straight away, the other end
944  // will catch a SIGPIPE or similar, and we don't want that
945  int err;
946  struct pollfd fds;
947  fds.fd = m_inpipe;
948  fds.events = POLLIN;
949  fds.revents = 0;
950  do {
951  while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
952  if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
953  if (fds.revents & POLLIN) {
954  char c;
955  if (1 > ::read(m_inpipe, &c, 1)) break;
956  }
957  }
958  } while (0 > err && EINTR == errno);
959  // ignore all other poll errors
960  }
961  // close read end
962  if (-1 != m_inpipe && -1 == ::close(m_inpipe))
963  if (!force) throw Exception("close", errno);
964  m_inpipe = -1;
965  // unmap memory
966  try {
968  if (!--s_pagepoolrefcnt) {
969  delete s_pagepool;
970  s_pagepool = 0;
971  }
972  } catch (std::exception&) {
973  if (!force) throw;
974  }
976  // wait for child process
977  int retVal = 0;
978  if (isParent()) {
979  int tmp;
980  do {
981  tmp = waitpid(m_childPid, &retVal, 0);
982  } while (-1 == tmp && EINTR == errno);
983  if (-1 == tmp)
984  if (!force) throw Exception("waitpid", errno);
985  m_childPid = 0;
986  }
987  // remove from list of open pipes
988  if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
989  std::list<BidirMMapPipe*>::iterator it = std::find(
990  s_openpipes.begin(), s_openpipes.end(), this);
991  if (s_openpipes.end() != it) s_openpipes.erase(it);
992  if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
993  m_flags |= failbit;
994  return retVal;
995 }
996 
998 { doClose(false); }
999 
1001  int fd, void* addr, size_type len,
1002  ssize_t (*xferfn)(int, void*, std::size_t))
1003 {
1004  size_type xferred = 0;
1005  unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1006  while (len) {
1007  ssize_t tmp = xferfn(fd, buf, len);
1008  if (tmp > 0) {
1009  xferred += tmp;
1010  len -= tmp;
1011  buf += tmp;
1012  continue;
1013  } else if (0 == tmp) {
1014  // check for end-of-file on pipe
1015  break;
1016  } else if (-1 == tmp) {
1017  // ok some error occurred, so figure out if we want to retry of throw
1018  switch (errno) {
1019  default:
1020  // if anything was transferred, return number of bytes
1021  // transferred so far, we can start throwing on the next
1022  // transfer...
1023  if (xferred) return xferred;
1024  // else throw
1025  throw Exception("xferraw", errno);
1026  case EAGAIN: // fallthrough intended
1027 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1028  case EWOULDBLOCK: // fallthrough intended
1029 #endif
1030  std::cerr << " ERROR: In " << __func__ << " (" <<
1031  __FILE__ << ", line " << __LINE__ <<
1032  "): expect transfer to block!" << std::endl;
1033  case EINTR:
1034  break;
1035  }
1036  continue;
1037  } else {
1038  throw Exception("xferraw: unexpected return value from read/write",
1039  errno);
1040  }
1041  }
1042  return xferred;
1043 }
1044 
// sendpages(plist): hand the page list plist over to the other end.
// Writes one byte, m_pages[plist] (presumably the index of the list head),
// to m_outpipe. A short write throws Exception("sendpages: short write",
// EPIPE). A null plist is a programming error, and the assert in the final
// else fires.
1046 {
1047  if (plist) {
1048  unsigned char pg = m_pages[plist];
1049  if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
// NOTE(review): source lines 1050-1051 are missing from this listing. They
// presumably open the branch closed at "1060 }", which copies each page's
// header and payload through the pipe when pages are not shared - confirm
// against the original source.
1052  // ok, have to copy pages through pipe
1053  for (Page* p = plist; p; p = p->next()) {
// each page goes out as its Page header followed by size() payload bytes
1054  if (sizeof(Page) + p->size() !=
1055  xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1056  ::write)) {
1057  throw Exception("sendpages: short write", EPIPE);
1058  }
1059  }
1060  }
1061  } else {
1062  throw Exception("sendpages: short write", EPIPE);
1063  }
1064  } else { assert(plist); }
1065 }
1066 
// recvpages(): receive a page list from the other end (may block) and sort
// it onto this end's lists via feedPageLists().
// Reads one byte from m_inpipe, used as an index into m_pages to find the
// list head. In the branch opened by the missing source lines 1075-1076,
// each page's header and payload are also read from the pipe. Otherwise the
// pages are only counted with lenPageList().
// Returns the number of pages received, or 0 if the index byte could not be
// read (e.g. end of file).
1068 {
1069  unsigned char pg;
1070  unsigned retVal = 0;
1071  Page *plisthead = 0, *plisttail = 0;
1072  if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1073  plisthead = plisttail = m_pages[pg];
1074  // ok, have number of pages
// NOTE(review): source lines 1075-1076 are missing from this listing; they
// presumably open the "copy pages through the pipe" branch whose else (at
// 1089) only counts the pages - confirm against the original source.
1077  // ok, need to copy pages through pipe
1078  for (; plisttail; ++retVal) {
1079  Page* p = plisttail;
// NOTE(review): if this header read comes up short (e.g. at end of file),
// plisttail is not advanced and the loop retries the same page over and
// over - confirm whether this can happen in practice.
1080  if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1081  ::read)) {
1082  plisttail = p->next();
1083  if (!p->size()) continue;
1084  // break in case of read error
1085  if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1086  ::read)) break;
1087  }
1088  }
1089  } else {
1090  retVal = lenPageList(plisthead);
1091  }
1092  }
1093  // put list of pages we just received into correct lists (busy/free)
1094  if (plisthead) feedPageLists(plisthead);
1095  // ok, retVal contains the number of pages read, so put them on the
1096  // correct lists
1097  return retVal;
1098 }
1099 
1101 {
1102  struct pollfd fds;
1103  fds.fd = m_inpipe;
1104  fds.events = POLLIN;
1105  fds.revents = 0;
1106  unsigned retVal = 0;
1107  do {
1108  int rc = ::poll(&fds, 1, 0);
1109  if (0 > rc) {
1110  if (EINTR == errno) continue;
1111  break;
1112  }
1113  if (1 == retVal && fds.revents & POLLIN &&
1114  !(fds.revents & (POLLNVAL | POLLERR))) {
1115  // ok, we can read without blocking, so the other end has
1116  // something for us
1117  return recvpages();
1118  } else {
1119  break;
1120  }
1121  } while (true);
1122  return retVal;
1123 }
1124 
1126 {
1127  unsigned n = 0;
1128  for ( ; p; p = p->next()) ++n;
1129  return n;
1130 }
1131 
// feedPageLists(plist): sort the pages of plist onto this end's lists.
// - Pages holding data (size() != 0) get their read position reset and are
//   appended to the busy list.
// - Empty pages owned by this end go onto the free list. The parent owns
//   m_pages[p] < PagesPerEnd; the child owns m_pages[p] >= PagesPerEnd.
// - Empty pages owned by the other end are queued to be sent back.
// If anything is queued, completely full pages at the head of the dirty list
// are sent along too. The send only happens if a zero-timeout poll shows the
// connection is still alive.
// Throws Exception if that poll fails with anything other than EINTR.
1133 {
1134  assert(plist);
1135  // get end of busy list
1136  Page *blend = m_busylist;
1137  while (blend && blend->next()) blend = blend->next();
1138  // ok, might have to send free pages to other end, and (if we do have to
1139  // send something to the other end) while we're at it, send any dirty
1140  // pages which are completely full, too
1141  Page *sendlisthead = 0, *sendlisttail = 0;
1142  // loop over plist
1143  while (plist) {
1144  Page* p = plist;
1145  plist = p->next();
1146  p->setNext(0);
1147  if (p->size()) {
1148  // busy page...
1149  p->pos() = 0;
1150  // put at end of busy list
1151  if (blend) blend->setNext(p);
1152  else m_busylist = p;
1153  blend = p;
1154  } else {
1155  // free page...
1156  // Very simple algorithm: once we're done with a page, we send it back
1157  // where it came from. If it's from our end, we put it on the free list, if
1158  // it's from the other end, we send it back.
1159  if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1160  (isChild() && m_pages[p] < PagesPerEnd)) {
1161  // page "belongs" to other end
1162  if (!sendlisthead) sendlisthead = p;
1163  if (sendlisttail) sendlisttail->setNext(p);
1164  sendlisttail = p;
1165  } else {
1166  // add page to freelist
1167  p->setNext(m_freelist);
1168  m_freelist = p;
1169  }
1170  }
1171  }
1172  // check if we have to send stuff to the other end
1173  if (sendlisthead) {
1174  // go through our list of dirty pages, and see what we can
1175  // send along
1176  Page* dp;
1177  while ((dp = m_dirtylist) && dp->full()) {
1178  Page* p = dp;
1179  // move head of dirty list
1180  m_dirtylist = p->next();
1181  // queue for sending
1182  p->setNext(0);
1183  sendlisttail->setNext(p);
1184  sendlisttail = p;
1185  }
1186  // poll if the other end is still alive - this needs that we first
1187  // close the write pipe of the other end when the remote end of the
1188  // connection is shutting down in doClose; we'll see that because we
1189  // get a POLLHUP on our inpipe
// one descriptor for a socket pair (in == out), two for a pair of pipes;
// with events == 0 only error/hangup conditions are reported, except that
// the socket case also asks for POLLIN
1190  const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1191  struct pollfd fds[2];
1192  fds[0].fd = m_outpipe;
1193  fds[0].events = fds[0].revents = 0;
1194  if (m_outpipe != m_inpipe) {
1195  fds[1].fd = m_inpipe;
1196  fds[1].events = fds[1].revents = 0;
1197  } else {
1198  fds[0].events |= POLLIN;
1199  }
1200  int retVal = 0;
1201  do {
1202  retVal = ::poll(fds, nfds, 0);
1203  if (0 > retVal && EINTR == errno)
1204  continue;
1205  break;
1206  } while (true);
1207  if (0 <= retVal) {
1208  bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1209  if (m_outpipe != m_inpipe) {
1210  ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1211  } else {
// NOTE(review): recvpages() calls feedPageLists() again (recursively)
// while this call's send list is still pending
1212  if (ok && fds[0].revents & POLLIN) {
1213  unsigned ret = recvpages();
1214  if (!ret) ok = false;
1215  }
1216  }
1217
1218  if (ok) sendpages(sendlisthead);
1219  // (if the pipe is dead already, we don't care that we leak the
1220  // contents of the pages on the send list here, so that is why
1221  // there's no else clause here)
1222  } else {
1223  throw Exception("feedPageLists: poll", errno);
1224  }
1225  }
1226 }
1227 
1229 {
1230  assert(p);
1231  assert(p == m_freelist);
1232  // remove from freelist
1233  m_freelist = p->next();
1234  p->setNext(0);
1235  // append to dirty list
1236  Page* dl = m_dirtylist;
1237  while (dl && dl->next()) dl = dl->next();
1238  if (dl) dl->setNext(p);
1239  else m_dirtylist = p;
1240 }
1241 
// busypage(): return the head of the busy list (a page with unread data).
// While the busy list is empty it calls recvpages(), which may block.
// Returns 0 as soon as recvpages() reports that nothing was received.
1243 {
1244  // queue any pages available for reading we can without blocking
// NOTE(review): source line 1245 is missing from this listing; it is
// presumably the non-blocking receive the comment above refers to - confirm.
1246  Page* p;
1247  // if there are no busy pages, try to get them from the other end,
1248  // block if we have to...
1249  while (!(p = m_busylist)) if (!recvpages()) return 0;
1250  return p;
1251 }
1252 
// dirtypage(): return a page that data can be appended to.
// This is the last dirty page if it is not yet full(). Otherwise the head
// of the free list is moved to the dirty list with markPageDirty(); while
// the free list is empty, recvpages() is called (it may block).
// Returns 0 if recvpages() reports that nothing was received.
1254 {
1255  // queue any pages available for reading we can without blocking
// NOTE(review): source line 1256 is missing from this listing; it is
// presumably the non-blocking receive the comment above refers to - confirm.
1257  Page* p = m_dirtylist;
1258  // go to end of dirty list
1259  if (p) while (p->next()) p = p->next();
1260  if (!p || p->full()) {
1261  // need to append free page, so get one
1262  while (!(p = m_freelist)) if (!recvpages()) return 0;
1263  markPageDirty(p);
1264  }
1265  return p;
1266 }
1267 
1269 { return doFlush(true); }
1270 
1271 void BidirMMapPipe::doFlush(bool forcePartialPages)
1272 {
1273  assert(!(m_flags & failbit));
1274  // build a list of pages to flush
1275  Page *flushlisthead = 0, *flushlisttail = 0;
1276  while (m_dirtylist) {
1277  Page* p = m_dirtylist;
1278  if (!forcePartialPages && !p->full()) break;
1279  // remove dirty page from dirty list
1280  m_dirtylist = p->next();
1281  p->setNext(0);
1282  // and send it to other end
1283  if (!flushlisthead) flushlisthead = p;
1284  if (flushlisttail) flushlisttail->setNext(p);
1285  flushlisttail = p;
1286  }
1287  if (flushlisthead) sendpages(flushlisthead);
1288 }
1289 
// purge(): discard all unread and all unsent data. The dirty list is
// appended to the busy list, every page on the combined list gets
// size() = 0, and both list heads are reset.
1291 {
1292  assert(!(m_flags & failbit));
1293  // join busy and dirty lists
1294  {
1295  Page *l = m_busylist;
1296  while (l && l->next()) l = l->next();
1297  if (l) l->setNext(m_dirtylist);
1298  else m_busylist = m_dirtylist;
1299  }
1300  // empty busy and dirty pages
1301  for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1302  // put them on the free list
// NOTE(review): source line 1303 is missing from this listing. It
// presumably hands the emptied pages on (free list / feedPageLists) as the
// comment above says; without it they would be lost when both list heads
// are reset below - confirm against the original source.
1304  m_busylist = m_dirtylist = 0;
1305 }
1306 
// bytesReadableNonBlocking(): sum of size() - pos() over the busy list,
// i.e. bytes already received but not yet consumed by read().
1308 {
1309  // queue all pages waiting for consumption in the pipe before we give an
1310  // answer
// NOTE(review): source line 1311 is missing from this listing; it is
// presumably the non-blocking receive the comment above refers to - confirm.
1312  size_type retVal = 0;
1313  for (Page* p = m_busylist; p; p = p->next())
1314  retVal += p->size() - p->pos();
1315  return retVal;
1316 }
1317 
// bytesWritableNonBlocking(): estimate how many bytes write() can accept
// without blocking.
// A zero-timeout poll for POLLOUT on m_outpipe decides couldwrite, i.e.
// whether flushing would block. The count is the free space of partially
// filled dirty pages plus Page::capacity() for each free page. Unless
// couldwrite is set, counting stops once FlushThresh pages (dirty plus
// free) have been seen; the free-list limit does not apply when the dirty
// list is empty.
// Throws Exception if poll fails with anything other than EINTR.
1319 {
1320  // queue all pages waiting for consumption in the pipe before we give an
1321  // answer
// NOTE(review): source line 1322 is missing from this listing; it is
// presumably the non-blocking receive the comment above refers to - confirm.
1323  // check if we could write to the pipe without blocking (we need to know
1324  // because we might need to check if flushing of dirty pages would block)
1325  bool couldwrite = false;
1326  {
1327  struct pollfd fds;
1328  fds.fd = m_outpipe;
1329  fds.events = POLLOUT;
1330  fds.revents = 0;
1331  int retVal = 0;
1332  do {
1333  retVal = ::poll(&fds, 1, 0);
1334  if (0 > retVal) {
1335  if (EINTR == errno) continue;
1336  throw Exception("bytesWritableNonBlocking: poll", errno);
1337  }
1338  if (1 == retVal && fds.revents & POLLOUT &&
1339  !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1340  couldwrite = true;
1341  break;
1342  } while (true);
1343  }
1344  // ok, start counting bytes
1345  size_type retVal = 0;
1346  unsigned npages = 0;
1347  // go through the dirty list
1348  for (Page* p = m_dirtylist; p; p = p->next()) {
1349  ++npages;
1350  // if page only partially filled
1351  if (!p->full())
1352  retVal += p->free();
1353  if (npages >= FlushThresh && !couldwrite) break;
1354  }
1355  // go through the free list
1356  for (Page* p = m_freelist; p && (!m_dirtylist ||
1357  npages < FlushThresh || couldwrite); p = p->next()) {
1358  ++npages;
1359  retVal += Page::capacity();
1360  }
1361  return retVal;
1362 }
1363 
1365 {
1366  assert(!(m_flags & failbit));
1367  size_type nread = 0;
1368  unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1369  try {
1370  while (sz) {
1371  // find next page to read from
1372  Page* p = busypage();
1373  if (!p) {
1374  m_flags |= eofbit;
1375  return nread;
1376  }
1377  unsigned char* pp = p->begin() + p->pos();
1378  size_type csz = std::min(size_type(p->remaining()), sz);
1379  std::copy(pp, pp + csz, ap);
1380  nread += csz;
1381  ap += csz;
1382  sz -= csz;
1383  p->pos() += csz;
1384  assert(p->size() >= p->pos());
1385  if (p->size() == p->pos()) {
1386  // if no unread data remains, page is free
1387  m_busylist = p->next();
1388  p->setNext(0);
1389  p->size() = 0;
1390  feedPageLists(p);
1391  }
1392  }
1393  } catch (Exception&) {
1394  m_flags |= rderrbit;
1395  if (m_flags & exceptionsbit) throw;
1396  }
1397  return nread;
1398 }
1399 
// write(addr, sz): copy sz bytes into dirty pages, getting pages via
// dirtypage() (which may block).
// - If no page can be obtained, eofbit is set and the call returns early.
// - Exceptions set wrerrbit and are rethrown only if exceptionsbit is set.
// Returns the number of bytes copied.
1401 {
1402  assert(!(m_flags & failbit));
1403  size_type written = 0;
1404  const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1405  try {
1406  while (sz) {
1407  // find next page to write to
1408  Page* p = dirtypage();
1409  if (!p) {
1410  m_flags |= eofbit;
1411  return written;
1412  }
// append at the current fill level (size()) of the page
1413  unsigned char* pp = p->begin() + p->size();
1414  size_type csz = std::min(size_type(p->free()), sz);
1415  std::copy(ap, ap + csz, pp);
1416  written += csz;
1417  ap += csz;
1418  p->size() += csz;
1419  sz -= csz;
1420  assert(p->capacity() >= p->size());
1421  if (p->full()) {
1422  // if page is full, see if we're above the flush threshold of
1423  // 3/4 of our pages
// NOTE(review): source line 1424 is missing from this listing. It is
// presumably the flush-threshold test guarding doFlush(false); as listed,
// doFlush(false) would run every time a page fills up - confirm.
1425  doFlush(false);
1426  }
1427  }
1428  } catch (Exception&) {
1429  m_flags |= wrerrbit;
1430  if (m_flags & exceptionsbit) throw;
1431  }
1432  return written;
1433 }
1434 
1436 {
1437  // go through pipes, and change flags where we already know without really
1438  // polling - stuff where we don't need poll to wait for its timeout in the
1439  // OS...
1440  bool canskiptimeout = false;
1441  std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1442  std::vector<unsigned>::iterator mit = masks.begin();
1443  for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1444  ++it, ++mit) {
1445  PollEntry& pe = *it;
1446  pe.revents = None;
1447  // null pipe pointer or closed pipe is invalid
1448  if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1449  // check for error
1450  if (pe.pipe->bad()) pe.revents |= Error;
1451  // check for end of file
1452  if (pe.pipe->eof()) pe.revents |= EndOfFile;
1453  // check if readable
1454  if (pe.events & Readable) {
1455  *mit |= Readable;
1456  if (pe.pipe->m_busylist) pe.revents |= Readable;
1457  }
1458  // check if writable
1459  if (pe.events & Writable) {
1460  *mit |= Writable;
1461  if (pe.pipe->m_freelist) {
1462  pe.revents |= Writable;
1463  } else {
1464  Page *dl = pe.pipe->m_dirtylist;
1465  while (dl && dl->next()) dl = dl->next();
1466  if (dl && dl->pos() < Page::capacity())
1467  pe.revents |= Writable;
1468  }
1469  }
1470  if (pe.revents) canskiptimeout = true;
1471  }
1472  // set up the data structures required for the poll syscall
1473  std::vector<pollfd> fds;
1474  fds.reserve(2 * pipes.size());
1475  std::map<int, PollEntry*> fds2pipes;
1476  for (PollVector::const_iterator it = pipes.begin();
1477  pipes.end() != it; ++it) {
1478  const PollEntry& pe = *it;
1479  struct pollfd tmp;
1480  fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1481  const_cast<PollEntry*>(&pe)));
1482  tmp.events = tmp.revents = 0;
1483  // we always poll for readability; this allows us to queue pages
1484  // early
1485  tmp.events |= POLLIN;
1486  if (pe.pipe->m_outpipe != tmp.fd) {
1487  // ok, it's a pair of pipes
1488  fds.push_back(tmp);
1489  fds2pipes.insert(std::make_pair(
1490  unsigned(tmp.fd = pe.pipe->m_outpipe),
1491  const_cast<PollEntry*>(&pe)));
1492  tmp.events = 0;
1493 
1494  }
1495  if (pe.events & Writable) tmp.events |= POLLOUT;
1496  fds.push_back(tmp);
1497  }
1498  // poll
1499  int retVal = 0;
1500  do {
1501  retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1502  if (0 > retVal) {
1503  if (EINTR == errno) continue;
1504  throw Exception("poll", errno);
1505  }
1506  break;
1507  } while (true);
1508  // fds may have changed state, so update...
1509  for (std::vector<pollfd>::iterator it = fds.begin();
1510  fds.end() != it; ++it) {
1511  pollfd& fe = *it;
1512  //if (!fe.revents) continue;
1513  --retVal;
1514  PollEntry& pe = *fds2pipes[fe.fd];
1515 oncemore:
1516  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1517  pe.revents |= ReadInvalid;
1518  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1519  pe.revents |= WriteInvalid;
1520  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1521  pe.revents |= ReadError;
1522  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1523  pe.revents |= WriteError;
1524  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1525  pe.revents |= ReadEndOfFile;
1526  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1527  pe.revents |= WriteEndOfFile;
1528  if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1529  !(fe.revents & (POLLNVAL | POLLERR))) {
1530  // ok, there is at least one page for us to receive from the
1531  // other end
1532  if (0 == pe.pipe->recvpages()) continue;
1533  // more pages there?
1534  do {
1535  int tmp = ::poll(&fe, 1, 0);
1536  if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1537  if (0 > tmp) {
1538  if (EINTR == errno) continue;
1539  throw Exception("poll", errno);
1540  }
1541  break;
1542  } while (true);
1543  }
1544  if (pe.pipe->m_busylist) pe.revents |= Readable;
1545  if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1546  if (pe.pipe->m_freelist) {
1547  pe.revents |= Writable;
1548  } else {
1549  Page *dl = pe.pipe->m_dirtylist;
1550  while (dl && dl->next()) dl = dl->next();
1551  if (dl && dl->pos() < Page::capacity())
1552  pe.revents |= Writable;
1553  }
1554  }
1555  }
1556  // apply correct masks, and count pipes with pending events
1557  int npipes = 0;
1558  mit = masks.begin();
1559  for (PollVector::iterator it = pipes.begin();
1560  pipes.end() != it; ++it, ++mit)
1561  if ((it->revents &= *mit)) ++npipes;
1562  return npipes;
1563 }
1564 
1566 {
1567  size_t sz = std::strlen(str);
1568  *this << sz;
1569  if (sz) write(str, sz);
1570  return *this;
1571 }
1572 
1574 {
1575  size_t sz = 0;
1576  *this >> sz;
1577  if (good() && !eof()) {
1578  str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1579  if (!str) throw Exception("realloc", errno);
1580  if (sz) read(str, sz);
1581  str[sz] = 0;
1582  }
1583  return *this;
1584 }
1585 
1587 {
1588  size_t sz = str.size();
1589  *this << sz;
1590  write(str.data(), sz);
1591  return *this;
1592 }
1593 
1595 {
1596  str.clear();
1597  size_t sz = 0;
1598  *this >> sz;
1599  if (good() && !eof()) {
1600  str.reserve(sz);
1601  for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1602  }
1603  return *this;
1604 }
1605 
1607 
1608 #ifdef TEST_BIDIRMMAPPIPE
1609 using namespace RooFit;
1610 
1611 int simplechild(BidirMMapPipe& pipe)
1612 {
1613  // child does an echo loop
1614  while (pipe.good() && !pipe.eof()) {
1615  // read a string
1616  std::string str;
1617  pipe >> str;
1618  if (!pipe) return -1;
1619  if (pipe.eof()) break;
1620  if (!str.empty()) {
1621  std::cout << "[CHILD] : read: " << str << std::endl;
1622  str = "... early in the morning?";
1623  }
1624  pipe << str << BidirMMapPipe::flush;
1625  // did our parent tell us to shut down?
1626  if (str.empty()) break;
1627  if (!pipe) return -1;
1628  if (pipe.eof()) break;
1629  std::cout << "[CHILD] : wrote: " << str << std::endl;
1630  }
1631  pipe.close();
1632  return 0;
1633 }
1634 
1635 #include <sstream>
1636 int randomchild(BidirMMapPipe& pipe)
1637 {
1638  // child sends out something at random intervals
1639  ::srand48(::getpid());
1640  {
1641  // wait for parent's go ahead signal
1642  std::string s;
1643  pipe >> s;
1644  }
1645  // no shutdown sequence needed on this side - we're producing the data,
1646  // and the parent can just read until we're done (when it'll get EOF)
1647  for (int i = 0; i < 5; ++i) {
1648  // sleep a random time between 0 and .9 seconds
1649  ::usleep(int(1e6 * ::drand48()));
1650  std::ostringstream buf;
1651  buf << "child pid " << ::getpid() << " sends message " << i;
1652  std::string str = buf.str();
1653  std::cout << "[CHILD] : " << str << std::endl;
1654  pipe << str << BidirMMapPipe::flush;
1655  if (!pipe) return -1;
1656  if (pipe.eof()) break;
1657  }
1658  // tell parent we're shutting down
1659  pipe << "" << BidirMMapPipe::flush;
1660  // wait for parent to acknowledge
1661  std::string s;
1662  pipe >> s;
1663  pipe.close();
1664  return 0;
1665 }
1666 
1667 int benchchildrtt(BidirMMapPipe& pipe)
1668 {
1669  // child does the equivalent of listening for pings and sending the
1670  // packet back
1671  char* str = 0;
1672  while (pipe && !pipe.eof()) {
1673  pipe >> str;
1674  if (!pipe) {
1675  std::free(str);
1676  pipe.close();
1677  return -1;
1678  }
1679  if (pipe.eof()) break;
1680  pipe << str << BidirMMapPipe::flush;
1681  // if we have just completed the shutdown handshake, we break here
1682  if (!std::strlen(str)) break;
1683  }
1684  std::free(str);
1685  pipe.close();
1686  return 0;
1687 }
1688 
1689 int benchchildsink(BidirMMapPipe& pipe)
1690 {
1691  // child behaves like a sink
1692  char* str = 0;
1693  while (pipe && !pipe.eof()) {
1694  pipe >> str;
1695  if (!std::strlen(str)) break;
1696  }
1697  pipe << "" << BidirMMapPipe::flush;
1698  std::free(str);
1699  pipe.close();
1700  return 0;
1701 }
1702 
1703 int benchchildsource(BidirMMapPipe& pipe)
1704 {
1705  // child behaves like a source
1706  char* str = 0;
1707  for (unsigned i = 0; i <= 24; ++i) {
1708  str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1709  std::memset(str, '4', 1 << i);
1710  str[1 << i] = 0;
1711  for (unsigned j = 0; j < 1 << 7; ++j) {
1712  pipe << str;
1713  if (!pipe || pipe.eof()) {
1714  std::free(str);
1715  pipe.close();
1716  return -1;
1717  }
1718  }
1719  // tell parent we're done with this block size
1720  pipe << "" << BidirMMapPipe::flush;
1721  }
1722  // tell parent to shut down
1723  pipe << "" << BidirMMapPipe::flush;
1724  std::free(str);
1725  pipe.close();
1726  return 0;
1727 }
1728 
1729 BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1730 {
1731  // create a pipe with the given child at the remote end
1732  BidirMMapPipe *p = new BidirMMapPipe();
1733  if (p->isChild()) {
1734  int retVal = childexec(*p);
1735  delete p;
1736  std::exit(retVal);
1737  }
1738  return p;
1739 }
1740 
1741 #include <sys/time.h>
1742 #include <iomanip>
// Self-test and benchmark driver (only built with TEST_BIDIRMMAPPIPE). In order:
// 1. echo test with one child;
// 2. poll test with 20 children;
// 3. round-trip, sink and source throughput benchmarks for block sizes
//    2^0 .. 2^24 bytes.
// Returns 0 on success, and -1 or a child's non-zero exit status on failure.
// NOTE(review): the early returns leak whatever pipes/buffers are still
// alive at that point.
1743 int main()
1744 {
1745  // simple echo loop test
1746  {
1747  std::cout << "[PARENT]: simple challenge-response test, "
1748  "one child:" << std::endl;
1749  BidirMMapPipe* pipe = spawnChild(simplechild);
1750  for (int i = 0; i < 5; ++i) {
1751  std::string str("What shall we do with a drunken sailor...");
1752  *pipe << str << BidirMMapPipe::flush;
1753  if (!*pipe) return -1;
1754  std::cout << "[PARENT]: wrote: " << str << std::endl;
1755  *pipe >> str;
1756  if (!*pipe) return -1;
1757  std::cout << "[PARENT]: read: " << str << std::endl;
1758  }
1759  // send shutdown string
1760  *pipe << "" << BidirMMapPipe::flush;
1761  // wait for shutdown handshake
1762  std::string s;
1763  *pipe >> s;
1764  int retVal = pipe->close();
1765  std::cout << "[PARENT]: exit status of child: " << retVal <<
1766  std::endl;
1767  if (retVal) return retVal;
1768  delete pipe;
1769  }
1770  // simple poll test - children send 5 results in random intervals
1771  {
1772  unsigned nch = 20;
1773  std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1774  " children:" << std::endl;
1775  typedef BidirMMapPipe::PollEntry PollEntry;
1776  // poll data structure
// NOTE(review): source line 1777 is missing from this listing; it presumably
// declares `pipes` (a vector of PollEntry) - confirm.
1778  pipes.reserve(nch);
1779  // spawn children
1780  for (unsigned i = 0; i < nch; ++i) {
1781  std::cout << "[PARENT]: spawning child " << i << std::endl;
1782  pipes.push_back(PollEntry(spawnChild(randomchild),
// NOTE(review): source line 1783 (the rest of the PollEntry construction) is
// missing from this listing.
1784  }
1785  // wake children up
1786  std::cout << "[PARENT]: waking up children" << std::endl;
1787  for (unsigned i = 0; i < nch; ++i)
1788  *pipes[i].pipe << "" << BidirMMapPipe::flush;
1789  std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1790  // while at least some children alive
1791  while (!pipes.empty()) {
1792  // poll, wait until status change (infinite timeout)
1793  int npipes = BidirMMapPipe::poll(pipes, -1);
1794  // scan for pipes with changed status
1795  for (std::vector<PollEntry>::iterator it = pipes.begin();
1796  npipes && pipes.end() != it; ) {
1797  if (!it->revents) {
1798  // unchanged, next one
1799  ++it;
1800  continue;
1801  }
1802  --npipes; // maybe we can stop early...
1803  // read from pipes which are readable
1804  if (it->revents & BidirMMapPipe::Readable) {
1805  std::string s;
1806  *(it->pipe) >> s;
1807  if (!s.empty()) {
1808  std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1809  ": " << s << std::endl;
1810  ++it;
1811  continue;
1812  } else {
1813  // child is shutting down...
1814  *(it->pipe) << "" << BidirMMapPipe::flush;
// acknowledge the child's empty string, then retire the pipe below
1815  goto childcloses;
1816  }
1817  }
1818  // retire pipes with error or end-of-file condition
1819  if (it->revents & (BidirMMapPipe::Error |
// NOTE(review): source lines 1820-1821 (the rest of this condition) are
// missing from this listing.
1822  std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1823  " revents" <<
1824  ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1825  ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1826  ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1827  ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1828  ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1829  ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1830  ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1831  ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1832  std::endl;
1833 childcloses:
1834  int retVal = it->pipe->close();
1835  std::cout << "[PARENT]: child exit status: " <<
1836  retVal << ", number of children still alive: " <<
1837  (pipes.size() - 1) << std::endl;
1838  if (retVal) return retVal;
1839  delete it->pipe;
1840  it = pipes.erase(it);
1841  continue;
1842  }
1843  }
1844  }
1845  }
1846  // little benchmark - round trip time
1847  {
1848  std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1849  for (unsigned i = 0; i <= 24; ++i) {
// NOTE(review): s comes from new[], but `*pipe >> s` below goes through
// operator>>(char*&), which realloc()s it, and s is later released with
// delete[]. Mixing new[]/realloc/delete[] is undefined behaviour; use
// std::malloc and std::free instead.
1850  char *s = new char[1 + (1 << i)];
1851  std::memset(s, 'A', 1 << i);
1852  s[1 << i] = 0;
1853  const unsigned n = 1 << 7;
1854  double avg = 0., min = 1e42, max = -1e42;
1855  BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1856  for (unsigned j = n; j--; ) {
1857  struct timeval t1;
1858  ::gettimeofday(&t1, 0);
1859  *pipe << s << BidirMMapPipe::flush;
1860  if (!*pipe || pipe->eof()) break;
1861  *pipe >> s;
1862  if (!*pipe || pipe->eof()) break;
1863  struct timeval t2;
1864  ::gettimeofday(&t2, 0);
// elapsed time in seconds; a negative microsecond difference is
// compensated by the seconds term
1865  t2.tv_sec -= t1.tv_sec;
1866  t2.tv_usec -= t1.tv_usec;
1867  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1868  if (dt < min) min = dt;
1869  if (dt > max) max = dt;
1870  avg += dt;
1871  }
1872  // send a shutdown string
1873  *pipe << "" << BidirMMapPipe::flush;
1874  // get child's shutdown ok
1875  *pipe >> s;
1876  avg /= double(n);
1877  avg *= 1e6; min *= 1e6; max *= 1e6;
1878  int retVal = pipe->close();
1879  if (retVal) {
1880  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1881  delete[] s;
1882  return retVal;
1883  }
1884  delete pipe;
1885  // there is a factor 2 in the formula for the transfer rate below,
1886  // because we transfer data of twice the size of the block - once
1887  // to the child, and once for the return trip
1888  std::cout << "block size " << std::setw(9) << (1 << i) <<
1889  " avg " << std::setw(7) << avg << " us min " <<
1890  std::setw(7) << min << " us max " << std::setw(7) << max <<
1891  "us speed " << std::setw(9) <<
1892  2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1893  " MB/s" << std::endl;
1894  delete[] s;
1895  }
1896  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1897  }
1898  // little benchmark - child as sink
1899  {
1900  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1901  for (unsigned i = 0; i <= 24; ++i) {
// NOTE(review): same new[]/realloc/delete[] mismatch as above (via the
// `*pipe >> s` after the loop); the early return below also leaks s.
1902  char *s = new char[1 + (1 << i)];
1903  std::memset(s, 'A', 1 << i);
1904  s[1 << i] = 0;
1905  const unsigned n = 1 << 7;
1906  double avg = 0., min = 1e42, max = -1e42;
1907  BidirMMapPipe *pipe = spawnChild(benchchildsink);
1908  for (unsigned j = n; j--; ) {
1909  struct timeval t1;
1910  ::gettimeofday(&t1, 0);
1911  // streaming mode - we do not flush here
1912  *pipe << s;
1913  if (!*pipe || pipe->eof()) break;
1914  struct timeval t2;
1915  ::gettimeofday(&t2, 0);
1916  t2.tv_sec -= t1.tv_sec;
1917  t2.tv_usec -= t1.tv_usec;
1918  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1919  if (dt < min) min = dt;
1920  if (dt > max) max = dt;
1921  avg += dt;
1922  }
1923  // send a shutdown string
1924  *pipe << "" << BidirMMapPipe::flush;
1925  // get child's shutdown ok
1926  *pipe >> s;
1927  avg /= double(n);
1928  avg *= 1e6; min *= 1e6; max *= 1e6;
1929  int retVal = pipe->close();
1930  if (retVal) {
1931  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1932  return retVal;
1933  }
1934  delete pipe;
1935  std::cout << "block size " << std::setw(9) << (1 << i) <<
1936  " avg " << std::setw(7) << avg << " us min " <<
1937  std::setw(7) << min << " us max " << std::setw(7) << max <<
1938  "us speed " << std::setw(9) <<
1939  (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1940  " MB/s" << std::endl;
1941  delete[] s;
1942  }
1943  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1944  }
1945  // little benchmark - child as source
1946  {
1947  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
// s is managed by operator>>(char*&) via realloc, hence std::free below
1948  char *s = 0;
1949  double avg = 0., min = 1e42, max = -1e42;
1950  unsigned n = 0, bsz = 0;
1951  BidirMMapPipe *pipe = spawnChild(benchchildsource);
1952  while (*pipe && !pipe->eof()) {
1953  struct timeval t1;
1954  ::gettimeofday(&t1, 0);
1955  // streaming mode - we do not flush here
1956  *pipe >> s;
1957  if (!*pipe || pipe->eof()) break;
1958  struct timeval t2;
1959  ::gettimeofday(&t2, 0);
1960  t2.tv_sec -= t1.tv_sec;
1961  t2.tv_usec -= t1.tv_usec;
1962  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1963  if (std::strlen(s)) {
1964  ++n;
1965  if (dt < min) min = dt;
1966  if (dt > max) max = dt;
1967  avg += dt;
1968  bsz = std::strlen(s);
1969  } else {
// an empty string ends a block size; one right after another (n == 0) is
// the child's final shutdown message
1970  if (!n) break;
1971  // next block size
1972  avg /= double(n);
1973  avg *= 1e6; min *= 1e6; max *= 1e6;
1974
1975  std::cout << "block size " << std::setw(9) << bsz <<
1976  " avg " << std::setw(7) << avg << " us min " <<
1977  std::setw(7) << min << " us max " << std::setw(7) <<
1978  max << "us speed " << std::setw(9) <<
1979  (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1980  " MB/s" << std::endl;
1981  n = 0;
1982  avg = 0.;
1983  min = 1e42;
1984  max = -1e42;
1985  }
1986  }
1987  int retVal = pipe->close();
1988  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1989  if (retVal) return retVal;
1990  delete pipe;
1991  std::free(s);
1992  }
1993  return 0;
1994 }
1995 #endif // TEST_BIDIRMMAPPIPE
1996 #endif // _WIN32
1997 
1998 // vim: ft=cpp:sw=4:tw=78:et
std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
unsigned char m_npages
length in pages
pages shared (child + parent)
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
pipe error Write end
end of file reached
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
bool empty() const
return true if no used page groups in this chunk
read pipe in end-of-file state
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
Definition: Exception.C:4
unsigned m_refcnt
reference counter
bool bad() const
true on I/O error
Page * m_freelist
linked list: free pages
write pipe in end-of-file state
fill
Definition: fit1_py.py:6
don't know yet what'll work
Definition: BidirMMapPipe.h:47
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
bool isChild() const
return if this end of the pipe is the child end
BidirMMapPipe & operator<<(const char *str)
write a C-style string
#define f(i)
Definition: RSha256.hxx:104
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
#define malloc
Definition: civetweb.c:1536
pid_t m_childPid
pid of the child (zero if we're the child)
pipe error read end
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
Page * page(unsigned pgno) const
return the page with page number pgno
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:43
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
size_type read(void *addr, size_type sz)
read from pipe
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:89
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
void sendpages(Page *plist)
send page(s) to the other end (may block)
pipe can be written to
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:87
Page * busypage()
get a busy page to read data from (may block)
unsigned nPagesPerGroup() const
return number of pages per page group
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
int main(int argc, char **argv)
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:55
void swap(Pages &other)
swap with other's contents
static constexpr double s
Page * dirtypage()
get a dirty page to write data to (may block)
#define realloc
Definition: civetweb.c:1538
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
static int debugflag()
return the current setting of the debug flag
error reporting with exceptions
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
bool eof() const
true if end-of-file
static const char * what
Definition: stlLoader.cc:6
static unsigned lenPageList(const Page *list)
return length of a page list
Pages pop()
pop a group of pages off the free list
pages per pipe end
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
#define END_NAMESPACE_ROOFIT
bool isParent() const
return if this end of the pipe is the parent end
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
static int s_debugflag
debug flag
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
logical failure (e.g. pipe closed)
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
unsigned pageno(Page *p) const
perform page to page number mapping
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:46
void zap(Pages &p)
free all pages except for those pointed to by p
auto * t1
Definition: textangle.C:20
for poll() interface
unsigned recvpages()
receive pages from the other end (may block), queue them
#define free
Definition: civetweb.c:1539
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
bool contains(const Pages &p) const
return if p is contained in this PageChunk
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
unsigned len() const
return length of chunk
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
unsigned npages() const
return number of pages accessible
auto * l
Definition: textangle.C:4
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
#define c(i)
Definition: RSha256.hxx:101
static unsigned long masks[]
Definition: gifencode.c:211
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:56
pipe has data for reading
bool good() const
status of stream is good
nothing special on this pipe
bool closed() const
true if closed
const Int_t n
Definition: legend1.C:16
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:48
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:85
char name[80]
Definition: TGX11.cxx:109
#define BEGIN_NAMESPACE_ROOFIT