ROOT
Version v6.32
master
v6.34
v6.30
v6.28
v6.26
v6.24
v6.22
v6.20
v6.18
v6.16
v6.14
v6.12
v6.10
v6.08
v6.06
Reference Guide
▼
ROOT
ROOT Reference Documentation
Tutorials
►
Functional Parts
►
Namespaces
►
All Classes
▼
Files
▼
File List
►
bindings
►
core
►
documentation
►
geom
►
graf2d
►
graf3d
►
gui
►
hist
►
html
►
io
►
main
►
math
►
montecarlo
►
net
►
proof
►
roofit
►
sql
►
tmva
►
tree
▼
tutorials
►
cocoa
►
cont
▼
dataframe
df000_simple.C
df000_simple.py
df001_introduction.C
df001_introduction.py
df002_dataModel.C
df002_dataModel.py
df003_profiles.C
df003_profiles.py
df004_cutFlowReport.C
df004_cutFlowReport.py
df005_fillAnyObject.C
df006_ranges.C
df006_ranges.py
df007_snapshot.C
df007_snapshot.py
df008_createDataSetFromScratch.C
df008_createDataSetFromScratch.py
df009_FromScratchVSTTree.C
df010_trivialDataSource.C
df010_trivialDataSource.py
df012_DefinesAndFiltersAsStrings.C
df012_DefinesAndFiltersAsStrings.py
df013_InspectAnalysis.C
df014_CSVDataSource.C
df014_CSVDataSource.py
df015_LazyDataSource.C
df016_vecOps.C
df016_vecOps.py
df017_vecOpsHEP.C
df017_vecOpsHEP.py
df018_customActions.C
df019_Cache.C
df019_Cache.py
df020_helpers.C
df021_createTGraph.C
df021_createTGraph.py
df022_useKahan.C
df023_aggregate.C
df024_Display.C
df024_Display.py
df025_RNode.C
df026_AsNumpyArrays.py
df027_SQliteDependencyOverVersion.C
df028_SQliteIPLocation.C
df029_SQlitePlatformDistribution.C
df030_SQliteVersionsOfROOT.C
df031_Stats.C
df031_Stats.py
df032_RDFFromNumpy.py
df033_Describe.py
df034_SaveGraph.C
df034_SaveGraph.py
df035_RDFFromPandas.py
df101_h1Analysis.C
df102_NanoAODDimuonAnalysis.C
df102_NanoAODDimuonAnalysis.py
df103_NanoAODHiggsAnalysis.C
df103_NanoAODHiggsAnalysis.py
df103_NanoAODHiggsAnalysis_python.h
df104_HiggsToTwoPhotons.py
df105_WBosonAnalysis.py
df106_HiggsToFourLeptons.C
df106_HiggsToFourLeptons.py
df107_SingleTopAnalysis.py
distrdf001_spark_connection.py
distrdf002_dask_connection.py
distrdf003_live_visualization.py
distrdf004_dask_lxbatch.py
►
eve
►
eve7
►
fft
►
fit
►
fitsio
►
foam
►
geom
►
gl
►
graphics
►
graphs
►
gui
►
hist
►
histfactory
►
html
►
http
►
image
►
io
►
legacy
►
math
►
matrix
►
mc
►
multicore
►
net
►
physics
►
proof
►
pyroot
►
pythia
►
quadp
►
r
►
rcanvas
►
roofit
►
roostats
►
spectrum
►
splot
►
sql
►
tmva
►
tree
►
unfold
►
unuran
►
v7
►
vecops
►
webgui
►
xml
.rootlogon.py
demos.C
demoshelp.C
hsimple.C
rootlogoff.C
rootlogon.C
►
v6-32-00-patches
►
File Members
Release Notes
•
All
Classes
Namespaces
Files
Functions
Variables
Typedefs
Enumerations
Enumerator
Properties
Friends
Macros
Modules
Pages
Loading...
Searching...
No Matches
distrdf004_dask_lxbatch.py
Go to the documentation of this file.
# \file
# \ingroup tutorial_dataframe
#
# Configure a Dask connection to a HTCondor cluster hosted by the CERN batch
# service. To reproduce this tutorial, run the following steps:
#
# 1. Login to lxplus
# 2. Source an LCG release (minimum LCG104). See
#    https://lcgdocs.web.cern.ch/lcgdocs/lcgreleases/introduction/ for details
# 3. Install the `dask_lxplus` package, which provides the `CernCluster` class
#    needed to properly connect to the CERN condor pools. See
#    https://batchdocs.web.cern.ch/specialpayload/dask.html for instructions
# 4. Run this tutorial
#
# The tutorial defines resources that each job will request to the condor
# scheduler, then creates a Dask client that can be used by RDataFrame to
# distribute computations.
#
# \macro_code
#
# \date September 2023
# \author Vincenzo Eduardo Padulano CERN
from datetime import datetime
import socket
import time

