#include "TFileMerger.h"
#include "TDirectory.h"
#include "TUrl.h"
#include "TFile.h"
#include "TUUID.h"
#include "TSystem.h"
#include "TKey.h"
#include "THashList.h"
#include "TObjString.h"
#include "TClass.h"
#include "TMethodCall.h"
#include "Riostream.h"
#include "TFileMergeInfo.h"
#include "TClassRef.h"
#include "TROOT.h"
#include "TMemFile.h"
#ifdef WIN32
#include <stdio.h>
#else
#include <sys/time.h>
#include <sys/resource.h>
#endif
ClassImp(TFileMerger)
TClassRef R__TH1_Class("TH1");
TClassRef R__TTree_Class("TTree");
static const Int_t kCpProgress = BIT(14);
static const Int_t kCintFileNumber = 100;
static Int_t R__GetSystemMaxOpenedFiles()
{
int maxfiles;
#ifdef WIN32
maxfiles = _getmaxstdio();
#else
rlimit filelimit;
if (getrlimit(RLIMIT_NOFILE,&filelimit)==0) {
maxfiles = filelimit.rlim_cur;
} else {
maxfiles = 512;
}
#endif
if (maxfiles > kCintFileNumber) {
return maxfiles - kCintFileNumber;
} else if (maxfiles > 5) {
return maxfiles - 5;
} else {
return maxfiles;
}
}
TFileMerger::TFileMerger(Bool_t isLocal, Bool_t histoOneGo)
: fOutputFile(0), fFastMethod(kTRUE), fNoTrees(kFALSE), fExplicitCompLevel(kFALSE), fCompressionChange(kFALSE),
fPrintLevel(0), fMsgPrefix("TFileMerger"), fMaxOpenedFiles( R__GetSystemMaxOpenedFiles() ),
fLocal(isLocal), fHistoOneGo(histoOneGo), fObjectNames()
{
fFileList = new TList;
fMergeList = new TList;
fMergeList->SetOwner(kTRUE);
fExcessFiles = new TList;
fExcessFiles->SetOwner(kTRUE);
gROOT->GetListOfCleanups()->Add(this);
}
TFileMerger::~TFileMerger()
{
gROOT->GetListOfCleanups()->Remove(this);
SafeDelete(fFileList);
SafeDelete(fMergeList);
SafeDelete(fOutputFile);
SafeDelete(fExcessFiles);
}
void TFileMerger::Reset()
{
fFileList->Clear();
fMergeList->Clear();
fExcessFiles->Clear();
fObjectNames.Clear();
}
Bool_t TFileMerger::AddFile(const char *url, Bool_t cpProgress)
{
if (fPrintLevel > 0) {
Printf("%s Source file %d: %s",fMsgPrefix.Data(),fFileList->GetEntries()+fExcessFiles->GetEntries()+1,url);
}
TFile *newfile = 0;
TString localcopy;
if (fFileList->GetEntries() >= (fMaxOpenedFiles-1)) {
TObjString *urlObj = new TObjString(url);
fMergeList->Add(urlObj);
urlObj = new TObjString(url);
urlObj->SetBit(kCpProgress);
fExcessFiles->Add(urlObj);
return kTRUE;
}
TDirectory::TContext ctxt;
if (fLocal) {
TUUID uuid;
localcopy.Form("file:%s/ROOTMERGE-%s.root", gSystem->TempDirectory(), uuid.AsString());
if (!TFile::Cp(url, localcopy, cpProgress)) {
Error("AddFile", "cannot get a local copy of file %s", url);
return kFALSE;
}
newfile = TFile::Open(localcopy, "READ");
} else {
newfile = TFile::Open(url, "READ");
}
if (!newfile) {
if (fLocal)
Error("AddFile", "cannot open local copy %s of URL %s",
localcopy.Data(), url);
else
Error("AddFile", "cannot open file %s", url);
return kFALSE;
} else {
if (fOutputFile && fOutputFile->GetCompressionLevel() != newfile->GetCompressionLevel()) fCompressionChange = kTRUE;
newfile->SetBit(kCanDelete);
fFileList->Add(newfile);
TObjString *urlObj = new TObjString(url);
fMergeList->Add(urlObj);
return kTRUE;
}
}
Bool_t TFileMerger::AddFile(TFile *source, Bool_t cpProgress)
{
return AddFile(source,kFALSE,cpProgress);
}
Bool_t TFileMerger::AddAdoptFile(TFile *source, Bool_t cpProgress)
{
return AddFile(source,kTRUE,cpProgress);
}
Bool_t TFileMerger::AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
{
if (source == 0) {
return kFALSE;
}
if (fPrintLevel > 0) {
Printf("%s Source file %d: %s",fMsgPrefix.Data(),fFileList->GetEntries()+1,source->GetName());
}
TFile *newfile = 0;
TString localcopy;
TDirectory::TContext ctxt;
if (fLocal && !source->InheritsFrom(TMemFile::Class())) {
TUUID uuid;
localcopy.Form("file:%s/ROOTMERGE-%s.root", gSystem->TempDirectory(), uuid.AsString());
if (!source->Cp(localcopy, cpProgress)) {
Error("AddFile", "cannot get a local copy of file %s", source->GetName());
return kFALSE;
}
newfile = TFile::Open(localcopy, "READ");
} else {
newfile = source;
}
if (!newfile) {
if (fLocal)
Error("AddFile", "cannot open local copy %s of URL %s",
localcopy.Data(), source->GetName());
else
Error("AddFile", "cannot open file %s", source->GetName());
return kFALSE;
} else {
if (fOutputFile && fOutputFile->GetCompressionLevel() != newfile->GetCompressionLevel()) fCompressionChange = kTRUE;
if (own || newfile != source) {
newfile->SetBit(kCanDelete);
} else {
newfile->ResetBit(kCanDelete);
}
fFileList->Add(newfile);
if (!fMergeList) {
fMergeList = new TList;
}
TObjString *urlObj = new TObjString(source->GetName());
fMergeList->Add(urlObj);
if (newfile != source && own) {
delete source;
}
return kTRUE;
}
}
Bool_t TFileMerger::OutputFile(const char *outputfile, Bool_t force, Int_t compressionLevel)
{
return OutputFile(outputfile,(force?"RECREATE":"CREATE"),compressionLevel);
}
Bool_t TFileMerger::OutputFile(const char *outputfile, Bool_t force)
{
Bool_t res = OutputFile(outputfile,(force?"RECREATE":"CREATE"),1);
fExplicitCompLevel = kFALSE;
return res;
}
Bool_t TFileMerger::OutputFile(const char *outputfile, const char *mode, Int_t compressionLevel)
{
fExplicitCompLevel = kTRUE;
TFile *oldfile = fOutputFile;
fOutputFile = 0;
SafeDelete(oldfile);
fOutputFilename = outputfile;
TDirectory::TContext ctxt;
if (!(fOutputFile = TFile::Open(outputfile, mode, "", compressionLevel)) || fOutputFile->IsZombie()) {
Error("OutputFile", "cannot open the MERGER output file %s", fOutputFilename.Data());
return kFALSE;
}
return kTRUE;
}
Bool_t TFileMerger::OutputFile(const char *outputfile, const char *mode )
{
Bool_t res = OutputFile(outputfile,mode,1);
fExplicitCompLevel = kFALSE;
return res;
}
void TFileMerger::PrintFiles(Option_t *options)
{
fFileList->Print(options);
fExcessFiles->Print(options);
}
Bool_t TFileMerger::Merge(Bool_t)
{
return PartialMerge(kAll | kRegular);
}
Bool_t TFileMerger::MergeRecursive(TDirectory *target, TList *sourcelist, Int_t type )
{
Bool_t status = kTRUE;
Bool_t onlyListed = kFALSE;
if (fPrintLevel > 0) {
Printf("%s Target path: %s",fMsgPrefix.Data(),target->GetPath());
}
TString path(target->GetPath());
path.Remove(0, path.Last(':') + 2);
Int_t nguess = sourcelist->GetSize()+1000;
THashList allNames(nguess);
allNames.SetOwner(kTRUE);
if (type & kSkipListed) {
TObjArray *arr = fObjectNames.Tokenize(" ");
arr->SetOwner(kFALSE);
for (Int_t iname=0; iname<arr->GetEntriesFast(); iname++)
allNames.Add(arr->At(iname));
delete arr;
}
((THashList*)target->GetList())->Rehash(nguess);
((THashList*)target->GetListOfKeys())->Rehash(nguess);
TFileMergeInfo info(target);
if ((fFastMethod && !fCompressionChange)) {
info.fOptions.Append(" fast");
}
TFile *current_file;
TDirectory *current_sourcedir;
if (type & kIncremental) {
current_file = 0;
current_sourcedir = target;
} else {
current_file = (TFile*)sourcelist->First();
current_sourcedir = current_file->GetDirectory(path);
}
while (current_file || current_sourcedir) {
if (current_sourcedir && (current_file == 0 || current_sourcedir != target)) {
TIter nextkey( current_sourcedir->GetListOfKeys() );
TKey *key;
TString oldkeyname;
while ( (key = (TKey*)nextkey())) {
Bool_t alreadyseen = (oldkeyname == key->GetName()) ? kTRUE : kFALSE;
if (strcmp(key->GetClassName(),"TProcessID") == 0) { key->ReadObj(); continue;}
if (allNames.FindObject(key->GetName())) {
oldkeyname = key->GetName();
continue;
}
TClass *cl = TClass::GetClass(key->GetClassName());
if (!cl) {
Info("MergeRecursive", "cannot indentify object type (%s), name: %s title: %s",
key->GetClassName(), key->GetName(), key->GetTitle());
continue;
}
if (cl->GetMerge() || cl->InheritsFrom(TDirectory::Class()) ||
(cl->IsTObject() &&
(cl->GetMethodWithPrototype("Merge", "TCollection*,TFileMergeInfo*") ||
cl->GetMethodWithPrototype("Merge", "TCollection*"))))
allNames.Add(new TObjString(key->GetName()));
if (fNoTrees && cl->InheritsFrom(R__TTree_Class)) {
oldkeyname = key->GetName();
continue;
}
if (type & kOnlyListed) {
onlyListed = kFALSE;
oldkeyname = key->GetName();
oldkeyname += " ";
onlyListed = fObjectNames.Contains(oldkeyname);
oldkeyname = key->GetName();
if ((!onlyListed) && (!cl->InheritsFrom(TDirectory::Class()))) continue;
}
if (!(type&kResetable && type&kNonResetable)) {
if (!(type&kResetable)) {
if (cl->GetResetAfterMerge()) {
oldkeyname = key->GetName();
continue;
}
}
if (!(type&kNonResetable)) {
if (!cl->GetResetAfterMerge()) {
oldkeyname = key->GetName();
continue;
}
}
}
TObject *obj;
if (type & kIncremental) {
obj = current_sourcedir->GetList()->FindObject(key->GetName());
if (!obj) {
obj = key->ReadObj();
}
} else {
obj = key->ReadObj();
}
if (!obj) {
Info("MergeRecursive", "could not read object for key {%s, %s}",
key->GetName(), key->GetTitle());
continue;
}
if (cl->IsTObject() && cl != obj->IsA()) {
Error("MergeRecursive", "TKey and object retrieve disagree on type (%s vs %s). Continuing with %s.",
key->GetClassName(), obj->IsA()->GetName(), obj->IsA()->GetName());
cl = obj->IsA();
}
Bool_t canBeMerged = kTRUE;
if ( cl->InheritsFrom( TDirectory::Class() ) ) {
target->cd();
TDirectory *newdir;
if (type & kIncremental || alreadyseen) {
newdir = target->GetDirectory(obj->GetName());
if (!newdir) {
newdir = target->mkdir( obj->GetName(), obj->GetTitle() );
}
} else {
newdir = target->mkdir( obj->GetName(), obj->GetTitle() );
}
if (onlyListed) type &= ~kOnlyListed;
status = MergeRecursive(newdir, sourcelist, type);
if (onlyListed) type |= kOnlyListed;
if (!status) return status;
} else if (cl->GetMerge()) {
if (alreadyseen) continue;
TList inputs;
Bool_t oneGo = fHistoOneGo && cl->InheritsFrom(R__TH1_Class);
TFile *nextsource = current_file ? (TFile*)sourcelist->After( current_file ) : (TFile*)sourcelist->First();
if (nextsource == 0) {
ROOT::MergeFunc_t func = cl->GetMerge();
func(obj, &inputs, &info);
info.fIsFirst = kFALSE;
} else {
do {
TDirectory *ndir = nextsource->GetDirectory(path);
if (ndir) {
ndir->cd();
TKey *key2 = (TKey*)ndir->GetListOfKeys()->FindObject(key->GetName());
if (key2) {
TObject *hobj = key2->ReadObj();
if (!hobj) {
Info("MergeRecursive", "could not read object for key {%s, %s}; skipping file %s",
key->GetName(), key->GetTitle(), nextsource->GetName());
nextsource = (TFile*)sourcelist->After(nextsource);
continue;
}
if (hobj->InheritsFrom(TCollection::Class())) {
((TCollection*)hobj)->SetOwner();
}
hobj->ResetBit(kMustCleanup);
inputs.Add(hobj);
if (!oneGo) {
ROOT::MergeFunc_t func = cl->GetMerge();
Long64_t result = func(obj, &inputs, &info);
info.fIsFirst = kFALSE;
if (result < 0) {
Error("MergeRecursive", "calling Merge() on '%s' with the corresponding object in '%s'",
obj->GetName(), nextsource->GetName());
}
inputs.Delete();
}
}
}
nextsource = (TFile*)sourcelist->After( nextsource );
} while (nextsource);
if (oneGo || info.fIsFirst) {
ROOT::MergeFunc_t func = cl->GetMerge();
func(obj, &inputs, &info);
info.fIsFirst = kFALSE;
inputs.Delete();
}
}
} else if (cl->IsTObject() &&
cl->GetMethodWithPrototype("Merge", "TCollection*,TFileMergeInfo*") ) {
if (alreadyseen) continue;
TList listH;
TString listHargs;
listHargs.Form("(TCollection*)0x%lx,(TFileMergeInfo*)0x%lx", (ULong_t)&listH,(ULong_t)&info);
TFile *nextsource = current_file ? (TFile*)sourcelist->After( current_file ) : (TFile*)sourcelist->First();
if (nextsource == 0) {
Int_t error = 0;
obj->Execute("Merge", listHargs.Data(), &error);
info.fIsFirst = kFALSE;
if (error) {
Error("MergeRecursive", "calling Merge() on '%s' with the corresponding object in '%s'",
obj->GetName(), key->GetName());
}
} else {
while (nextsource) {
TDirectory *ndir = nextsource->GetDirectory(path);
if (ndir) {
ndir->cd();
TKey *key2 = (TKey*)ndir->GetListOfKeys()->FindObject(key->GetName());
if (key2) {
TObject *hobj = key2->ReadObj();
if (!hobj) {
Info("MergeRecursive", "could not read object for key {%s, %s}; skipping file %s",
key->GetName(), key->GetTitle(), nextsource->GetName());
nextsource = (TFile*)sourcelist->After(nextsource);
continue;
}
if (hobj->InheritsFrom(TCollection::Class())) {
((TCollection*)hobj)->SetOwner();
}
hobj->ResetBit(kMustCleanup);
listH.Add(hobj);
Int_t error = 0;
obj->Execute("Merge", listHargs.Data(), &error);
info.fIsFirst = kFALSE;
if (error) {
Error("MergeRecursive", "calling Merge() on '%s' with the corresponding object in '%s'",
obj->GetName(), nextsource->GetName());
}
listH.Delete();
}
}
nextsource = (TFile*)sourcelist->After( nextsource );
}
if (info.fIsFirst) {
Int_t error = 0;
obj->Execute("Merge", listHargs.Data(), &error);
info.fIsFirst = kFALSE;
listH.Delete();
}
}
} else if (cl->IsTObject() &&
cl->GetMethodWithPrototype("Merge", "TCollection*") ) {
if (alreadyseen) continue;
TList listH;
TString listHargs;
listHargs.Form("((TCollection*)0x%lx)", (ULong_t)&listH);
TFile *nextsource = current_file ? (TFile*)sourcelist->After( current_file ) : (TFile*)sourcelist->First();
if (nextsource == 0) {
Int_t error = 0;
obj->Execute("Merge", listHargs.Data(), &error);
if (error) {
Error("MergeRecursive", "calling Merge() on '%s' with the corresponding object in '%s'",
obj->GetName(), key->GetName());
}
} else {
while (nextsource) {
TDirectory *ndir = nextsource->GetDirectory(path);
if (ndir) {
ndir->cd();
TKey *key2 = (TKey*)ndir->GetListOfKeys()->FindObject(key->GetName());
if (key2) {
TObject *hobj = key2->ReadObj();
if (!hobj) {
Info("MergeRecursive", "could not read object for key {%s, %s}; skipping file %s",
key->GetName(), key->GetTitle(), nextsource->GetName());
nextsource = (TFile*)sourcelist->After(nextsource);
continue;
}
if (hobj->InheritsFrom(TCollection::Class())) {
((TCollection*)hobj)->SetOwner();
}
hobj->ResetBit(kMustCleanup);
listH.Add(hobj);
Int_t error = 0;
obj->Execute("Merge", listHargs.Data(), &error);
info.fIsFirst = kFALSE;
if (error) {
Error("MergeRecursive", "calling Merge() on '%s' with the corresponding object in '%s'",
obj->GetName(), nextsource->GetName());
}
listH.Delete();
}
}
nextsource = (TFile*)sourcelist->After( nextsource );
}
if (info.fIsFirst) {
Int_t error = 0;
obj->Execute("Merge", listHargs.Data(), &error);
info.fIsFirst = kFALSE;
listH.Delete();
}
}
} else {
canBeMerged = kFALSE;
}
target->cd();
oldkeyname = key->GetName();
if(cl->InheritsFrom( TDirectory::Class() )) {
if (!(type&kIncremental) || dynamic_cast<TDirectory*>(obj)->GetFile() != target) {
delete obj;
}
} else if (cl->InheritsFrom( TCollection::Class() )) {
if ( obj->Write( oldkeyname, canBeMerged ? TObject::kSingleKey | TObject::kOverwrite : TObject::kSingleKey) <= 0 ) {
status = kFALSE;
}
((TCollection*)obj)->SetOwner();
delete obj;
} else {
if (cl->IsTObject()) {
if ( obj->Write( oldkeyname, canBeMerged ? TObject::kOverwrite : 0) <= 0) {
status = kFALSE;
}
} else {
if ( target->WriteObjectAny( (void*)obj, cl, oldkeyname, canBeMerged ? "OverWrite" : "" ) <= 0) {
status = kFALSE;
}
}
cl->Destructor(obj);
}
info.Reset();
}
}
current_file = current_file ? (TFile*)sourcelist->After(current_file) : (TFile*)sourcelist->First();
if (current_file) {
current_sourcedir = current_file->GetDirectory(path);
} else {
current_sourcedir = 0;
}
}
if (!(type&kIncremental)) {
target->SaveSelf(kTRUE);
}
return status;
}
Bool_t TFileMerger::PartialMerge(Int_t in_type)
{
if (!fOutputFile) {
TString outf(fOutputFilename);
if (outf.IsNull()) {
outf.Form("file:%s/FileMerger.root", gSystem->TempDirectory());
Info("PartialMerge", "will merge the results to the file %s\n"
"since you didn't specify a merge filename",
TUrl(outf).GetFile());
}
if (!OutputFile(outf.Data())) {
return kFALSE;
}
}
if ((fFileList->GetEntries() == 1) && !fExcessFiles->GetEntries() &&
!(in_type & kIncremental) && !fCompressionChange && !fExplicitCompLevel) {
fOutputFile->Close();
SafeDelete(fOutputFile);
TFile *file = (TFile *) fFileList->First();
if (!file || (file && file->IsZombie())) {
Error("PartialMerge", "one-file case: problem attaching to file");
return kFALSE;
}
Bool_t result = kTRUE;
if (!(result = file->Cp(fOutputFilename))) {
Error("PartialMerge", "one-file case: could not copy '%s' to '%s'",
file->GetPath(), fOutputFilename.Data());
return kFALSE;
}
if (file->TestBit(kCanDelete)) file->Close();
if (fLocal) {
TUrl u(file->GetPath(), kTRUE);
if (gSystem->Unlink(u.GetFile()) != 0)
Warning("PartialMerge", "problems removing temporary local file '%s'", u.GetFile());
}
fFileList->Clear();
return result;
}
fOutputFile->SetBit(kMustCleanup);
TDirectory::TContext ctxt;
Bool_t result = kTRUE;
Int_t type = in_type;
while (result && fFileList->GetEntries()>0) {
result = MergeRecursive(fOutputFile, fFileList, type);
TIter next(fFileList);
TFile *file;
while ((file = (TFile*) next())) {
if (file->TestBit(kCanDelete)) file->Close();
if(fLocal) {
TString p(file->GetPath());
p = p(0, p.Index(':',0));
gSystem->Unlink(p);
}
}
fFileList->Clear();
if (fExcessFiles->GetEntries() > 0) {
type = type | kIncremental;
OpenExcessFiles();
}
}
if (!result) {
Error("Merge", "error during merge of your ROOT files");
} else {
if (in_type & kIncremental) {
fOutputFile->Write("",TObject::kOverwrite);
} else {
fOutputFile->Close();
}
}
if (in_type & kIncremental) {
Clear();
} else {
fOutputFile->ResetBit(kMustCleanup);
SafeDelete(fOutputFile);
}
return result;
}
Bool_t TFileMerger::OpenExcessFiles()
{
if (fPrintLevel > 0) {
Printf("%s Opening the next %d files",fMsgPrefix.Data(),TMath::Min(fExcessFiles->GetEntries(),(fMaxOpenedFiles-1)));
}
Int_t nfiles = 0;
TIter next(fExcessFiles);
TObjString *url = 0;
TString localcopy;
TDirectory::TContext ctxt;
while( nfiles < (fMaxOpenedFiles-1) && ( url = (TObjString*)next() ) ) {
TFile *newfile = 0;
if (fLocal) {
TUUID uuid;
localcopy.Form("file:%s/ROOTMERGE-%s.root", gSystem->TempDirectory(), uuid.AsString());
if (!TFile::Cp(url->GetName(), localcopy, url->TestBit(kCpProgress))) {
Error("OpenExcessFiles", "cannot get a local copy of file %s", url->GetName());
return kFALSE;
}
newfile = TFile::Open(localcopy, "READ");
} else {
newfile = TFile::Open(url->GetName(), "READ");
}
if (!newfile) {
if (fLocal)
Error("OpenExcessFiles", "cannot open local copy %s of URL %s",
localcopy.Data(), url->GetName());
else
Error("OpenExcessFiles", "cannot open file %s", url->GetName());
return kFALSE;
} else {
if (fOutputFile && fOutputFile->GetCompressionLevel() != newfile->GetCompressionLevel()) fCompressionChange = kTRUE;
newfile->SetBit(kCanDelete);
fFileList->Add(newfile);
++nfiles;
fExcessFiles->Remove(url);
}
}
return kTRUE;
}
void TFileMerger::RecursiveRemove(TObject *obj)
{
if (obj == fOutputFile) {
Fatal("RecursiveRemove","Output file of the TFile Merger (targeting %s) has been deleted (likely due to a TTree larger than 100Gb)", fOutputFilename.Data());
}
}
void TFileMerger::SetMaxOpenedFiles(Int_t newmax)
{
Int_t sysmax = R__GetSystemMaxOpenedFiles();
if (newmax < sysmax) {
fMaxOpenedFiles = newmax;
} else {
fMaxOpenedFiles = sysmax;
}
if (fMaxOpenedFiles < 2) {
fMaxOpenedFiles = 2;
}
}
void TFileMerger::SetMsgPrefix(const char *prefix)
{
fMsgPrefix = prefix;
}