ROOT  6.06/09
Reference Guide
TThreadPool.h
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Anar Manafov 20/09/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadPool
13 #define ROOT_TThreadPool
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TThreadPool //
18 // //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 // ROOT
23 #ifndef ROOT_TObject
24 #include "TObject.h"
25 #endif
26 #ifndef ROOT_TMutex
27 #include "TMutex.h"
28 #endif
29 #ifndef ROOT_TCondition
30 #include "TCondition.h"
31 #endif
32 // STD
33 #include <queue>
34 #include <vector>
35 #include <iostream>
36 #include <sstream>
37 
38 
39 //////////////////////////////////////////////////////////////////////////
40 // //
41 // TNonCopyable //
42 // Class which makes child to be non-copyable object. //
43 // //
44 //////////////////////////////////////////////////////////////////////////
45 class TNonCopyable {
46 protected:
49 private:
50  TNonCopyable(const TNonCopyable&);
51  const TNonCopyable& operator=(const TNonCopyable&);
52 };
53 
54 //////////////////////////////////////////////////////////////////////////
55 // //
56 // TThreadPoolTaskImp //
57 // A base class for thread pool tasks. Users must inherit their //
58 // tasks classes from it. //
59 // Example: //
60 // class TTestTask: public TThreadPoolTaskImp<TTestTask, int> //
61 // //
62 // in this example, //
63 // TTestTask - is a user class, which implements //
64 // thread pool task object. //
65 // int - is a type of argument to TTestTask::run method. //
66 // //
67 // Please see the tutorial "tutorials/thread/threadPool.C" for //
68 // more details on how to use TThreadPool. //
69 // //
70 //////////////////////////////////////////////////////////////////////////
71 template <class aTask, class aParam>
73 public:
74  bool run(aParam &param) {
75  aTask *pThis = reinterpret_cast<aTask *>(this);
76  return pThis->runTask(param);
77  }
78 };
79 
80 //////////////////////////////////////////////////////////////////////////
81 // //
82 // TThreadPoolTask //
83 // This is a supporting class for TThreadPool. //
84 // It wraps users task objects in order to pass tasks arguments in //
85 // type-safe way. //
86 // //
87 //////////////////////////////////////////////////////////////////////////
88 template <class aTask, class aParam>
90 public:
92 
93 public:
94  TThreadPoolTask(task_t &task, aParam &param):
95  fTask(task),
96  fTaskParam(param) {
97  }
98  bool run() {
99  return fTask.run(fTaskParam);
100  }
101 
102 private:
103  task_t &fTask;
104  aParam fTaskParam;
105 };
106 
107 //////////////////////////////////////////////////////////////////////////
108 // //
109 // TThreadPool //
110 // This class implement a simple Thread Pool pattern. //
111 // So far it supports only one type of queue - FIFO //
112 // //
113 // Please see the tutorial "tutorials/thread/threadPool.C" for //
114 // more details on how to use TThreadPool. //
115 // //
116 //////////////////////////////////////////////////////////////////////////
117 template <class aTask, class aParam>
118 class TThreadPool : public TNonCopyable {
119 
121  typedef std::queue<task_t*> taskqueue_t;
122  typedef std::vector<TThread*> threads_array_t;
123 
124 public:
125  TThreadPool(size_t threadsCount, bool needDbg = false):
126  fStopped(false),
127  fSuccessfulTasks(0),
128  fTasksCount(0),
129  fIdleThreads(threadsCount),
130  fSilent(!needDbg) {
134 
135  for (size_t i = 0; i < threadsCount; ++i) {
136  TThread *pThread = new TThread(&TThreadPool::Executor, this);
137  fThreads.push_back(pThread);
138  pThread->Run();
139  }
140 
142 
143  if (needDbg) {
145  fThreadMonitor->Run();
146  }
147  }
148 
150  Stop();
151  // deleting threads
152  threads_array_t::const_iterator iter = fThreads.begin();
153  threads_array_t::const_iterator iter_end = fThreads.end();
154  for (; iter != iter_end; ++iter)
155  delete(*iter);
156 
157  delete fThreadJoinHelper;
158 
159  delete fThreadNeeded;
160  delete fThreadAvailable;
161  delete fAllTasksDone;
162  }
163 
164  void AddThread() {
165  TLockGuard lock(&fMutex);
166  TThread *pThread = new TThread(&TThreadPool::Executor, this);
167  fThreads.push_back(pThread);
168  pThread->Run();
169  ++fIdleThreads;
170  }
171 
172  void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
173  {
174  DbgLog("Main thread. Try to push a task");
175 
176  TLockGuard lock(&fMutex);
177  task_t *t = new task_t(task, param);
178  fTasks.push(t);
179  ++fTasksCount;
180 
181  DbgLog("Main thread. the task is pushed");
182  }
183  TLockGuard lock(&fMutex);
185  }
186 
187  void Stop(bool processRemainingJobs = false) {
188  // prevent more jobs from being added to the queue
189  if (fStopped)
190  return;
191 
192  if (processRemainingJobs) {
193  TLockGuard lock(&fMutex);
194  // wait for queue to drain
195  while (!fTasks.empty() && !fStopped) {
196  DbgLog("Main thread is waiting");
198  DbgLog("Main thread is DONE waiting");
199  }
200  }
201  // tell all threads to stop
202  {
203  TLockGuard lock(&fMutex);
204  fStopped = true;
206  DbgLog("Main threads requests to STOP");
207  }
208 
209  // Waiting for all threads to complete
212  }
213 
214  void Drain() {
215  // This method stops the calling thread until the task queue is empty
216 
218  fAllTasksDone->Wait();
219  }
220 
221  size_t TasksCount() const {
222  return fTasksCount;
223  }
224 
225  size_t SuccessfulTasks() const {
226  return fSuccessfulTasks;
227  }
228 
229  size_t IdleThreads() const {
230  return fIdleThreads;
231  }
232 
233 private:
234  static void* Monitor(void *arg) {
235  if (NULL == arg)
236  return NULL;
237 
238  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
239  while (true && !pThis->fStopped) {
240  std::stringstream ss;
241  ss
242  << ">>>> Check for tasks."
243  << " Number of Tasks: " << pThis->fTasks.size()
244  << "; Idle threads: " << pThis->IdleThreads();
245  pThis->DbgLog(ss.str());
246  sleep(1);
247  }
248  return NULL;
249  }
250 
251  static void* Executor(void *arg) {
252  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
253 
254  while (!pThis->fStopped) {
255  task_t *task(NULL);
256 
257  // There is a task, let's take it
258  {
259  // Find a task to perform
260  TLockGuard lock(&pThis->fMutex);
261  if (pThis->fTasks.empty() && !pThis->fStopped) {
262  pThis->DbgLog("waiting for a task");
263 
264  if (pThis->fThreads.size() == pThis->fIdleThreads) {
266  pThis->fAllTasksDone->Broadcast();
267  }
268 
269  // No tasks, we wait for a task to come
270  pThis->fThreadNeeded->Wait();
271 
272  pThis->DbgLog("done waiting for tasks");
273  }
274  }
275 
276  {
277  TLockGuard lock(&pThis->fMutex);
278  if (!pThis->fTasks.empty()) {
279  --pThis->fIdleThreads;
280  task = pThis->fTasks.front();
281  pThis->fTasks.pop();
282 
283  pThis->DbgLog("get the task");
284  } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
286  pThis->fAllTasksDone->Broadcast();
287  }
288  pThis->DbgLog("done Check <<<<");
289  }
290 
291  // Execute the task
292  if (task) {
293  pThis->DbgLog("Run the task");
294 
295  if (task->run()) {
296  TLockGuard lock(&pThis->fMutex);
297  ++pThis->fSuccessfulTasks;
298  }
299  delete task;
300  task = NULL;
301 
302  TLockGuard lock(&pThis->fMutex);
303  ++pThis->fIdleThreads;
304 
305  pThis->DbgLog("Done Running the task");
306  }
307  // Task is done, report that the thread is free
308  TLockGuard lock(&pThis->fMutex);
309  pThis->fThreadAvailable->Broadcast();
310  }
311 
312  pThis->DbgLog("**** DONE ***");
313  return NULL;
314  }
315 
316  static void *JoinHelper(void *arg) {
317  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
318  threads_array_t::const_iterator iter = pThis->fThreads.begin();
319  threads_array_t::const_iterator iter_end = pThis->fThreads.end();
320  for (; iter != iter_end; ++iter)
321  (*iter)->Join();
322 
323  return NULL;
324  }
325 
326  static bool IsThreadActive(TThread *pThread) {
327  // so far we consider only kRunningState as activity
328  return (pThread->GetState() == TThread::kRunningState);
329  }
330 
331  void DbgLog(const std::string &msg) {
332  if (fSilent)
333  return;
335  std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
336  }
337 
338 private:
339  taskqueue_t fTasks;
345  threads_array_t fThreads;
348  volatile bool fStopped;
350  size_t fTasksCount;
351  size_t fIdleThreads;
353  bool fSilent; // No DBG messages
354 };
355 
356 #endif
Definition: TMutex.h:37
EState GetState() const
Definition: TThread.h:138
TCondition * fThreadNeeded
Definition: TThreadPool.h:341
TThreadPool(size_t threadsCount, bool needDbg=false)
Definition: TThreadPool.h:125
void Drain()
Definition: TThreadPool.h:214
TCondition * fThreadAvailable
Definition: TThreadPool.h:342
bool run(aParam &param)
Definition: TThreadPool.h:74
TCondition * fAllTasksDone
Definition: TThreadPool.h:344
static void * Executor(void *arg)
Definition: TThreadPool.h:251
task_t & fTask
Definition: TThreadPool.h:103
ClassImp(TIterator) Bool_t TIterator return false
Compare two iterator objects.
Definition: TIterator.cxx:20
void AddThread()
Definition: TThreadPool.h:164
TThreadPoolTask< aTask, aParam > task_t
Definition: TThreadPool.h:120
std::map< std::string, std::string >::const_iterator iter
Definition: TAlienJob.cxx:54
TThreadPoolTaskImp< aTask, aParam > task_t
Definition: TThreadPool.h:91
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:537
Int_t Broadcast()
Definition: TCondition.h:58
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:551
TMutex fDbgOutputMutex
Definition: TThreadPool.h:352
Int_t Wait()
Wait to be signaled.
Definition: TCondition.cxx:74
static void * JoinHelper(void *arg)
Definition: TThreadPool.h:316
TThread * fThreadMonitor
Definition: TThreadPool.h:347
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
Definition: TThreadPool.h:172
TMutex fMutex
Definition: TThreadPool.h:340
const TNonCopyable & operator=(const TNonCopyable &)
threads_array_t fThreads
Definition: TThreadPool.h:345
static bool IsThreadActive(TThread *pThread)
Definition: TThreadPool.h:326
size_t fTasksCount
Definition: TThreadPool.h:350
std::vector< TThread * > threads_array_t
Definition: TThreadPool.h:122
TLine * l
Definition: textangle.C:4
size_t SuccessfulTasks() const
Definition: TThreadPool.h:225
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:498
std::queue< task_t * > taskqueue_t
Definition: TThreadPool.h:121
volatile bool fStopped
Definition: TThreadPool.h:348
taskqueue_t fTasks
Definition: TThreadPool.h:339
size_t IdleThreads() const
Definition: TThreadPool.h:229
static void * Monitor(void *arg)
Definition: TThreadPool.h:234
size_t fSuccessfulTasks
Definition: TThreadPool.h:349
TThread * fThreadJoinHelper
Definition: TThreadPool.h:346
TMutex fMutexAllTasksDone
Definition: TThreadPool.h:343
void DbgLog(const std::string &msg)
Definition: TThreadPool.h:331
R__EXTERN C unsigned int sleep(unsigned int seconds)
void Stop(bool processRemainingJobs=false)
Definition: TThreadPool.h:187
size_t TasksCount() const
Definition: TThreadPool.h:221
#define NULL
Definition: Rtypes.h:82
size_t fIdleThreads
Definition: TThreadPool.h:351
TThreadPoolTask(task_t &task, aParam &param)
Definition: TThreadPool.h:94