Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TThreadPool.h
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Anar Manafov 20/09/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadPool
13#define ROOT_TThreadPool
14
15//////////////////////////////////////////////////////////////////////////
16// //
17// TThreadPool //
18// //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22// ROOT
23#include "RtypesCore.h"
24#include "TMutex.h"
25#include "TCondition.h"
26#include "TThread.h"
27// STD
28#include <queue>
29#include <vector>
30#include <iostream>
31#include <sstream>
32#include <utility>
33#ifdef _MSC_VER
34#define sleep(s) _sleep(s)
35#else
36#include <unistd.h>
37#endif
38
39
40//////////////////////////////////////////////////////////////////////////
41// //
42// TNonCopyable //
43// Base class that makes every derived class non-copyable. //
44// //
45//////////////////////////////////////////////////////////////////////////
47protected:
50private:
53};
54
55//////////////////////////////////////////////////////////////////////////
56// //
57// TThreadPoolTaskImp //
58// A base class for thread pool tasks. Users must derive their //
59// task classes from it. //
60// Example: //
61// class TTestTask: public TThreadPoolTaskImp<TTestTask, int> //
62// //
63// in this example, //
64// TTestTask - is a user class, which implements //
65// thread pool task object. //
66// int - is a type of argument to TTestTask::run method. //
67// //
68// Please see the tutorial "tutorials/thread/threadPool.C" for //
69// more details on how to use TThreadPool. //
70// //
71//////////////////////////////////////////////////////////////////////////
template <class aTask, class aParam>
class TThreadPoolTaskImp {
public:
   // Forwards the call to aTask::runTask(param) on the derived object and
   // returns its result. aTask must derive from
   // TThreadPoolTaskImp<aTask, aParam> (see the usage example above).
   bool run(aParam &param) {
      // static_cast performs a proper base-to-derived conversion and adjusts
      // the pointer if this base sub-object does not sit at offset 0 of aTask
      // (e.g. with multiple inheritance); reinterpret_cast would not.
      aTask *pThis = static_cast<aTask *>(this);
      return pThis->runTask(param);
   }
};
80
81//////////////////////////////////////////////////////////////////////////
82// //
83// TThreadPoolTask //
84// This is a supporting class for TThreadPool. //
85// It wraps user task objects in order to pass task arguments in a //
86// type-safe way. //
87// //
88//////////////////////////////////////////////////////////////////////////
89template <class aTask, class aParam>
91public:
93
94public:
95 TThreadPoolTask(task_t &task, aParam &param):
96 fTask(task),
97 fTaskParam(param) {
98 }
99 bool run() {
100 return fTask.run(fTaskParam);
101 }
102
103private:
106};
107
108//////////////////////////////////////////////////////////////////////////
109// //
110// TThreadPool //
111// This class implements a simple Thread Pool pattern. //
112// So far it supports only one type of queue - FIFO //
113// //
114// Please see the tutorial "tutorials/thread/threadPool.C" for //
115// more details on how to use TThreadPool. //
116// //
117//////////////////////////////////////////////////////////////////////////
118template <class aTask, class aParam>
119class TThreadPool : public TNonCopyable {
120
122 typedef std::queue<task_t*> taskqueue_t;
123 typedef std::vector<TThread*> threads_array_t;
124
125public:
// Creates the pool and immediately starts `threadsCount` worker threads.
//   threadsCount - number of worker threads created here; also the initial
//                  value of the idle-thread counter.
//   needDbg      - when true, debug logging is enabled (fSilent = !needDbg).
126 TThreadPool(size_t threadsCount, bool needDbg = false):
127 fStopped(false),
129 fTasksCount(0),
130 fIdleThreads(threadsCount),
131 fSilent(!needDbg) {
135
// Each worker executes the static Executor() with this pool as its argument.
// The pool keeps the raw TThread pointers in fThreads.
136 for (size_t i = 0; i < threadsCount; ++i) {
137 TThread *pThread = new TThread(&TThreadPool::Executor, this);
138 fThreads.push_back(pThread);
139 pThread->Run();
140 }
141
143
144 if (needDbg) {
147 }
148 }
149
// Stop() is called with its default argument (processRemainingJobs = false),
// so draining the queue is not requested here.
151 Stop();
152 // deleting threads
153 threads_array_t::const_iterator iter = fThreads.begin();
154 threads_array_t::const_iterator iter_end = fThreads.end();
155 for (; iter != iter_end; ++iter)
156 delete(*iter);
157
// Release the helper thread object and the condition objects held by pointer.
158 delete fThreadJoinHelper;
159
160 delete fThreadNeeded;
161 delete fThreadAvailable;
162 delete fAllTasksDone;
163 }
164
165 void AddThread() {
166 TLockGuard lock(&fMutex);
167 TThread *pThread = new TThread(&TThreadPool::Executor, this);
168 fThreads.push_back(pThread);
169 pThread->Run();
170 ++fIdleThreads;
171 }
172
// Queues a task for execution by the pool.
//   task  - the user task object; it is wrapped by reference-taking task_t.
//   param - the argument for the task; taken by value and handed to the
//           task_t wrapper.
// The wrapper is heap-allocated here and deleted by Executor() after it ran.
173 void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
174 {
175 DbgLog("Main thread. Try to push a task");
176
// The queue and the task counter are only modified while fMutex is held.
177 TLockGuard lock(&fMutex);
178 task_t *t = new task_t(task, param);
179 fTasks.push(t);
180 ++fTasksCount;
181
182 DbgLog("Main thread. the task is pushed");
183 }
184 TLockGuard lock(&fMutex);
186 }
187
// Stops the pool. Calling it again after the pool is stopped is a no-op.
//   processRemainingJobs - when true, the caller first waits (under fMutex)
//                          until the task queue is empty or the pool is
//                          stopped; when false, that wait is skipped.
188 void Stop(bool processRemainingJobs = false) {
189 // already stopped: nothing to do
190 if (fStopped)
191 return;
192
193 if (processRemainingJobs) {
194 TLockGuard lock(&fMutex);
195 // wait for queue to drain
196 while (!fTasks.empty() && !fStopped) {
197 DbgLog("Main thread is waiting");
199 DbgLog("Main thread is DONE waiting");
200 }
201 }
202 // tell all threads to stop
203 {
// fStopped is the flag polled by the Executor() and Monitor() loops.
204 TLockGuard lock(&fMutex);
205 fStopped = true;
207 DbgLog("Main threads requests to STOP");
208 }
209
210 // Waiting for all threads to complete
213 }
214
215 void Drain() {
216 // This method blocks the calling thread until the task queue is empty.
217
220 }
221
222 size_t TasksCount() const {
223 return fTasksCount;
224 }
225
226 size_t SuccessfulTasks() const {
227 return fSuccessfulTasks;
228 }
229
230 size_t IdleThreads() const {
231 return fIdleThreads;
232 }
233
234private:
235 static void* Monitor(void *arg) {
236 if (NULL == arg)
237 return NULL;
238
239 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
240 while (true && !pThis->fStopped) {
241 std::stringstream ss;
242 ss
243 << ">>>> Check for tasks."
244 << " Number of Tasks: " << pThis->fTasks.size()
245 << "; Idle threads: " << pThis->IdleThreads();
246 pThis->DbgLog(ss.str());
247 sleep(1);
248 }
249 return NULL;
250 }
251
// Worker thread entry point.
//   arg - pointer to the owning TThreadPool.
// Loops until the pool's fStopped flag is set: waits for a task, takes one
// from the queue, runs it, deletes it and updates the pool counters.
// Always returns NULL.
252 static void* Executor(void *arg) {
253 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
254
255 while (!pThis->fStopped) {
256 task_t *task(NULL);
257
258 // Step 1: if the queue is empty, wait until a task is announced
259 {
260 // Find a task to perform
261 TLockGuard lock(&pThis->fMutex);
262 if (pThis->fTasks.empty() && !pThis->fStopped) {
263 pThis->DbgLog("waiting for a task");
264
// Every thread of the pool is idle: signal that all tasks are done.
265 if (pThis->fThreads.size() == pThis->fIdleThreads) {
267 pThis->fAllTasksDone->Broadcast();
268 }
269
270 // No tasks, we wait for a task to come
271 pThis->fThreadNeeded->Wait();
272
273 pThis->DbgLog("done waiting for tasks");
274 }
275 }
276
// Step 2: take the task at the front of the queue, if there is one.
// The queue is re-checked under the lock because it may still be empty here.
277 {
278 TLockGuard lock(&pThis->fMutex);
279 if (!pThis->fTasks.empty()) {
280 --pThis->fIdleThreads;
281 task = pThis->fTasks.front();
282 pThis->fTasks.pop();
283
284 pThis->DbgLog("get the task");
285 } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
287 pThis->fAllTasksDone->Broadcast();
288 }
289 pThis->DbgLog("done Check <<<<");
290 }
291
292 // Step 3: execute the task outside of the lock
293 if (task) {
294 pThis->DbgLog("Run the task");
295
// Only tasks whose run() returns true are counted as successful.
296 if (task->run()) {
297 TLockGuard lock(&pThis->fMutex);
298 ++pThis->fSuccessfulTasks;
299 }
// The wrapper was allocated by PushTask(); the worker owns and frees it.
300 delete task;
301 task = NULL;
302
303 TLockGuard lock(&pThis->fMutex);
304 ++pThis->fIdleThreads;
305
306 pThis->DbgLog("Done Running the task");
307 }
308 // Task is done, report that the thread is free
309 TLockGuard lock(&pThis->fMutex);
310 pThis->fThreadAvailable->Broadcast();
311 }
312
313 pThis->DbgLog("**** DONE ***");
314 return NULL;
315 }
316
317 static void *JoinHelper(void *arg) {
318 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
319 threads_array_t::const_iterator iter = pThis->fThreads.begin();
320 threads_array_t::const_iterator iter_end = pThis->fThreads.end();
321 for (; iter != iter_end; ++iter)
322 (*iter)->Join();
323
324 return NULL;
325 }
326
327 static bool IsThreadActive(TThread *pThread) {
328 // so far we consider only kRunningState as activity
329 return (pThread->GetState() == TThread::kRunningState);
330 }
331
// Prints a debug message to std::cout, prefixed with the id of the calling
// thread in square brackets. Does nothing when the pool is silent (fSilent).
//   msg - the text to print.
332 void DbgLog(const std::string &msg) {
333 if (fSilent)
334 return;
336 std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
337 }
338
339private:
// Set to true by Stop(); polled by the Executor() and Monitor() loops.
// NOTE(review): volatile does not provide inter-thread synchronization in
// C++ - consider std::atomic<bool> if the toolchain allows it.
349 volatile bool fStopped;
354 bool fSilent; // When true, DbgLog() prints nothing (no DBG messages)
355};
356
357#endif
R__EXTERN C unsigned int sleep(unsigned int seconds)
Int_t Broadcast()
Definition TCondition.h:54
Int_t Wait()
Wait to be signaled.
const TNonCopyable & operator=(const TNonCopyable &)
TNonCopyable(const TNonCopyable &)
bool run(aParam &param)
Definition TThreadPool.h:75
TThreadPoolTask(task_t &task, aParam &param)
Definition TThreadPool.h:95
TThreadPoolTaskImp< aTask, aParam > task_t
Definition TThreadPool.h:92
TThreadPool(size_t threadsCount, bool needDbg=false)
TThreadPoolTask< aTask, aParam > task_t
TCondition * fThreadAvailable
size_t fSuccessfulTasks
threads_array_t fThreads
size_t TasksCount() const
TMutex fMutexAllTasksDone
void Stop(bool processRemainingJobs=false)
TThread * fThreadJoinHelper
TCondition * fAllTasksDone
static void * Executor(void *arg)
size_t fIdleThreads
static bool IsThreadActive(TThread *pThread)
static void * JoinHelper(void *arg)
void DbgLog(const std::string &msg)
static void * Monitor(void *arg)
TCondition * fThreadNeeded
TMutex fMutex
size_t fTasksCount
TMutex fDbgOutputMutex
void AddThread()
size_t IdleThreads() const
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
taskqueue_t fTasks
TThread * fThreadMonitor
volatile bool fStopped
size_t SuccessfulTasks() const
std::vector< TThread * > threads_array_t
std::queue< task_t * > taskqueue_t
<div class="legacybox"><h2>Legacy Code</h2> TThread is a legacy interface: there will be no bug fixes...
Definition TThread.h:40
EState GetState() const
Definition TThread.h:129
Long_t Join(void **ret=nullptr)
Join this thread.
Definition TThread.cxx:510
static Long_t SelfId()
Static method returning the id for the current thread.
Definition TThread.cxx:549
Int_t Run(void *arg=nullptr, const int affinity=-1)
Start the thread.
Definition TThread.cxx:566
@ kRunningState
Definition TThread.h:64
TLine l
Definition textangle.C:4