Logo ROOT   6.18/05
Reference Guide
BidirMMapPipe.cxx
Go to the documentation of this file.
1/** @file BidirMMapPipe.cxx
2 *
3 * implementation of BidirMMapPipe, a class which forks off a child process
4 * and serves as communications channel between parent and child
5 *
6 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7 * @date 2013-07-07
8 */
9#ifndef _WIN32
10#include <map>
11#include <cerrno>
12#include <limits>
13#include <string>
14#include <cstdlib>
15#include <cstring>
16#include <iostream>
17#include <algorithm>
18#include <exception>
19
20#include <poll.h>
21#include <fcntl.h>
22#include <signal.h>
23#include <string.h>
24#include <unistd.h>
25#include <stdlib.h>
26#include <pthread.h>
27#include <sys/mman.h>
28#include <sys/stat.h>
29#include <sys/wait.h>
30#include <sys/socket.h>
31
32#include "BidirMMapPipe.h"
33
34#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35#define END_NAMESPACE_ROOFIT }
36
38
39/// namespace for implementation details of BidirMMapPipe
41 /** @brief exception to throw if low-level OS calls go wrong
42 *
43 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
44 * @date 2013-07-07
45 */
class BidirMMapPipeException : public std::exception
{
private:
    enum {
        s_sz = 256 ///< length of buffer
    };
    char m_buf[s_sz]; ///< buffer containing the error message

