Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TUDPSocket.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Marcelo Sousa 26/10/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/**
13\file TUDPSocket.cxx
14\class TUDPSocket
15\brief This class implements UDP client sockets.
16\note This class deals with sockets: the user is entirely responsible for the security of their usage, for example, but
17not limited to, the management of the connections to said sockets.
18
19A socket is an endpoint for communication between two machines. The actual work is done via the TSystem class (either
20TUnixSystem or TWinNTSystem).
21**/
22
23#include "Bytes.h"
24#include "Compression.h"
25#include "NetErrors.h"
26#include "TError.h"
27#include "TMessage.h"
28#include "TUDPSocket.h"
29#include "TObjString.h"
30#include "TPluginManager.h"
31#include "TROOT.h"
32#include "TString.h"
33#include "TSystem.h"
34#include "TUrl.h"
35#include "TVirtualAuth.h"
36#include "TStreamerInfo.h"
37#include "TProcessID.h"
38
39#include <limits>
40
43
44
46
47////////////////////////////////////////////////////////////////////////////////
48/// Create a socket. Connect to the named service at address addr.
49/// Use tcpwindowsize to specify the size of the receive buffer, it has
50/// to be specified here to make sure the window scale option is set (for
51/// tcpwindowsize > 65KB and for platforms supporting window scaling).
52/// Returns when connection has been accepted by remote side. Use IsValid()
53/// to check the validity of the socket. Every socket is added to the TROOT
54/// sockets list which will make sure that any open sockets are properly
55/// closed on program termination.
56
58 : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
59{
62
64 fSecContext = 0;
67 if (fService.Contains("root"))
69 if (fService.Contains("proof"))
71 fAddress = addr;
73 fBytesSent = 0;
74 fBytesRecv = 0;
75 fUUIDs = 0;
76 fLastUsageMtx = 0;
78
79 if (fAddress.GetPort() != -1) {
81 -1, "upd");
82
83 if (fSocket != -1) {
85 gROOT->GetListOfSockets()->Add(this);
86 }
87 } else
88 fSocket = -1;
89
90}
91
92
93////////////////////////////////////////////////////////////////////////////////
94/// Create a socket. Connect to the specified port # at address addr.
95/// Use tcpwindowsize to specify the size of the receive buffer, it has
96/// to be specified here to make sure the window scale option is set (for
97/// tcpwindowsize > 65KB and for platforms supporting window scaling).
98/// Returns when connection has been accepted by remote side. Use IsValid()
99/// to check the validity of the socket. Every socket is added to the TROOT
100/// sockets list which will make sure that any open sockets are properly
101/// closed on program termination.
102
104 : TNamed(addr.GetHostName(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
105{
108
110 fSecContext = 0;
111 fRemoteProtocol= -1;
113 if (fService.Contains("root"))
115 if (fService.Contains("proof"))
117 fAddress = addr;
118 fAddress.fPort = port;
120 fBytesSent = 0;
121 fBytesRecv = 0;
122 fUUIDs = 0;
123 fLastUsageMtx = 0;
125
127 -1, "upd");
128 if (fSocket == -1)
129 fAddress.fPort = -1;
130 else {
132 gROOT->GetListOfSockets()->Add(this);
133 }
134}
135
136////////////////////////////////////////////////////////////////////////////////
137/// Create a socket. Connect to named service on the remote host.
138/// Use tcpwindowsize to specify the size of the receive buffer, it has
139/// to be specified here to make sure the window scale option is set (for
140/// tcpwindowsize > 65KB and for platforms supporting window scaling).
141/// Returns when connection has been accepted by remote side. Use IsValid()
142/// to check the validity of the socket. Every socket is added to the TROOT
143/// sockets list which will make sure that any open sockets are properly
144/// closed on program termination.
145
146TUDPSocket::TUDPSocket(const char *host, const char *service)
147 : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
148{
 // Names the object after `host` (title `service`), resets the per-socket
 // bookkeeping members and, if fAddress.GetPort() is not -1, opens the
 // connection through gSystem->OpenConnection().
 // NOTE(review): the listing numbers embedded in this view are not contiguous
 // (148 -> 151 -> 153, 158 -> 163, 171 -> 173), so some statements of the
 // original source -- including whatever fills fAddress -- are not shown here.
 // NOTE(review): the header comment above talks about tcpwindowsize, but this
 // constructor takes no such parameter.
151
153 fSecContext = 0;
154 fRemoteProtocol= -1;
 // NOTE(review): the statements guarded by the next two `if`s are not visible
 // in this view; do not read them as nested conditions.
156 if (fService.Contains("root"))
158 if (fService.Contains("proof"))
 // Start with zero byte counters and cleared fUUIDs / fLastUsageMtx.
163 fBytesSent = 0;
164 fBytesRecv = 0;
165 fUUIDs = 0;
166 fLastUsageMtx = 0;
168
169 if (fAddress.GetPort() != -1) {
 // NOTE(review): the protocol string is spelled "upd" here, while other
 // constructors in this file pass "udp" -- this looks like a typo; confirm
 // against TSystem::OpenConnection() how an unknown protocol is handled.
170 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "upd");
171 if (fSocket != -1) {
 // Only successfully opened sockets are added to gROOT's list of sockets.
173 gROOT->GetListOfSockets()->Add(this);
174 }
175 } else
 // No usable port: -1 marks the socket as not open.
176 fSocket = -1;
177}
178
179////////////////////////////////////////////////////////////////////////////////
180/// Create a socket; see CreateAuthSocket for the form of url.
181/// Connect to the specified port # on the remote host.
182/// If user is specified in url, try authentication as user.
183/// Use tcpwindowsize to specify the size of the receive buffer, it has
184/// to be specified here to make sure the window scale option is set (for
185/// tcpwindowsize > 65KB and for platforms supporting window scaling).
186/// Returns when connection has been accepted by remote side. Use IsValid()
187/// to check the validity of the socket. Every socket is added to the TROOT
188/// sockets list which will make sure that any open sockets are properly
189/// closed on program termination.
190
192 : TNamed(TUrl(url).GetHost(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
193{
196
197 fUrl = TString(url);
198 TString host(TUrl(fUrl).GetHost());
199
201 fSecContext = 0;
202 fRemoteProtocol= -1;
204 if (fUrl.Contains("root"))
206 if (fUrl.Contains("proof"))
209 fAddress.fPort = port;
212 fBytesSent = 0;
213 fBytesRecv = 0;
214 fUUIDs = 0;
215 fLastUsageMtx = 0;
217
218 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
219 if (fSocket == -1) {
220 fAddress.fPort = -1;
221 } else {
223 gROOT->GetListOfSockets()->Add(this);
224 }
225}
226
227////////////////////////////////////////////////////////////////////////////////
228/// Create a socket in the Unix domain on 'sockpath'.
229/// Returns when connection has been accepted by the server. Use IsValid()
230/// to check the validity of the socket. Every socket is added to the TROOT
231/// sockets list which will make sure that any open sockets are properly
232/// closed on program termination.
233
235 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
236{
239
240 fUrl = sockpath;
241
242 fService = "unix";
243 fSecContext = 0;
244 fRemoteProtocol= -1;
246 fAddress.fPort = -1;
247 fName.Form("unix:%s", sockpath);
249 fBytesSent = 0;
250 fBytesRecv = 0;
251 fUUIDs = 0;
252 fLastUsageMtx = 0;
254
255 fSocket = gSystem->OpenConnection(sockpath, -1, -1, "udp");
256 if (fSocket > 0) {
258 gROOT->GetListOfSockets()->Add(this);
259 }
260}
261
262////////////////////////////////////////////////////////////////////////////////
263/// Create a socket. The socket will adopt previously opened TCP socket with
264/// descriptor desc.
265
266TUDPSocket::TUDPSocket(Int_t desc) : TNamed("", ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
267{
 // Wraps an already opened descriptor: the object gets an empty name/title,
 // its bookkeeping members are reset, and `desc` is stored in fSocket when it
 // is non-negative.
 // NOTE(review): the listing numbers embedded in this view are not contiguous
 // (267 -> 270, 273 -> 275, 282 -> 285), so some original statements are not
 // shown here.
270
271 fSecContext = 0;
 // Unlike the connecting constructors in this file (which use -1), the remote
 // protocol is set to 0 here.
272 fRemoteProtocol = 0;
273 fService = (char *)kSOCKD;
 // Start with zero byte counters and cleared fUUIDs / fLastUsageMtx.
275 fBytesSent = 0;
276 fBytesRecv = 0;
277 fUUIDs = 0;
278 fLastUsageMtx = 0;
280
281 if (desc >= 0) {
 // Adopt the descriptor and add this socket to gROOT's list of sockets.
282 fSocket = desc;
285 gROOT->GetListOfSockets()->Add(this);
286 } else
 // Negative descriptor: -1 marks the socket as not open.
287 fSocket = -1;
288}
289
290////////////////////////////////////////////////////////////////////////////////
291/// Create a socket. The socket will adopt previously opened Unix socket with
292/// descriptor desc. The sockpath arg is for info purposes only. Use
293/// this method to adopt e.g. a socket created via socketpair().
294
296 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
297{
300
301 fUrl = sockpath;
302
303 fService = "unix";
304 fSecContext = 0;
305 fRemoteProtocol= -1;
307 fAddress.fPort = -1;
308 fName.Form("unix:%s", sockpath);
310 fBytesSent = 0;
311 fBytesRecv = 0;
312 fUUIDs = 0;
313 fLastUsageMtx = 0;
315
316 if (desc >= 0) {
317 fSocket = desc;
319 gROOT->GetListOfSockets()->Add(this);
320 } else
321 fSocket = -1;
322}
323
324
325////////////////////////////////////////////////////////////////////////////////
326/// TUDPSocket copy ctor.
327
329{
330 fSocket = s.fSocket;
331 fService = s.fService;
332 fAddress = s.fAddress;
340 fUUIDs = 0;
341 fLastUsageMtx = 0;
343
344 if (fSocket != -1) {
346 gROOT->GetListOfSockets()->Add(this);
347 }
348}
349
350////////////////////////////////////////////////////////////////////////////////
351/// Close the socket. If option is "force", calls shutdown(id,2) to
352/// shut down the connection. This will close the connection also
353/// for the parent of this process. Also called via the dtor (without
354/// option "force", call explicitly Close("force") if this is desired).
355
357{
358 Bool_t force = option ? (!strcmp(option, "force") ? kTRUE : kFALSE) : kFALSE;
359
360 if (fSocket != -1) {
363 gROOT->GetListOfSockets()->Remove(this);
364 }
365 fSocket = -1;
366
369}
370
371////////////////////////////////////////////////////////////////////////////////
372/// Return internet address of local host to which the socket is bound.
373/// In case of error TInetAddress::IsValid() returns kFALSE.
374
376{
377 if (IsValid()) {
378 if (fLocalAddress.GetPort() == -1)
380 return fLocalAddress;
381 }
382 return TInetAddress();
383}
384
385////////////////////////////////////////////////////////////////////////////////
386/// Return the local port # to which the socket is bound.
387/// In case of error return -1.
388
390{
391 if (IsValid()) {
392 if (fLocalAddress.GetPort() == -1)
394 return fLocalAddress.GetPort();
395 }
396 return -1;
397}
398
399////////////////////////////////////////////////////////////////////////////////
400/// Waits for this socket to change status. If interest=kRead,
401/// the socket will be watched to see if characters become available for
402/// reading; if interest=kWrite the socket will be watched to
403/// see if a write will not block.
404/// The argument 'timeout' specifies a maximum time to wait in millisec.
405/// Default no timeout.
406/// Returns 1 if a change of status of interest has been detected within
407/// timeout; 0 in case of timeout; < 0 if an error occurred.
408
410{
411 Int_t rc = 1;
412
413 // Associate a TFileHandler to this socket
415
416 // Wait for an event now
417 rc = gSystem->Select(&fh, timeout);
418
419 return rc;
420}
421
422////////////////////////////////////////////////////////////////////////////////
423/// Send a single message opcode. Use kind (opcode) to set the
424/// TMessage "what" field. Returns the number of bytes that were sent
425/// (always sizeof(Int_t)) and -1 in case of error. In case the kind has
426/// been or'ed with kMESS_ACK, the call will only return after having
427/// received an acknowledgement, making the sending process synchronous.
428
430{
431 TMessage mess(kind);
432
433 Int_t nsent;
434 if ((nsent = Send(mess)) < 0)
435 return -1;
436
437 return nsent;
438}
439
440////////////////////////////////////////////////////////////////////////////////
441/// Send a status and a single message opcode. Use kind (opcode) to set the
442/// TMessage "what" field. Returns the number of bytes that were sent
443/// (always 2*sizeof(Int_t)) and -1 in case of error. In case the kind has
444/// been or'ed with kMESS_ACK, the call will only return after having
445/// received an acknowledgement, making the sending process synchronous.
446
448{
449 TMessage mess(kind);
450 mess << status;
451
452 Int_t nsent;
453 if ((nsent = Send(mess)) < 0)
454 return -1;
455
456 return nsent;
457}
458
459////////////////////////////////////////////////////////////////////////////////
460/// Send a character string buffer. Use kind to set the TMessage "what" field.
461/// Returns the number of bytes in the string str that were sent and -1 in
462/// case of error. In case the kind has been or'ed with kMESS_ACK, the call
463/// will only return after having received an acknowledgement, making the
464/// sending process synchronous.
465
466Int_t TUDPSocket::Send(const char *str, Int_t kind)
467{
468 TMessage mess(kind);
469 if (str) mess.WriteString(str);
470
471 Int_t nsent;
472 if ((nsent = Send(mess)) < 0)
473 return -1;
474
475 return nsent - sizeof(Int_t); // - TMessage::What()
476}
477
478////////////////////////////////////////////////////////////////////////////////
479/// Send a TMessage object. Returns the number of bytes in the TMessage
480/// that were sent and -1 in case of error. In case the TMessage::What
481/// has been or'ed with kMESS_ACK, the call will only return after having
482/// received an acknowledgement, making the sending process synchronous.
483/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
484/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
485/// support for streaming TStreamerInfo added by Rene Brun May 2008
486/// support for streaming TProcessID added by Rene Brun June 2008
487
489{
491
492 if (fSocket == -1) return -1;
493
494 if (mess.IsReading()) {
495 Error("Send", "cannot send a message used for reading");
496 return -1;
497 }
498
499 // send streamer infos in case schema evolution is enabled in the TMessage
501
502 // send the process id's so TRefs work
504
505 mess.SetLength(); //write length in first word of buffer
506
507 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
509
510 if (mess.GetCompressionLevel() > 0)
511 const_cast<TMessage&>(mess).Compress();
512
513 char *mbuf = mess.Buffer();
514 Int_t mlen = mess.Length();
515 if (mess.CompBuffer()) {
516 mbuf = mess.CompBuffer();
517 mlen = mess.CompLength();
518 }
519
521 Int_t nsent;
522 if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
523 if (nsent == -5) {
524 // Connection reset by peer or broken
526 Close();
527 }
528 return nsent;
529 }
530
531 fBytesSent += nsent;
533
534 // If acknowledgement is desired, wait for it
535 if (mess.What() & kMESS_ACK) {
538 char buf[2];
539 Int_t n = 0;
540 if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
541 if (n == -5) {
542 // Connection reset by peer or broken
544 Close();
545 } else
546 n = -1;
547 return n;
548 }
549 if (strncmp(buf, "ok", 2)) {
550 Error("Send", "bad acknowledgement");
551 return -1;
552 }
553 fBytesRecv += 2;
554 fgBytesRecv += 2;
555 }
556
557 Touch(); // update usage timestamp
558
559 return nsent - sizeof(UInt_t); //length - length header
560}
561
562////////////////////////////////////////////////////////////////////////////////
563/// Send an object. Returns the number of bytes sent and -1 in case of error.
564/// In case the "kind" has been or'ed with kMESS_ACK, the call will only
565/// return after having received an acknowledgement, making the sending
566/// synchronous.
567
569{
570 //stream object to message buffer
571 TMessage mess(kind);
572 mess.WriteObject(obj);
573
574 //now sending the object itself
575 Int_t nsent;
576 if ((nsent = Send(mess)) < 0)
577 return -1;
578
579 return nsent;
580}
581
582////////////////////////////////////////////////////////////////////////////////
583/// Send a raw buffer of specified length. Using option kOob one can send
584/// OOB data. Returns the number of bytes sent or -1 in case of error.
585/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
586/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
587
589{
591
592 if (fSocket == -1) return -1;
593
595 Int_t nsent;
596 if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt)) <= 0) {
597 if (nsent == -5) {
598 // Connection reset or broken: close
600 Close();
601 }
602 return nsent;
603 }
604
605 fBytesSent += nsent;
607
608 Touch(); // update usage timestamp
609
610 return nsent;
611}
612
613////////////////////////////////////////////////////////////////////////////////
614/// Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
615/// in the object in the message is in the fInfos list of the message.
616/// We send only the TStreamerInfos not yet sent on this socket.
617
619{
620 if (mess.fInfos && mess.fInfos->GetEntries()) {
621 TIter next(mess.fInfos);
623 TList *minilist = 0;
624 while ((info = (TStreamerInfo*)next())) {
625 Int_t uid = info->GetNumber();
626 if (fBitsInfo.TestBitNumber(uid))
627 continue; //TStreamerInfo had already been sent
629 if (!minilist)
630 minilist = new TList();
631 if (gDebug > 0)
632 Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
633 info->GetName(),info->GetClassVersion());
634 minilist->Add(info);
635 }
636 if (minilist) {
638 messinfo.WriteObject(minilist);
639 delete minilist;
640 if (messinfo.fInfos)
641 messinfo.fInfos->Clear();
642 if (Send(messinfo) < 0)
643 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
644 }
645 }
646}
647
648////////////////////////////////////////////////////////////////////////////////
649/// Check if TProcessIDs must be sent. The list of TProcessIDs
650/// in the object in the message is found by looking in the TMessage bits.
651/// We send only the TProcessIDs not yet sent on this socket.
652
654{
655 if (mess.TestBitNumber(0)) {
657 Int_t npids = pids->GetEntries();
658 TProcessID *pid;
659 TList *minilist = 0;
660 for (Int_t ipid = 0; ipid < npids; ipid++) {
661 pid = (TProcessID*)pids->At(ipid);
662 if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
663 continue;
664 //check if a pid with this title has already been sent through the socket
665 //if not add it to the fUUIDs list
666 if (!fUUIDs) {
667 fUUIDs = new TList();
668 } else {
669 if (fUUIDs->FindObject(pid->GetTitle()))
670 continue;
671 }
672 fUUIDs->Add(new TObjString(pid->GetTitle()));
673 if (!minilist)
674 minilist = new TList();
675 if (gDebug > 0)
676 Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
677 minilist->Add(pid);
678 }
679 if (minilist) {
681 messpid.WriteObject(minilist);
682 delete minilist;
683 if (Send(messpid) < 0)
684 Warning("SendProcessIDs", "problems sending TProcessID's ...");
685 }
686 }
687}
688
689////////////////////////////////////////////////////////////////////////////////
690/// Receive a character string message of maximum max length. The expected
691/// message must be of type kMESS_STRING. Returns length of received string
692/// (can be 0 if other side of connection is closed) or -1 in case of error
693/// or -4 in case a non-blocking socket would block (i.e. there is nothing
694/// to be read).
695
697{
698 Int_t n, kind;
699
701 if ((n = Recv(str, max, kind)) <= 0) {
702 if (n == -5) {
704 n = -1;
705 }
706 return n;
707 }
708
709 if (kind != kMESS_STRING) {
710 Error("Recv", "got message of wrong kind (expected %d, got %d)",
711 kMESS_STRING, kind);
712 return -1;
713 }
714
715 return n;
716}
717
718////////////////////////////////////////////////////////////////////////////////
719/// Receive a character string message of maximum max length. Returns in
720/// kind the message type. Returns length of received string+4 (can be 0 if
721/// other side of connection is closed) or -1 in case of error or -4 in
722/// case a non-blocking socket would block (i.e. there is nothing to be read).
723
724Int_t TUDPSocket::Recv(char *str, Int_t max, Int_t &kind)
725{
 // Receives one message, stores its What() value in `kind` and, when `str` is
 // non-null, copies the string payload (up to `max`) into `str`.
 // NOTE(review): listing lines 729 and 732 are not shown in this view, so the
 // original contains statements that are missing here.
726 Int_t n;
 // `mess` is left uninitialized on purpose: it is filled in by Recv(mess)
 // below (presumably the overload that hands back a newly allocated TMessage).
727 TMessage *mess;
728
 // On n <= 0 return early; `kind` and `str` are not written on this path.
730 if ((n = Recv(mess)) <= 0) {
 // A result of -5 is reported to the caller as the generic error -1.
731 if (n == -5) {
733 n = -1;
734 }
735 return n;
736 }
737
738 kind = mess->What();
739 if (str) {
740 if (mess->BufferSize() > (Int_t)sizeof(Int_t)) // if mess contains more than kind
741 mess->ReadString(str, max);
742 else
 // Message carries nothing beyond the kind word: hand back an empty string.
743 str[0] = 0;
744 }
745
 // The message object is released here; nothing of it is returned to the caller.
746 delete mess;
747
748 return n; // number of bytes read (len of str + sizeof(kind))
749}
750
751////////////////////////////////////////////////////////////////////////////////
752/// Receives a status and a message type. Returns length of received
753/// integers, 2*sizeof(Int_t) (can be 0 if other side of connection
754/// is closed) or -1 in case of error or -4 in case a non-blocking
755/// socket would block (i.e. there is nothing to be read).
756
758{
759 Int_t n;
760 TMessage *mess;
761
763 if ((n = Recv(mess)) <= 0) {
764 if (n == -5) {
766 n = -1;
767 }
768 return n;
769 }
770
771 kind = mess->What();
772 (*mess) >> status;
773
774 delete mess;
775
776 return n; // number of bytes read (2 * sizeof(Int_t)
777}
778
779////////////////////////////////////////////////////////////////////////////////
780/// Receive a TMessage object. The user must delete the TMessage object.
781/// Returns length of message in bytes (can be 0 if other side of connection
782/// is closed) or -1 in case of error or -4 in case a non-blocking socket
783/// would block (i.e. there is nothing to be read) or -5 if pipe broken
784/// or reset by peer (EPIPE || ECONNRESET). In those cases mess == 0.
785
787{
789
790 if (fSocket == -1) {
791 mess = 0;
792 return -1;
793 }
794
797 Int_t n;
798 UInt_t len;
799 if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
800 if (n == 0 || n == -5) {
801 // Connection closed, reset or broken
803 Close();
804 }
805 mess = 0;
806 return n;
807 }
808 len = net2host(len); //from network to host byte order
809
810 if (len > (std::numeric_limits<decltype(len)>::max() - sizeof(decltype(len)))) {
811 Error("Recv", "Buffer length is %u and %u+sizeof(UInt_t) cannot be represented as an UInt_t.", len, len);
812 return -1;
813 }
814
816 char *buf = new char[len+sizeof(UInt_t)];
817 if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
818 if (n == 0 || n == -5) {
819 // Connection closed, reset or broken
821 Close();
822 }
823 delete [] buf;
824 mess = 0;
825 return n;
826 }
827
828 fBytesRecv += n + sizeof(UInt_t);
829 fgBytesRecv += n + sizeof(UInt_t);
830
831 mess = new TMessage(buf, len+sizeof(UInt_t));
832
833 // receive any streamer infos
835 goto oncemore;
836
837 // receive any process ids
838 if (RecvProcessIDs(mess))
839 goto oncemore;
840
841 if (mess->What() & kMESS_ACK) {
843 char ok[2] = { 'o', 'k' };
844 Int_t n2 = 0;
845 if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
846 if (n2 == -5) {
847 // Connection reset or broken
849 Close();
850 }
851 delete mess;
852 mess = 0;
853 return n2;
854 }
855 mess->SetWhat(mess->What() & ~kMESS_ACK);
856
857 fBytesSent += 2;
858 fgBytesSent += 2;
859 }
860
861 Touch(); // update usage timestamp
862
863 return n;
864}
865
866////////////////////////////////////////////////////////////////////////////////
867/// Receive a raw buffer of specified length bytes. Using option kPeek
868/// one can peek at incoming data. Returns number of received bytes.
869/// Returns -1 in case of error. In case of opt == kOob: -2 means
870/// EWOULDBLOCK and -3 EINVAL. In case of non-blocking mode (kNoBlock)
871/// -4 means EWOULDBLOCK. Returns -5 if pipe broken or reset by
872/// peer (EPIPE || ECONNRESET).
873
875{
877
878 if (fSocket == -1) return -1;
879 if (length == 0) return 0;
880
882 Int_t n;
883 if ((n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt)) <= 0) {
884 if (n == 0 || n == -5) {
885 // Connection closed, reset or broken
887 Close();
888 }
889 return n;
890 }
891
892 fBytesRecv += n;
893 fgBytesRecv += n;
894
895 Touch(); // update usage timestamp
896
897 return n;
898}
899
900////////////////////////////////////////////////////////////////////////////////
901/// Receive a message containing streamer infos. In case the message contains
902/// streamer infos they are imported, the message will be deleted and the
903/// method returns kTRUE.
904
906{
907 if (mess->What() == kMESS_STREAMERINFO) {
908 TList *list = (TList*)mess->ReadObject(TList::Class());
909 TIter next(list);
911 TObjLink *lnk = list->FirstLink();
912 // First call BuildCheck for regular class
913 while (lnk) {
914 info = (TStreamerInfo*)lnk->GetObject();
915 TObject *element = info->GetElements()->UncheckedAt(0);
916 Bool_t isstl = element && strcmp("This",element->GetName())==0;
917 if (!isstl) {
918 info->BuildCheck();
919 if (gDebug > 0)
920 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
921 info->GetName(), info->GetClassVersion());
922 }
923 lnk = lnk->Next();
924 }
925 // Then call BuildCheck for stl class
926 lnk = list->FirstLink();
927 while (lnk) {
928 info = (TStreamerInfo*)lnk->GetObject();
929 TObject *element = info->GetElements()->UncheckedAt(0);
930 Bool_t isstl = element && strcmp("This",element->GetName())==0;
931 if (isstl) {
932 info->BuildCheck();
933 if (gDebug > 0)
934 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
935 info->GetName(), info->GetClassVersion());
936 }
937 lnk = lnk->Next();
938 }
939 delete list;
940 delete mess;
941
942 return kTRUE;
943 }
944 return kFALSE;
945}
946
947////////////////////////////////////////////////////////////////////////////////
948/// Receive a message containing process ids. In case the message contains
949/// process ids they are imported, the message will be deleted and the
950/// method returns kTRUE.
951
953{
954 if (mess->What() == kMESS_PROCESSID) {
955 TList *list = (TList*)mess->ReadObject(TList::Class());
956 TIter next(list);
957 TProcessID *pid;
958 while ((pid = (TProcessID*)next())) {
959 // check that a similar pid is not already registered in fgPIDs
962 TProcessID *p;
963 while ((p = (TProcessID*)nextpid())) {
964 if (!strcmp(p->GetTitle(), pid->GetTitle())) {
965 delete pid;
966 pid = 0;
967 break;
968 }
969 }
970 if (pid) {
971 if (gDebug > 0)
972 Info("RecvProcessIDs", "importing TProcessID: %s", pid->GetTitle());
973 pid->IncrementCount();
974 pidslist->Add(pid);
975 Int_t ind = pidslist->IndexOf(pid);
976 pid->SetUniqueID((UInt_t)ind);
977 }
978 }
979 delete list;
980 delete mess;
981
982 return kTRUE;
983 }
984 return kFALSE;
985}
986
987////////////////////////////////////////////////////////////////////////////////
988/// Set socket options.
989
991{
992 if (fSocket == -1) return -1;
993
994 return gSystem->SetSockOpt(fSocket, opt, val);
995}
996
997////////////////////////////////////////////////////////////////////////////////
998/// Get socket options. Returns -1 in case of error.
999
1001{
1002 if (fSocket == -1) return -1;
1003
1004 return gSystem->GetSockOpt(fSocket, opt, &val);
1005}
1006
1007////////////////////////////////////////////////////////////////////////////////
1008/// Returns error code. Meaning depends on context where it is called.
1009/// If no error condition returns 0 else a value < 0.
1010/// For example see TServerSocket ctor.
1011
1013{
1014 if (!IsValid())
1015 return fSocket;
1016
1017 return 0;
1018}
1019
1020////////////////////////////////////////////////////////////////////////////////
1021/// See comments for function SetCompressionSettings
1022
1024{
1026 if (fCompress < 0) {
1027 // if the level is not defined yet use 4 as a default (with ZLIB was 1)
1029 } else {
1030 int level = fCompress % 100;
1031 fCompress = 100 * algorithm + level;
1032 }
1033}
1034
1035////////////////////////////////////////////////////////////////////////////////
1036/// See comments for function SetCompressionSettings
1037
1039{
1040 if (level < 0) level = 0;
1041 if (level > 99) level = 99;
1042 if (fCompress < 0) {
1043 // if the algorithm is not defined yet use 0 as a default
1044 fCompress = level;
1045 } else {
1046 int algorithm = fCompress / 100;
1048 fCompress = 100 * algorithm + level;
1049 }
1050}
1051
1052////////////////////////////////////////////////////////////////////////////////
1053/// Used to specify the compression level and algorithm:
1054/// settings = 100 * algorithm + level
1055///
1056/// level = 0, objects written to this file will not be compressed.
1057/// level = 1, minimal compression level but fast.
1058/// ....
1059/// level = 9, maximal compression level but slower and might use more memory.
1060/// (For the currently supported algorithms, the maximum level is 9)
1061/// If compress is negative it indicates the compression level is not set yet.
1062///
1063/// The enumeration ROOT::RCompressionSetting::EAlgorithm associates each
1064/// algorithm with a number. There is a utility function to help
1065/// to set the value of the argument. For example,
1066/// ROOT::CompressionSettings(ROOT::kLZMA, 1)
1067/// will build an integer which will set the compression to use
1068/// the LZMA algorithm and compression level 1. These are defined
1069/// in the header file Compression.h.
1070///
1071/// Note that the compression settings may be changed at any time.
1072/// The new compression settings will only apply to branches created
1073/// or attached after the setting is changed and other objects written
1074/// after the setting is changed.
1075
1080
1081////////////////////////////////////////////////////////////////////////////////
1082/// Print error string depending on error code.
1083
1084void TUDPSocket::NetError(const char *where, Int_t err)
1085{
1086 // Make sure it is in range
1087 err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1088
1089 if (gDebug > 0)
1090 ::Error(where, "%s", gRootdErrStr[err]);
1091}
1092
1093////////////////////////////////////////////////////////////////////////////////
1094/// Get total number of bytes sent via all sockets.
1095
1100
1101////////////////////////////////////////////////////////////////////////////////
1102/// Get total number of bytes received via all sockets.
1103
UShort_t net2host(UShort_t x)
Definition Bytes.h:575
@ kMESS_STRING
@ kMESS_ACK
@ kMESS_PROCESSID
@ kMESS_STREAMERINFO
R__EXTERN const char * gRootdErrStr[]
Definition NetErrors.h:72
@ kErrError
Definition NetErrors.h:69
#define SafeDelete(p)
Definition RConfig.hxx:538
int Int_t
Definition RtypesCore.h:45
long Long_t
Definition RtypesCore.h:54
unsigned int UInt_t
Definition RtypesCore.h:46
constexpr Bool_t kFALSE
Definition RtypesCore.h:94
unsigned long long ULong64_t
Definition RtypesCore.h:70
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:374
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Int_t gDebug
Definition TROOT.cxx:622
R__EXTERN TVirtualMutex * gROOTMutex
Definition TROOT.h:63
#define gROOT
Definition TROOT.h:414
ESockOptions
Definition TSystem.h:229
ESendRecvOptions
Definition TSystem.h:242
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__LOCKGUARD(mutex)
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
This class represents an Internet Protocol (IP) address.
Int_t GetPort() const
const char * GetHostName() const
A doubly linked list.
Definition TList.h:38
static TClass * Class()
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
Definition TList.cxx:576
void Add(TObject *obj) override
Definition TList.h:81
Int_t Compress()
Compress the message.
Definition TMessage.cxx:319
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:174
const char * GetTitle() const override
Returns title of object.
Definition TNamed.h:50
TString fName
Definition TNamed.h:32
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition TNamed.cxx:150
An array of TObjects.
Definition TObjArray.h:31
Collectable string class.
Definition TObjString.h:28
Mother of all ROOT objects.
Definition TObject.h:41
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:475
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:1057
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:864
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1071
virtual void SetUniqueID(UInt_t uid)
Set the unique object id.
Definition TObject.cxx:875
void ResetBit(UInt_t f)
Definition TObject.h:204
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
Int_t IncrementCount()
Increase the reference count to this object.
static TObjArray * GetPIDs()
static: returns array of TProcessIDs
Describes a persistent version of a class.
Basic string class.
Definition TString.h:139
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2356
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:632
virtual int GetServiceByName(const char *service)
Get port # of internet service.
Definition TSystem.cxx:2330
virtual TInetAddress GetSockName(int sock)
Get Internet Protocol (IP) address of host and port #.
Definition TSystem.cxx:2321
static void ResetErrno()
Static function resetting system error number.
Definition TSystem.cxx:284
virtual char * GetServiceByPort(int port)
Get name of internet service.
Definition TSystem.cxx:2339
virtual int SetSockOpt(int sock, int kind, int val)
Set socket option.
Definition TSystem.cxx:2448
virtual TInetAddress GetPeerName(int sock)
Get Internet Protocol (IP) address of remote host and port #.
Definition TSystem.cxx:2312
virtual int OpenConnection(const char *server, int port, int tcpwindowsize=-1, const char *protocol="tcp")
Open a connection to another host.
Definition TSystem.cxx:2348
virtual int GetSockOpt(int sock, int kind, int *val)
Get socket option.
Definition TSystem.cxx:2457
virtual int RecvRaw(int sock, void *buffer, int length, int flag)
Receive exactly length bytes into buffer.
Definition TSystem.cxx:2411
virtual Int_t Select(TList *active, Long_t timeout)
Select on active file descriptors (called by TMonitor).
Definition TSystem.cxx:445
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition TSystem.cxx:2303
virtual int SendRaw(int sock, const void *buffer, int length, int flag)
Send exactly length bytes from buffer.
Definition TSystem.cxx:2421
virtual void CloseConnection(int sock, Bool_t force=kFALSE)
Close socket connection.
Definition TSystem.cxx:2402
This class implements UDP client sockets.
Definition TUDPSocket.h:37
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
UInt_t fBytesRecv
Definition TUDPSocket.h:50
Int_t GetErrorCode() const
Returns error code.
UInt_t fBytesSent
Definition TUDPSocket.h:51
Int_t fCompress
Definition TUDPSocket.h:52
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
virtual TInetAddress GetLocalInetAddress()
Return internet address of local host to which the socket is bound.
EServiceType fServType
Definition TUDPSocket.h:58
TInetAddress fLocalAddress
Definition TUDPSocket.h:53
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
See comments for function SetCompressionSettings.
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
TVirtualMutex * fLastUsageMtx
Definition TUDPSocket.h:64
virtual Int_t GetLocalPort()
Return the local port # to which the socket is bound.
TInetAddress fAddress
Definition TUDPSocket.h:49
TString fService
Definition TUDPSocket.h:57
static ULong64_t fgBytesRecv
Definition TUDPSocket.h:67
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
virtual Bool_t IsValid() const
Definition TUDPSocket.h:119
Int_t fRemoteProtocol
Definition TUDPSocket.h:54
static void NetError(const char *where, Int_t error)
Print error string depending on error code.
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
See comments for function SetCompressionSettings.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void Close(Option_t *opt="")
Close the socket.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
static ULong64_t fgBytesSent
Definition TUDPSocket.h:68
Int_t GetCompressionLevel() const
Definition TUDPSocket.h:161
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
TList * fUUIDs
Definition TUDPSocket.h:62
void Touch()
Definition TUDPSocket.h:144
TBits fBitsInfo
Definition TUDPSocket.h:61
static ULong64_t GetSocketBytesRecv()
Get total number of bytes received via all sockets.
TString fUrl
Definition TUDPSocket.h:60
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Option_t * GetOption() const override
Definition TUDPSocket.h:83
static ULong64_t GetSocketBytesSent()
Get total number of bytes sent via all sockets.
Int_t fSocket
Definition TUDPSocket.h:59
virtual Int_t Select(Int_t interest=kRead, Long_t timeout=-1)
Waits for this socket to change status.
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
TSecContext * fSecContext
Definition TUDPSocket.h:55
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::ostream & Info()
Definition hadd.cxx:171
const Int_t n
Definition legend1.C:16
tbb::task_arena is an alias of tbb::interface7::task_arena, which does not allow forward-declaring tbb::task_arena without also forward-declaring tbb::interface7.
@ kUndefined
Undefined compression algorithm (must be kept as the last entry of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression)
Definition Compression.h:72