Logo ROOT  
Reference Guide
BidirMMapPipe.cxx
Go to the documentation of this file.
1 /** @file BidirMMapPipe.cxx
2  *
3  * implementation of BidirMMapPipe, a class which forks off a child process
4  * and serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 #ifndef _WIN32
10 #include <map>
11 #include <cerrno>
12 #include <limits>
13 #include <string>
14 #include <cstdlib>
15 #include <cstring>
16 #include <cassert>
17 #include <iostream>
18 #include <algorithm>
19 #include <exception>
20 
21 #include <poll.h>
22 #include <fcntl.h>
23 #include <signal.h>
24 #include <unistd.h>
25 #include <pthread.h>
26 #include <sys/mman.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #include <sys/socket.h>
30 
31 #include "BidirMMapPipe.h"
32 
33 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
34 #define END_NAMESPACE_ROOFIT }
35 
37 
38 /// namespace for implementation details of BidirMMapPipe
39 namespace BidirMMapPipe_impl {
40  /** @brief exception to throw if low-level OS calls go wrong
41  *
42  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
43  * @date 2013-07-07
44  */
class BidirMMapPipeException : public std::exception
{
    private:
        enum {
            s_sz = 256 ///< length of buffer
        };
        char m_buf[s_sz]; ///< buffer containing the error message

        /// adapter for the POSIX flavour of strerror_r: the function
        /// returns an int status and writes the message into buf
        static int dostrerror_r(int err, char* buf, std::size_t sz,
                int (*f)(int, char*, std::size_t))
        { return f(err, buf, sz); }
        /// adapter for the GNU flavour of strerror_r: the function returns
        /// a char* which may or may not point into buf
        static int dostrerror_r(int, char*, std::size_t,
                char* (*f)(int, char*, std::size_t));
    public:
        /** @brief build the message "<msg>: <text for err>"
         *
         * @param msg   hint on the operation that failed (a null pointer is
         *              treated like an empty string)
         * @param err   errno-style error code
         *
         * the message is truncated to fit the internal fixed-size buffer
         */
        BidirMMapPipeException(const char* msg, int err);
        /// return a description of what went wrong
        const char* what() const noexcept override { return m_buf; }
};

BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
{
    const std::size_t cap = s_sz;
    if (!msg) msg = "";
    std::size_t msgsz = std::strlen(msg);
    if (msgsz) {
        msgsz = std::min(msgsz, cap);
        std::copy(msg, msg + msgsz, m_buf);
        if (msgsz < cap) { m_buf[msgsz] = ':'; ++msgsz; }
        if (msgsz < cap) { m_buf[msgsz] = ' '; ++msgsz; }
    }
    if (msgsz < cap) {
        // terminate first, so that what() yields a well-defined string
        // even if strerror_r fails without writing anything to the buffer
        m_buf[msgsz] = 0;
        // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
        // have to sort it out with overloads
        dostrerror_r(err, &m_buf[msgsz], cap - msgsz, ::strerror_r);
    }
    m_buf[cap - 1] = 0; // enforce zero-termination
}