    /// for the POSIX version of strerror_r (int-returning prototype)
    static int dostrerror_r(int err, char* buf, std::size_t sz,
            int (*f)(int, char*, std::size_t))
    { return f(err, buf, sz); }
    /// for the GNU version of strerror_r (char*-returning prototype; the
    /// returned message may live outside buf)
    static int dostrerror_r(int, char*, std::size_t,
            char* (*f)(int, char*, std::size_t));
public:
    /// constructor taking error code, hint on operation (msg)
    BidirMMapPipeException(const char* msg, int err);
    /// return a description of what went wrong (zero-terminated, owned by
    /// this exception object)
    const char* what() const noexcept override { return m_buf; }
};
67
68 BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
69 {
70 std::size_t msgsz = std::strlen(msg);
71 if (msgsz) {
72 msgsz = std::min(msgsz, std::size_t(s_sz));
73 std::copy(msg, msg + msgsz, m_buf);
74 if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
75 if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
76 }
77 if (msgsz < s_sz) {
78 // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
79 // have to sort it out with overloads
80 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
81 }
82 m_buf[s_sz - 1] = 0; // enforce zero-termination
83 }
84
85 int BidirMMapPipeException::dostrerror_r(int err, char* buf,
86 std::size_t sz, char* (*f)(int, char*, std::size_t))
87 {
88 buf[0] = 0;
89 char *tmp = f(err, buf, sz);
90 if (tmp && tmp != buf) {
91 std::strncpy(buf, tmp, sz);
92 buf[sz - 1] = 0;
93 if (std::strlen(tmp) > sz - 1) return ERANGE;
94 }
95 return 0;
96 }
97
98 /** @brief class representing the header structure in an mmapped page
99 *
100 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
101 * @date 2013-07-07
102 *
103 * contains a field to put pages into a linked list, a field for the size
104 * of the data being transmitted, and a field for the position until which
105 * the data has been read
106 */
107 class Page
108 {
// Header placed at the very start of a page; the payload bytes follow
// directly after it (begin() == this + sizeof(Page)).
109 private:
110 // use as small a data type as possible to maximise payload area
111 // of pages
// m_next is a signed offset relative to this page, in units of
// PageChunk::pagesize(); 0 marks the end of the list (see setNext()/next())
112 short m_next; ///< next page in list (in pagesizes)
113 unsigned short m_size; ///< size of payload (in bytes)
114 unsigned short m_pos; ///< index of next byte in payload area
115 /// copy construction forbidden
// NOTE(review): this is a user-provided copy constructor with an empty body,
// not a deleted one - it leaves m_next, m_size and m_pos uninitialised;
// "= delete" (as used for operator= below) would state the intent directly
116 Page(const Page&) {}
117 /// assignment forbidden
118 Page& operator=(const Page&) = delete;
119 public:
120 /// constructor
121 Page() : m_next(0), m_size(0), m_pos(0)
122 {
123 // check that short is big enough - must be done at runtime
124 // because the page size is not known until runtime
125 assert(std::numeric_limits<unsigned short>::max() >=
127 }
128 /// set pointer to next page
129 void setNext(const Page* p);
130 /// return pointer to next page
131 Page* next() const;
132 /// return reference to size field
133 unsigned short& size() { return m_size; }
134 /// return size (of payload data)
135 unsigned size() const { return m_size; }
136 /// return reference to position field
137 unsigned short& pos() { return m_pos; }
138 /// return position
139 unsigned pos() const { return m_pos; }
140 /// return pointer to first byte in payload data area of page
141 inline unsigned char* begin() const
142 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
143 + sizeof(Page); }
144 /// return pointer to end of page (NOTE(review): presumably one past the last byte; confirm)
145 inline unsigned char* end() const
146 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
148 /// return the capacity of the page
149 static unsigned capacity()
150 { return PageChunk::pagesize() - sizeof(Page); }
151 /// true if page empty
152 bool empty() const { return !m_size; }
153 /// true if page holds any payload data (i.e. not empty; may also be full)
154 bool filled() const { return !empty(); }
155 /// free space left (to be written to)
156 unsigned free() const { return capacity() - m_size; }
157 /// bytes remaining to be read
158 unsigned remaining() const { return m_size - m_pos; }
159 /// true if page completely full
160 bool full() const { return !free(); }
161 };
162
163 void Page::setNext(const Page* p)
164 {
165 if (!p) {
166 m_next = 0;
167 } else {
168 const char* p1 = reinterpret_cast<char*>(this);
169 const char* p2 = reinterpret_cast<const char*>(p);
170 std::ptrdiff_t tmp = p2 - p1;
171 // difference must be divisible by page size
172 assert(!(tmp % PageChunk::pagesize()));
173 tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
174 m_next = tmp;
175 // no truncation when saving in a short
176 assert(m_next == tmp);
177 // final check: next() must return p
178 assert(next() == p);
179 }
180 }
181
182 Page* Page::next() const
183 {
184 if (!m_next) return 0;
185 char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
186 ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
187 return reinterpret_cast<Page*>(ptmp);
188 }
189
190 /** @brief class representing a page pool
191 *
192 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
193 * @date 2013-07-24
194 *
195 * pool of mmapped pages (on systems which support it, on all others, the
196 * functionality is emulated with dynamically allocated memory)
197 *
198 * in most operating systems there is a limit to how many mappings any one
199 * process is allowed to request; for this reason, we mmap a relatively
200 * large amount up front, and then carve off little pieces as we need them
201 *
202 * Moreover, some systems have too large a physical page size in their MMU
203 * for the code to handle (we want offsets and lengths to fit into 16
204 * bits), so we carve such big physical pages into smaller logical Pages
205 * if needed. The largest logical page size is currently 16 KiB.
206 */
207 class PagePool {
208 private:
209 /// convenience typedef
210 typedef BidirMMapPipeException Exception;
211
// NOTE(review): release() computes a chunk's size class as
// len() / (pagesize() * m_nPgPerGrp), i.e. linearly in groups, so the
// "1 << ..." wording in the comments below may be stale - confirm.
212 enum {
213 minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
214 maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
215 szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
216 };
217 /// a chunk of memory in the pool
218 typedef BidirMMapPipe_impl::PageChunk Chunk;
219 /// list of chunks
220 typedef std::list<Chunk*> ChunkList;
221
223 public:
224 /// convenience typedef
226 /// constructor
227 PagePool(unsigned nPagesPerGroup);
228 /// destructor
229 ~PagePool();
230 /// pop a free element out of the pool
231 Pages pop();
232
233 /// return (logical) page size of the system
234 static unsigned pagesize() { return PageChunk::pagesize(); }
235 /// return variety of mmap supported on the system
236 static MMapVariety mmapVariety()
237 { return PageChunk::mmapVariety(); }
238
239 /// return number of pages per group (ie. as returned by pop())
240 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
241
242 /// zap the pool (unmap all but Pages p)
243 void zap(Pages& p);
244
245 private:
246 /// list of chunks used by the pool
247 ChunkList m_chunks;
248 /// list of chunks used by the pool which are not full
249 ChunkList m_freelist;
250 /// chunk size map (histogram of chunk sizes)
// NOTE(review): indexed as m_szmap[(sz - minsz) / szincr] (updateCurSz), so
// it has no slot for sz == maxsz; callers must keep sz strictly below maxsz
251 unsigned m_szmap[(maxsz - minsz) / szincr];
252 /// current chunk size
253 int m_cursz;
254 /// page group size
255 unsigned m_nPgPerGrp;
256
257 /// adjust m_cursz to current largest block
258 void updateCurSz(int sz, int incr);
259 /// find size of next chunk to allocate (in a hopefully smart way)
260 int nextChunkSz() const;
261 /// put a (no longer full) chunk back on the free list
262 void putOnFreeList(Chunk* chunk);
263 /// release a chunk
264 void release(Chunk* chunk);
265 };
266
267 Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
268 m_pimpl(new impl)
269 {
270 assert(npg < 256);
271 m_pimpl->m_parent = parent;
272 m_pimpl->m_pages = pages;
273 m_pimpl->m_refcnt = 1;
274 m_pimpl->m_npages = npg;
275 /// initialise pages
276 for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
277 }
278
// logical page size: the physical page size, capped at 16 KiB (16384 bytes)
280 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
282
284 {
// drop this handle's reference; the last one out hands the page group back
// to its owning chunk (if it still has one) and frees the shared impl
285 if (m_pimpl && !--(m_pimpl->m_refcnt)) {
286 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
287 delete m_pimpl;
288 }
289 }
290
291 Pages::Pages(const Pages& other) :
292 m_pimpl(other.m_pimpl)
293 { ++(m_pimpl->m_refcnt); }
294
296 {
297 if (&other == this) return *this;
298 if (--(m_pimpl->m_refcnt)) {
299 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
300 delete m_pimpl;
301 }
302 m_pimpl = other.m_pimpl;
303 ++(m_pimpl->m_refcnt);
304 return *this;
305 }
306
307 unsigned Pages::pagesize() { return PageChunk::pagesize(); }
308
309 Page* Pages::page(unsigned pgno) const
310 {
311 assert(pgno < m_pimpl->m_npages);
312 unsigned char* pptr =
313 reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
314 pptr += pgno * pagesize();
315 return reinterpret_cast<Page*>(pptr);
316 }
317
318 unsigned Pages::pageno(Page* p) const
319 {
320 const unsigned char* pptr =
321 reinterpret_cast<const unsigned char*>(p);
322 const unsigned char* bptr =
323 reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
324 assert(0 == ((pptr - bptr) % pagesize()));
325 const unsigned nr = (pptr - bptr) / pagesize();
326 assert(nr < m_pimpl->m_npages);
327 return nr;
328 }
329
331 {
332 // find out page size of system
333 long pgsz = sysconf(_SC_PAGESIZE);
// NOTE(review): a sysconf failure throws here, so the fallback below is only
// reached for implausibly small values, despite its "in case of failure" wording
334 if (-1 == pgsz) throw Exception("sysconf", errno);
335 if (pgsz > 512 && pgsz > long(sizeof(Page)))
336 return pgsz;
337
338 // in case of failure or implausible value, use a safe default: 4k
339 // page size, and do not try to mmap
341 return 1 << 12;
342 }
343
344 PageChunk::PageChunk(PagePool* parent,
345 unsigned length, unsigned nPgPerGroup) :
346 m_begin(dommap(length)),
347 m_end(reinterpret_cast<void*>(
348 reinterpret_cast<unsigned char*>(m_begin) + length)),
349 m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
350 {
351 // ok, push groups of pages onto freelist here
352 unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
353 unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
354 while (p < pend) {
355 m_freelist.push_back(reinterpret_cast<void*>(p));
356 p += nPgPerGroup * PagePool::pagesize();
357 }
358 }
359
361 {
// a chunk still owned by a pool must have had all its groups returned
362 if (m_parent) assert(empty());
// zap() clears m_begin, so a zapped chunk leaves its mapping in place
363 if (m_begin) domunmap(m_begin, len());
364 }
365
366 bool PageChunk::contains(const Pages& p) const
367 { return p.m_pimpl->m_parent == this; }
368
370 {
// hand out the first free group of this chunk as a Pages handle
371 assert(!m_freelist.empty());
372 void* p = m_freelist.front();
373 m_freelist.pop_front();
374 ++m_nUsedGrp;
375 return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
376 }
377
378 void PageChunk::push(const Pages& p)
379 {
380 assert(contains(p));
381 bool wasempty = m_freelist.empty();
382 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
383 --m_nUsedGrp;
384 if (m_parent) {
385 // notify parent if we need to be put on the free list again
386 if (wasempty) m_parent->putOnFreeList(this);
387 // notify parent if we're empty
388 if (empty()) return m_parent->release(this);
389 }
390 }
391
392 void* PageChunk::dommap(unsigned len)
393 {
394 assert(len && 0 == (len % s_physpgsz));
395 // ok, the idea here is to try the different methods of mmapping, and
396 // choose the first one that works. we have four flavours:
397 // 1 - anonymous mmap (best)
398 // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
399 // bit more tedious to set up, since you need to open/close a
400 // device file)
401 // 3 - mmap of a temporary file (very tedious to set up - need to
402 // create a temporary file, delete it, make the underlying storage
403 // large enough, then mmap the fd and close it)
404 // 4 - if all those fail, we malloc the buffers, and copy the data
405 // through the OS (then we're no better than normal pipes)
// the "which flavour works" message is printed at most once, and only when
// BidirMMapPipe::debugflag() is set
406 static bool msgprinted = false;
// use whichever spelling of the anonymous-mapping flag the platform provides
408#if defined(MAP_ANONYMOUS)
409#undef MYANONFLAG
410#define MYANONFLAG MAP_ANONYMOUS
411#elif defined(MAP_ANON)
412#undef MYANONFLAG
413#define MYANONFLAG MAP_ANON
414#else
415#undef MYANONFLAG
416#endif
417#ifdef MYANONFLAG
418 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
419 MYANONFLAG | MAP_SHARED, -1, 0);
420 if (MAP_FAILED == retVal) {
421 if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
422 } else {
423 assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
425 if (BidirMMapPipe::debugflag() && !msgprinted) {
426 std::cerr << " INFO: In " << __func__ << " (" <<
427 __FILE__ << ", line " << __LINE__ <<
428 "): anonymous mmapping works, excellent!" <<
429 std::endl;
430 msgprinted = true;
431 }
432 return retVal;
433 }
434#endif
435#undef MYANONFLAG
436 }
437 if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
438 // ok, no anonymous mappings supported directly, so try to map
439 // /dev/zero which has much the same effect on many systems
440 int fd = ::open("/dev/zero", O_RDWR);
441 if (-1 == fd)
442 throw Exception("open /dev/zero", errno);
443 void* retVal = ::mmap(0, len,
444 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
445 if (MAP_FAILED == retVal) {
446 int errsv = errno;
447 ::close(fd);
448 if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
449 } else {
450 assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
452 }
// NOTE(review): on the MAP_FAILED path above (flavour still Unknown) fd has
// already been closed, so this second close() fails (typically EBADF) and
// throws instead of falling through to the next flavour; if it did not
// throw, MAP_FAILED would be returned below. Moving the close/message/return
// into the success branch looks like the intended structure - confirm.
453 if (-1 == ::close(fd))
454 throw Exception("close /dev/zero", errno);
455 if (BidirMMapPipe::debugflag() && !msgprinted) {
456 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
457 ", line " << __LINE__ << "): mmapping /dev/zero works, "
458 "very good!" << std::endl;
459 msgprinted = true;
460 }
461 return retVal;
462 }
464 char name[] = "/tmp/BidirMMapPipe-XXXXXX";
465 int fd;
466 // open temp file
467 if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
468 // remove it, but keep fd open
469 if (-1 == ::unlink(name)) {
470 int errsv = errno;
471 ::close(fd);
472 throw Exception("unlink", errsv);
473 }
474 // make it the right size: lseek
475 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
476 int errsv = errno;
477 ::close(fd);
478 throw Exception("lseek", errsv);
479 }
480 // make it the right size: write a byte
// (the byte's value is irrelevant - the first character of name is used)
481 if (1 != ::write(fd, name, 1)) {
482 int errsv = errno;
483 ::close(fd);
484 throw Exception("write", errsv);
485 }
486 // do mmap
487 void* retVal = ::mmap(0, len,
488 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
489 if (MAP_FAILED == retVal) {
490 int errsv = errno;
491 ::close(fd);
492 if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
493 } else {
494 assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
496 }
// NOTE(review): same double-close issue as for /dev/zero: after a failed
// mmap with the flavour still Unknown, fd is closed again here, munmap is
// called on MAP_FAILED, and "close" is thrown instead of falling back to the
// malloc flavour - confirm.
497 if (-1 == ::close(fd)) {
498 int errsv = errno;
499 ::munmap(retVal, len);
500 throw Exception("close", errsv);
501 }
502 if (BidirMMapPipe::debugflag() && !msgprinted) {
503 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
504 ", line " << __LINE__ << "): mmapping temporary files "
505 "works, good!" << std::endl;
506 msgprinted = true;
507 }
508 return retVal;
509 }
510 if (Copy == s_mmapworks || Unknown == s_mmapworks) {
511 // fallback solution: mmap does not work on this OS (or does not
512 // work for what we want to use it), so use a normal buffer of
513 // memory instead, and collect data in that buffer - this needs an
514 // additional write/read to/from the pipe(s), but there you go...
515 if (BidirMMapPipe::debugflag() && !msgprinted) {
516 std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
517 ", line " << __LINE__ << "): anonymous mmapping of "
518 "shared buffers failed, falling back to read/write on "
519 " pipes!" << std::endl;
520 msgprinted = true;
521 }
523 void* retVal = std::malloc(len);
524 if (!retVal) throw Exception("malloc", errno);
525 return retVal;
526 }
527 // should never get here
528 assert(false);
529 return 0;
530 }
531
532 void PageChunk::domunmap(void* addr, unsigned len)
533 {
534 assert(len && 0 == (len % s_physpgsz));
535 if (addr) {
536 assert(Unknown != s_mmapworks);
537 if (Copy != s_mmapworks) {
538 if (-1 == ::munmap(addr, len))
539 throw Exception("munmap", errno);
540 } else {
541 std::free(addr);
542 }
543 }
544 }
545
547 {
548 // try to mprotect the other bits of the pool with no access...
549 // we'd really like a version of mremap here that can unmap all the
550 // other pages in the chunk, but that does not exist, so we protect
551 // the other pages in this chunk such that they may neither be read,
552 // written nor executed, only the pages we're interested in for
553 // communications stay readable and writable
554 //
555 // if an OS does not support changing the protection of a part of an
556 // mmapped area, the mprotect calls below should just fail and not
557 // change any protection, so we're a little less safe against
558 // corruption, but everything should still work
559 if (Copy != s_mmapworks) {
560 unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
561 unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
562 unsigned char* p2 = p1 + p.npages() * s_physpgsz;
563 unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
564 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
565 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
566 }
567 m_parent = 0;
568 m_freelist.clear();
569 m_nUsedGrp = 1;
570 p.m_pimpl->m_parent = 0;
571 m_begin = m_end = 0;
572 // commit suicide
573 delete this;
574 }
575
576 PagePool::PagePool(unsigned nPgPerGroup) :
577 m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
578 {
579 // if logical and physical page size differ, we may have to adjust
580 // m_nPgPerGrp to make things fit
582 const unsigned mult =
584 const unsigned desired = nPgPerGroup * PageChunk::pagesize();
585 // round up to the next physical page boundary
586 const unsigned actual = mult *
587 (desired / mult + bool(desired % mult));
// group size in logical pages after rounding; never smaller than requested
588 const unsigned newPgPerGrp = actual / PageChunk::pagesize();
590 std::cerr << " INFO: In " << __func__ << " (" <<
591 __FILE__ << ", line " << __LINE__ <<
592 "): physical page size " << PageChunk::physPgSz() <<
593 ", subdividing into logical pages of size " <<
594 PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
595 m_nPgPerGrp << " -> " << newPgPerGrp <<
596 std::endl;
597 }
598 assert(newPgPerGrp >= m_nPgPerGrp);
599 m_nPgPerGrp = newPgPerGrp;
600 }
// start with an empty chunk-size histogram
601 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
602 }
603
604 PagePool::~PagePool()
605 {
606 m_freelist.clear();
607 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
608 delete *it;
609 m_chunks.clear();
610 }
611
612 void PagePool::zap(Pages& p)
613 {
614 // unmap all pages but those pointed to by p
615 m_freelist.clear();
616 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
617 if ((*it)->contains(p)) {
618 (*it)->zap(p);
619 } else {
620 delete *it;
621 }
622 }
623 m_chunks.clear();
624 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
625 m_cursz = minsz;
626 }
627
628 Pages PagePool::pop()
629 {
630 if (m_freelist.empty()) {
631 // allocate and register new chunk and put it on the freelist
632 const int sz = nextChunkSz();
633 Chunk *c = new Chunk(this,
635 m_chunks.push_front(c);
636 m_freelist.push_back(c);
637 updateCurSz(sz, +1);
638 }
639 // get free element from first chunk on m_freelist
640 Chunk* c = m_freelist.front();
641 Pages p(c->pop());
642 // full chunks are removed from m_freelist
643 if (c->full()) m_freelist.pop_front();
644 return p;
645 }
646
647 void PagePool::release(PageChunk* chunk)
648 {
649 assert(chunk->empty());
650 // find chunk on freelist and remove
651 ChunkList::iterator it = std::find(
652 m_freelist.begin(), m_freelist.end(), chunk);
653 if (m_freelist.end() == it)
654 throw Exception("PagePool::release(PageChunk*)", EINVAL);
655 m_freelist.erase(it);
656 // find chunk in m_chunks and remove
657 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
658 if (m_chunks.end() == it)
659 throw Exception("PagePool::release(PageChunk*)", EINVAL);
660 m_chunks.erase(it);
661 const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
662 delete chunk;
663 updateCurSz(sz, -1);
664 }
665
666 void PagePool::putOnFreeList(PageChunk* chunk)
667 {
668 assert(!chunk->full());
669 m_freelist.push_back(chunk);
670 }
671
672 void PagePool::updateCurSz(int sz, int incr)
673 {
674 m_szmap[(sz - minsz) / szincr] += incr;
675 m_cursz = minsz;
676 for (int i = (maxsz - minsz) / szincr; i--; ) {
677 if (m_szmap[i]) {
678 m_cursz += i * szincr;
679 break;
680 }
681 }
682 }
683
684 int PagePool::nextChunkSz() const
685 {
686 // no chunks with space available, figure out chunk size
687 int sz = m_cursz;
688 if (m_chunks.empty()) {
689 // if we start allocating chunks, we start from minsz
690 sz = minsz;
691 } else {
692 if (minsz >= sz) {
693 // minimal sized chunks are always grown
694 sz = minsz + szincr;
695 } else {
696 if (1 != m_chunks.size()) {
697 // if we have more than one completely filled chunk, grow
698 sz += szincr;
699 } else {
700 // just one chunk left, try shrinking chunk size
701 sz -= szincr;
702 }
703 }
704 }
705 // clamp size to allowed range
706 if (sz > maxsz) sz = maxsz;
707 if (sz < minsz) sz = minsz;
708 return sz;
709 }
710}
711
712// static BidirMMapPipe members
// guards s_openpipes; taken around list manipulation in the constructor, in
// doClose (unless the caller already holds it) and in the teardown loop
713pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
// pipes currently open in this process
714std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
// shared page pool, created lazily by pagepool() and deleted again once no
// pipe references it
715BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
718
719BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
720{
721 if (!s_pagepool)
722 s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
723 return *s_pagepool;
724}
725
727{
// Close every open pipe, sending SIGTERM to child processes first. The
// mutex is released around each doClose call; doClose(.., true) then does not
// lock it again and removes the pipe from s_openpipes itself.
// NOTE(review): doClose returns early (without touching s_openpipes) when
// failbit is set, so a pipe left on the list with failbit set would make this
// loop spin forever - confirm that cannot happen.
728 pthread_mutex_lock(&s_openpipesmutex);
729 while (!s_openpipes.empty()) {
730 BidirMMapPipe *p = s_openpipes.front();
731 pthread_mutex_unlock(&s_openpipesmutex);
732 if (p->m_childPid) kill(p->m_childPid, SIGTERM);
733 p->doClose(true, true);
734 pthread_mutex_lock(&s_openpipesmutex);
735 }
736 pthread_mutex_unlock(&s_openpipesmutex);
737}
738
740 m_pages(pagepool().pop())
741{
// pops a page group from the shared pool only to give it back again, then
// deletes the pool if no pipe references it any more (s_pagepoolrefcnt == 0)
742 // free pages again
744 if (!s_pagepoolrefcnt) {
745 delete s_pagepool;
746 s_pagepool = 0;
747 }
748}
749
// Set up a bidirectional channel (pair of pipes, or a socketpair) and fork a
// child process; parent and child handshake once before returning. The
// object starts out with failbit set, which is cleared only on success.
750BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
751 m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
752 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
753 m_parentPid(::getpid())
754
755{
757 assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
758 int fds[4] = { -1, -1, -1, -1 };
759 int myerrno;
760 static bool firstcall = true;
761 if (useExceptions) m_flags |= exceptionsbit;
762
763 try {
764 if (firstcall) {
765 firstcall = false;
766 // register a cleanup handler to make sure all BidirMMapPipes are torn
767 // down, and child processes are sent a SIGTERM
768 if (0 != atexit(BidirMMapPipe::teardownall))
769 throw Exception("atexit", errno);
770 }
771
772 // build free lists
// chain all TotPages pages, then cut the chain after page PagesPerEnd - 1,
// giving two separate lists (presumably one per end - confirm PagesPerEnd)
773 for (unsigned i = 1; i < TotPages; ++i)
774 m_pages[i - 1]->setNext(m_pages[i]);
775 m_pages[PagesPerEnd - 1]->setNext(0);
776 if (!useSocketpair) {
777 // create pipes
778 if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
779 if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
780 } else {
781 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
782 throw Exception("socketpair", errno);
783 }
784 // fork the child
785 pthread_mutex_lock(&s_openpipesmutex);
786 char c;
787 switch ((m_childPid = ::fork())) {
788 case -1: // error in fork()
789 myerrno = errno;
790 pthread_mutex_unlock(&s_openpipesmutex);
791 m_childPid = 0;
792 throw Exception("fork", myerrno);
793 case 0: // child
794 // put the ends in the right place
795 if (-1 != fds[2]) {
796 // pair of pipes
797 if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
798 myerrno = errno;
799 pthread_mutex_unlock(&s_openpipesmutex);
800 throw Exception("close", myerrno);
801 }
802 fds[0] = fds[3] = -1;
803 m_outpipe = fds[1];
804 m_inpipe = fds[2];
805 } else {
806 // socket pair
807 if (-1 == ::close(fds[0])) {
808 myerrno = errno;
809 pthread_mutex_unlock(&s_openpipesmutex);
810 throw Exception("close", myerrno);
811 }
812 fds[0] = -1;
813 m_inpipe = m_outpipe = fds[1];
814 }
815 // close other pipes our parent may have open - we have no business
816 // reading from/writing to those...
// (the mutex is still held here, hence doClose(.., holdlock = true))
817 for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
818 s_openpipes.end() != it; ) {
819 BidirMMapPipe* p = *it;
820 it = s_openpipes.erase(it);
821 p->doClose(true, true);
822 }
// drop every pool chunk except the one holding our own pages, then get rid
// of the pool object itself
823 pagepool().zap(m_pages);
825 delete s_pagepool;
826 s_pagepool = 0;
827 s_openpipes.push_front(this);
828 pthread_mutex_unlock(&s_openpipesmutex);
829 // ok, put our pages on freelist
831 // handshake with other end (to make sure it's alive)...
832 c = 'C'; // ...hild
833 if (1 != xferraw(m_outpipe, &c, 1, ::write))
834 throw Exception("handshake: xferraw write", EPIPE);
835 if (1 != xferraw(m_inpipe, &c, 1, ::read))
836 throw Exception("handshake: xferraw read", EPIPE);
837 if ('P' != c) throw Exception("handshake", EPIPE);
838 break;
839 default: // parent
840 // put the ends in the right place
841 if (-1 != fds[2]) {
842 // pair of pipes
843 if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
844 myerrno = errno;
845 pthread_mutex_unlock(&s_openpipesmutex);
846 throw Exception("close", myerrno);
847 }
848 fds[1] = fds[2] = -1;
849 m_outpipe = fds[3];
850 m_inpipe = fds[0];
851 } else {
852 // socketpair
853 if (-1 == ::close(fds[1])) {
854 myerrno = errno;
855 pthread_mutex_unlock(&s_openpipesmutex);
856 throw Exception("close", myerrno);
857 }
858 fds[1] = -1;
859 m_inpipe = m_outpipe = fds[0];
860 }
861 // put on list of open pipes (so we can kill child processes
862 // if things go wrong)
863 s_openpipes.push_front(this);
864 pthread_mutex_unlock(&s_openpipesmutex);
865 // ok, put our pages on freelist
866 m_freelist = m_pages[0u];
867 // handshake with other end (to make sure it's alive)...
868 c = 'P'; // ...arent
869 if (1 != xferraw(m_outpipe, &c, 1, ::write))
870 throw Exception("handshake: xferraw write", EPIPE);
871 if (1 != xferraw(m_inpipe, &c, 1, ::read))
872 throw Exception("handshake: xferraw read", EPIPE);
873 if ('C' != c) throw Exception("handshake", EPIPE);
874 break;
875 }
876 // mark file descriptors for close on exec (we do not want to leak the
877 // connection to anything we happen to exec)
// NOTE(review): F_GETFD returns the descriptor flags as fcntl's return value
// and ignores a third argument, so fdflags stays 0 here and the F_SETFD below
// overwrites any other descriptor flags (FD_CLOEXEC is the only standard one,
// so this is effectively harmless) - consider fdflags = ::fcntl(fd, F_GETFD).
878 int fdflags = 0;
879 if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
880 throw Exception("fcntl", errno);
881 fdflags |= FD_CLOEXEC;
882 if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
883 throw Exception("fcntl", errno);
884 if (m_inpipe != m_outpipe) {
885 if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
886 throw Exception("fcntl", errno);
887 fdflags |= FD_CLOEXEC;
888 if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
889 throw Exception("fcntl", errno);
890 }
891 // ok, finally, clear the failbit
892 m_flags &= ~failbit;
893 // all done
894 } catch (BidirMMapPipe::Exception&) {
// on failure: terminate the child (if any), close whatever descriptors are
// still recorded in fds, release the pages and rethrow
// NOTE(review): descriptor 0 is skipped as well as -1 below - confirm that
// is intentional
895 if (0 != m_childPid) kill(m_childPid, SIGTERM);
896 for (int i = 0; i < 4; ++i)
897 if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
898 {
899 // free resources associated with mmapped pages
901 }
902 if (!--s_pagepoolrefcnt) {
903 delete s_pagepool;
904 s_pagepool = 0;
905 }
906 throw;
907 }
908}
909
911{
// orderly (non-forced) shutdown of a pipe that was set up successfully;
// returns whatever doClose returns
912 assert(!(m_flags & failbit));
913 return doClose(false);
914}
915
// Shut the pipe down: flush, close the write side, drain and close the read
// side, release the shared pages, reap the child (parent side only) and
// remove this pipe from s_openpipes.
// force:    skip flush/draining and suppress most errors instead of throwing
// holdlock: the caller already holds s_openpipesmutex, so do not lock it
// Returns the child's wait status (parent side), 0 otherwise or if the pipe
// was already closed (failbit set).
916int BidirMMapPipe::doClose(bool force, bool holdlock)
917{
918 if (m_flags & failbit) return 0;
919 // flush data to be written
920 if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
921 // shut down the write direction (no more writes from our side)
922 if (m_inpipe == m_outpipe) {
923 if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
924 throw Exception("shutdown", errno);
925 m_outpipe = -1;
926 } else {
927 if (-1 != m_outpipe && -1 == ::close(m_outpipe))
928 if (!force) throw Exception("close", errno);
929 m_outpipe = -1;
930 }
931 // the write direction is shut down now (no more writes from our side);
932 // drain anything the other end might still want to send
933 if (!force && -1 != m_inpipe) {
934 // **************** THIS IS EXTREMELY UGLY: ****************
935 // POLLHUP is not set reliably on pipe/socket shutdown on all
936 // platforms, unfortunately, so we poll for readability here until
937 // the other end closes, too
938 //
939 // the read loop below ensures that the other end sees the POLLIN that
940 // is set on shutdown instead, and goes ahead to close its end
941 //
942 // if we don't do this, and close straight away, the other end
943 // will catch a SIGPIPE or similar, and we don't want that
944 int err;
945 struct pollfd fds;
946 fds.fd = m_inpipe;
947 fds.events = POLLIN;
948 fds.revents = 0;
// poll timeout is 1 << 20 ms (about 17.5 minutes); a timeout simply polls
// again, a poll error interrupted by a signal (EINTR) restarts the loop
949 do {
950 while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
951 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
952 if (fds.revents & POLLIN) {
953 char c;
954 if (1 > ::read(m_inpipe, &c, 1)) break;
955 }
956 }
957 } while (0 > err && EINTR == errno);
958 // ignore all other poll errors
959 }
960 // close read end
961 if (-1 != m_inpipe && -1 == ::close(m_inpipe))
962 if (!force) throw Exception("close", errno);
963 m_inpipe = -1;
964 // unmap memory
965 try {
967 if (!--s_pagepoolrefcnt) {
968 delete s_pagepool;
969 s_pagepool = 0;
970 }
971 } catch (std::exception&) {
972 if (!force) throw;
973 }
975 // wait for child process
976 int retVal = 0;
977 if (isParent()) {
978 int tmp;
979 do {
980 tmp = waitpid(m_childPid, &retVal, 0);
981 } while (-1 == tmp && EINTR == errno);
982 if (-1 == tmp)
983 if (!force) throw Exception("waitpid", errno);
984 m_childPid = 0;
985 }
986 // remove from list of open pipes
987 if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
988 std::list<BidirMMapPipe*>::iterator it = std::find(
989 s_openpipes.begin(), s_openpipes.end(), this);
990 if (s_openpipes.end() != it) s_openpipes.erase(it);
991 if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
992 m_flags |= failbit;
993 return retVal;
994}
995
// non-forced shutdown via doClose(false)
// NOTE(review): if this body belongs to the destructor, any exception thrown
// by doClose(false) escapes an implicitly noexcept destructor and calls
// std::terminate - confirm whether that is acceptable
997{ doClose(false); }
998
1000 int fd, void* addr, size_type len,
1001 ssize_t (*xferfn)(int, void*, std::size_t))
1002{
// Transfer up to len bytes at addr through fd with xferfn (read or write),
// looping over partial transfers. Stops early at end-of-file (0 returned).
// Returns the number of bytes transferred; throws only if an error occurs
// before anything was transferred.
1003 size_type xferred = 0;
1004 unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1005 while (len) {
1006 ssize_t tmp = xferfn(fd, buf, len);
1007 if (tmp > 0) {
1008 xferred += tmp;
1009 len -= tmp;
1010 buf += tmp;
1011 continue;
1012 } else if (0 == tmp) {
1013 // check for end-of-file on pipe
1014 break;
1015 } else if (-1 == tmp) {
1016 // ok some error occurred, so figure out if we want to retry of throw
1017 switch (errno) {
1018 default:
1019 // if anything was transferred, return number of bytes
1020 // transferred so far, we can start throwing on the next
1021 // transfer...
1022 if (xferred) return xferred;
1023 // else throw
1024 throw Exception("xferraw", errno);
// NOTE(review): EAGAIN/EWOULDBLOCK print an error and then fall through to
// the EINTR retry, so a non-blocking descriptor makes this loop spin (and
// print) until the transfer can proceed
1025 case EAGAIN: // fallthrough intended
1026#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1027 case EWOULDBLOCK: // fallthrough intended
1028#endif
1029 std::cerr << " ERROR: In " << __func__ << " (" <<
1030 __FILE__ << ", line " << __LINE__ <<
1031 "): expect transfer to block!" << std::endl;
1032 case EINTR:
1033 break;
1034 }
1035 continue;
1036 } else {
1037 throw Exception("xferraw: unexpected return value from read/write",
1038 errno);
1039 }
1040 }
1041 return xferred;
1042}
1043
1045{
// Send the linked list of pages starting at plist to the other end.
// First a single byte holding the page number of the list head
// (m_pages[plist]) is written. Then, in the branch shown below, each page is
// written as one contiguous block of sizeof(Page) + size() bytes starting at
// the Page object itself.
// A short write of either part throws Exception("sendpages: short write",
// EPIPE). plist must be non-null (asserted in the final else).
1046 if (plist) {
1047 unsigned char pg = m_pages[plist];
// NOTE(review): pg is a single byte, so page numbers are assumed to fit
// into an unsigned char - confirm against TotPages.
1048 if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
// NOTE(review): the "1059 }" line below has no matching opening brace in
// this listing; the condition guarding the page-copy loop appears to be
// missing - confirm against the original source.
1051 // ok, have to copy pages through pipe
1052 for (Page* p = plist; p; p = p->next()) {
1053 if (sizeof(Page) + p->size() !=
1054 xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1055 ::write)) {
1056 throw Exception("sendpages: short write", EPIPE);
1057 }
1058 }
1059 }
1060 } else {
1061 throw Exception("sendpages: short write", EPIPE);
1062 }
1063 } else { assert(plist); }
1064}
1065
1067{
// Receive a list of pages from the other end.
// First the one-byte page number of the list head is read. Then, in the
// branch shown, each page's sizeof(Page) header is read, followed by its
// payload if size() is non-zero; the list is walked through the next() links
// contained in the received headers.
// Every received list is handed to feedPageLists().
// Returns the number of pages received, or 0 if the page-number byte could
// not be read (xferraw() returned something other than 1, e.g. at
// end-of-file). In the else branch below, the result is the length of the
// list starting at the received page.
1068 unsigned char pg;
1069 unsigned retVal = 0;
1070 Page *plisthead = 0, *plisttail = 0;
1071 if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1072 plisthead = plisttail = m_pages[pg];
1073 // ok, have number of pages
// NOTE(review): the lines between here and the next comment are not part of
// this listing (the braces below are unbalanced); the condition selecting
// the copy branch appears to be missing - confirm against the source.
1076 // ok, need to copy pages through pipe
1077 for (; plisttail; ++retVal) {
1078 Page* p = plisttail;
// NOTE(review): if this header read comes up short (xferraw() returns 0 at
// end-of-file), plisttail is neither advanced nor is the loop left, so the
// same read is retried indefinitely - consider breaking out in that case.
1079 if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1080 ::read)) {
1081 plisttail = p->next();
1082 if (!p->size()) continue;
1083 // break in case of read error
1084 if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1085 ::read)) break;
1086 }
1087 }
1088 } else {
1089 retVal = lenPageList(plisthead);
1090 }
1091 }
1092 // put list of pages we just received into correct lists (busy/free)
1093 if (plisthead) feedPageLists(plisthead);
1094 // ok, retVal contains the number of pages read, so put them on the
1095 // correct lists
1096 return retVal;
1097}
1098
1100{
1101 struct pollfd fds;
1102 fds.fd = m_inpipe;
1103 fds.events = POLLIN;
1104 fds.revents = 0;
1105 unsigned retVal = 0;
1106 do {
1107 int rc = ::poll(&fds, 1, 0);
1108 if (0 > rc) {
1109 if (EINTR == errno) continue;
1110 break;
1111 }
1112 if (1 == retVal && fds.revents & POLLIN &&
1113 !(fds.revents & (POLLNVAL | POLLERR))) {
1114 // ok, we can read without blocking, so the other end has
1115 // something for us
1116 return recvpages();
1117 } else {
1118 break;
1119 }
1120 } while (true);
1121 return retVal;
1122}
1123
1125{
1126 unsigned n = 0;
1127 for ( ; p; p = p->next()) ++n;
1128 return n;
1129}
1130
1132{
// Sort a list of pages (received from the other end, or just consumed
// locally) into the right lists:
//  - pages holding data (size() != 0) are appended to the busy list, with
//    their read position pos() reset to 0;
//  - empty pages owned by this end go onto the head of the free list;
//  - empty pages owned by the other end are collected on a send list. The
//    parent owns page numbers below PagesPerEnd and the child owns the rest.
// If anything has to be returned, leading full dirty pages are appended to
// the send list as well. The lot is sent via sendpages(), but only if a
// zero-timeout poll() shows no error/hangup on our descriptors.
// Throws Exception("feedPageLists: poll", errno) if poll() fails with an
// error other than EINTR. plist must be non-null (asserted).
1133 assert(plist);
1134 // get end of busy list
1135 Page *blend = m_busylist;
1136 while (blend && blend->next()) blend = blend->next();
1137 // ok, might have to send free pages to other end, and (if we do have to
1138 // send something to the other end) while we're at it, send any dirty
1139 // pages which are completely full, too
1140 Page *sendlisthead = 0, *sendlisttail = 0;
1141 // loop over plist
1142 while (plist) {
1143 Page* p = plist;
1144 plist = p->next();
1145 p->setNext(0);
1146 if (p->size()) {
1147 // busy page...
1148 p->pos() = 0;
1149 // put at end of busy list
1150 if (blend) blend->setNext(p);
1151 else m_busylist = p;
1152 blend = p;
1153 } else {
1154 // free page...
1155 // Very simple algorithm: once we're done with a page, we send it back
1156 // where it came from. If it's from our end, we put it on the free list, if
1157 // it's from the other end, we send it back.
1158 if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1159 (isChild() && m_pages[p] < PagesPerEnd)) {
1160 // page "belongs" to other end
1161 if (!sendlisthead) sendlisthead = p;
1162 if (sendlisttail) sendlisttail->setNext(p);
1163 sendlisttail = p;
1164 } else {
1165 // add page to freelist
1166 p->setNext(m_freelist);
1167 m_freelist = p;
1168 }
1169 }
1170 }
1171 // check if we have to send stuff to the other end
1172 if (sendlisthead) {
1173 // go through our list of dirty pages, and see what we can
1174 // send along
1175 Page* dp;
1176 while ((dp = m_dirtylist) && dp->full()) {
1177 Page* p = dp;
1178 // move head of dirty list
1179 m_dirtylist = p->next();
1180 // queue for sending
1181 p->setNext(0);
1182 sendlisttail->setNext(p);
1183 sendlisttail = p;
1184 }
1185 // poll if the other end is still alive - this needs that we first
1186 // close the write pipe of the other end when the remote end of the
1187 // connection is shutting down in doClose; we'll see that because we
1188 // get a POLLHUP on our inpipe
// one descriptor if both directions share it, otherwise out- and inpipe
1189 const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1190 struct pollfd fds[2];
1191 fds[0].fd = m_outpipe;
1192 fds[0].events = fds[0].revents = 0;
1193 if (m_outpipe != m_inpipe) {
1194 fds[1].fd = m_inpipe;
1195 fds[1].events = fds[1].revents = 0;
1196 } else {
1197 fds[0].events |= POLLIN;
1198 }
1199 int retVal = 0;
1200 do {
1201 retVal = ::poll(fds, nfds, 0);
1202 if (0 > retVal && EINTR == errno)
1203 continue;
1204 break;
1205 } while (true);
1206 if (0 <= retVal) {
1207 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1208 if (m_outpipe != m_inpipe) {
1209 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1210 } else {
// shared descriptor with pending input: receive it first; receiving
// nothing is treated as a dead connection
// NOTE(review): recvpages() itself ends by calling feedPageLists(), so
// this path recurses
1211 if (ok && fds[0].revents & POLLIN) {
1212 unsigned ret = recvpages();
1213 if (!ret) ok = false;
1214 }
1215 }
1216
1217 if (ok) sendpages(sendlisthead);
1218 // (if the pipe is dead already, we don't care that we leak the
1219 // contents of the pages on the send list here, so that is why
1220 // there's no else clause here)
1221 } else {
1222 throw Exception("feedPageLists: poll", errno);
1223 }
1224 }
1225}
1226
1228{
1229 assert(p);
1230 assert(p == m_freelist);
1231 // remove from freelist
1232 m_freelist = p->next();
1233 p->setNext(0);
1234 // append to dirty list
1235 Page* dl = m_dirtylist;
1236 while (dl && dl->next()) dl = dl->next();
1237 if (dl) dl->setNext(p);
1238 else m_dirtylist = p;
1239}
1240
1242{
// Return the oldest page that still holds unread data (the head of
// m_busylist). While the busy list is empty, call recvpages() - which may
// block - to get pages from the other end.
// Returns 0 if recvpages() reports that no page was received; read() treats
// that as end-of-file.
1243 // queue any pages available for reading we can without blocking
// NOTE(review): no statement follows the comment above in this listing;
// presumably a non-blocking receive (recvpages_nonblock()) belongs here -
// confirm against the original source.
1245 Page* p;
1246 // if there are no busy pages, try to get them from the other end,
1247 // block if we have to...
1248 while (!(p = m_busylist)) if (!recvpages()) return 0;
1249 return p;
1250}
1251
1253{
// Return the page that new output is appended to: the last page on the
// dirty list, if it is not yet full(). Otherwise take the head of the free
// list - calling recvpages(), which may block, until one is available - and
// move it to the end of the dirty list via markPageDirty().
// Returns 0 if recvpages() reports that no page was received.
1254 // queue any pages available for reading we can without blocking
// NOTE(review): no statement follows the comment above in this listing;
// presumably a non-blocking receive (recvpages_nonblock()) belongs here -
// confirm against the original source.
1256 Page* p = m_dirtylist;
1257 // go to end of dirty list
1258 if (p) while (p->next()) p = p->next();
1259 if (!p || p->full()) {
1260 // need to append free page, so get one
1261 while (!(p = m_freelist)) if (!recvpages()) return 0;
1262 markPageDirty(p);
1263 }
1264 return p;
1265}
1266
1268{ return doFlush(true); }
1269
1270void BidirMMapPipe::doFlush(bool forcePartialPages)
1271{
1272 assert(!(m_flags & failbit));
1273 // build a list of pages to flush
1274 Page *flushlisthead = 0, *flushlisttail = 0;
1275 while (m_dirtylist) {
1276 Page* p = m_dirtylist;
1277 if (!forcePartialPages && !p->full()) break;
1278 // remove dirty page from dirty list
1279 m_dirtylist = p->next();
1280 p->setNext(0);
1281 // and send it to other end
1282 if (!flushlisthead) flushlisthead = p;
1283 if (flushlisttail) flushlisttail->setNext(p);
1284 flushlisttail = p;
1285 }
1286 if (flushlisthead) sendpages(flushlisthead);
1287}
1288
1290{
// Discard all buffered data in both directions: append the dirty list to
// the busy list, set every page's size to 0, then clear both list heads.
// Must not be called on a failed pipe (asserted).
1291 assert(!(m_flags & failbit));
1292 // join busy and dirty lists
1293 {
1294 Page *l = m_busylist;
1295 while (l && l->next()) l = l->next();
1296 if (l) l->setNext(m_dirtylist);
1297 else m_busylist = m_dirtylist;
1298 }
1299 // empty busy and dirty pages
1300 for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1301 // put them on the free list
// NOTE(review): no statement follows the comment above in this listing, so
// as shown the emptied pages are only unlinked (both heads are zeroed below)
// and never reach a free list. Presumably a call such as
// feedPageLists(m_busylist) belongs here - confirm against the source.
1303 m_busylist = m_dirtylist = 0;
1304}
1305
1307{
// Number of bytes that can be read without blocking: the unread bytes
// (size() - pos()) summed over all pages on the busy list.
1308 // queue all pages waiting for consumption in the pipe before we give an
1309 // answer
// NOTE(review): no statement follows the comment above in this listing;
// presumably a non-blocking receive (recvpages_nonblock()) belongs here -
// confirm against the original source.
1311 size_type retVal = 0;
1312 for (Page* p = m_busylist; p; p = p->next())
1313 retVal += p->size() - p->pos();
1314 return retVal;
1315}
1316
1318{
// Estimate how many bytes can be written without blocking.
// A zero-timeout poll() on m_outpipe first determines whether the
// descriptor is writable right now (couldwrite).
// Then the free space of partially filled dirty pages and the full capacity
// of free pages are added up. Full dirty pages still count towards npages,
// but contribute no bytes.
// Once FlushThresh pages have been counted and the descriptor is not
// writable, counting stops. The one exception is that, if there are no
// dirty pages at all, every free page is counted.
// Throws Exception("bytesWritableNonBlocking: poll", errno) if poll() fails
// with an error other than EINTR.
1319 // queue all pages waiting for consumption in the pipe before we give an
1320 // answer
// NOTE(review): no statement follows the comment above in this listing;
// presumably a non-blocking receive (recvpages_nonblock()) belongs here -
// confirm against the original source.
1322 // check if we could write to the pipe without blocking (we need to know
1323 // because we might need to check if flushing of dirty pages would block)
1324 bool couldwrite = false;
1325 {
1326 struct pollfd fds;
1327 fds.fd = m_outpipe;
1328 fds.events = POLLOUT;
1329 fds.revents = 0;
1330 int retVal = 0;
1331 do {
1332 retVal = ::poll(&fds, 1, 0);
1333 if (0 > retVal) {
1334 if (EINTR == errno) continue;
1335 throw Exception("bytesWritableNonBlocking: poll", errno);
1336 }
1337 if (1 == retVal && fds.revents & POLLOUT &&
1338 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1339 couldwrite = true;
1340 break;
1341 } while (true);
1342 }
1343 // ok, start counting bytes
1344 size_type retVal = 0;
1345 unsigned npages = 0;
1346 // go through the dirty list
1347 for (Page* p = m_dirtylist; p; p = p->next()) {
1348 ++npages;
1349 // if page only partially filled
1350 if (!p->full())
1351 retVal += p->free();
1352 if (npages >= FlushThresh && !couldwrite) break;
1353 }
1354 // go through the free list
1355 for (Page* p = m_freelist; p && (!m_dirtylist ||
1356 npages < FlushThresh || couldwrite); p = p->next()) {
1357 ++npages;
1358 retVal += Page::capacity();
1359 }
1360 return retVal;
1361}
1362
1364{
1365 assert(!(m_flags & failbit));
1366 size_type nread = 0;
1367 unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1368 try {
1369 while (sz) {
1370 // find next page to read from
1371 Page* p = busypage();
1372 if (!p) {
1373 m_flags |= eofbit;
1374 return nread;
1375 }
1376 unsigned char* pp = p->begin() + p->pos();
1377 size_type csz = std::min(size_type(p->remaining()), sz);
1378 std::copy(pp, pp + csz, ap);
1379 nread += csz;
1380 ap += csz;
1381 sz -= csz;
1382 p->pos() += csz;
1383 assert(p->size() >= p->pos());
1384 if (p->size() == p->pos()) {
1385 // if no unread data remains, page is free
1386 m_busylist = p->next();
1387 p->setNext(0);
1388 p->size() = 0;
1389 feedPageLists(p);
1390 }
1391 }
1392 } catch (Exception&) {
1393 m_flags |= rderrbit;
1394 if (m_flags & exceptionsbit) throw;
1395 }
1396 return nread;
1397}
1398
1400{
// Append sz bytes from addr to the outgoing data. The bytes fill the page
// returned by dirtypage(), and doFlush(false) is called whenever a page
// becomes full.
// Returns the number of bytes accepted. If dirtypage() returns 0, eofbit is
// set and the count so far is returned.
// An Exception sets wrerrbit and is rethrown only if exceptionsbit is set.
1401 assert(!(m_flags & failbit));
1402 size_type written = 0;
1403 const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1404 try {
1405 while (sz) {
1406 // find next page to write to
1407 Page* p = dirtypage();
1408 if (!p) {
1409 m_flags |= eofbit;
1410 return written;
1411 }
1412 unsigned char* pp = p->begin() + p->size();
1413 size_type csz = std::min(size_type(p->free()), sz);
1414 std::copy(ap, ap + csz, pp);
1415 written += csz;
1416 ap += csz;
1417 p->size() += csz;
1418 sz -= csz;
1419 assert(p->capacity() >= p->size());
1420 if (p->full()) {
1421 // if page is full, see if we're above the flush threshold of
1422 // 3/4 of our pages
// NOTE(review): no threshold comparison is present in this listing, so as
// shown doFlush(false) runs every time a page fills up. Presumably a
// condition such as lenPageList(m_dirtylist) >= FlushThresh belongs
// here - confirm against the original source.
1424 doFlush(false);
1425 }
1426 }
1427 } catch (Exception&) {
1428 m_flags |= wrerrbit;
1429 if (m_flags & exceptionsbit) throw;
1430 }
1431 return written;
1432}
1433
1435{
1436 // go through pipes, and change flags where we already know without really
1437 // polling - stuff where we don't need poll to wait for its timeout in the
1438 // OS...
1439 bool canskiptimeout = false;
1440 std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1441 std::vector<unsigned>::iterator mit = masks.begin();
1442 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1443 ++it, ++mit) {
1444 PollEntry& pe = *it;
1445 pe.revents = None;
1446 // null pipe pointer or closed pipe is invalid
1447 if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1448 // check for error
1449 if (pe.pipe->bad()) pe.revents |= Error;
1450 // check for end of file
1451 if (pe.pipe->eof()) pe.revents |= EndOfFile;
1452 // check if readable
1453 if (pe.events & Readable) {
1454 *mit |= Readable;
1455 if (pe.pipe->m_busylist) pe.revents |= Readable;
1456 }
1457 // check if writable
1458 if (pe.events & Writable) {
1459 *mit |= Writable;
1460 if (pe.pipe->m_freelist) {
1461 pe.revents |= Writable;
1462 } else {
1463 Page *dl = pe.pipe->m_dirtylist;
1464 while (dl && dl->next()) dl = dl->next();
1465 if (dl && dl->pos() < Page::capacity())
1466 pe.revents |= Writable;
1467 }
1468 }
1469 if (pe.revents) canskiptimeout = true;
1470 }
1471 // set up the data structures required for the poll syscall
1472 std::vector<pollfd> fds;
1473 fds.reserve(2 * pipes.size());
1474 std::map<int, PollEntry*> fds2pipes;
1475 for (PollVector::const_iterator it = pipes.begin();
1476 pipes.end() != it; ++it) {
1477 const PollEntry& pe = *it;
1478 struct pollfd tmp;
1479 fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1480 const_cast<PollEntry*>(&pe)));
1481 tmp.events = tmp.revents = 0;
1482 // we always poll for readability; this allows us to queue pages
1483 // early
1484 tmp.events |= POLLIN;
1485 if (pe.pipe->m_outpipe != tmp.fd) {
1486 // ok, it's a pair of pipes
1487 fds.push_back(tmp);
1488 fds2pipes.insert(std::make_pair(
1489 unsigned(tmp.fd = pe.pipe->m_outpipe),
1490 const_cast<PollEntry*>(&pe)));
1491 tmp.events = 0;
1492
1493 }
1494 if (pe.events & Writable) tmp.events |= POLLOUT;
1495 fds.push_back(tmp);
1496 }
1497 // poll
1498 int retVal = 0;
1499 do {
1500 retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1501 if (0 > retVal) {
1502 if (EINTR == errno) continue;
1503 throw Exception("poll", errno);
1504 }
1505 break;
1506 } while (true);
1507 // fds may have changed state, so update...
1508 for (std::vector<pollfd>::iterator it = fds.begin();
1509 fds.end() != it; ++it) {
1510 pollfd& fe = *it;
1511 //if (!fe.revents) continue;
1512 --retVal;
1513 PollEntry& pe = *fds2pipes[fe.fd];
1514oncemore:
1515 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1516 pe.revents |= ReadInvalid;
1517 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1518 pe.revents |= WriteInvalid;
1519 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1520 pe.revents |= ReadError;
1521 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1522 pe.revents |= WriteError;
1523 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1524 pe.revents |= ReadEndOfFile;
1525 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1526 pe.revents |= WriteEndOfFile;
1527 if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1528 !(fe.revents & (POLLNVAL | POLLERR))) {
1529 // ok, there is at least one page for us to receive from the
1530 // other end
1531 if (0 == pe.pipe->recvpages()) continue;
1532 // more pages there?
1533 do {
1534 int tmp = ::poll(&fe, 1, 0);
1535 if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1536 if (0 > tmp) {
1537 if (EINTR == errno) continue;
1538 throw Exception("poll", errno);
1539 }
1540 break;
1541 } while (true);
1542 }
1543 if (pe.pipe->m_busylist) pe.revents |= Readable;
1544 if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1545 if (pe.pipe->m_freelist) {
1546 pe.revents |= Writable;
1547 } else {
1548 Page *dl = pe.pipe->m_dirtylist;
1549 while (dl && dl->next()) dl = dl->next();
1550 if (dl && dl->pos() < Page::capacity())
1551 pe.revents |= Writable;
1552 }
1553 }
1554 }
1555 // apply correct masks, and count pipes with pending events
1556 int npipes = 0;
1557 mit = masks.begin();
1558 for (PollVector::iterator it = pipes.begin();
1559 pipes.end() != it; ++it, ++mit)
1560 if ((it->revents &= *mit)) ++npipes;
1561 return npipes;
1562}
1563
1565{
1566 size_t sz = std::strlen(str);
1567 *this << sz;
1568 if (sz) write(str, sz);
1569 return *this;
1570}
1571
1573{
1574 size_t sz = 0;
1575 *this >> sz;
1576 if (good() && !eof()) {
1577 str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1578 if (!str) throw Exception("realloc", errno);
1579 if (sz) read(str, sz);
1580 str[sz] = 0;
1581 }
1582 return *this;
1583}
1584
1586{
1587 size_t sz = str.size();
1588 *this << sz;
1589 write(str.data(), sz);
1590 return *this;
1591}
1592
1594{
1595 str.clear();
1596 size_t sz = 0;
1597 *this >> sz;
1598 if (good() && !eof()) {
1599 str.reserve(sz);
1600 for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1601 }
1602 return *this;
1603}
1604
1606
1607#ifdef TEST_BIDIRMMAPPIPE
1608using namespace RooFit;
1609
1610int simplechild(BidirMMapPipe& pipe)
1611{
1612 // child does an echo loop
1613 while (pipe.good() && !pipe.eof()) {
1614 // read a string
1615 std::string str;
1616 pipe >> str;
1617 if (!pipe) return -1;
1618 if (pipe.eof()) break;
1619 if (!str.empty()) {
1620 std::cout << "[CHILD] : read: " << str << std::endl;
1621 str = "... early in the morning?";
1622 }
1623 pipe << str << BidirMMapPipe::flush;
1624 // did our parent tell us to shut down?
1625 if (str.empty()) break;
1626 if (!pipe) return -1;
1627 if (pipe.eof()) break;
1628 std::cout << "[CHILD] : wrote: " << str << std::endl;
1629 }
1630 pipe.close();
1631 return 0;
1632}
1633
1634#include <sstream>
1635int randomchild(BidirMMapPipe& pipe)
1636{
1637 // child sends out something at random intervals
1638 ::srand48(::getpid());
1639 {
1640 // wait for parent's go ahead signal
1641 std::string s;
1642 pipe >> s;
1643 }
1644 // no shutdown sequence needed on this side - we're producing the data,
1645 // and the parent can just read until we're done (when it'll get EOF)
1646 for (int i = 0; i < 5; ++i) {
1647 // sleep a random time between 0 and .9 seconds
1648 ::usleep(int(1e6 * ::drand48()));
1649 std::ostringstream buf;
1650 buf << "child pid " << ::getpid() << " sends message " << i;
1651 std::string str = buf.str();
1652 std::cout << "[CHILD] : " << str << std::endl;
1653 pipe << str << BidirMMapPipe::flush;
1654 if (!pipe) return -1;
1655 if (pipe.eof()) break;
1656 }
1657 // tell parent we're shutting down
1658 pipe << "" << BidirMMapPipe::flush;
1659 // wait for parent to acknowledge
1660 std::string s;
1661 pipe >> s;
1662 pipe.close();
1663 return 0;
1664}
1665
1666int benchchildrtt(BidirMMapPipe& pipe)
1667{
1668 // child does the equivalent of listening for pings and sending the
1669 // packet back
1670 char* str = 0;
1671 while (pipe && !pipe.eof()) {
1672 pipe >> str;
1673 if (!pipe) {
1674 std::free(str);
1675 pipe.close();
1676 return -1;
1677 }
1678 if (pipe.eof()) break;
1679 pipe << str << BidirMMapPipe::flush;
1680 // if we have just completed the shutdown handshake, we break here
1681 if (!std::strlen(str)) break;
1682 }
1683 std::free(str);
1684 pipe.close();
1685 return 0;
1686}
1687
1688int benchchildsink(BidirMMapPipe& pipe)
1689{
1690 // child behaves like a sink
1691 char* str = 0;
1692 while (pipe && !pipe.eof()) {
1693 pipe >> str;
1694 if (!std::strlen(str)) break;
1695 }
1696 pipe << "" << BidirMMapPipe::flush;
1697 std::free(str);
1698 pipe.close();
1699 return 0;
1700}
1701
1702int benchchildsource(BidirMMapPipe& pipe)
1703{
1704 // child behaves like a source
1705 char* str = 0;
1706 for (unsigned i = 0; i <= 24; ++i) {
1707 str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1708 std::memset(str, '4', 1 << i);
1709 str[1 << i] = 0;
1710 for (unsigned j = 0; j < 1 << 7; ++j) {
1711 pipe << str;
1712 if (!pipe || pipe.eof()) {
1713 std::free(str);
1714 pipe.close();
1715 return -1;
1716 }
1717 }
1718 // tell parent we're done with this block size
1719 pipe << "" << BidirMMapPipe::flush;
1720 }
1721 // tell parent to shut down
1722 pipe << "" << BidirMMapPipe::flush;
1723 std::free(str);
1724 pipe.close();
1725 return 0;
1726}
1727
1728BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1729{
1730 // create a pipe with the given child at the remote end
1731 BidirMMapPipe *p = new BidirMMapPipe();
1732 if (p->isChild()) {
1733 int retVal = childexec(*p);
1734 delete p;
1735 std::exit(retVal);
1736 }
1737 return p;
1738}
1739
1740#include <sys/time.h>
1741#include <iomanip>
1742int main()
1743{
// Self-test and benchmark driver (only built with TEST_BIDIRMMAPPIPE), in
// four stages:
//  1. an echo test with one child;
//  2. a poll() test with 20 children;
//  3. round-trip benchmarks for block sizes 2^0 .. 2^24 bytes;
//  4. throughput benchmarks with the child as sink and as source.
// Returns 0 on success, -1 on a stream error in the echo test, or the first
// non-zero child exit status.
1744 // simple echo loop test
1745 {
1746 std::cout << "[PARENT]: simple challenge-response test, "
1747 "one child:" << std::endl;
1748 BidirMMapPipe* pipe = spawnChild(simplechild);
1749 for (int i = 0; i < 5; ++i) {
1750 std::string str("What shall we do with a drunken sailor...");
1751 *pipe << str << BidirMMapPipe::flush;
1752 if (!*pipe) return -1;
1753 std::cout << "[PARENT]: wrote: " << str << std::endl;
1754 *pipe >> str;
1755 if (!*pipe) return -1;
1756 std::cout << "[PARENT]: read: " << str << std::endl;
1757 }
1758 // send shutdown string
1759 *pipe << "" << BidirMMapPipe::flush;
1760 // wait for shutdown handshake
1761 std::string s;
1762 *pipe >> s;
1763 int retVal = pipe->close();
1764 std::cout << "[PARENT]: exit status of child: " << retVal <<
1765 std::endl;
1766 if (retVal) return retVal;
1767 delete pipe;
1768 }
1769 // simple poll test - children send 5 results in random intervals
1770 {
1771 unsigned nch = 20;
1772 std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1773 " children:" << std::endl;
1774 typedef BidirMMapPipe::PollEntry PollEntry;
1775 // poll data structure
// NOTE(review): the declaration of `pipes` and the closing part of the
// push_back(PollEntry(...)) argument list below are not present in this
// listing - confirm against the original source.
1777 pipes.reserve(nch);
1778 // spawn children
1779 for (unsigned i = 0; i < nch; ++i) {
1780 std::cout << "[PARENT]: spawning child " << i << std::endl;
1781 pipes.push_back(PollEntry(spawnChild(randomchild),
1783 }
1784 // wake children up
1785 std::cout << "[PARENT]: waking up children" << std::endl;
1786 for (unsigned i = 0; i < nch; ++i)
1787 *pipes[i].pipe << "" << BidirMMapPipe::flush;
1788 std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1789 // while at least some children alive
1790 while (!pipes.empty()) {
1791 // poll, wait until status change (infinite timeout)
1792 int npipes = BidirMMapPipe::poll(pipes, -1);
1793 // scan for pipes with changed status
1794 for (std::vector<PollEntry>::iterator it = pipes.begin();
1795 npipes && pipes.end() != it; ) {
1796 if (!it->revents) {
1797 // unchanged, next one
1798 ++it;
1799 continue;
1800 }
1801 --npipes; // maybe we can stop early...
1802 // read from pipes which are readable
1803 if (it->revents & BidirMMapPipe::Readable) {
1804 std::string s;
1805 *(it->pipe) >> s;
1806 if (!s.empty()) {
1807 std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1808 ": " << s << std::endl;
1809 ++it;
1810 continue;
1811 } else {
1812 // child is shutting down...
1813 *(it->pipe) << "" << BidirMMapPipe::flush;
1814 goto childcloses;
1815 }
1816 }
1817 // retire pipes with error or end-of-file condition
// NOTE(review): the remaining flags of the condition below and the
// opening brace of its body are not present in this listing.
1818 if (it->revents & (BidirMMapPipe::Error |
1821 std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1822 " revents" <<
1823 ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1824 ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1825 ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1826 ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1827 ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1828 ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1829 ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1830 ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1831 std::endl;
// also reached via goto once a child has sent its empty shutdown string
1832childcloses:
1833 int retVal = it->pipe->close();
1834 std::cout << "[PARENT]: child exit status: " <<
1835 retVal << ", number of children still alive: " <<
1836 (pipes.size() - 1) << std::endl;
1837 if (retVal) return retVal;
1838 delete it->pipe;
1839 it = pipes.erase(it);
1840 continue;
1841 }
1842 }
1843 }
1844 }
1845 // little benchmark - round trip time
1846 {
1847 std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1848 for (unsigned i = 0; i <= 24; ++i) {
// NOTE(review): s comes from new[], but `*pipe >> s` below goes through
// operator>>(char*&), which calls std::realloc on it. Mixing new[] with
// realloc is undefined behaviour, and so is the later delete[] of the
// realloc'ed pointer. Allocating with std::malloc and releasing with
// std::free would be consistent.
1849 char *s = new char[1 + (1 << i)];
1850 std::memset(s, 'A', 1 << i);
1851 s[1 << i] = 0;
1852 const unsigned n = 1 << 7;
1853 double avg = 0., min = 1e42, max = -1e42;
1854 BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1855 for (unsigned j = n; j--; ) {
1856 struct timeval t1;
1857 ::gettimeofday(&t1, 0);
1858 *pipe << s << BidirMMapPipe::flush;
1859 if (!*pipe || pipe->eof()) break;
1860 *pipe >> s;
1861 if (!*pipe || pipe->eof()) break;
1862 struct timeval t2;
1863 ::gettimeofday(&t2, 0);
1864 t2.tv_sec -= t1.tv_sec;
1865 t2.tv_usec -= t1.tv_usec;
1866 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1867 if (dt < min) min = dt;
1868 if (dt > max) max = dt;
1869 avg += dt;
1870 }
1871 // send a shutdown string
1872 *pipe << "" << BidirMMapPipe::flush;
1873 // get child's shutdown ok
1874 *pipe >> s;
// averages over n even if the loop above stopped early
1875 avg /= double(n);
1876 avg *= 1e6; min *= 1e6; max *= 1e6;
1877 int retVal = pipe->close();
1878 if (retVal) {
1879 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1880 delete[] s;
1881 return retVal;
1882 }
1883 delete pipe;
1884 // there is a factor 2 in the formula for the transfer rate below,
1885 // because we transfer data of twice the size of the block - once
1886 // to the child, and once for the return trip
1887 std::cout << "block size " << std::setw(9) << (1 << i) <<
1888 " avg " << std::setw(7) << avg << " us min " <<
1889 std::setw(7) << min << " us max " << std::setw(7) << max <<
1890 "us speed " << std::setw(9) <<
1891 2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1892 " MB/s" << std::endl;
1893 delete[] s;
1894 }
1895 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1896 }
1897 // little benchmark - child as sink
1898 {
1899 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1900 for (unsigned i = 0; i <= 24; ++i) {
// NOTE(review): same new[] / realloc / delete[] mismatch as in the
// round-trip benchmark: `*pipe >> s` below reallocs s.
1901 char *s = new char[1 + (1 << i)];
1902 std::memset(s, 'A', 1 << i);
1903 s[1 << i] = 0;
1904 const unsigned n = 1 << 7;
1905 double avg = 0., min = 1e42, max = -1e42;
1906 BidirMMapPipe *pipe = spawnChild(benchchildsink);
1907 for (unsigned j = n; j--; ) {
1908 struct timeval t1;
1909 ::gettimeofday(&t1, 0);
1910 // streaming mode - we do not flush here
1911 *pipe << s;
1912 if (!*pipe || pipe->eof()) break;
1913 struct timeval t2;
1914 ::gettimeofday(&t2, 0);
1915 t2.tv_sec -= t1.tv_sec;
1916 t2.tv_usec -= t1.tv_usec;
1917 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1918 if (dt < min) min = dt;
1919 if (dt > max) max = dt;
1920 avg += dt;
1921 }
1922 // send a shutdown string
1923 *pipe << "" << BidirMMapPipe::flush;
1924 // get child's shutdown ok
1925 *pipe >> s;
1926 avg /= double(n);
1927 avg *= 1e6; min *= 1e6; max *= 1e6;
1928 int retVal = pipe->close();
1929 if (retVal) {
1930 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1931 return retVal;
1932 }
1933 delete pipe;
1934 std::cout << "block size " << std::setw(9) << (1 << i) <<
1935 " avg " << std::setw(7) << avg << " us min " <<
1936 std::setw(7) << min << " us max " << std::setw(7) << max <<
1937 "us speed " << std::setw(9) <<
1938 (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1939 " MB/s" << std::endl;
1940 delete[] s;
1941 }
1942 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1943 }
1944 // little benchmark - child as source
1945 {
1946 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1947 char *s = 0;
1948 double avg = 0., min = 1e42, max = -1e42;
1949 unsigned n = 0, bsz = 0;
1950 BidirMMapPipe *pipe = spawnChild(benchchildsource);
1951 while (*pipe && !pipe->eof()) {
1952 struct timeval t1;
1953 ::gettimeofday(&t1, 0);
1954 // streaming mode - we do not flush here
1955 *pipe >> s;
1956 if (!*pipe || pipe->eof()) break;
1957 struct timeval t2;
1958 ::gettimeofday(&t2, 0);
1959 t2.tv_sec -= t1.tv_sec;
1960 t2.tv_usec -= t1.tv_usec;
1961 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1962 if (std::strlen(s)) {
1963 ++n;
1964 if (dt < min) min = dt;
1965 if (dt > max) max = dt;
1966 avg += dt;
1967 bsz = std::strlen(s);
1968 } else {
// an empty string ends a block size; two in a row (n == 0) mean the
// child is done
1969 if (!n) break;
1970 // next block size
1971 avg /= double(n);
1972 avg *= 1e6; min *= 1e6; max *= 1e6;
1973
1974 std::cout << "block size " << std::setw(9) << bsz <<
1975 " avg " << std::setw(7) << avg << " us min " <<
1976 std::setw(7) << min << " us max " << std::setw(7) <<
1977 max << "us speed " << std::setw(9) <<
1978 (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1979 " MB/s" << std::endl;
1980 n = 0;
1981 avg = 0.;
1982 min = 1e42;
1983 max = -1e42;
1984 }
1985 }
1986 int retVal = pipe->close();
1987 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1988 if (retVal) return retVal;
1989 delete pipe;
1990 std::free(s);
1991 }
1992 return 0;
1993}
1994#endif // TEST_BIDIRMMAPPIPE
1995#endif // _WIN32
1996
1997// vim: ft=cpp:sw=4:tw=78:et
#define END_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
ROOT::R::TRInterface & Exception()
Definition: Exception.C:4
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
#define e(i)
Definition: RSha256.hxx:103
char name[80]
Definition: TGX11.cxx:109
Binding & operator=(OUT(*fun)(void))
typedef void((*Func_t)())
#define realloc
Definition: civetweb.c:1538
#define free
Definition: civetweb.c:1539
#define malloc
Definition: civetweb.c:1536
for poll() interface
unsigned revents
events that happened (or'ed bitmask)
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:81
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:62
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:71
unsigned len() const
return length of chunk
unsigned nPagesPerGroup() const
return number of pages per page group
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:64
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:88
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:56
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:65
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:69
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:68
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:59
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
@ Copy
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
@ Unknown
don't know yet what'll work
Definition: BidirMMapPipe.h:48
@ DevZero
mmapping /dev/zero works
Definition: BidirMMapPipe.h:51
@ Anonymous
anonymous mmap works
Definition: BidirMMapPipe.h:52
@ FileBacked
mmapping a temp file works
Definition: BidirMMapPipe.h:50
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:57
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:70
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:90
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:86
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
handle class for a number of Pages
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pgno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
void purge()
purge buffered data waiting to be read and/or written
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
BidirMMapPipe_impl::Page Page
convenience typedef for Page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
@ wrerrbit
write error
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
@ rderrbit
read error
bool good() const
status of stream is good
Page * m_dirtylist
linked list: dirty pages (data to be sent)
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
unsigned recvpages()
receive pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
bool bad() const
true on I/O error
@ WriteInvalid
write end of pipe invalid
@ Invalid
invalid pipe
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Error
pipe error
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
@ EndOfFile
end of file
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
Page * dirtypage()
get a dirty page to write data to (may block)
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
size_type read(void *addr, size_type sz)
read from pipe
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
static unsigned s_pagepoolrefcnt
page pool reference counter
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
int main(int argc, char **argv)
const Int_t n
Definition: legend1.C:16
namespace for implementation details of BidirMMapPipe
Template specialisation used in RooAbsArg:
static constexpr double s
fill
Definition: fit1_py.py:6
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool
auto * l
Definition: textangle.C:4
auto * t1
Definition: textangle.C:20
static unsigned long masks[]
Definition: gifencode.c:211