Logo ROOT  
Reference Guide
BidirMMapPipe.cxx
Go to the documentation of this file.
1/** @file BidirMMapPipe.cxx
2 *
3 * implementation of BidirMMapPipe, a class which forks off a child process
4 * and serves as communications channel between parent and child
5 *
6 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7 * @date 2013-07-07
8 */
9#ifndef _WIN32
10#include <map>
11#include <cerrno>
12#include <limits>
13#include <string>
14#include <cstdlib>
15#include <cstring>
16#include <cassert>
17#include <iostream>
18#include <algorithm>
19#include <exception>
20
21#include <poll.h>
22#include <fcntl.h>
23#include <signal.h>
24#include <unistd.h>
25#include <pthread.h>
26#include <sys/mman.h>
27#include <sys/stat.h>
28#include <sys/wait.h>
29#include <sys/socket.h>
30
31#include "BidirMMapPipe.h"
32
33#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
34#define END_NAMESPACE_ROOFIT }
35
37
38/// namespace for implementation details of BidirMMapPipe
/** @brief exception to throw if low-level OS calls go wrong
 *
 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
 * @date 2013-07-07
 *
 * The error message is kept in a fixed-size, zero-terminated buffer, so
 * constructing or copying the exception never allocates.
 */
class BidirMMapPipeException : public std::exception
{
private:
    enum {
        s_sz = 256 ///< length of buffer
    };
    char m_buf[s_sz]; ///< buffer containing the error message