int BidirMMapPipeException::dostrerror_r(int err, char* buf,
        std::size_t sz, char* (*f)(int, char*, std::size_t))
{
    buf[0] = 0;
    char *tmp = f(err, buf, sz);
    // the GNU variant may return a pointer to a string outside of buf; in
    // that case, copy (and possibly truncate) it into buf ourselves
    if (tmp && tmp != buf) {
        std::strncpy(buf, tmp, sz);
        buf[sz - 1] = 0;
        if (std::strlen(tmp) > sz - 1) return ERANGE;
    }
    return 0;
}
96 
97  /** @brief class representing the header structure in an mmapped page
98  *
99  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
100  * @date 2013-07-07
101  *
102  * contains a field to put pages into a linked list, a field for the size
103  * of the data being transmitted, and a field for the position until which
104  * the data has been read
105  */
106  class Page
107  {
108  private:
109  // use as small a data type as possible to maximise payload area
110  // of pages
111  short m_next; ///< next page in list (in pagesizes)
112  unsigned short m_size; ///< size of payload (in bytes)
113  unsigned short m_pos; ///< index of next byte in payload area
114  /// copy construction forbidden
115  Page(const Page&) {}
116  /// assigment forbidden
117  Page& operator=(const Page&) = delete;
118  public:
119  /// constructor
120  Page() : m_next(0), m_size(0), m_pos(0)
121  {
122  // check that short is big enough - must be done at runtime
123  // because the page size is not known until runtime
124  assert(std::numeric_limits<unsigned short>::max() >=
126  }
127  /// set pointer to next page
128  void setNext(const Page* p);
129  /// return pointer to next page
130  Page* next() const;
131  /// return reference to size field
132  unsigned short& size() { return m_size; }
133  /// return size (of payload data)
134  unsigned size() const { return m_size; }
135  /// return reference to position field
136  unsigned short& pos() { return m_pos; }
137  /// return position
138  unsigned pos() const { return m_pos; }
139  /// return pointer to first byte in payload data area of page
140  inline unsigned char* begin() const
141  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
142  + sizeof(Page); }
143  /// return pointer to first byte in payload data area of page
144  inline unsigned char* end() const
145  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
146  + PageChunk::pagesize(); }
147  /// return the capacity of the page
148  static unsigned capacity()
149  { return PageChunk::pagesize() - sizeof(Page); }
150  /// true if page empty
151  bool empty() const { return !m_size; }
152  /// true if page partially filled
153  bool filled() const { return !empty(); }
154  /// free space left (to be written to)
155  unsigned free() const { return capacity() - m_size; }
156  /// bytes remaining to be read
157  unsigned remaining() const { return m_size - m_pos; }
158  /// true if page completely full
159  bool full() const { return !free(); }
160  };
161 
162  void Page::setNext(const Page* p)
163  {
164  if (!p) {
165  m_next = 0;
166  } else {
167  const char* p1 = reinterpret_cast<char*>(this);
168  const char* p2 = reinterpret_cast<const char*>(p);
169  std::ptrdiff_t tmp = p2 - p1;
170  // difference must be divisible by page size
171  assert(!(tmp % PageChunk::pagesize()));
172  tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
173  m_next = tmp;
174  // no truncation when saving in a short
175  assert(m_next == tmp);
176  // final check: next() must return p
177  assert(next() == p);
178  }
179  }
180 
181  Page* Page::next() const
182  {
183  if (!m_next) return 0;
184  char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
185  ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
186  return reinterpret_cast<Page*>(ptmp);
187  }
188 
189  /** @brief class representing a page pool
190  *
191  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
192  * @date 2013-07-24
193  *
194  * pool of mmapped pages (on systems which support it, on all others, the
195  * functionality is emulated with dynamically allocated memory)
196  *
197  * in most operating systems there is a limit to how many mappings any one
198  * process is allowed to request; for this reason, we mmap a relatively
199  * large amount up front, and then carve off little pieces as we need them
200  *
201  * Moreover, some systems have too large a physical page size in their MMU
202  * for the code to handle (we want offsets and lengths to fit into 16
203  * bits), so we carve such big physical pages into smaller logical Pages
204  * if needed. The largest logical page size is currently 16 KiB.
205  */
206  class PagePool {
207  private:
208  /// convenience typedef
209  typedef BidirMMapPipeException Exception;
210 
211  enum {
212  minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
213  maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
214  szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
215  };
216  /// a chunk of memory in the pool
217  typedef BidirMMapPipe_impl::PageChunk Chunk;
218  /// list of chunks
219  typedef std::list<Chunk*> ChunkList;
220 
221  friend class BidirMMapPipe_impl::PageChunk;
222  public:
223  /// convenience typedef
225  /// constructor
226  PagePool(unsigned nPagesPerGroup);
227  /// destructor
228  ~PagePool();
229  /// pop a free element out of the pool
230  Pages pop();
231 
232  /// return (logical) page size of the system
233  static unsigned pagesize() { return PageChunk::pagesize(); }
234  /// return variety of mmap supported on the system
235  static MMapVariety mmapVariety()
236  { return PageChunk::mmapVariety(); }
237 
238  /// return number of pages per group (ie. as returned by pop())
239  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
240 
241  /// zap the pool (unmap all but Pages p)
242  void zap(Pages& p);
243 
244  private:
245  /// list of chunks used by the pool
246  ChunkList m_chunks;
247  /// list of chunks used by the pool which are not full
248  ChunkList m_freelist;
249  /// chunk size map (histogram of chunk sizes)
250  unsigned m_szmap[(maxsz - minsz) / szincr];
251  /// current chunk size
252  int m_cursz;
253  /// page group size
254  unsigned m_nPgPerGrp;
255 
256  /// adjust _cursz to current largest block
257  void updateCurSz(int sz, int incr);
258  /// find size of next chunk to allocate (in a hopefully smart way)
259  int nextChunkSz() const;
260  /// release a chunk
261  void putOnFreeList(Chunk* chunk);
262  /// release a chunk
263  void release(Chunk* chunk);
264  };
265 
266  Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
267  m_pimpl(new impl)
268  {
269  assert(npg < 256);
270  m_pimpl->m_parent = parent;
271  m_pimpl->m_pages = pages;
272  m_pimpl->m_refcnt = 1;
273  m_pimpl->m_npages = npg;
274  /// initialise pages
275  for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
276  }
277 
279  unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
281 
283  {
284  if (m_pimpl && !--(m_pimpl->m_refcnt)) {
285  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
286  delete m_pimpl;
287  }
288  }
289 
290  Pages::Pages(const Pages& other) :
291  m_pimpl(other.m_pimpl)
292  { ++(m_pimpl->m_refcnt); }
293 
294  Pages& Pages::operator=(const Pages& other)
295  {
296  if (&other == this) return *this;
297  if (--(m_pimpl->m_refcnt)) {
298  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
299  delete m_pimpl;
300  }
301  m_pimpl = other.m_pimpl;
302  ++(m_pimpl->m_refcnt);
303  return *this;
304  }
305 
306  unsigned Pages::pagesize() { return PageChunk::pagesize(); }
307 
308  Page* Pages::page(unsigned pgno) const
309  {
310  assert(pgno < m_pimpl->m_npages);
311  unsigned char* pptr =
312  reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
313  pptr += pgno * pagesize();
314  return reinterpret_cast<Page*>(pptr);
315  }
316 
317  unsigned Pages::pageno(Page* p) const
318  {
319  const unsigned char* pptr =
320  reinterpret_cast<const unsigned char*>(p);
321  const unsigned char* bptr =
322  reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
323  assert(0 == ((pptr - bptr) % pagesize()));
324  const unsigned nr = (pptr - bptr) / pagesize();
325  assert(nr < m_pimpl->m_npages);
326  return nr;
327  }
328 
330  {
331  // find out page size of system
332  long pgsz = sysconf(_SC_PAGESIZE);
333  if (-1 == pgsz) throw Exception("sysconf", errno);
334  if (pgsz > 512 && pgsz > long(sizeof(Page)))
335  return pgsz;
336 
337  // in case of failure or implausible value, use a safe default: 4k
338  // page size, and do not try to mmap
339  s_mmapworks = Copy;
340  return 1 << 12;
341  }
342 
343  PageChunk::PageChunk(PagePool* parent,
344  unsigned length, unsigned nPgPerGroup) :
345  m_begin(dommap(length)),
346  m_end(reinterpret_cast<void*>(
347  reinterpret_cast<unsigned char*>(m_begin) + length)),
348  m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
349  {
350  // ok, push groups of pages onto freelist here
351  unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
352  unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
353  while (p < pend) {
354  m_freelist.push_back(reinterpret_cast<void*>(p));
355  p += nPgPerGroup * PagePool::pagesize();
356  }
357  }
358 
360  {
361  if (m_parent) assert(empty());
362  if (m_begin) domunmap(m_begin, len());
363  }
364 
365  bool PageChunk::contains(const Pages& p) const
366  { return p.m_pimpl->m_parent == this; }
367 
369  {
370  assert(!m_freelist.empty());
371  void* p = m_freelist.front();
372  m_freelist.pop_front();
373  ++m_nUsedGrp;
374  return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
375  }
376 
377  void PageChunk::push(const Pages& p)
378  {
379  assert(contains(p));
380  bool wasempty = m_freelist.empty();
381  m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
382  --m_nUsedGrp;
383  if (m_parent) {
384  // notify parent if we need to be put on the free list again
385  if (wasempty) m_parent->putOnFreeList(this);
386  // notify parent if we're empty
387  if (empty()) return m_parent->release(this);
388  }
389  }
390 
391  void* PageChunk::dommap(unsigned len)
392  {
393  assert(len && 0 == (len % s_physpgsz));
394  // ok, the idea here is to try the different methods of mmapping, and
395  // choose the first one that works. we have four flavours:
396  // 1 - anonymous mmap (best)
397  // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
398  // bit more tedious to set up, since you need to open/close a
399  // device file)
400  // 3 - mmap of a temporary file (very tedious to set up - need to
401  // create a temporary file, delete it, make the underlying storage
402  // large enough, then mmap the fd and close it)
403  // 4 - if all those fail, we malloc the buffers, and copy the data
404  // through the OS (then we're no better than normal pipes)
405  static bool msgprinted = false;
406  if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
407 #if defined(MAP_ANONYMOUS)
408 #undef MYANONFLAG
409 #define MYANONFLAG MAP_ANONYMOUS
410 #elif defined(MAP_ANON)
411 #undef MYANONFLAG
412 #define MYANONFLAG MAP_ANON
413 #else
414 #undef MYANONFLAG
415 #endif
416 #ifdef MYANONFLAG
417  void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
418  MYANONFLAG | MAP_SHARED, -1, 0);
419  if (MAP_FAILED == retVal) {
420  if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
421  } else {
422  assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
424  if (BidirMMapPipe::debugflag() && !msgprinted) {
425  std::cerr << " INFO: In " << __func__ << " (" <<
426  __FILE__ << ", line " << __LINE__ <<
427  "): anonymous mmapping works, excellent!" <<
428  std::endl;
429  msgprinted = true;
430  }
431  return retVal;
432  }
433 #endif
434 #undef MYANONFLAG
435  }
436  if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
437  // ok, no anonymous mappings supported directly, so try to map
438  // /dev/zero which has much the same effect on many systems
439  int fd = ::open("/dev/zero", O_RDWR);
440  if (-1 == fd)
441  throw Exception("open /dev/zero", errno);
442  void* retVal = ::mmap(0, len,
443  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
444  if (MAP_FAILED == retVal) {
445  int errsv = errno;
446  ::close(fd);
447  if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
448  } else {
449  assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
451  }
452  if (-1 == ::close(fd))
453  throw Exception("close /dev/zero", errno);
454  if (BidirMMapPipe::debugflag() && !msgprinted) {
455  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
456  ", line " << __LINE__ << "): mmapping /dev/zero works, "
457  "very good!" << std::endl;
458  msgprinted = true;
459  }
460  return retVal;
461  }
462  if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
463  char name[] = "/tmp/BidirMMapPipe-XXXXXX";
464  int fd;
465  // open temp file
466  if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
467  // remove it, but keep fd open
468  if (-1 == ::unlink(name)) {
469  int errsv = errno;
470  ::close(fd);
471  throw Exception("unlink", errsv);
472  }
473  // make it the right size: lseek
474  if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
475  int errsv = errno;
476  ::close(fd);
477  throw Exception("lseek", errsv);
478  }
479  // make it the right size: write a byte
480  if (1 != ::write(fd, name, 1)) {
481  int errsv = errno;
482  ::close(fd);
483  throw Exception("write", errsv);
484  }
485  // do mmap
486  void* retVal = ::mmap(0, len,
487  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
488  if (MAP_FAILED == retVal) {
489  int errsv = errno;
490  ::close(fd);
491  if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
492  } else {
493  assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
495  }
496  if (-1 == ::close(fd)) {
497  int errsv = errno;
498  ::munmap(retVal, len);
499  throw Exception("close", errsv);
500  }
501  if (BidirMMapPipe::debugflag() && !msgprinted) {
502  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
503  ", line " << __LINE__ << "): mmapping temporary files "
504  "works, good!" << std::endl;
505  msgprinted = true;
506  }
507  return retVal;
508  }
509  if (Copy == s_mmapworks || Unknown == s_mmapworks) {
510  // fallback solution: mmap does not work on this OS (or does not
511  // work for what we want to use it), so use a normal buffer of
512  // memory instead, and collect data in that buffer - this needs an
513  // additional write/read to/from the pipe(s), but there you go...
514  if (BidirMMapPipe::debugflag() && !msgprinted) {
515  std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
516  ", line " << __LINE__ << "): anonymous mmapping of "
517  "shared buffers failed, falling back to read/write on "
518  " pipes!" << std::endl;
519  msgprinted = true;
520  }
521  s_mmapworks = Copy;
522  void* retVal = std::malloc(len);
523  if (!retVal) throw Exception("malloc", errno);
524  return retVal;
525  }
526  // should never get here
527  assert(false);
528  return 0;
529  }
530 
531  void PageChunk::domunmap(void* addr, unsigned len)
532  {
533  assert(len && 0 == (len % s_physpgsz));
534  if (addr) {
535  assert(Unknown != s_mmapworks);
536  if (Copy != s_mmapworks) {
537  if (-1 == ::munmap(addr, len))
538  throw Exception("munmap", errno);
539  } else {
540  std::free(addr);
541  }
542  }
543  }
544 
546  {
547  // try to mprotect the other bits of the pool with no access...
548  // we'd really like a version of mremap here that can unmap all the
549  // other pages in the chunk, but that does not exist, so we protect
550  // the other pages in this chunk such that they may neither be read,
551  // written nor executed, only the pages we're interested in for
552  // communications stay readable and writable
553  //
554  // if an OS does not support changing the protection of a part of an
555  // mmapped area, the mprotect calls below should just fail and not
556  // change any protection, so we're a little less safe against
557  // corruption, but everything should still work
558  if (Copy != s_mmapworks) {
559  unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
560  unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
561  unsigned char* p2 = p1 + p.npages() * s_physpgsz;
562  unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
563  if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
564  if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
565  }
566  m_parent = 0;
567  m_freelist.clear();
568  m_nUsedGrp = 1;
569  p.m_pimpl->m_parent = 0;
570  m_begin = m_end = 0;
571  // commit suicide
572  delete this;
573  }
574 
575  PagePool::PagePool(unsigned nPgPerGroup) :
576  m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
577  {
578  // if logical and physical page size differ, we may have to adjust
579  // m_nPgPerGrp to make things fit
581  const unsigned mult =
583  const unsigned desired = nPgPerGroup * PageChunk::pagesize();
584  // round up to to next physical page boundary
585  const unsigned actual = mult *
586  (desired / mult + bool(desired % mult));
587  const unsigned newPgPerGrp = actual / PageChunk::pagesize();
588  if (BidirMMapPipe::debugflag()) {
589  std::cerr << " INFO: In " << __func__ << " (" <<
590  __FILE__ << ", line " << __LINE__ <<
591  "): physical page size " << PageChunk::physPgSz() <<
592  ", subdividing into logical pages of size " <<
593  PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
594  m_nPgPerGrp << " -> " << newPgPerGrp <<
595  std::endl;
596  }
597  assert(newPgPerGrp >= m_nPgPerGrp);
598  m_nPgPerGrp = newPgPerGrp;
599  }
600  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
601  }
602 
603  PagePool::~PagePool()
604  {
605  m_freelist.clear();
606  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
607  delete *it;
608  m_chunks.clear();
609  }
610 
611  void PagePool::zap(Pages& p)
612  {
613  // unmap all pages but those pointed to by p
614  m_freelist.clear();
615  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
616  if ((*it)->contains(p)) {
617  (*it)->zap(p);
618  } else {
619  delete *it;
620  }
621  }
622  m_chunks.clear();
623  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
624  m_cursz = minsz;
625  }
626 
627  Pages PagePool::pop()
628  {
629  if (m_freelist.empty()) {
630  // allocate and register new chunk and put it on the freelist
631  const int sz = nextChunkSz();
632  Chunk *c = new Chunk(this,
633  sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
634  m_chunks.push_front(c);
635  m_freelist.push_back(c);
636  updateCurSz(sz, +1);
637  }
638  // get free element from first chunk on _freelist
639  Chunk* c = m_freelist.front();
640  Pages p(c->pop());
641  // full chunks are removed from _freelist
642  if (c->full()) m_freelist.pop_front();
643  return p;
644  }
645 
646  void PagePool::release(PageChunk* chunk)
647  {
648  assert(chunk->empty());
649  // find chunk on freelist and remove
650  ChunkList::iterator it = std::find(
651  m_freelist.begin(), m_freelist.end(), chunk);
652  if (m_freelist.end() == it)
653  throw Exception("PagePool::release(PageChunk*)", EINVAL);
654  m_freelist.erase(it);
655  // find chunk in m_chunks and remove
656  it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
657  if (m_chunks.end() == it)
658  throw Exception("PagePool::release(PageChunk*)", EINVAL);
659  m_chunks.erase(it);
660  const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
661  delete chunk;
662  updateCurSz(sz, -1);
663  }
664 
665  void PagePool::putOnFreeList(PageChunk* chunk)
666  {
667  assert(!chunk->full());
668  m_freelist.push_back(chunk);
669  }
670 
671  void PagePool::updateCurSz(int sz, int incr)
672  {
673  m_szmap[(sz - minsz) / szincr] += incr;
674  m_cursz = minsz;
675  for (int i = (maxsz - minsz) / szincr; i--; ) {
676  if (m_szmap[i]) {
677  m_cursz += i * szincr;
678  break;
679  }
680  }
681  }
682 
683  int PagePool::nextChunkSz() const
684  {
685  // no chunks with space available, figure out chunk size
686  int sz = m_cursz;
687  if (m_chunks.empty()) {
688  // if we start allocating chunks, we start from minsz
689  sz = minsz;
690  } else {
691  if (minsz >= sz) {
692  // minimal sized chunks are always grown
693  sz = minsz + szincr;
694  } else {
695  if (1 != m_chunks.size()) {
696  // if we have more than one completely filled chunk, grow
697  sz += szincr;
698  } else {
699  // just one chunk left, try shrinking chunk size
700  sz -= szincr;
701  }
702  }
703  }
704  // clamp size to allowed range
705  if (sz > maxsz) sz = maxsz;
706  if (sz < minsz) sz = minsz;
707  return sz;
708  }
709 }
710 
711 // static BidirMMapPipe members
712 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
713 std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
714 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
717 
718 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
719 {
720  if (!s_pagepool)
721  s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
722  return *s_pagepool;
723 }
724 
726 {
727  pthread_mutex_lock(&s_openpipesmutex);
728  while (!s_openpipes.empty()) {
729  BidirMMapPipe *p = s_openpipes.front();
730  pthread_mutex_unlock(&s_openpipesmutex);
731  if (p->m_childPid) kill(p->m_childPid, SIGTERM);
732  p->doClose(true, true);
733  pthread_mutex_lock(&s_openpipesmutex);
734  }
735  pthread_mutex_unlock(&s_openpipesmutex);
736 }
737 
739  m_pages(pagepool().pop())
740 {
741  // free pages again
743  if (!s_pagepoolrefcnt) {
744  delete s_pagepool;
745  s_pagepool = 0;
746  }
747 }
748 
749 BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
750  m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
751  m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
752  m_parentPid(::getpid())
753 
754 {
756  assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
757  int fds[4] = { -1, -1, -1, -1 };
758  int myerrno;
759  static bool firstcall = true;
760  if (useExceptions) m_flags |= exceptionsbit;
761 
762  try {
763  if (firstcall) {
764  firstcall = false;
765  // register a cleanup handler to make sure all BidirMMapPipes are torn
766  // down, and child processes are sent a SIGTERM
767  if (0 != atexit(BidirMMapPipe::teardownall))
768  throw Exception("atexit", errno);
769  }
770 
771  // build free lists
772  for (unsigned i = 1; i < TotPages; ++i)
773  m_pages[i - 1]->setNext(m_pages[i]);
774  m_pages[PagesPerEnd - 1]->setNext(0);
775  if (!useSocketpair) {
776  // create pipes
777  if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
778  if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
779  } else {
780  if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
781  throw Exception("socketpair", errno);
782  }
783  // fork the child
784  pthread_mutex_lock(&s_openpipesmutex);
785  char c;
786  switch ((m_childPid = ::fork())) {
787  case -1: // error in fork()
788  myerrno = errno;
789  pthread_mutex_unlock(&s_openpipesmutex);
790  m_childPid = 0;
791  throw Exception("fork", myerrno);
792  case 0: // child
793  // put the ends in the right place
794  if (-1 != fds[2]) {
795  // pair of pipes
796  if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
797  myerrno = errno;
798  pthread_mutex_unlock(&s_openpipesmutex);
799  throw Exception("close", myerrno);
800  }
801  fds[0] = fds[3] = -1;
802  m_outpipe = fds[1];
803  m_inpipe = fds[2];
804  } else {
805  // socket pair
806  if (-1 == ::close(fds[0])) {
807  myerrno = errno;
808  pthread_mutex_unlock(&s_openpipesmutex);
809  throw Exception("close", myerrno);
810  }
811  fds[0] = -1;
812  m_inpipe = m_outpipe = fds[1];
813  }
814  // close other pipes our parent may have open - we have no business
815  // reading from/writing to those...
816  for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
817  s_openpipes.end() != it; ) {
818  BidirMMapPipe* p = *it;
819  it = s_openpipes.erase(it);
820  p->doClose(true, true);
821  }
822  pagepool().zap(m_pages);
823  s_pagepoolrefcnt = 0;
824  delete s_pagepool;
825  s_pagepool = 0;
826  s_openpipes.push_front(this);
827  pthread_mutex_unlock(&s_openpipesmutex);
828  // ok, put our pages on freelist
830  // handshake with other end (to make sure it's alive)...
831  c = 'C'; // ...hild
832  if (1 != xferraw(m_outpipe, &c, 1, ::write))
833  throw Exception("handshake: xferraw write", EPIPE);
834  if (1 != xferraw(m_inpipe, &c, 1, ::read))
835  throw Exception("handshake: xferraw read", EPIPE);
836  if ('P' != c) throw Exception("handshake", EPIPE);
837  break;
838  default: // parent
839  // put the ends in the right place
840  if (-1 != fds[2]) {
841  // pair of pipes
842  if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
843  myerrno = errno;
844  pthread_mutex_unlock(&s_openpipesmutex);
845  throw Exception("close", myerrno);
846  }
847  fds[1] = fds[2] = -1;
848  m_outpipe = fds[3];
849  m_inpipe = fds[0];
850  } else {
851  // socketpair
852  if (-1 == ::close(fds[1])) {
853  myerrno = errno;
854  pthread_mutex_unlock(&s_openpipesmutex);
855  throw Exception("close", myerrno);
856  }
857  fds[1] = -1;
858  m_inpipe = m_outpipe = fds[0];
859  }
860  // put on list of open pipes (so we can kill child processes
861  // if things go wrong)
862  s_openpipes.push_front(this);
863  pthread_mutex_unlock(&s_openpipesmutex);
864  // ok, put our pages on freelist
865  m_freelist = m_pages[0u];
866  // handshake with other end (to make sure it's alive)...
867  c = 'P'; // ...arent
868  if (1 != xferraw(m_outpipe, &c, 1, ::write))
869  throw Exception("handshake: xferraw write", EPIPE);
870  if (1 != xferraw(m_inpipe, &c, 1, ::read))
871  throw Exception("handshake: xferraw read", EPIPE);
872  if ('C' != c) throw Exception("handshake", EPIPE);
873  break;
874  }
875  // mark file descriptors for close on exec (we do not want to leak the
876  // connection to anything we happen to exec)
877  int fdflags = 0;
878  if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
879  throw Exception("fcntl", errno);
880  fdflags |= FD_CLOEXEC;
881  if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
882  throw Exception("fcntl", errno);
883  if (m_inpipe != m_outpipe) {
884  if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
885  throw Exception("fcntl", errno);
886  fdflags |= FD_CLOEXEC;
887  if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
888  throw Exception("fcntl", errno);
889  }
890  // ok, finally, clear the failbit
891  m_flags &= ~failbit;
892  // all done
893  } catch (BidirMMapPipe::Exception&) {
894  if (0 != m_childPid) kill(m_childPid, SIGTERM);
895  for (int i = 0; i < 4; ++i)
896  if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
897  {
898  // free resources associated with mmapped pages
900  }
901  if (!--s_pagepoolrefcnt) {
902  delete s_pagepool;
903  s_pagepool = 0;
904  }
905  throw;
906  }
907 }
908 
910 {
911  assert(!(m_flags & failbit));
912  return doClose(false);
913 }
914 
915 int BidirMMapPipe::doClose(bool force, bool holdlock)
916 {
917  if (m_flags & failbit) return 0;
918  // flush data to be written
919  if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
920  // shut down the write direction (no more writes from our side)
921  if (m_inpipe == m_outpipe) {
922  if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
923  throw Exception("shutdown", errno);
924  m_outpipe = -1;
925  } else {
926  if (-1 != m_outpipe && -1 == ::close(m_outpipe))
927  if (!force) throw Exception("close", errno);
928  m_outpipe = -1;
929  }
930  // shut down the write direction (no more writes from our side)
931  // drain anything the other end might still want to send
932  if (!force && -1 != m_inpipe) {
933  // **************** THIS IS EXTREMELY UGLY: ****************
934  // POLLHUP is not set reliably on pipe/socket shutdown on all
935  // platforms, unfortunately, so we poll for readability here until
936  // the other end closes, too
937  //
938  // the read loop below ensures that the other end sees the POLLIN that
939  // is set on shutdown instead, and goes ahead to close its end
940  //
941  // if we don't do this, and close straight away, the other end
942  // will catch a SIGPIPE or similar, and we don't want that
943  int err;
944  struct pollfd fds;
945  fds.fd = m_inpipe;
946  fds.events = POLLIN;
947  fds.revents = 0;
948  do {
949  while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
950  if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
951  if (fds.revents & POLLIN) {
952  char c;
953  if (1 > ::read(m_inpipe, &c, 1)) break;
954  }
955  }
956  } while (0 > err && EINTR == errno);
957  // ignore all other poll errors
958  }
959  // close read end
960  if (-1 != m_inpipe && -1 == ::close(m_inpipe))
961  if (!force) throw Exception("close", errno);
962  m_inpipe = -1;
963  // unmap memory
964  try {
966  if (!--s_pagepoolrefcnt) {
967  delete s_pagepool;
968  s_pagepool = 0;
969  }
970  } catch (std::exception&) {
971  if (!force) throw;
972  }
974  // wait for child process
975  int retVal = 0;
976  if (isParent()) {
977  int tmp;
978  do {
979  tmp = waitpid(m_childPid, &retVal, 0);
980  } while (-1 == tmp && EINTR == errno);
981  if (-1 == tmp)
982  if (!force) throw Exception("waitpid", errno);
983  m_childPid = 0;
984  }
985  // remove from list of open pipes
986  if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
987  std::list<BidirMMapPipe*>::iterator it = std::find(
988  s_openpipes.begin(), s_openpipes.end(), this);
989  if (s_openpipes.end() != it) s_openpipes.erase(it);
990  if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
991  m_flags |= failbit;
992  return retVal;
993 }
994 
996 { doClose(false); }
997 
999  int fd, void* addr, size_type len,
1000  ssize_t (*xferfn)(int, void*, std::size_t))
1001 {
1002  size_type xferred = 0;
1003  unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1004  while (len) {
1005  ssize_t tmp = xferfn(fd, buf, len);
1006  if (tmp > 0) {
1007  xferred += tmp;
1008  len -= tmp;
1009  buf += tmp;
1010  continue;
1011  } else if (0 == tmp) {
1012  // check for end-of-file on pipe
1013  break;
1014  } else if (-1 == tmp) {
1015  // ok some error occurred, so figure out if we want to retry of throw
1016  switch (errno) {
1017  default:
1018  // if anything was transferred, return number of bytes
1019  // transferred so far, we can start throwing on the next
1020  // transfer...
1021  if (xferred) return xferred;
1022  // else throw
1023  throw Exception("xferraw", errno);
1024  case EAGAIN: // fallthrough intended
1025 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1026  case EWOULDBLOCK: // fallthrough intended
1027 #endif
1028  std::cerr << " ERROR: In " << __func__ << " (" <<
1029  __FILE__ << ", line " << __LINE__ <<
1030  "): expect transfer to block!" << std::endl;
1031  case EINTR:
1032  break;
1033  }
1034  continue;
1035  } else {
1036  throw Exception("xferraw: unexpected return value from read/write",
1037  errno);
1038  }
1039  }
1040  return xferred;
1041 }
1042 
1044 {
1045  if (plist) {
1046  unsigned char pg = m_pages[plist];
1047  if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1050  // ok, have to copy pages through pipe
1051  for (Page* p = plist; p; p = p->next()) {
1052  if (sizeof(Page) + p->size() !=
1053  xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1054  ::write)) {
1055  throw Exception("sendpages: short write", EPIPE);
1056  }
1057  }
1058  }
1059  } else {
1060  throw Exception("sendpages: short write", EPIPE);
1061  }
1062  } else { assert(plist); }
1063 }
1064 
1066 {
1067  unsigned char pg;
1068  unsigned retVal = 0;
1069  Page *plisthead = 0, *plisttail = 0;
1070  if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1071  plisthead = plisttail = m_pages[pg];
1072  // ok, have number of pages
1075  // ok, need to copy pages through pipe
1076  for (; plisttail; ++retVal) {
1077  Page* p = plisttail;
1078  if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1079  ::read)) {
1080  plisttail = p->next();
1081  if (!p->size()) continue;
1082  // break in case of read error
1083  if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1084  ::read)) break;
1085  }
1086  }
1087  } else {
1088  retVal = lenPageList(plisthead);
1089  }
1090  }
1091  // put list of pages we just received into correct lists (busy/free)
1092  if (plisthead) feedPageLists(plisthead);
1093  // ok, retVal contains the number of pages read, so put them on the
1094  // correct lists
1095  return retVal;
1096 }
1097 
1099 {
1100  struct pollfd fds;
1101  fds.fd = m_inpipe;
1102  fds.events = POLLIN;
1103  fds.revents = 0;
1104  unsigned retVal = 0;
1105  do {
1106  int rc = ::poll(&fds, 1, 0);
1107  if (0 > rc) {
1108  if (EINTR == errno) continue;
1109  break;
1110  }
1111  if (1 == retVal && fds.revents & POLLIN &&
1112  !(fds.revents & (POLLNVAL | POLLERR))) {
1113  // ok, we can read without blocking, so the other end has
1114  // something for us
1115  return recvpages();
1116  } else {
1117  break;
1118  }
1119  } while (true);
1120  return retVal;
1121 }
1122 
1124 {
1125  unsigned n = 0;
1126  for ( ; p; p = p->next()) ++n;
1127  return n;
1128 }
1129 
1131 {
1132  assert(plist);
1133  // get end of busy list
1134  Page *blend = m_busylist;
1135  while (blend && blend->next()) blend = blend->next();
1136  // ok, might have to send free pages to other end, and (if we do have to
1137  // send something to the other end) while we're at it, send any dirty
1138  // pages which are completely full, too
1139  Page *sendlisthead = 0, *sendlisttail = 0;
1140  // loop over plist
1141  while (plist) {
1142  Page* p = plist;
1143  plist = p->next();
1144  p->setNext(0);
1145  if (p->size()) {
1146  // busy page...
1147  p->pos() = 0;
1148  // put at end of busy list
1149  if (blend) blend->setNext(p);
1150  else m_busylist = p;
1151  blend = p;
1152  } else {
1153  // free page...
1154  // Very simple algorithm: once we're done with a page, we send it back
1155  // where it came from. If it's from our end, we put it on the free list, if
1156  // it's from the other end, we send it back.
1157  if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1158  (isChild() && m_pages[p] < PagesPerEnd)) {
1159  // page "belongs" to other end
1160  if (!sendlisthead) sendlisthead = p;
1161  if (sendlisttail) sendlisttail->setNext(p);
1162  sendlisttail = p;
1163  } else {
1164  // add page to freelist
1165  p->setNext(m_freelist);
1166  m_freelist = p;
1167  }
1168  }
1169  }
1170  // check if we have to send stuff to the other end
1171  if (sendlisthead) {
1172  // go through our list of dirty pages, and see what we can
1173  // send along
1174  Page* dp;
1175  while ((dp = m_dirtylist) && dp->full()) {
1176  Page* p = dp;
1177  // move head of dirty list
1178  m_dirtylist = p->next();
1179  // queue for sending
1180  p->setNext(0);
1181  sendlisttail->setNext(p);
1182  sendlisttail = p;
1183  }
1184  // poll if the other end is still alive - this needs that we first
1185  // close the write pipe of the other end when the remote end of the
1186  // connection is shutting down in doClose; we'll see that because we
1187  // get a POLLHUP on our inpipe
1188  const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1189  struct pollfd fds[2];
1190  fds[0].fd = m_outpipe;
1191  fds[0].events = fds[0].revents = 0;
1192  if (m_outpipe != m_inpipe) {
1193  fds[1].fd = m_inpipe;
1194  fds[1].events = fds[1].revents = 0;
1195  } else {
1196  fds[0].events |= POLLIN;
1197  }
1198  int retVal = 0;
1199  do {
1200  retVal = ::poll(fds, nfds, 0);
1201  if (0 > retVal && EINTR == errno)
1202  continue;
1203  break;
1204  } while (true);
1205  if (0 <= retVal) {
1206  bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1207  if (m_outpipe != m_inpipe) {
1208  ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1209  } else {
1210  if (ok && fds[0].revents & POLLIN) {
1211  unsigned ret = recvpages();
1212  if (!ret) ok = false;
1213  }
1214  }
1215 
1216  if (ok) sendpages(sendlisthead);
1217  // (if the pipe is dead already, we don't care that we leak the
1218  // contents of the pages on the send list here, so that is why
1219  // there's no else clause here)
1220  } else {
1221  throw Exception("feedPageLists: poll", errno);
1222  }
1223  }
1224 }
1225 
1227 {
1228  assert(p);
1229  assert(p == m_freelist);
1230  // remove from freelist
1231  m_freelist = p->next();
1232  p->setNext(0);
1233  // append to dirty list
1234  Page* dl = m_dirtylist;
1235  while (dl && dl->next()) dl = dl->next();
1236  if (dl) dl->setNext(p);
1237  else m_dirtylist = p;
1238 }
1239 
1241 {
1242  // queue any pages available for reading we can without blocking
1244  Page* p;
1245  // if there are no busy pages, try to get them from the other end,
1246  // block if we have to...
1247  while (!(p = m_busylist)) if (!recvpages()) return 0;
1248  return p;
1249 }
1250 
1252 {
1253  // queue any pages available for reading we can without blocking
1255  Page* p = m_dirtylist;
1256  // go to end of dirty list
1257  if (p) while (p->next()) p = p->next();
1258  if (!p || p->full()) {
1259  // need to append free page, so get one
1260  while (!(p = m_freelist)) if (!recvpages()) return 0;
1261  markPageDirty(p);
1262  }
1263  return p;
1264 }
1265 
1267 { return doFlush(true); }
1268 
1269 void BidirMMapPipe::doFlush(bool forcePartialPages)
1270 {
1271  assert(!(m_flags & failbit));
1272  // build a list of pages to flush
1273  Page *flushlisthead = 0, *flushlisttail = 0;
1274  while (m_dirtylist) {
1275  Page* p = m_dirtylist;
1276  if (!forcePartialPages && !p->full()) break;
1277  // remove dirty page from dirty list
1278  m_dirtylist = p->next();
1279  p->setNext(0);
1280  // and send it to other end
1281  if (!flushlisthead) flushlisthead = p;
1282  if (flushlisttail) flushlisttail->setNext(p);
1283  flushlisttail = p;
1284  }
1285  if (flushlisthead) sendpages(flushlisthead);
1286 }
1287 
1289 {
1290  assert(!(m_flags & failbit));
1291  // join busy and dirty lists
1292  {
1293  Page *l = m_busylist;
1294  while (l && l->next()) l = l->next();
1295  if (l) l->setNext(m_dirtylist);
1296  else m_busylist = m_dirtylist;
1297  }
1298  // empty busy and dirty pages
1299  for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1300  // put them on the free list
1302  m_busylist = m_dirtylist = 0;
1303 }
1304 
1306 {
1307  // queue all pages waiting for consumption in the pipe before we give an
1308  // answer
1310  size_type retVal = 0;
1311  for (Page* p = m_busylist; p; p = p->next())
1312  retVal += p->size() - p->pos();
1313  return retVal;
1314 }
1315 
1317 {
1318  // queue all pages waiting for consumption in the pipe before we give an
1319  // answer
1321  // check if we could write to the pipe without blocking (we need to know
1322  // because we might need to check if flushing of dirty pages would block)
1323  bool couldwrite = false;
1324  {
1325  struct pollfd fds;
1326  fds.fd = m_outpipe;
1327  fds.events = POLLOUT;
1328  fds.revents = 0;
1329  int retVal = 0;
1330  do {
1331  retVal = ::poll(&fds, 1, 0);
1332  if (0 > retVal) {
1333  if (EINTR == errno) continue;
1334  throw Exception("bytesWritableNonBlocking: poll", errno);
1335  }
1336  if (1 == retVal && fds.revents & POLLOUT &&
1337  !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1338  couldwrite = true;
1339  break;
1340  } while (true);
1341  }
1342  // ok, start counting bytes
1343  size_type retVal = 0;
1344  unsigned npages = 0;
1345  // go through the dirty list
1346  for (Page* p = m_dirtylist; p; p = p->next()) {
1347  ++npages;
1348  // if page only partially filled
1349  if (!p->full())
1350  retVal += p->free();
1351  if (npages >= FlushThresh && !couldwrite) break;
1352  }
1353  // go through the free list
1354  for (Page* p = m_freelist; p && (!m_dirtylist ||
1355  npages < FlushThresh || couldwrite); p = p->next()) {
1356  ++npages;
1357  retVal += Page::capacity();
1358  }
1359  return retVal;
1360 }
1361 
1363 {
1364  assert(!(m_flags & failbit));
1365  size_type nread = 0;
1366  unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1367  try {
1368  while (sz) {
1369  // find next page to read from
1370  Page* p = busypage();
1371  if (!p) {
1372  m_flags |= eofbit;
1373  return nread;
1374  }
1375  unsigned char* pp = p->begin() + p->pos();
1376  size_type csz = std::min(size_type(p->remaining()), sz);
1377  std::copy(pp, pp + csz, ap);
1378  nread += csz;
1379  ap += csz;
1380  sz -= csz;
1381  p->pos() += csz;
1382  assert(p->size() >= p->pos());
1383  if (p->size() == p->pos()) {
1384  // if no unread data remains, page is free
1385  m_busylist = p->next();
1386  p->setNext(0);
1387  p->size() = 0;
1388  feedPageLists(p);
1389  }
1390  }
1391  } catch (Exception&) {
1392  m_flags |= rderrbit;
1393  if (m_flags & exceptionsbit) throw;
1394  }
1395  return nread;
1396 }
1397 
1399 {
1400  assert(!(m_flags & failbit));
1401  size_type written = 0;
1402  const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1403  try {
1404  while (sz) {
1405  // find next page to write to
1406  Page* p = dirtypage();
1407  if (!p) {
1408  m_flags |= eofbit;
1409  return written;
1410  }
1411  unsigned char* pp = p->begin() + p->size();
1412  size_type csz = std::min(size_type(p->free()), sz);
1413  std::copy(ap, ap + csz, pp);
1414  written += csz;
1415  ap += csz;
1416  p->size() += csz;
1417  sz -= csz;
1418  assert(p->capacity() >= p->size());
1419  if (p->full()) {
1420  // if page is full, see if we're above the flush threshold of
1421  // 3/4 of our pages
1423  doFlush(false);
1424  }
1425  }
1426  } catch (Exception&) {
1427  m_flags |= wrerrbit;
1428  if (m_flags & exceptionsbit) throw;
1429  }
1430  return written;
1431 }
1432 
1434 {
1435  // go through pipes, and change flags where we already know without really
1436  // polling - stuff where we don't need poll to wait for its timeout in the
1437  // OS...
1438  bool canskiptimeout = false;
1439  std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1440  std::vector<unsigned>::iterator mit = masks.begin();
1441  for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1442  ++it, ++mit) {
1443  PollEntry& pe = *it;
1444  pe.revents = None;
1445  // null pipe pointer or closed pipe is invalid
1446  if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1447  // check for error
1448  if (pe.pipe->bad()) pe.revents |= Error;
1449  // check for end of file
1450  if (pe.pipe->eof()) pe.revents |= EndOfFile;
1451  // check if readable
1452  if (pe.events & Readable) {
1453  *mit |= Readable;
1454  if (pe.pipe->m_busylist) pe.revents |= Readable;
1455  }
1456  // check if writable
1457  if (pe.events & Writable) {
1458  *mit |= Writable;
1459  if (pe.pipe->m_freelist) {
1460  pe.revents |= Writable;
1461  } else {
1462  Page *dl = pe.pipe->m_dirtylist;
1463  while (dl && dl->next()) dl = dl->next();
1464  if (dl && dl->pos() < Page::capacity())
1465  pe.revents |= Writable;
1466  }
1467  }
1468  if (pe.revents) canskiptimeout = true;
1469  }
1470  // set up the data structures required for the poll syscall
1471  std::vector<pollfd> fds;
1472  fds.reserve(2 * pipes.size());
1473  std::map<int, PollEntry*> fds2pipes;
1474  for (PollVector::const_iterator it = pipes.begin();
1475  pipes.end() != it; ++it) {
1476  const PollEntry& pe = *it;
1477  struct pollfd tmp;
1478  fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1479  const_cast<PollEntry*>(&pe)));
1480  tmp.events = tmp.revents = 0;
1481  // we always poll for readability; this allows us to queue pages
1482  // early
1483  tmp.events |= POLLIN;
1484  if (pe.pipe->m_outpipe != tmp.fd) {
1485  // ok, it's a pair of pipes
1486  fds.push_back(tmp);
1487  fds2pipes.insert(std::make_pair(
1488  unsigned(tmp.fd = pe.pipe->m_outpipe),
1489  const_cast<PollEntry*>(&pe)));
1490  tmp.events = 0;
1491 
1492  }
1493  if (pe.events & Writable) tmp.events |= POLLOUT;
1494  fds.push_back(tmp);
1495  }
1496  // poll
1497  int retVal = 0;
1498  do {
1499  retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1500  if (0 > retVal) {
1501  if (EINTR == errno) continue;
1502  throw Exception("poll", errno);
1503  }
1504  break;
1505  } while (true);
1506  // fds may have changed state, so update...
1507  for (std::vector<pollfd>::iterator it = fds.begin();
1508  fds.end() != it; ++it) {
1509  pollfd& fe = *it;
1510  //if (!fe.revents) continue;
1511  --retVal;
1512  PollEntry& pe = *fds2pipes[fe.fd];
1513 oncemore:
1514  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1515  pe.revents |= ReadInvalid;
1516  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1517  pe.revents |= WriteInvalid;
1518  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1519  pe.revents |= ReadError;
1520  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1521  pe.revents |= WriteError;
1522  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1523  pe.revents |= ReadEndOfFile;
1524  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1525  pe.revents |= WriteEndOfFile;
1526  if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1527  !(fe.revents & (POLLNVAL | POLLERR))) {
1528  // ok, there is at least one page for us to receive from the
1529  // other end
1530  if (0 == pe.pipe->recvpages()) continue;
1531  // more pages there?
1532  do {
1533  int tmp = ::poll(&fe, 1, 0);
1534  if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1535  if (0 > tmp) {
1536  if (EINTR == errno) continue;
1537  throw Exception("poll", errno);
1538  }
1539  break;
1540  } while (true);
1541  }
1542  if (pe.pipe->m_busylist) pe.revents |= Readable;
1543  if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1544  if (pe.pipe->m_freelist) {
1545  pe.revents |= Writable;
1546  } else {
1547  Page *dl = pe.pipe->m_dirtylist;
1548  while (dl && dl->next()) dl = dl->next();
1549  if (dl && dl->pos() < Page::capacity())
1550  pe.revents |= Writable;
1551  }
1552  }
1553  }
1554  // apply correct masks, and count pipes with pending events
1555  int npipes = 0;
1556  mit = masks.begin();
1557  for (PollVector::iterator it = pipes.begin();
1558  pipes.end() != it; ++it, ++mit)
1559  if ((it->revents &= *mit)) ++npipes;
1560  return npipes;
1561 }
1562 
1564 {
1565  size_t sz = std::strlen(str);
1566  *this << sz;
1567  if (sz) write(str, sz);
1568  return *this;
1569 }
1570 
1572 {
1573  size_t sz = 0;
1574  *this >> sz;
1575  if (good() && !eof()) {
1576  str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1577  if (!str) throw Exception("realloc", errno);
1578  if (sz) read(str, sz);
1579  str[sz] = 0;
1580  }
1581  return *this;
1582 }
1583 
1585 {
1586  size_t sz = str.size();
1587  *this << sz;
1588  write(str.data(), sz);
1589  return *this;
1590 }
1591 
1593 {
1594  str.clear();
1595  size_t sz = 0;
1596  *this >> sz;
1597  if (good() && !eof()) {
1598  str.reserve(sz);
1599  for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1600  }
1601  return *this;
1602 }
1603 
1605 
1606 #ifdef TEST_BIDIRMMAPPIPE
1607 using namespace RooFit;
1608 
1609 int simplechild(BidirMMapPipe& pipe)
1610 {
1611  // child does an echo loop
1612  while (pipe.good() && !pipe.eof()) {
1613  // read a string
1614  std::string str;
1615  pipe >> str;
1616  if (!pipe) return -1;
1617  if (pipe.eof()) break;
1618  if (!str.empty()) {
1619  std::cout << "[CHILD] : read: " << str << std::endl;
1620  str = "... early in the morning?";
1621  }
1622  pipe << str << BidirMMapPipe::flush;
1623  // did our parent tell us to shut down?
1624  if (str.empty()) break;
1625  if (!pipe) return -1;
1626  if (pipe.eof()) break;
1627  std::cout << "[CHILD] : wrote: " << str << std::endl;
1628  }
1629  pipe.close();
1630  return 0;
1631 }
1632 
1633 #include <sstream>
1634 int randomchild(BidirMMapPipe& pipe)
1635 {
1636  // child sends out something at random intervals
1637  ::srand48(::getpid());
1638  {
1639  // wait for parent's go ahead signal
1640  std::string s;
1641  pipe >> s;
1642  }
1643  // no shutdown sequence needed on this side - we're producing the data,
1644  // and the parent can just read until we're done (when it'll get EOF)
1645  for (int i = 0; i < 5; ++i) {
1646  // sleep a random time between 0 and .9 seconds
1647  ::usleep(int(1e6 * ::drand48()));
1648  std::ostringstream buf;
1649  buf << "child pid " << ::getpid() << " sends message " << i;
1650  std::string str = buf.str();
1651  std::cout << "[CHILD] : " << str << std::endl;
1652  pipe << str << BidirMMapPipe::flush;
1653  if (!pipe) return -1;
1654  if (pipe.eof()) break;
1655  }
1656  // tell parent we're shutting down
1657  pipe << "" << BidirMMapPipe::flush;
1658  // wait for parent to acknowledge
1659  std::string s;
1660  pipe >> s;
1661  pipe.close();
1662  return 0;
1663 }
1664 
1665 int benchchildrtt(BidirMMapPipe& pipe)
1666 {
1667  // child does the equivalent of listening for pings and sending the
1668  // packet back
1669  char* str = 0;
1670  while (pipe && !pipe.eof()) {
1671  pipe >> str;
1672  if (!pipe) {
1673  std::free(str);
1674  pipe.close();
1675  return -1;
1676  }
1677  if (pipe.eof()) break;
1678  pipe << str << BidirMMapPipe::flush;
1679  // if we have just completed the shutdown handshake, we break here
1680  if (!std::strlen(str)) break;
1681  }
1682  std::free(str);
1683  pipe.close();
1684  return 0;
1685 }
1686 
1687 int benchchildsink(BidirMMapPipe& pipe)
1688 {
1689  // child behaves like a sink
1690  char* str = 0;
1691  while (pipe && !pipe.eof()) {
1692  pipe >> str;
1693  if (!std::strlen(str)) break;
1694  }
1695  pipe << "" << BidirMMapPipe::flush;
1696  std::free(str);
1697  pipe.close();
1698  return 0;
1699 }
1700 
1701 int benchchildsource(BidirMMapPipe& pipe)
1702 {
1703  // child behaves like a source
1704  char* str = 0;
1705  for (unsigned i = 0; i <= 24; ++i) {
1706  str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1707  std::memset(str, '4', 1 << i);
1708  str[1 << i] = 0;
1709  for (unsigned j = 0; j < 1 << 7; ++j) {
1710  pipe << str;
1711  if (!pipe || pipe.eof()) {
1712  std::free(str);
1713  pipe.close();
1714  return -1;
1715  }
1716  }
1717  // tell parent we're done with this block size
1718  pipe << "" << BidirMMapPipe::flush;
1719  }
1720  // tell parent to shut down
1721  pipe << "" << BidirMMapPipe::flush;
1722  std::free(str);
1723  pipe.close();
1724  return 0;
1725 }
1726 
1727 BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1728 {
1729  // create a pipe with the given child at the remote end
1730  BidirMMapPipe *p = new BidirMMapPipe();
1731  if (p->isChild()) {
1732  int retVal = childexec(*p);
1733  delete p;
1734  std::exit(retVal);
1735  }
1736  return p;
1737 }
1738 
1739 #include <sys/time.h>
1740 #include <iomanip>
1741 int main()
1742 {
1743  // simple echo loop test
1744  {
1745  std::cout << "[PARENT]: simple challenge-response test, "
1746  "one child:" << std::endl;
1747  BidirMMapPipe* pipe = spawnChild(simplechild);
1748  for (int i = 0; i < 5; ++i) {
1749  std::string str("What shall we do with a drunken sailor...");
1750  *pipe << str << BidirMMapPipe::flush;
1751  if (!*pipe) return -1;
1752  std::cout << "[PARENT]: wrote: " << str << std::endl;
1753  *pipe >> str;
1754  if (!*pipe) return -1;
1755  std::cout << "[PARENT]: read: " << str << std::endl;
1756  }
1757  // send shutdown string
1758  *pipe << "" << BidirMMapPipe::flush;
1759  // wait for shutdown handshake
1760  std::string s;
1761  *pipe >> s;
1762  int retVal = pipe->close();
1763  std::cout << "[PARENT]: exit status of child: " << retVal <<
1764  std::endl;
1765  if (retVal) return retVal;
1766  delete pipe;
1767  }
1768  // simple poll test - children send 5 results in random intervals
1769  {
1770  unsigned nch = 20;
1771  std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1772  " children:" << std::endl;
1773  typedef BidirMMapPipe::PollEntry PollEntry;
1774  // poll data structure
1776  pipes.reserve(nch);
1777  // spawn children
1778  for (unsigned i = 0; i < nch; ++i) {
1779  std::cout << "[PARENT]: spawning child " << i << std::endl;
1780  pipes.push_back(PollEntry(spawnChild(randomchild),
1782  }
1783  // wake children up
1784  std::cout << "[PARENT]: waking up children" << std::endl;
1785  for (unsigned i = 0; i < nch; ++i)
1786  *pipes[i].pipe << "" << BidirMMapPipe::flush;
1787  std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1788  // while at least some children alive
1789  while (!pipes.empty()) {
1790  // poll, wait until status change (infinite timeout)
1791  int npipes = BidirMMapPipe::poll(pipes, -1);
1792  // scan for pipes with changed status
1793  for (std::vector<PollEntry>::iterator it = pipes.begin();
1794  npipes && pipes.end() != it; ) {
1795  if (!it->revents) {
1796  // unchanged, next one
1797  ++it;
1798  continue;
1799  }
1800  --npipes; // maybe we can stop early...
1801  // read from pipes which are readable
1802  if (it->revents & BidirMMapPipe::Readable) {
1803  std::string s;
1804  *(it->pipe) >> s;
1805  if (!s.empty()) {
1806  std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1807  ": " << s << std::endl;
1808  ++it;
1809  continue;
1810  } else {
1811  // child is shutting down...
1812  *(it->pipe) << "" << BidirMMapPipe::flush;
1813  goto childcloses;
1814  }
1815  }
1816  // retire pipes with error or end-of-file condition
1817  if (it->revents & (BidirMMapPipe::Error |
1820  std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1821  " revents" <<
1822  ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1823  ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1824  ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1825  ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1826  ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1827  ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1828  ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1829  ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1830  std::endl;
1831 childcloses:
1832  int retVal = it->pipe->close();
1833  std::cout << "[PARENT]: child exit status: " <<
1834  retVal << ", number of children still alive: " <<
1835  (pipes.size() - 1) << std::endl;
1836  if (retVal) return retVal;
1837  delete it->pipe;
1838  it = pipes.erase(it);
1839  continue;
1840  }
1841  }
1842  }
1843  }
1844  // little benchmark - round trip time
1845  {
1846  std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1847  for (unsigned i = 0; i <= 24; ++i) {
1848  char *s = new char[1 + (1 << i)];
1849  std::memset(s, 'A', 1 << i);
1850  s[1 << i] = 0;
1851  const unsigned n = 1 << 7;
1852  double avg = 0., min = 1e42, max = -1e42;
1853  BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1854  for (unsigned j = n; j--; ) {
1855  struct timeval t1;
1856  ::gettimeofday(&t1, 0);
1857  *pipe << s << BidirMMapPipe::flush;
1858  if (!*pipe || pipe->eof()) break;
1859  *pipe >> s;
1860  if (!*pipe || pipe->eof()) break;
1861  struct timeval t2;
1862  ::gettimeofday(&t2, 0);
1863  t2.tv_sec -= t1.tv_sec;
1864  t2.tv_usec -= t1.tv_usec;
1865  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1866  if (dt < min) min = dt;
1867  if (dt > max) max = dt;
1868  avg += dt;
1869  }
1870  // send a shutdown string
1871  *pipe << "" << BidirMMapPipe::flush;
1872  // get child's shutdown ok
1873  *pipe >> s;
1874  avg /= double(n);
1875  avg *= 1e6; min *= 1e6; max *= 1e6;
1876  int retVal = pipe->close();
1877  if (retVal) {
1878  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1879  delete[] s;
1880  return retVal;
1881  }
1882  delete pipe;
1883  // there is a factor 2 in the formula for the transfer rate below,
1884  // because we transfer data of twice the size of the block - once
1885  // to the child, and once for the return trip
1886  std::cout << "block size " << std::setw(9) << (1 << i) <<
1887  " avg " << std::setw(7) << avg << " us min " <<
1888  std::setw(7) << min << " us max " << std::setw(7) << max <<
1889  "us speed " << std::setw(9) <<
1890  2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1891  " MB/s" << std::endl;
1892  delete[] s;
1893  }
1894  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1895  }
1896  // little benchmark - child as sink
1897  {
1898  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1899  for (unsigned i = 0; i <= 24; ++i) {
1900  char *s = new char[1 + (1 << i)];
1901  std::memset(s, 'A', 1 << i);
1902  s[1 << i] = 0;
1903  const unsigned n = 1 << 7;
1904  double avg = 0., min = 1e42, max = -1e42;
1905  BidirMMapPipe *pipe = spawnChild(benchchildsink);
1906  for (unsigned j = n; j--; ) {
1907  struct timeval t1;
1908  ::gettimeofday(&t1, 0);
1909  // streaming mode - we do not flush here
1910  *pipe << s;
1911  if (!*pipe || pipe->eof()) break;
1912  struct timeval t2;
1913  ::gettimeofday(&t2, 0);
1914  t2.tv_sec -= t1.tv_sec;
1915  t2.tv_usec -= t1.tv_usec;
1916  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1917  if (dt < min) min = dt;
1918  if (dt > max) max = dt;
1919  avg += dt;
1920  }
1921  // send a shutdown string
1922  *pipe << "" << BidirMMapPipe::flush;
1923  // get child's shutdown ok
1924  *pipe >> s;
1925  avg /= double(n);
1926  avg *= 1e6; min *= 1e6; max *= 1e6;
1927  int retVal = pipe->close();
1928  if (retVal) {
1929  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1930  return retVal;
1931  }
1932  delete pipe;
1933  std::cout << "block size " << std::setw(9) << (1 << i) <<
1934  " avg " << std::setw(7) << avg << " us min " <<
1935  std::setw(7) << min << " us max " << std::setw(7) << max <<
1936  "us speed " << std::setw(9) <<
1937  (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1938  " MB/s" << std::endl;
1939  delete[] s;
1940  }
1941  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1942  }
1943  // little benchmark - child as source
1944  {
1945  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1946  char *s = 0;
1947  double avg = 0., min = 1e42, max = -1e42;
1948  unsigned n = 0, bsz = 0;
1949  BidirMMapPipe *pipe = spawnChild(benchchildsource);
1950  while (*pipe && !pipe->eof()) {
1951  struct timeval t1;
1952  ::gettimeofday(&t1, 0);
1953  // streaming mode - we do not flush here
1954  *pipe >> s;
1955  if (!*pipe || pipe->eof()) break;
1956  struct timeval t2;
1957  ::gettimeofday(&t2, 0);
1958  t2.tv_sec -= t1.tv_sec;
1959  t2.tv_usec -= t1.tv_usec;
1960  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1961  if (std::strlen(s)) {
1962  ++n;
1963  if (dt < min) min = dt;
1964  if (dt > max) max = dt;
1965  avg += dt;
1966  bsz = std::strlen(s);
1967  } else {
1968  if (!n) break;
1969  // next block size
1970  avg /= double(n);
1971  avg *= 1e6; min *= 1e6; max *= 1e6;
1972 
1973  std::cout << "block size " << std::setw(9) << bsz <<
1974  " avg " << std::setw(7) << avg << " us min " <<
1975  std::setw(7) << min << " us max " << std::setw(7) <<
1976  max << "us speed " << std::setw(9) <<
1977  (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1978  " MB/s" << std::endl;
1979  n = 0;
1980  avg = 0.;
1981  min = 1e42;
1982  max = -1e42;
1983  }
1984  }
1985  int retVal = pipe->close();
1986  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1987  if (retVal) return retVal;
1988  delete pipe;
1989  std::free(s);
1990  }
1991  return 0;
1992 }
1993 #endif // TEST_BIDIRMMAPPIPE
1994 #endif // _WIN32
1995 
1996 // vim: ft=cpp:sw=4:tw=78:et
c
#define c(i)
Definition: RSha256.hxx:119
l
auto * l
Definition: textangle.C:4
BidirMMapPipe_impl::PageChunk::m_nPgPerGrp
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
n
const Int_t n
Definition: legend1.C:16
BidirMMapPipe::ReadError
@ ReadError
pipe error read end
Definition: BidirMMapPipe.h:510
BidirMMapPipe::PollEntry::pipe
BidirMMapPipe * pipe
pipe of interest
Definition: BidirMMapPipe.h:524
fit1_py.fill
fill
Definition: fit1_py.py:6
BidirMMapPipe::write
size_type write(const void *addr, size_type sz)
write to pipe
Definition: BidirMMapPipe.cxx:1398
BidirMMapPipe_impl::PageChunk::empty
bool empty() const
return true if no used page groups in this chunk
Definition: BidirMMapPipe.h:116
BidirMMapPipe_impl::Pages::pageno
unsigned pageno(Page *p) const
perform page to page number mapping
Definition: BidirMMapPipe.cxx:317
e
#define e(i)
Definition: RSha256.hxx:121
BidirMMapPipe::WriteError
@ WriteError
pipe error Write end
Definition: BidirMMapPipe.h:511
BidirMMapPipe::s_debugflag
static int s_debugflag
debug flag
Definition: BidirMMapPipe.h:893
BidirMMapPipe::ReadInvalid
@ ReadInvalid
read end of pipe invalid
Definition: BidirMMapPipe.h:516
Exception
ROOT::R::TRInterface & Exception()
Definition: Exception.C:4
BidirMMapPipe::PagesPerEnd
@ PagesPerEnd
pages per pipe end
Definition: BidirMMapPipe.h:877
BidirMMapPipe_impl::Pages::impl::m_refcnt
unsigned m_refcnt
reference counter
Definition: BidirMMapPipe.h:138
f
#define f(i)
Definition: RSha256.hxx:122
BidirMMapPipe::s_pagepool
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
Definition: BidirMMapPipe.h:889
BidirMMapPipe_impl::PageChunk::Unknown
@ Unknown
don't know yet what'll work
Definition: BidirMMapPipe.h:47
BidirMMapPipe::eofbit
@ eofbit
end of file reached
Definition: BidirMMapPipe.h:387
BidirMMapPipe::s_openpipes
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
Definition: BidirMMapPipe.h:887
BidirMMapPipe::m_flags
int m_flags
flags (e.g. end of file)
Definition: BidirMMapPipe.h:905
BidirMMapPipe::s_openpipesmutex
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
Definition: BidirMMapPipe.h:885
BidirMMapPipe
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
Definition: BidirMMapPipe.h:378
BidirMMapPipe_impl::Pages::impl
implementation
Definition: BidirMMapPipe.h:135
BidirMMapPipe::operator<<
BidirMMapPipe & operator<<(const char *str)
write a C-style string
Definition: BidirMMapPipe.cxx:1563
masks
static unsigned long masks[]
Definition: gifencode.c:211
BidirMMapPipe::Writable
@ Writable
pipe can be written to
Definition: BidirMMapPipe.h:509
BidirMMapPipe_impl::PageChunk::pagesize
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:85
BidirMMapPipe::~BidirMMapPipe
~BidirMMapPipe()
destructor
Definition: BidirMMapPipe.cxx:995
BidirMMapPipe::PollEntry::events
unsigned events
events of interest (or'ed bitmask)
Definition: BidirMMapPipe.h:525
BidirMMapPipe_impl::Pages::impl::m_pages
Page * m_pages
pointer to first page
Definition: BidirMMapPipe.h:137
BidirMMapPipe_impl::PageChunk::Exception
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
BidirMMapPipe_impl::PageChunk::domunmap
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
Definition: BidirMMapPipe.cxx:531
BidirMMapPipe_impl::PageChunk::getPageSize
static unsigned getPageSize()
determine page size at run time
Definition: BidirMMapPipe.cxx:329
BidirMMapPipe::TotPages
@ TotPages
pages shared (child + parent)
Definition: BidirMMapPipe.h:875
BidirMMapPipe_impl::Pages::impl::m_parent
PageChunk * m_parent
pointer to parent pool
Definition: BidirMMapPipe.h:136
BidirMMapPipe_impl::PageChunk::push
void push(const Pages &p)
push a group of pages onto the free list
Definition: BidirMMapPipe.cxx:377
operator=
Binding & operator=(OUT(*fun)(void))
Definition: TRInterface_Binding.h:11
TGeant4Unit::s
static constexpr double s
Definition: TGeant4SystemOfUnits.h:168
BidirMMapPipe::Exception
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
Definition: BidirMMapPipe.h:384
BidirMMapPipe_impl::Pages::impl::m_npages
unsigned char m_npages
length in pages
Definition: BidirMMapPipe.h:139
BidirMMapPipe::eof
bool eof() const
true if end-of-file
Definition: BidirMMapPipe.h:711
BidirMMapPipe::markPageDirty
void markPageDirty(Page *p)
put on dirty pages list
Definition: BidirMMapPipe.cxx:1226
BidirMMapPipe_impl::Pages
handle class for a number of Pages
Definition: BidirMMapPipe.h:132
BidirMMapPipe_impl::Pages::npages
unsigned npages() const
return number of pages accessible
Definition: BidirMMapPipe.h:166
BidirMMapPipe_impl::PageChunk::m_parent
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
BidirMMapPipe::Page
BidirMMapPipe_impl::Page Page
convenience typedef for Page
Definition: BidirMMapPipe.h:868
BidirMMapPipe::m_childPid
pid_t m_childPid
pid of the child (zero if we're child)
Definition: BidirMMapPipe.h:906
BidirMMapPipe_impl::PageChunk::MMapVariety
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:46
BidirMMapPipe_impl::PageChunk::m_begin
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
ROOT::Math::detail::open
@ open
Definition: GenVectorIO.h:55
BidirMMapPipe::isChild
bool isChild() const
return if this end of the pipe is the child end
Definition: BidirMMapPipe.h:687
bool
BidirMMapPipe_impl::PageChunk::s_mmapworks
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
BidirMMapPipe::PollEntry
for poll() interface
Definition: BidirMMapPipe.h:522
BidirMMapPipe_impl::PageChunk::PageChunk
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
BidirMMapPipe::bytesWritableNonBlocking
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
Definition: BidirMMapPipe.cxx:1316
BidirMMapPipe::None
@ None
nothing special on this pipe
Definition: BidirMMapPipe.h:507
BidirMMapPipe::PollVector
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
Definition: BidirMMapPipe.h:535
BidirMMapPipe_impl::PageChunk::Anonymous
@ Anonymous
anonymous mmap works
Definition: BidirMMapPipe.h:51
BidirMMapPipe::busypage
Page * busypage()
get a busy page to read data from (may block)
Definition: BidirMMapPipe.cxx:1240
BidirMMapPipe::poll
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
Definition: BidirMMapPipe.cxx:1433
BidirMMapPipe::WriteEndOfFile
@ WriteEndOfFile
write pipe in end-of-file state
Definition: BidirMMapPipe.h:514
BidirMMapPipe_impl::PageChunk::m_freelist
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
BidirMMapPipe::lenPageList
static unsigned lenPageList(const Page *list)
return length of a page list
Definition: BidirMMapPipe.cxx:1123
BidirMMapPipe::m_inpipe
int m_inpipe
pipe end from which data may be read
Definition: BidirMMapPipe.h:903
BidirMMapPipe_impl::Pages::m_pimpl
impl * m_pimpl
pointer to implementation
Definition: BidirMMapPipe.h:193
BidirMMapPipe_impl::PageChunk::m_nUsedGrp
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
malloc
#define malloc
Definition: civetweb.c:1536
BidirMMapPipe::purge
void purge()
purge buffered data waiting to be read and/or written
Definition: BidirMMapPipe.cxx:1288
BidirMMapPipe::failbit
@ failbit
logical failure (e.g. pipe closed)
Definition: BidirMMapPipe.h:388
BidirMMapPipe::Readable
@ Readable
pipe has data for reading
Definition: BidirMMapPipe.h:508
BidirMMapPipe::read
size_type read(void *addr, size_type sz)
read from pipe
Definition: BidirMMapPipe.cxx:1362
BidirMMapPipe::m_freelist
Page * m_freelist
linked list: free pages
Definition: BidirMMapPipe.h:901
BidirMMapPipe::m_outpipe
int m_outpipe
pipe end to which data may be written
Definition: BidirMMapPipe.h:904
BidirMMapPipe::pagepool
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
Definition: BidirMMapPipe.cxx:718
BEGIN_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.cxx:33
RooFit
Definition: RooCFunction1Binding.h:29
BidirMMapPipe::WriteInvalid
@ WriteInvalid
write end of pipe invalid
Definition: BidirMMapPipe.h:517
BidirMMapPipe::exceptionsbit
@ exceptionsbit
error reporting with exceptions
Definition: BidirMMapPipe.h:392
BidirMMapPipe::good
bool good() const
status of stream is good
Definition: BidirMMapPipe.h:729
BidirMMapPipe_impl::Pages::~Pages
~Pages()
destructor
Definition: BidirMMapPipe.cxx:282
BidirMMapPipe_impl::PageChunk::nPagesPerGroup
unsigned nPagesPerGroup() const
return number of pages per page group
Definition: BidirMMapPipe.h:113
BidirMMapPipe::Invalid
@ Invalid
invalid pipe
Definition: BidirMMapPipe.h:518
BidirMMapPipe::Error
@ Error
pipe error
Definition: BidirMMapPipe.h:512
BidirMMapPipe::bytesReadableNonBlocking
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Definition: BidirMMapPipe.cxx:1305
main
int main(int argc, char **argv)
Definition: histspeedtest.cxx:752
BidirMMapPipe::size_type
std::size_t size_type
type used to represent sizes
Definition: BidirMMapPipe.h:382
BidirMMapPipe::BidirMMapPipe
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
Definition: BidirMMapPipe.cxx:749
BidirMMapPipe::FlushThresh
@ FlushThresh
flush threshold
Definition: BidirMMapPipe.h:881
BidirMMapPipe_impl::PageChunk::Copy
@ Copy
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:48
BidirMMapPipe::flush
void flush()
flush buffers with unwritten data
Definition: BidirMMapPipe.cxx:1266
BidirMMapPipe::sendpages
void sendpages(Page *plist)
send page(s) to the other end (may block)
Definition: BidirMMapPipe.cxx:1043
what
static const char * what
Definition: stlLoader.cc:6
BidirMMapPipe::wrerrbit
@ wrerrbit
write error
Definition: BidirMMapPipe.h:390
BidirMMapPipe::EndOfFile
@ EndOfFile
end of file
Definition: BidirMMapPipe.h:515
BidirMMapPipe_impl::PageChunk::s_physpgsz
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:55
BidirMMapPipe_impl::PageChunk
class representing a chunk of pages
Definition: BidirMMapPipe.h:43
double
double
Definition: Converters.cxx:921
END_NAMESPACE_ROOFIT
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.cxx:34
BidirMMapPipe::rderrbit
@ rderrbit
read error
Definition: BidirMMapPipe.h:389
BidirMMapPipe::recvpages_nonblock
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
Definition: BidirMMapPipe.cxx:1098
realloc
#define realloc
Definition: civetweb.c:1538
BidirMMapPipe::operator>>
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
Definition: BidirMMapPipe.cxx:1571
BidirMMapPipe::doFlush
void doFlush(bool forcePartialPages=true)
perform the flush
Definition: BidirMMapPipe.cxx:1269
void
typedef void((*Func_t)())
BidirMMapPipe_impl::PageChunk::FileBacked
@ FileBacked
mmapping a temp file works
Definition: BidirMMapPipe.h:49
BidirMMapPipe_impl::Pages::page
Page * page(unsigned pgno) const
return page number pgno
Definition: BidirMMapPipe.cxx:308
BidirMMapPipe_impl::PageChunk::dommap
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
Definition: BidirMMapPipe.cxx:391
ROOT::Math::detail::close
@ close
Definition: GenVectorIO.h:55
BidirMMapPipe_impl::Pages::pagesize
static unsigned pagesize()
return page size
Definition: BidirMMapPipe.cxx:306
BidirMMapPipe::closed
bool closed() const
true if closed
Definition: BidirMMapPipe.h:735
BidirMMapPipe_impl::PageChunk::DevZero
@ DevZero
mmapping /dev/zero works
Definition: BidirMMapPipe.h:50
BidirMMapPipe::ReadEndOfFile
@ ReadEndOfFile
read pipe in end-of-file state
Definition: BidirMMapPipe.h:513
t1
auto * t1
Definition: textangle.C:20
BidirMMapPipe_impl
namespace for implementation details of BidirMMapPipe
Definition: BidirMMapPipe.cxx:39
BidirMMapPipe_impl::Pages::operator=
Pages & operator=(const Pages &other)
assignment operator
Definition: BidirMMapPipe.cxx:294
BidirMMapPipe_impl::Pages::swap
void swap(Pages &other)
swap with other's contents
Definition: BidirMMapPipe.h:181
BidirMMapPipe::xferraw
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
Definition: BidirMMapPipe.cxx:998
name
char name[80]
Definition: TGX11.cxx:110
BidirMMapPipe::doClose
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
Definition: BidirMMapPipe.cxx:915
BidirMMapPipe::close
int close()
flush buffers, close pipe
Definition: BidirMMapPipe.cxx:909
BidirMMapPipe::dirtypage
Page * dirtypage()
get a dirty page to write data to (may block)
Definition: BidirMMapPipe.cxx:1251
BidirMMapPipe::bad
bool bad() const
true on I/O error
Definition: BidirMMapPipe.h:723
BidirMMapPipe_impl::PageChunk::s_pagesize
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:56
BidirMMapPipe::s_pagepoolrefcnt
static unsigned s_pagepoolrefcnt
page pool reference counter
Definition: BidirMMapPipe.h:891
BidirMMapPipe::debugflag
static int debugflag()
return the current setting of the debug flag
Definition: BidirMMapPipe.h:431
free
#define free
Definition: civetweb.c:1539
BidirMMapPipe::feedPageLists
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
Definition: BidirMMapPipe.cxx:1130
BidirMMapPipe::isParent
bool isParent() const
return if this end of the pipe is the parent end
Definition: BidirMMapPipe.h:681
BidirMMapPipe_impl::PageChunk::contains
bool contains(const Pages &p) const
return if p is contained in this PageChunk
Definition: BidirMMapPipe.cxx:365
BidirMMapPipe_impl::PageChunk::zap
void zap(Pages &p)
free all pages except for those pointed to by p
Definition: BidirMMapPipe.cxx:545
BidirMMapPipe::m_pages
BidirMMapPipe_impl::Pages m_pages
mmapped pages
Definition: BidirMMapPipe.h:899
BidirMMapPipe::recvpages
unsigned recvpages()
receive pages from the other end (may block), queue them
Definition: BidirMMapPipe.cxx:1065
BidirMMapPipe::m_dirtylist
Page * m_dirtylist
linked list: dirty pages (data to be sent)
Definition: BidirMMapPipe.h:902
BidirMMapPipe_impl::PageChunk::physPgSz
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:87
BidirMMapPipe_impl::Pages::Pages
Pages()
default constructor
Definition: BidirMMapPipe.h:143
BidirMMapPipe_impl::PageChunk::pop
Pages pop()
pop a group of pages off the free list
Definition: BidirMMapPipe.cxx:368
BidirMMapPipe_impl::PageChunk::len
unsigned len() const
return length of chunk
Definition: BidirMMapPipe.h:107
BidirMMapPipe.h
BidirMMapPipe_impl::PageChunk::m_end
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
BidirMMapPipe::PollEntry::revents
unsigned revents
events that happened (or'ed bitmask)
Definition: BidirMMapPipe.h:526
BidirMMapPipe_impl::PageChunk::mmapVariety
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:89
BidirMMapPipe::m_busylist
Page * m_busylist
linked list: busy pages (data to be read)
Definition: BidirMMapPipe.h:900
BidirMMapPipe_impl::PageChunk::~PageChunk
~PageChunk()
destructor
Definition: BidirMMapPipe.cxx:359
BidirMMapPipe::teardownall
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
Definition: BidirMMapPipe.cxx:725