Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
BidirMMapPipe.cxx
Go to the documentation of this file.
1/// \cond ROOFIT_INTERNAL
2
3/** @file BidirMMapPipe.cxx
4 *
5 * implementation of BidirMMapPipe, a class which forks off a child process
6 * and serves as communications channel between parent and child
7 *
8 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
9 * @date 2013-07-07
10 */
11
12#ifndef _WIN32
13
14#include "BidirMMapPipe.h"
15
16#include <TSystem.h>
17
18#include <map>
19#include <cerrno>
20#include <limits>
21#include <cstdlib>
22#include <cstring>
23#include <cassert>
24#include <iostream>
25#include <algorithm>
26#include <exception>
27
28#include <poll.h>
29#include <fcntl.h>
30#include <csignal>
31#include <unistd.h>
32#include <sys/mman.h>
33#include <sys/stat.h>
34#include <sys/wait.h>
35#include <sys/socket.h>
36
37#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
38#define END_NAMESPACE_ROOFIT }
39
40BEGIN_NAMESPACE_ROOFIT
41
42/// namespace for implementation details of BidirMMapPipe
43namespace BidirMMapPipe_impl {
44 /** @brief exception to throw if low-level OS calls go wrong
45 *
46 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
47 * @date 2013-07-07
48 */
49 class BidirMMapPipeException : public std::exception
50 {
51 private:
52 enum {
53 s_sz = 256 ///< length of buffer
54 };
55 char m_buf[s_sz]; ///< buffer containing the error message
56
57 /// for the POSIX version of strerror_r
58 static int dostrerror_r(int err, char* buf, std::size_t sz,
59 int (*f)(int, char*, std::size_t))
60 { return f(err, buf, sz); }
61 /// for the GNU version of strerror_r
62 static int dostrerror_r(int, char*, std::size_t,
63 char* (*f)(int, char*, std::size_t));
64 public:
65 /// constructor taking error code, hint on operation (msg)
66 BidirMMapPipeException(const char* msg, int err);
67 /// return a destcription of what went wrong
68 const char* what() const noexcept override { return m_buf; }
69 };
70
71 BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
72 {
73 std::size_t msgsz = std::strlen(msg);
74 if (msgsz) {
75 msgsz = std::min(msgsz, std::size_t(s_sz));
76 std::copy(msg, msg + msgsz, m_buf);
77 if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
78 if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
79 }
80 if (msgsz < s_sz) {
81 // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
82 // have to sort it out with overloads
83 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
84 }
85 m_buf[s_sz - 1] = 0; // enforce zero-termination
86 }
87
88 int BidirMMapPipeException::dostrerror_r(int err, char* buf,
89 std::size_t sz, char* (*f)(int, char*, std::size_t))
90 {
91 buf[0] = 0;
92 char *tmp = f(err, buf, sz);
93 if (tmp && tmp != buf) {
94 std::strncpy(buf, tmp, sz);
95 buf[sz - 1] = 0;
96 if (std::strlen(tmp) > sz - 1) return ERANGE;
97 }
98 return 0;
99 }
100
101 /** @brief class representing the header structure in an mmapped page
102 *
103 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
104 * @date 2013-07-07
105 *
106 * contains a field to put pages into a linked list, a field for the size
107 * of the data being transmitted, and a field for the position until which
108 * the data has been read
109 */
110 class Page
111 {
112 private:
113 // use as small a data type as possible to maximise payload area
114 // of pages
115 short m_next; ///< next page in list (in pagesizes)
116 unsigned short m_size; ///< size of payload (in bytes)
117 unsigned short m_pos; ///< index of next byte in payload area
118 /// copy construction forbidden
119 Page(const Page&) {}
120 /// assignment forbidden
121 Page& operator=(const Page&) = delete;
122 public:
123 /// constructor
124 Page() : m_next(0), m_size(0), m_pos(0)
125 {
126 // check that short is big enough - must be done at runtime
127 // because the page size is not known until runtime
128 assert(std::numeric_limits<unsigned short>::max() >=
129 PageChunk::pagesize());
130 }
131 /// set pointer to next page
132 void setNext(const Page* p);
133 /// return pointer to next page
134 Page* next() const;
135 /// return reference to size field
136 unsigned short& size() { return m_size; }
137 /// return size (of payload data)
138 unsigned size() const { return m_size; }
139 /// return reference to position field
140 unsigned short& pos() { return m_pos; }
141 /// return position
142 unsigned pos() const { return m_pos; }
143 /// return pointer to first byte in payload data area of page
144 inline unsigned char* begin() const
145 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
146 + sizeof(Page); }
147 /// return pointer to first byte in payload data area of page
148 inline unsigned char* end() const
149 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
150 + PageChunk::pagesize(); }
151 /// return the capacity of the page
152 static unsigned capacity()
153 { return PageChunk::pagesize() - sizeof(Page); }
154 /// true if page empty
155 bool empty() const { return !m_size; }
156 /// true if page partially filled
157 bool filled() const { return !empty(); }
158 /// free space left (to be written to)
159 unsigned free() const { return capacity() - m_size; }
160 /// bytes remaining to be read
161 unsigned remaining() const { return m_size - m_pos; }
162 /// true if page completely full
163 bool full() const { return !free(); }
164 };
165
166 void Page::setNext(const Page* p)
167 {
168 if (!p) {
169 m_next = 0;
170 } else {
171 const char* p1 = reinterpret_cast<char*>(this);
172 const char* p2 = reinterpret_cast<const char*>(p);
173 std::ptrdiff_t tmp = p2 - p1;
174 // difference must be divisible by page size
175 assert(!(tmp % PageChunk::pagesize()));
176 tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
177 m_next = tmp;
178 // no truncation when saving in a short
179 assert(m_next == tmp);
180 // final check: next() must return p
181 assert(next() == p);
182 }
183 }
184
185 Page* Page::next() const
186 {
187 if (!m_next) return nullptr;
188 char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
189 ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
190 return reinterpret_cast<Page*>(ptmp);
191 }
192
193 /** @brief class representing a page pool
194 *
195 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
196 * @date 2013-07-24
197 *
198 * pool of mmapped pages (on systems which support it, on all others, the
199 * functionality is emulated with dynamically allocated memory)
200 *
201 * in most operating systems there is a limit to how many mappings any one
202 * process is allowed to request; for this reason, we mmap a relatively
203 * large amount up front, and then carve off little pieces as we need them
204 *
205 * Moreover, some systems have too large a physical page size in their MMU
206 * for the code to handle (we want offsets and lengths to fit into 16
207 * bits), so we carve such big physical pages into smaller logical Pages
208 * if needed. The largest logical page size is currently 16 KiB.
209 */
210 class PagePool {
211 private:
212 /// convenience typedef
213 typedef BidirMMapPipeException Exception;
214
215 enum {
216 minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
217 maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
218 szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
219 };
220 /// a chunk of memory in the pool
221 typedef BidirMMapPipe_impl::PageChunk Chunk;
222 /// list of chunks
223 typedef std::list<Chunk*> ChunkList;
224
225 friend class BidirMMapPipe_impl::PageChunk;
226 public:
227 /// convenience typedef
228 typedef PageChunk::MMapVariety MMapVariety;
229 /// constructor
230 PagePool(unsigned nPagesPerGroup);
231 /// destructor
232 ~PagePool();
233 /// pop a free element out of the pool
234 Pages pop();
235
236 /// return (logical) page size of the system
237 static unsigned pagesize() { return PageChunk::pagesize(); }
238 /// return variety of mmap supported on the system
239 static MMapVariety mmapVariety()
240 { return PageChunk::mmapVariety(); }
241
242 /// return number of pages per group (ie. as returned by pop())
243 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
244
245 /// zap the pool (unmap all but Pages p)
246 void zap(Pages& p);
247
248 private:
249 /// list of chunks used by the pool
250 ChunkList m_chunks;
251 /// list of chunks used by the pool which are not full
252 ChunkList m_freelist;
253 /// chunk size map (histogram of chunk sizes)
254 unsigned m_szmap[(maxsz - minsz) / szincr];
255 /// current chunk size
256 int m_cursz;
257 /// page group size
258 unsigned m_nPgPerGrp;
259
260 /// adjust _cursz to current largest block
261 void updateCurSz(int sz, int incr);
262 /// find size of next chunk to allocate (in a hopefully smart way)
263 int nextChunkSz() const;
264 /// release a chunk
265 void putOnFreeList(Chunk* chunk);
266 /// release a chunk
267 void release(Chunk* chunk);
268 };
269
270 Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
271 m_pimpl(new impl)
272 {
273 assert(npg < 256);
274 m_pimpl->m_parent = parent;
275 m_pimpl->m_pages = pages;
276 m_pimpl->m_refcnt = 1;
277 m_pimpl->m_npages = npg;
278 /// initialise pages
279 for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
280 }
281
282 unsigned PageChunk::s_physpgsz = PageChunk::getPageSize();
283 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
284 PageChunk::MMapVariety PageChunk::s_mmapworks = PageChunk::Unknown;
285
286 Pages::~Pages()
287 {
288 if (m_pimpl && !--(m_pimpl->m_refcnt)) {
289 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
290 delete m_pimpl;
291 }
292 }
293
294 Pages::Pages(const Pages& other) :
295 m_pimpl(other.m_pimpl)
296 { ++(m_pimpl->m_refcnt); }
297
298 Pages& Pages::operator=(const Pages& other)
299 {
300 if (&other == this) return *this;
301 if (!--(m_pimpl->m_refcnt)) {
302 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
303 delete m_pimpl;
304 }
305 m_pimpl = other.m_pimpl;
306 ++(m_pimpl->m_refcnt);
307 return *this;
308 }
309
310 unsigned Pages::pagesize() { return PageChunk::pagesize(); }
311
312 Page* Pages::page(unsigned pgno) const
313 {
314 assert(pgno < m_pimpl->m_npages);
315 unsigned char* pptr =
316 reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
317 pptr += pgno * pagesize();
318 return reinterpret_cast<Page*>(pptr);
319 }
320
321 unsigned Pages::pageno(Page* p) const
322 {
323 const unsigned char* pptr =
324 reinterpret_cast<const unsigned char*>(p);
325 const unsigned char* bptr =
326 reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
327 assert(0 == ((pptr - bptr) % pagesize()));
328 const unsigned nr = (pptr - bptr) / pagesize();
329 assert(nr < m_pimpl->m_npages);
330 return nr;
331 }
332
333 unsigned PageChunk::getPageSize()
334 {
335 // find out page size of system
336 long pgsz = sysconf(_SC_PAGESIZE);
337 if (-1 == pgsz) throw Exception("sysconf", errno);
338 if (pgsz > 512 && pgsz > long(sizeof(Page)))
339 return pgsz;
340
341 // in case of failure or implausible value, use a safe default: 4k
342 // page size, and do not try to mmap
343 s_mmapworks = Copy;
344 return 1 << 12;
345 }
346
347 PageChunk::PageChunk(PagePool* parent,
348 unsigned length, unsigned nPgPerGroup) :
349 m_begin(dommap(length)),
350 m_end(reinterpret_cast<void*>(
351 reinterpret_cast<unsigned char*>(m_begin) + length)),
352 m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
353 {
354 // ok, push groups of pages onto freelist here
355 unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
356 unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
357 while (p < pend) {
358 m_freelist.push_back(reinterpret_cast<void*>(p));
359 p += nPgPerGroup * PagePool::pagesize();
360 }
361 }
362
363 PageChunk::~PageChunk()
364 {
365 if (m_parent) assert(empty());
366 if (m_begin) domunmap(m_begin, len());
367 }
368
369 bool PageChunk::contains(const Pages& p) const
370 { return p.m_pimpl->m_parent == this; }
371
372 Pages PageChunk::pop()
373 {
374 assert(!m_freelist.empty());
375 void* p = m_freelist.front();
376 m_freelist.pop_front();
377 ++m_nUsedGrp;
378 return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
379 }
380
381 void PageChunk::push(const Pages& p)
382 {
383 assert(contains(p));
384 bool wasempty = m_freelist.empty();
385 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
386 --m_nUsedGrp;
387 if (m_parent) {
388 // notify parent if we need to be put on the free list again
389 if (wasempty) m_parent->putOnFreeList(this);
390 // notify parent if we're empty
391 if (empty()) return m_parent->release(this);
392 }
393 }
394
395 void* PageChunk::dommap(unsigned len)
396 {
397 assert(len && 0 == (len % s_physpgsz));
398 // ok, the idea here is to try the different methods of mmapping, and
399 // choose the first one that works. we have four flavours:
400 // 1 - anonymous mmap (best)
401 // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
402 // bit more tedious to set up, since you need to open/close a
403 // device file)
404 // 3 - mmap of a temporary file (very tedious to set up - need to
405 // create a temporary file, delete it, make the underlying storage
406 // large enough, then mmap the fd and close it)
407 // 4 - if all those fail, we malloc the buffers, and copy the data
408 // through the OS (then we're no better than normal pipes)
409 static bool msgprinted = false;
410 if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
411#if defined(MAP_ANONYMOUS)
412#undef MYANONFLAG
413#define MYANONFLAG MAP_ANONYMOUS
414#elif defined(MAP_ANON)
415#undef MYANONFLAG
416#define MYANONFLAG MAP_ANON
417#else
418#undef MYANONFLAG
419#endif
420#ifdef MYANONFLAG
421 void* retVal = ::mmap(nullptr, len, PROT_READ | PROT_WRITE,
422 MYANONFLAG | MAP_SHARED, -1, 0);
423 if (MAP_FAILED == retVal) {
424 if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
425 } else {
426 assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
427 s_mmapworks = Anonymous;
428 if (BidirMMapPipe::debugflag() && !msgprinted) {
429 std::cerr << " INFO: In " << __func__ << " (" <<
430 __FILE__ << ", line " << __LINE__ <<
431 "): anonymous mmapping works, excellent!" <<
432 std::endl;
433 msgprinted = true;
434 }
435 return retVal;
436 }
437#endif
438#undef MYANONFLAG
439 }
440 if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
441 // ok, no anonymous mappings supported directly, so try to map
442 // /dev/zero which has much the same effect on many systems
443 int fd = ::open("/dev/zero", O_RDWR);
444 if (-1 == fd)
445 throw Exception("open /dev/zero", errno);
446 void* retVal = ::mmap(nullptr, len,
447 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
448 if (MAP_FAILED == retVal) {
449 int errsv = errno;
450 ::close(fd);
451 if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
452 } else {
453 assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
454 s_mmapworks = DevZero;
455 }
456 if (-1 == ::close(fd))
457 throw Exception("close /dev/zero", errno);
458 if (BidirMMapPipe::debugflag() && !msgprinted) {
459 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
460 ", line " << __LINE__ << "): mmapping /dev/zero works, "
461 "very good!" << std::endl;
462 msgprinted = true;
463 }
464 return retVal;
465 }
466 if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
467 std::string tmpPath = gSystem->TempDirectory();
468 std::string name = tmpPath + "/roofit_BidirMMapPipe-XXXXXX";
469 int fd;
470 // open temp file
471 if (-1 == (fd = ::mkstemp(const_cast<char*>(name.c_str())))) throw Exception("mkstemp", errno);
472 // remove it, but keep fd open
473 if (-1 == ::unlink(name.c_str())) {
474 int errsv = errno;
475 ::close(fd);
476 throw Exception("unlink", errsv);
477 }
478 // make it the right size: lseek
479 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
480 int errsv = errno;
481 ::close(fd);
482 throw Exception("lseek", errsv);
483 }
484 // make it the right size: write a byte
485 if (1 != ::write(fd, name.c_str(), 1)) {
486 int errsv = errno;
487 ::close(fd);
488 throw Exception("write", errsv);
489 }
490 // do mmap
491 void* retVal = ::mmap(nullptr, len,
492 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
493 if (MAP_FAILED == retVal) {
494 int errsv = errno;
495 ::close(fd);
496 if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
497 } else {
498 assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
499 s_mmapworks = FileBacked;
500 }
501 if (-1 == ::close(fd)) {
502 int errsv = errno;
503 ::munmap(retVal, len);
504 throw Exception("close", errsv);
505 }
506 if (BidirMMapPipe::debugflag() && !msgprinted) {
507 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
508 ", line " << __LINE__ << "): mmapping temporary files "
509 "works, good!" << std::endl;
510 msgprinted = true;
511 }
512 return retVal;
513 }
514 if (Copy == s_mmapworks || Unknown == s_mmapworks) {
515 // fallback solution: mmap does not work on this OS (or does not
516 // work for what we want to use it), so use a normal buffer of
517 // memory instead, and collect data in that buffer - this needs an
518 // additional write/read to/from the pipe(s), but there you go...
519 if (BidirMMapPipe::debugflag() && !msgprinted) {
520 std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
521 ", line " << __LINE__ << "): anonymous mmapping of "
522 "shared buffers failed, falling back to read/write on "
523 " pipes!" << std::endl;
524 msgprinted = true;
525 }
526 s_mmapworks = Copy;
527 void* retVal = std::malloc(len);
528 if (!retVal) throw Exception("malloc", errno);
529 return retVal;
530 }
531 // should never get here
532 assert(false);
533 return nullptr;
534 }
535
536 void PageChunk::domunmap(void* addr, unsigned len)
537 {
538 assert(len && 0 == (len % s_physpgsz));
539 if (addr) {
540 assert(Unknown != s_mmapworks);
541 if (Copy != s_mmapworks) {
542 if (-1 == ::munmap(addr, len))
543 throw Exception("munmap", errno);
544 } else {
545 std::free(addr);
546 }
547 }
548 }
549
550 void PageChunk::zap(Pages& p)
551 {
552 // try to mprotect the other bits of the pool with no access...
553 // we'd really like a version of mremap here that can unmap all the
554 // other pages in the chunk, but that does not exist, so we protect
555 // the other pages in this chunk such that they may neither be read,
556 // written nor executed, only the pages we're interested in for
557 // communications stay readable and writable
558 //
559 // if an OS does not support changing the protection of a part of an
560 // mmapped area, the mprotect calls below should just fail and not
561 // change any protection, so we're a little less safe against
562 // corruption, but everything should still work
563 if (Copy != s_mmapworks) {
564 unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
565 unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
566 unsigned char* p2 = p1 + p.npages() * s_physpgsz;
567 unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
568 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
569 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
570 }
571 m_parent = nullptr;
572 m_freelist.clear();
573 m_nUsedGrp = 1;
574 p.m_pimpl->m_parent = nullptr;
575 m_begin = m_end = nullptr;
576 // commit suicide
577 delete this;
578 }
579
580 PagePool::PagePool(unsigned nPgPerGroup) :
581 m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
582 {
583 // if logical and physical page size differ, we may have to adjust
584 // m_nPgPerGrp to make things fit
585 if (PageChunk::pagesize() != PageChunk::physPgSz()) {
586 const unsigned mult =
587 PageChunk::physPgSz() / PageChunk::pagesize();
588 const unsigned desired = nPgPerGroup * PageChunk::pagesize();
589 // round up to to next physical page boundary
590 const unsigned actual = mult *
591 (desired / mult + bool(desired % mult));
592 const unsigned newPgPerGrp = actual / PageChunk::pagesize();
593 if (BidirMMapPipe::debugflag()) {
594 std::cerr << " INFO: In " << __func__ << " (" <<
595 __FILE__ << ", line " << __LINE__ <<
596 "): physical page size " << PageChunk::physPgSz() <<
597 ", subdividing into logical pages of size " <<
598 PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
599 m_nPgPerGrp << " -> " << newPgPerGrp <<
600 std::endl;
601 }
602 assert(newPgPerGrp >= m_nPgPerGrp);
603 m_nPgPerGrp = newPgPerGrp;
604 }
605 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
606 }
607
608 PagePool::~PagePool()
609 {
610 m_freelist.clear();
611 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
612 delete *it;
613 m_chunks.clear();
614 }
615
616 void PagePool::zap(Pages& p)
617 {
618 // unmap all pages but those pointed to by p
619 m_freelist.clear();
620 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
621 if ((*it)->contains(p)) {
622 (*it)->zap(p);
623 } else {
624 delete *it;
625 }
626 }
627 m_chunks.clear();
628 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
629 m_cursz = minsz;
630 }
631
632 Pages PagePool::pop()
633 {
634 if (m_freelist.empty()) {
635 // allocate and register new chunk and put it on the freelist
636 const int sz = nextChunkSz();
637 Chunk *c = new Chunk(this,
638 sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
639 m_chunks.push_front(c);
640 m_freelist.push_back(c);
641 updateCurSz(sz, +1);
642 }
643 // get free element from first chunk on _freelist
644 Chunk* c = m_freelist.front();
645 Pages p(c->pop());
646 // full chunks are removed from _freelist
647 if (c->full()) m_freelist.pop_front();
648 return p;
649 }
650
651 void PagePool::release(PageChunk* chunk)
652 {
653 assert(chunk->empty());
654 // find chunk on freelist and remove
655 ChunkList::iterator it = std::find(
656 m_freelist.begin(), m_freelist.end(), chunk);
657 if (m_freelist.end() == it)
658 throw Exception("PagePool::release(PageChunk*)", EINVAL);
659 m_freelist.erase(it);
660 // find chunk in m_chunks and remove
661 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
662 if (m_chunks.end() == it)
663 throw Exception("PagePool::release(PageChunk*)", EINVAL);
664 m_chunks.erase(it);
665 const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
666 delete chunk;
667 updateCurSz(sz, -1);
668 }
669
670 void PagePool::putOnFreeList(PageChunk* chunk)
671 {
672 assert(!chunk->full());
673 m_freelist.push_back(chunk);
674 }
675
676 void PagePool::updateCurSz(int sz, int incr)
677 {
678 m_szmap[(sz - minsz) / szincr] += incr;
679 m_cursz = minsz;
680 for (int i = (maxsz - minsz) / szincr; i--; ) {
681 if (m_szmap[i]) {
682 m_cursz += i * szincr;
683 break;
684 }
685 }
686 }
687
688 int PagePool::nextChunkSz() const
689 {
690 // no chunks with space available, figure out chunk size
691 int sz = m_cursz;
692 if (m_chunks.empty()) {
693 // if we start allocating chunks, we start from minsz
694 sz = minsz;
695 } else {
696 if (minsz >= sz) {
697 // minimal sized chunks are always grown
698 sz = minsz + szincr;
699 } else {
700 if (1 != m_chunks.size()) {
701 // if we have more than one completely filled chunk, grow
702 sz += szincr;
703 } else {
704 // just one chunk left, try shrinking chunk size
705 sz -= szincr;
706 }
707 }
708 }
709 // clamp size to allowed range
710 if (sz > maxsz) sz = maxsz;
711 if (sz < minsz) sz = minsz;
712 return sz;
713 }
714}
715
716// static BidirMMapPipe members
717pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
718std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
719BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = nullptr;
720unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
721int BidirMMapPipe::s_debugflag = 0;
722
723BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
724{
725 if (!s_pagepool)
726 s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
727 return *s_pagepool;
728}
729
730void BidirMMapPipe::teardownall(void)
731{
732 pthread_mutex_lock(&s_openpipesmutex);
733 while (!s_openpipes.empty()) {
734 BidirMMapPipe *p = s_openpipes.front();
735 pthread_mutex_unlock(&s_openpipesmutex);
736 if (p->m_childPid) kill(p->m_childPid, SIGTERM);
737 p->doClose(true, true);
738 pthread_mutex_lock(&s_openpipesmutex);
739 }
740 pthread_mutex_unlock(&s_openpipesmutex);
741}
742
743BidirMMapPipe::BidirMMapPipe(const BidirMMapPipe&) :
744 m_pages(pagepool().pop())
745{
746 // free pages again
747 { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
748 if (!s_pagepoolrefcnt) {
749 delete s_pagepool;
750 s_pagepool = nullptr;
751 }
752}
753
754BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
755 m_pages(pagepool().pop()), m_busylist(nullptr), m_freelist(nullptr), m_dirtylist(nullptr),
756 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
757 m_parentPid(::getpid())
758
759{
760 ++s_pagepoolrefcnt;
761 assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
762 int fds[4] = { -1, -1, -1, -1 };
763 int myerrno;
764 static bool firstcall = true;
765 if (useExceptions) m_flags |= exceptionsbit;
766
767 try {
768 if (firstcall) {
769 firstcall = false;
770 // register a cleanup handler to make sure all BidirMMapPipes are torn
771 // down, and child processes are sent a SIGTERM
772 if (0 != atexit(BidirMMapPipe::teardownall))
773 throw Exception("atexit", errno);
774 }
775
776 // build free lists
777 for (unsigned i = 1; i < TotPages; ++i)
778 m_pages[i - 1]->setNext(m_pages[i]);
779 m_pages[PagesPerEnd - 1]->setNext(nullptr);
780 if (!useSocketpair) {
781 // create pipes
782 if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
783 if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
784 } else {
785 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
786 throw Exception("socketpair", errno);
787 }
788 // fork the child
789 pthread_mutex_lock(&s_openpipesmutex);
790 char c;
791 switch ((m_childPid = ::fork())) {
792 case -1: // error in fork()
793 myerrno = errno;
794 pthread_mutex_unlock(&s_openpipesmutex);
795 m_childPid = 0;
796 throw Exception("fork", myerrno);
797 case 0: // child
798 // put the ends in the right place
799 if (-1 != fds[2]) {
800 // pair of pipes
801 if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
802 myerrno = errno;
803 pthread_mutex_unlock(&s_openpipesmutex);
804 throw Exception("close", myerrno);
805 }
806 fds[0] = fds[3] = -1;
807 m_outpipe = fds[1];
808 m_inpipe = fds[2];
809 } else {
810 // socket pair
811 if (-1 == ::close(fds[0])) {
812 myerrno = errno;
813 pthread_mutex_unlock(&s_openpipesmutex);
814 throw Exception("close", myerrno);
815 }
816 fds[0] = -1;
817 m_inpipe = m_outpipe = fds[1];
818 }
819 // close other pipes our parent may have open - we have no business
820 // reading from/writing to those...
821 for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
822 s_openpipes.end() != it; ) {
823 BidirMMapPipe* p = *it;
824 it = s_openpipes.erase(it);
825 p->doClose(true, true);
826 }
827 pagepool().zap(m_pages);
828 s_pagepoolrefcnt = 0;
829 delete s_pagepool;
830 s_pagepool = nullptr;
831 s_openpipes.push_front(this);
832 pthread_mutex_unlock(&s_openpipesmutex);
833 // ok, put our pages on freelist
834 m_freelist = m_pages[PagesPerEnd];
835 // handshake with other end (to make sure it's alive)...
836 c = 'C'; // ...hild
837 if (1 != xferraw(m_outpipe, &c, 1, ::write))
838 throw Exception("handshake: xferraw write", EPIPE);
839 if (1 != xferraw(m_inpipe, &c, 1, ::read))
840 throw Exception("handshake: xferraw read", EPIPE);
841 if ('P' != c) throw Exception("handshake", EPIPE);
842 break;
843 default: // parent
844 // put the ends in the right place
845 if (-1 != fds[2]) {
846 // pair of pipes
847 if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
848 myerrno = errno;
849 pthread_mutex_unlock(&s_openpipesmutex);
850 throw Exception("close", myerrno);
851 }
852 fds[1] = fds[2] = -1;
853 m_outpipe = fds[3];
854 m_inpipe = fds[0];
855 } else {
856 // socketpair
857 if (-1 == ::close(fds[1])) {
858 myerrno = errno;
859 pthread_mutex_unlock(&s_openpipesmutex);
860 throw Exception("close", myerrno);
861 }
862 fds[1] = -1;
863 m_inpipe = m_outpipe = fds[0];
864 }
865 // put on list of open pipes (so we can kill child processes
866 // if things go wrong)
867 s_openpipes.push_front(this);
868 pthread_mutex_unlock(&s_openpipesmutex);
869 // ok, put our pages on freelist
870 m_freelist = m_pages[0u];
871 // handshake with other end (to make sure it's alive)...
872 c = 'P'; // ...arent
873 if (1 != xferraw(m_outpipe, &c, 1, ::write))
874 throw Exception("handshake: xferraw write", EPIPE);
875 if (1 != xferraw(m_inpipe, &c, 1, ::read))
876 throw Exception("handshake: xferraw read", EPIPE);
877 if ('C' != c) throw Exception("handshake", EPIPE);
878 break;
879 }
880 // mark file descriptors for close on exec (we do not want to leak the
881 // connection to anything we happen to exec)
882 int fdflags = 0;
883 if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
884 throw Exception("fcntl", errno);
885 fdflags |= FD_CLOEXEC;
886 if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
887 throw Exception("fcntl", errno);
888 if (m_inpipe != m_outpipe) {
889 if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
890 throw Exception("fcntl", errno);
891 fdflags |= FD_CLOEXEC;
892 if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
893 throw Exception("fcntl", errno);
894 }
895 // ok, finally, clear the failbit
896 m_flags &= ~failbit;
897 // all done
898 } catch (BidirMMapPipe::Exception&) {
899 if (0 != m_childPid) kill(m_childPid, SIGTERM);
900 for (int i = 0; i < 4; ++i)
901 if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
902 {
903 // free resources associated with mmapped pages
904 BidirMMapPipe_impl::Pages p; p.swap(m_pages);
905 }
906 if (!--s_pagepoolrefcnt) {
907 delete s_pagepool;
908 s_pagepool = nullptr;
909 }
910 throw;
911 }
912}
913
914int BidirMMapPipe::close()
915{
916 assert(!(m_flags & failbit));
917 return doClose(false);
918}
919
920int BidirMMapPipe::doClose(bool force, bool holdlock)
921{
922 if (m_flags & failbit) return 0;
923 // flush data to be written
924 if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
925 // shut down the write direction (no more writes from our side)
926 if (m_inpipe == m_outpipe) {
927 if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
928 throw Exception("shutdown", errno);
929 m_outpipe = -1;
930 } else {
931 if (-1 != m_outpipe && -1 == ::close(m_outpipe))
932 if (!force) throw Exception("close", errno);
933 m_outpipe = -1;
934 }
935 // shut down the write direction (no more writes from our side)
936 // drain anything the other end might still want to send
937 if (!force && -1 != m_inpipe) {
938 // **************** THIS IS EXTREMELY UGLY: ****************
939 // POLLHUP is not set reliably on pipe/socket shutdown on all
940 // platforms, unfortunately, so we poll for readability here until
941 // the other end closes, too
942 //
943 // the read loop below ensures that the other end sees the POLLIN that
944 // is set on shutdown instead, and goes ahead to close its end
945 //
946 // if we don't do this, and close straight away, the other end
947 // will catch a SIGPIPE or similar, and we don't want that
948 int err;
949 struct pollfd fds;
950 fds.fd = m_inpipe;
951 fds.events = POLLIN;
952 fds.revents = 0;
953 do {
954 while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
955 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
956 if (fds.revents & POLLIN) {
957 char c;
958 if (1 > ::read(m_inpipe, &c, 1)) break;
959 }
960 }
961 } while (0 > err && EINTR == errno);
962 // ignore all other poll errors
963 }
964 // close read end
965 if (-1 != m_inpipe && -1 == ::close(m_inpipe))
966 if (!force) throw Exception("close", errno);
967 m_inpipe = -1;
968 // unmap memory
969 try {
970 { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
971 if (!--s_pagepoolrefcnt) {
972 delete s_pagepool;
973 s_pagepool = nullptr;
974 }
975 } catch (std::exception&) {
976 if (!force) throw;
977 }
978 m_busylist = m_freelist = m_dirtylist = nullptr;
979 // wait for child process
980 int retVal = 0;
981 if (isParent()) {
982 int tmp;
983 do {
984 tmp = waitpid(m_childPid, &retVal, 0);
985 } while (-1 == tmp && EINTR == errno);
986 if (-1 == tmp)
987 if (!force) throw Exception("waitpid", errno);
988 m_childPid = 0;
989 }
990 // remove from list of open pipes
991 if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
992 std::list<BidirMMapPipe*>::iterator it = std::find(
993 s_openpipes.begin(), s_openpipes.end(), this);
994 if (s_openpipes.end() != it) s_openpipes.erase(it);
995 if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
996 m_flags |= failbit;
997 return retVal;
998}
999
1000BidirMMapPipe::~BidirMMapPipe()
1001{ doClose(false); }
1002
1003BidirMMapPipe::size_type BidirMMapPipe::xferraw(
1004 int fd, void* addr, size_type len,
1005 ssize_t (*xferfn)(int, void*, std::size_t))
1006{
1007 size_type xferred = 0;
1008 unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1009 while (len) {
1010 ssize_t tmp = xferfn(fd, buf, len);
1011 if (tmp > 0) {
1012 xferred += tmp;
1013 len -= tmp;
1014 buf += tmp;
1015 continue;
1016 } else if (0 == tmp) {
1017 // check for end-of-file on pipe
1018 break;
1019 } else if (-1 == tmp) {
1020 // ok some error occurred, so figure out if we want to retry of throw
1021 switch (errno) {
1022 default:
1023 // if anything was transferred, return number of bytes
1024 // transferred so far, we can start throwing on the next
1025 // transfer...
1026 if (xferred) return xferred;
1027 // else throw
1028 throw Exception("xferraw", errno);
1029 case EAGAIN: // fallthrough intended
1030#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1031 case EWOULDBLOCK: // fallthrough intended
1032#endif
1033 std::cerr << " ERROR: In " << __func__ << " (" <<
1034 __FILE__ << ", line " << __LINE__ <<
1035 "): expect transfer to block!" << std::endl;
1036 case EINTR:
1037 break;
1038 }
1039 continue;
1040 } else {
1041 throw Exception("xferraw: unexpected return value from read/write",
1042 errno);
1043 }
1044 }
1045 return xferred;
1046}
1047
1048void BidirMMapPipe::sendpages(Page* plist)
1049{
1050 if (plist) {
1051 unsigned char pg = m_pages[plist];
1052 if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1053 if (BidirMMapPipe_impl::PageChunk::Copy ==
1054 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1055 // ok, have to copy pages through pipe
1056 for (Page* p = plist; p; p = p->next()) {
1057 if (sizeof(Page) + p->size() !=
1058 xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1059 ::write)) {
1060 throw Exception("sendpages: short write", EPIPE);
1061 }
1062 }
1063 }
1064 } else {
1065 throw Exception("sendpages: short write", EPIPE);
1066 }
1067 } else { assert(plist); }
1068}
1069
1070unsigned BidirMMapPipe::recvpages()
1071{
1072 unsigned char pg;
1073 unsigned retVal = 0;
1074 Page *plisthead = nullptr, *plisttail = nullptr;
1075 if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1076 plisthead = plisttail = m_pages[pg];
1077 // ok, have number of pages
1078 if (BidirMMapPipe_impl::PageChunk::Copy ==
1079 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1080 // ok, need to copy pages through pipe
1081 for (; plisttail; ++retVal) {
1082 Page* p = plisttail;
1083 if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1084 ::read)) {
1085 plisttail = p->next();
1086 if (p->empty()) continue;
1087 // break in case of read error
1088 if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1089 ::read)) break;
1090 }
1091 }
1092 } else {
1093 retVal = lenPageList(plisthead);
1094 }
1095 }
1096 // put list of pages we just received into correct lists (busy/free)
1097 if (plisthead) feedPageLists(plisthead);
1098 // ok, retVal contains the number of pages read, so put them on the
1099 // correct lists
1100 return retVal;
1101}
1102
1103unsigned BidirMMapPipe::recvpages_nonblock()
1104{
1105 struct pollfd fds;
1106 fds.fd = m_inpipe;
1107 fds.events = POLLIN;
1108 fds.revents = 0;
1109 unsigned retVal = 0;
1110 do {
1111 int rc = ::poll(&fds, 1, 0);
1112 if (0 > rc) {
1113 if (EINTR == errno) continue;
1114 break;
1115 }
1116 if (1 == retVal && fds.revents & POLLIN &&
1117 !(fds.revents & (POLLNVAL | POLLERR))) {
1118 // ok, we can read without blocking, so the other end has
1119 // something for us
1120 return recvpages();
1121 } else {
1122 break;
1123 }
1124 } while (true);
1125 return retVal;
1126}
1127
1128unsigned BidirMMapPipe::lenPageList(const Page* p)
1129{
1130 unsigned n = 0;
1131 for ( ; p; p = p->next()) ++n;
1132 return n;
1133}
1134
1135void BidirMMapPipe::feedPageLists(Page* plist)
1136{
1137 assert(plist);
1138 // get end of busy list
1139 Page *blend = m_busylist;
1140 while (blend && blend->next()) blend = blend->next();
1141 // ok, might have to send free pages to other end, and (if we do have to
1142 // send something to the other end) while we're at it, send any dirty
1143 // pages which are completely full, too
1144 Page *sendlisthead = nullptr, *sendlisttail = nullptr;
1145 // loop over plist
1146 while (plist) {
1147 Page* p = plist;
1148 plist = p->next();
1149 p->setNext(nullptr);
1150 if (!p->empty()) {
1151 // busy page...
1152 p->pos() = 0;
1153 // put at end of busy list
1154 if (blend) blend->setNext(p);
1155 else m_busylist = p;
1156 blend = p;
1157 } else {
1158 // free page...
1159 // Very simple algorithm: once we're done with a page, we send it back
1160 // where it came from. If it's from our end, we put it on the free list, if
1161 // it's from the other end, we send it back.
1162 if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1163 (isChild() && m_pages[p] < PagesPerEnd)) {
1164 // page "belongs" to other end
1165 if (!sendlisthead) sendlisthead = p;
1166 if (sendlisttail) sendlisttail->setNext(p);
1167 sendlisttail = p;
1168 } else {
1169 // add page to freelist
1170 p->setNext(m_freelist);
1171 m_freelist = p;
1172 }
1173 }
1174 }
1175 // check if we have to send stuff to the other end
1176 if (sendlisthead) {
1177 // go through our list of dirty pages, and see what we can
1178 // send along
1179 Page* dp;
1180 while ((dp = m_dirtylist) && dp->full()) {
1181 Page* p = dp;
1182 // move head of dirty list
1183 m_dirtylist = p->next();
1184 // queue for sending
1185 p->setNext(nullptr);
1186 sendlisttail->setNext(p);
1187 sendlisttail = p;
1188 }
1189 // poll if the other end is still alive - this needs that we first
1190 // close the write pipe of the other end when the remote end of the
1191 // connection is shutting down in doClose; we'll see that because we
1192 // get a POLLHUP on our inpipe
1193 const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1194 struct pollfd fds[2];
1195 fds[0].fd = m_outpipe;
1196 fds[0].events = fds[0].revents = 0;
1197 if (m_outpipe != m_inpipe) {
1198 fds[1].fd = m_inpipe;
1199 fds[1].events = fds[1].revents = 0;
1200 } else {
1201 fds[0].events |= POLLIN;
1202 }
1203 int retVal = 0;
1204 do {
1205 retVal = ::poll(fds, nfds, 0);
1206 if (0 > retVal && EINTR == errno)
1207 continue;
1208 break;
1209 } while (true);
1210 if (0 <= retVal) {
1211 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1212 if (m_outpipe != m_inpipe) {
1213 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1214 } else {
1215 if (ok && fds[0].revents & POLLIN) {
1216 unsigned ret = recvpages();
1217 if (!ret) ok = false;
1218 }
1219 }
1220
1221 if (ok) sendpages(sendlisthead);
1222 // (if the pipe is dead already, we don't care that we leak the
1223 // contents of the pages on the send list here, so that is why
1224 // there's no else clause here)
1225 } else {
1226 throw Exception("feedPageLists: poll", errno);
1227 }
1228 }
1229}
1230
1231void BidirMMapPipe::markPageDirty(Page* p)
1232{
1233 assert(p);
1234 assert(p == m_freelist);
1235 // remove from freelist
1236 m_freelist = p->next();
1237 p->setNext(nullptr);
1238 // append to dirty list
1239 Page* dl = m_dirtylist;
1240 while (dl && dl->next()) dl = dl->next();
1241 if (dl) dl->setNext(p);
1242 else m_dirtylist = p;
1243}
1244
1245BidirMMapPipe::Page* BidirMMapPipe::busypage()
1246{
1247 // queue any pages available for reading we can without blocking
1248 recvpages_nonblock();
1249 Page* p;
1250 // if there are no busy pages, try to get them from the other end,
1251 // block if we have to...
1252 while (!(p = m_busylist)) if (!recvpages()) return nullptr;
1253 return p;
1254}
1255
1256BidirMMapPipe::Page* BidirMMapPipe::dirtypage()
1257{
1258 // queue any pages available for reading we can without blocking
1259 recvpages_nonblock();
1260 Page* p = m_dirtylist;
1261 // go to end of dirty list
1262 if (p) while (p->next()) p = p->next();
1263 if (!p || p->full()) {
1264 // need to append free page, so get one
1265 while (!(p = m_freelist)) if (!recvpages()) return nullptr;
1266 markPageDirty(p);
1267 }
1268 return p;
1269}
1270
1271void BidirMMapPipe::flush()
1272{ return doFlush(true); }
1273
1274void BidirMMapPipe::doFlush(bool forcePartialPages)
1275{
1276 assert(!(m_flags & failbit));
1277 // build a list of pages to flush
1278 Page *flushlisthead = nullptr, *flushlisttail = nullptr;
1279 while (m_dirtylist) {
1280 Page* p = m_dirtylist;
1281 if (!forcePartialPages && !p->full()) break;
1282 // remove dirty page from dirty list
1283 m_dirtylist = p->next();
1284 p->setNext(nullptr);
1285 // and send it to other end
1286 if (!flushlisthead) flushlisthead = p;
1287 if (flushlisttail) flushlisttail->setNext(p);
1288 flushlisttail = p;
1289 }
1290 if (flushlisthead) sendpages(flushlisthead);
1291}
1292
1293void BidirMMapPipe::purge()
1294{
1295 assert(!(m_flags & failbit));
1296 // join busy and dirty lists
1297 {
1298 Page *l = m_busylist;
1299 while (l && l->next()) l = l->next();
1300 if (l) l->setNext(m_dirtylist);
1301 else m_busylist = m_dirtylist;
1302 }
1303 // empty busy and dirty pages
1304 for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1305 // put them on the free list
1306 if (m_busylist) feedPageLists(m_busylist);
1307 m_busylist = m_dirtylist = nullptr;
1308}
1309
1310BidirMMapPipe::size_type BidirMMapPipe::bytesReadableNonBlocking()
1311{
1312 // queue all pages waiting for consumption in the pipe before we give an
1313 // answer
1314 recvpages_nonblock();
1315 size_type retVal = 0;
1316 for (Page* p = m_busylist; p; p = p->next())
1317 retVal += p->size() - p->pos();
1318 return retVal;
1319}
1320
1321BidirMMapPipe::size_type BidirMMapPipe::bytesWritableNonBlocking()
1322{
1323 // queue all pages waiting for consumption in the pipe before we give an
1324 // answer
1325 recvpages_nonblock();
1326 // check if we could write to the pipe without blocking (we need to know
1327 // because we might need to check if flushing of dirty pages would block)
1328 bool couldwrite = false;
1329 {
1330 struct pollfd fds;
1331 fds.fd = m_outpipe;
1332 fds.events = POLLOUT;
1333 fds.revents = 0;
1334 int retVal = 0;
1335 do {
1336 retVal = ::poll(&fds, 1, 0);
1337 if (0 > retVal) {
1338 if (EINTR == errno) continue;
1339 throw Exception("bytesWritableNonBlocking: poll", errno);
1340 }
1341 if (1 == retVal && fds.revents & POLLOUT &&
1342 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1343 couldwrite = true;
1344 break;
1345 } while (true);
1346 }
1347 // ok, start counting bytes
1348 size_type retVal = 0;
1349 unsigned npages = 0;
1350 // go through the dirty list
1351 for (Page* p = m_dirtylist; p; p = p->next()) {
1352 ++npages;
1353 // if page only partially filled
1354 if (!p->full())
1355 retVal += p->free();
1356 if (npages >= FlushThresh && !couldwrite) break;
1357 }
1358 // go through the free list
1359 for (Page* p = m_freelist; p && (!m_dirtylist ||
1360 npages < FlushThresh || couldwrite); p = p->next()) {
1361 ++npages;
1362 retVal += Page::capacity();
1363 }
1364 return retVal;
1365}
1366
1367BidirMMapPipe::size_type BidirMMapPipe::read(void* addr, size_type sz)
1368{
1369 assert(!(m_flags & failbit));
1370 size_type nread = 0;
1371 unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1372 try {
1373 while (sz) {
1374 // find next page to read from
1375 Page* p = busypage();
1376 if (!p) {
1377 m_flags |= eofbit;
1378 return nread;
1379 }
1380 unsigned char* pp = p->begin() + p->pos();
1381 size_type csz = std::min(size_type(p->remaining()), sz);
1382 std::copy(pp, pp + csz, ap);
1383 nread += csz;
1384 ap += csz;
1385 sz -= csz;
1386 p->pos() += csz;
1387 assert(p->size() >= p->pos());
1388 if (p->size() == p->pos()) {
1389 // if no unread data remains, page is free
1390 m_busylist = p->next();
1391 p->setNext(nullptr);
1392 p->size() = 0;
1393 feedPageLists(p);
1394 }
1395 }
1396 } catch (Exception&) {
1397 m_flags |= rderrbit;
1398 if (m_flags & exceptionsbit) throw;
1399 }
1400 return nread;
1401}
1402
1403BidirMMapPipe::size_type BidirMMapPipe::write(const void* addr, size_type sz)
1404{
1405 assert(!(m_flags & failbit));
1406 size_type written = 0;
1407 const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1408 try {
1409 while (sz) {
1410 // find next page to write to
1411 Page* p = dirtypage();
1412 if (!p) {
1413 m_flags |= eofbit;
1414 return written;
1415 }
1416 unsigned char* pp = p->begin() + p->size();
1417 size_type csz = std::min(size_type(p->free()), sz);
1418 std::copy(ap, ap + csz, pp);
1419 written += csz;
1420 ap += csz;
1421 p->size() += csz;
1422 sz -= csz;
1423 assert(p->capacity() >= p->size());
1424 if (p->full()) {
1425 // if page is full, see if we're above the flush threshold of
1426 // 3/4 of our pages
1427 if (lenPageList(m_dirtylist) >= FlushThresh)
1428 doFlush(false);
1429 }
1430 }
1431 } catch (Exception&) {
1432 m_flags |= wrerrbit;
1433 if (m_flags & exceptionsbit) throw;
1434 }
1435 return written;
1436}
1437
1438int BidirMMapPipe::poll(BidirMMapPipe::PollVector& pipes, int timeout)
1439{
1440 // go through pipes, and change flags where we already know without really
1441 // polling - stuff where we don't need poll to wait for its timeout in the
1442 // OS...
1443 bool canskiptimeout = false;
1444 std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1445 std::vector<unsigned>::iterator mit = masks.begin();
1446 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1447 ++it, ++mit) {
1448 PollEntry& pe = *it;
1449 pe.revents = None;
1450 // null pipe is invalid
1451 if (!pe.pipe) {
1452 pe.revents |= Invalid;
1453 canskiptimeout = true;
1454 continue;
1455 }
1456 // closed pipe is invalid
1457 if (pe.pipe->closed()) pe.revents |= Invalid;
1458 // check for error
1459 if (pe.pipe->bad()) pe.revents |= Error;
1460 // check for end of file
1461 if (pe.pipe->eof()) pe.revents |= EndOfFile;
1462 // check if readable
1463 if (pe.events & Readable) {
1464 *mit |= Readable;
1465 if (pe.pipe->m_busylist) pe.revents |= Readable;
1466 }
1467 // check if writable
1468 if (pe.events & Writable) {
1469 *mit |= Writable;
1470 if (pe.pipe->m_freelist) {
1471 pe.revents |= Writable;
1472 } else {
1473 Page *dl = pe.pipe->m_dirtylist;
1474 while (dl && dl->next()) dl = dl->next();
1475 if (dl && dl->pos() < Page::capacity())
1476 pe.revents |= Writable;
1477 }
1478 }
1479 if (pe.revents) canskiptimeout = true;
1480 }
1481 // set up the data structures required for the poll syscall
1482 std::vector<pollfd> fds;
1483 fds.reserve(2 * pipes.size());
1484 std::map<int, PollEntry*> fds2pipes;
1485 for (PollVector::const_iterator it = pipes.begin();
1486 pipes.end() != it; ++it) {
1487 const PollEntry& pe = *it;
1488 struct pollfd tmp;
1489 fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1490 const_cast<PollEntry*>(&pe)));
1491 tmp.events = tmp.revents = 0;
1492 // we always poll for readability; this allows us to queue pages
1493 // early
1494 tmp.events |= POLLIN;
1495 if (pe.pipe->m_outpipe != tmp.fd) {
1496 // ok, it's a pair of pipes
1497 fds.push_back(tmp);
1498 fds2pipes.insert(std::make_pair(
1499 unsigned(tmp.fd = pe.pipe->m_outpipe),
1500 const_cast<PollEntry*>(&pe)));
1501 tmp.events = 0;
1502
1503 }
1504 if (pe.events & Writable) tmp.events |= POLLOUT;
1505 fds.push_back(tmp);
1506 }
1507 // poll
1508 int retVal = 0;
1509 do {
1510 retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1511 if (0 > retVal) {
1512 if (EINTR == errno) continue;
1513 throw Exception("poll", errno);
1514 }
1515 break;
1516 } while (true);
1517 // fds may have changed state, so update...
1518 for (std::vector<pollfd>::iterator it = fds.begin();
1519 fds.end() != it; ++it) {
1520 pollfd& fe = *it;
1521 //if (!fe.revents) continue;
1522 --retVal;
1523 PollEntry& pe = *fds2pipes[fe.fd];
1524oncemore:
1525 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1526 pe.revents |= ReadInvalid;
1527 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1528 pe.revents |= WriteInvalid;
1529 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1530 pe.revents |= ReadError;
1531 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1532 pe.revents |= WriteError;
1533 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1534 pe.revents |= ReadEndOfFile;
1535 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1536 pe.revents |= WriteEndOfFile;
1537 if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1538 !(fe.revents & (POLLNVAL | POLLERR))) {
1539 // ok, there is at least one page for us to receive from the
1540 // other end
1541 if (0 == pe.pipe->recvpages()) continue;
1542 // more pages there?
1543 do {
1544 int tmp = ::poll(&fe, 1, 0);
1545 if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1546 if (0 > tmp) {
1547 if (EINTR == errno) continue;
1548 throw Exception("poll", errno);
1549 }
1550 break;
1551 } while (true);
1552 }
1553 if (pe.pipe->m_busylist) pe.revents |= Readable;
1554 if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1555 if (pe.pipe->m_freelist) {
1556 pe.revents |= Writable;
1557 } else {
1558 Page *dl = pe.pipe->m_dirtylist;
1559 while (dl && dl->next()) dl = dl->next();
1560 if (dl && dl->pos() < Page::capacity())
1561 pe.revents |= Writable;
1562 }
1563 }
1564 }
1565 // apply correct masks, and count pipes with pending events
1566 int npipes = 0;
1567 mit = masks.begin();
1568 for (PollVector::iterator it = pipes.begin();
1569 pipes.end() != it; ++it, ++mit)
1570 if ((it->revents &= *mit)) ++npipes;
1571 return npipes;
1572}
1573
1574BidirMMapPipe& BidirMMapPipe::operator<<(const char* str)
1575{
1576 size_t sz = std::strlen(str);
1577 *this << sz;
1578 if (sz) write(str, sz);
1579 return *this;
1580}
1581
1582BidirMMapPipe& BidirMMapPipe::operator>>(char* (&str))
1583{
1584 size_t sz = 0;
1585 *this >> sz;
1586 if (good() && !eof()) {
1587 str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1588 if (!str) throw Exception("realloc", errno);
1589 if (sz) read(str, sz);
1590 str[sz] = 0;
1591 }
1592 return *this;
1593}
1594
1595BidirMMapPipe& BidirMMapPipe::operator<<(const std::string& str)
1596{
1597 size_t sz = str.size();
1598 *this << sz;
1599 write(str.data(), sz);
1600 return *this;
1601}
1602
1603BidirMMapPipe& BidirMMapPipe::operator>>(std::string& str)
1604{
1605 str.clear();
1606 size_t sz = 0;
1607 *this >> sz;
1608 if (good() && !eof()) {
1609 str.reserve(sz);
1610 for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1611 }
1612 return *this;
1613}
1614
1615END_NAMESPACE_ROOFIT
1616
1617#ifdef TEST_BIDIRMMAPPIPE
1618using namespace RooFit;
1619
1620int simplechild(BidirMMapPipe& pipe)
1621{
1622 // child does an echo loop
1623 while (pipe.good() && !pipe.eof()) {
1624 // read a string
1625 std::string str;
1626 pipe >> str;
1627 if (!pipe) return -1;
1628 if (pipe.eof()) break;
1629 if (!str.empty()) {
1630 std::cout << "[CHILD] : read: " << str << std::endl;
1631 str = "... early in the morning?";
1632 }
1633 pipe << str << BidirMMapPipe::flush;
1634 // did our parent tell us to shut down?
1635 if (str.empty()) break;
1636 if (!pipe) return -1;
1637 if (pipe.eof()) break;
1638 std::cout << "[CHILD] : wrote: " << str << std::endl;
1639 }
1640 pipe.close();
1641 return 0;
1642}
1643
1644#include <sstream>
1645int randomchild(BidirMMapPipe& pipe)
1646{
1647 // child sends out something at random intervals
1648 ::srand48(::getpid());
1649 {
1650 // wait for parent's go ahead signal
1651 std::string s;
1652 pipe >> s;
1653 }
1654 // no shutdown sequence needed on this side - we're producing the data,
1655 // and the parent can just read until we're done (when it'll get EOF)
1656 for (int i = 0; i < 5; ++i) {
1657 // sleep a random time between 0 and .9 seconds
1658 ::usleep(int(1e6 * ::drand48()));
1659 std::ostringstream buf;
1660 buf << "child pid " << ::getpid() << " sends message " << i;
1661 std::string str = buf.str();
1662 std::cout << "[CHILD] : " << str << std::endl;
1663 pipe << str << BidirMMapPipe::flush;
1664 if (!pipe) return -1;
1665 if (pipe.eof()) break;
1666 }
1667 // tell parent we're shutting down
1668 pipe << "" << BidirMMapPipe::flush;
1669 // wait for parent to acknowledge
1670 std::string s;
1671 pipe >> s;
1672 pipe.close();
1673 return 0;
1674}
1675
1676int benchchildrtt(BidirMMapPipe& pipe)
1677{
1678 // child does the equivalent of listening for pings and sending the
1679 // packet back
1680 char* str = 0;
1681 while (pipe && !pipe.eof()) {
1682 pipe >> str;
1683 if (!pipe) {
1684 std::free(str);
1685 pipe.close();
1686 return -1;
1687 }
1688 if (pipe.eof()) break;
1689 pipe << str << BidirMMapPipe::flush;
1690 // if we have just completed the shutdown handshake, we break here
1691 if (!std::strlen(str)) break;
1692 }
1693 std::free(str);
1694 pipe.close();
1695 return 0;
1696}
1697
1698int benchchildsink(BidirMMapPipe& pipe)
1699{
1700 // child behaves like a sink
1701 char* str = 0;
1702 while (pipe && !pipe.eof()) {
1703 pipe >> str;
1704 if (!std::strlen(str)) break;
1705 }
1706 pipe << "" << BidirMMapPipe::flush;
1707 std::free(str);
1708 pipe.close();
1709 return 0;
1710}
1711
1712int benchchildsource(BidirMMapPipe& pipe)
1713{
1714 // child behaves like a source
1715 char* str = 0;
1716 for (unsigned i = 0; i <= 24; ++i) {
1717 str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1718 std::memset(str, '4', 1 << i);
1719 str[1 << i] = 0;
1720 for (unsigned j = 0; j < 1 << 7; ++j) {
1721 pipe << str;
1722 if (!pipe || pipe.eof()) {
1723 std::free(str);
1724 pipe.close();
1725 return -1;
1726 }
1727 }
1728 // tell parent we're done with this block size
1729 pipe << "" << BidirMMapPipe::flush;
1730 }
1731 // tell parent to shut down
1732 pipe << "" << BidirMMapPipe::flush;
1733 std::free(str);
1734 pipe.close();
1735 return 0;
1736}
1737
1738BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1739{
1740 // create a pipe with the given child at the remote end
1741 BidirMMapPipe *p = new BidirMMapPipe();
1742 if (p->isChild()) {
1743 int retVal = childexec(*p);
1744 delete p;
1745 std::exit(retVal);
1746 }
1747 return p;
1748}
1749
1750#include <sys/time.h>
1751#include <iomanip>
1752int main()
1753{
1754 // simple echo loop test
1755 {
1756 std::cout << "[PARENT]: simple challenge-response test, "
1757 "one child:" << std::endl;
1758 BidirMMapPipe* pipe = spawnChild(simplechild);
1759 for (int i = 0; i < 5; ++i) {
1760 std::string str("What shall we do with a drunken sailor...");
1761 *pipe << str << BidirMMapPipe::flush;
1762 if (!*pipe) return -1;
1763 std::cout << "[PARENT]: wrote: " << str << std::endl;
1764 *pipe >> str;
1765 if (!*pipe) return -1;
1766 std::cout << "[PARENT]: read: " << str << std::endl;
1767 }
1768 // send shutdown string
1769 *pipe << "" << BidirMMapPipe::flush;
1770 // wait for shutdown handshake
1771 std::string s;
1772 *pipe >> s;
1773 int retVal = pipe->close();
1774 std::cout << "[PARENT]: exit status of child: " << retVal <<
1775 std::endl;
1776 if (retVal) return retVal;
1777 delete pipe;
1778 }
1779 // simple poll test - children send 5 results in random intervals
1780 {
1781 unsigned nch = 20;
1782 std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1783 " children:" << std::endl;
1784 typedef BidirMMapPipe::PollEntry PollEntry;
1785 // poll data structure
1786 BidirMMapPipe::PollVector pipes;
1787 pipes.reserve(nch);
1788 // spawn children
1789 for (unsigned i = 0; i < nch; ++i) {
1790 std::cout << "[PARENT]: spawning child " << i << std::endl;
1791 pipes.push_back(PollEntry(spawnChild(randomchild),
1792 BidirMMapPipe::Readable));
1793 }
1794 // wake children up
1795 std::cout << "[PARENT]: waking up children" << std::endl;
1796 for (unsigned i = 0; i < nch; ++i)
1797 *pipes[i].pipe << "" << BidirMMapPipe::flush;
1798 std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1799 // while at least some children alive
1800 while (!pipes.empty()) {
1801 // poll, wait until status change (infinite timeout)
1802 int npipes = BidirMMapPipe::poll(pipes, -1);
1803 // scan for pipes with changed status
1804 for (std::vector<PollEntry>::iterator it = pipes.begin();
1805 npipes && pipes.end() != it; ) {
1806 if (!it->revents) {
1807 // unchanged, next one
1808 ++it;
1809 continue;
1810 }
1811 --npipes; // maybe we can stop early...
1812 // read from pipes which are readable
1813 if (it->revents & BidirMMapPipe::Readable) {
1814 std::string s;
1815 *(it->pipe) >> s;
1816 if (!s.empty()) {
1817 std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1818 ": " << s << std::endl;
1819 ++it;
1820 continue;
1821 } else {
1822 // child is shutting down...
1823 *(it->pipe) << "" << BidirMMapPipe::flush;
1824 goto childcloses;
1825 }
1826 }
1827 // retire pipes with error or end-of-file condition
1828 if (it->revents & (BidirMMapPipe::Error |
1829 BidirMMapPipe::EndOfFile |
1830 BidirMMapPipe::Invalid)) {
1831 std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1832 " revents" <<
1833 ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1834 ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1835 ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1836 ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1837 ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1838 ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1839 ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1840 ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1841 std::endl;
1842childcloses:
1843 int retVal = it->pipe->close();
1844 std::cout << "[PARENT]: child exit status: " <<
1845 retVal << ", number of children still alive: " <<
1846 (pipes.size() - 1) << std::endl;
1847 if (retVal) return retVal;
1848 delete it->pipe;
1849 it = pipes.erase(it);
1850 continue;
1851 }
1852 }
1853 }
1854 }
1855 // little benchmark - round trip time
1856 {
1857 std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1858 for (unsigned i = 0; i <= 24; ++i) {
1859 std::vector<char> s(1 + (1 << i));
1860 std::memset(s, 'A', 1 << i);
1861 s[1 << i] = 0;
1862 const unsigned n = 1 << 7;
1863 double avg = 0., min = 1e42, max = -1e42;
1864 BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1865 for (unsigned j = n; j--; ) {
1866 struct timeval t1;
1867 ::gettimeofday(&t1, 0);
1868 *pipe << s << BidirMMapPipe::flush;
1869 if (!*pipe || pipe->eof()) break;
1870 *pipe >> s;
1871 if (!*pipe || pipe->eof()) break;
1872 struct timeval t2;
1873 ::gettimeofday(&t2, 0);
1874 t2.tv_sec -= t1.tv_sec;
1875 t2.tv_usec -= t1.tv_usec;
1876 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1877 if (dt < min) min = dt;
1878 if (dt > max) max = dt;
1879 avg += dt;
1880 }
1881 // send a shutdown string
1882 *pipe << "" << BidirMMapPipe::flush;
1883 // get child's shutdown ok
1884 *pipe >> s;
1885 avg /= double(n);
1886 avg *= 1e6; min *= 1e6; max *= 1e6;
1887 int retVal = pipe->close();
1888 if (retVal) {
1889 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1890 return retVal;
1891 }
1892 delete pipe;
1893 // there is a factor 2 in the formula for the transfer rate below,
1894 // because we transfer data of twice the size of the block - once
1895 // to the child, and once for the return trip
1896 std::cout << "block size " << std::setw(9) << (1 << i) <<
1897 " avg " << std::setw(7) << avg << " us min " <<
1898 std::setw(7) << min << " us max " << std::setw(7) << max <<
1899 "us speed " << std::setw(9) <<
1900 2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1901 " MB/s" << std::endl;
1902 }
1903 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1904 }
1905 // little benchmark - child as sink
1906 {
1907 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1908 for (unsigned i = 0; i <= 24; ++i) {
1909 std::vector<char> s(1 + (1 << i));
1910 std::memset(s, 'A', 1 << i);
1911 s[1 << i] = 0;
1912 const unsigned n = 1 << 7;
1913 double avg = 0., min = 1e42, max = -1e42;
1914 BidirMMapPipe *pipe = spawnChild(benchchildsink);
1915 for (unsigned j = n; j--; ) {
1916 struct timeval t1;
1917 ::gettimeofday(&t1, 0);
1918 // streaming mode - we do not flush here
1919 *pipe << s;
1920 if (!*pipe || pipe->eof()) break;
1921 struct timeval t2;
1922 ::gettimeofday(&t2, 0);
1923 t2.tv_sec -= t1.tv_sec;
1924 t2.tv_usec -= t1.tv_usec;
1925 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1926 if (dt < min) min = dt;
1927 if (dt > max) max = dt;
1928 avg += dt;
1929 }
1930 // send a shutdown string
1931 *pipe << "" << BidirMMapPipe::flush;
1932 // get child's shutdown ok
1933 *pipe >> s;
1934 avg /= double(n);
1935 avg *= 1e6; min *= 1e6; max *= 1e6;
1936 int retVal = pipe->close();
1937 if (retVal) {
1938 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1939 return retVal;
1940 }
1941 delete pipe;
1942 std::cout << "block size " << std::setw(9) << (1 << i) <<
1943 " avg " << std::setw(7) << avg << " us min " <<
1944 std::setw(7) << min << " us max " << std::setw(7) << max <<
1945 "us speed " << std::setw(9) <<
1946 (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1947 " MB/s" << std::endl;
1948 }
1949 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1950 }
1951 // little benchmark - child as source
1952 {
1953 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1954 char *s = 0;
1955 double avg = 0., min = 1e42, max = -1e42;
1956 unsigned n = 0, bsz = 0;
1957 BidirMMapPipe *pipe = spawnChild(benchchildsource);
1958 while (*pipe && !pipe->eof()) {
1959 struct timeval t1;
1960 ::gettimeofday(&t1, 0);
1961 // streaming mode - we do not flush here
1962 *pipe >> s;
1963 if (!*pipe || pipe->eof()) break;
1964 struct timeval t2;
1965 ::gettimeofday(&t2, 0);
1966 t2.tv_sec -= t1.tv_sec;
1967 t2.tv_usec -= t1.tv_usec;
1968 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1969 if (std::strlen(s)) {
1970 ++n;
1971 if (dt < min) min = dt;
1972 if (dt > max) max = dt;
1973 avg += dt;
1974 bsz = std::strlen(s);
1975 } else {
1976 if (!n) break;
1977 // next block size
1978 avg /= double(n);
1979 avg *= 1e6; min *= 1e6; max *= 1e6;
1980
1981 std::cout << "block size " << std::setw(9) << bsz <<
1982 " avg " << std::setw(7) << avg << " us min " <<
1983 std::setw(7) << min << " us max " << std::setw(7) <<
1984 max << "us speed " << std::setw(9) <<
1985 (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1986 " MB/s" << std::endl;
1987 n = 0;
1988 avg = 0.;
1989 min = 1e42;
1990 max = -1e42;
1991 }
1992 }
1993 int retVal = pipe->close();
1994 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1995 if (retVal) return retVal;
1996 delete pipe;
1997 std::free(s);
1998 }
1999 return 0;
2000}
2001#endif // TEST_BIDIRMMAPPIPE
2002#endif // _WIN32
2003
2004// vim: ft=cpp:sw=4:tw=78:et
2005
2006/// \endcond
ROOT::R::TRInterface & Exception()
Definition Exception.C:4
int main()
Definition Prototype.cxx:12
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
#define e(i)
Definition RSha256.hxx:103
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
Binding & operator=(OUT(*fun)(void))
R__EXTERN TSystem * gSystem
Definition TSystem.h:560
#define free
Definition civetweb.c:1539
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition TSystem.cxx:1469
const Int_t n
Definition legend1.C:16
void(off) SmallVectorTemplateBase< T
void Copy(void *source, void *dest)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26
__device__ AFloat max(AFloat x, AFloat y)
Definition Kernels.cuh:207
static const char * what
Definition stlLoader.cc:6
TLine l
Definition textangle.C:4
auto * t1
Definition textangle.C:20
static unsigned long masks[]
Definition gifencode.c:211