30 #include <sys/socket.h>
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35 #define END_NAMESPACE_ROOFIT }
40 namespace BidirMMapPipe_impl {
46 class BidirMMapPipeException :
public std::exception
55 static int dostrerror_r(
int err,
char* buf,
std::size_t sz,
57 {
return f(err, buf, sz); }
63 BidirMMapPipeException(
const char* msg,
int err);
/// Return the error description held in m_buf.
///
/// The constructor fragments visible in this file copy the caller's message
/// into m_buf, append ": " and then let strerror_r write the text for the
/// error code behind it. Declared with an empty throw() specification.
///
/// @return pointer to the internal message buffer m_buf
65 virtual const char*
what()
const throw() {
return m_buf; }
68 BidirMMapPipeException::BidirMMapPipeException(
const char* msg,
int err)
73 std::copy(msg, msg + msgsz, m_buf);
74 if (msgsz < s_sz) { m_buf[msgsz] =
':'; ++msgsz; }
75 if (msgsz < s_sz) { m_buf[msgsz] =
' '; ++msgsz; }
80 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
85 int BidirMMapPipeException::dostrerror_r(
int err,
char* buf,
89 char *tmp =
f(err, buf, sz);
90 if (tmp && tmp != buf) {
91 std::strncpy(buf, tmp, sz);
93 if (std::strlen(tmp) > sz - 1)
return ERANGE;
113 unsigned short m_size;
114 unsigned short m_pos;
119 {
return *
reinterpret_cast<Page*
>(0); }
122 Page() : m_next(0), m_size(0), m_pos(0)
130 void setNext(
const Page* p);
/// Mutable access to the size field: returns a reference to m_size so that
/// callers can update the stored value in place.
134 unsigned short& size() {
return m_size; }
/// Read-only access to the size field: returns the current value of m_size
/// (stored as unsigned short, returned as unsigned).
136 unsigned size()
const {
return m_size; }
/// Mutable access to the position field: returns a reference to m_pos so that
/// callers can update the stored value in place.
138 unsigned short& pos() {
return m_pos; }
/// Read-only access to the position field: returns the current value of m_pos
/// (stored as unsigned short, returned as unsigned).
140 unsigned pos()
const {
return m_pos; }
142 inline unsigned char* begin()
const
143 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
146 inline unsigned char* end()
const
147 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
150 static unsigned capacity()
/// @return true when m_size is zero, false otherwise
153 bool empty()
const {
return !m_size; }
/// Logical negation of empty().
/// @return true when m_size is non-zero
155 bool filled()
const {
return !empty(); }
/// Difference between capacity() and the current m_size.
/// NOTE(review): presumably the number of payload bytes that can still be
/// appended to this page; capacity()'s body is not visible here - confirm.
157 unsigned free()
const {
return capacity() - m_size; }
/// Difference between m_size and m_pos.
/// NOTE(review): presumably the bytes stored in the page that have not been
/// consumed yet; the result is only meaningful while m_pos <= m_size, since
/// a negative difference would wrap when converted to unsigned.
159 unsigned remaining()
const {
return m_size - m_pos; }
/// @return true when free() reports zero, i.e. m_size equals capacity()
161 bool full()
const {
return !
free(); }
164 void Page::setNext(
const Page* p)
169 const char*
p1 =
reinterpret_cast<char*
>(
this);
170 const char*
p2 =
reinterpret_cast<const char*
>(p);
171 std::ptrdiff_t tmp = p2 -
p1;
185 if (!m_next)
return 0;
186 char* ptmp =
reinterpret_cast<char*
>(
const_cast<Page*
>(
this));
188 return reinterpret_cast<Page*
>(ptmp);
206 typedef BidirMMapPipeException
Exception;
216 typedef std::list<Chunk*> ChunkList;
247 unsigned m_szmap[(maxsz - minsz) / szincr];
254 void updateCurSz(
int sz,
int incr);
256 int nextChunkSz()
const;
258 void putOnFreeList(Chunk* chunk);
260 void release(Chunk* chunk);
287 m_pimpl(other.m_pimpl)
292 if (&other ==
this)
return *
this;
306 assert(pgno < m_pimpl->m_npages);
307 unsigned char* pptr =
310 return reinterpret_cast<Page*
>(pptr);
315 const unsigned char* pptr =
316 reinterpret_cast<const unsigned char*
>(p);
317 const unsigned char* bptr =
320 const unsigned nr = (pptr - bptr) /
pagesize();
321 assert(nr < m_pimpl->m_npages);
328 long pgsz = sysconf(_SC_PAGESIZE);
329 if (-1 == pgsz)
throw Exception(
"sysconf", errno);
330 if (pgsz > 512 && pgsz >
long(
sizeof(Page)))
340 unsigned length,
unsigned nPgPerGroup) :
343 reinterpret_cast<unsigned char*>(
m_begin) + length)),
347 unsigned char* p =
reinterpret_cast<unsigned char*
>(
m_begin);
348 unsigned char* pend =
reinterpret_cast<unsigned char*
>(
m_end);
350 m_freelist.push_back(reinterpret_cast<void*>(p));
351 p += nPgPerGroup * PagePool::pagesize();
377 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
381 if (wasempty)
m_parent->putOnFreeList(
this);
401 static bool msgprinted =
false;
403 #if defined(MAP_ANONYMOUS)
405 #define MYANONFLAG MAP_ANONYMOUS
406 #elif defined(MAP_ANON)
408 #define MYANONFLAG MAP_ANON
413 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
414 MYANONFLAG | MAP_SHARED, -1, 0);
415 if (MAP_FAILED == retVal) {
421 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
422 __FILE__ <<
", line " << __LINE__ <<
423 "): anonymous mmapping works, excellent!" <<
435 int fd =
::open(
"/dev/zero", O_RDWR);
437 throw Exception(
"open /dev/zero", errno);
438 void* retVal = ::mmap(0, len,
439 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
440 if (MAP_FAILED == retVal) {
448 if (-1 == ::
close(fd))
449 throw Exception(
"close /dev/zero", errno);
451 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
452 ", line " << __LINE__ <<
"): mmapping /dev/zero works, "
453 "very good!" << std::endl;
459 char name[] =
"/tmp/BidirMMapPipe-XXXXXX";
462 if (-1 == (fd = ::mkstemp(name)))
throw Exception(
"mkstemp", errno);
464 if (-1 == ::unlink(name)) {
470 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
476 if (1 != ::
write(fd, name, 1)) {
482 void* retVal = ::mmap(0, len,
483 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
484 if (MAP_FAILED == retVal) {
492 if (-1 == ::
close(fd)) {
494 ::munmap(retVal, len);
498 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
499 ", line " << __LINE__ <<
"): mmapping temporary files "
500 "works, good!" << std::endl;
511 std::cerr <<
"WARNING: In " << __func__ <<
" (" << __FILE__ <<
512 ", line " << __LINE__ <<
"): anonymous mmapping of "
513 "shared buffers failed, falling back to read/write on "
514 " pipes!" << std::endl;
519 if (!retVal)
throw Exception(
"malloc", errno);
533 if (-1 == ::munmap(addr, len))
555 unsigned char* p0 =
reinterpret_cast<unsigned char*
>(
m_begin);
556 unsigned char*
p1 =
reinterpret_cast<unsigned char*
>(p[0u]);
558 unsigned char*
p3 =
reinterpret_cast<unsigned char*
>(
m_end);
559 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
560 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
571 PagePool::PagePool(
unsigned nPgPerGroup) :
573 {
std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0); }
575 PagePool::~PagePool()
578 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
583 void PagePool::zap(Pages& p)
587 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
588 if ((*it)->contains(p)) {
595 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
599 Pages PagePool::pop()
603 const int sz = nextChunkSz();
604 Chunk *c =
new Chunk(
this,
606 m_chunks.push_front(c);
622 ChunkList::iterator it = std::find(
625 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
628 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
629 if (m_chunks.end() == it)
630 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
637 void PagePool::putOnFreeList(
PageChunk* chunk)
643 void PagePool::updateCurSz(
int sz,
int incr)
645 m_szmap[(sz - minsz) / szincr] += incr;
647 for (
int i = (maxsz - minsz) / szincr; i--; ) {
649 m_cursz += i * szincr;
655 int PagePool::nextChunkSz()
const
659 if (m_chunks.empty()) {
667 if (1 != m_chunks.size()) {
677 if (sz > maxsz) sz = maxsz;
678 if (sz < minsz) sz = minsz;
684 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
686 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
687 unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
710 m_pages(pagepool().
pop())
721 m_pages(pagepool().
pop()), m_busylist(0),
m_freelist(0), m_dirtylist(0),
722 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
723 m_parentPid(::getpid())
728 int fds[4] = { -1, -1, -1, -1 };
730 static bool firstcall =
true;
743 for (
unsigned i = 1; i <
TotPages; ++i)
746 if (!useSocketpair) {
748 if (0 != ::pipe(&fds[0]))
throw Exception(
"pipe", errno);
749 if (0 != ::pipe(&fds[2]))
throw Exception(
"pipe", errno);
751 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
767 if (-1 == ::
close(fds[0]) || (-1 == ::
close(fds[3]))) {
772 fds[0] = fds[3] = -1;
777 if (-1 == ::
close(fds[0])) {
787 for (std::list<BidirMMapPipe*>::iterator it =
s_openpipes.begin();
804 throw Exception(
"handshake: xferraw write", EPIPE);
806 throw Exception(
"handshake: xferraw read", EPIPE);
807 if (
'P' != c)
throw Exception(
"handshake", EPIPE);
813 if (-1 == ::
close(fds[1]) || -1 == ::
close(fds[2])) {
818 fds[1] = fds[2] = -1;
823 if (-1 == ::
close(fds[1])) {
840 throw Exception(
"handshake: xferraw write", EPIPE);
842 throw Exception(
"handshake: xferraw read", EPIPE);
843 if (
'C' != c)
throw Exception(
"handshake", EPIPE);
849 if (-1 == ::fcntl(
m_outpipe, F_GETFD, &fdflags))
851 fdflags |= FD_CLOEXEC;
852 if (-1 == ::fcntl(
m_outpipe, F_SETFD, fdflags))
855 if (-1 == ::fcntl(
m_inpipe, F_GETFD, &fdflags))
857 fdflags |= FD_CLOEXEC;
858 if (-1 == ::fcntl(
m_inpipe, F_SETFD, fdflags))
866 for (
int i = 0; i < 4; ++i)
867 if (-1 != fds[i] && 0 != fds[i])
::close(fds[i]);
898 if (!force)
throw Exception(
"close", errno);
920 while ((err = ::
poll(&fds, 1, 1 << 20)) >= 0) {
921 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
break;
922 if (fds.revents & POLLIN) {
927 }
while (0 > err && EINTR == errno);
932 if (!force)
throw Exception(
"close", errno);
941 }
catch (
const std::exception& e) {
951 }
while (-1 == tmp && EINTR == errno);
953 if (!force)
throw Exception(
"waitpid", errno);
958 std::list<BidirMMapPipe*>::iterator it = std::find(
974 unsigned char* buf =
reinterpret_cast<unsigned char*
>(addr);
976 ssize_t tmp = xferfn(fd, buf, len);
982 }
else if (0 == tmp) {
985 }
else if (-1 == tmp) {
992 if (xferred)
return xferred;
996 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
999 std::cerr <<
" ERROR: In " << __func__ <<
" (" <<
1000 __FILE__ <<
", line " << __LINE__ <<
1001 "): expect transfer to block!" << std::endl;
1007 throw Exception(
"xferraw: unexpected return value from read/write",
1017 unsigned char pg =
m_pages[plist];
1022 for (
Page* p = plist; p; p = p->next()) {
1023 if (
sizeof(
Page) + p->size() !=
1026 throw Exception(
"sendpages: short write", EPIPE);
1031 throw Exception(
"sendpages: short write", EPIPE);
1033 }
else {
assert(plist); }
1039 unsigned retVal = 0;
1040 Page *plisthead = 0, *plisttail = 0;
1042 plisthead = plisttail =
m_pages[pg];
1047 for (; plisttail; ++retVal) {
1048 Page* p = plisttail;
1051 plisttail = p->next();
1052 if (!p->size())
continue;
1073 fds.events = POLLIN;
1075 unsigned retVal = 0;
1077 int rc =
::poll(&fds, 1, 0);
1079 if (EINTR == errno)
continue;
1082 if (1 == retVal && fds.revents & POLLIN &&
1083 !(fds.revents & (POLLNVAL | POLLERR))) {
1097 for ( ; p; p = p->next()) ++n;
1106 while (blend && blend->next()) blend = blend->next();
1110 Page *sendlisthead = 0, *sendlisttail = 0;
1120 if (blend) blend->setNext(p);
1131 if (!sendlisthead) sendlisthead = p;
1132 if (sendlisttail) sendlisttail->setNext(p);
1152 sendlisttail->setNext(p);
1160 struct pollfd fds[2];
1162 fds[0].events = fds[0].revents = 0;
1165 fds[1].events = fds[1].revents = 0;
1167 fds[0].events |= POLLIN;
1171 retVal =
::poll(fds, nfds, 0);
1172 if (0 > retVal && EINTR == errno)
1177 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1179 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1181 if (ok && fds[0].revents & POLLIN) {
1183 if (!ret) ok =
false;
1192 throw Exception(
"feedPageLists: poll", errno);
1206 while (dl && dl->next()) dl = dl->next();
1207 if (dl) dl->setNext(p);
1228 if (p)
while (p->next()) p = p->next();
1229 if (!p || p->full()) {
1244 Page *flushlisthead = 0, *flushlisttail = 0;
1247 if (!forcePartialPages && !p->full())
break;
1252 if (!flushlisthead) flushlisthead = p;
1253 if (flushlisttail) flushlisttail->setNext(p);
1256 if (flushlisthead)
sendpages(flushlisthead);
1265 while (l && l->next()) l = l->next();
1283 retVal += p->size() - p->pos();
1294 bool couldwrite =
false;
1298 fds.events = POLLOUT;
1302 retVal =
::poll(&fds, 1, 0);
1304 if (EINTR == errno)
continue;
1305 throw Exception(
"bytesWritableNonBlocking: poll", errno);
1307 if (1 == retVal && fds.revents & POLLOUT &&
1308 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1315 unsigned npages = 0;
1321 retVal += p->free();
1326 npages <
FlushThresh || couldwrite); p = p->next()) {
1328 retVal += Page::capacity();
1337 unsigned char *ap =
reinterpret_cast<unsigned char*
>(addr);
1346 unsigned char* pp = p->begin() + p->pos();
1348 std::copy(pp, pp + csz, ap);
1353 assert(p->size() >= p->pos());
1354 if (p->size() == p->pos()) {
1373 const unsigned char *ap =
reinterpret_cast<const unsigned char*
>(addr);
1382 unsigned char* pp = p->begin() + p->size();
1384 std::copy(ap, ap + csz, pp);
1389 assert(p->capacity() >= p->size());
1409 bool canskiptimeout =
false;
1411 std::vector<unsigned>::iterator mit = masks.begin();
1412 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1414 PollEntry& pe = *it;
1434 while (dl && dl->next()) dl = dl->next();
1435 if (dl && dl->pos() < Page::capacity())
1439 if (pe.
revents) canskiptimeout =
true;
1442 std::vector<pollfd> fds;
1443 fds.reserve(2 * pipes.size());
1444 std::map<int, PollEntry*> fds2pipes;
1445 for (PollVector::const_iterator it = pipes.begin();
1446 pipes.end() != it; ++it) {
1447 const PollEntry& pe = *it;
1449 fds2pipes.insert(std::make_pair((tmp.fd = pe.
pipe->
m_inpipe),
1450 const_cast<PollEntry*>(&pe)));
1451 tmp.events = tmp.revents = 0;
1454 tmp.events |= POLLIN;
1458 fds2pipes.insert(std::make_pair(
1460 const_cast<PollEntry*>(&pe)));
1470 retVal =
::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1472 if (EINTR == errno)
continue;
1478 for (std::vector<pollfd>::iterator it = fds.begin();
1479 fds.end() != it; ++it) {
1483 PollEntry& pe = *fds2pipes[fe.fd];
1485 if (fe.revents & POLLNVAL && fe.fd == pe.
pipe->
m_inpipe)
1489 if (fe.revents & POLLERR && fe.fd == pe.
pipe->
m_inpipe)
1493 if (fe.revents & POLLHUP && fe.fd == pe.
pipe->
m_inpipe)
1497 if ((fe.revents & POLLIN) && fe.fd == pe.
pipe->
m_inpipe &&
1498 !(fe.revents & (POLLNVAL | POLLERR))) {
1504 int tmp =
::poll(&fe, 1, 0);
1505 if (tmp > 0)
goto oncemore;
1507 if (EINTR == errno)
continue;
1519 while (dl && dl->next()) dl = dl->next();
1520 if (dl && dl->pos() < Page::capacity())
1527 mit = masks.begin();
1528 for (PollVector::iterator it = pipes.begin();
1529 pipes.end() != it; ++it, ++mit)
1530 if ((it->revents &= *mit)) ++npipes;
1536 size_t sz = std::strlen(str);
1538 if (sz)
write(str, sz);
1547 str =
reinterpret_cast<char*
>(std::realloc(str, sz + 1));
1548 if (!str)
throw Exception(
"realloc", errno);
1549 if (sz)
read(str, sz);
1557 size_t sz = str.size();
1559 write(str.data(), sz);
1570 for (
unsigned char c; sz--; str.push_back(c)) *
this >> c;
1577 #ifdef TEST_BIDIRMMAPPIPE
1578 using namespace RooFit;
1583 while (pipe.
good() && !pipe.
eof()) {
1587 if (!pipe)
return -1;
1588 if (pipe.
eof())
break;
1590 std::cout <<
"[CHILD] : read: " << str << std::endl;
1591 str =
"... early in the morning?";
1595 if (str.empty())
break;
1596 if (!pipe)
return -1;
1597 if (pipe.
eof())
break;
1598 std::cout <<
"[CHILD] : wrote: " << str << std::endl;
1608 ::srand48(::getpid());
1616 for (
int i = 0; i < 5; ++i) {
1618 ::usleep(
int(1e6 * ::drand48()));
1619 std::ostringstream buf;
1620 buf <<
"child pid " << ::getpid() <<
" sends message " << i;
1621 std::string str = buf.str();
1622 std::cout <<
"[CHILD] : " << str << std::endl;
1624 if (!pipe)
return -1;
1625 if (pipe.
eof())
break;
1641 while (pipe && !pipe.
eof()) {
1648 if (pipe.
eof())
break;
1651 if (!std::strlen(str))
break;
1662 while (pipe && !pipe.
eof()) {
1664 if (!std::strlen(str))
break;
1676 for (
unsigned i = 0; i <= 24; ++i) {
1677 str =
reinterpret_cast<char*
>(std::realloc(str, (1 << i) + 1));
1678 std::memset(str,
'4', 1 << i);
1680 for (
unsigned j = 0; j < 1 << 7; ++j) {
1682 if (!pipe || pipe.
eof()) {
1703 int retVal = childexec(*p);
1710 #include <sys/time.h>
1716 std::cout <<
"[PARENT]: simple challenge-response test, "
1717 "one child:" << std::endl;
1719 for (
int i = 0; i < 5; ++i) {
1720 std::string str(
"What shall we do with a drunken sailor...");
1722 if (!*pipe)
return -1;
1723 std::cout <<
"[PARENT]: wrote: " << str << std::endl;
1725 if (!*pipe)
return -1;
1726 std::cout <<
"[PARENT]: read: " << str << std::endl;
1733 int retVal = pipe->
close();
1734 std::cout <<
"[PARENT]: exit status of child: " << retVal <<
1736 if (retVal)
return retVal;
1742 std::cout << std::endl <<
"[PARENT]: polling test, " << nch <<
1743 " children:" << std::endl;
1749 for (
unsigned i = 0; i < nch; ++i) {
1750 std::cout <<
"[PARENT]: spawning child " << i << std::endl;
1751 pipes.push_back(PollEntry(spawnChild(randomchild),
1755 std::cout <<
"[PARENT]: waking up children" << std::endl;
1756 for (
unsigned i = 0; i < nch; ++i)
1757 *pipes[i].pipe <<
"" << BidirMMapPipe::flush;
1758 std::cout <<
"[PARENT]: waiting for events on children's pipes" << std::endl;
1760 while (!pipes.empty()) {
1764 for (std::vector<PollEntry>::iterator it = pipes.begin();
1765 npipes && pipes.end() != it; ) {
1777 std::cout <<
"[PARENT]: Read from pipe " << it->pipe <<
1778 ": " << s << std::endl;
1783 *(it->pipe) <<
"" << BidirMMapPipe::flush;
1791 std::cerr <<
"[DEBUG]: Event on pipe " << it->pipe <<
1803 int retVal = it->pipe->close();
1804 std::cout <<
"[PARENT]: child exit status: " <<
1805 retVal <<
", number of children still alive: " <<
1806 (pipes.size() - 1) << std::endl;
1807 if (retVal)
return retVal;
1809 it = pipes.erase(it);
1817 std::cout << std::endl <<
"[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1818 for (
unsigned i = 0; i <= 24; ++i) {
1819 char *s =
new char[1 + (1 << i)];
1820 std::memset(s,
'A', 1 << i);
1822 const unsigned n = 1 << 7;
1823 double avg = 0.,
min = 1e42, max = -1e42;
1825 for (
unsigned j = n; j--; ) {
1827 ::gettimeofday(&
t1, 0);
1829 if (!*pipe || pipe->
eof())
break;
1831 if (!*pipe || pipe->
eof())
break;
1833 ::gettimeofday(&
t2, 0);
1834 t2.tv_sec -=
t1.tv_sec;
1835 t2.tv_usec -=
t1.tv_usec;
1838 if (dt > max) max = dt;
1846 avg *= 1e6;
min *= 1e6; max *= 1e6;
1847 int retVal = pipe->
close();
1849 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1856 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1857 " avg " << std::setw(7) << avg <<
" us min " <<
1858 std::setw(7) <<
min <<
" us max " << std::setw(7) << max <<
1859 "us speed " << std::setw(9) <<
1860 2. * (
double(1 << i) /
double(1 << 20) / (1e-6 * avg)) <<
1861 " MB/s" << std::endl;
1864 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1868 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1869 for (
unsigned i = 0; i <= 24; ++i) {
1870 char *s =
new char[1 + (1 << i)];
1871 std::memset(s,
'A', 1 << i);
1873 const unsigned n = 1 << 7;
1874 double avg = 0.,
min = 1e42, max = -1e42;
1876 for (
unsigned j = n; j--; ) {
1878 ::gettimeofday(&
t1, 0);
1881 if (!*pipe || pipe->
eof())
break;
1883 ::gettimeofday(&
t2, 0);
1884 t2.tv_sec -=
t1.tv_sec;
1885 t2.tv_usec -=
t1.tv_usec;
1888 if (dt > max) max = dt;
1896 avg *= 1e6;
min *= 1e6; max *= 1e6;
1897 int retVal = pipe->
close();
1899 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1903 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1904 " avg " << std::setw(7) << avg <<
" us min " <<
1905 std::setw(7) <<
min <<
" us max " << std::setw(7) << max <<
1906 "us speed " << std::setw(9) <<
1908 " MB/s" << std::endl;
1911 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1915 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1917 double avg = 0.,
min = 1e42, max = -1e42;
1918 unsigned n = 0, bsz = 0;
1920 while (*pipe && !pipe->
eof()) {
1922 ::gettimeofday(&
t1, 0);
1925 if (!*pipe || pipe->
eof())
break;
1927 ::gettimeofday(&
t2, 0);
1928 t2.tv_sec -=
t1.tv_sec;
1929 t2.tv_usec -=
t1.tv_usec;
1931 if (std::strlen(s)) {
1934 if (dt > max) max = dt;
1936 bsz = std::strlen(s);
1941 avg *= 1e6;
min *= 1e6; max *= 1e6;
1943 std::cout <<
"block size " << std::setw(9) << bsz <<
1944 " avg " << std::setw(7) << avg <<
" us min " <<
1945 std::setw(7) <<
min <<
" us max " << std::setw(7) <<
1946 max <<
"us speed " << std::setw(9) <<
1948 " MB/s" << std::endl;
1955 int retVal = pipe->
close();
1956 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1957 if (retVal)
return retVal;
1963 #endif // TEST_BIDIRMMAPPIPE
std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
bool eof() const
true if end-of-file
static Vc_ALWAYS_INLINE int_v min(const int_v &x, const int_v &y)
static double p3(double t, double a, double b, double c, double d)
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
read pipe in end-of-file state
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
unsigned m_refcnt
reference counter
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
write pipe in end-of-file state
unsigned npages() const
return number of pages accessible
don't know yet what'll work
BidirMMapPipeException Exception
convenience typedef
unsigned events
events of interest (or'ed bitmask)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
static MMapVariety s_mmapworks
mmap variety that works on this system
unsigned m_nUsedGrp
number of used page groups
pid_t m_childPid
pid of the child (zero if we're child)
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
unsigned nPagesPerGroup() const
return number of pages per page group
mmapping a temp file works
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
bool good() const
status of stream is good
size_type read(void *addr, size_type sz)
read from pipe
pages shared (child + parent)
static MMapVariety mmapVariety()
return mmap variety support found
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
void sendpages(Page *plist)
send page(s) to the other end (may block)
static double p2(double t, double a, double b, double c)
unsigned len() const
return length of chunk
Page * page(unsigned pgno) const
return the page with page number pgno
Page * busypage()
get a busy page to read data from (may block)
if(pyself &&pyself!=Py_None)
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
unsigned m_nPgPerGrp
number of pages per group
static unsigned lenPageList(const Page *list)
return length of a page list
Pages pop()
pop a group of pages off the free list
Double_t length(const TVector2 &v)
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return if p is contained in this PageChunk
logical failure (e.g. pipe closed)
void * m_begin
pointer to start of mmapped area
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
#define END_NAMESPACE_ROOFIT
error reporting with exceptions
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static double p1(double t, double a, double b)
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
bool isParent() const
return if this end of the pipe is the parent end
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
bool empty() const
return true if no used page groups in this chunk
bool closed() const
true if closed
MMapVariety
type of mmap support found
void zap(Pages &p)
free all pages except for those pointed to by p
unsigned recvpages()
receive pages from the other end (may block), queue them
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
static Vc_ALWAYS_INLINE int_v max(const int_v &x, const int_v &y)
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static unsigned long masks[]
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
system page size (run-time determined)
pipe has data for reading
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
nothing special on this pipe
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
static unsigned pagesize()
return the page size of the system
unsigned pageno(Page *p) const
perform page to page number mapping
#define BEGIN_NAMESPACE_ROOFIT