Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
BidirMMapPipe.cxx
Go to the documentation of this file.
1/** @file BidirMMapPipe.cxx
2 *
3 * implementation of BidirMMapPipe, a class which forks off a child process
4 * and serves as a communications channel between parent and child
5 *
6 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7 * @date 2013-07-07
8 */
9#ifndef _WIN32
10
11#include "BidirMMapPipe.h"
12
13#include <RooFit/Common.h>
14
15#include <map>
16#include <cerrno>
17#include <limits>
18#include <cstdlib>
19#include <cstring>
20#include <cassert>
21#include <iostream>
22#include <algorithm>
23#include <exception>
24
25#include <poll.h>
26#include <fcntl.h>
27#include <signal.h>
28#include <unistd.h>
29#include <sys/mman.h>
30#include <sys/stat.h>
31#include <sys/wait.h>
32#include <sys/socket.h>
33
34#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35#define END_NAMESPACE_ROOFIT }
36
38
39/// namespace for implementation details of BidirMMapPipe
/** @brief exception to throw if low-level OS calls go wrong
 *
 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
 * @date 2013-07-07
 *
 * The message is kept in a fixed-size buffer of s_sz bytes. It consists of
 * the caller's hint (msg), followed by ": " if msg is non-empty, and then
 * the strerror_r text for the error code. Overlong messages are truncated
 * and the buffer is always zero-terminated.
 */
class BidirMMapPipeException : public std::exception
{
    private:
        enum {
            s_sz = 256 ///< length of buffer
        };
        char m_buf[s_sz]; ///< buffer containing the error message

