Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
BidirMMapPipe.cxx
Go to the documentation of this file.
1/// \cond ROOFIT_INTERNAL
2
3/** @file BidirMMapPipe.cxx
4 *
5 * implementation of BidirMMapPipe, a class which forks off a child process
6 * and serves as communications channel between parent and child
7 *
8 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
9 * @date 2013-07-07
10 */
11
12#ifndef _WIN32
13
14#include "BidirMMapPipe.h"
15
16#include <TSystem.h>
17
18#include <map>
19#include <cerrno>
20#include <limits>
21#include <cstdlib>
22#include <cstring>
23#include <cassert>
24#include <iostream>
25#include <algorithm>
26#include <exception>
27
28#include <poll.h>
29#include <fcntl.h>
30#include <csignal>
31#include <unistd.h>
32#include <sys/mman.h>
33#include <sys/stat.h>
34#include <sys/wait.h>
35#include <sys/socket.h>
36
37#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
38#define END_NAMESPACE_ROOFIT }
39
40BEGIN_NAMESPACE_ROOFIT
41
42/// namespace for implementation details of BidirMMapPipe
43namespace BidirMMapPipe_impl {
44 /** @brief exception to throw if low-level OS calls go wrong
45 *
46 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
47 * @date 2013-07-07
48 */
49 class BidirMMapPipeException : public std::exception
50 {
51 private:
52 enum {
53 s_sz = 256 ///< length of buffer
54 };
55 char m_buf[s_sz]; ///< buffer containing the error message
56
57 /// for the POSIX version of strerror_r
58 static int dostrerror_r(int err, char* buf, std::size_t sz,
59 int (*f)(int, char*, std::size_t))
60 { return f(err, buf, sz); }
61 /// for the GNU version of strerror_r
62 static int dostrerror_r(int, char*, std::size_t,
63 char* (*f)(int, char*, std::size_t));
64 public:
65 /// constructor taking error code, hint on operation (msg)
66 BidirMMapPipeException(const char* msg, int err);
67 /// return a destcription of what went wrong
68 const char* what() const noexcept override { return m_buf; }
69 };
70
71 BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
72 {
73 std::size_t msgsz = std::strlen(msg);
74 if (msgsz) {
75 msgsz = std::min(msgsz, std::size_t(s_sz));
76 std::copy(msg, msg + msgsz, m_buf);
77 if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
78 if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
79 }
80 if (msgsz < s_sz) {
81 // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
82 // have to sort it out with overloads
83 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
84 }
85 m_buf[s_sz - 1] = 0; // enforce zero-termination
86 }
87
88 int BidirMMapPipeException::dostrerror_r(int err, char* buf,
89 std::size_t sz, char* (*f)(int, char*, std::size_t))
90 {
91 buf[0] = 0;
92 char *tmp = f(err, buf, sz);
93 if (tmp && tmp != buf) {
94 std::strncpy(buf, tmp, sz);
95 buf[sz - 1] = 0;
96 if (std::strlen(tmp) > sz - 1) return ERANGE;
97 }
98 return 0;
99 }
100
101 /** @brief class representing the header structure in an mmapped page
102 *
103 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
104 * @date 2013-07-07
105 *
106 * contains a field to put pages into a linked list, a field for the size
107 * of the data being transmitted, and a field for the position until which
108 * the data has been read
109 */
110 class Page
111 {
112 private:
113 // use as small a data type as possible to maximise payload area
114 // of pages
115 short m_next = 0; ///< next page in list (in pagesizes)
116 unsigned short m_size = 0; ///< size of payload (in bytes)
117 unsigned short m_pos = 0; ///< index of next byte in payload area
118 public:
119 /// constructor
120 Page()
121 {
122 // check that short is big enough - must be done at runtime
123 // because the page size is not known until runtime
124 assert(std::numeric_limits<unsigned short>::max() >=
125 PageChunk::pagesize());
126 }
127 /// copy construction forbidden
128 Page(const Page &) = delete;
129 /// assignment forbidden
130 Page &operator=(const Page &) = delete;
131 /// set pointer to next page
132 void setNext(const Page* p);
133 /// return pointer to next page
134 Page* next() const;
135 /// return reference to size field
136 unsigned short& size() { return m_size; }
137 /// return size (of payload data)
138 unsigned size() const { return m_size; }
139 /// return reference to position field
140 unsigned short& pos() { return m_pos; }
141 /// return position
142 unsigned pos() const { return m_pos; }
143 /// return pointer to first byte in payload data area of page
144 inline unsigned char* begin() const
145 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
146 + sizeof(Page); }
147 /// return pointer to first byte in payload data area of page
148 inline unsigned char* end() const
149 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
150 + PageChunk::pagesize(); }
151 /// return the capacity of the page
152 static unsigned capacity()
153 { return PageChunk::pagesize() - sizeof(Page); }
154 /// true if page empty
155 bool empty() const { return !m_size; }
156 /// true if page partially filled
157 bool filled() const { return !empty(); }
158 /// free space left (to be written to)
159 unsigned free() const { return capacity() - m_size; }
160 /// bytes remaining to be read
161 unsigned remaining() const { return m_size - m_pos; }
162 /// true if page completely full
163 bool full() const { return !free(); }
164 };
165
166 void Page::setNext(const Page* p)
167 {
168 if (!p) {
169 m_next = 0;
170 } else {
171 const char* p1 = reinterpret_cast<char*>(this);
172 const char* p2 = reinterpret_cast<const char*>(p);
173 std::ptrdiff_t tmp = p2 - p1;
174 // difference must be divisible by page size
175 assert(!(tmp % PageChunk::pagesize()));
176 tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
177 m_next = tmp;
178 // no truncation when saving in a short
179 assert(m_next == tmp);
180 // final check: next() must return p
181 assert(next() == p);
182 }
183 }
184
185 Page* Page::next() const
186 {
187 if (!m_next) return nullptr;
188 char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
189 ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
190 return reinterpret_cast<Page*>(ptmp);
191 }
192
193 /** @brief class representing a page pool
194 *
195 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
196 * @date 2013-07-24
197 *
198 * pool of mmapped pages (on systems which support it, on all others, the
199 * functionality is emulated with dynamically allocated memory)
200 *
201 * in most operating systems there is a limit to how many mappings any one
202 * process is allowed to request; for this reason, we mmap a relatively
203 * large amount up front, and then carve off little pieces as we need them
204 *
205 * Moreover, some systems have too large a physical page size in their MMU
206 * for the code to handle (we want offsets and lengths to fit into 16
207 * bits), so we carve such big physical pages into smaller logical Pages
208 * if needed. The largest logical page size is currently 16 KiB.
209 */
210 class PagePool {
211 private:
212 /// convenience typedef
213 typedef BidirMMapPipeException Exception;
214
215 enum {
216 minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
217 maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
218 szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
219 };
220 /// a chunk of memory in the pool
221 typedef BidirMMapPipe_impl::PageChunk Chunk;
222 /// list of chunks
223 typedef std::list<Chunk*> ChunkList;
224
225 friend class BidirMMapPipe_impl::PageChunk;
226 public:
227 /// convenience typedef
228 typedef PageChunk::MMapVariety MMapVariety;
229 /// constructor
230 PagePool(unsigned nPagesPerGroup);
231 /// destructor
232 ~PagePool();
233 /// pop a free element out of the pool
234 Pages pop();
235
236 /// return (logical) page size of the system
237 static unsigned pagesize() { return PageChunk::pagesize(); }
238 /// return variety of mmap supported on the system
239 static MMapVariety mmapVariety()
240 { return PageChunk::mmapVariety(); }
241
242 /// return number of pages per group (ie. as returned by pop())
243 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
244
245 /// zap the pool (unmap all but Pages p)
246 void zap(Pages& p);
247
248 private:
249 /// list of chunks used by the pool
250 ChunkList m_chunks;
251 /// list of chunks used by the pool which are not full
252 ChunkList m_freelist;
253 /// chunk size map (histogram of chunk sizes)
254 unsigned m_szmap[(maxsz - minsz) / szincr];
255 /// current chunk size
256 int m_cursz = minsz;
257 /// page group size
258 unsigned m_nPgPerGrp;
259
260 /// adjust _cursz to current largest block
261 void updateCurSz(int sz, int incr);
262 /// find size of next chunk to allocate (in a hopefully smart way)
263 int nextChunkSz() const;
264 /// release a chunk
265 void putOnFreeList(Chunk* chunk);
266 /// release a chunk
267 void release(Chunk* chunk);
268 };
269
270 Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
271 m_pimpl(new impl)
272 {
273 assert(npg < 256);
274 m_pimpl->m_parent = parent;
275 m_pimpl->m_pages = pages;
276 m_pimpl->m_refcnt = 1;
277 m_pimpl->m_npages = npg;
278 /// initialise pages
279 for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
280 }
281
282 unsigned PageChunk::s_physpgsz = PageChunk::getPageSize();
283 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
284 PageChunk::MMapVariety PageChunk::s_mmapworks = PageChunk::Unknown;
285
286 Pages::~Pages()
287 {
288 if (m_pimpl && !--(m_pimpl->m_refcnt)) {
289 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
290 delete m_pimpl;
291 }
292 }
293
294 Pages::Pages(const Pages& other) :
295 m_pimpl(other.m_pimpl)
296 { ++(m_pimpl->m_refcnt); }
297
298 Pages& Pages::operator=(const Pages& other)
299 {
300 if (&other == this) return *this;
301 if (!--(m_pimpl->m_refcnt)) {
302 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
303 delete m_pimpl;
304 }
305 m_pimpl = other.m_pimpl;
306 ++(m_pimpl->m_refcnt);
307 return *this;
308 }
309
310 unsigned Pages::pagesize() { return PageChunk::pagesize(); }
311
312 Page* Pages::page(unsigned pgno) const
313 {
314 assert(pgno < m_pimpl->m_npages);
315 unsigned char* pptr =
316 reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
317 pptr += pgno * pagesize();
318 return reinterpret_cast<Page*>(pptr);
319 }
320
321 unsigned Pages::pageno(Page* p) const
322 {
323 const unsigned char* pptr =
324 reinterpret_cast<const unsigned char*>(p);
325 const unsigned char* bptr =
326 reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
327 assert(0 == ((pptr - bptr) % pagesize()));
328 const unsigned nr = (pptr - bptr) / pagesize();
329 assert(nr < m_pimpl->m_npages);
330 return nr;
331 }
332
333 unsigned PageChunk::getPageSize()
334 {
335 // find out page size of system
336 long pgsz = sysconf(_SC_PAGESIZE);
337 if (-1 == pgsz) throw Exception("sysconf", errno);
338 if (pgsz > 512 && pgsz > long(sizeof(Page)))
339 return pgsz;
340
341 // in case of failure or implausible value, use a safe default: 4k
342 // page size, and do not try to mmap
343 s_mmapworks = Copy;
344 return 1 << 12;
345 }
346
347 PageChunk::PageChunk(PagePool* parent,
348 unsigned length, unsigned nPgPerGroup) :
349 m_begin(dommap(length)),
350 m_end(reinterpret_cast<void*>(
351 reinterpret_cast<unsigned char*>(m_begin) + length)),
352 m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
353 {
354 // ok, push groups of pages onto freelist here
355 unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
356 unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
357 while (p < pend) {
358 m_freelist.push_back(reinterpret_cast<void*>(p));
359 p += nPgPerGroup * PagePool::pagesize();
360 }
361 }
362
363 PageChunk::~PageChunk()
364 {
365 if (m_parent) assert(empty());
366 if (m_begin) domunmap(m_begin, len());
367 }
368
369 bool PageChunk::contains(const Pages& p) const
370 { return p.m_pimpl->m_parent == this; }
371
372 Pages PageChunk::pop()
373 {
374 assert(!m_freelist.empty());
375 void* p = m_freelist.front();
376 m_freelist.pop_front();
377 ++m_nUsedGrp;
378 return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
379 }
380
381 void PageChunk::push(const Pages& p)
382 {
383 assert(contains(p));
384 bool wasempty = m_freelist.empty();
385 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
386 --m_nUsedGrp;
387 if (m_parent) {
388 // notify parent if we need to be put on the free list again
389 if (wasempty) m_parent->putOnFreeList(this);
390 // notify parent if we're empty
391 if (empty()) return m_parent->release(this);
392 }
393 }
394
395 void* PageChunk::dommap(unsigned len)
396 {
397 assert(len && 0 == (len % s_physpgsz));
398 // ok, the idea here is to try the different methods of mmapping, and
399 // choose the first one that works. we have four flavours:
400 // 1 - anonymous mmap (best)
401 // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
402 // bit more tedious to set up, since you need to open/close a
403 // device file)
404 // 3 - mmap of a temporary file (very tedious to set up - need to
405 // create a temporary file, delete it, make the underlying storage
406 // large enough, then mmap the fd and close it)
407 // 4 - if all those fail, we malloc the buffers, and copy the data
408 // through the OS (then we're no better than normal pipes)
409 static bool msgprinted = false;
410 if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
411#if defined(MAP_ANONYMOUS)
412#undef MYANONFLAG
413#define MYANONFLAG MAP_ANONYMOUS
414#elif defined(MAP_ANON)
415#undef MYANONFLAG
416#define MYANONFLAG MAP_ANON
417#else
418#undef MYANONFLAG
419#endif
420#ifdef MYANONFLAG
421 void* retVal = ::mmap(nullptr, len, PROT_READ | PROT_WRITE,
422 MYANONFLAG | MAP_SHARED, -1, 0);
423 if (MAP_FAILED == retVal) {
424 if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
425 } else {
426 assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
427 s_mmapworks = Anonymous;
428 if (BidirMMapPipe::debugflag() && !msgprinted) {
429 std::cerr << " INFO: In " << __func__ << " (" <<
430 __FILE__ << ", line " << __LINE__ <<
431 "): anonymous mmapping works, excellent!" <<
432 std::endl;
433 msgprinted = true;
434 }
435 return retVal;
436 }
437#endif
438#undef MYANONFLAG
439 }
440 if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
441 // ok, no anonymous mappings supported directly, so try to map
442 // /dev/zero which has much the same effect on many systems
443 int fd = ::open("/dev/zero", O_RDWR);
444 if (-1 == fd)
445 throw Exception("open /dev/zero", errno);
446 void* retVal = ::mmap(nullptr, len,
447 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
448 if (MAP_FAILED == retVal) {
449 int errsv = errno;
450 ::close(fd);
451 if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
452 } else {
453 assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
454 s_mmapworks = DevZero;
455 }
456 if (-1 == ::close(fd))
457 throw Exception("close /dev/zero", errno);
458 if (BidirMMapPipe::debugflag() && !msgprinted) {
459 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
460 ", line " << __LINE__ << "): mmapping /dev/zero works, "
461 "very good!" << std::endl;
462 msgprinted = true;
463 }
464 return retVal;
465 }
466 if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
467 std::string tmpPath = gSystem->TempDirectory();
468 std::string name = tmpPath + "/roofit_BidirMMapPipe-XXXXXX";
469 int fd;
470 // open temp file
471 if (-1 == (fd = ::mkstemp(const_cast<char*>(name.c_str())))) throw Exception("mkstemp", errno);
472 // remove it, but keep fd open
473 if (-1 == ::unlink(name.c_str())) {
474 int errsv = errno;
475 ::close(fd);
476 throw Exception("unlink", errsv);
477 }
478 // make it the right size: lseek
479 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
480 int errsv = errno;
481 ::close(fd);
482 throw Exception("lseek", errsv);
483 }
484 // make it the right size: write a byte
485 if (1 != ::write(fd, name.c_str(), 1)) {
486 int errsv = errno;
487 ::close(fd);
488 throw Exception("write", errsv);
489 }
490 // do mmap
491 void* retVal = ::mmap(nullptr, len,
492 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
493 if (MAP_FAILED == retVal) {
494 int errsv = errno;
495 ::close(fd);
496 if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
497 } else {
498 assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
499 s_mmapworks = FileBacked;
500 }
501 if (-1 == ::close(fd)) {
502 int errsv = errno;
503 ::munmap(retVal, len);
504 throw Exception("close", errsv);
505 }
506 if (BidirMMapPipe::debugflag() && !msgprinted) {
507 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
508 ", line " << __LINE__ << "): mmapping temporary files "
509 "works, good!" << std::endl;
510 msgprinted = true;
511 }
512 return retVal;
513 }
514 if (Copy == s_mmapworks || Unknown == s_mmapworks) {
515 // fallback solution: mmap does not work on this OS (or does not
516 // work for what we want to use it), so use a normal buffer of
517 // memory instead, and collect data in that buffer - this needs an
518 // additional write/read to/from the pipe(s), but there you go...
519 if (BidirMMapPipe::debugflag() && !msgprinted) {
520 std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
521 ", line " << __LINE__ << "): anonymous mmapping of "
522 "shared buffers failed, falling back to read/write on "
523 " pipes!" << std::endl;
524 msgprinted = true;
525 }
526 s_mmapworks = Copy;
527 void* retVal = std::malloc(len);
528 if (!retVal) throw Exception("malloc", errno);
529 return retVal;
530 }
531 // should never get here
532 assert(false);
533 return nullptr;
534 }
535
536 void PageChunk::domunmap(void* addr, unsigned len)
537 {
538 assert(len && 0 == (len % s_physpgsz));
539 if (addr) {
540 assert(Unknown != s_mmapworks);
541 if (Copy != s_mmapworks) {
542 if (-1 == ::munmap(addr, len))
543 throw Exception("munmap", errno);
544 } else {
545 std::free(addr);
546 }
547 }
548 }
549
550 void PageChunk::zap(Pages& p)
551 {
552 // try to mprotect the other bits of the pool with no access...
553 // we'd really like a version of mremap here that can unmap all the
554 // other pages in the chunk, but that does not exist, so we protect
555 // the other pages in this chunk such that they may neither be read,
556 // written nor executed, only the pages we're interested in for
557 // communications stay readable and writable
558 //
559 // if an OS does not support changing the protection of a part of an
560 // mmapped area, the mprotect calls below should just fail and not
561 // change any protection, so we're a little less safe against
562 // corruption, but everything should still work
563 if (Copy != s_mmapworks) {
564 unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
565 unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
566 unsigned char* p2 = p1 + p.npages() * s_physpgsz;
567 unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
568 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
569 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
570 }
571 m_parent = nullptr;
572 m_freelist.clear();
573 m_nUsedGrp = 1;
574 p.m_pimpl->m_parent = nullptr;
575 m_begin = m_end = nullptr;
576 // commit suicide
577 delete this;
578 }
579
580 PagePool::PagePool(unsigned nPgPerGroup) : m_nPgPerGrp(nPgPerGroup)
581 {
582 // if logical and physical page size differ, we may have to adjust
583 // m_nPgPerGrp to make things fit
584 if (PageChunk::pagesize() != PageChunk::physPgSz()) {
585 const unsigned mult =
586 PageChunk::physPgSz() / PageChunk::pagesize();
587 const unsigned desired = nPgPerGroup * PageChunk::pagesize();
588 // round up to to next physical page boundary
589 const unsigned actual = mult *
590 (desired / mult + bool(desired % mult));
591 const unsigned newPgPerGrp = actual / PageChunk::pagesize();
592 if (BidirMMapPipe::debugflag()) {
593 std::cerr << " INFO: In " << __func__ << " (" <<
594 __FILE__ << ", line " << __LINE__ <<
595 "): physical page size " << PageChunk::physPgSz() <<
596 ", subdividing into logical pages of size " <<
597 PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
598 m_nPgPerGrp << " -> " << newPgPerGrp <<
599 std::endl;
600 }
601 assert(newPgPerGrp >= m_nPgPerGrp);
602 m_nPgPerGrp = newPgPerGrp;
603 }
604 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
605 }
606
607 PagePool::~PagePool()
608 {
609 m_freelist.clear();
610 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
611 delete *it;
612 m_chunks.clear();
613 }
614
615 void PagePool::zap(Pages& p)
616 {
617 // unmap all pages but those pointed to by p
618 m_freelist.clear();
619 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
620 if ((*it)->contains(p)) {
621 (*it)->zap(p);
622 } else {
623 delete *it;
624 }
625 }
626 m_chunks.clear();
627 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
628 m_cursz = minsz;
629 }
630
631 Pages PagePool::pop()
632 {
633 if (m_freelist.empty()) {
634 // allocate and register new chunk and put it on the freelist
635 const int sz = nextChunkSz();
636 Chunk *c = new Chunk(this,
637 sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
638 m_chunks.push_front(c);
639 m_freelist.push_back(c);
640 updateCurSz(sz, +1);
641 }
642 // get free element from first chunk on _freelist
643 Chunk* c = m_freelist.front();
644 Pages p(c->pop());
645 // full chunks are removed from _freelist
646 if (c->full()) m_freelist.pop_front();
647 return p;
648 }
649
650 void PagePool::release(PageChunk* chunk)
651 {
652 assert(chunk->empty());
653 // find chunk on freelist and remove
654 ChunkList::iterator it = std::find(
655 m_freelist.begin(), m_freelist.end(), chunk);
656 if (m_freelist.end() == it)
657 throw Exception("PagePool::release(PageChunk*)", EINVAL);
658 m_freelist.erase(it);
659 // find chunk in m_chunks and remove
660 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
661 if (m_chunks.end() == it)
662 throw Exception("PagePool::release(PageChunk*)", EINVAL);
663 m_chunks.erase(it);
664 const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
665 delete chunk;
666 updateCurSz(sz, -1);
667 }
668
669 void PagePool::putOnFreeList(PageChunk* chunk)
670 {
671 assert(!chunk->full());
672 m_freelist.push_back(chunk);
673 }
674
675 void PagePool::updateCurSz(int sz, int incr)
676 {
677 m_szmap[(sz - minsz) / szincr] += incr;
678 m_cursz = minsz;
679 for (int i = (maxsz - minsz) / szincr; i--; ) {
680 if (m_szmap[i]) {
681 m_cursz += i * szincr;
682 break;
683 }
684 }
685 }
686
687 int PagePool::nextChunkSz() const
688 {
689 // no chunks with space available, figure out chunk size
690 int sz = m_cursz;
691 if (m_chunks.empty()) {
692 // if we start allocating chunks, we start from minsz
693 sz = minsz;
694 } else {
695 if (minsz >= sz) {
696 // minimal sized chunks are always grown
697 sz = minsz + szincr;
698 } else {
699 if (1 != m_chunks.size()) {
700 // if we have more than one completely filled chunk, grow
701 sz += szincr;
702 } else {
703 // just one chunk left, try shrinking chunk size
704 sz -= szincr;
705 }
706 }
707 }
708 // clamp size to allowed range
709 if (sz > maxsz) sz = maxsz;
710 if (sz < minsz) sz = minsz;
711 return sz;
712 }
713}
714
715// static BidirMMapPipe members
716pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
717std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
718BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = nullptr;
719unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
720int BidirMMapPipe::s_debugflag = 0;
721
722BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
723{
724 if (!s_pagepool)
725 s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
726 return *s_pagepool;
727}
728
729void BidirMMapPipe::teardownall(void)
730{
731 pthread_mutex_lock(&s_openpipesmutex);
732 while (!s_openpipes.empty()) {
733 BidirMMapPipe *p = s_openpipes.front();
734 pthread_mutex_unlock(&s_openpipesmutex);
735 if (p->m_childPid) kill(p->m_childPid, SIGTERM);
736 p->doClose(true, true);
737 pthread_mutex_lock(&s_openpipesmutex);
738 }
739 pthread_mutex_unlock(&s_openpipesmutex);
740}
741
742BidirMMapPipe::BidirMMapPipe(const BidirMMapPipe&) :
743 m_pages(pagepool().pop())
744{
745 // free pages again
746 { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
747 if (!s_pagepoolrefcnt) {
748 delete s_pagepool;
749 s_pagepool = nullptr;
750 }
751}
752
753BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
754 m_pages(pagepool().pop()), m_busylist(nullptr), m_freelist(nullptr), m_dirtylist(nullptr),
755 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
756 m_parentPid(::getpid())
757
758{
759 ++s_pagepoolrefcnt;
760 assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
761 int fds[4] = { -1, -1, -1, -1 };
762 int myerrno;
763 static bool firstcall = true;
764 if (useExceptions) m_flags |= exceptionsbit;
765
766 try {
767 if (firstcall) {
768 firstcall = false;
769 // register a cleanup handler to make sure all BidirMMapPipes are torn
770 // down, and child processes are sent a SIGTERM
771 if (0 != atexit(BidirMMapPipe::teardownall))
772 throw Exception("atexit", errno);
773 }
774
775 // build free lists
776 for (unsigned i = 1; i < TotPages; ++i)
777 m_pages[i - 1]->setNext(m_pages[i]);
778 m_pages[PagesPerEnd - 1]->setNext(nullptr);
779 if (!useSocketpair) {
780 // create pipes
781 if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
782 if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
783 } else {
784 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
785 throw Exception("socketpair", errno);
786 }
787 // fork the child
788 pthread_mutex_lock(&s_openpipesmutex);
789 char c;
790 switch ((m_childPid = ::fork())) {
791 case -1: // error in fork()
792 myerrno = errno;
793 pthread_mutex_unlock(&s_openpipesmutex);
794 m_childPid = 0;
795 throw Exception("fork", myerrno);
796 case 0: // child
797 // put the ends in the right place
798 if (-1 != fds[2]) {
799 // pair of pipes
800 if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
801 myerrno = errno;
802 pthread_mutex_unlock(&s_openpipesmutex);
803 throw Exception("close", myerrno);
804 }
805 fds[0] = fds[3] = -1;
806 m_outpipe = fds[1];
807 m_inpipe = fds[2];
808 } else {
809 // socket pair
810 if (-1 == ::close(fds[0])) {
811 myerrno = errno;
812 pthread_mutex_unlock(&s_openpipesmutex);
813 throw Exception("close", myerrno);
814 }
815 fds[0] = -1;
816 m_inpipe = m_outpipe = fds[1];
817 }
818 // close other pipes our parent may have open - we have no business
819 // reading from/writing to those...
820 for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
821 s_openpipes.end() != it; ) {
822 BidirMMapPipe* p = *it;
823 it = s_openpipes.erase(it);
824 p->doClose(true, true);
825 }
826 pagepool().zap(m_pages);
827 s_pagepoolrefcnt = 0;
828 delete s_pagepool;
829 s_pagepool = nullptr;
830 s_openpipes.push_front(this);
831 pthread_mutex_unlock(&s_openpipesmutex);
832 // ok, put our pages on freelist
833 m_freelist = m_pages[PagesPerEnd];
834 // handshake with other end (to make sure it's alive)...
835 c = 'C'; // ...hild
836 if (1 != xferraw(m_outpipe, &c, 1, ::write))
837 throw Exception("handshake: xferraw write", EPIPE);
838 if (1 != xferraw(m_inpipe, &c, 1, ::read))
839 throw Exception("handshake: xferraw read", EPIPE);
840 if ('P' != c) throw Exception("handshake", EPIPE);
841 break;
842 default: // parent
843 // put the ends in the right place
844 if (-1 != fds[2]) {
845 // pair of pipes
846 if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
847 myerrno = errno;
848 pthread_mutex_unlock(&s_openpipesmutex);
849 throw Exception("close", myerrno);
850 }
851 fds[1] = fds[2] = -1;
852 m_outpipe = fds[3];
853 m_inpipe = fds[0];
854 } else {
855 // socketpair
856 if (-1 == ::close(fds[1])) {
857 myerrno = errno;
858 pthread_mutex_unlock(&s_openpipesmutex);
859 throw Exception("close", myerrno);
860 }
861 fds[1] = -1;
862 m_inpipe = m_outpipe = fds[0];
863 }
864 // put on list of open pipes (so we can kill child processes
865 // if things go wrong)
866 s_openpipes.push_front(this);
867 pthread_mutex_unlock(&s_openpipesmutex);
868 // ok, put our pages on freelist
869 m_freelist = m_pages[0u];
870 // handshake with other end (to make sure it's alive)...
871 c = 'P'; // ...arent
872 if (1 != xferraw(m_outpipe, &c, 1, ::write))
873 throw Exception("handshake: xferraw write", EPIPE);
874 if (1 != xferraw(m_inpipe, &c, 1, ::read))
875 throw Exception("handshake: xferraw read", EPIPE);
876 if ('C' != c) throw Exception("handshake", EPIPE);
877 break;
878 }
879 // mark file descriptors for close on exec (we do not want to leak the
880 // connection to anything we happen to exec)
881 int fdflags = 0;
882 if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
883 throw Exception("fcntl", errno);
884 fdflags |= FD_CLOEXEC;
885 if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
886 throw Exception("fcntl", errno);
887 if (m_inpipe != m_outpipe) {
888 if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
889 throw Exception("fcntl", errno);
890 fdflags |= FD_CLOEXEC;
891 if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
892 throw Exception("fcntl", errno);
893 }
894 // ok, finally, clear the failbit
895 m_flags &= ~failbit;
896 // all done
897 } catch (BidirMMapPipe::Exception&) {
898 if (0 != m_childPid) kill(m_childPid, SIGTERM);
899 for (int i = 0; i < 4; ++i)
900 if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
901 {
902 // free resources associated with mmapped pages
903 BidirMMapPipe_impl::Pages p; p.swap(m_pages);
904 }
905 if (!--s_pagepoolrefcnt) {
906 delete s_pagepool;
907 s_pagepool = nullptr;
908 }
909 throw;
910 }
911}
912
913int BidirMMapPipe::close()
914{
915 assert(!(m_flags & failbit));
916 return doClose(false);
917}
918
919int BidirMMapPipe::doClose(bool force, bool holdlock)
920{
921 if (m_flags & failbit) return 0;
922 // flush data to be written
923 if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
924 // shut down the write direction (no more writes from our side)
925 if (m_inpipe == m_outpipe) {
926 if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
927 throw Exception("shutdown", errno);
928 m_outpipe = -1;
929 } else {
930 if (-1 != m_outpipe && -1 == ::close(m_outpipe))
931 if (!force) throw Exception("close", errno);
932 m_outpipe = -1;
933 }
934 // shut down the write direction (no more writes from our side)
935 // drain anything the other end might still want to send
936 if (!force && -1 != m_inpipe) {
937 // **************** THIS IS EXTREMELY UGLY: ****************
938 // POLLHUP is not set reliably on pipe/socket shutdown on all
939 // platforms, unfortunately, so we poll for readability here until
940 // the other end closes, too
941 //
942 // the read loop below ensures that the other end sees the POLLIN that
943 // is set on shutdown instead, and goes ahead to close its end
944 //
945 // if we don't do this, and close straight away, the other end
946 // will catch a SIGPIPE or similar, and we don't want that
947 int err;
948 struct pollfd fds;
949 fds.fd = m_inpipe;
950 fds.events = POLLIN;
951 fds.revents = 0;
952 do {
953 while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
954 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
955 if (fds.revents & POLLIN) {
956 char c;
957 if (1 > ::read(m_inpipe, &c, 1)) break;
958 }
959 }
960 } while (0 > err && EINTR == errno);
961 // ignore all other poll errors
962 }
963 // close read end
964 if (-1 != m_inpipe && -1 == ::close(m_inpipe))
965 if (!force) throw Exception("close", errno);
966 m_inpipe = -1;
967 // unmap memory
968 try {
969 { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
970 if (!--s_pagepoolrefcnt) {
971 delete s_pagepool;
972 s_pagepool = nullptr;
973 }
974 } catch (std::exception&) {
975 if (!force) throw;
976 }
977 m_busylist = m_freelist = m_dirtylist = nullptr;
978 // wait for child process
979 int retVal = 0;
980 if (isParent()) {
981 int tmp;
982 do {
983 tmp = waitpid(m_childPid, &retVal, 0);
984 } while (-1 == tmp && EINTR == errno);
985 if (-1 == tmp)
986 if (!force) throw Exception("waitpid", errno);
987 m_childPid = 0;
988 }
989 // remove from list of open pipes
990 if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
991 std::list<BidirMMapPipe*>::iterator it = std::find(
992 s_openpipes.begin(), s_openpipes.end(), this);
993 if (s_openpipes.end() != it) s_openpipes.erase(it);
994 if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
995 m_flags |= failbit;
996 return retVal;
997}
998
999BidirMMapPipe::~BidirMMapPipe()
1000{ doClose(false); }
1001
1002BidirMMapPipe::size_type BidirMMapPipe::xferraw(
1003 int fd, void* addr, size_type len,
1004 ssize_t (*xferfn)(int, void*, std::size_t))
1005{
1006 size_type xferred = 0;
1007 unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1008 while (len) {
1009 ssize_t tmp = xferfn(fd, buf, len);
1010 if (tmp > 0) {
1011 xferred += tmp;
1012 len -= tmp;
1013 buf += tmp;
1014 continue;
1015 } else if (0 == tmp) {
1016 // check for end-of-file on pipe
1017 break;
1018 } else if (-1 == tmp) {
1019 // ok some error occurred, so figure out if we want to retry of throw
1020 switch (errno) {
1021 default:
1022 // if anything was transferred, return number of bytes
1023 // transferred so far, we can start throwing on the next
1024 // transfer...
1025 if (xferred) return xferred;
1026 // else throw
1027 throw Exception("xferraw", errno);
1028 case EAGAIN: // fallthrough intended
1029#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1030 case EWOULDBLOCK: // fallthrough intended
1031#endif
1032 std::cerr << " ERROR: In " << __func__ << " (" <<
1033 __FILE__ << ", line " << __LINE__ <<
1034 "): expect transfer to block!" << std::endl;
1035 case EINTR:
1036 break;
1037 }
1038 continue;
1039 } else {
1040 throw Exception("xferraw: unexpected return value from read/write",
1041 errno);
1042 }
1043 }
1044 return xferred;
1045}
1046
1047void BidirMMapPipe::sendpages(Page* plist)
1048{
1049 if (plist) {
1050 unsigned char pg = m_pages[plist];
1051 if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1052 if (BidirMMapPipe_impl::PageChunk::Copy ==
1053 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1054 // ok, have to copy pages through pipe
1055 for (Page* p = plist; p; p = p->next()) {
1056 if (sizeof(Page) + p->size() !=
1057 xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1058 ::write)) {
1059 throw Exception("sendpages: short write", EPIPE);
1060 }
1061 }
1062 }
1063 } else {
1064 throw Exception("sendpages: short write", EPIPE);
1065 }
1066 } else { assert(plist); }
1067}
1068
1069unsigned BidirMMapPipe::recvpages()
1070{
1071 unsigned char pg;
1072 unsigned retVal = 0;
1073 Page *plisthead = nullptr;
1074 Page *plisttail = nullptr;
1075 if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1076 plisthead = plisttail = m_pages[pg];
1077 // ok, have number of pages
1078 if (BidirMMapPipe_impl::PageChunk::Copy ==
1079 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1080 // ok, need to copy pages through pipe
1081 for (; plisttail; ++retVal) {
1082 Page* p = plisttail;
1083 if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1084 ::read)) {
1085 plisttail = p->next();
1086 if (p->empty()) continue;
1087 // break in case of read error
1088 if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1089 ::read)) break;
1090 }
1091 }
1092 } else {
1093 retVal = lenPageList(plisthead);
1094 }
1095 }
1096 // put list of pages we just received into correct lists (busy/free)
1097 if (plisthead) feedPageLists(plisthead);
1098 // ok, retVal contains the number of pages read, so put them on the
1099 // correct lists
1100 return retVal;
1101}
1102
1103unsigned BidirMMapPipe::recvpages_nonblock()
1104{
1105 struct pollfd fds;
1106 fds.fd = m_inpipe;
1107 fds.events = POLLIN;
1108 fds.revents = 0;
1109 unsigned retVal = 0;
1110 do {
1111 int rc = ::poll(&fds, 1, 0);
1112 if (0 > rc) {
1113 if (EINTR == errno) continue;
1114 break;
1115 }
1116 if (1 == retVal && fds.revents & POLLIN &&
1117 !(fds.revents & (POLLNVAL | POLLERR))) {
1118 // ok, we can read without blocking, so the other end has
1119 // something for us
1120 return recvpages();
1121 } else {
1122 break;
1123 }
1124 } while (true);
1125 return retVal;
1126}
1127
1128unsigned BidirMMapPipe::lenPageList(const Page* p)
1129{
1130 unsigned n = 0;
1131 for ( ; p; p = p->next()) ++n;
1132 return n;
1133}
1134
1135void BidirMMapPipe::feedPageLists(Page* plist)
1136{
1137 assert(plist);
1138 // get end of busy list
1139 Page *blend = m_busylist;
1140 while (blend && blend->next()) blend = blend->next();
1141 // ok, might have to send free pages to other end, and (if we do have to
1142 // send something to the other end) while we're at it, send any dirty
1143 // pages which are completely full, too
1144 Page *sendlisthead = nullptr;
1145 Page *sendlisttail = nullptr;
1146 // loop over plist
1147 while (plist) {
1148 Page* p = plist;
1149 plist = p->next();
1150 p->setNext(nullptr);
1151 if (!p->empty()) {
1152 // busy page...
1153 p->pos() = 0;
1154 // put at end of busy list
1155 if (blend) blend->setNext(p);
1156 else m_busylist = p;
1157 blend = p;
1158 } else {
1159 // free page...
1160 // Very simple algorithm: once we're done with a page, we send it back
1161 // where it came from. If it's from our end, we put it on the free list, if
1162 // it's from the other end, we send it back.
1163 if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1164 (isChild() && m_pages[p] < PagesPerEnd)) {
1165 // page "belongs" to other end
1166 if (!sendlisthead) sendlisthead = p;
1167 if (sendlisttail) sendlisttail->setNext(p);
1168 sendlisttail = p;
1169 } else {
1170 // add page to freelist
1171 p->setNext(m_freelist);
1172 m_freelist = p;
1173 }
1174 }
1175 }
1176 // check if we have to send stuff to the other end
1177 if (sendlisthead) {
1178 // go through our list of dirty pages, and see what we can
1179 // send along
1180 Page* dp;
1181 while ((dp = m_dirtylist) && dp->full()) {
1182 Page* p = dp;
1183 // move head of dirty list
1184 m_dirtylist = p->next();
1185 // queue for sending
1186 p->setNext(nullptr);
1187 sendlisttail->setNext(p);
1188 sendlisttail = p;
1189 }
1190 // poll if the other end is still alive - this needs that we first
1191 // close the write pipe of the other end when the remote end of the
1192 // connection is shutting down in doClose; we'll see that because we
1193 // get a POLLHUP on our inpipe
1194 const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1195 struct pollfd fds[2];
1196 fds[0].fd = m_outpipe;
1197 fds[0].events = fds[0].revents = 0;
1198 if (m_outpipe != m_inpipe) {
1199 fds[1].fd = m_inpipe;
1200 fds[1].events = fds[1].revents = 0;
1201 } else {
1202 fds[0].events |= POLLIN;
1203 }
1204 int retVal = 0;
1205 do {
1206 retVal = ::poll(fds, nfds, 0);
1207 if (0 > retVal && EINTR == errno)
1208 continue;
1209 break;
1210 } while (true);
1211 if (0 <= retVal) {
1212 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1213 if (m_outpipe != m_inpipe) {
1214 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1215 } else {
1216 if (ok && fds[0].revents & POLLIN) {
1217 unsigned ret = recvpages();
1218 if (!ret) ok = false;
1219 }
1220 }
1221
1222 if (ok) sendpages(sendlisthead);
1223 // (if the pipe is dead already, we don't care that we leak the
1224 // contents of the pages on the send list here, so that is why
1225 // there's no else clause here)
1226 } else {
1227 throw Exception("feedPageLists: poll", errno);
1228 }
1229 }
1230}
1231
1232void BidirMMapPipe::markPageDirty(Page* p)
1233{
1234 assert(p);
1235 assert(p == m_freelist);
1236 // remove from freelist
1237 m_freelist = p->next();
1238 p->setNext(nullptr);
1239 // append to dirty list
1240 Page* dl = m_dirtylist;
1241 while (dl && dl->next()) dl = dl->next();
1242 if (dl) dl->setNext(p);
1243 else m_dirtylist = p;
1244}
1245
1246BidirMMapPipe::Page* BidirMMapPipe::busypage()
1247{
1248 // queue any pages available for reading we can without blocking
1249 recvpages_nonblock();
1250 Page* p;
1251 // if there are no busy pages, try to get them from the other end,
1252 // block if we have to...
1253 while (!(p = m_busylist)) if (!recvpages()) return nullptr;
1254 return p;
1255}
1256
1257BidirMMapPipe::Page* BidirMMapPipe::dirtypage()
1258{
1259 // queue any pages available for reading we can without blocking
1260 recvpages_nonblock();
1261 Page* p = m_dirtylist;
1262 // go to end of dirty list
1263 if (p) while (p->next()) p = p->next();
1264 if (!p || p->full()) {
1265 // need to append free page, so get one
1266 while (!(p = m_freelist)) if (!recvpages()) return nullptr;
1267 markPageDirty(p);
1268 }
1269 return p;
1270}
1271
1272void BidirMMapPipe::flush()
1273{ return doFlush(true); }
1274
1275void BidirMMapPipe::doFlush(bool forcePartialPages)
1276{
1277 assert(!(m_flags & failbit));
1278 // build a list of pages to flush
1279 Page *flushlisthead = nullptr;
1280 Page *flushlisttail = nullptr;
1281 while (m_dirtylist) {
1282 Page* p = m_dirtylist;
1283 if (!forcePartialPages && !p->full()) break;
1284 // remove dirty page from dirty list
1285 m_dirtylist = p->next();
1286 p->setNext(nullptr);
1287 // and send it to other end
1288 if (!flushlisthead) flushlisthead = p;
1289 if (flushlisttail) flushlisttail->setNext(p);
1290 flushlisttail = p;
1291 }
1292 if (flushlisthead) sendpages(flushlisthead);
1293}
1294
1295void BidirMMapPipe::purge()
1296{
1297 assert(!(m_flags & failbit));
1298 // join busy and dirty lists
1299 {
1300 Page *l = m_busylist;
1301 while (l && l->next()) l = l->next();
1302 if (l) l->setNext(m_dirtylist);
1303 else m_busylist = m_dirtylist;
1304 }
1305 // empty busy and dirty pages
1306 for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1307 // put them on the free list
1308 if (m_busylist) feedPageLists(m_busylist);
1309 m_busylist = m_dirtylist = nullptr;
1310}
1311
1312BidirMMapPipe::size_type BidirMMapPipe::bytesReadableNonBlocking()
1313{
1314 // queue all pages waiting for consumption in the pipe before we give an
1315 // answer
1316 recvpages_nonblock();
1317 size_type retVal = 0;
1318 for (Page* p = m_busylist; p; p = p->next())
1319 retVal += p->size() - p->pos();
1320 return retVal;
1321}
1322
1323BidirMMapPipe::size_type BidirMMapPipe::bytesWritableNonBlocking()
1324{
1325 // queue all pages waiting for consumption in the pipe before we give an
1326 // answer
1327 recvpages_nonblock();
1328 // check if we could write to the pipe without blocking (we need to know
1329 // because we might need to check if flushing of dirty pages would block)
1330 bool couldwrite = false;
1331 {
1332 struct pollfd fds;
1333 fds.fd = m_outpipe;
1334 fds.events = POLLOUT;
1335 fds.revents = 0;
1336 int retVal = 0;
1337 do {
1338 retVal = ::poll(&fds, 1, 0);
1339 if (0 > retVal) {
1340 if (EINTR == errno) continue;
1341 throw Exception("bytesWritableNonBlocking: poll", errno);
1342 }
1343 if (1 == retVal && fds.revents & POLLOUT &&
1344 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1345 couldwrite = true;
1346 break;
1347 } while (true);
1348 }
1349 // ok, start counting bytes
1350 size_type retVal = 0;
1351 unsigned npages = 0;
1352 // go through the dirty list
1353 for (Page* p = m_dirtylist; p; p = p->next()) {
1354 ++npages;
1355 // if page only partially filled
1356 if (!p->full())
1357 retVal += p->free();
1358 if (npages >= FlushThresh && !couldwrite) break;
1359 }
1360 // go through the free list
1361 for (Page* p = m_freelist; p && (!m_dirtylist ||
1362 npages < FlushThresh || couldwrite); p = p->next()) {
1363 ++npages;
1364 retVal += Page::capacity();
1365 }
1366 return retVal;
1367}
1368
1369BidirMMapPipe::size_type BidirMMapPipe::read(void* addr, size_type sz)
1370{
1371 assert(!(m_flags & failbit));
1372 size_type nread = 0;
1373 unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1374 try {
1375 while (sz) {
1376 // find next page to read from
1377 Page* p = busypage();
1378 if (!p) {
1379 m_flags |= eofbit;
1380 return nread;
1381 }
1382 unsigned char* pp = p->begin() + p->pos();
1383 size_type csz = std::min(size_type(p->remaining()), sz);
1384 std::copy(pp, pp + csz, ap);
1385 nread += csz;
1386 ap += csz;
1387 sz -= csz;
1388 p->pos() += csz;
1389 assert(p->size() >= p->pos());
1390 if (p->size() == p->pos()) {
1391 // if no unread data remains, page is free
1392 m_busylist = p->next();
1393 p->setNext(nullptr);
1394 p->size() = 0;
1395 feedPageLists(p);
1396 }
1397 }
1398 } catch (Exception&) {
1399 m_flags |= rderrbit;
1400 if (m_flags & exceptionsbit) throw;
1401 }
1402 return nread;
1403}
1404
1405BidirMMapPipe::size_type BidirMMapPipe::write(const void* addr, size_type sz)
1406{
1407 assert(!(m_flags & failbit));
1408 size_type written = 0;
1409 const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1410 try {
1411 while (sz) {
1412 // find next page to write to
1413 Page* p = dirtypage();
1414 if (!p) {
1415 m_flags |= eofbit;
1416 return written;
1417 }
1418 unsigned char* pp = p->begin() + p->size();
1419 size_type csz = std::min(size_type(p->free()), sz);
1420 std::copy(ap, ap + csz, pp);
1421 written += csz;
1422 ap += csz;
1423 p->size() += csz;
1424 sz -= csz;
1425 assert(p->capacity() >= p->size());
1426 if (p->full()) {
1427 // if page is full, see if we're above the flush threshold of
1428 // 3/4 of our pages
1429 if (lenPageList(m_dirtylist) >= FlushThresh)
1430 doFlush(false);
1431 }
1432 }
1433 } catch (Exception&) {
1434 m_flags |= wrerrbit;
1435 if (m_flags & exceptionsbit) throw;
1436 }
1437 return written;
1438}
1439
1440int BidirMMapPipe::poll(BidirMMapPipe::PollVector& pipes, int timeout)
1441{
1442 // go through pipes, and change flags where we already know without really
1443 // polling - stuff where we don't need poll to wait for its timeout in the
1444 // OS...
1445 bool canskiptimeout = false;
1446 std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1447 std::vector<unsigned>::iterator mit = masks.begin();
1448 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1449 ++it, ++mit) {
1450 PollEntry& pe = *it;
1451 pe.revents = None;
1452 // null pipe is invalid
1453 if (!pe.pipe) {
1454 pe.revents |= Invalid;
1455 canskiptimeout = true;
1456 continue;
1457 }
1458 // closed pipe is invalid
1459 if (pe.pipe->closed()) pe.revents |= Invalid;
1460 // check for error
1461 if (pe.pipe->bad()) pe.revents |= Error;
1462 // check for end of file
1463 if (pe.pipe->eof()) pe.revents |= EndOfFile;
1464 // check if readable
1465 if (pe.events & Readable) {
1466 *mit |= Readable;
1467 if (pe.pipe->m_busylist) pe.revents |= Readable;
1468 }
1469 // check if writable
1470 if (pe.events & Writable) {
1471 *mit |= Writable;
1472 if (pe.pipe->m_freelist) {
1473 pe.revents |= Writable;
1474 } else {
1475 Page *dl = pe.pipe->m_dirtylist;
1476 while (dl && dl->next()) dl = dl->next();
1477 if (dl && dl->pos() < Page::capacity())
1478 pe.revents |= Writable;
1479 }
1480 }
1481 if (pe.revents) canskiptimeout = true;
1482 }
1483 // set up the data structures required for the poll syscall
1484 std::vector<pollfd> fds;
1485 fds.reserve(2 * pipes.size());
1486 std::map<int, PollEntry*> fds2pipes;
1487 for (PollVector::const_iterator it = pipes.begin();
1488 pipes.end() != it; ++it) {
1489 const PollEntry& pe = *it;
1490 struct pollfd tmp;
1491 fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1492 const_cast<PollEntry*>(&pe)));
1493 tmp.events = tmp.revents = 0;
1494 // we always poll for readability; this allows us to queue pages
1495 // early
1496 tmp.events |= POLLIN;
1497 if (pe.pipe->m_outpipe != tmp.fd) {
1498 // ok, it's a pair of pipes
1499 fds.push_back(tmp);
1500 fds2pipes.insert(std::make_pair(
1501 unsigned(tmp.fd = pe.pipe->m_outpipe),
1502 const_cast<PollEntry*>(&pe)));
1503 tmp.events = 0;
1504
1505 }
1506 if (pe.events & Writable) tmp.events |= POLLOUT;
1507 fds.push_back(tmp);
1508 }
1509 // poll
1510 int retVal = 0;
1511 do {
1512 retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1513 if (0 > retVal) {
1514 if (EINTR == errno) continue;
1515 throw Exception("poll", errno);
1516 }
1517 break;
1518 } while (true);
1519 // fds may have changed state, so update...
1520 for (std::vector<pollfd>::iterator it = fds.begin();
1521 fds.end() != it; ++it) {
1522 pollfd& fe = *it;
1523 //if (!fe.revents) continue;
1524 --retVal;
1525 PollEntry& pe = *fds2pipes[fe.fd];
1526oncemore:
1527 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1528 pe.revents |= ReadInvalid;
1529 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1530 pe.revents |= WriteInvalid;
1531 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1532 pe.revents |= ReadError;
1533 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1534 pe.revents |= WriteError;
1535 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1536 pe.revents |= ReadEndOfFile;
1537 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1538 pe.revents |= WriteEndOfFile;
1539 if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1540 !(fe.revents & (POLLNVAL | POLLERR))) {
1541 // ok, there is at least one page for us to receive from the
1542 // other end
1543 if (0 == pe.pipe->recvpages()) continue;
1544 // more pages there?
1545 do {
1546 int tmp = ::poll(&fe, 1, 0);
1547 if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1548 if (0 > tmp) {
1549 if (EINTR == errno) continue;
1550 throw Exception("poll", errno);
1551 }
1552 break;
1553 } while (true);
1554 }
1555 if (pe.pipe->m_busylist) pe.revents |= Readable;
1556 if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1557 if (pe.pipe->m_freelist) {
1558 pe.revents |= Writable;
1559 } else {
1560 Page *dl = pe.pipe->m_dirtylist;
1561 while (dl && dl->next()) dl = dl->next();
1562 if (dl && dl->pos() < Page::capacity())
1563 pe.revents |= Writable;
1564 }
1565 }
1566 }
1567 // apply correct masks, and count pipes with pending events
1568 int npipes = 0;
1569 mit = masks.begin();
1570 for (PollVector::iterator it = pipes.begin();
1571 pipes.end() != it; ++it, ++mit)
1572 if ((it->revents &= *mit)) ++npipes;
1573 return npipes;
1574}
1575
1576BidirMMapPipe& BidirMMapPipe::operator<<(const char* str)
1577{
1578 size_t sz = std::strlen(str);
1579 *this << sz;
1580 if (sz) write(str, sz);
1581 return *this;
1582}
1583
1584BidirMMapPipe& BidirMMapPipe::operator>>(char* (&str))
1585{
1586 size_t sz = 0;
1587 *this >> sz;
1588 if (good() && !eof()) {
1589 str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1590 if (!str) throw Exception("realloc", errno);
1591 if (sz) read(str, sz);
1592 str[sz] = 0;
1593 }
1594 return *this;
1595}
1596
1597BidirMMapPipe& BidirMMapPipe::operator<<(const std::string& str)
1598{
1599 size_t sz = str.size();
1600 *this << sz;
1601 write(str.data(), sz);
1602 return *this;
1603}
1604
1605BidirMMapPipe& BidirMMapPipe::operator>>(std::string& str)
1606{
1607 str.clear();
1608 size_t sz = 0;
1609 *this >> sz;
1610 if (good() && !eof()) {
1611 str.reserve(sz);
1612 for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1613 }
1614 return *this;
1615}
1616
1617END_NAMESPACE_ROOFIT
1618
1619#ifdef TEST_BIDIRMMAPPIPE
1620using namespace RooFit;
1621
1622int simplechild(BidirMMapPipe& pipe)
1623{
1624 // child does an echo loop
1625 while (pipe.good() && !pipe.eof()) {
1626 // read a string
1627 std::string str;
1628 pipe >> str;
1629 if (!pipe) return -1;
1630 if (pipe.eof()) break;
1631 if (!str.empty()) {
1632 std::cout << "[CHILD] : read: " << str << std::endl;
1633 str = "... early in the morning?";
1634 }
1635 pipe << str << BidirMMapPipe::flush;
1636 // did our parent tell us to shut down?
1637 if (str.empty()) break;
1638 if (!pipe) return -1;
1639 if (pipe.eof()) break;
1640 std::cout << "[CHILD] : wrote: " << str << std::endl;
1641 }
1642 pipe.close();
1643 return 0;
1644}
1645
1646#include <sstream>
1647int randomchild(BidirMMapPipe& pipe)
1648{
1649 // child sends out something at random intervals
1650 ::srand48(::getpid());
1651 {
1652 // wait for parent's go ahead signal
1653 std::string s;
1654 pipe >> s;
1655 }
1656 // no shutdown sequence needed on this side - we're producing the data,
1657 // and the parent can just read until we're done (when it'll get EOF)
1658 for (int i = 0; i < 5; ++i) {
1659 // sleep a random time between 0 and .9 seconds
1660 ::usleep(int(1e6 * ::drand48()));
1661 std::ostringstream buf;
1662 buf << "child pid " << ::getpid() << " sends message " << i;
1663 std::string str = buf.str();
1664 std::cout << "[CHILD] : " << str << std::endl;
1665 pipe << str << BidirMMapPipe::flush;
1666 if (!pipe) return -1;
1667 if (pipe.eof()) break;
1668 }
1669 // tell parent we're shutting down
1670 pipe << "" << BidirMMapPipe::flush;
1671 // wait for parent to acknowledge
1672 std::string s;
1673 pipe >> s;
1674 pipe.close();
1675 return 0;
1676}
1677
1678int benchchildrtt(BidirMMapPipe& pipe)
1679{
1680 // child does the equivalent of listening for pings and sending the
1681 // packet back
1682 char* str = 0;
1683 while (pipe && !pipe.eof()) {
1684 pipe >> str;
1685 if (!pipe) {
1686 std::free(str);
1687 pipe.close();
1688 return -1;
1689 }
1690 if (pipe.eof()) break;
1691 pipe << str << BidirMMapPipe::flush;
1692 // if we have just completed the shutdown handshake, we break here
1693 if (!std::strlen(str)) break;
1694 }
1695 std::free(str);
1696 pipe.close();
1697 return 0;
1698}
1699
1700int benchchildsink(BidirMMapPipe& pipe)
1701{
1702 // child behaves like a sink
1703 char* str = 0;
1704 while (pipe && !pipe.eof()) {
1705 pipe >> str;
1706 if (!std::strlen(str)) break;
1707 }
1708 pipe << "" << BidirMMapPipe::flush;
1709 std::free(str);
1710 pipe.close();
1711 return 0;
1712}
1713
1714int benchchildsource(BidirMMapPipe& pipe)
1715{
1716 // child behaves like a source
1717 char* str = 0;
1718 for (unsigned i = 0; i <= 24; ++i) {
1719 str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1720 std::memset(str, '4', 1 << i);
1721 str[1 << i] = 0;
1722 for (unsigned j = 0; j < 1 << 7; ++j) {
1723 pipe << str;
1724 if (!pipe || pipe.eof()) {
1725 std::free(str);
1726 pipe.close();
1727 return -1;
1728 }
1729 }
1730 // tell parent we're done with this block size
1731 pipe << "" << BidirMMapPipe::flush;
1732 }
1733 // tell parent to shut down
1734 pipe << "" << BidirMMapPipe::flush;
1735 std::free(str);
1736 pipe.close();
1737 return 0;
1738}
1739
1740BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1741{
1742 // create a pipe with the given child at the remote end
1743 BidirMMapPipe *p = new BidirMMapPipe();
1744 if (p->isChild()) {
1745 int retVal = childexec(*p);
1746 delete p;
1747 std::exit(retVal);
1748 }
1749 return p;
1750}
1751
1752#include <sys/time.h>
1753#include <iomanip>
1754int main()
1755{
1756 // simple echo loop test
1757 {
1758 std::cout << "[PARENT]: simple challenge-response test, "
1759 "one child:" << std::endl;
1760 BidirMMapPipe* pipe = spawnChild(simplechild);
1761 for (int i = 0; i < 5; ++i) {
1762 std::string str("What shall we do with a drunken sailor...");
1763 *pipe << str << BidirMMapPipe::flush;
1764 if (!*pipe) return -1;
1765 std::cout << "[PARENT]: wrote: " << str << std::endl;
1766 *pipe >> str;
1767 if (!*pipe) return -1;
1768 std::cout << "[PARENT]: read: " << str << std::endl;
1769 }
1770 // send shutdown string
1771 *pipe << "" << BidirMMapPipe::flush;
1772 // wait for shutdown handshake
1773 std::string s;
1774 *pipe >> s;
1775 int retVal = pipe->close();
1776 std::cout << "[PARENT]: exit status of child: " << retVal <<
1777 std::endl;
1778 if (retVal) return retVal;
1779 delete pipe;
1780 }
1781 // simple poll test - children send 5 results in random intervals
1782 {
1783 unsigned nch = 20;
1784 std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1785 " children:" << std::endl;
1786 typedef BidirMMapPipe::PollEntry PollEntry;
1787 // poll data structure
1788 BidirMMapPipe::PollVector pipes;
1789 pipes.reserve(nch);
1790 // spawn children
1791 for (unsigned i = 0; i < nch; ++i) {
1792 std::cout << "[PARENT]: spawning child " << i << std::endl;
1793 pipes.push_back(PollEntry(spawnChild(randomchild),
1794 BidirMMapPipe::Readable));
1795 }
1796 // wake children up
1797 std::cout << "[PARENT]: waking up children" << std::endl;
1798 for (unsigned i = 0; i < nch; ++i)
1799 *pipes[i].pipe << "" << BidirMMapPipe::flush;
1800 std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1801 // while at least some children alive
1802 while (!pipes.empty()) {
1803 // poll, wait until status change (infinite timeout)
1804 int npipes = BidirMMapPipe::poll(pipes, -1);
1805 // scan for pipes with changed status
1806 for (std::vector<PollEntry>::iterator it = pipes.begin();
1807 npipes && pipes.end() != it; ) {
1808 if (!it->revents) {
1809 // unchanged, next one
1810 ++it;
1811 continue;
1812 }
1813 --npipes; // maybe we can stop early...
1814 // read from pipes which are readable
1815 if (it->revents & BidirMMapPipe::Readable) {
1816 std::string s;
1817 *(it->pipe) >> s;
1818 if (!s.empty()) {
1819 std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1820 ": " << s << std::endl;
1821 ++it;
1822 continue;
1823 } else {
1824 // child is shutting down...
1825 *(it->pipe) << "" << BidirMMapPipe::flush;
1826 goto childcloses;
1827 }
1828 }
1829 // retire pipes with error or end-of-file condition
1830 if (it->revents & (BidirMMapPipe::Error |
1831 BidirMMapPipe::EndOfFile |
1832 BidirMMapPipe::Invalid)) {
1833 std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1834 " revents" <<
1835 ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1836 ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1837 ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1838 ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1839 ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1840 ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1841 ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1842 ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1843 std::endl;
1844childcloses:
1845 int retVal = it->pipe->close();
1846 std::cout << "[PARENT]: child exit status: " <<
1847 retVal << ", number of children still alive: " <<
1848 (pipes.size() - 1) << std::endl;
1849 if (retVal) return retVal;
1850 delete it->pipe;
1851 it = pipes.erase(it);
1852 continue;
1853 }
1854 }
1855 }
1856 }
1857 // little benchmark - round trip time
1858 {
1859 std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1860 for (unsigned i = 0; i <= 24; ++i) {
1861 std::vector<char> s(1 + (1 << i));
1862 std::memset(s, 'A', 1 << i);
1863 s[1 << i] = 0;
1864 const unsigned n = 1 << 7;
1865 double avg = 0., min = 1e42, max = -1e42;
1866 BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1867 for (unsigned j = n; j--; ) {
1868 struct timeval t1;
1869 ::gettimeofday(&t1, 0);
1870 *pipe << s << BidirMMapPipe::flush;
1871 if (!*pipe || pipe->eof()) break;
1872 *pipe >> s;
1873 if (!*pipe || pipe->eof()) break;
1874 struct timeval t2;
1875 ::gettimeofday(&t2, 0);
1876 t2.tv_sec -= t1.tv_sec;
1877 t2.tv_usec -= t1.tv_usec;
1878 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1879 if (dt < min) min = dt;
1880 if (dt > max) max = dt;
1881 avg += dt;
1882 }
1883 // send a shutdown string
1884 *pipe << "" << BidirMMapPipe::flush;
1885 // get child's shutdown ok
1886 *pipe >> s;
1887 avg /= double(n);
1888 avg *= 1e6; min *= 1e6; max *= 1e6;
1889 int retVal = pipe->close();
1890 if (retVal) {
1891 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1892 return retVal;
1893 }
1894 delete pipe;
1895 // there is a factor 2 in the formula for the transfer rate below,
1896 // because we transfer data of twice the size of the block - once
1897 // to the child, and once for the return trip
1898 std::cout << "block size " << std::setw(9) << (1 << i) <<
1899 " avg " << std::setw(7) << avg << " us min " <<
1900 std::setw(7) << min << " us max " << std::setw(7) << max <<
1901 "us speed " << std::setw(9) <<
1902 2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1903 " MB/s" << std::endl;
1904 }
1905 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1906 }
1907 // little benchmark - child as sink
1908 {
1909 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1910 for (unsigned i = 0; i <= 24; ++i) {
1911 std::vector<char> s(1 + (1 << i));
1912 std::memset(s, 'A', 1 << i);
1913 s[1 << i] = 0;
1914 const unsigned n = 1 << 7;
1915 double avg = 0., min = 1e42, max = -1e42;
1916 BidirMMapPipe *pipe = spawnChild(benchchildsink);
1917 for (unsigned j = n; j--; ) {
1918 struct timeval t1;
1919 ::gettimeofday(&t1, 0);
1920 // streaming mode - we do not flush here
1921 *pipe << s;
1922 if (!*pipe || pipe->eof()) break;
1923 struct timeval t2;
1924 ::gettimeofday(&t2, 0);
1925 t2.tv_sec -= t1.tv_sec;
1926 t2.tv_usec -= t1.tv_usec;
1927 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1928 if (dt < min) min = dt;
1929 if (dt > max) max = dt;
1930 avg += dt;
1931 }
1932 // send a shutdown string
1933 *pipe << "" << BidirMMapPipe::flush;
1934 // get child's shutdown ok
1935 *pipe >> s;
1936 avg /= double(n);
1937 avg *= 1e6; min *= 1e6; max *= 1e6;
1938 int retVal = pipe->close();
1939 if (retVal) {
1940 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1941 return retVal;
1942 }
1943 delete pipe;
1944 std::cout << "block size " << std::setw(9) << (1 << i) <<
1945 " avg " << std::setw(7) << avg << " us min " <<
1946 std::setw(7) << min << " us max " << std::setw(7) << max <<
1947 "us speed " << std::setw(9) <<
1948 (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1949 " MB/s" << std::endl;
1950 }
1951 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1952 }
1953 // little benchmark - child as source
1954 {
1955 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1956 char *s = 0;
1957 double avg = 0., min = 1e42, max = -1e42;
1958 unsigned n = 0, bsz = 0;
1959 BidirMMapPipe *pipe = spawnChild(benchchildsource);
1960 while (*pipe && !pipe->eof()) {
1961 struct timeval t1;
1962 ::gettimeofday(&t1, 0);
1963 // streaming mode - we do not flush here
1964 *pipe >> s;
1965 if (!*pipe || pipe->eof()) break;
1966 struct timeval t2;
1967 ::gettimeofday(&t2, 0);
1968 t2.tv_sec -= t1.tv_sec;
1969 t2.tv_usec -= t1.tv_usec;
1970 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1971 if (std::strlen(s)) {
1972 ++n;
1973 if (dt < min) min = dt;
1974 if (dt > max) max = dt;
1975 avg += dt;
1976 bsz = std::strlen(s);
1977 } else {
1978 if (!n) break;
1979 // next block size
1980 avg /= double(n);
1981 avg *= 1e6; min *= 1e6; max *= 1e6;
1982
1983 std::cout << "block size " << std::setw(9) << bsz <<
1984 " avg " << std::setw(7) << avg << " us min " <<
1985 std::setw(7) << min << " us max " << std::setw(7) <<
1986 max << "us speed " << std::setw(9) <<
1987 (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1988 " MB/s" << std::endl;
1989 n = 0;
1990 avg = 0.;
1991 min = 1e42;
1992 max = -1e42;
1993 }
1994 }
1995 int retVal = pipe->close();
1996 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1997 if (retVal) return retVal;
1998 delete pipe;
1999 std::free(s);
2000 }
2001 return 0;
2002}
2003#endif // TEST_BIDIRMMAPPIPE
2004#endif // _WIN32
2005
2006// vim: ft=cpp:sw=4:tw=78:et
2007
2008/// \endcond
ROOT::R::TRInterface & Exception()
Definition Exception.C:4
int main()
Definition Prototype.cxx:12
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
#define e(i)
Definition RSha256.hxx:103
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
Binding & operator=(OUT(*fun)(void))
R__EXTERN TSystem * gSystem
Definition TSystem.h:561
#define free
Definition civetweb.c:1539
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition TSystem.cxx:1482
const Int_t n
Definition legend1.C:16
void(off) SmallVectorTemplateBase< T
void Copy(void *source, void *dest)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26
__device__ AFloat max(AFloat x, AFloat y)
Definition Kernels.cuh:207
static const char * what
Definition stlLoader.cc:5
TLine l
Definition textangle.C:4
auto * t1
Definition textangle.C:20
static unsigned long masks[]
Definition gifencode.c:211