30 #include <sys/socket.h> 34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit { 35 #define END_NAMESPACE_ROOFIT } 46 class BidirMMapPipeException :
public std::exception
55 static int dostrerror_r(
int err,
char* buf, std::size_t sz,
56 int (*
f)(
int,
char*, std::size_t))
57 {
return f(err, buf, sz); }
59 static int dostrerror_r(
int,
char*, std::size_t,
60 char* (*
f)(
int,
char*, std::size_t));
63 BidirMMapPipeException(
const char* msg,
int err);
65 virtual const char* what()
const noexcept {
return m_buf; }
68 BidirMMapPipeException::BidirMMapPipeException(
const char* msg,
int err)
70 std::size_t msgsz = std::strlen(msg);
72 msgsz = std::min(msgsz, std::size_t(s_sz));
73 std::copy(msg, msg + msgsz, m_buf);
74 if (msgsz < s_sz) { m_buf[msgsz] =
':'; ++msgsz; }
75 if (msgsz < s_sz) { m_buf[msgsz] =
' '; ++msgsz; }
80 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
85 int BidirMMapPipeException::dostrerror_r(
int err,
char* buf,
86 std::size_t sz,
char* (*
f)(
int,
char*, std::size_t))
89 char *tmp =
f(err, buf, sz);
90 if (tmp && tmp != buf) {
91 std::strncpy(buf, tmp, sz);
93 if (std::strlen(tmp) > sz - 1)
return ERANGE;
113 unsigned short m_size;
114 unsigned short m_pos;
121 Page() : m_next(0), m_size(0), m_pos(0)
125 assert(std::numeric_limits<unsigned short>::max() >=
129 void setNext(
const Page* p);
133 unsigned short& size() {
return m_size; }
135 unsigned size()
const {
return m_size; }
137 unsigned short& pos() {
return m_pos; }
139 unsigned pos()
const {
return m_pos; }
141 inline unsigned char* begin()
const 142 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
145 inline unsigned char* end()
const 146 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
149 static unsigned capacity()
152 bool empty()
const {
return !m_size; }
154 bool filled()
const {
return !empty(); }
156 unsigned free()
const {
return capacity() - m_size; }
158 unsigned remaining()
const {
return m_size - m_pos; }
160 bool full()
const {
return !
free(); }
163 void Page::setNext(
const Page* p)
168 const char*
p1 =
reinterpret_cast<char*
>(
this);
169 const char*
p2 =
reinterpret_cast<const char*
>(p);
170 std::ptrdiff_t tmp = p2 -
p1;
176 assert(m_next == tmp);
182 Page* Page::next()
const 184 if (!m_next)
return 0;
185 char* ptmp =
reinterpret_cast<char*
>(
const_cast<Page*
>(
this));
187 return reinterpret_cast<Page*
>(ptmp);
210 typedef BidirMMapPipeException
Exception;
220 typedef std::list<Chunk*> ChunkList;
251 unsigned m_szmap[(maxsz - minsz) / szincr];
258 void updateCurSz(
int sz,
int incr);
260 int nextChunkSz()
const;
262 void putOnFreeList(Chunk* chunk);
264 void release(Chunk* chunk);
297 if (&other ==
this)
return *
this;
311 assert(pgno < m_pimpl->m_npages);
312 unsigned char* pptr =
315 return reinterpret_cast<Page*
>(pptr);
320 const unsigned char* pptr =
321 reinterpret_cast<const unsigned char*
>(p);
322 const unsigned char* bptr =
324 assert(0 == ((pptr - bptr) %
pagesize()));
325 const unsigned nr = (pptr - bptr) /
pagesize();
326 assert(nr < m_pimpl->m_npages);
333 long pgsz = sysconf(_SC_PAGESIZE);
334 if (-1 == pgsz)
throw Exception(
"sysconf", errno);
335 if (pgsz > 512 && pgsz >
long(
sizeof(Page)))
345 unsigned length,
unsigned nPgPerGroup) :
348 reinterpret_cast<unsigned char*>(
m_begin) + length)),
352 unsigned char* p =
reinterpret_cast<unsigned char*
>(
m_begin);
353 unsigned char* pend =
reinterpret_cast<unsigned char*
>(
m_end);
355 m_freelist.push_back(reinterpret_cast<void*>(p));
356 p += nPgPerGroup * PagePool::pagesize();
382 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
386 if (wasempty)
m_parent->putOnFreeList(
this);
406 static bool msgprinted =
false;
408 #if defined(MAP_ANONYMOUS) 410 #define MYANONFLAG MAP_ANONYMOUS 411 #elif defined(MAP_ANON) 413 #define MYANONFLAG MAP_ANON 418 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
419 MYANONFLAG | MAP_SHARED, -1, 0);
420 if (MAP_FAILED == retVal) {
426 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
427 __FILE__ <<
", line " << __LINE__ <<
428 "): anonymous mmapping works, excellent!" <<
440 int fd =
::open(
"/dev/zero", O_RDWR);
442 throw Exception(
"open /dev/zero", errno);
443 void* retVal = ::mmap(0, len,
444 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
445 if (MAP_FAILED == retVal) {
453 if (-1 == ::
close(fd))
454 throw Exception(
"close /dev/zero", errno);
456 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
457 ", line " << __LINE__ <<
"): mmapping /dev/zero works, " 458 "very good!" << std::endl;
464 char name[] =
"/tmp/BidirMMapPipe-XXXXXX";
467 if (-1 == (fd = ::mkstemp(name)))
throw Exception(
"mkstemp", errno);
469 if (-1 == ::unlink(name)) {
475 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
481 if (1 != ::
write(fd, name, 1)) {
487 void* retVal = ::mmap(0, len,
488 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
489 if (MAP_FAILED == retVal) {
497 if (-1 == ::
close(fd)) {
499 ::munmap(retVal, len);
503 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
504 ", line " << __LINE__ <<
"): mmapping temporary files " 505 "works, good!" << std::endl;
516 std::cerr <<
"WARNING: In " << __func__ <<
" (" << __FILE__ <<
517 ", line " << __LINE__ <<
"): anonymous mmapping of " 518 "shared buffers failed, falling back to read/write on " 519 " pipes!" << std::endl;
524 if (!retVal)
throw Exception(
"malloc", errno);
538 if (-1 == ::munmap(addr, len))
560 unsigned char* p0 =
reinterpret_cast<unsigned char*
>(
m_begin);
561 unsigned char*
p1 =
reinterpret_cast<unsigned char*
>(p[0u]);
563 unsigned char*
p3 =
reinterpret_cast<unsigned char*
>(
m_end);
564 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
565 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
576 PagePool::PagePool(
unsigned nPgPerGroup) :
582 const unsigned mult =
586 const unsigned actual = mult *
587 (desired / mult + bool(desired % mult));
590 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
591 __FILE__ <<
", line " << __LINE__ <<
593 ", subdividing into logical pages of size " <<
601 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
604 PagePool::~PagePool()
607 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
612 void PagePool::zap(
Pages& p)
616 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
617 if ((*it)->contains(p)) {
624 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
628 Pages PagePool::pop()
632 const int sz = nextChunkSz();
633 Chunk *c =
new Chunk(
this,
635 m_chunks.push_front(c);
649 assert(chunk->
empty());
651 ChunkList::iterator it = std::find(
654 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
657 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
658 if (m_chunks.end() == it)
659 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
666 void PagePool::putOnFreeList(
PageChunk* chunk)
668 assert(!chunk->
full());
672 void PagePool::updateCurSz(
int sz,
int incr)
674 m_szmap[(sz - minsz) / szincr] += incr;
676 for (
int i = (maxsz - minsz) / szincr; i--; ) {
678 m_cursz += i * szincr;
684 int PagePool::nextChunkSz()
const 688 if (m_chunks.empty()) {
696 if (1 != m_chunks.size()) {
706 if (sz > maxsz) sz = maxsz;
707 if (sz < minsz) sz = minsz;
722 s_pagepool =
new BidirMMapPipe_impl::PagePool(TotPages);
728 pthread_mutex_lock(&s_openpipesmutex);
729 while (!s_openpipes.empty()) {
731 pthread_mutex_unlock(&s_openpipesmutex);
734 pthread_mutex_lock(&s_openpipesmutex);
736 pthread_mutex_unlock(&s_openpipesmutex);
740 m_pages(pagepool().
pop())
758 int fds[4] = { -1, -1, -1, -1 };
760 static bool firstcall =
true;
773 for (
unsigned i = 1; i <
TotPages; ++i)
776 if (!useSocketpair) {
778 if (0 != ::pipe(&fds[0]))
throw Exception(
"pipe", errno);
779 if (0 != ::pipe(&fds[2]))
throw Exception(
"pipe", errno);
781 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
797 if (-1 == ::
close(fds[0]) || (-1 == ::
close(fds[3]))) {
802 fds[0] = fds[3] = -1;
807 if (-1 == ::
close(fds[0])) {
817 for (std::list<BidirMMapPipe*>::iterator it =
s_openpipes.begin();
834 throw Exception(
"handshake: xferraw write", EPIPE);
836 throw Exception(
"handshake: xferraw read", EPIPE);
837 if (
'P' != c)
throw Exception(
"handshake", EPIPE);
843 if (-1 == ::
close(fds[1]) || -1 == ::
close(fds[2])) {
848 fds[1] = fds[2] = -1;
853 if (-1 == ::
close(fds[1])) {
870 throw Exception(
"handshake: xferraw write", EPIPE);
872 throw Exception(
"handshake: xferraw read", EPIPE);
873 if (
'C' != c)
throw Exception(
"handshake", EPIPE);
879 if (-1 == ::fcntl(
m_outpipe, F_GETFD, &fdflags))
881 fdflags |= FD_CLOEXEC;
882 if (-1 == ::fcntl(
m_outpipe, F_SETFD, fdflags))
885 if (-1 == ::fcntl(
m_inpipe, F_GETFD, &fdflags))
887 fdflags |= FD_CLOEXEC;
888 if (-1 == ::fcntl(
m_inpipe, F_SETFD, fdflags))
896 for (
int i = 0; i < 4; ++i)
897 if (-1 != fds[i] && 0 != fds[i])
::close(fds[i]);
928 if (!force)
throw Exception(
"close", errno);
950 while ((err = ::
poll(&fds, 1, 1 << 20)) >= 0) {
951 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
break;
952 if (fds.revents & POLLIN) {
957 }
while (0 > err && EINTR == errno);
962 if (!force)
throw Exception(
"close", errno);
971 }
catch (
const std::exception&
e) {
981 }
while (-1 == tmp && EINTR == errno);
983 if (!force)
throw Exception(
"waitpid", errno);
988 std::list<BidirMMapPipe*>::iterator it = std::find(
1001 ssize_t (*xferfn)(
int,
void*, std::size_t))
1004 unsigned char* buf =
reinterpret_cast<unsigned char*
>(addr);
1006 ssize_t tmp = xferfn(fd, buf, len);
1012 }
else if (0 == tmp) {
1015 }
else if (-1 == tmp) {
1022 if (xferred)
return xferred;
1026 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN 1029 std::cerr <<
" ERROR: In " << __func__ <<
" (" <<
1030 __FILE__ <<
", line " << __LINE__ <<
1031 "): expect transfer to block!" << std::endl;
1037 throw Exception(
"xferraw: unexpected return value from read/write",
1047 unsigned char pg =
m_pages[plist];
1052 for (
Page* p = plist; p; p = p->next()) {
1053 if (
sizeof(
Page) + p->size() !=
1056 throw Exception(
"sendpages: short write", EPIPE);
1061 throw Exception(
"sendpages: short write", EPIPE);
1063 }
else { assert(plist); }
1069 unsigned retVal = 0;
1070 Page *plisthead = 0, *plisttail = 0;
1072 plisthead = plisttail =
m_pages[pg];
1077 for (; plisttail; ++retVal) {
1078 Page* p = plisttail;
1081 plisttail = p->next();
1082 if (!p->size())
continue;
1103 fds.events = POLLIN;
1105 unsigned retVal = 0;
1107 int rc =
::poll(&fds, 1, 0);
1109 if (EINTR == errno)
continue;
1112 if (1 == retVal && fds.revents & POLLIN &&
1113 !(fds.revents & (POLLNVAL | POLLERR))) {
1127 for ( ; p; p = p->next()) ++n;
1136 while (blend && blend->next()) blend = blend->next();
1140 Page *sendlisthead = 0, *sendlisttail = 0;
1150 if (blend) blend->setNext(p);
1161 if (!sendlisthead) sendlisthead = p;
1162 if (sendlisttail) sendlisttail->setNext(p);
1182 sendlisttail->setNext(p);
1190 struct pollfd fds[2];
1192 fds[0].events = fds[0].revents = 0;
1195 fds[1].events = fds[1].revents = 0;
1197 fds[0].events |= POLLIN;
1201 retVal =
::poll(fds, nfds, 0);
1202 if (0 > retVal && EINTR == errno)
1207 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1209 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1211 if (ok && fds[0].revents & POLLIN) {
1213 if (!ret) ok =
false;
1222 throw Exception(
"feedPageLists: poll", errno);
1236 while (dl && dl->next()) dl = dl->next();
1237 if (dl) dl->setNext(p);
1258 if (p)
while (p->next()) p = p->next();
1259 if (!p || p->full()) {
1274 Page *flushlisthead = 0, *flushlisttail = 0;
1277 if (!forcePartialPages && !p->full())
break;
1282 if (!flushlisthead) flushlisthead = p;
1283 if (flushlisttail) flushlisttail->setNext(p);
1286 if (flushlisthead)
sendpages(flushlisthead);
1295 while (l && l->next()) l = l->next();
1313 retVal += p->size() - p->pos();
1324 bool couldwrite =
false;
1328 fds.events = POLLOUT;
1332 retVal =
::poll(&fds, 1, 0);
1334 if (EINTR == errno)
continue;
1335 throw Exception(
"bytesWritableNonBlocking: poll", errno);
1337 if (1 == retVal && fds.revents & POLLOUT &&
1338 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1345 unsigned npages = 0;
1351 retVal += p->free();
1356 npages <
FlushThresh || couldwrite); p = p->next()) {
1358 retVal += Page::capacity();
1367 unsigned char *ap =
reinterpret_cast<unsigned char*
>(addr);
1376 unsigned char* pp = p->begin() + p->pos();
1378 std::copy(pp, pp + csz, ap);
1383 assert(p->size() >= p->pos());
1384 if (p->size() == p->pos()) {
1403 const unsigned char *ap =
reinterpret_cast<const unsigned char*
>(addr);
1412 unsigned char* pp = p->begin() + p->size();
1414 std::copy(ap, ap + csz, pp);
1419 assert(p->capacity() >= p->size());
1439 bool canskiptimeout =
false;
1441 std::vector<unsigned>::iterator mit = masks.begin();
1442 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1464 while (dl && dl->next()) dl = dl->next();
1465 if (dl && dl->pos() < Page::capacity())
1469 if (pe.
revents) canskiptimeout =
true;
1472 std::vector<pollfd> fds;
1473 fds.reserve(2 * pipes.size());
1474 std::map<int, PollEntry*> fds2pipes;
1475 for (PollVector::const_iterator it = pipes.begin();
1476 pipes.end() != it; ++it) {
1479 fds2pipes.insert(std::make_pair((tmp.fd = pe.
pipe->
m_inpipe),
1480 const_cast<PollEntry*>(&pe)));
1481 tmp.events = tmp.revents = 0;
1484 tmp.events |= POLLIN;
1488 fds2pipes.insert(std::make_pair(
1490 const_cast<PollEntry*>(&pe)));
1500 retVal =
::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1502 if (EINTR == errno)
continue;
1508 for (std::vector<pollfd>::iterator it = fds.begin();
1509 fds.end() != it; ++it) {
1515 if (fe.revents & POLLNVAL && fe.fd == pe.
pipe->
m_inpipe)
1519 if (fe.revents & POLLERR && fe.fd == pe.
pipe->
m_inpipe)
1523 if (fe.revents & POLLHUP && fe.fd == pe.
pipe->
m_inpipe)
1527 if ((fe.revents & POLLIN) && fe.fd == pe.
pipe->
m_inpipe &&
1528 !(fe.revents & (POLLNVAL | POLLERR))) {
1534 int tmp =
::poll(&fe, 1, 0);
1535 if (tmp > 0)
goto oncemore;
1537 if (EINTR == errno)
continue;
1549 while (dl && dl->next()) dl = dl->next();
1550 if (dl && dl->pos() < Page::capacity())
1557 mit = masks.begin();
1558 for (PollVector::iterator it = pipes.begin();
1559 pipes.end() != it; ++it, ++mit)
1560 if ((it->revents &= *mit)) ++npipes;
1566 size_t sz = std::strlen(str);
1568 if (sz)
write(str, sz);
1577 str =
reinterpret_cast<char*
>(
std::realloc(str, sz + 1));
1578 if (!str)
throw Exception(
"realloc", errno);
1579 if (sz)
read(str, sz);
1587 size_t sz = str.size();
1589 write(str.data(), sz);
1600 for (
unsigned char c; sz--; str.push_back(c)) *
this >> c;
1607 #ifdef TEST_BIDIRMMAPPIPE 1613 while (pipe.
good() && !pipe.
eof()) {
1617 if (!pipe)
return -1;
1618 if (pipe.
eof())
break;
1620 std::cout <<
"[CHILD] : read: " << str << std::endl;
1621 str =
"... early in the morning?";
1625 if (str.empty())
break;
1626 if (!pipe)
return -1;
1627 if (pipe.
eof())
break;
1628 std::cout <<
"[CHILD] : wrote: " << str << std::endl;
1638 ::srand48(::getpid());
1646 for (
int i = 0; i < 5; ++i) {
1648 ::usleep(
int(1e6 * ::drand48()));
1649 std::ostringstream buf;
1650 buf <<
"child pid " << ::getpid() <<
" sends message " << i;
1651 std::string str = buf.str();
1652 std::cout <<
"[CHILD] : " << str << std::endl;
1654 if (!pipe)
return -1;
1655 if (pipe.
eof())
break;
1671 while (pipe && !pipe.
eof()) {
1678 if (pipe.
eof())
break;
1681 if (!std::strlen(str))
break;
1692 while (pipe && !pipe.
eof()) {
1694 if (!std::strlen(str))
break;
1706 for (
unsigned i = 0; i <= 24; ++i) {
1707 str =
reinterpret_cast<char*
>(
std::realloc(str, (1 << i) + 1));
1708 std::memset(str,
'4', 1 << i);
1710 for (
unsigned j = 0; j < 1 << 7; ++j) {
1712 if (!pipe || pipe.
eof()) {
1733 int retVal = childexec(*p);
1740 #include <sys/time.h> 1746 std::cout <<
"[PARENT]: simple challenge-response test, " 1747 "one child:" << std::endl;
1749 for (
int i = 0; i < 5; ++i) {
1750 std::string str(
"What shall we do with a drunken sailor...");
1752 if (!*pipe)
return -1;
1753 std::cout <<
"[PARENT]: wrote: " << str << std::endl;
1755 if (!*pipe)
return -1;
1756 std::cout <<
"[PARENT]: read: " << str << std::endl;
1763 int retVal = pipe->
close();
1764 std::cout <<
"[PARENT]: exit status of child: " << retVal <<
1766 if (retVal)
return retVal;
1772 std::cout << std::endl <<
"[PARENT]: polling test, " << nch <<
1773 " children:" << std::endl;
1779 for (
unsigned i = 0; i < nch; ++i) {
1780 std::cout <<
"[PARENT]: spawning child " << i << std::endl;
1781 pipes.push_back(PollEntry(spawnChild(randomchild),
1785 std::cout <<
"[PARENT]: waking up children" << std::endl;
1786 for (
unsigned i = 0; i < nch; ++i)
1788 std::cout <<
"[PARENT]: waiting for events on children's pipes" << std::endl;
1790 while (!pipes.empty()) {
1794 for (std::vector<PollEntry>::iterator it = pipes.begin();
1795 npipes && pipes.end() != it; ) {
1807 std::cout <<
"[PARENT]: Read from pipe " << it->pipe <<
1808 ": " << s << std::endl;
1821 std::cerr <<
"[DEBUG]: Event on pipe " << it->pipe <<
1833 int retVal = it->pipe->close();
1834 std::cout <<
"[PARENT]: child exit status: " <<
1835 retVal <<
", number of children still alive: " <<
1836 (pipes.size() - 1) << std::endl;
1837 if (retVal)
return retVal;
1839 it = pipes.erase(it);
1847 std::cout << std::endl <<
"[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1848 for (
unsigned i = 0; i <= 24; ++i) {
1849 char *s =
new char[1 + (1 << i)];
1850 std::memset(s,
'A', 1 << i);
1852 const unsigned n = 1 << 7;
1853 double avg = 0., min = 1e42, max = -1e42;
1855 for (
unsigned j = n; j--; ) {
1857 ::gettimeofday(&t1, 0);
1859 if (!*pipe || pipe->
eof())
break;
1861 if (!*pipe || pipe->
eof())
break;
1863 ::gettimeofday(&t2, 0);
1864 t2.tv_sec -= t1.tv_sec;
1865 t2.tv_usec -= t1.tv_usec;
1866 double dt = 1
e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1867 if (dt < min) min = dt;
1868 if (dt > max) max = dt;
1876 avg *= 1e6; min *= 1e6; max *= 1e6;
1877 int retVal = pipe->
close();
1879 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1886 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1887 " avg " << std::setw(7) << avg <<
" us min " <<
1888 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1889 "us speed " << std::setw(9) <<
1890 2. * (double(1 << i) / double(1 << 20) / (1
e-6 * avg)) <<
1891 " MB/s" << std::endl;
1894 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1898 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1899 for (
unsigned i = 0; i <= 24; ++i) {
1900 char *s =
new char[1 + (1 << i)];
1901 std::memset(s,
'A', 1 << i);
1903 const unsigned n = 1 << 7;
1904 double avg = 0., min = 1e42, max = -1e42;
1906 for (
unsigned j = n; j--; ) {
1908 ::gettimeofday(&t1, 0);
1911 if (!*pipe || pipe->
eof())
break;
1913 ::gettimeofday(&t2, 0);
1914 t2.tv_sec -= t1.tv_sec;
1915 t2.tv_usec -= t1.tv_usec;
1916 double dt = 1
e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1917 if (dt < min) min = dt;
1918 if (dt > max) max = dt;
1926 avg *= 1e6; min *= 1e6; max *= 1e6;
1927 int retVal = pipe->
close();
1929 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1933 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1934 " avg " << std::setw(7) << avg <<
" us min " <<
1935 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1936 "us speed " << std::setw(9) <<
1937 (double(1 << i) / double(1 << 20) / (1
e-6 * avg)) <<
1938 " MB/s" << std::endl;
1941 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1945 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1947 double avg = 0., min = 1e42, max = -1e42;
1948 unsigned n = 0, bsz = 0;
1950 while (*pipe && !pipe->
eof()) {
1952 ::gettimeofday(&t1, 0);
1955 if (!*pipe || pipe->
eof())
break;
1957 ::gettimeofday(&t2, 0);
1958 t2.tv_sec -= t1.tv_sec;
1959 t2.tv_usec -= t1.tv_usec;
1960 double dt = 1
e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1961 if (std::strlen(s)) {
1963 if (dt < min) min = dt;
1964 if (dt > max) max = dt;
1966 bsz = std::strlen(s);
1971 avg *= 1e6; min *= 1e6; max *= 1e6;
1973 std::cout <<
"block size " << std::setw(9) << bsz <<
1974 " avg " << std::setw(7) << avg <<
" us min " <<
1975 std::setw(7) << min <<
" us max " << std::setw(7) <<
1976 max <<
"us speed " << std::setw(9) <<
1977 (double(bsz) / double(1 << 20) / (1
e-6 * avg)) <<
1978 " MB/s" << std::endl;
1985 int retVal = pipe->
close();
1986 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1987 if (retVal)
return retVal;
1993 #endif // TEST_BIDIRMMAPPIPE std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
static double p3(double t, double a, double b, double c, double d)
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
wirte to pipe
bool empty() const
return true if no used page groups in this chunk
read pipe in end-of-file state
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
unsigned m_refcnt
reference counter
bool bad() const
true on I/O error
Page * m_freelist
linked list: free pages
write pipe in end-of-file state
don't know yet what'll work
BidirMMapPipeException Exception
convenience typedef
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
bool isChild() const
return if this end of the pipe is the child end
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
static MMapVariety s_mmapworks
mmap variety that works on this system
unsigned m_nUsedGrp
number of used page groups
pid_t m_childPid
pid of the child (zero if we're child)
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
Page * page(unsigned pgno) const
return page number pageno
mmapping a temp file works
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
size_type read(void *addr, size_type sz)
read from pipe
static MMapVariety mmapVariety()
return mmap variety support found
void flush()
flush buffers with unwritten data
logical failure (e.g. pipe closed)
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
void sendpages(Page *plist)
send page(s) to the other end (may block)
static double p2(double t, double a, double b, double c)
static unsigned physPgSz()
return the physical page size of the system
Page * busypage()
get a busy page to read data from (may block)
unsigned nPagesPerGroup() const
return number of pages per page group
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
static unsigned s_physpgsz
system physical page size
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
static int debugflag()
return the current setting of the debug flag
unsigned m_nPgPerGrp
number of pages per group
bool eof() const
true if end-of-file
static unsigned lenPageList(const Page *list)
return length of a page list
Pages pop()
pop a group of pages off the free list
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
void * m_begin
pointer to start of mmapped area
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
#define END_NAMESPACE_ROOFIT
bool isParent() const
return if this end of the pipe is the parent end
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static double p1(double t, double a, double b)
pid_t m_parentPid
pid of the parent
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
static int s_debugflag
debug flag
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
unsigned pageno(Page *p) const
perform page to page number mapping
MMapVariety
type of mmap support found
void zap(Pages &p)
free all pages except for those pointed to by p
void Copy(void *source, void *dest)
unsigned recvpages()
receive a pages from the other end (may block), queue them
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
bool contains(const Pages &p) const
return if p is contained in this PageChunk
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
unsigned len() const
return length of chunk
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
error reporting with exceptions
unsigned npages() const
return number of pages accessible
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
pages shared (child + parent)
static unsigned long masks[]
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
logical page size (run-time determined)
bool full() const
return true if no free page groups in this chunk
pipe has data for reading
bool good() const
status of stream is good
nothing special on this pipe
bool closed() const
true if closed
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
static unsigned pagesize()
return the logical page size
int main(int argc, char **argv)
#define BEGIN_NAMESPACE_ROOFIT