    /// for the POSIX version of strerror_r (returns an int status code)
    static int dostrerror_r(int err, char* buf, std::size_t sz,
            int (*f)(int, char*, std::size_t))
    { return f(err, buf, sz); }
    /// for the GNU version of strerror_r (returns a char* which may or may
    /// not point into the caller-supplied buffer)
    static int dostrerror_r(int, char*, std::size_t,
            char* (*f)(int, char*, std::size_t));
public:
    /// constructor taking error code, hint on operation (msg)
    BidirMMapPipeException(const char* msg, int err);
    /// return a description of what went wrong
    const char* what() const noexcept override { return m_buf; }
};
66
68 {
69 std::size_t msgsz = std::strlen(msg);
70 if (msgsz) {
71 msgsz = std::min(msgsz, std::size_t(s_sz));
72 std::copy(msg, msg + msgsz, m_buf);
73 if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
74 if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
75 }
76 if (msgsz < s_sz) {
77 // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
78 // have to sort it out with overloads
79 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
80 }
81 m_buf[s_sz - 1] = 0; // enforce zero-termination
82 }
83
85 std::size_t sz, char* (*f)(int, char*, std::size_t))
86 {
87 buf[0] = 0;
88 char *tmp = f(err, buf, sz);
89 if (tmp && tmp != buf) {
90 std::strncpy(buf, tmp, sz);
91 buf[sz - 1] = 0;
92 if (std::strlen(tmp) > sz - 1) return ERANGE;
93 }
94 return 0;
95 }
96
97 /** @brief class representing the header structure in an mmapped page
98 *
99 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
100 * @date 2013-07-07
101 *
102 * contains a field to put pages into a linked list, a field for the size
103 * of the data being transmitted, and a field for the position until which
104 * the data has been read
105 */
106 class Page
107 {
108 private:
109 // use as small a data type as possible to maximise payload area
110 // of pages
111 short m_next; ///< next page in list (in pagesizes)
112 unsigned short m_size; ///< size of payload (in bytes)
113 unsigned short m_pos; ///< index of next byte in payload area
114 /// copy construction forbidden
115 Page(const Page&) {}
116 /// assigment forbidden
117 Page& operator=(const Page&) = delete;
118 public:
119 /// constructor
120 Page() : m_next(0), m_size(0), m_pos(0)
121 {
122 // check that short is big enough - must be done at runtime
123 // because the page size is not known until runtime
124 assert(std::numeric_limits<unsigned short>::max() >=
126 }
127 /// set pointer to next page
128 void setNext(const Page* p);
129 /// return pointer to next page
130 Page* next() const;
131 /// return reference to size field
132 unsigned short& size() { return m_size; }
133 /// return size (of payload data)
134 unsigned size() const { return m_size; }
135 /// return reference to position field
136 unsigned short& pos() { return m_pos; }
137 /// return position
138 unsigned pos() const { return m_pos; }
139 /// return pointer to first byte in payload data area of page
140 inline unsigned char* begin() const
141 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
142 + sizeof(Page); }
143 /// return pointer to first byte in payload data area of page
144 inline unsigned char* end() const
145 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
147 /// return the capacity of the page
148 static unsigned capacity()
149 { return PageChunk::pagesize() - sizeof(Page); }
150 /// true if page empty
151 bool empty() const { return !m_size; }
152 /// true if page partially filled
153 bool filled() const { return !empty(); }
154 /// free space left (to be written to)
155 unsigned free() const { return capacity() - m_size; }
156 /// bytes remaining to be read
157 unsigned remaining() const { return m_size - m_pos; }
158 /// true if page completely full
159 bool full() const { return !free(); }
160 };
161
162 void Page::setNext(const Page* p)
163 {
164 if (!p) {
165 m_next = 0;
166 } else {
167 const char* p1 = reinterpret_cast<char*>(this);
168 const char* p2 = reinterpret_cast<const char*>(p);
169 std::ptrdiff_t tmp = p2 - p1;
170 // difference must be divisible by page size
171 assert(!(tmp % PageChunk::pagesize()));
172 tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
173 m_next = tmp;
174 // no truncation when saving in a short
175 assert(m_next == tmp);
176 // final check: next() must return p
177 assert(next() == p);
178 }
179 }
180
182 {
183 if (!m_next) return 0;
184 char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
185 ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
186 return reinterpret_cast<Page*>(ptmp);
187 }
188
189 /** @brief class representing a page pool
190 *
191 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
192 * @date 2013-07-24
193 *
194 * pool of mmapped pages (on systems which support it, on all others, the
195 * functionality is emulated with dynamically allocated memory)
196 *
197 * in most operating systems there is a limit to how many mappings any one
198 * process is allowed to request; for this reason, we mmap a relatively
199 * large amount up front, and then carve off little pieces as we need them
200 *
201 * Moreover, some systems have too large a physical page size in their MMU
202 * for the code to handle (we want offsets and lengths to fit into 16
203 * bits), so we carve such big physical pages into smaller logical Pages
204 * if needed. The largest logical page size is currently 16 KiB.
205 */
206 class PagePool {
207 private:
208 /// convenience typedef
210
211 enum {
212 minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
213 maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
214 szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
215 };
216 /// a chunk of memory in the pool
218 /// list of chunks
219 typedef std::list<Chunk*> ChunkList;
220
222 public:
223 /// convenience typedef
225 /// constructor
226 PagePool(unsigned nPagesPerGroup);
227 /// destructor
228 ~PagePool();
229 /// pop a free element out of the pool
230 Pages pop();
231
232 /// return (logical) page size of the system
233 static unsigned pagesize() { return PageChunk::pagesize(); }
234 /// return variety of mmap supported on the system
236 { return PageChunk::mmapVariety(); }
237
238 /// return number of pages per group (ie. as returned by pop())
239 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
240
241 /// zap the pool (unmap all but Pages p)
242 void zap(Pages& p);
243
244 private:
245 /// list of chunks used by the pool
247 /// list of chunks used by the pool which are not full
249 /// chunk size map (histogram of chunk sizes)
250 unsigned m_szmap[(maxsz - minsz) / szincr];
251 /// current chunk size
253 /// page group size
254 unsigned m_nPgPerGrp;
255
256 /// adjust _cursz to current largest block
257 void updateCurSz(int sz, int incr);
258 /// find size of next chunk to allocate (in a hopefully smart way)
259 int nextChunkSz() const;
260 /// release a chunk
261 void putOnFreeList(Chunk* chunk);
262 /// release a chunk
263 void release(Chunk* chunk);
264 };
265
266 Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
267 m_pimpl(new impl)
268 {
269 assert(npg < 256);
270 m_pimpl->m_parent = parent;
271 m_pimpl->m_pages = pages;
272 m_pimpl->m_refcnt = 1;
273 m_pimpl->m_npages = npg;
274 /// initialise pages
275 for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
276 }
277
279 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
281
283 {
284 if (m_pimpl && !--(m_pimpl->m_refcnt)) {
285 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
286 delete m_pimpl;
287 }
288 }
289
290 Pages::Pages(const Pages& other) :
291 m_pimpl(other.m_pimpl)
292 { ++(m_pimpl->m_refcnt); }
293
295 {
296 if (&other == this) return *this;
297 if (!--(m_pimpl->m_refcnt)) {
298 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
299 delete m_pimpl;
300 }
301 m_pimpl = other.m_pimpl;
302 ++(m_pimpl->m_refcnt);
303 return *this;
304 }
305
306 unsigned Pages::pagesize() { return PageChunk::pagesize(); }
307
308 Page* Pages::page(unsigned pgno) const
309 {
310 assert(pgno < m_pimpl->m_npages);
311 unsigned char* pptr =
312 reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
313 pptr += pgno * pagesize();
314 return reinterpret_cast<Page*>(pptr);
315 }
316
317 unsigned Pages::pageno(Page* p) const
318 {
319 const unsigned char* pptr =
320 reinterpret_cast<const unsigned char*>(p);
321 const unsigned char* bptr =
322 reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
323 assert(0 == ((pptr - bptr) % pagesize()));
324 const unsigned nr = (pptr - bptr) / pagesize();
325 assert(nr < m_pimpl->m_npages);
326 return nr;
327 }
328
330 {
331 // find out page size of system
332 long pgsz = sysconf(_SC_PAGESIZE);
333 if (-1 == pgsz) throw Exception("sysconf", errno);
334 if (pgsz > 512 && pgsz > long(sizeof(Page)))
335 return pgsz;
336
337 // in case of failure or implausible value, use a safe default: 4k
338 // page size, and do not try to mmap
340 return 1 << 12;
341 }
342
344 unsigned length, unsigned nPgPerGroup) :
345 m_begin(dommap(length)),
346 m_end(reinterpret_cast<void*>(
347 reinterpret_cast<unsigned char*>(m_begin) + length)),
348 m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
349 {
350 // ok, push groups of pages onto freelist here
351 unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
352 unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
353 while (p < pend) {
354 m_freelist.push_back(reinterpret_cast<void*>(p));
355 p += nPgPerGroup * PagePool::pagesize();
356 }
357 }
358
360 {
361 if (m_parent) assert(empty());
362 if (m_begin) domunmap(m_begin, len());
363 }
364
365 bool PageChunk::contains(const Pages& p) const
366 { return p.m_pimpl->m_parent == this; }
367
369 {
370 assert(!m_freelist.empty());
371 void* p = m_freelist.front();
372 m_freelist.pop_front();
373 ++m_nUsedGrp;
374 return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
375 }
376
377 void PageChunk::push(const Pages& p)
378 {
379 assert(contains(p));
380 bool wasempty = m_freelist.empty();
381 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
382 --m_nUsedGrp;
383 if (m_parent) {
384 // notify parent if we need to be put on the free list again
385 if (wasempty) m_parent->putOnFreeList(this);
386 // notify parent if we're empty
387 if (empty()) return m_parent->release(this);
388 }
389 }
390
391 void* PageChunk::dommap(unsigned len)
392 {
393 assert(len && 0 == (len % s_physpgsz));
394 // ok, the idea here is to try the different methods of mmapping, and
395 // choose the first one that works. we have four flavours:
396 // 1 - anonymous mmap (best)
397 // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
398 // bit more tedious to set up, since you need to open/close a
399 // device file)
400 // 3 - mmap of a temporary file (very tedious to set up - need to
401 // create a temporary file, delete it, make the underlying storage
402 // large enough, then mmap the fd and close it)
403 // 4 - if all those fail, we malloc the buffers, and copy the data
404 // through the OS (then we're no better than normal pipes)
405 static bool msgprinted = false;
407#if defined(MAP_ANONYMOUS)
408#undef MYANONFLAG
409#define MYANONFLAG MAP_ANONYMOUS
410#elif defined(MAP_ANON)
411#undef MYANONFLAG
412#define MYANONFLAG MAP_ANON
413#else
414#undef MYANONFLAG
415#endif
416#ifdef MYANONFLAG
417 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
418 MYANONFLAG | MAP_SHARED, -1, 0);
419 if (MAP_FAILED == retVal) {
420 if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
421 } else {
422 assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
424 if (BidirMMapPipe::debugflag() && !msgprinted) {
425 std::cerr << " INFO: In " << __func__ << " (" <<
426 __FILE__ << ", line " << __LINE__ <<
427 "): anonymous mmapping works, excellent!" <<
428 std::endl;
429 msgprinted = true;
430 }
431 return retVal;
432 }
433#endif
434#undef MYANONFLAG
435 }
436 if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
437 // ok, no anonymous mappings supported directly, so try to map
438 // /dev/zero which has much the same effect on many systems
439 int fd = ::open("/dev/zero", O_RDWR);
440 if (-1 == fd)
441 throw Exception("open /dev/zero", errno);
442 void* retVal = ::mmap(0, len,
443 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
444 if (MAP_FAILED == retVal) {
445 int errsv = errno;
446 ::close(fd);
447 if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
448 } else {
449 assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
451 }
452 if (-1 == ::close(fd))
453 throw Exception("close /dev/zero", errno);
454 if (BidirMMapPipe::debugflag() && !msgprinted) {
455 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
456 ", line " << __LINE__ << "): mmapping /dev/zero works, "
457 "very good!" << std::endl;
458 msgprinted = true;
459 }
460 return retVal;
461 }
463 char name[] = "/tmp/BidirMMapPipe-XXXXXX";
464 int fd;
465 // open temp file
466 if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
467 // remove it, but keep fd open
468 if (-1 == ::unlink(name)) {
469 int errsv = errno;
470 ::close(fd);
471 throw Exception("unlink", errsv);
472 }
473 // make it the right size: lseek
474 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
475 int errsv = errno;
476 ::close(fd);
477 throw Exception("lseek", errsv);
478 }
479 // make it the right size: write a byte
480 if (1 != ::write(fd, name, 1)) {
481 int errsv = errno;
482 ::close(fd);
483 throw Exception("write", errsv);
484 }
485 // do mmap
486 void* retVal = ::mmap(0, len,
487 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
488 if (MAP_FAILED == retVal) {
489 int errsv = errno;
490 ::close(fd);
491 if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
492 } else {
493 assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
495 }
496 if (-1 == ::close(fd)) {
497 int errsv = errno;
498 ::munmap(retVal, len);
499 throw Exception("close", errsv);
500 }
501 if (BidirMMapPipe::debugflag() && !msgprinted) {
502 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
503 ", line " << __LINE__ << "): mmapping temporary files "
504 "works, good!" << std::endl;
505 msgprinted = true;
506 }
507 return retVal;
508 }
509 if (Copy == s_mmapworks || Unknown == s_mmapworks) {
510 // fallback solution: mmap does not work on this OS (or does not
511 // work for what we want to use it), so use a normal buffer of
512 // memory instead, and collect data in that buffer - this needs an
513 // additional write/read to/from the pipe(s), but there you go...
514 if (BidirMMapPipe::debugflag() && !msgprinted) {
515 std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
516 ", line " << __LINE__ << "): anonymous mmapping of "
517 "shared buffers failed, falling back to read/write on "
518 " pipes!" << std::endl;
519 msgprinted = true;
520 }
522 void* retVal = std::malloc(len);
523 if (!retVal) throw Exception("malloc", errno);
524 return retVal;
525 }
526 // should never get here
527 assert(false);
528 return 0;
529 }
530
531 void PageChunk::domunmap(void* addr, unsigned len)
532 {
533 assert(len && 0 == (len % s_physpgsz));
534 if (addr) {
535 assert(Unknown != s_mmapworks);
536 if (Copy != s_mmapworks) {
537 if (-1 == ::munmap(addr, len))
538 throw Exception("munmap", errno);
539 } else {
540 std::free(addr);
541 }
542 }
543 }
544
546 {
547 // try to mprotect the other bits of the pool with no access...
548 // we'd really like a version of mremap here that can unmap all the
549 // other pages in the chunk, but that does not exist, so we protect
550 // the other pages in this chunk such that they may neither be read,
551 // written nor executed, only the pages we're interested in for
552 // communications stay readable and writable
553 //
554 // if an OS does not support changing the protection of a part of an
555 // mmapped area, the mprotect calls below should just fail and not
556 // change any protection, so we're a little less safe against
557 // corruption, but everything should still work
558 if (Copy != s_mmapworks) {
559 unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
560 unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
561 unsigned char* p2 = p1 + p.npages() * s_physpgsz;
562 unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
563 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
564 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
565 }
566 m_parent = 0;
567 m_freelist.clear();
568 m_nUsedGrp = 1;
569 p.m_pimpl->m_parent = 0;
570 m_begin = m_end = 0;
571 // commit suicide
572 delete this;
573 }
574
575 PagePool::PagePool(unsigned nPgPerGroup) :
576 m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
577 {
578 // if logical and physical page size differ, we may have to adjust
579 // m_nPgPerGrp to make things fit
581 const unsigned mult =
583 const unsigned desired = nPgPerGroup * PageChunk::pagesize();
584 // round up to to next physical page boundary
585 const unsigned actual = mult *
586 (desired / mult + bool(desired % mult));
587 const unsigned newPgPerGrp = actual / PageChunk::pagesize();
589 std::cerr << " INFO: In " << __func__ << " (" <<
590 __FILE__ << ", line " << __LINE__ <<
591 "): physical page size " << PageChunk::physPgSz() <<
592 ", subdividing into logical pages of size " <<
593 PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
594 m_nPgPerGrp << " -> " << newPgPerGrp <<
595 std::endl;
596 }
597 assert(newPgPerGrp >= m_nPgPerGrp);
598 m_nPgPerGrp = newPgPerGrp;
599 }
600 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
601 }
602
604 {
605 m_freelist.clear();
606 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
607 delete *it;
608 m_chunks.clear();
609 }
610
612 {
613 // unmap all pages but those pointed to by p
614 m_freelist.clear();
615 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
616 if ((*it)->contains(p)) {
617 (*it)->zap(p);
618 } else {
619 delete *it;
620 }
621 }
622 m_chunks.clear();
623 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
624 m_cursz = minsz;
625 }
626
628 {
629 if (m_freelist.empty()) {
630 // allocate and register new chunk and put it on the freelist
631 const int sz = nextChunkSz();
632 Chunk *c = new Chunk(this,
634 m_chunks.push_front(c);
635 m_freelist.push_back(c);
636 updateCurSz(sz, +1);
637 }
638 // get free element from first chunk on _freelist
639 Chunk* c = m_freelist.front();
640 Pages p(c->pop());
641 // full chunks are removed from _freelist
642 if (c->full()) m_freelist.pop_front();
643 return p;
644 }
645
647 {
648 assert(chunk->empty());
649 // find chunk on freelist and remove
650 ChunkList::iterator it = std::find(
651 m_freelist.begin(), m_freelist.end(), chunk);
652 if (m_freelist.end() == it)
653 throw Exception("PagePool::release(PageChunk*)", EINVAL);
654 m_freelist.erase(it);
655 // find chunk in m_chunks and remove
656 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
657 if (m_chunks.end() == it)
658 throw Exception("PagePool::release(PageChunk*)", EINVAL);
659 m_chunks.erase(it);
660 const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
661 delete chunk;
662 updateCurSz(sz, -1);
663 }
664
666 {
667 assert(!chunk->full());
668 m_freelist.push_back(chunk);
669 }
670
671 void PagePool::updateCurSz(int sz, int incr)
672 {
673 m_szmap[(sz - minsz) / szincr] += incr;
674 m_cursz = minsz;
675 for (int i = (maxsz - minsz) / szincr; i--; ) {
676 if (m_szmap[i]) {
677 m_cursz += i * szincr;
678 break;
679 }
680 }
681 }
682
684 {
685 // no chunks with space available, figure out chunk size
686 int sz = m_cursz;
687 if (m_chunks.empty()) {
688 // if we start allocating chunks, we start from minsz
689 sz = minsz;
690 } else {
691 if (minsz >= sz) {
692 // minimal sized chunks are always grown
693 sz = minsz + szincr;
694 } else {
695 if (1 != m_chunks.size()) {
696 // if we have more than one completely filled chunk, grow
697 sz += szincr;
698 } else {
699 // just one chunk left, try shrinking chunk size
700 sz -= szincr;
701 }
702 }
703 }
704 // clamp size to allowed range
705 if (sz > maxsz) sz = maxsz;
706 if (sz < minsz) sz = minsz;
707 return sz;
708 }
709}
710
711// static BidirMMapPipe members
712pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
713std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
717
719{
720 if (!s_pagepool)
722 return *s_pagepool;
723}
724
726{
727 pthread_mutex_lock(&s_openpipesmutex);
728 while (!s_openpipes.empty()) {
729 BidirMMapPipe *p = s_openpipes.front();
730 pthread_mutex_unlock(&s_openpipesmutex);
731 if (p->m_childPid) kill(p->m_childPid, SIGTERM);
732 p->doClose(true, true);
733 pthread_mutex_lock(&s_openpipesmutex);
734 }
735 pthread_mutex_unlock(&s_openpipesmutex);
736}
737
739 m_pages(pagepool().pop())
740{
741 // free pages again
743 if (!s_pagepoolrefcnt) {
744 delete s_pagepool;
745 s_pagepool = 0;
746 }
747}
748
749BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
750 m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
751 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
752 m_parentPid(::getpid())
753
754{
756 assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
757 int fds[4] = { -1, -1, -1, -1 };
758 int myerrno;
759 static bool firstcall = true;
760 if (useExceptions) m_flags |= exceptionsbit;
761
762 try {
763 if (firstcall) {
764 firstcall = false;
765 // register a cleanup handler to make sure all BidirMMapPipes are torn
766 // down, and child processes are sent a SIGTERM
767 if (0 != atexit(BidirMMapPipe::teardownall))
768 throw Exception("atexit", errno);
769 }
770
771 // build free lists
772 for (unsigned i = 1; i < TotPages; ++i)
773 m_pages[i - 1]->setNext(m_pages[i]);
774 m_pages[PagesPerEnd - 1]->setNext(0);
775 if (!useSocketpair) {
776 // create pipes
777 if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
778 if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
779 } else {
780 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
781 throw Exception("socketpair", errno);
782 }
783 // fork the child
784 pthread_mutex_lock(&s_openpipesmutex);
785 char c;
786 switch ((m_childPid = ::fork())) {
787 case -1: // error in fork()
788 myerrno = errno;
789 pthread_mutex_unlock(&s_openpipesmutex);
790 m_childPid = 0;
791 throw Exception("fork", myerrno);
792 case 0: // child
793 // put the ends in the right place
794 if (-1 != fds[2]) {
795 // pair of pipes
796 if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
797 myerrno = errno;
798 pthread_mutex_unlock(&s_openpipesmutex);
799 throw Exception("close", myerrno);
800 }
801 fds[0] = fds[3] = -1;
802 m_outpipe = fds[1];
803 m_inpipe = fds[2];
804 } else {
805 // socket pair
806 if (-1 == ::close(fds[0])) {
807 myerrno = errno;
808 pthread_mutex_unlock(&s_openpipesmutex);
809 throw Exception("close", myerrno);
810 }
811 fds[0] = -1;
812 m_inpipe = m_outpipe = fds[1];
813 }
814 // close other pipes our parent may have open - we have no business
815 // reading from/writing to those...
816 for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
817 s_openpipes.end() != it; ) {
818 BidirMMapPipe* p = *it;
819 it = s_openpipes.erase(it);
820 p->doClose(true, true);
821 }
824 delete s_pagepool;
825 s_pagepool = 0;
826 s_openpipes.push_front(this);
827 pthread_mutex_unlock(&s_openpipesmutex);
828 // ok, put our pages on freelist
830 // handshare with other end (to make sure it's alive)...
831 c = 'C'; // ...hild
832 if (1 != xferraw(m_outpipe, &c, 1, ::write))
833 throw Exception("handshake: xferraw write", EPIPE);
834 if (1 != xferraw(m_inpipe, &c, 1, ::read))
835 throw Exception("handshake: xferraw read", EPIPE);
836 if ('P' != c) throw Exception("handshake", EPIPE);
837 break;
838 default: // parent
839 // put the ends in the right place
840 if (-1 != fds[2]) {
841 // pair of pipes
842 if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
843 myerrno = errno;
844 pthread_mutex_unlock(&s_openpipesmutex);
845 throw Exception("close", myerrno);
846 }
847 fds[1] = fds[2] = -1;
848 m_outpipe = fds[3];
849 m_inpipe = fds[0];
850 } else {
851 // socketpair
852 if (-1 == ::close(fds[1])) {
853 myerrno = errno;
854 pthread_mutex_unlock(&s_openpipesmutex);
855 throw Exception("close", myerrno);
856 }
857 fds[1] = -1;
858 m_inpipe = m_outpipe = fds[0];
859 }
860 // put on list of open pipes (so we can kill child processes
861 // if things go wrong)
862 s_openpipes.push_front(this);
863 pthread_mutex_unlock(&s_openpipesmutex);
864 // ok, put our pages on freelist
865 m_freelist = m_pages[0u];
866 // handshare with other end (to make sure it's alive)...
867 c = 'P'; // ...arent
868 if (1 != xferraw(m_outpipe, &c, 1, ::write))
869 throw Exception("handshake: xferraw write", EPIPE);
870 if (1 != xferraw(m_inpipe, &c, 1, ::read))
871 throw Exception("handshake: xferraw read", EPIPE);
872 if ('C' != c) throw Exception("handshake", EPIPE);
873 break;
874 }
875 // mark file descriptors for close on exec (we do not want to leak the
876 // connection to anything we happen to exec)
877 int fdflags = 0;
878 if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
879 throw Exception("fcntl", errno);
880 fdflags |= FD_CLOEXEC;
881 if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
882 throw Exception("fcntl", errno);
883 if (m_inpipe != m_outpipe) {
884 if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
885 throw Exception("fcntl", errno);
886 fdflags |= FD_CLOEXEC;
887 if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
888 throw Exception("fcntl", errno);
889 }
890 // ok, finally, clear the failbit
891 m_flags &= ~failbit;
892 // all done
893 } catch (BidirMMapPipe::Exception&) {
894 if (0 != m_childPid) kill(m_childPid, SIGTERM);
895 for (int i = 0; i < 4; ++i)
896 if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
897 {
898 // free resources associated with mmapped pages
900 }
901 if (!--s_pagepoolrefcnt) {
902 delete s_pagepool;
903 s_pagepool = 0;
904 }
905 throw;
906 }
907}
908
910{
911 assert(!(m_flags & failbit));
912 return doClose(false);
913}
914
915int BidirMMapPipe::doClose(bool force, bool holdlock)
916{
917 if (m_flags & failbit) return 0;
918 // flush data to be written
919 if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
920 // shut down the write direction (no more writes from our side)
921 if (m_inpipe == m_outpipe) {
922 if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
923 throw Exception("shutdown", errno);
924 m_outpipe = -1;
925 } else {
926 if (-1 != m_outpipe && -1 == ::close(m_outpipe))
927 if (!force) throw Exception("close", errno);
928 m_outpipe = -1;
929 }
930 // shut down the write direction (no more writes from our side)
931 // drain anything the other end might still want to send
932 if (!force && -1 != m_inpipe) {
933 // **************** THIS IS EXTREMELY UGLY: ****************
934 // POLLHUP is not set reliably on pipe/socket shutdown on all
935 // platforms, unfortunately, so we poll for readability here until
936 // the other end closes, too
937 //
938 // the read loop below ensures that the other end sees the POLLIN that
939 // is set on shutdown instead, and goes ahead to close its end
940 //
941 // if we don't do this, and close straight away, the other end
942 // will catch a SIGPIPE or similar, and we don't want that
943 int err;
944 struct pollfd fds;
945 fds.fd = m_inpipe;
946 fds.events = POLLIN;
947 fds.revents = 0;
948 do {
949 while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
950 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
951 if (fds.revents & POLLIN) {
952 char c;
953 if (1 > ::read(m_inpipe, &c, 1)) break;
954 }
955 }
956 } while (0 > err && EINTR == errno);
957 // ignore all other poll errors
958 }
959 // close read end
960 if (-1 != m_inpipe && -1 == ::close(m_inpipe))
961 if (!force) throw Exception("close", errno);
962 m_inpipe = -1;
963 // unmap memory
964 try {
966 if (!--s_pagepoolrefcnt) {
967 delete s_pagepool;
968 s_pagepool = 0;
969 }
970 } catch (std::exception&) {
971 if (!force) throw;
972 }
974 // wait for child process
975 int retVal = 0;
976 if (isParent()) {
977 int tmp;
978 do {
979 tmp = waitpid(m_childPid, &retVal, 0);
980 } while (-1 == tmp && EINTR == errno);
981 if (-1 == tmp)
982 if (!force) throw Exception("waitpid", errno);
983 m_childPid = 0;
984 }
985 // remove from list of open pipes
986 if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
987 std::list<BidirMMapPipe*>::iterator it = std::find(
988 s_openpipes.begin(), s_openpipes.end(), this);
989 if (s_openpipes.end() != it) s_openpipes.erase(it);
990 if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
991 m_flags |= failbit;
992 return retVal;
993}
994
996{ doClose(false); }
997
999 int fd, void* addr, size_type len,
1000 ssize_t (*xferfn)(int, void*, std::size_t))
1001{
1002 size_type xferred = 0;
1003 unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1004 while (len) {
1005 ssize_t tmp = xferfn(fd, buf, len);
1006 if (tmp > 0) {
1007 xferred += tmp;
1008 len -= tmp;
1009 buf += tmp;
1010 continue;
1011 } else if (0 == tmp) {
1012 // check for end-of-file on pipe
1013 break;
1014 } else if (-1 == tmp) {
1015 // ok some error occurred, so figure out if we want to retry of throw
1016 switch (errno) {
1017 default:
1018 // if anything was transferred, return number of bytes
1019 // transferred so far, we can start throwing on the next
1020 // transfer...
1021 if (xferred) return xferred;
1022 // else throw
1023 throw Exception("xferraw", errno);
1024 case EAGAIN: // fallthrough intended
1025#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1026 case EWOULDBLOCK: // fallthrough intended
1027#endif
1028 std::cerr << " ERROR: In " << __func__ << " (" <<
1029 __FILE__ << ", line " << __LINE__ <<
1030 "): expect transfer to block!" << std::endl;
1031 case EINTR:
1032 break;
1033 }
1034 continue;
1035 } else {
1036 throw Exception("xferraw: unexpected return value from read/write",
1037 errno);
1038 }
1039 }
1040 return xferred;
1041}
1042
1044{
1045 if (plist) {
1046 unsigned char pg = m_pages[plist];
1047 if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1050 // ok, have to copy pages through pipe
1051 for (Page* p = plist; p; p = p->next()) {
1052 if (sizeof(Page) + p->size() !=
1053 xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1054 ::write)) {
1055 throw Exception("sendpages: short write", EPIPE);
1056 }
1057 }
1058 }
1059 } else {
1060 throw Exception("sendpages: short write", EPIPE);
1061 }
1062 } else { assert(plist); }
1063}
1064
1066{
// Receive a list of pages from the other end and return the number of
// pages received (0 if the one-byte head index could not be read).
// The index of the head page is read as a single byte from m_inpipe and
// mapped back to a page with m_pages[pg]. In the copy-through-pipe path each
// page's header and payload are then read from the pipe; otherwise the
// count comes from lenPageList(). All received pages are handed to
// feedPageLists(). Errors reported by xferraw propagate as Exception.
// NOTE(review): the condition selecting the copy path (and its opening
// brace) is not present in this listing; presumably the mmap "Copy" variety
// check - confirm.
// NOTE(review): if a page-header read comes up short, plisttail is not
// advanced and the loop retries the same page; xferraw returns 0 at
// end-of-file, so this looks like it could loop forever - verify.
1067 unsigned char pg;
1068 unsigned retVal = 0;
1069 Page *plisthead = 0, *plisttail = 0;
1070 if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1071 plisthead = plisttail = m_pages[pg];
1072 // ok, have number of pages
1075 // ok, need to copy pages through pipe
1076 for (; plisttail; ++retVal) {
1077 Page* p = plisttail;
1078 if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1079 ::read)) {
1080 plisttail = p->next();
1081 if (!p->size()) continue;
1082 // break in case of read error
1083 if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1084 ::read)) break;
1085 }
1086 }
1087 } else {
1088 retVal = lenPageList(plisthead);
1089 }
1090 }
1091 // put list of pages we just received into correct lists (busy/free)
1092 if (plisthead) feedPageLists(plisthead);
1093 // ok, retVal contains the number of pages read, so put them on the
1094 // correct lists
1095 return retVal;
1096}
1097
1099{
1100 struct pollfd fds;
1101 fds.fd = m_inpipe;
1102 fds.events = POLLIN;
1103 fds.revents = 0;
1104 unsigned retVal = 0;
1105 do {
1106 int rc = ::poll(&fds, 1, 0);
1107 if (0 > rc) {
1108 if (EINTR == errno) continue;
1109 break;
1110 }
1111 if (1 == retVal && fds.revents & POLLIN &&
1112 !(fds.revents & (POLLNVAL | POLLERR))) {
1113 // ok, we can read without blocking, so the other end has
1114 // something for us
1115 return recvpages();
1116 } else {
1117 break;
1118 }
1119 } while (true);
1120 return retVal;
1121}
1122
1124{
1125 unsigned n = 0;
1126 for ( ; p; p = p->next()) ++n;
1127 return n;
1128}
1129
1131{
// Distribute the pages of the (non-null) list plist:
//  - pages holding data (size() != 0) get their read position reset to 0
//    and are appended to the end of m_busylist;
//  - empty pages go back where they came from: pages belonging to this end
//    (index < PagesPerEnd for the parent, >= PagesPerEnd for the child, as
//    judged via m_pages[p]) are pushed onto m_freelist, while pages
//    belonging to the other end are queued for sending back.
// If anything is queued for sending, full pages at the head of m_dirtylist
// are appended to that send list. The pipe descriptor(s) are then polled
// with a zero timeout, and the list is passed to sendpages() only if no
// POLLERR/POLLNVAL/POLLHUP was reported. Otherwise the queued pages are
// dropped on purpose (see the comment at the end).
// Throws Exception("feedPageLists: poll", errno) if poll fails with anything
// but EINTR.
// NOTE(review): when m_inpipe == m_outpipe, pending input causes a call to
// recvpages(), which itself calls feedPageLists() again before our send
// list is sent - keep this reentrancy in mind when changing either one.
1132 assert(plist);
1133 // get end of busy list
1134 Page *blend = m_busylist;
1135 while (blend && blend->next()) blend = blend->next();
1136 // ok, might have to send free pages to other end, and (if we do have to
1137 // send something to the other end) while we're at it, send any dirty
1138 // pages which are completely full, too
1139 Page *sendlisthead = 0, *sendlisttail = 0;
1140 // loop over plist
1141 while (plist) {
1142 Page* p = plist;
1143 plist = p->next();
1144 p->setNext(0);
1145 if (p->size()) {
1146 // busy page...
1147 p->pos() = 0;
1148 // put at end of busy list
1149 if (blend) blend->setNext(p);
1150 else m_busylist = p;
1151 blend = p;
1152 } else {
1153 // free page...
1154 // Very simple algorithm: once we're done with a page, we send it back
1155 // where it came from. If it's from our end, we put it on the free list, if
1156 // it's from the other end, we send it back.
1157 if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1158 (isChild() && m_pages[p] < PagesPerEnd)) {
1159 // page "belongs" to other end
1160 if (!sendlisthead) sendlisthead = p;
1161 if (sendlisttail) sendlisttail->setNext(p);
1162 sendlisttail = p;
1163 } else {
1164 // add page to freelist
1165 p->setNext(m_freelist);
1166 m_freelist = p;
1167 }
1168 }
1169 }
1170 // check if we have to send stuff to the other end
1171 if (sendlisthead) {
1172 // go through our list of dirty pages, and see what we can
1173 // send along
1174 Page* dp;
1175 while ((dp = m_dirtylist) && dp->full()) {
1176 Page* p = dp;
1177 // move head of dirty list
1178 m_dirtylist = p->next();
1179 // queue for sending
1180 p->setNext(0);
1181 sendlisttail->setNext(p);
1182 sendlisttail = p;
1183 }
1184 // poll if the other end is still alive - this needs that we first
1185 // close the write pipe of the other end when the remote end of the
1186 // connection is shutting down in doClose; we'll see that because we
1187 // get a POLLHUP on our inpipe
1188 const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1189 struct pollfd fds[2];
1190 fds[0].fd = m_outpipe;
1191 fds[0].events = fds[0].revents = 0;
1192 if (m_outpipe != m_inpipe) {
1193 fds[1].fd = m_inpipe;
1194 fds[1].events = fds[1].revents = 0;
1195 } else {
1196 fds[0].events |= POLLIN;
1197 }
1198 int retVal = 0;
// retry the zero-timeout poll only when it was interrupted by a signal
1199 do {
1200 retVal = ::poll(fds, nfds, 0);
1201 if (0 > retVal && EINTR == errno)
1202 continue;
1203 break;
1204 } while (true);
1205 if (0 <= retVal) {
1206 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1207 if (m_outpipe != m_inpipe) {
1208 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1209 } else {
// single descriptor: drain pending input first; receiving nothing is
// taken to mean the other end is gone
1210 if (ok && fds[0].revents & POLLIN) {
1211 unsigned ret = recvpages();
1212 if (!ret) ok = false;
1213 }
1214 }
1215
1216 if (ok) sendpages(sendlisthead);
1217 // (if the pipe is dead already, we don't care that we leak the
1218 // contents of the pages on the send list here, so that is why
1219 // there's no else clause here)
1220 } else {
1221 throw Exception("feedPageLists: poll", errno);
1222 }
1223 }
1224}
1225
1227{
1228 assert(p);
1229 assert(p == m_freelist);
1230 // remove from freelist
1231 m_freelist = p->next();
1232 p->setNext(0);
1233 // append to dirty list
1234 Page* dl = m_dirtylist;
1235 while (dl && dl->next()) dl = dl->next();
1236 if (dl) dl->setNext(p);
1237 else m_dirtylist = p;
1238}
1239
1241{
// Return the first page with unread data (the head of m_busylist).
// While the busy list is empty, pages are requested from the other end via
// recvpages(), which may block. Returns 0 as soon as recvpages() reports
// that no pages were received.
1242 // queue any pages available for reading we can without blocking
// NOTE(review): no statement follows the comment above, so the
// non-blocking receive it describes is not performed here - confirm.
1244 Page* p;
1245 // if there are no busy pages, try to get them from the other end,
1246 // block if we have to...
1247 while (!(p = m_busylist)) if (!recvpages()) return 0;
1248 return p;
1249}
1250
1252{
// Return the page that the next write should go to: the last page of
// m_dirtylist if it is not yet full. Otherwise the head of m_freelist is
// taken, after receiving pages from the other end via recvpages() (which
// may block) for as long as the free list is empty, and moved to the dirty
// list with markPageDirty(). Returns 0 if recvpages() receives nothing
// while no free page is available.
1253 // queue any pages available for reading we can without blocking
// NOTE(review): no statement follows the comment above, so the
// non-blocking receive it describes is not performed here - confirm.
1255 Page* p = m_dirtylist;
1256 // go to end of dirty list
1257 if (p) while (p->next()) p = p->next();
1258 if (!p || p->full()) {
1259 // need to append free page, so get one
1260 while (!(p = m_freelist)) if (!recvpages()) return 0;
1261 markPageDirty(p);
1262 }
1263 return p;
1264}
1264}
1265
1267{ return doFlush(true); }
1268
1269void BidirMMapPipe::doFlush(bool forcePartialPages)
1270{
1271 assert(!(m_flags & failbit));
1272 // build a list of pages to flush
1273 Page *flushlisthead = 0, *flushlisttail = 0;
1274 while (m_dirtylist) {
1275 Page* p = m_dirtylist;
1276 if (!forcePartialPages && !p->full()) break;
1277 // remove dirty page from dirty list
1278 m_dirtylist = p->next();
1279 p->setNext(0);
1280 // and send it to other end
1281 if (!flushlisthead) flushlisthead = p;
1282 if (flushlisttail) flushlisttail->setNext(p);
1283 flushlisttail = p;
1284 }
1285 if (flushlisthead) sendpages(flushlisthead);
1286}
1287
1289{
// Discard all buffered data: append the dirty list to the busy list, set
// the payload size of every page on the combined list to 0, and finally
// clear both m_busylist and m_dirtylist.
// NOTE(review): no statement between the "put them on the free list"
// comment and the reset of both list heads hands the emptied pages back
// (e.g. to feedPageLists or m_freelist); confirm they are not leaked.
1290 assert(!(m_flags & failbit));
1291 // join busy and dirty lists
1292 {
1293 Page *l = m_busylist;
1294 while (l && l->next()) l = l->next();
1295 if (l) l->setNext(m_dirtylist);
1296 else m_busylist = m_dirtylist;
1297 }
1298 // empty busy and dirty pages
1299 for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1300 // put them on the free list
1302 m_busylist = m_dirtylist = 0;
1303}
1304
1306{
// Return the number of unread bytes currently buffered in the busy pages,
// i.e. the sum of size() - pos() over m_busylist.
1307 // queue all pages waiting for consumption in the pipe before we give an
1308 // answer
// NOTE(review): no statement follows the comment above, so pages still
// waiting in the pipe are not queued before counting - confirm.
1310 size_type retVal = 0;
1311 for (Page* p = m_busylist; p; p = p->next())
1312 retVal += p->size() - p->pos();
1313 return retVal;
1314}
1315
1317{
// Estimate how many bytes can currently be written without blocking.
// First m_outpipe is polled with a zero timeout (retrying on EINTR) to learn
// whether it is writable; a poll failure throws
// Exception("bytesWritableNonBlocking: poll", errno). Then the unused space
// of partially filled dirty pages and the full capacity of each free page
// are added up. If the pipe is not writable, counting stops once FlushThresh
// pages have been counted. Presumably this is because further writes would
// trigger a flush that could block - confirm against write()/doFlush().
1318 // queue all pages waiting for consumption in the pipe before we give an
1319 // answer
// NOTE(review): no statement follows the comment above, so pending pages
// are not queued before counting - confirm.
1321 // check if we could write to the pipe without blocking (we need to know
1322 // because we might need to check if flushing of dirty pages would block)
1323 bool couldwrite = false;
1324 {
1325 struct pollfd fds;
1326 fds.fd = m_outpipe;
1327 fds.events = POLLOUT;
1328 fds.revents = 0;
1329 int retVal = 0;
1330 do {
1331 retVal = ::poll(&fds, 1, 0);
1332 if (0 > retVal) {
1333 if (EINTR == errno) continue;
1334 throw Exception("bytesWritableNonBlocking: poll", errno);
1335 }
1336 if (1 == retVal && fds.revents & POLLOUT &&
1337 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1338 couldwrite = true;
1339 break;
1340 } while (true);
1341 }
1342 // ok, start counting bytes
1343 size_type retVal = 0;
1344 unsigned npages = 0;
1345 // go through the dirty list
1346 for (Page* p = m_dirtylist; p; p = p->next()) {
1347 ++npages;
1348 // if page only partially filled
1349 if (!p->full())
1350 retVal += p->free();
1351 if (npages >= FlushThresh && !couldwrite) break;
1352 }
1353 // go through the free list
1354 for (Page* p = m_freelist; p && (!m_dirtylist ||
1355 npages < FlushThresh || couldwrite); p = p->next()) {
1356 ++npages;
1357 retVal += Page::capacity();
1358 }
1359 return retVal;
1360}
1361
1363{
1364 assert(!(m_flags & failbit));
1365 size_type nread = 0;
1366 unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1367 try {
1368 while (sz) {
1369 // find next page to read from
1370 Page* p = busypage();
1371 if (!p) {
1372 m_flags |= eofbit;
1373 return nread;
1374 }
1375 unsigned char* pp = p->begin() + p->pos();
1376 size_type csz = std::min(size_type(p->remaining()), sz);
1377 std::copy(pp, pp + csz, ap);
1378 nread += csz;
1379 ap += csz;
1380 sz -= csz;
1381 p->pos() += csz;
1382 assert(p->size() >= p->pos());
1383 if (p->size() == p->pos()) {
1384 // if no unread data remains, page is free
1385 m_busylist = p->next();
1386 p->setNext(0);
1387 p->size() = 0;
1388 feedPageLists(p);
1389 }
1390 }
1391 } catch (Exception&) {
1392 m_flags |= rderrbit;
1393 if (m_flags & exceptionsbit) throw;
1394 }
1395 return nread;
1396}
1397
1399{
// Append sz bytes from addr to the dirty pages, getting the page to write to
// from dirtypage() each time, and return the number of bytes written.
// - If dirtypage() cannot supply a page, eofbit is set and the partial count
//   is returned.
// - Whenever a page becomes full, doFlush(false) sends the full pages at the
//   front of the dirty list.
// - On Exception, wrerrbit is set; the exception is rethrown only when
//   exceptionsbit is set.
1400 assert(!(m_flags & failbit));
1401 size_type written = 0;
1402 const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1403 try {
1404 while (sz) {
1405 // find next page to write to
1406 Page* p = dirtypage();
1407 if (!p) {
1408 m_flags |= eofbit;
1409 return written;
1410 }
1411 unsigned char* pp = p->begin() + p->size();
1412 size_type csz = std::min(size_type(p->free()), sz);
1413 std::copy(ap, ap + csz, pp);
1414 written += csz;
1415 ap += csz;
1416 p->size() += csz;
1417 sz -= csz;
1418 assert(p->capacity() >= p->size());
1419 if (p->full()) {
1420 // if page is full, see if we're above the flush threshold of
1421 // 3/4 of our pages
// NOTE(review): no threshold test guards the doFlush(false) call below,
// although the comment above describes one - confirm.
1423 doFlush(false);
1424 }
1425 }
1426 } catch (Exception&) {
1427 m_flags |= wrerrbit;
1428 if (m_flags & exceptionsbit) throw;
1429 }
1430 return written;
1431}
1432
1434{
1435 // go through pipes, and change flags where we already know without really
1436 // polling - stuff where we don't need poll to wait for its timeout in the
1437 // OS...
1438 bool canskiptimeout = false;
1439 std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1440 std::vector<unsigned>::iterator mit = masks.begin();
1441 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1442 ++it, ++mit) {
1443 PollEntry& pe = *it;
1444 pe.revents = None;
1445 // null pipe is invalid
1446 if (!pe.pipe) {
1447 pe.revents |= Invalid;
1448 canskiptimeout = true;
1449 continue;
1450 }
1451 // closed pipe is invalid
1452 if (pe.pipe->closed()) pe.revents |= Invalid;
1453 // check for error
1454 if (pe.pipe->bad()) pe.revents |= Error;
1455 // check for end of file
1456 if (pe.pipe->eof()) pe.revents |= EndOfFile;
1457 // check if readable
1458 if (pe.events & Readable) {
1459 *mit |= Readable;
1460 if (pe.pipe->m_busylist) pe.revents |= Readable;
1461 }
1462 // check if writable
1463 if (pe.events & Writable) {
1464 *mit |= Writable;
1465 if (pe.pipe->m_freelist) {
1466 pe.revents |= Writable;
1467 } else {
1468 Page *dl = pe.pipe->m_dirtylist;
1469 while (dl && dl->next()) dl = dl->next();
1470 if (dl && dl->pos() < Page::capacity())
1471 pe.revents |= Writable;
1472 }
1473 }
1474 if (pe.revents) canskiptimeout = true;
1475 }
1476 // set up the data structures required for the poll syscall
1477 std::vector<pollfd> fds;
1478 fds.reserve(2 * pipes.size());
1479 std::map<int, PollEntry*> fds2pipes;
1480 for (PollVector::const_iterator it = pipes.begin();
1481 pipes.end() != it; ++it) {
1482 const PollEntry& pe = *it;
1483 struct pollfd tmp;
1484 fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1485 const_cast<PollEntry*>(&pe)));
1486 tmp.events = tmp.revents = 0;
1487 // we always poll for readability; this allows us to queue pages
1488 // early
1489 tmp.events |= POLLIN;
1490 if (pe.pipe->m_outpipe != tmp.fd) {
1491 // ok, it's a pair of pipes
1492 fds.push_back(tmp);
1493 fds2pipes.insert(std::make_pair(
1494 unsigned(tmp.fd = pe.pipe->m_outpipe),
1495 const_cast<PollEntry*>(&pe)));
1496 tmp.events = 0;
1497
1498 }
1499 if (pe.events & Writable) tmp.events |= POLLOUT;
1500 fds.push_back(tmp);
1501 }
1502 // poll
1503 int retVal = 0;
1504 do {
1505 retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1506 if (0 > retVal) {
1507 if (EINTR == errno) continue;
1508 throw Exception("poll", errno);
1509 }
1510 break;
1511 } while (true);
1512 // fds may have changed state, so update...
1513 for (std::vector<pollfd>::iterator it = fds.begin();
1514 fds.end() != it; ++it) {
1515 pollfd& fe = *it;
1516 //if (!fe.revents) continue;
1517 --retVal;
1518 PollEntry& pe = *fds2pipes[fe.fd];
1519oncemore:
1520 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1521 pe.revents |= ReadInvalid;
1522 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1523 pe.revents |= WriteInvalid;
1524 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1525 pe.revents |= ReadError;
1526 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1527 pe.revents |= WriteError;
1528 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1529 pe.revents |= ReadEndOfFile;
1530 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1531 pe.revents |= WriteEndOfFile;
1532 if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1533 !(fe.revents & (POLLNVAL | POLLERR))) {
1534 // ok, there is at least one page for us to receive from the
1535 // other end
1536 if (0 == pe.pipe->recvpages()) continue;
1537 // more pages there?
1538 do {
1539 int tmp = ::poll(&fe, 1, 0);
1540 if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1541 if (0 > tmp) {
1542 if (EINTR == errno) continue;
1543 throw Exception("poll", errno);
1544 }
1545 break;
1546 } while (true);
1547 }
1548 if (pe.pipe->m_busylist) pe.revents |= Readable;
1549 if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1550 if (pe.pipe->m_freelist) {
1551 pe.revents |= Writable;
1552 } else {
1553 Page *dl = pe.pipe->m_dirtylist;
1554 while (dl && dl->next()) dl = dl->next();
1555 if (dl && dl->pos() < Page::capacity())
1556 pe.revents |= Writable;
1557 }
1558 }
1559 }
1560 // apply correct masks, and count pipes with pending events
1561 int npipes = 0;
1562 mit = masks.begin();
1563 for (PollVector::iterator it = pipes.begin();
1564 pipes.end() != it; ++it, ++mit)
1565 if ((it->revents &= *mit)) ++npipes;
1566 return npipes;
1567}
1568
1570{
1571 size_t sz = std::strlen(str);
1572 *this << sz;
1573 if (sz) write(str, sz);
1574 return *this;
1575}
1576
1578{
1579 size_t sz = 0;
1580 *this >> sz;
1581 if (good() && !eof()) {
1582 str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1583 if (!str) throw Exception("realloc", errno);
1584 if (sz) read(str, sz);
1585 str[sz] = 0;
1586 }
1587 return *this;
1588}
1589
1591{
1592 size_t sz = str.size();
1593 *this << sz;
1594 write(str.data(), sz);
1595 return *this;
1596}
1597
1599{
1600 str.clear();
1601 size_t sz = 0;
1602 *this >> sz;
1603 if (good() && !eof()) {
1604 str.reserve(sz);
1605 for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1606 }
1607 return *this;
1608}
1609
1611
1612#ifdef TEST_BIDIRMMAPPIPE
1613using namespace RooFit;
1614
1615int simplechild(BidirMMapPipe& pipe)
1616{
1617 // child does an echo loop
1618 while (pipe.good() && !pipe.eof()) {
1619 // read a string
1620 std::string str;
1621 pipe >> str;
1622 if (!pipe) return -1;
1623 if (pipe.eof()) break;
1624 if (!str.empty()) {
1625 std::cout << "[CHILD] : read: " << str << std::endl;
1626 str = "... early in the morning?";
1627 }
1628 pipe << str << BidirMMapPipe::flush;
1629 // did our parent tell us to shut down?
1630 if (str.empty()) break;
1631 if (!pipe) return -1;
1632 if (pipe.eof()) break;
1633 std::cout << "[CHILD] : wrote: " << str << std::endl;
1634 }
1635 pipe.close();
1636 return 0;
1637}
1638
1639#include <sstream>
1640int randomchild(BidirMMapPipe& pipe)
1641{
1642 // child sends out something at random intervals
1643 ::srand48(::getpid());
1644 {
1645 // wait for parent's go ahead signal
1646 std::string s;
1647 pipe >> s;
1648 }
1649 // no shutdown sequence needed on this side - we're producing the data,
1650 // and the parent can just read until we're done (when it'll get EOF)
1651 for (int i = 0; i < 5; ++i) {
1652 // sleep a random time between 0 and .9 seconds
1653 ::usleep(int(1e6 * ::drand48()));
1654 std::ostringstream buf;
1655 buf << "child pid " << ::getpid() << " sends message " << i;
1656 std::string str = buf.str();
1657 std::cout << "[CHILD] : " << str << std::endl;
1658 pipe << str << BidirMMapPipe::flush;
1659 if (!pipe) return -1;
1660 if (pipe.eof()) break;
1661 }
1662 // tell parent we're shutting down
1663 pipe << "" << BidirMMapPipe::flush;
1664 // wait for parent to acknowledge
1665 std::string s;
1666 pipe >> s;
1667 pipe.close();
1668 return 0;
1669}
1670
1671int benchchildrtt(BidirMMapPipe& pipe)
1672{
1673 // child does the equivalent of listening for pings and sending the
1674 // packet back
1675 char* str = 0;
1676 while (pipe && !pipe.eof()) {
1677 pipe >> str;
1678 if (!pipe) {
1679 std::free(str);
1680 pipe.close();
1681 return -1;
1682 }
1683 if (pipe.eof()) break;
1684 pipe << str << BidirMMapPipe::flush;
1685 // if we have just completed the shutdown handshake, we break here
1686 if (!std::strlen(str)) break;
1687 }
1688 std::free(str);
1689 pipe.close();
1690 return 0;
1691}
1692
1693int benchchildsink(BidirMMapPipe& pipe)
1694{
1695 // child behaves like a sink
1696 char* str = 0;
1697 while (pipe && !pipe.eof()) {
1698 pipe >> str;
1699 if (!std::strlen(str)) break;
1700 }
1701 pipe << "" << BidirMMapPipe::flush;
1702 std::free(str);
1703 pipe.close();
1704 return 0;
1705}
1706
1707int benchchildsource(BidirMMapPipe& pipe)
1708{
1709 // child behaves like a source
1710 char* str = 0;
1711 for (unsigned i = 0; i <= 24; ++i) {
1712 str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1713 std::memset(str, '4', 1 << i);
1714 str[1 << i] = 0;
1715 for (unsigned j = 0; j < 1 << 7; ++j) {
1716 pipe << str;
1717 if (!pipe || pipe.eof()) {
1718 std::free(str);
1719 pipe.close();
1720 return -1;
1721 }
1722 }
1723 // tell parent we're done with this block size
1724 pipe << "" << BidirMMapPipe::flush;
1725 }
1726 // tell parent to shut down
1727 pipe << "" << BidirMMapPipe::flush;
1728 std::free(str);
1729 pipe.close();
1730 return 0;
1731}
1732
1733BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1734{
1735 // create a pipe with the given child at the remote end
1736 BidirMMapPipe *p = new BidirMMapPipe();
1737 if (p->isChild()) {
1738 int retVal = childexec(*p);
1739 delete p;
1740 std::exit(retVal);
1741 }
1742 return p;
1743}
1744
1745#include <sys/time.h>
1746#include <iomanip>
1747int main()
1748{
// Self-test driver, compiled only with TEST_BIDIRMMAPPIPE. It runs:
// - a simple echo test,
// - a poll() test with 20 children,
// - three throughput benchmarks (round trip, child as sink, child as
//   source).
// Returns 0 on success, -1 on a pipe failure, or the first non-zero child
// exit status.
1749 // simple echo loop test
1750 {
1751 std::cout << "[PARENT]: simple challenge-response test, "
1752 "one child:" << std::endl;
1753 BidirMMapPipe* pipe = spawnChild(simplechild);
1754 for (int i = 0; i < 5; ++i) {
1755 std::string str("What shall we do with a drunken sailor...");
1756 *pipe << str << BidirMMapPipe::flush;
1757 if (!*pipe) return -1;
1758 std::cout << "[PARENT]: wrote: " << str << std::endl;
1759 *pipe >> str;
1760 if (!*pipe) return -1;
1761 std::cout << "[PARENT]: read: " << str << std::endl;
1762 }
1763 // send shutdown string
1764 *pipe << "" << BidirMMapPipe::flush;
1765 // wait for shutdown handshake
1766 std::string s;
1767 *pipe >> s;
1768 int retVal = pipe->close();
1769 std::cout << "[PARENT]: exit status of child: " << retVal <<
1770 std::endl;
1771 if (retVal) return retVal;
1772 delete pipe;
1773 }
1774 // simple poll test - children send 5 results in random intervals
1775 {
1776 unsigned nch = 20;
1777 std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1778 " children:" << std::endl;
1779 typedef BidirMMapPipe::PollEntry PollEntry;
1780 // poll data structure
1782 pipes.reserve(nch);
1783 // spawn children
1784 for (unsigned i = 0; i < nch; ++i) {
1785 std::cout << "[PARENT]: spawning child " << i << std::endl;
1786 pipes.push_back(PollEntry(spawnChild(randomchild),
1788 }
1789 // wake children up
1790 std::cout << "[PARENT]: waking up children" << std::endl;
1791 for (unsigned i = 0; i < nch; ++i)
1792 *pipes[i].pipe << "" << BidirMMapPipe::flush;
1793 std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1794 // while at least some children alive
1795 while (!pipes.empty()) {
1796 // poll, wait until status change (infinite timeout)
1797 int npipes = BidirMMapPipe::poll(pipes, -1);
1798 // scan for pipes with changed status
1799 for (std::vector<PollEntry>::iterator it = pipes.begin();
1800 npipes && pipes.end() != it; ) {
1801 if (!it->revents) {
1802 // unchanged, next one
1803 ++it;
1804 continue;
1805 }
1806 --npipes; // maybe we can stop early...
1807 // read from pipes which are readable
1808 if (it->revents & BidirMMapPipe::Readable) {
1809 std::string s;
1810 *(it->pipe) >> s;
1811 if (!s.empty()) {
1812 std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1813 ": " << s << std::endl;
1814 ++it;
1815 continue;
1816 } else {
1817 // child is shutting down...
1818 *(it->pipe) << "" << BidirMMapPipe::flush;
1819 goto childcloses;
1820 }
1821 }
1822 // retire pipes with error or end-of-file condition
1823 if (it->revents & (BidirMMapPipe::Error |
1826 std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1827 " revents" <<
1828 ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1829 ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1830 ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1831 ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1832 ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1833 ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1834 ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1835 ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1836 std::endl;
1837childcloses:
1838 int retVal = it->pipe->close();
1839 std::cout << "[PARENT]: child exit status: " <<
1840 retVal << ", number of children still alive: " <<
1841 (pipes.size() - 1) << std::endl;
1842 if (retVal) return retVal;
1843 delete it->pipe;
1844 it = pipes.erase(it);
1845 continue;
1846 }
1847 }
1848 }
1849 }
1850 // little benchmark - round trip time
1851 {
1852 std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1853 for (unsigned i = 0; i <= 24; ++i) {
// NOTE(review): s is allocated with new[] but is passed to
// operator>>(char*&) below, which calls std::realloc on it, and it is later
// released with delete[]. Mixing new[]/realloc/delete[] is undefined
// behaviour; allocate with std::malloc and release with std::free instead.
1854 char *s = new char[1 + (1 << i)];
1855 std::memset(s, 'A', 1 << i);
1856 s[1 << i] = 0;
1857 const unsigned n = 1 << 7;
1858 double avg = 0., min = 1e42, max = -1e42;
1859 BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1860 for (unsigned j = n; j--; ) {
1861 struct timeval t1;
1862 ::gettimeofday(&t1, 0);
1863 *pipe << s << BidirMMapPipe::flush;
1864 if (!*pipe || pipe->eof()) break;
1865 *pipe >> s;
1866 if (!*pipe || pipe->eof()) break;
1867 struct timeval t2;
1868 ::gettimeofday(&t2, 0);
1869 t2.tv_sec -= t1.tv_sec;
1870 t2.tv_usec -= t1.tv_usec;
1871 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1872 if (dt < min) min = dt;
1873 if (dt > max) max = dt;
1874 avg += dt;
1875 }
1876 // send a shutdown string
1877 *pipe << "" << BidirMMapPipe::flush;
1878 // get child's shutdown ok
1879 *pipe >> s;
1880 avg /= double(n);
1881 avg *= 1e6; min *= 1e6; max *= 1e6;
1882 int retVal = pipe->close();
1883 if (retVal) {
1884 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1885 delete[] s;
1886 return retVal;
1887 }
1888 delete pipe;
1889 // there is a factor 2 in the formula for the transfer rate below,
1890 // because we transfer data of twice the size of the block - once
1891 // to the child, and once for the return trip
1892 std::cout << "block size " << std::setw(9) << (1 << i) <<
1893 " avg " << std::setw(7) << avg << " us min " <<
1894 std::setw(7) << min << " us max " << std::setw(7) << max <<
1895 "us speed " << std::setw(9) <<
1896 2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1897 " MB/s" << std::endl;
1898 delete[] s;
1899 }
1900 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1901 }
1902 // little benchmark - child as sink
1903 {
1904 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1905 for (unsigned i = 0; i <= 24; ++i) {
// NOTE(review): as in the round-trip benchmark, s comes from new[] but is
// passed to operator>>(char*&) (std::realloc) and released with delete[].
// The early return on a non-zero child exit status below also leaks s.
1906 char *s = new char[1 + (1 << i)];
1907 std::memset(s, 'A', 1 << i);
1908 s[1 << i] = 0;
1909 const unsigned n = 1 << 7;
1910 double avg = 0., min = 1e42, max = -1e42;
1911 BidirMMapPipe *pipe = spawnChild(benchchildsink);
1912 for (unsigned j = n; j--; ) {
1913 struct timeval t1;
1914 ::gettimeofday(&t1, 0);
1915 // streaming mode - we do not flush here
1916 *pipe << s;
1917 if (!*pipe || pipe->eof()) break;
1918 struct timeval t2;
1919 ::gettimeofday(&t2, 0);
1920 t2.tv_sec -= t1.tv_sec;
1921 t2.tv_usec -= t1.tv_usec;
1922 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1923 if (dt < min) min = dt;
1924 if (dt > max) max = dt;
1925 avg += dt;
1926 }
1927 // send a shutdown string
1928 *pipe << "" << BidirMMapPipe::flush;
1929 // get child's shutdown ok
1930 *pipe >> s;
1931 avg /= double(n);
1932 avg *= 1e6; min *= 1e6; max *= 1e6;
1933 int retVal = pipe->close();
1934 if (retVal) {
1935 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1936 return retVal;
1937 }
1938 delete pipe;
1939 std::cout << "block size " << std::setw(9) << (1 << i) <<
1940 " avg " << std::setw(7) << avg << " us min " <<
1941 std::setw(7) << min << " us max " << std::setw(7) << max <<
1942 "us speed " << std::setw(9) <<
1943 (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1944 " MB/s" << std::endl;
1945 delete[] s;
1946 }
1947 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1948 }
1949 // little benchmark - child as source
1950 {
1951 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1952 char *s = 0;
1953 double avg = 0., min = 1e42, max = -1e42;
1954 unsigned n = 0, bsz = 0;
1955 BidirMMapPipe *pipe = spawnChild(benchchildsource);
1956 while (*pipe && !pipe->eof()) {
1957 struct timeval t1;
1958 ::gettimeofday(&t1, 0);
1959 // streaming mode - we do not flush here
1960 *pipe >> s;
1961 if (!*pipe || pipe->eof()) break;
1962 struct timeval t2;
1963 ::gettimeofday(&t2, 0);
1964 t2.tv_sec -= t1.tv_sec;
1965 t2.tv_usec -= t1.tv_usec;
1966 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1967 if (std::strlen(s)) {
1968 ++n;
1969 if (dt < min) min = dt;
1970 if (dt > max) max = dt;
1971 avg += dt;
1972 bsz = std::strlen(s);
1973 } else {
1974 if (!n) break;
1975 // next block size
1976 avg /= double(n);
1977 avg *= 1e6; min *= 1e6; max *= 1e6;
1978
1979 std::cout << "block size " << std::setw(9) << bsz <<
1980 " avg " << std::setw(7) << avg << " us min " <<
1981 std::setw(7) << min << " us max " << std::setw(7) <<
1982 max << "us speed " << std::setw(9) <<
1983 (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1984 " MB/s" << std::endl;
1985 n = 0;
1986 avg = 0.;
1987 min = 1e42;
1988 max = -1e42;
1989 }
1990 }
1991 int retVal = pipe->close();
1992 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1993 if (retVal) return retVal;
1994 delete pipe;
1995 std::free(s);
1996 }
1997 return 0;
1998}
1999#endif // TEST_BIDIRMMAPPIPE
2000#endif // _WIN32
2001
2002// vim: ft=cpp:sw=4:tw=78:et
#define END_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
double
Definition: Converters.cxx:939
typedef void(GLAPIENTRYP _GLUfuncptr)(void)
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
#define e(i)
Definition: RSha256.hxx:103
char name[80]
Definition: TGX11.cxx:110
int main(int argc, char *argv[])
Definition: cef_main.cxx:54
#define realloc
Definition: civetweb.c:1538
#define free
Definition: civetweb.c:1539
#define malloc
Definition: civetweb.c:1536
for poll() interface
unsigned revents
events that happened (or'ed bitmask)
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
exception to throw if low-level OS calls go wrong
char m_buf[s_sz]
buffer containing the error message
static int dostrerror_r(int err, char *buf, std::size_t sz, int(*f)(int, char *, std::size_t))
for the POSIX version of strerror_r
virtual const char * what() const noexcept
return a description of what went wrong
BidirMMapPipeException(const char *msg, int err)
constructor taking error code, hint on operation (msg)
class representing a chunk of pages
Definition: BidirMMapPipe.h:43
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
unsigned len() const
return length of chunk
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:87
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:55
bool full() const
return true if no free page groups in this chunk
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:46
@ Copy
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:48
@ Unknown
don't know yet what'll work
Definition: BidirMMapPipe.h:47
@ DevZero
mmapping /dev/zero works
Definition: BidirMMapPipe.h:50
@ Anonymous
anonymous mmap works
Definition: BidirMMapPipe.h:51
@ FileBacked
mmapping a temp file works
Definition: BidirMMapPipe.h:49
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:56
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:89
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:85
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
class representing a page pool
Pages pop()
pop a free element out of the pool
static MMapVariety mmapVariety()
return variety of mmap supported on the system
BidirMMapPipeException Exception
convenience typedef
unsigned nPagesPerGroup() const
return number of pages per group (ie. as returned by pop())
void zap(Pages &p)
zap the pool (unmap all but Pages p)
PageChunk::MMapVariety MMapVariety
convenience typedef
void putOnFreeList(Chunk *chunk)
release a chunk
BidirMMapPipe_impl::PageChunk Chunk
a chunk of memory in the pool
@ minsz
minimum chunk size (just below 1 << minsz bytes)
@ szincr
size class increment (sz = 1 << (minsz + k * szincr))
@ maxsz
maximum chunk size (just below 1 << maxsz bytes)
ChunkList m_freelist
list of chunks used by the pool which are not full
ChunkList m_chunks
list of chunks used by the pool
unsigned m_nPgPerGrp
page group size
std::list< Chunk * > ChunkList
list of chunks
static unsigned pagesize()
return (logical) page size of the system
int m_cursz
current chunk size
unsigned m_szmap[(maxsz - minsz)/szincr]
chunk size map (histogram of chunk sizes)
int nextChunkSz() const
find size of next chunk to allocate (in a hopefully smart way)
void updateCurSz(int sz, int incr)
adjust m_cursz to current largest block
PagePool(unsigned nPagesPerGroup)
constructor
void release(Chunk *chunk)
release a chunk
class representing the header structure in an mmapped page
bool full() const
true if page completely full
unsigned short m_pos
index of next byte in payload area
unsigned short & pos()
return reference to position field
short m_next
next page in list (in pagesizes)
unsigned char * end() const
return pointer to one past the last byte in payload data area of page
Page & operator=(const Page &)=delete
assignment forbidden
bool filled() const
true if page partially filled
Page(const Page &)
copy construction forbidden
Page * next() const
return pointer to next page
unsigned short & size()
return reference to size field
unsigned short m_size
size of payload (in bytes)
unsigned char * begin() const
return pointer to first byte in payload data area of page
static unsigned capacity()
return the capacity of the page
void setNext(const Page *p)
set pointer to next page
unsigned free() const
free space left (to be written to)
unsigned size() const
return size (of payload data)
unsigned pos() const
return position
unsigned remaining() const
bytes remaining to be read
bool empty() const
true if page empty
handle class for a number of Pages
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pgno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
void purge()
purge buffered data waiting to be read and/or written
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
BidirMMapPipe_impl::Pages m_pages
mmapped pages
bool good() const
status of stream is good
Page * m_dirtylist
linked list: dirty pages (data to be sent)
unsigned recvpages()
receive pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
@ wrerrbit
write error
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
@ rderrbit
read error
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
bool bad() const
true on I/O error
@ WriteInvalid
write end of pipe invalid
@ Invalid
invalid pipe
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Error
pipe error
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
@ EndOfFile
end of file
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
Page * dirtypage()
get a dirty page to write data to (may block)
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
size_type read(void *addr, size_type sz)
read from pipe
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
static unsigned s_pagepoolrefcnt
page pool reference counter
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
const Int_t n
Definition: legend1.C:16
namespace for implementation details of BidirMMapPipe
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
static constexpr double s
fill
Definition: fit1_py.py:6
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool
auto * l
Definition: textangle.C:4
auto * t1
Definition: textangle.C:20
static unsigned long masks[]
Definition: gifencode.c:211