        /// for the POSIX version of strerror_r (returns an int status and
        /// always writes into buf)
        static int dostrerror_r(int err, char* buf, std::size_t sz,
                int (*f)(int, char*, std::size_t))
        { return f(err, buf, sz); }
        /// for the GNU version of strerror_r (returns a char* which need not
        /// point into buf)
        static int dostrerror_r(int, char*, std::size_t,
                char* (*f)(int, char*, std::size_t));
    public:
        /// constructor taking error code, hint on operation (msg)
        BidirMMapPipeException(const char* msg, int err);
        /// return a description of what went wrong
        const char* what() const noexcept override { return m_buf; }
};
67
// NOTE(review): the signature line of this definition is missing from this
// listing; judging from the class declaration above it is presumably
// BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err).
// Builds the message in the fixed-size buffer m_buf: msg, then ": " (only if
// msg is non-empty), then the strerror_r text for err, truncated as needed.
69 {
70 std::size_t msgsz = std::strlen(msg);
71 if (msgsz) {
// copy at most s_sz bytes of msg (zero-termination is enforced at the end)
72 msgsz = std::min(msgsz, std::size_t(s_sz));
73 std::copy(msg, msg + msgsz, m_buf);
// append the ": " separator only as far as space remains
74 if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
75 if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
76 }
77 if (msgsz < s_sz) {
78 // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
79 // have to sort it out with overloads
// the status returned by dostrerror_r is ignored here
80 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
81 }
82 m_buf[s_sz - 1] = 0; // enforce zero-termination
83 }
84
// NOTE(review): the first line of this signature is missing from this
// listing; it is presumably the out-of-class definition of the GNU overload
// BidirMMapPipeException::dostrerror_r(int err, char* buf, ...).
// GNU strerror_r returns a char* that may point somewhere other than buf.
// That text is copied into buf (truncated to sz - 1 characters plus a
// terminator), and ERANGE is returned if it did not fit.
86 std::size_t sz, char* (*f)(int, char*, std::size_t))
87 {
88 buf[0] = 0;
89 char *tmp = f(err, buf, sz);
// only copy if the library returned a string other than our buffer
90 if (tmp && tmp != buf) {
91 std::strncpy(buf, tmp, sz);
// strncpy does not terminate on truncation, so do it by hand
92 buf[sz - 1] = 0;
// signal that the message was truncated
93 if (std::strlen(tmp) > sz - 1) return ERANGE;
94 }
95 return 0;
96 }
97
98 /** @brief class representing the header structure in an mmapped page
99 *
100 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
101 * @date 2013-07-07
102 *
103 * contains a field to put pages into a linked list, a field for the size
104 * of the data being transmitted, and a field for the position until which
105 * the data has been read
106 */
107 class Page
108 {
109 private:
110 // use as small a data type as possible to maximise payload area
111 // of pages
112 short m_next; ///< offset of next page in list, in units of the page size (0: no next page)
113 unsigned short m_size; ///< size of payload (in bytes)
114 unsigned short m_pos; ///< index of next byte in payload area
115 /// copy construction forbidden
// NOTE(review): this is defined (not deleted), with an empty body that leaves
// all members uninitialised; "= delete", as used for operator= below, would
// express the intent.
116 Page(const Page&) {}
117 /// assignment forbidden
118 Page& operator=(const Page&) = delete;
119 public:
120 /// constructor
121 Page() : m_next(0), m_size(0), m_pos(0)
122 {
123 // check that short is big enough - must be done at runtime
124 // because the page size is not known until runtime
// NOTE(review): the continuation line of the assert below is missing from
// this listing.
125 assert(std::numeric_limits<unsigned short>::max() >=
127 }
128 /// set pointer to next page
129 void setNext(const Page* p);
130 /// return pointer to next page
131 Page* next() const;
132 /// return reference to size field
133 unsigned short& size() { return m_size; }
134 /// return size (of payload data)
135 unsigned size() const { return m_size; }
136 /// return reference to position field
137 unsigned short& pos() { return m_pos; }
138 /// return position
139 unsigned pos() const { return m_pos; }
140 /// return pointer to first byte in payload data area of page
141 inline unsigned char* begin() const
142 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
143 + sizeof(Page); }
144 /// return pointer to the end of the page (NOTE(review): the rest of the return expression is missing from this listing; presumably one past the last byte of the page)
145 inline unsigned char* end() const
146 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
148 /// return the capacity of the page
149 static unsigned capacity()
150 { return PageChunk::pagesize() - sizeof(Page); }
151 /// true if page empty
152 bool empty() const { return !m_size; }
153 /// true if page holds any payload data (i.e. is not empty)
154 bool filled() const { return !empty(); }
155 /// free space left (to be written to)
156 unsigned free() const { return capacity() - m_size; }
157 /// bytes remaining to be read
158 unsigned remaining() const { return m_size - m_pos; }
159 /// true if page completely full
160 bool full() const { return !free(); }
161 };
162
163 void Page::setNext(const Page* p)
164 {
165 if (!p) {
166 m_next = 0;
167 } else {
168 const char* p1 = reinterpret_cast<char*>(this);
169 const char* p2 = reinterpret_cast<const char*>(p);
170 std::ptrdiff_t tmp = p2 - p1;
171 // difference must be divisible by page size
172 assert(!(tmp % PageChunk::pagesize()));
173 tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
174 m_next = tmp;
175 // no truncation when saving in a short
176 assert(m_next == tmp);
177 // final check: next() must return p
178 assert(next() == p);
179 }
180 }
181
// NOTE(review): signature line missing from this listing; presumably
// Page* Page::next() const, as declared in class Page.
// Decodes m_next (a signed offset in units of PageChunk::pagesize()) back into
// a pointer. An offset of 0 means "no next page" and yields a null pointer.
183 {
184 if (!m_next) return 0;
185 char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
// NOTE(review): PageChunk::pagesize() appears to be unsigned (s_pagesize is
// declared unsigned). Where std::ptrdiff_t is not wider than unsigned (32-bit
// targets), a negative m_next is converted to a large unsigned value here.
// setNext() casts pagesize() to std::ptrdiff_t; doing the same here would
// keep this arithmetic signed on all targets.
186 ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
187 return reinterpret_cast<Page*>(ptmp);
188 }
189
190 /** @brief class representing a page pool
191 *
192 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
193 * @date 2013-07-24
194 *
195 * pool of mmapped pages (on systems which support it, on all others, the
196 * functionality is emulated with dynamically allocated memory)
197 *
198 * in most operating systems there is a limit to how many mappings any one
199 * process is allowed to request; for this reason, we mmap a relatively
200 * large amount up front, and then carve off little pieces as we need them
201 *
202 * Moreover, some systems have too large a physical page size in their MMU
203 * for the code to handle (we want offsets and lengths to fit into 16
204 * bits), so we carve such big physical pages into smaller logical Pages
205 * if needed. The largest logical page size is currently 16 KiB.
206 */
207 class PagePool {
208 private:
209 /// convenience typedef
211
// NOTE(review): release() computes a chunk's size class as
// chunk->len() / (pagesize() * m_nPgPerGrp), i.e. as a count of page groups.
// The "1 << ... bytes" wording in the comments below therefore looks stale;
// confirm against the (missing) allocation line in pop().
212 enum {
213 minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
214 maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
215 szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
216 };
217 /// a chunk of memory in the pool
219 /// list of chunks
220 typedef std::list<Chunk*> ChunkList;
221
223 public:
224 /// convenience typedef
226 /// constructor
227 PagePool(unsigned nPagesPerGroup);
228 /// destructor
229 ~PagePool();
230 /// pop a free element out of the pool
231 Pages pop();
232
233 /// return (logical) page size of the system
234 static unsigned pagesize() { return PageChunk::pagesize(); }
235 /// return variety of mmap supported on the system
237 { return PageChunk::mmapVariety(); }
238
239 /// return number of pages per group (ie. as returned by pop())
240 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
241
242 /// zap the pool (unmap all but Pages p)
243 void zap(Pages& p);
244
245 private:
246 /// list of chunks used by the pool
248 /// list of chunks used by the pool which are not full
250 /// chunk size map (histogram of chunk sizes)
// indexed by (sz - minsz) / szincr, so it covers size classes
// minsz .. maxsz - szincr only; a chunk of size class maxsz has no slot here
251 unsigned m_szmap[(maxsz - minsz) / szincr];
252 /// current chunk size
254 /// page group size
255 unsigned m_nPgPerGrp;
256
257 /// adjust m_cursz to the largest chunk size class currently in use
258 void updateCurSz(int sz, int incr);
259 /// find size of next chunk to allocate (in a hopefully smart way)
260 int nextChunkSz() const;
261 /// put a chunk that has free page groups again back on the free list
262 void putOnFreeList(Chunk* chunk);
263 /// release (remove from all lists and delete) an empty chunk
264 void release(Chunk* chunk);
265 };
266
267 Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
268 m_pimpl(new impl)
269 {
270 assert(npg < 256);
271 m_pimpl->m_parent = parent;
272 m_pimpl->m_pages = pages;
273 m_pimpl->m_refcnt = 1;
274 m_pimpl->m_npages = npg;
275 /// initialise pages
276 for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
277 }
278
// logical page size: the physical page size capped at 16 KiB, so that page
// offsets and sizes fit into the 16-bit fields of Page
// NOTE(review): this relies on PageChunk::s_physpgsz being initialised earlier
// in this translation unit; its definition is not visible in this listing.
280 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
282
// NOTE(review): signature line missing from this listing; presumably the
// destructor Pages::~Pages().
// Drops one reference. The last owner gives the page group back to its parent
// chunk (if any) and frees the shared impl.
// NOTE(review): PageChunk::push() may call PagePool::release(), which can
// throw; a throw from here would reach std::terminate.
284 {
285 if (m_pimpl && !--(m_pimpl->m_refcnt)) {
286 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
287 delete m_pimpl;
288 }
289 }
290
291 Pages::Pages(const Pages& other) :
292 m_pimpl(other.m_pimpl)
293 { ++(m_pimpl->m_refcnt); }
294
296 {
297 if (&other == this) return *this;
298 if (!--(m_pimpl->m_refcnt)) {
299 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
300 delete m_pimpl;
301 }
302 m_pimpl = other.m_pimpl;
303 ++(m_pimpl->m_refcnt);
304 return *this;
305 }
306
307 unsigned Pages::pagesize() { return PageChunk::pagesize(); }
308
309 Page* Pages::page(unsigned pgno) const
310 {
311 assert(pgno < m_pimpl->m_npages);
312 unsigned char* pptr =
313 reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
314 pptr += pgno * pagesize();
315 return reinterpret_cast<Page*>(pptr);
316 }
317
318 unsigned Pages::pageno(Page* p) const
319 {
320 const unsigned char* pptr =
321 reinterpret_cast<const unsigned char*>(p);
322 const unsigned char* bptr =
323 reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
324 assert(0 == ((pptr - bptr) % pagesize()));
325 const unsigned nr = (pptr - bptr) / pagesize();
326 assert(nr < m_pimpl->m_npages);
327 return nr;
328 }
329
// NOTE(review): signature line missing from this listing.
// Determines the system page size via sysconf(_SC_PAGESIZE). Throws if
// sysconf reports failure; for implausibly small values it falls back to
// 4 KiB.
331 {
332 // find out page size of system
333 long pgsz = sysconf(_SC_PAGESIZE);
334 if (-1 == pgsz) throw Exception("sysconf", errno);
335 if (pgsz > 512 && pgsz > long(sizeof(Page)))
336 return pgsz;
337
338 // for an implausible value (sysconf failure already threw above), use a safe default: 4k
339 // page size, and do not try to mmap
// NOTE(review): the statement implementing "do not try to mmap" (original
// line 340) is missing from this listing.
341 return 1 << 12;
342 }
343
// PageChunk constructor. The first line of the signature (presumably
// "PageChunk::PageChunk(PagePool* parent,") is missing from this listing.
// Maps length bytes via dommap() and splits the mapping into groups of
// nPgPerGroup logical pages, all of which start out on the free list.
// NOTE(review): m_end's initialiser reads m_begin, so m_begin must be declared
// before m_end in the class (declaration not visible here).
345 unsigned length, unsigned nPgPerGroup) :
346 m_begin(dommap(length)),
347 m_end(reinterpret_cast<void*>(
348 reinterpret_cast<unsigned char*>(m_begin) + length)),
349 m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
350 {
351 // ok, push groups of pages onto freelist here
352 unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
353 unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
// NOTE(review): assumes length is a multiple of the group size
// (nPgPerGroup * pagesize()); otherwise the last group would extend past m_end
354 while (p < pend) {
355 m_freelist.push_back(reinterpret_cast<void*>(p));
356 p += nPgPerGroup * PagePool::pagesize();
357 }
358 }
359
// PageChunk destructor (signature line missing from this listing): unmaps the
// chunk's memory. A chunk still attached to a pool must have no groups in use.
// NOTE(review): domunmap() can throw; a throw from a destructor reaches
// std::terminate.
361 {
362 if (m_parent) assert(empty());
363 if (m_begin) domunmap(m_begin, len());
364 }
365
366 bool PageChunk::contains(const Pages& p) const
367 { return p.m_pimpl->m_parent == this; }
368
// Hands out the first free page group of this chunk as a Pages handle (the
// signature line is missing from this listing). The caller must make sure
// the chunk is not full.
370 {
371 assert(!m_freelist.empty());
372 void* p = m_freelist.front();
373 m_freelist.pop_front();
374 ++m_nUsedGrp;
375 return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
376 }
377
378 void PageChunk::push(const Pages& p)
379 {
380 assert(contains(p));
381 bool wasempty = m_freelist.empty();
382 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
383 --m_nUsedGrp;
384 if (m_parent) {
385 // notify parent if we need to be put on the free list again
386 if (wasempty) m_parent->putOnFreeList(this);
387 // notify parent if we're empty
388 if (empty()) return m_parent->release(this);
389 }
390 }
391
// Allocates len bytes of memory shared with a future fork()ed child. The
// first method that works is recorded in s_mmapworks and used from then on.
// NOTE(review): several lines of this function (original lines 407, 424, 451,
// 463, 495 and 522) are missing from this listing. Judging by the asserts,
// they open the anonymous-mmap branch and record the working method in
// s_mmapworks; confirm against the full source.
392 void* PageChunk::dommap(unsigned len)
393 {
394 assert(len && 0 == (len % s_physpgsz));
395 // ok, the idea here is to try the different methods of mmapping, and
396 // choose the first one that works. we have four flavours:
397 // 1 - anonymous mmap (best)
398 // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
399 // bit more tedious to set up, since you need to open/close a
400 // device file)
401 // 3 - mmap of a temporary file (very tedious to set up - need to
402 // create a temporary file, delete it, make the underlying storage
403 // large enough, then mmap the fd and close it)
404 // 4 - if all those fail, we malloc the buffers, and copy the data
405 // through the OS (then we're no better than normal pipes)
406 static bool msgprinted = false;
408#if defined(MAP_ANONYMOUS)
409#undef MYANONFLAG
410#define MYANONFLAG MAP_ANONYMOUS
411#elif defined(MAP_ANON)
412#undef MYANONFLAG
413#define MYANONFLAG MAP_ANON
414#else
415#undef MYANONFLAG
416#endif
417#ifdef MYANONFLAG
418 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
419 MYANONFLAG | MAP_SHARED, -1, 0);
420 if (MAP_FAILED == retVal) {
421 if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
422 } else {
423 assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
425 if (BidirMMapPipe::debugflag() && !msgprinted) {
426 std::cerr << " INFO: In " << __func__ << " (" <<
427 __FILE__ << ", line " << __LINE__ <<
428 "): anonymous mmapping works, excellent!" <<
429 std::endl;
430 msgprinted = true;
431 }
432 return retVal;
433 }
434#endif
435#undef MYANONFLAG
436 }
437 if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
438 // ok, no anonymous mappings supported directly, so try to map
439 // /dev/zero which has much the same effect on many systems
440 int fd = ::open("/dev/zero", O_RDWR);
441 if (-1 == fd)
442 throw Exception("open /dev/zero", errno);
443 void* retVal = ::mmap(0, len,
444 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
// NOTE(review): if mmap fails while s_mmapworks is still Unknown, fd is
// closed on the next lines and then closed a second time below. That second
// close most likely fails with EBADF, or worse, closes an unrelated
// descriptor that reused the number. Either way the code throws
// "close /dev/zero" or returns MAP_FAILED, instead of falling through to the
// temporary-file method.
445 if (MAP_FAILED == retVal) {
446 int errsv = errno;
447 ::close(fd);
448 if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
449 } else {
450 assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
452 }
453 if (-1 == ::close(fd))
454 throw Exception("close /dev/zero", errno);
455 if (BidirMMapPipe::debugflag() && !msgprinted) {
456 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
457 ", line " << __LINE__ << "): mmapping /dev/zero works, "
458 "very good!" << std::endl;
459 msgprinted = true;
460 }
461 return retVal;
462 }
464 std::string name = RooFit::tmpPath() + "BidirMMapPipe-XXXXXX";
465 int fd;
466 // open temp file
// NOTE(review): mkstemp modifies its argument in place, and writing through
// const_cast<char*>(name.c_str()) is formally undefined; &name[0] gives a
// writable buffer (C++11).
467 if (-1 == (fd = ::mkstemp(const_cast<char*>(name.c_str())))) throw Exception("mkstemp", errno);
468 // remove it, but keep fd open
469 if (-1 == ::unlink(name.c_str())) {
470 int errsv = errno;
471 ::close(fd);
472 throw Exception("unlink", errsv);
473 }
474 // make it the right size: lseek
475 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
476 int errsv = errno;
477 ::close(fd);
478 throw Exception("lseek", errsv);
479 }
480 // make it the right size: write a byte
481 if (1 != ::write(fd, name.c_str(), 1)) {
482 int errsv = errno;
483 ::close(fd);
484 throw Exception("write", errsv);
485 }
486 // do mmap
487 void* retVal = ::mmap(0, len,
488 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
// NOTE(review): same double-close problem as in the /dev/zero branch. On
// mmap failure with s_mmapworks Unknown, fd is closed here and again below,
// and the error path then calls munmap(MAP_FAILED, len).
489 if (MAP_FAILED == retVal) {
490 int errsv = errno;
491 ::close(fd);
492 if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
493 } else {
494 assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
496 }
497 if (-1 == ::close(fd)) {
498 int errsv = errno;
499 ::munmap(retVal, len);
500 throw Exception("close", errsv);
501 }
502 if (BidirMMapPipe::debugflag() && !msgprinted) {
503 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
504 ", line " << __LINE__ << "): mmapping temporary files "
505 "works, good!" << std::endl;
506 msgprinted = true;
507 }
508 return retVal;
509 }
510 if (Copy == s_mmapworks || Unknown == s_mmapworks) {
511 // fallback solution: mmap does not work on this OS (or does not
512 // work for what we want to use it), so use a normal buffer of
513 // memory instead, and collect data in that buffer - this needs an
514 // additional write/read to/from the pipe(s), but there you go...
515 if (BidirMMapPipe::debugflag() && !msgprinted) {
516 std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
517 ", line " << __LINE__ << "): anonymous mmapping of "
518 "shared buffers failed, falling back to read/write on "
519 " pipes!" << std::endl;
520 msgprinted = true;
521 }
523 void* retVal = std::malloc(len);
524 if (!retVal) throw Exception("malloc", errno);
525 return retVal;
526 }
527 // should never get here
528 assert(false);
529 return 0;
530 }
531
532 void PageChunk::domunmap(void* addr, unsigned len)
533 {
534 assert(len && 0 == (len % s_physpgsz));
535 if (addr) {
536 assert(Unknown != s_mmapworks);
537 if (Copy != s_mmapworks) {
538 if (-1 == ::munmap(addr, len))
539 throw Exception("munmap", errno);
540 } else {
541 std::free(addr);
542 }
543 }
544 }
545
// PageChunk::zap (signature line missing from this listing): detach the page
// group p from this chunk, make the rest of the chunk inaccessible, and
// delete the chunk object. Because m_begin and m_end are reset to 0 before
// "delete this", the destructor does not unmap anything, so p's pages stay
// mapped.
547 {
548 // try to mprotect the other bits of the pool with no access...
549 // we'd really like a version of mremap here that can unmap all the
550 // other pages in the chunk, but that does not exist, so we protect
551 // the other pages in this chunk such that they may neither be read,
552 // written nor executed, only the pages we're interested in for
553 // communications stay readable and writable
554 //
555 // if an OS does not support changing the protection of a part of an
556 // mmapped area, the mprotect calls below should just fail and not
557 // change any protection, so we're a little less safe against
558 // corruption, but everything should still work
559 if (Copy != s_mmapworks) {
560 unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
561 unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
// NOTE(review): p.npages() counts logical pages (Pages::page() steps by
// pagesize()), but it is multiplied by the physical page size here. When the
// physical page is larger than the logical one, p2 lies past the end of p's
// group and the protected range is wrong. p.npages() * s_pagesize looks
// intended; confirm.
562 unsigned char* p2 = p1 + p.npages() * s_physpgsz;
563 unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
564 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
565 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
566 }
567 m_parent = 0;
568 m_freelist.clear();
569 m_nUsedGrp = 1;
570 p.m_pimpl->m_parent = 0;
571 m_begin = m_end = 0;
572 // commit suicide
573 delete this;
574 }
575
// Creates an empty pool handing out groups of nPgPerGroup logical pages. If
// the physical page size differs from the logical one, the group size is
// rounded up so that each group spans a whole number of physical pages.
// NOTE(review): original lines 581, 583 and 589 (the enclosing condition, the
// initialiser of mult, and presumably a debug-flag check) are missing from
// this listing.
576 PagePool::PagePool(unsigned nPgPerGroup) :
577 m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
578 {
579 // if logical and physical page size differ, we may have to adjust
580 // m_nPgPerGrp to make things fit
582 const unsigned mult =
584 const unsigned desired = nPgPerGroup * PageChunk::pagesize();
585 // round up to the next physical page boundary
586 const unsigned actual = mult *
587 (desired / mult + bool(desired % mult));
588 const unsigned newPgPerGrp = actual / PageChunk::pagesize();
590 std::cerr << " INFO: In " << __func__ << " (" <<
591 __FILE__ << ", line " << __LINE__ <<
592 "): physical page size " << PageChunk::physPgSz() <<
593 ", subdividing into logical pages of size " <<
594 PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
595 m_nPgPerGrp << " -> " << newPgPerGrp <<
596 std::endl;
597 }
598 assert(newPgPerGrp >= m_nPgPerGrp);
599 m_nPgPerGrp = newPgPerGrp;
600 }
// start with an empty chunk-size histogram
601 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
602 }
603
// PagePool destructor (signature line missing from this listing): deletes all
// chunks. Each chunk still attached to the pool asserts that none of its
// groups are in use (see the PageChunk destructor).
605 {
606 m_freelist.clear();
607 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
608 delete *it;
609 m_chunks.clear();
610 }
611
// PagePool::zap(Pages& p) (signature line missing from this listing): drops
// every chunk except the memory backing p and resets the pool to its initial
// empty state. The chunk that contains p is detached by PageChunk::zap(),
// which deletes the chunk object but keeps p's pages mapped.
613 {
614 // unmap all pages but those pointed to by p
615 m_freelist.clear();
616 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
617 if ((*it)->contains(p)) {
618 (*it)->zap(p);
619 } else {
620 delete *it;
621 }
622 }
623 m_chunks.clear();
624 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
625 m_cursz = minsz;
626 }
627
// PagePool::pop() (signature line missing from this listing): hands out one
// free page group. A new chunk (sized by nextChunkSz()) is allocated if no
// chunk has a free group left.
// NOTE(review): the remaining constructor arguments of the new Chunk
// (original line 634) are missing from this listing.
629 {
630 if (m_freelist.empty()) {
631 // allocate and register new chunk and put it on the freelist
632 const int sz = nextChunkSz();
633 Chunk *c = new Chunk(this,
635 m_chunks.push_front(c);
636 m_freelist.push_back(c);
637 updateCurSz(sz, +1);
638 }
639 // get free element from first chunk on m_freelist
640 Chunk* c = m_freelist.front();
641 Pages p(c->pop());
642 // full chunks are removed from m_freelist
643 if (c->full()) m_freelist.pop_front();
644 return p;
645 }
646
// PagePool::release(Chunk* chunk) (signature line missing from this listing):
// called by PageChunk::push() once a chunk has no groups in use. Removes the
// chunk from both lists, deletes it and updates the size histogram. Throws
// EINVAL if the chunk is not on the pool's lists.
648 {
649 assert(chunk->empty());
650 // find chunk on freelist and remove
651 ChunkList::iterator it = std::find(
652 m_freelist.begin(), m_freelist.end(), chunk);
653 if (m_freelist.end() == it)
654 throw Exception("PagePool::release(PageChunk*)", EINVAL);
655 m_freelist.erase(it);
656 // find chunk in m_chunks and remove
657 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
658 if (m_chunks.end() == it)
659 throw Exception("PagePool::release(PageChunk*)", EINVAL);
660 m_chunks.erase(it);
// size class of the chunk = number of page groups it held
661 const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
662 delete chunk;
663 updateCurSz(sz, -1);
664 }
665
// PagePool::putOnFreeList(Chunk* chunk) (signature line missing from this
// listing): a previously full chunk has a free group again, so append it to
// the list of chunks that can serve pop().
667 {
668 assert(!chunk->full());
669 m_freelist.push_back(chunk);
670 }
671
672 void PagePool::updateCurSz(int sz, int incr)
673 {
674 m_szmap[(sz - minsz) / szincr] += incr;
675 m_cursz = minsz;
676 for (int i = (maxsz - minsz) / szincr; i--; ) {
677 if (m_szmap[i]) {
678 m_cursz += i * szincr;
679 break;
680 }
681 }
682 }
683
685 {
686 // no chunks with space available, figure out chunk size
687 int sz = m_cursz;
688 if (m_chunks.empty()) {
689 // if we start allocating chunks, we start from minsz
690 sz = minsz;
691 } else {
692 if (minsz >= sz) {
693 // minimal sized chunks are always grown
694 sz = minsz + szincr;
695 } else {
696 if (1 != m_chunks.size()) {
697 // if we have more than one completely filled chunk, grow
698 sz += szincr;
699 } else {
700 // just one chunk left, try shrinking chunk size
701 sz -= szincr;
702 }
703 }
704 }
705 // clamp size to allowed range
706 if (sz > maxsz) sz = maxsz;
707 if (sz < minsz) sz = minsz;
708 return sz;
709 }
710}
711
712// static BidirMMapPipe members
// guards s_openpipes (locked in the constructor, doClose and teardownall)
713pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
// all pipes currently open in this process; teardownall() (registered with
// atexit) walks this list to SIGTERM children and force-close the pipes
714std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
718
// BidirMMapPipe::pagepool() (signature line missing from this listing):
// returns the shared page pool s_pagepool, creating it on first use.
// NOTE(review): the statement governed by the "if" below (original line 722,
// presumably the allocation of s_pagepool) is missing from this listing; as
// listed, the "if" would appear to guard the return statement.
720{
721 if (!s_pagepool)
723 return *s_pagepool;
724}
725
// BidirMMapPipe::teardownall() (signature line missing from this listing):
// registered with atexit() by the first constructor call. Sends SIGTERM to
// every child and force-closes every pipe still on s_openpipes.
// NOTE(review): the mutex is released around doClose(true, true), but
// holdlock=true tells doClose not to take it either, so doClose edits
// s_openpipes without holding the lock.
// NOTE(review): doClose() returns immediately, without removing the pipe
// from s_openpipes, when failbit is set. Such an entry would make this loop
// spin forever.
727{
728 pthread_mutex_lock(&s_openpipesmutex);
729 while (!s_openpipes.empty()) {
730 BidirMMapPipe *p = s_openpipes.front();
731 pthread_mutex_unlock(&s_openpipesmutex);
732 if (p->m_childPid) kill(p->m_childPid, SIGTERM);
733 p->doClose(true, true);
734 pthread_mutex_lock(&s_openpipesmutex);
735 }
736 pthread_mutex_unlock(&s_openpipesmutex);
737}
738
// NOTE(review): the signature of this constructor (original line 739) and one
// body line (743, presumably returning the popped pages and adjusting
// s_pagepoolrefcnt) are missing from this listing; TODO confirm which
// constructor this is. As visible, it pops a page group from the shared pool
// and then frees it again, deleting the pool if its reference count is zero.
740 m_pages(pagepool().pop())
741{
742 // free pages again
744 if (!s_pagepoolrefcnt) {
745 delete s_pagepool;
746 s_pagepool = 0;
747 }
748}
749
// Creates the pipe pair (or a socketpair if useSocketpair), fork()s, and sets
// up both ends. The parent keeps reading from fds[0] and writing to fds[3];
// the child uses fds[2]/fds[1] (or both ends use one socket). A one-byte
// handshake ('P'/'C') checks that the peer is alive. On failure the child is
// sent SIGTERM, descriptors are closed and the exception is rethrown.
// NOTE(review): original lines 756, 823-824, 830 and 900 are missing from
// this listing.
750BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
751 m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
752 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
753 m_parentPid(::getpid())
754
755{
757 assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
758 int fds[4] = { -1, -1, -1, -1 };
759 int myerrno;
760 static bool firstcall = true;
761 if (useExceptions) m_flags |= exceptionsbit;
762
763 try {
764 if (firstcall) {
765 firstcall = false;
766 // register a cleanup handler to make sure all BidirMMapPipes are torn
767 // down, and child processes are sent a SIGTERM
768 if (0 != atexit(BidirMMapPipe::teardownall))
769 throw Exception("atexit", errno);
770 }
771
772 // build free lists
// chain all pages, then cut the chain after the first PagesPerEnd pages so
// each end of the pipe starts with its own list
773 for (unsigned i = 1; i < TotPages; ++i)
774 m_pages[i - 1]->setNext(m_pages[i]);
775 m_pages[PagesPerEnd - 1]->setNext(0);
776 if (!useSocketpair) {
777 // create pipes
778 if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
779 if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
780 } else {
781 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
782 throw Exception("socketpair", errno);
783 }
784 // fork the child
785 pthread_mutex_lock(&s_openpipesmutex);
786 char c;
787 switch ((m_childPid = ::fork())) {
788 case -1: // error in fork()
789 myerrno = errno;
790 pthread_mutex_unlock(&s_openpipesmutex);
791 m_childPid = 0;
792 throw Exception("fork", myerrno);
793 case 0: // child
794 // put the ends in the right place
795 if (-1 != fds[2]) {
796 // pair of pipes
797 if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
798 myerrno = errno;
799 pthread_mutex_unlock(&s_openpipesmutex);
800 throw Exception("close", myerrno);
801 }
802 fds[0] = fds[3] = -1;
803 m_outpipe = fds[1];
804 m_inpipe = fds[2];
805 } else {
806 // socket pair
807 if (-1 == ::close(fds[0])) {
808 myerrno = errno;
809 pthread_mutex_unlock(&s_openpipesmutex);
810 throw Exception("close", myerrno);
811 }
812 fds[0] = -1;
813 m_inpipe = m_outpipe = fds[1];
814 }
815 // close other pipes our parent may have open - we have no business
816 // reading from/writing to those...
817 for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
818 s_openpipes.end() != it; ) {
819 BidirMMapPipe* p = *it;
820 it = s_openpipes.erase(it);
821 p->doClose(true, true);
822 }
825 delete s_pagepool;
826 s_pagepool = 0;
827 s_openpipes.push_front(this);
828 pthread_mutex_unlock(&s_openpipesmutex);
829 // ok, put our pages on freelist
831 // handshake with other end (to make sure it's alive)...
832 c = 'C'; // ...hild
833 if (1 != xferraw(m_outpipe, &c, 1, ::write))
834 throw Exception("handshake: xferraw write", EPIPE);
835 if (1 != xferraw(m_inpipe, &c, 1, ::read))
836 throw Exception("handshake: xferraw read", EPIPE);
837 if ('P' != c) throw Exception("handshake", EPIPE);
838 break;
839 default: // parent
840 // put the ends in the right place
841 if (-1 != fds[2]) {
842 // pair of pipes
843 if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
844 myerrno = errno;
845 pthread_mutex_unlock(&s_openpipesmutex);
846 throw Exception("close", myerrno);
847 }
848 fds[1] = fds[2] = -1;
849 m_outpipe = fds[3];
850 m_inpipe = fds[0];
851 } else {
852 // socketpair
853 if (-1 == ::close(fds[1])) {
854 myerrno = errno;
855 pthread_mutex_unlock(&s_openpipesmutex);
856 throw Exception("close", myerrno);
857 }
858 fds[1] = -1;
859 m_inpipe = m_outpipe = fds[0];
860 }
861 // put on list of open pipes (so we can kill child processes
862 // if things go wrong)
// NOTE(review): if anything below throws (handshake, fcntl), the catch
// block does not remove "this" from s_openpipes. The list then holds a
// pointer to an object whose construction failed, which teardownall()
// would later dereference.
863 s_openpipes.push_front(this);
864 pthread_mutex_unlock(&s_openpipesmutex);
865 // ok, put our pages on freelist
866 m_freelist = m_pages[0u];
867 // handshake with other end (to make sure it's alive)...
868 c = 'P'; // ...arent
869 if (1 != xferraw(m_outpipe, &c, 1, ::write))
870 throw Exception("handshake: xferraw write", EPIPE);
871 if (1 != xferraw(m_inpipe, &c, 1, ::read))
872 throw Exception("handshake: xferraw read", EPIPE);
873 if ('C' != c) throw Exception("handshake", EPIPE);
874 break;
875 }
876 // mark file descriptors for close on exec (we do not want to leak the
877 // connection to anything we happen to exec)
// NOTE(review): F_GETFD takes no third argument and returns the descriptor
// flags as fcntl's return value. The &fdflags argument is ignored, so fdflags
// does not hold the current flags and F_SETFD sets exactly FD_CLOEXEC.
// Harmless while FD_CLOEXEC is the only fd flag, but the intended form is
// "fdflags = ::fcntl(fd, F_GETFD)" followed by a -1 check.
878 int fdflags = 0;
879 if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
880 throw Exception("fcntl", errno);
881 fdflags |= FD_CLOEXEC;
882 if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
883 throw Exception("fcntl", errno);
884 if (m_inpipe != m_outpipe) {
885 if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
886 throw Exception("fcntl", errno);
887 fdflags |= FD_CLOEXEC;
888 if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
889 throw Exception("fcntl", errno);
890 }
891 // ok, finally, clear the failbit
892 m_flags &= ~failbit;
893 // all done
// NOTE(review): only BidirMMapPipe::Exception is caught here; other
// exceptions (e.g. std::bad_alloc) bypass this cleanup.
894 } catch (BidirMMapPipe::Exception&) {
895 if (0 != m_childPid) kill(m_childPid, SIGTERM);
// descriptors not handed over are closed; fd 0 is deliberately never closed
896 for (int i = 0; i < 4; ++i)
897 if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
898 {
899 // free resources associated with mmapped pages
901 }
902 if (!--s_pagepoolrefcnt) {
903 delete s_pagepool;
904 s_pagepool = 0;
905 }
906 throw;
907 }
908}
909
// BidirMMapPipe::close() (signature line missing from this listing): orderly,
// non-forced shutdown of an open pipe via doClose(false). In the parent the
// result is the child's waitpid() status; in the child it is 0.
911{
912 assert(!(m_flags & failbit));
913 return doClose(false);
914}
915
// Tears down this end of the pipe. It flushes pending data, shuts down or
// closes the write side, drains the read side until the peer closes, closes
// it, releases the pages, reaps the child (parent only) and removes the pipe
// from s_openpipes. With force = true, I/O errors are ignored and no
// flush/drain is done. With holdlock = true, the caller is assumed to
// already hold s_openpipesmutex (or to have chosen not to lock).
// Returns the raw waitpid() status (inspect with WIFEXITED etc.), or 0 in
// the child or when the pipe was already closed/failed.
// NOTE(review): original lines 966 and 974 are missing from this listing.
916int BidirMMapPipe::doClose(bool force, bool holdlock)
917{
918 if (m_flags & failbit) return 0;
919 // flush data to be written
920 if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
921 // shut down the write direction (no more writes from our side)
922 if (m_inpipe == m_outpipe) {
923 if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
924 throw Exception("shutdown", errno);
925 m_outpipe = -1;
926 } else {
927 if (-1 != m_outpipe && -1 == ::close(m_outpipe))
928 if (!force) throw Exception("close", errno);
929 m_outpipe = -1;
930 }
931 // the write direction is now shut down; on to the read direction:
932 // drain anything the other end might still want to send
933 if (!force && -1 != m_inpipe) {
934 // **************** THIS IS EXTREMELY UGLY: ****************
935 // POLLHUP is not set reliably on pipe/socket shutdown on all
936 // platforms, unfortunately, so we poll for readability here until
937 // the other end closes, too
938 //
939 // the read loop below ensures that the other end sees the POLLIN that
940 // is set on shutdown instead, and goes ahead to close its end
941 //
942 // if we don't do this, and close straight away, the other end
943 // will catch a SIGPIPE or similar, and we don't want that
944 int err;
945 struct pollfd fds;
946 fds.fd = m_inpipe;
947 fds.events = POLLIN;
948 fds.revents = 0;
// the outer loop restarts poll() after EINTR; a poll timeout (1 << 20 ms,
// about 17.5 minutes) simply polls again, so this waits until the peer
// closes or an error/EOF is seen
949 do {
950 while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
951 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
952 if (fds.revents & POLLIN) {
953 char c;
954 if (1 > ::read(m_inpipe, &c, 1)) break;
955 }
956 }
957 } while (0 > err && EINTR == errno);
958 // ignore all other poll errors
959 }
960 // close read end
961 if (-1 != m_inpipe && -1 == ::close(m_inpipe))
962 if (!force) throw Exception("close", errno);
963 m_inpipe = -1;
964 // unmap memory
965 try {
967 if (!--s_pagepoolrefcnt) {
968 delete s_pagepool;
969 s_pagepool = 0;
970 }
971 } catch (std::exception&) {
972 if (!force) throw;
973 }
975 // wait for child process
976 int retVal = 0;
977 if (isParent()) {
978 int tmp;
979 do {
980 tmp = waitpid(m_childPid, &retVal, 0);
981 } while (-1 == tmp && EINTR == errno);
982 if (-1 == tmp)
983 if (!force) throw Exception("waitpid", errno);
984 m_childPid = 0;
985 }
986 // remove from list of open pipes
987 if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
988 std::list<BidirMMapPipe*>::iterator it = std::find(
989 s_openpipes.begin(), s_openpipes.end(), this);
990 if (s_openpipes.end() != it) s_openpipes.erase(it);
991 if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
992 m_flags |= failbit;
993 return retVal;
994}
995
// Destructor (signature line missing from this listing): closes the pipe
// without forcing.
// NOTE(review): doClose(false) throws Exception on shutdown/close/waitpid
// failures. Destructors are implicitly noexcept since C++11, so such a throw
// ends in std::terminate; consider catching here or closing with force.
997{ doClose(false); }
998
// xferraw (the first signature line is missing from this listing): moves up
// to len bytes between addr and fd using xferfn (::read or ::write in the
// callers), looping over partial transfers.
// Returns the number of bytes transferred. This is less than len on
// end-of-file, or when an error occurs after some data was already moved.
// Throws if an error occurs before anything was transferred.
1000 int fd, void* addr, size_type len,
1001 ssize_t (*xferfn)(int, void*, std::size_t))
1002{
1003 size_type xferred = 0;
1004 unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1005 while (len) {
1006 ssize_t tmp = xferfn(fd, buf, len);
1007 if (tmp > 0) {
1008 xferred += tmp;
1009 len -= tmp;
1010 buf += tmp;
1011 continue;
1012 } else if (0 == tmp) {
1013 // check for end-of-file on pipe
1014 break;
1015 } else if (-1 == tmp) {
1016 // ok some error occurred, so figure out if we want to retry or throw
1017 switch (errno) {
1018 default:
1019 // if anything was transferred, return number of bytes
1020 // transferred so far, we can start throwing on the next
1021 // transfer...
1022 if (xferred) return xferred;
1023 // else throw
1024 throw Exception("xferraw", errno);
// NOTE(review): EAGAIN/EWOULDBLOCK print an error and then retry at once,
// so a non-blocking descriptor would make this loop spin (and log) until
// the transfer can proceed
1025 case EAGAIN: // fallthrough intended
1026#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1027 case EWOULDBLOCK: // fallthrough intended
1028#endif
1029 std::cerr << " ERROR: In " << __func__ << " (" <<
1030 __FILE__ << ", line " << __LINE__ <<
1031 "): expect transfer to block!" << std::endl;
1032 case EINTR:
1033 break;
1034 }
1035 continue;
1036 } else {
1037 throw Exception("xferraw: unexpected return value from read/write",
1038 errno);
1039 }
1040 }
1041 return xferred;
1042}
1043
// sendpages (signature line missing from this listing): tells the other end
// about the page list starting at plist. It writes one byte (pg, presumably
// the index of plist within m_pages) and, under a condition on original
// lines 1049-1050 (missing here; presumably the read/write copy fallback),
// also copies each page's header and payload through the pipe.
// Throws EPIPE on short writes. Must not be called with an empty list.
1045{
1046 if (plist) {
1047 unsigned char pg = m_pages[plist];
1048 if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1051 // ok, have to copy pages through pipe
1052 for (Page* p = plist; p; p = p->next()) {
1053 if (sizeof(Page) + p->size() !=
1054 xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1055 ::write)) {
1056 throw Exception("sendpages: short write", EPIPE);
1057 }
1058 }
1059 }
1060 } else {
1061 throw Exception("sendpages: short write", EPIPE);
1062 }
// reached only with a null plist: the assert deliberately fails in debug builds
1063 } else { assert(plist); }
1064}
1065
// recvpages (signature line missing from this listing): blocking receive of a
// page list from the other end. It reads the one-byte index of the first
// page and, in copy mode (condition on missing lines 1074-1075), also reads
// the page headers and payloads. Received pages are then sorted into the
// local lists via feedPageLists(). Returns the number of pages received
// (0 if the index byte could not be read).
1067{
1068 unsigned char pg;
1069 unsigned retVal = 0;
1070 Page *plisthead = 0, *plisttail = 0;
1071 if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1072 plisthead = plisttail = m_pages[pg];
1073 // ok, have the index of the first page of the list
1076 // ok, need to copy pages through pipe
// NOTE(review): if a header read comes up short (e.g. EOF mid-transfer),
// plisttail is not advanced and the loop re-reads forever; a short header
// read probably needs to break out like the payload read does
1077 for (; plisttail; ++retVal) {
1078 Page* p = plisttail;
1079 if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1080 ::read)) {
1081 plisttail = p->next();
1082 if (!p->size()) continue;
1083 // break in case of read error
1084 if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1085 ::read)) break;
1086 }
1087 }
1088 } else {
1089 retVal = lenPageList(plisthead);
1090 }
1091 }
1092 // put list of pages we just received into correct lists (busy/free)
1093 if (plisthead) feedPageLists(plisthead);
1094 // retVal holds the number of pages received; they have already been sorted
1095 // into the busy/free lists by feedPageLists above
1096 return retVal;
1097}
1098
1100{
1101 struct pollfd fds;
1102 fds.fd = m_inpipe;
1103 fds.events = POLLIN;
1104 fds.revents = 0;
1105 unsigned retVal = 0;
1106 do {
1107 int rc = ::poll(&fds, 1, 0);
1108 if (0 > rc) {
1109 if (EINTR == errno) continue;
1110 break;
1111 }
1112 if (1 == retVal && fds.revents & POLLIN &&
1113 !(fds.revents & (POLLNVAL | POLLERR))) {
1114 // ok, we can read without blocking, so the other end has
1115 // something for us
1116 return recvpages();
1117 } else {
1118 break;
1119 }
1120 } while (true);
1121 return retVal;
1122}
1123
1125{
1126 unsigned n = 0;
1127 for ( ; p; p = p->next()) ++n;
1128 return n;
1129}
1130
1132{
1133 assert(plist);
1134 // get end of busy list
1135 Page *blend = m_busylist;
1136 while (blend && blend->next()) blend = blend->next();
1137 // ok, might have to send free pages to other end, and (if we do have to
1138 // send something to the other end) while we're at it, send any dirty
1139 // pages which are completely full, too
1140 Page *sendlisthead = 0, *sendlisttail = 0;
1141 // loop over plist
1142 while (plist) {
1143 Page* p = plist;
1144 plist = p->next();
1145 p->setNext(0);
1146 if (p->size()) {
1147 // busy page...
1148 p->pos() = 0;
1149 // put at end of busy list
1150 if (blend) blend->setNext(p);
1151 else m_busylist = p;
1152 blend = p;
1153 } else {
1154 // free page...
1155 // Very simple algorithm: once we're done with a page, we send it back
1156 // where it came from. If it's from our end, we put it on the free list, if
1157 // it's from the other end, we send it back.
1158 if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1159 (isChild() && m_pages[p] < PagesPerEnd)) {
1160 // page "belongs" to other end
1161 if (!sendlisthead) sendlisthead = p;
1162 if (sendlisttail) sendlisttail->setNext(p);
1163 sendlisttail = p;
1164 } else {
1165 // add page to freelist
1166 p->setNext(m_freelist);
1167 m_freelist = p;
1168 }
1169 }
1170 }
1171 // check if we have to send stuff to the other end
1172 if (sendlisthead) {
1173 // go through our list of dirty pages, and see what we can
1174 // send along
1175 Page* dp;
1176 while ((dp = m_dirtylist) && dp->full()) {
1177 Page* p = dp;
1178 // move head of dirty list
1179 m_dirtylist = p->next();
1180 // queue for sending
1181 p->setNext(0);
1182 sendlisttail->setNext(p);
1183 sendlisttail = p;
1184 }
1185 // poll if the other end is still alive - this needs that we first
1186 // close the write pipe of the other end when the remote end of the
1187 // connection is shutting down in doClose; we'll see that because we
1188 // get a POLLHUP on our inpipe
1189 const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1190 struct pollfd fds[2];
1191 fds[0].fd = m_outpipe;
1192 fds[0].events = fds[0].revents = 0;
1193 if (m_outpipe != m_inpipe) {
1194 fds[1].fd = m_inpipe;
1195 fds[1].events = fds[1].revents = 0;
1196 } else {
1197 fds[0].events |= POLLIN;
1198 }
1199 int retVal = 0;
1200 do {
1201 retVal = ::poll(fds, nfds, 0);
1202 if (0 > retVal && EINTR == errno)
1203 continue;
1204 break;
1205 } while (true);
1206 if (0 <= retVal) {
1207 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1208 if (m_outpipe != m_inpipe) {
1209 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1210 } else {
1211 if (ok && fds[0].revents & POLLIN) {
1212 unsigned ret = recvpages();
1213 if (!ret) ok = false;
1214 }
1215 }
1216
1217 if (ok) sendpages(sendlisthead);
1218 // (if the pipe is dead already, we don't care that we leak the
1219 // contents of the pages on the send list here, so that is why
1220 // there's no else clause here)
1221 } else {
1222 throw Exception("feedPageLists: poll", errno);
1223 }
1224 }
1225}
1226
1228{
1229 assert(p);
1230 assert(p == m_freelist);
1231 // remove from freelist
1232 m_freelist = p->next();
1233 p->setNext(0);
1234 // append to dirty list
1235 Page* dl = m_dirtylist;
1236 while (dl && dl->next()) dl = dl->next();
1237 if (dl) dl->setNext(p);
1238 else m_dirtylist = p;
1239}
1240
1242{
1243 // queue any pages available for reading we can without blocking
1245 Page* p;
1246 // if there are no busy pages, try to get them from the other end,
1247 // block if we have to...
1248 while (!(p = m_busylist)) if (!recvpages()) return 0;
1249 return p;
1250}
1251
1253{
1254 // queue any pages available for reading we can without blocking
1256 Page* p = m_dirtylist;
1257 // go to end of dirty list
1258 if (p) while (p->next()) p = p->next();
1259 if (!p || p->full()) {
1260 // need to append free page, so get one
1261 while (!(p = m_freelist)) if (!recvpages()) return 0;
1262 markPageDirty(p);
1263 }
1264 return p;
1265}
1266
1268{ return doFlush(true); }
1269
1270void BidirMMapPipe::doFlush(bool forcePartialPages)
1271{
1272 assert(!(m_flags & failbit));
1273 // build a list of pages to flush
1274 Page *flushlisthead = 0, *flushlisttail = 0;
1275 while (m_dirtylist) {
1276 Page* p = m_dirtylist;
1277 if (!forcePartialPages && !p->full()) break;
1278 // remove dirty page from dirty list
1279 m_dirtylist = p->next();
1280 p->setNext(0);
1281 // and send it to other end
1282 if (!flushlisthead) flushlisthead = p;
1283 if (flushlisttail) flushlisttail->setNext(p);
1284 flushlisttail = p;
1285 }
1286 if (flushlisthead) sendpages(flushlisthead);
1287}
1288
1290{
1291 assert(!(m_flags & failbit));
1292 // join busy and dirty lists
1293 {
1294 Page *l = m_busylist;
1295 while (l && l->next()) l = l->next();
1296 if (l) l->setNext(m_dirtylist);
1297 else m_busylist = m_dirtylist;
1298 }
1299 // empty busy and dirty pages
1300 for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1301 // put them on the free list
1303 m_busylist = m_dirtylist = 0;
1304}
1305
1307{
1308 // queue all pages waiting for consumption in the pipe before we give an
1309 // answer
1311 size_type retVal = 0;
1312 for (Page* p = m_busylist; p; p = p->next())
1313 retVal += p->size() - p->pos();
1314 return retVal;
1315}
1316
1318{
1319 // queue all pages waiting for consumption in the pipe before we give an
1320 // answer
1322 // check if we could write to the pipe without blocking (we need to know
1323 // because we might need to check if flushing of dirty pages would block)
1324 bool couldwrite = false;
1325 {
1326 struct pollfd fds;
1327 fds.fd = m_outpipe;
1328 fds.events = POLLOUT;
1329 fds.revents = 0;
1330 int retVal = 0;
1331 do {
1332 retVal = ::poll(&fds, 1, 0);
1333 if (0 > retVal) {
1334 if (EINTR == errno) continue;
1335 throw Exception("bytesWritableNonBlocking: poll", errno);
1336 }
1337 if (1 == retVal && fds.revents & POLLOUT &&
1338 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1339 couldwrite = true;
1340 break;
1341 } while (true);
1342 }
1343 // ok, start counting bytes
1344 size_type retVal = 0;
1345 unsigned npages = 0;
1346 // go through the dirty list
1347 for (Page* p = m_dirtylist; p; p = p->next()) {
1348 ++npages;
1349 // if page only partially filled
1350 if (!p->full())
1351 retVal += p->free();
1352 if (npages >= FlushThresh && !couldwrite) break;
1353 }
1354 // go through the free list
1355 for (Page* p = m_freelist; p && (!m_dirtylist ||
1356 npages < FlushThresh || couldwrite); p = p->next()) {
1357 ++npages;
1358 retVal += Page::capacity();
1359 }
1360 return retVal;
1361}
1362
1364{
1365 assert(!(m_flags & failbit));
1366 size_type nread = 0;
1367 unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1368 try {
1369 while (sz) {
1370 // find next page to read from
1371 Page* p = busypage();
1372 if (!p) {
1373 m_flags |= eofbit;
1374 return nread;
1375 }
1376 unsigned char* pp = p->begin() + p->pos();
1377 size_type csz = std::min(size_type(p->remaining()), sz);
1378 std::copy(pp, pp + csz, ap);
1379 nread += csz;
1380 ap += csz;
1381 sz -= csz;
1382 p->pos() += csz;
1383 assert(p->size() >= p->pos());
1384 if (p->size() == p->pos()) {
1385 // if no unread data remains, page is free
1386 m_busylist = p->next();
1387 p->setNext(0);
1388 p->size() = 0;
1389 feedPageLists(p);
1390 }
1391 }
1392 } catch (Exception&) {
1393 m_flags |= rderrbit;
1394 if (m_flags & exceptionsbit) throw;
1395 }
1396 return nread;
1397}
1398
1400{
1401 assert(!(m_flags & failbit));
1402 size_type written = 0;
1403 const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1404 try {
1405 while (sz) {
1406 // find next page to write to
1407 Page* p = dirtypage();
1408 if (!p) {
1409 m_flags |= eofbit;
1410 return written;
1411 }
1412 unsigned char* pp = p->begin() + p->size();
1413 size_type csz = std::min(size_type(p->free()), sz);
1414 std::copy(ap, ap + csz, pp);
1415 written += csz;
1416 ap += csz;
1417 p->size() += csz;
1418 sz -= csz;
1419 assert(p->capacity() >= p->size());
1420 if (p->full()) {
1421 // if page is full, see if we're above the flush threshold of
1422 // 3/4 of our pages
1424 doFlush(false);
1425 }
1426 }
1427 } catch (Exception&) {
1428 m_flags |= wrerrbit;
1429 if (m_flags & exceptionsbit) throw;
1430 }
1431 return written;
1432}
1433
1435{
1436 // go through pipes, and change flags where we already know without really
1437 // polling - stuff where we don't need poll to wait for its timeout in the
1438 // OS...
1439 bool canskiptimeout = false;
1440 std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1441 std::vector<unsigned>::iterator mit = masks.begin();
1442 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1443 ++it, ++mit) {
1444 PollEntry& pe = *it;
1445 pe.revents = None;
1446 // null pipe is invalid
1447 if (!pe.pipe) {
1448 pe.revents |= Invalid;
1449 canskiptimeout = true;
1450 continue;
1451 }
1452 // closed pipe is invalid
1453 if (pe.pipe->closed()) pe.revents |= Invalid;
1454 // check for error
1455 if (pe.pipe->bad()) pe.revents |= Error;
1456 // check for end of file
1457 if (pe.pipe->eof()) pe.revents |= EndOfFile;
1458 // check if readable
1459 if (pe.events & Readable) {
1460 *mit |= Readable;
1461 if (pe.pipe->m_busylist) pe.revents |= Readable;
1462 }
1463 // check if writable
1464 if (pe.events & Writable) {
1465 *mit |= Writable;
1466 if (pe.pipe->m_freelist) {
1467 pe.revents |= Writable;
1468 } else {
1469 Page *dl = pe.pipe->m_dirtylist;
1470 while (dl && dl->next()) dl = dl->next();
1471 if (dl && dl->pos() < Page::capacity())
1472 pe.revents |= Writable;
1473 }
1474 }
1475 if (pe.revents) canskiptimeout = true;
1476 }
1477 // set up the data structures required for the poll syscall
1478 std::vector<pollfd> fds;
1479 fds.reserve(2 * pipes.size());
1480 std::map<int, PollEntry*> fds2pipes;
1481 for (PollVector::const_iterator it = pipes.begin();
1482 pipes.end() != it; ++it) {
1483 const PollEntry& pe = *it;
1484 struct pollfd tmp;
1485 fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1486 const_cast<PollEntry*>(&pe)));
1487 tmp.events = tmp.revents = 0;
1488 // we always poll for readability; this allows us to queue pages
1489 // early
1490 tmp.events |= POLLIN;
1491 if (pe.pipe->m_outpipe != tmp.fd) {
1492 // ok, it's a pair of pipes
1493 fds.push_back(tmp);
1494 fds2pipes.insert(std::make_pair(
1495 unsigned(tmp.fd = pe.pipe->m_outpipe),
1496 const_cast<PollEntry*>(&pe)));
1497 tmp.events = 0;
1498
1499 }
1500 if (pe.events & Writable) tmp.events |= POLLOUT;
1501 fds.push_back(tmp);
1502 }
1503 // poll
1504 int retVal = 0;
1505 do {
1506 retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1507 if (0 > retVal) {
1508 if (EINTR == errno) continue;
1509 throw Exception("poll", errno);
1510 }
1511 break;
1512 } while (true);
1513 // fds may have changed state, so update...
1514 for (std::vector<pollfd>::iterator it = fds.begin();
1515 fds.end() != it; ++it) {
1516 pollfd& fe = *it;
1517 //if (!fe.revents) continue;
1518 --retVal;
1519 PollEntry& pe = *fds2pipes[fe.fd];
1520oncemore:
1521 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1522 pe.revents |= ReadInvalid;
1523 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1524 pe.revents |= WriteInvalid;
1525 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1526 pe.revents |= ReadError;
1527 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1528 pe.revents |= WriteError;
1529 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1530 pe.revents |= ReadEndOfFile;
1531 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1532 pe.revents |= WriteEndOfFile;
1533 if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1534 !(fe.revents & (POLLNVAL | POLLERR))) {
1535 // ok, there is at least one page for us to receive from the
1536 // other end
1537 if (0 == pe.pipe->recvpages()) continue;
1538 // more pages there?
1539 do {
1540 int tmp = ::poll(&fe, 1, 0);
1541 if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1542 if (0 > tmp) {
1543 if (EINTR == errno) continue;
1544 throw Exception("poll", errno);
1545 }
1546 break;
1547 } while (true);
1548 }
1549 if (pe.pipe->m_busylist) pe.revents |= Readable;
1550 if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1551 if (pe.pipe->m_freelist) {
1552 pe.revents |= Writable;
1553 } else {
1554 Page *dl = pe.pipe->m_dirtylist;
1555 while (dl && dl->next()) dl = dl->next();
1556 if (dl && dl->pos() < Page::capacity())
1557 pe.revents |= Writable;
1558 }
1559 }
1560 }
1561 // apply correct masks, and count pipes with pending events
1562 int npipes = 0;
1563 mit = masks.begin();
1564 for (PollVector::iterator it = pipes.begin();
1565 pipes.end() != it; ++it, ++mit)
1566 if ((it->revents &= *mit)) ++npipes;
1567 return npipes;
1568}
1569
1571{
1572 size_t sz = std::strlen(str);
1573 *this << sz;
1574 if (sz) write(str, sz);
1575 return *this;
1576}
1577
1579{
1580 size_t sz = 0;
1581 *this >> sz;
1582 if (good() && !eof()) {
1583 str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1584 if (!str) throw Exception("realloc", errno);
1585 if (sz) read(str, sz);
1586 str[sz] = 0;
1587 }
1588 return *this;
1589}
1590
1592{
1593 size_t sz = str.size();
1594 *this << sz;
1595 write(str.data(), sz);
1596 return *this;
1597}
1598
1600{
1601 str.clear();
1602 size_t sz = 0;
1603 *this >> sz;
1604 if (good() && !eof()) {
1605 str.reserve(sz);
1606 for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1607 }
1608 return *this;
1609}
1610
1612
1613#ifdef TEST_BIDIRMMAPPIPE
1614using namespace RooFit;
1615
1616int simplechild(BidirMMapPipe& pipe)
1617{
1618 // child does an echo loop
1619 while (pipe.good() && !pipe.eof()) {
1620 // read a string
1621 std::string str;
1622 pipe >> str;
1623 if (!pipe) return -1;
1624 if (pipe.eof()) break;
1625 if (!str.empty()) {
1626 std::cout << "[CHILD] : read: " << str << std::endl;
1627 str = "... early in the morning?";
1628 }
1629 pipe << str << BidirMMapPipe::flush;
1630 // did our parent tell us to shut down?
1631 if (str.empty()) break;
1632 if (!pipe) return -1;
1633 if (pipe.eof()) break;
1634 std::cout << "[CHILD] : wrote: " << str << std::endl;
1635 }
1636 pipe.close();
1637 return 0;
1638}
1639
1640#include <sstream>
1641int randomchild(BidirMMapPipe& pipe)
1642{
1643 // child sends out something at random intervals
1644 ::srand48(::getpid());
1645 {
1646 // wait for parent's go ahead signal
1647 std::string s;
1648 pipe >> s;
1649 }
1650 // no shutdown sequence needed on this side - we're producing the data,
1651 // and the parent can just read until we're done (when it'll get EOF)
1652 for (int i = 0; i < 5; ++i) {
1653 // sleep a random time between 0 and .9 seconds
1654 ::usleep(int(1e6 * ::drand48()));
1655 std::ostringstream buf;
1656 buf << "child pid " << ::getpid() << " sends message " << i;
1657 std::string str = buf.str();
1658 std::cout << "[CHILD] : " << str << std::endl;
1659 pipe << str << BidirMMapPipe::flush;
1660 if (!pipe) return -1;
1661 if (pipe.eof()) break;
1662 }
1663 // tell parent we're shutting down
1664 pipe << "" << BidirMMapPipe::flush;
1665 // wait for parent to acknowledge
1666 std::string s;
1667 pipe >> s;
1668 pipe.close();
1669 return 0;
1670}
1671
1672int benchchildrtt(BidirMMapPipe& pipe)
1673{
1674 // child does the equivalent of listening for pings and sending the
1675 // packet back
1676 char* str = 0;
1677 while (pipe && !pipe.eof()) {
1678 pipe >> str;
1679 if (!pipe) {
1680 std::free(str);
1681 pipe.close();
1682 return -1;
1683 }
1684 if (pipe.eof()) break;
1685 pipe << str << BidirMMapPipe::flush;
1686 // if we have just completed the shutdown handshake, we break here
1687 if (!std::strlen(str)) break;
1688 }
1689 std::free(str);
1690 pipe.close();
1691 return 0;
1692}
1693
1694int benchchildsink(BidirMMapPipe& pipe)
1695{
1696 // child behaves like a sink
1697 char* str = 0;
1698 while (pipe && !pipe.eof()) {
1699 pipe >> str;
1700 if (!std::strlen(str)) break;
1701 }
1702 pipe << "" << BidirMMapPipe::flush;
1703 std::free(str);
1704 pipe.close();
1705 return 0;
1706}
1707
1708int benchchildsource(BidirMMapPipe& pipe)
1709{
1710 // child behaves like a source
1711 char* str = 0;
1712 for (unsigned i = 0; i <= 24; ++i) {
1713 str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1714 std::memset(str, '4', 1 << i);
1715 str[1 << i] = 0;
1716 for (unsigned j = 0; j < 1 << 7; ++j) {
1717 pipe << str;
1718 if (!pipe || pipe.eof()) {
1719 std::free(str);
1720 pipe.close();
1721 return -1;
1722 }
1723 }
1724 // tell parent we're done with this block size
1725 pipe << "" << BidirMMapPipe::flush;
1726 }
1727 // tell parent to shut down
1728 pipe << "" << BidirMMapPipe::flush;
1729 std::free(str);
1730 pipe.close();
1731 return 0;
1732}
1733
1734BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1735{
1736 // create a pipe with the given child at the remote end
1737 BidirMMapPipe *p = new BidirMMapPipe();
1738 if (p->isChild()) {
1739 int retVal = childexec(*p);
1740 delete p;
1741 std::exit(retVal);
1742 }
1743 return p;
1744}
1745
1746#include <sys/time.h>
1747#include <iomanip>
1748int main()
1749{
1750 // simple echo loop test
1751 {
1752 std::cout << "[PARENT]: simple challenge-response test, "
1753 "one child:" << std::endl;
1754 BidirMMapPipe* pipe = spawnChild(simplechild);
1755 for (int i = 0; i < 5; ++i) {
1756 std::string str("What shall we do with a drunken sailor...");
1757 *pipe << str << BidirMMapPipe::flush;
1758 if (!*pipe) return -1;
1759 std::cout << "[PARENT]: wrote: " << str << std::endl;
1760 *pipe >> str;
1761 if (!*pipe) return -1;
1762 std::cout << "[PARENT]: read: " << str << std::endl;
1763 }
1764 // send shutdown string
1765 *pipe << "" << BidirMMapPipe::flush;
1766 // wait for shutdown handshake
1767 std::string s;
1768 *pipe >> s;
1769 int retVal = pipe->close();
1770 std::cout << "[PARENT]: exit status of child: " << retVal <<
1771 std::endl;
1772 if (retVal) return retVal;
1773 delete pipe;
1774 }
1775 // simple poll test - children send 5 results in random intervals
1776 {
1777 unsigned nch = 20;
1778 std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1779 " children:" << std::endl;
1780 typedef BidirMMapPipe::PollEntry PollEntry;
1781 // poll data structure
1783 pipes.reserve(nch);
1784 // spawn children
1785 for (unsigned i = 0; i < nch; ++i) {
1786 std::cout << "[PARENT]: spawning child " << i << std::endl;
1787 pipes.push_back(PollEntry(spawnChild(randomchild),
1789 }
1790 // wake children up
1791 std::cout << "[PARENT]: waking up children" << std::endl;
1792 for (unsigned i = 0; i < nch; ++i)
1793 *pipes[i].pipe << "" << BidirMMapPipe::flush;
1794 std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1795 // while at least some children alive
1796 while (!pipes.empty()) {
1797 // poll, wait until status change (infinite timeout)
1798 int npipes = BidirMMapPipe::poll(pipes, -1);
1799 // scan for pipes with changed status
1800 for (std::vector<PollEntry>::iterator it = pipes.begin();
1801 npipes && pipes.end() != it; ) {
1802 if (!it->revents) {
1803 // unchanged, next one
1804 ++it;
1805 continue;
1806 }
1807 --npipes; // maybe we can stop early...
1808 // read from pipes which are readable
1809 if (it->revents & BidirMMapPipe::Readable) {
1810 std::string s;
1811 *(it->pipe) >> s;
1812 if (!s.empty()) {
1813 std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1814 ": " << s << std::endl;
1815 ++it;
1816 continue;
1817 } else {
1818 // child is shutting down...
1819 *(it->pipe) << "" << BidirMMapPipe::flush;
1820 goto childcloses;
1821 }
1822 }
1823 // retire pipes with error or end-of-file condition
1824 if (it->revents & (BidirMMapPipe::Error |
1827 std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1828 " revents" <<
1829 ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1830 ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1831 ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1832 ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1833 ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1834 ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1835 ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1836 ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1837 std::endl;
1838childcloses:
1839 int retVal = it->pipe->close();
1840 std::cout << "[PARENT]: child exit status: " <<
1841 retVal << ", number of children still alive: " <<
1842 (pipes.size() - 1) << std::endl;
1843 if (retVal) return retVal;
1844 delete it->pipe;
1845 it = pipes.erase(it);
1846 continue;
1847 }
1848 }
1849 }
1850 }
1851 // little benchmark - round trip time
1852 {
1853 std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1854 for (unsigned i = 0; i <= 24; ++i) {
1855 char *s = new char[1 + (1 << i)];
1856 std::memset(s, 'A', 1 << i);
1857 s[1 << i] = 0;
1858 const unsigned n = 1 << 7;
1859 double avg = 0., min = 1e42, max = -1e42;
1860 BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1861 for (unsigned j = n; j--; ) {
1862 struct timeval t1;
1863 ::gettimeofday(&t1, 0);
1864 *pipe << s << BidirMMapPipe::flush;
1865 if (!*pipe || pipe->eof()) break;
1866 *pipe >> s;
1867 if (!*pipe || pipe->eof()) break;
1868 struct timeval t2;
1869 ::gettimeofday(&t2, 0);
1870 t2.tv_sec -= t1.tv_sec;
1871 t2.tv_usec -= t1.tv_usec;
1872 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1873 if (dt < min) min = dt;
1874 if (dt > max) max = dt;
1875 avg += dt;
1876 }
1877 // send a shutdown string
1878 *pipe << "" << BidirMMapPipe::flush;
1879 // get child's shutdown ok
1880 *pipe >> s;
1881 avg /= double(n);
1882 avg *= 1e6; min *= 1e6; max *= 1e6;
1883 int retVal = pipe->close();
1884 if (retVal) {
1885 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1886 delete[] s;
1887 return retVal;
1888 }
1889 delete pipe;
1890 // there is a factor 2 in the formula for the transfer rate below,
1891 // because we transfer data of twice the size of the block - once
1892 // to the child, and once for the return trip
1893 std::cout << "block size " << std::setw(9) << (1 << i) <<
1894 " avg " << std::setw(7) << avg << " us min " <<
1895 std::setw(7) << min << " us max " << std::setw(7) << max <<
1896 "us speed " << std::setw(9) <<
1897 2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1898 " MB/s" << std::endl;
1899 delete[] s;
1900 }
1901 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1902 }
1903 // little benchmark - child as sink
1904 {
1905 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1906 for (unsigned i = 0; i <= 24; ++i) {
1907 char *s = new char[1 + (1 << i)];
1908 std::memset(s, 'A', 1 << i);
1909 s[1 << i] = 0;
1910 const unsigned n = 1 << 7;
1911 double avg = 0., min = 1e42, max = -1e42;
1912 BidirMMapPipe *pipe = spawnChild(benchchildsink);
1913 for (unsigned j = n; j--; ) {
1914 struct timeval t1;
1915 ::gettimeofday(&t1, 0);
1916 // streaming mode - we do not flush here
1917 *pipe << s;
1918 if (!*pipe || pipe->eof()) break;
1919 struct timeval t2;
1920 ::gettimeofday(&t2, 0);
1921 t2.tv_sec -= t1.tv_sec;
1922 t2.tv_usec -= t1.tv_usec;
1923 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1924 if (dt < min) min = dt;
1925 if (dt > max) max = dt;
1926 avg += dt;
1927 }
1928 // send a shutdown string
1929 *pipe << "" << BidirMMapPipe::flush;
1930 // get child's shutdown ok
1931 *pipe >> s;
1932 avg /= double(n);
1933 avg *= 1e6; min *= 1e6; max *= 1e6;
1934 int retVal = pipe->close();
1935 if (retVal) {
1936 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1937 return retVal;
1938 }
1939 delete pipe;
1940 std::cout << "block size " << std::setw(9) << (1 << i) <<
1941 " avg " << std::setw(7) << avg << " us min " <<
1942 std::setw(7) << min << " us max " << std::setw(7) << max <<
1943 "us speed " << std::setw(9) <<
1944 (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1945 " MB/s" << std::endl;
1946 delete[] s;
1947 }
1948 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1949 }
1950 // little benchmark - child as source
1951 {
1952 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1953 char *s = 0;
1954 double avg = 0., min = 1e42, max = -1e42;
1955 unsigned n = 0, bsz = 0;
1956 BidirMMapPipe *pipe = spawnChild(benchchildsource);
1957 while (*pipe && !pipe->eof()) {
1958 struct timeval t1;
1959 ::gettimeofday(&t1, 0);
1960 // streaming mode - we do not flush here
1961 *pipe >> s;
1962 if (!*pipe || pipe->eof()) break;
1963 struct timeval t2;
1964 ::gettimeofday(&t2, 0);
1965 t2.tv_sec -= t1.tv_sec;
1966 t2.tv_usec -= t1.tv_usec;
1967 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1968 if (std::strlen(s)) {
1969 ++n;
1970 if (dt < min) min = dt;
1971 if (dt > max) max = dt;
1972 avg += dt;
1973 bsz = std::strlen(s);
1974 } else {
1975 if (!n) break;
1976 // next block size
1977 avg /= double(n);
1978 avg *= 1e6; min *= 1e6; max *= 1e6;
1979
1980 std::cout << "block size " << std::setw(9) << bsz <<
1981 " avg " << std::setw(7) << avg << " us min " <<
1982 std::setw(7) << min << " us max " << std::setw(7) <<
1983 max << "us speed " << std::setw(9) <<
1984 (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1985 " MB/s" << std::endl;
1986 n = 0;
1987 avg = 0.;
1988 min = 1e42;
1989 max = -1e42;
1990 }
1991 }
1992 int retVal = pipe->close();
1993 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1994 if (retVal) return retVal;
1995 delete pipe;
1996 std::free(s);
1997 }
1998 return 0;
1999}
2000#endif // TEST_BIDIRMMAPPIPE
2001#endif // _WIN32
2002
2003// vim: ft=cpp:sw=4:tw=78:et
#define END_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
header file for BidirMMapPipe, a class which forks off a child process and serves as communications channel between parent and child
double
typedef void(GLAPIENTRYP _GLUfuncptr)(void)
int main()
Definition Prototype.cxx:12
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
#define e(i)
Definition RSha256.hxx:103
char name[80]
Definition TGX11.cxx:110
for poll() interface
unsigned revents
events that happened (or'ed bitmask)
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
exception to throw if low-level OS calls go wrong
char m_buf[s_sz]
buffer containing the error message
static int dostrerror_r(int err, char *buf, std::size_t sz, int(*f)(int, char *, std::size_t))
for the POSIX version of strerror_r
virtual const char * what() const noexcept
return a description of what went wrong
BidirMMapPipeException(const char *msg, int err)
constructor taking error code, hint on operation (msg)
class representing a chunk of pages
PageChunk(const PageChunk &)
forbid copying
BidirMMapPipeException Exception
convenience typedef
unsigned m_nUsedGrp
number of used page groups
unsigned len() const
return length of chunk
void * m_begin
pointer to start of mmapped area
static unsigned physPgSz()
return the physical page size of the system
static unsigned s_physpgsz
system physical page size
bool full() const
return true if no free page groups in this chunk
void * m_end
pointer one behind end of mmapped area
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
std::list< void * > m_freelist
free pages list
static MMapVariety s_mmapworks
mmap variety that works on this system
MMapVariety
type of mmap support found
@ Copy
mmap doesn't work, have to copy back and forth
@ Unknown
don't know yet what'll work
@ DevZero
mmapping /dev/zero works
@ Anonymous
anonymous mmap works
@ FileBacked
mmapping a temp file works
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
unsigned m_nPgPerGrp
number of pages per group
static MMapVariety mmapVariety()
return mmap variety support found
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
class representing a page pool
Pages pop()
pop a free element out of the pool
static MMapVariety mmapVariety()
return variety of mmap supported on the system
BidirMMapPipeException Exception
convenience typedef
unsigned nPagesPerGroup() const
return number of pages per group (i.e. as returned by pop())
void zap(Pages &p)
zap the pool (unmap all but Pages p)
PageChunk::MMapVariety MMapVariety
convenience typedef
void putOnFreeList(Chunk *chunk)
release a chunk
BidirMMapPipe_impl::PageChunk Chunk
a chunk of memory in the pool
@ minsz
minimum chunk size (just below 1 << minsz bytes)
@ szincr
size class increment (sz = 1 << (minsz + k * szincr))
@ maxsz
maximum chunk size (just below 1 << maxsz bytes)
ChunkList m_freelist
list of chunks used by the pool which are not full
ChunkList m_chunks
list of chunks used by the pool
unsigned m_nPgPerGrp
page group size
std::list< Chunk * > ChunkList
list of chunks
static unsigned pagesize()
return (logical) page size of the system
int m_cursz
current chunk size
unsigned m_szmap[(maxsz - minsz)/szincr]
chunk size map (histogram of chunk sizes)
int nextChunkSz() const
find size of next chunk to allocate (in a hopefully smart way)
void updateCurSz(int sz, int incr)
adjust m_cursz to current largest block
PagePool(unsigned nPagesPerGroup)
constructor
void release(Chunk *chunk)
release a chunk
class representing the header structure in an mmapped page
bool full() const
true if page completely full
unsigned short m_pos
index of next byte in payload area
unsigned short & pos()
return reference to position field
short m_next
next page in list (in pagesizes)
unsigned char * end() const
return pointer to first byte in payload data area of page
Page & operator=(const Page &)=delete
assignment forbidden
bool filled() const
true if page partially filled
Page(const Page &)
copy construction forbidden
Page * next() const
return pointer to next page
unsigned short & size()
return reference to size field
unsigned short m_size
size of payload (in bytes)
unsigned char * begin() const
return pointer to first byte in payload data area of page
static unsigned capacity()
return the capacity of the page
void setNext(const Page *p)
set pointer to next page
unsigned free() const
free space left (to be written to)
unsigned size() const
return size (of payload data)
unsigned pos() const
return position
unsigned remaining() const
bytes remaining to be read
bool empty() const
true if page empty
handle class for a number of Pages
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pgno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
void purge()
purge buffered data waiting to be read and/or written
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
BidirMMapPipe_impl::Pages m_pages
mmapped pages
bool good() const
status of stream is good
Page * m_dirtylist
linked list: dirty pages (data to be sent)
unsigned recvpages()
receive pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
@ wrerrbit
write error
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
@ rderrbit
read error
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
bool bad() const
true on I/O error
@ WriteInvalid
write end of pipe invalid
@ Invalid
invalid pipe
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Error
pipe error
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
@ EndOfFile
end of file
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
Page * dirtypage()
get a dirty page to write data to (may block)
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
size_type read(void *addr, size_type sz)
read from pipe
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
static unsigned s_pagepoolrefcnt
page pool reference counter
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
const Int_t n
Definition legend1.C:16
namespace for implementation details of BidirMMapPipe
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition Common.h:18
std::string const & tmpPath()
Returns the path to the directory that should be used for temporary RooFit files (e....
Definition Common.cxx:24
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool
auto * l
Definition textangle.C:4
auto * t1
Definition textangle.C:20
static unsigned long masks[]
Definition gifencode.c:211