Logo ROOT   6.08/07
Reference Guide
TThreadPool.h
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Anar Manafov 20/09/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadPool
13 #define ROOT_TThreadPool
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TThreadPool //
18 // //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 // ROOT
23 #ifndef ROOT_TObject
24 #include "TObject.h"
25 #endif
26 #ifndef ROOT_TMutex
27 #include "TMutex.h"
28 #endif
29 #ifndef ROOT_TCondition
30 #include "TCondition.h"
31 #endif
32 #include "TThread.h"
33 // STD
34 #include <queue>
35 #include <vector>
36 #include <iostream>
37 #include <sstream>
38 
39 
40 //////////////////////////////////////////////////////////////////////////
41 // //
42 // TNonCopyable //
43 // Base class that makes derived classes non-copyable. //
44 // //
45 //////////////////////////////////////////////////////////////////////////
// Base class for types that must not be copied: the copy constructor and the
// copy-assignment operator are declared private, and no definition for them
// appears in this listing.
// NOTE(review): the listing jumps from line 47 to line 50, so whatever is
// declared in the `protected:` section is not visible in this extract.
46 class TNonCopyable {
47 protected:
50 private:
51  TNonCopyable(const TNonCopyable&);
52  const TNonCopyable& operator=(const TNonCopyable&);
53 };
54 
55 //////////////////////////////////////////////////////////////////////////
56 // //
57 // TThreadPoolTaskImp //
58 // A base class for thread pool tasks. Users must derive their //
59 // task classes from it. //
60 // Example: //
61 // class TTestTask: public TThreadPoolTaskImp<TTestTask, int> //
62 // //
63 // in this example, //
64 // TTestTask - is a user class, which implements //
65 // thread pool task object. //
66 // int - is a type of argument to TTestTask::run method. //
67 // //
68 // Please see the tutorial "tutorials/thread/threadPool.C" for //
69 // more details on how to use TThreadPool. //
70 // //
71 //////////////////////////////////////////////////////////////////////////
72 template <class aTask, class aParam>
74 public:
// Runs the user task: casts `this` to the user task type aTask and forwards
// `param` to aTask::runTask().
// param  - task argument, passed on by reference.
// return - whatever aTask::runTask(param) returns.
// NOTE(review): the cast assumes aTask derives from this class, as in the
// usage example in the comment block above — confirm for every instantiation.
75  bool run(aParam &param) {
76  aTask *pThis = reinterpret_cast<aTask *>(this);
77  return pThis->runTask(param);
78  }
79 };
80 
81 //////////////////////////////////////////////////////////////////////////
82 // //
83 // TThreadPoolTask //
84 // This is a supporting class for TThreadPool. //
85 // It wraps user task objects in order to pass task arguments in //
86 // a type-safe way. //
87 // //
88 //////////////////////////////////////////////////////////////////////////
89 template <class aTask, class aParam>
91 public:
93 
94 public:
// Binds a task object to the argument it will later be run with.
// task  - stored as a reference (fTask is declared `task_t &`); this wrapper
//         does not copy the task object.
// param - copied into fTaskParam (declared as a value member).
// NOTE(review): presumably the caller must keep `task` alive until run() has
// been called on this wrapper — confirm against callers.
95  TThreadPoolTask(task_t &task, aParam &param):
96  fTask(task),
97  fTaskParam(param) {
98  }
// Runs the referenced task with the stored copy of the parameter and returns
// the task's result.
99  bool run() {
100  return fTask.run(fTaskParam);
101  }
102 
103 private:
104  task_t &fTask;
105  aParam fTaskParam;
106 };
107 
108 //////////////////////////////////////////////////////////////////////////
109 // //
110 // TThreadPool //
111 // This class implements a simple Thread Pool pattern. //
112 // So far it supports only one type of queue - FIFO //
113 // //
114 // Please see the tutorial "tutorials/thread/threadPool.C" for //
115 // more details on how to use TThreadPool. //
116 // //
117 //////////////////////////////////////////////////////////////////////////
118 template <class aTask, class aParam>
119 class TThreadPool : public TNonCopyable {
120 
122  typedef std::queue<task_t*> taskqueue_t;
123  typedef std::vector<TThread*> threads_array_t;
124 
125 public:
// Creates the pool and immediately starts `threadsCount` executor threads.
// threadsCount - number of executor threads to create; also the initial value
//                of fIdleThreads.
// needDbg      - when true, debug logging is enabled (fSilent = !needDbg) and
//                an extra monitor thread is started.
// NOTE(review): each executor is started inside the loop while fThreads is
// still being filled, and the loop does not hold fMutex; Executor reads
// fThreads.size() under fMutex — confirm this cannot race.
// NOTE(review): fThreadMonitor is assigned only when needDbg is true and is not
// in the initializer list, so it appears to stay uninitialized otherwise —
// verify against the member declaration (not visible in this listing).
126  TThreadPool(size_t threadsCount, bool needDbg = false):
127  fStopped(false),
128  fSuccessfulTasks(0),
129  fTasksCount(0),
130  fIdleThreads(threadsCount),
131  fSilent(!needDbg) {
// fThreadNeeded and fThreadAvailable share fMutex; fAllTasksDone uses its own mutex.
132  fThreadNeeded = new TCondition(&fMutex);
133  fThreadAvailable = new TCondition(&fMutex);
134  fAllTasksDone = new TCondition(&fMutexAllTasksDone);
135 
136  for (size_t i = 0; i < threadsCount; ++i) {
137  TThread *pThread = new TThread(&TThreadPool::Executor, this);
138  fThreads.push_back(pThread);
139  pThread->Run();
140  }
141 
// The join helper is only created here; it is started later, from Stop().
142  fThreadJoinHelper = new TThread(&TThreadPool::JoinHelper, this);
143 
144  if (needDbg) {
145  fThreadMonitor = new TThread(&TThreadPool::Monitor, this);
146  fThreadMonitor->Run();
147  }
148  }
149 
// Tear-down sequence: stop the pool, then delete the executor thread objects,
// the join helper thread and the three condition objects.
// (The listing line carrying the enclosing signature, line 150, is absent
// from this extract.)
// Stop() is called with its default argument, i.e. without waiting for the
// queue to drain.
// NOTE(review): fThreadMonitor (started in the constructor when needDbg is
// true) is neither joined nor deleted in the lines shown; since Monitor() keeps
// reading the pool through its pointer between sleeps, this looks like a
// possible use-after-destruction — confirm.
// NOTE(review): task wrappers still queued in fTasks are not deleted in the
// lines shown — confirm whether they leak.
151  Stop();
152  // deleting threads
153  threads_array_t::const_iterator iter = fThreads.begin();
154  threads_array_t::const_iterator iter_end = fThreads.end();
155  for (; iter != iter_end; ++iter)
156  delete(*iter);
157 
158  delete fThreadJoinHelper;
159 
160  delete fThreadNeeded;
161  delete fThreadAvailable;
162  delete fAllTasksDone;
163  }
164 
// Adds one more executor thread to the pool: while holding fMutex, creates the
// thread, records it in fThreads, starts it and counts it as idle.
165  void AddThread() {
166  TLockGuard lock(&fMutex);
167  TThread *pThread = new TThread(&TThreadPool::Executor, this);
168  fThreads.push_back(pThread);
169  pThread->Run();
170  ++fIdleThreads;
171  }
172 
// Queues a task for execution and wakes the executor threads.
// task  - user task object; the queued wrapper refers to it rather than
//         copying it.
// param - argument for the task; taken by value and copied into the wrapper.
// The wrapper allocated here is deleted by Executor() after the task has run.
// NOTE(review): fStopped is not checked here, so a task pushed after Stop()
// would stay in the queue with no executor left to run it — confirm intent.
173  void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
174  {
175  DbgLog("Main thread. Try to push a task");
176 
// Queue and counter are updated under fMutex.
177  TLockGuard lock(&fMutex);
178  task_t *t = new task_t(task, param);
179  fTasks.push(t);
180  ++fTasksCount;
181 
182  DbgLog("Main thread. the task is pushed");
183  }
// fMutex is released at the end of the scope above and taken again for the broadcast.
184  TLockGuard lock(&fMutex);
185  fThreadNeeded->Broadcast();
186  }
187 
// Stops the pool and blocks until every executor thread has been joined.
// processRemainingJobs - when true, first waits (on fThreadAvailable) until the
//                        task queue is empty; when false, tasks still queued
//                        are left unprocessed.
// Calling Stop() again after the pool is stopped returns immediately.
188  void Stop(bool processRemainingJobs = false) {
189  // prevent more jobs from being added to the queue
// The fStopped test below is made without holding fMutex.
190  if (fStopped)
191  return;
192 
193  if (processRemainingJobs) {
194  TLockGuard lock(&fMutex);
195  // wait for queue to drain
// Executors broadcast fThreadAvailable at the end of each loop iteration.
196  while (!fTasks.empty() && !fStopped) {
197  DbgLog("Main thread is waiting");
198  fThreadAvailable->Wait();
199  DbgLog("Main thread is DONE waiting");
200  }
201  }
202  // tell all threads to stop
203  {
204  TLockGuard lock(&fMutex);
205  fStopped = true;
// Wake executors blocked in fThreadNeeded->Wait() so they can observe fStopped.
206  fThreadNeeded->Broadcast();
207  DbgLog("Main threads requests to STOP");
208  }
209 
210  // Waiting for all threads to complete
// JoinHelper() joins every thread in fThreads; joining the helper waits for that to finish.
211  fThreadJoinHelper->Run();
212  fThreadJoinHelper->Join();
213  }
214 
// Blocks the caller on fAllTasksDone, which Executor() broadcasts when the
// queue is empty and the idle-thread count equals the number of threads.
// NOTE(review): Wait() is called once with no condition checked before or
// after it; presumably a broadcast issued before this call is not seen, so
// calling Drain() on an already idle pool could block — confirm against
// TCondition::Wait semantics.
215  void Drain() {
216  // This method stops the calling thread until the task queue is empty
217 
218  TLockGuard lock(&fMutexAllTasksDone);
219  fAllTasksDone->Wait();
220  }
221 
// Returns fTasksCount, which PushTask() increments once per pushed task and
// which is never decremented in this listing. Read without taking fMutex.
222  size_t TasksCount() const {
223  return fTasksCount;
224  }
225 
// Returns fSuccessfulTasks, the number of executed tasks whose run() returned
// true (incremented in Executor()). Read without taking fMutex.
226  size_t SuccessfulTasks() const {
227  return fSuccessfulTasks;
228  }
229 
// Returns fIdleThreads: decremented by an executor when it takes a task and
// incremented again when the task finishes. Read without taking fMutex.
230  size_t IdleThreads() const {
231  return fIdleThreads;
232  }
233 
234 private:
// Thread function of the debug monitor thread (started by the constructor when
// needDbg is true). About once per second it logs the queue length and the
// idle-thread count, until fStopped becomes true.
// arg    - pointer to the owning TThreadPool; a NULL argument returns at once.
// return - always NULL.
// NOTE(review): fTasks.size() is read here without holding fMutex while
// executors and PushTask() modify the queue under that mutex — confirm whether
// this race is acceptable for debug output.
235  static void* Monitor(void *arg) {
236  if (NULL == arg)
237  return NULL;
238 
239  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
240  while (true && !pThis->fStopped) {
241  std::stringstream ss;
242  ss
243  << ">>>> Check for tasks."
244  << " Number of Tasks: " << pThis->fTasks.size()
245  << "; Idle threads: " << pThis->IdleThreads();
246  pThis->DbgLog(ss.str());
247  sleep(1);
248  }
249  return NULL;
250  }
251 
// Thread function of every executor (worker) thread. Repeats until fStopped:
//   1. if the queue is empty, wait on fThreadNeeded;
//   2. take the front task from the queue, if there is one;
//   3. run it, count it as successful if run() returned true, delete the wrapper;
//   4. broadcast fThreadAvailable.
// arg    - pointer to the owning TThreadPool (not checked for NULL here).
// return - always NULL.
// NOTE(review): listing lines 266 and 286 (directly before the two
// fAllTasksDone broadcasts) are absent from this extract.
252  static void* Executor(void *arg) {
253  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
254 
255  while (!pThis->fStopped) {
256  task_t *task(NULL);
257 
258  // There is a task, let's take it
259  {
260  // Find a task to perform
261  TLockGuard lock(&pThis->fMutex);
262  if (pThis->fTasks.empty() && !pThis->fStopped) {
263  pThis->DbgLog("waiting for a task");
264 
// Every thread is idle and the queue is empty: notify Drain() waiters.
265  if (pThis->fThreads.size() == pThis->fIdleThreads) {
267  pThis->fAllTasksDone->Broadcast();
268  }
269 
270  // No tasks, we wait for a task to come
// Woken by PushTask() or Stop(); the queue is checked again in the next block.
271  pThis->fThreadNeeded->Wait();
272 
273  pThis->DbgLog("done waiting for tasks");
274  }
275  }
276 
277  {
// fMutex is taken again; another executor may have emptied the queue in between.
278  TLockGuard lock(&pThis->fMutex);
279  if (!pThis->fTasks.empty()) {
280  --pThis->fIdleThreads;
281  task = pThis->fTasks.front();
282  pThis->fTasks.pop();
283 
284  pThis->DbgLog("get the task");
285  } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
287  pThis->fAllTasksDone->Broadcast();
288  }
289  pThis->DbgLog("done Check <<<<");
290  }
291 
292  // Execute the task
// The task itself runs without fMutex held.
293  if (task) {
294  pThis->DbgLog("Run the task");
295 
296  if (task->run()) {
297  TLockGuard lock(&pThis->fMutex);
298  ++pThis->fSuccessfulTasks;
299  }
// The wrapper was allocated in PushTask(); it is owned and released here.
300  delete task;
301  task = NULL;
302 
303  TLockGuard lock(&pThis->fMutex);
304  ++pThis->fIdleThreads;
305 
306  pThis->DbgLog("Done Running the task");
307  }
308  // Task is done, report that the thread is free
// Stop(true) waits on this condition while the queue is not empty.
309  TLockGuard lock(&pThis->fMutex);
310  pThis->fThreadAvailable->Broadcast();
311  }
312 
313  pThis->DbgLog("**** DONE ***");
314  return NULL;
315  }
316 
// Thread function of the join helper thread (started from Stop()): joins every
// thread stored in fThreads, in order.
// arg    - pointer to the owning TThreadPool (not checked for NULL here).
// return - always NULL.
317  static void *JoinHelper(void *arg) {
318  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
319  threads_array_t::const_iterator iter = pThis->fThreads.begin();
320  threads_array_t::const_iterator iter_end = pThis->fThreads.end();
321  for (; iter != iter_end; ++iter)
322  (*iter)->Join();
323 
324  return NULL;
325  }
326 
// Returns true only when the thread's state is TThread::kRunningState.
// pThread - thread to inspect (not checked for NULL here).
// This helper is not called anywhere else in this listing.
327  static bool IsThreadActive(TThread *pThread) {
328  // so far we consider only kRunningState as activity
329  return (pThread->GetState() == TThread::kRunningState);
330  }
331 
// Writes "[<thread id>] <msg>" to std::cout, one line per call; does nothing
// when fSilent is set. Output is serialized with fDbgOutputMutex.
// msg - text to log.
332  void DbgLog(const std::string &msg) {
333  if (fSilent)
334  return;
335  TLockGuard lock(&fDbgOutputMutex);
336  std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
337  }
338 
339 private:
340  taskqueue_t fTasks;
346  threads_array_t fThreads;
349  volatile bool fStopped;
351  size_t fTasksCount;
352  size_t fIdleThreads;
354  bool fSilent; // No DBG messages
355 };
356 
357 #endif
Definition: TMutex.h:34
TCondition * fThreadNeeded
Definition: TThreadPool.h:342
TThreadPool(size_t threadsCount, bool needDbg=false)
Definition: TThreadPool.h:126
EState GetState() const
Definition: TThread.h:136
void Drain()
Definition: TThreadPool.h:215
TCondition * fThreadAvailable
Definition: TThreadPool.h:343
bool run(aParam &param)
Definition: TThreadPool.h:75
TCondition * fAllTasksDone
Definition: TThreadPool.h:345
static void * Executor(void *arg)
Definition: TThreadPool.h:252
task_t & fTask
Definition: TThreadPool.h:104
void AddThread()
Definition: TThreadPool.h:165
TThreadPoolTask< aTask, aParam > task_t
Definition: TThreadPool.h:121
TThreadPoolTaskImp< aTask, aParam > task_t
Definition: TThreadPool.h:92
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:538
Int_t Broadcast()
Definition: TCondition.h:58
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:552
TMutex fDbgOutputMutex
Definition: TThreadPool.h:353
Int_t Wait()
Wait to be signaled.
Definition: TCondition.cxx:75
static void * JoinHelper(void *arg)
Definition: TThreadPool.h:317
TThread * fThreadMonitor
Definition: TThreadPool.h:348
size_t TasksCount() const
Definition: TThreadPool.h:222
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
Definition: TThreadPool.h:173
TMutex fMutex
Definition: TThreadPool.h:341
const TNonCopyable & operator=(const TNonCopyable &)
threads_array_t fThreads
Definition: TThreadPool.h:346
static bool IsThreadActive(TThread *pThread)
Definition: TThreadPool.h:327
size_t fTasksCount
Definition: TThreadPool.h:351
std::vector< TThread * > threads_array_t
Definition: TThreadPool.h:123
TLine * l
Definition: textangle.C:4
std::queue< task_t * > taskqueue_t
Definition: TThreadPool.h:122
volatile bool fStopped
Definition: TThreadPool.h:349
taskqueue_t fTasks
Definition: TThreadPool.h:340
size_t IdleThreads() const
Definition: TThreadPool.h:230
static void * Monitor(void *arg)
Definition: TThreadPool.h:235
size_t fSuccessfulTasks
Definition: TThreadPool.h:350
TThread * fThreadJoinHelper
Definition: TThreadPool.h:347
TMutex fMutexAllTasksDone
Definition: TThreadPool.h:344
void DbgLog(const std::string &msg)
Definition: TThreadPool.h:332
R__EXTERN C unsigned int sleep(unsigned int seconds)
size_t SuccessfulTasks() const
Definition: TThreadPool.h:226
void Stop(bool processRemainingJobs=false)
Definition: TThreadPool.h:188
#define NULL
Definition: Rtypes.h:82
size_t fIdleThreads
Definition: TThreadPool.h:352
TThreadPoolTask(task_t &task, aParam &param)
Definition: TThreadPool.h:95