from dask.distributed import Client
from dask_lxplus import CernCluster

import ROOT

# Module-level alias for the Dask flavour of distributed RDataFrame, so the
# rest of the tutorial can use the familiar `RDataFrame(...)` spelling.
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
def create_connection() -> Client:
    """
    Connect to the HTCondor cluster offered by the CERN batch service.

    Returns a Dask client that RDataFrame will use to distribute
    computations over the condor jobs.
    """
    # Each submitted condor job spawns exactly one Dask worker with the
    # resources described below: 1 core and 2 GB of memory per worker.
    scheduler_options = {
        'port': 8786,
        'host': socket.gethostname(),
    }
    job_extra = {
        'MY.JobFlavour': '"espresso"',
    }
    cluster = CernCluster(
        cores=1,
        memory='2000MB',
        disk='1GB',
        death_timeout='60',
        lcg=True,
        nanny=True,
        container_runtime='none',
        scheduler_options=scheduler_options,
        job_extra=job_extra,
    )

    # `scale(N)` submits N condor jobs with the description above (the
    # equivalent of `condor_submit myjob.sub` N times), i.e. N Dask workers.
    # Two jobs are requested here, for a total of 2 cores.
    n_workers = 2
    cluster.scale(n_workers)

    # The client can be created right after submission. At this point the
    # jobs may or may not have actually started, so the application is not
    # guaranteed to have the requested resources available yet.
    client = Client(cluster)

    # Block until the condor scheduler has launched all requested jobs and
    # their Dask workers, timing how long the wait takes.
    print(f"Waiting for {n_workers} workers to start.")
    start = time.time()
    client.wait_for_workers(n_workers)
    end = time.time()
    print(f"All workers are ready, took {round(end - start, 2)} seconds.")

    return client
def run_analysis(connection: Client) -> None:
    """
    Run a simple example with RDataFrame, using the previously created
    connection to the HTCondor cluster.
    """
    # A 10k-entry dataset with one column of random values; the Define
    # expression string is evaluated by ROOT's C++ interpreter.
    df = RDataFrame(10_000, daskclient=connection)
    df = df.Define("x", "gRandom->Rndm() * 100")

    # Book every action lazily first, so one distributed event loop can
    # serve all of the results.
    nentries = df.Count()
    meanv = df.Mean("x")
    maxv = df.Max("x")
    minv = df.Min("x")

    # Calling GetValue() triggers the distributed computation.
    print(f"Dataset has {nentries.GetValue()} entries")
    print("Column x stats:")
    for label, proxy in (("mean", meanv), ("max", maxv), ("min", minv)):
        print(f"\t{label}: {proxy.GetValue()}")
def _main() -> None:
    """Connect to the cluster, run the analysis, and report wall-clock time."""
    connection = create_connection()
    print(f"Starting the computations at {datetime.now()}")
    start = time.time()
    run_analysis(connection)
    end = time.time()
    print(f"Computations ended at {datetime.now()}, "
          f"took {round(end - start, 2)} seconds.")


if __name__ == "__main__":
    _main()
TRangeDynCast
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Definition
TCollection.h:358
ROOT::Detail::TRangeCast
Definition
TCollection.h:311
tutorials
dataframe
distrdf004_dask_lxbatch.py
ROOT v6-32 - Reference Guide Generated on Sun Mar 16 2025 14:03:36 (GVA Time) using Doxygen 1.10.0