28#include <XrdCl/XrdClURL.hh>
29#include <XrdCl/XrdClFile.hh>
30#include <XrdCl/XrdClXRootDResponses.hh>
31#include <XrdCl/XrdClDefaultEnv.hh>
32#include <XrdVersion.hh>
39class TAsyncOpenHandler:
public XrdCl::ResponseHandler
56 virtual void HandleResponse(XrdCl::XRootDStatus *status,
57 XrdCl::AnyObject *response)
81class TAsyncReadvHandler:
public XrdCl::ResponseHandler
88 TAsyncReadvHandler(std::vector<XrdCl::XRootDStatus*> *statuses,
91 fStatuses(statuses), fStatusIndex(statusIndex), fSemaphore(semaphore) {}
98 virtual void HandleResponse(XrdCl::XRootDStatus *status,
99 XrdCl::AnyObject *response)
101 fStatuses->at(fStatusIndex) = status;
108 std::vector<XrdCl::XRootDStatus*> *fStatuses;
132 TFile(url,
"NET", title, compress)
134 using namespace XrdCl;
138 if (val.IsNull()) val =
gEnv->
GetValue(
"NetXNG.Debug",
"");
139 if (!val.IsNull()) XrdCl::DefaultEnv::SetLogLevel(val.Data());
145 TUrl urlnoanchor(url);
147 fUrl =
new URL(std::string(urlnoanchor.
GetUrl()));
152 fUrl->SetProtocol(std::string(
"root"));
158 Error(
"Open",
"could not parse open mode %s", mode);
179 TAsyncOpenHandler *handler =
new TAsyncOpenHandler(
this);
180 status =
fFile->Open(
fUrl->GetURL(),
fMode, Access::None, handler);
181 if (!status.IsOK()) {
182 Error(
"Open",
"%s", status.ToStr().c_str());
190 if (!status.IsOK()) {
191#if XrdVNUMBER >= 40000
192 if( status.code == errRedirect )
193 fNewUrl = status.GetErrorMessage().c_str();
195 Error(
"Open",
"%s", status.ToStr().c_str());
197 Error(
"Open",
"%s", status.ToStr().c_str());
203 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) ||
204 (
fMode & OpenFlags::Update) )
209 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) )
235 using namespace XrdCl;
238 if (
gDebug > 1)
Info(
"Init",
"TFile::Init already called once");
274 using namespace XrdCl;
280 bool forceStat =
true;
281 if(
fMode == XrdCl::OpenFlags::Read )
285 if( !
fFile->Stat( forceStat, info ).IsOK() )
297 return fFile->IsOpen();
320 XrdCl::XRootDStatus status =
fFile->Close();
321 if (!status.IsOK()) {
322 Error(
"Close",
"%s", status.ToStr().c_str());
338 using namespace XrdCl;
340 OpenFlags::Flags mode;
345 if (parseres<0 || (mode != OpenFlags::Read && mode != OpenFlags::Update)) {
346 Error(
"ReOpen",
"mode must be either READ or UPDATE, not %s", modestr);
351 if (mode ==
fMode || (mode == OpenFlags::Update
352 &&
fMode == OpenFlags::New)) {
356 XRootDStatus st =
fFile->Close();
358 Error(
"ReOpen",
"%s", st.ToStr().c_str());
366 Error(
"ReOpen",
"%s", st.ToStr().c_str());
395 using namespace XrdCl;
397 Info(
"ReadBuffer",
"offset: %lld length: %d", position, length);
416 uint32_t bytesRead = 0;
417 XRootDStatus st =
fFile->Read(
fOffset, length, buffer, bytesRead);
419 Info(
"ReadBuffer",
"%s bytes read: %u", st.ToStr().c_str(), bytesRead);
422 Error(
"ReadBuffer",
"%s", st.ToStr().c_str());
426 if ((
Int_t)bytesRead != length) {
427 Error(
"ReadBuffer",
"error reading all requested bytes, got %u of %d",
463 using namespace XrdCl;
469 std::vector<ChunkList> chunkLists;
471 std::vector<XRootDStatus*> *statuses;
473 Int_t totalBytes = 0;
475 char *cursor = buffer;
481 for (
Int_t i = 0; i < nbuffs; i++)
485 for (
Int_t i = 0; i < nbuffs; ++i) {
486 totalBytes += length[i];
495 for (j = 0; j < nsplit; ++j) {
497 chunks.push_back(ChunkInfo(offset,
fReadvIorMax, cursor));
503 chunks.push_back(ChunkInfo(offset, rem, cursor));
506 chunks.push_back(ChunkInfo(position[i], length[i], cursor));
512 chunkLists.push_back(chunks);
513 chunks = ChunkList();
515 chunkLists.push_back(ChunkList(chunks.begin(),
517 chunks = ChunkList(chunks.begin() +
fReadvIovMax, chunks.end());
522 if( !chunks.empty() )
523 chunkLists.push_back(chunks);
525 TAsyncReadvHandler *handler;
528 statuses =
new std::vector<XRootDStatus*>(chunkLists.size());
531 std::vector<ChunkList>::iterator it;
532 for (it = chunkLists.begin(); it != chunkLists.end(); ++it)
534 handler =
new TAsyncReadvHandler(statuses, it - chunkLists.begin(),
536 status =
fFile->VectorRead(*it, 0, handler);
538 if (!status.IsOK()) {
539 Error(
"ReadBuffers",
"%s", status.ToStr().c_str());
545 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
550 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
551 XRootDStatus *st = statuses->at(it - chunkLists.begin());
554 Error(
"ReadBuffers",
"%s", st->ToStr().c_str());
555 for( ; it != chunkLists.end(); ++it )
557 st = statuses->at( it - chunkLists.begin() );
576 gPerfStats->FileReadEvent(
this, totalBytes, start);
596 using namespace XrdCl;
604 Info(
"WriteBuffer",
"file not writable");
617 XRootDStatus st =
fFile->Write(
fOffset, length, buffer);
619 Error(
"WriteBuffer",
"%s", st.ToStr().c_str());
640 Info(
"Flush",
"file not writable - do nothing");
648 XrdCl::XRootDStatus status =
fFile->Sync();
650 Error(
"Flush",
"%s", status.ToStr().c_str());
653 Info(
"Flush",
"XrdClient::Sync succeeded.");
679 XrdCl::OpenFlags::Flags &mode,
682 using namespace XrdCl;
683 modestr =
ToUpper(TString(in));
685 if (modestr ==
"NEW" || modestr ==
"CREATE") mode = OpenFlags::New;
686 else if (modestr ==
"RECREATE") mode = OpenFlags::Delete;
687 else if (modestr ==
"UPDATE") mode = OpenFlags::Update;
688 else if (modestr ==
"READ") mode = OpenFlags::Read;
694 mode = OpenFlags::Read;
706 Error(
"TNetXNGFile",
"Object is in 'zombie' state");
711 Error(
"TNetXNGFile",
"The remote file is not open");
724 using namespace XrdCl;
733#if XrdVNUMBER >= 40000
734 std::string dataServerStr;
735 if( !
fFile->GetProperty(
"DataServer", dataServerStr ) )
737 URL dataServer(dataServerStr);
739 URL dataServer(
fFile->GetDataServer());
741 FileSystem fs(dataServer);
744 arg.FromString(std::string(
"readv_ior_max readv_iov_max"));
746 XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
753 std::vector<TString> resps;
754 while (TString(response->ToString()).Tokenize(token, from,
"\n"))
755 resps.push_back(token);
757 if (resps.size() != 2)
760 if (resps[0].IsDigit())
763 if (resps[1].IsDigit())
784 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
785 const char *cenv = 0;
789 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CONNECTIONWINDOW"))
790 || strlen(cenv) <= 0))
791 env->PutInt(
"ConnectionWindow", val.Atoi());
794 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CONNECTIONRETRY"))
795 || strlen(cenv) <= 0))
796 env->PutInt(
"RequestTimeout", val.Atoi());
799 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_REQUESTTIMEOUT"))
800 || strlen(cenv) <= 0))
801 env->PutInt(
"RequestTimeout", val.Atoi());
803 val =
gEnv->
GetValue(
"NetXNG.SubStreamsPerChannel",
"");
804 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_SUBSTREAMSPERCHANNEL"))
805 || strlen(cenv) <= 0))
806 env->PutInt(
"SubStreamsPerChannel", val.Atoi());
809 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_TIMEOUTRESOLUTION"))
810 || strlen(cenv) <= 0))
811 env->PutInt(
"TimeoutResolution", val.Atoi());
814 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_STREAMERRORWINDOW"))
815 || strlen(cenv) <= 0))
816 env->PutInt(
"StreamErrorWindow", val.Atoi());
819 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_RUNFORKHANDLER"))
820 || strlen(cenv) <= 0))
821 env->PutInt(
"RunForkHandler", val.Atoi());
824 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_REDIRECTLIMIT"))
825 || strlen(cenv) <= 0))
826 env->PutInt(
"RedirectLimit", val.Atoi());
829 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_WORKERTHREADS"))
830 || strlen(cenv) <= 0))
831 env->PutInt(
"WorkerThreads", val.Atoi());
834 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CPCHUNKSIZE"))
835 || strlen(cenv) <= 0))
836 env->PutInt(
"CPChunkSize", val.Atoi());
839 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CPPARALLELCHUNKS"))
840 || strlen(cenv) <= 0))
841 env->PutInt(
"CPParallelChunks", val.Atoi());
844 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_POLLERPREFERENCE"))
845 || strlen(cenv) <= 0))
846 env->PutString(
"PollerPreference", val.Data());
849 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CLIENTMONITOR"))
850 || strlen(cenv) <= 0))
851 env->PutString(
"ClientMonitor", val.Data());
854 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CLIENTMONITORPARAM"))
855 || strlen(cenv) <= 0))
856 env->PutString(
"ClientMonitorParam", val.Data());
859 env->PutInt(
"MultiProtocol",
gEnv->
GetValue(
"TFile.CrossProtocolRedirects", 1));
868 if (val.Length() > 0)
872 if (val.Length() > 0)
876 if (val.Length() > 0)
880 if (val.Length() > 0)
884 if (val.Length() > 0)
888 if (val.Length() > 0)
892 if (val.Length() > 0)
896 if (val.Length() > 0)
900 if (val.Length() > 0)
904 if (val.Length() > 0)
908 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSIPROXYDEPLEN"))
909 || strlen(cenv) <= 0))
913 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSICRLCHECK"))
914 || strlen(cenv) <= 0))
918 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSIDELEGPROXY"))
919 || strlen(cenv) <= 0))
923 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSISIGNPROXY"))
924 || strlen(cenv) <= 0))
928 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecPWDAUTOLOG"))
929 || strlen(cenv) <= 0))
933 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecPWDVERIFYSRV"))
934 || strlen(cenv) <= 0))
TString ToUpper(const TString &s)
Return an upper-case version of s.
R__EXTERN TSystem * gSystem
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
TArchiveMember * GetMember() const
Long64_t GetDecompressedSize() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Bool_t fWritable
True if directory is writable.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
Int_t fReadCalls
Number of read calls ( not counting the cache calls )
Long64_t fBytesRead
Number of bytes read from this file.
TArchiveFile * fArchive
!Archive file from which we read this file
virtual void Close(Option_t *option="")
Close a file.
TList * fOpenPhases
!Time info about open phases
Int_t WriteBufferViaCache(const char *buf, Int_t len)
Write buffer via cache.
Int_t ReadBufferViaCache(char *buf, Int_t len)
Read buffer via cache.
Long64_t fArchiveOffset
!Offset at which file starts in archive
virtual void Init(Bool_t create)
Initialize a TFile object.
Long64_t GetRelOffset() const
TString fOption
File options.
EAsyncOpenStatus
Asynchronous open request status.
Bool_t FlushWriteCache()
Flush the write cache if active.
Long64_t fBytesWrite
Number of bytes written to this file.
Bool_t fInitDone
!True if the file has been initialized
virtual void SetOffset(Long64_t offset, ERelativeTo pos=kBeg)
Set position from where to start reading.
Long64_t fOffset
!Seek offset cache
static std::atomic< Long64_t > fgBytesWrite
Number of bytes written by all TFile objects.
EAsyncOpenStatus fAsyncOpenStatus
!Status of an asynchronous open request
static std::atomic< Int_t > fgReadCalls
Number of read calls from all TFile objects.
virtual void SetEnv()
Map ROOT and xrootd environment variables.
virtual Bool_t IsUseable() const
Check the file is open and isn't a zombie.
virtual void SetAsyncOpenStatus(EAsyncOpenStatus status)
Set the status of an asynchronous file open.
XrdCl::OpenFlags::Flags fMode
virtual void Close(const Option_t *option="")
Close the file.
Int_t ParseOpenMode(Option_t *in, TString &modestr, XrdCl::OpenFlags::Flags &mode, Bool_t assumeRead)
Parse a file open mode given as a string into a canonically formatted output mode string and an integ...
virtual Int_t ReOpen(Option_t *modestr)
Reopen the file with the new access mode.
virtual Bool_t WriteBuffer(const char *buffer, Int_t length)
Write a data chunk.
virtual Bool_t ReadBuffer(char *buffer, Int_t length)
Read a data chunk of the given size.
virtual Bool_t ReadBuffers(char *buffer, Long64_t *position, Int_t *length, Int_t nbuffs)
Read scattered data chunks in one operation.
virtual Long64_t GetSize() const
Get the file size.
XrdSysCondVar * fInitCondVar
virtual Bool_t IsOpen() const
Check if the file is open.
virtual void Init(Bool_t create)
Initialize the file.
virtual ~TNetXNGFile()
Destructor.
virtual void Flush()
Synchronize a file's in-memory and on-disk states.
virtual Bool_t GetVectorReadLimits()
Find the server-specific readv config params.
virtual void Seek(Long64_t offset, ERelativeTo position=kBeg)
Set the position within the file.
R__ALWAYS_INLINE Bool_t IsZombie() const
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
virtual const char * Getenv(const char *env)
Get environment variable.
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
virtual void Setenv(const char *name, const char *value)
Set environment variable.
The TTimeStamp encapsulates seconds and ns since EPOCH.
This class represents a WWW compatible URL.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
void SetAnchor(const char *anchor)
virtual Bool_t SendFileOpenProgress(TFile *, TList *, const char *, Bool_t=kFALSE)
virtual Bool_t SendFileReadProgress(TFile *)