Logo ROOT  
Reference Guide
THttpWSHandler.cxx
Go to the documentation of this file.
1// $Id$
2// Author: Sergey Linev 20/10/2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "THttpWSHandler.h"
13
14#include "THttpWSEngine.h"
15#include "THttpCallArg.h"
16#include "TSystem.h"
17
18#include <thread>
19#include <chrono>
20
21/////////////////////////////////////////////////////////////////////////
22///
23/// THttpWSHandler
24///
25/// Class for user-side handling of websocket with THttpServer
26/// 1. Create a class derived from THttpWSHandler and implement
27/// the ProcessWS() method, where all websocket requests are handled.
28/// 2. Register an instance of the derived class with a running THttpServer
29///
30/// TUserWSHandler *handler = new TUserWSHandler("name1","title");
31/// THttpServer *server = new THttpServer("http:8090");
32/// server->Register("/subfolder", handler)
33///
34/// 3. Now server can accept web socket connection from outside.
35/// For instance, from JavaScript one can connect to it with code:
36///
37/// var ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket")
38///
39/// 4. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
40///
41/// if (arg->IsMethod("WS_CONNECT")) {
42/// return true; // to accept incoming request
43/// }
44///
45/// if (arg->IsMethod("WS_READY")) {
46/// fWSId = arg->GetWSId(); // fWSId should be member of the user class
47/// return true; // connection established
48/// }
49///
50/// if (arg->IsMethod("WS_CLOSE")) {
51/// fWSId = 0;
52/// return true; // confirm close of socket
53/// }
54///
55/// if (arg->IsMethod("WS_DATA")) {
56/// // received data stored as POST data
57/// std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
58/// std::cout << "got string " << str << std::endl;
59/// // immediately send data back using websocket id
60/// SendCharStarWS(fWSId, "our reply");
61/// return true;
62/// }
63///
64///////////////////////////////////////////////////////////////////////////
65
67
68////////////////////////////////////////////////////////////////////////////////
69/// normal constructor
70
71THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
72{
73}
74
75////////////////////////////////////////////////////////////////////////////////
76/// destructor
77/// Make sure that all sending threads are stopped correctly
78
80{
82
83 std::vector<std::shared_ptr<THttpWSEngine>> clr;
84
85 {
86 std::lock_guard<std::mutex> grd(fMutex);
87 std::swap(clr, fEngines);
88 }
89
90 for (auto &eng : clr) {
91 eng->fDisabled = true;
92 if (eng->fHasSendThrd) {
93 eng->fHasSendThrd = false;
94 if (eng->fWaiting)
95 eng->fCond.notify_all();
96 eng->fSendThrd.join();
97 }
98 eng->ClearHandle(kTRUE); // terminate connection before starting destructor
99 }
100}
101
102/// Returns current number of websocket connections
104{
105 std::lock_guard<std::mutex> grd(fMutex);
106 return fEngines.size();
107}
108
109////////////////////////////////////////////////////////////////////////////////
110/// Return websocket id with given sequential number
111/// Number of websockets returned with GetNumWS() method
112
114{
115 std::lock_guard<std::mutex> grd(fMutex);
116 auto iter = fEngines.begin() + num;
117 return (*iter)->GetId();
118}
119
120////////////////////////////////////////////////////////////////////////////////
121/// Find websocket connection handle with given id
122/// If book_send parameter specified, have to book send operation under the mutex
123
124std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
125{
126 if (IsDisabled())
127 return nullptr;
128
129 std::lock_guard<std::mutex> grd(fMutex);
130
131 for (auto &eng : fEngines)
132 if (eng->GetId() == wsid) {
133
134 // not allow to work with disabled engine
135 if (eng->fDisabled)
136 return nullptr;
137
138 if (book_send) {
139 if (eng->fMTSend) {
140 Error("FindEngine", "Try to book next send operation before previous completed");
141 return nullptr;
142 }
143 eng->fMTSend = kTRUE;
144 }
145 return eng;
146 }
147
148 return nullptr;
149}
150
151////////////////////////////////////////////////////////////////////////////////
152/// Remove and destroy WS connection
153
154void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
155{
156 if (!engine) return;
157
158 {
159 std::lock_guard<std::mutex> grd(fMutex);
160
161 for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
162 if (*iter == engine) {
163 if (engine->fMTSend)
164 Error("RemoveEngine", "Trying to remove WS engine during send operation");
165
166 engine->fDisabled = true;
167 fEngines.erase(iter);
168 break;
169 }
170 }
171
172 engine->ClearHandle(terminate);
173
174 if (engine->fHasSendThrd) {
175 engine->fHasSendThrd = false;
176 if (engine->fWaiting)
177 engine->fCond.notify_all();
178 engine->fSendThrd.join();
179 }
180}
181
182////////////////////////////////////////////////////////////////////////////////
183/// Process request to websocket
184/// Different kind of requests coded into THttpCallArg::Method
185/// "WS_CONNECT" - connection request
186/// "WS_READY" - connection ready
187/// "WS_CLOSE" - connection closed
188/// All other are normal data, which are delivered to users
189
190Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
191{
192 if (IsDisabled())
193 return kFALSE;
194
195 if (!arg->GetWSId())
196 return ProcessWS(arg.get());
197
198 // normally here one accept or reject connection requests
199 if (arg->IsMethod("WS_CONNECT"))
200 return ProcessWS(arg.get());
201
202 auto engine = FindEngine(arg->GetWSId());
203
204 if (arg->IsMethod("WS_READY")) {
205
206 if (engine) {
207 Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
208 RemoveEngine(engine, kTRUE);
209 }
210
211 engine = arg->TakeWSEngine();
212 {
213 std::lock_guard<std::mutex> grd(fMutex);
214 fEngines.emplace_back(engine);
215 }
216
217 if (!ProcessWS(arg.get())) {
218 // if connection refused, remove engine again
219 RemoveEngine(engine, kTRUE);
220 return kFALSE;
221 }
222
223 return kTRUE;
224 }
225
226 if (arg->IsMethod("WS_CLOSE")) {
227 // connection is closed, one can remove handle
228
229 RemoveEngine(engine);
230
231 return ProcessWS(arg.get());
232 }
233
234 if (engine && engine->PreProcess(arg)) {
235 PerformSend(engine);
236 return kTRUE;
237 }
238
239 Bool_t res = ProcessWS(arg.get());
240
241 if (engine)
242 engine->PostProcess(arg);
243
244 return res;
245}
246
247////////////////////////////////////////////////////////////////////////////////
248/// Close connection with given websocket id
249
251{
252 auto engine = FindEngine(wsid);
253
254 RemoveEngine(engine, kTRUE);
255}
256
257////////////////////////////////////////////////////////////////////////////////
258/// Send data stored in the buffer
259/// Returns 0 - when operation was executed immediately
260/// 1 - when send operation will be performed in different thread
261
262Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
263{
264 if (IsSyncMode() || !engine->SupportSendThrd()) {
265 // this is case of longpoll engine, no extra thread is required for it
266 if (engine->CanSendDirectly())
267 return PerformSend(engine);
268
269 // handling will be performed in following http request handler
270
271 if (!IsSyncMode()) return 1;
272
273 // now we should wait until next polling requests is processed
274 // or when connection is closed or handler is shutdown
275
276 Int_t sendcnt = fSendCnt, loopcnt(0);
277
278 while (!IsDisabled() && !engine->fDisabled) {
280 // if send counter changed - current send operation is completed
281 if (sendcnt != fSendCnt)
282 return 0;
283 if (loopcnt++ > 1000) {
284 loopcnt = 0;
285 std::this_thread::sleep_for(std::chrono::milliseconds(1));
286 }
287 }
288
289 return -1;
290 }
291
292 // probably this thread can continuously run
293 std::thread thrd([this, engine] {
294 while (!IsDisabled() && !engine->fDisabled) {
295 PerformSend(engine);
296 if (IsDisabled() || engine->fDisabled) break;
297 std::unique_lock<std::mutex> lk(engine->fMutex);
298 if (engine->fKind == THttpWSEngine::kNone) {
299 engine->fWaiting = true;
300 engine->fCond.wait(lk);
301 engine->fWaiting = false;
302 }
303 }
304 });
305
306 engine->fSendThrd.swap(thrd);
307
308 engine->fHasSendThrd = true;
309
310 return 1;
311}
312
313
314////////////////////////////////////////////////////////////////////////////////
315/// Perform send operation, stored in buffer
316
317Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
318{
319 {
320 std::lock_guard<std::mutex> grd(engine->fMutex);
321
322 // no need to do something - operation was processed already by somebody else
323 if (engine->fKind == THttpWSEngine::kNone)
324 return 0;
325
326 if (engine->fSending)
327 return 1;
328 engine->fSending = true;
329 }
330
331 if (IsDisabled() || engine->fDisabled)
332 return 0;
333
334 switch (engine->fKind) {
336 engine->Send(engine->fData.data(), engine->fData.length());
337 break;
339 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
340 break;
342 engine->SendCharStar(engine->fData.c_str());
343 break;
344 default:
345 break;
346 }
347
348 engine->fData.clear();
349 engine->fHdr.clear();
350
351 {
352 std::lock_guard<std::mutex> grd(engine->fMutex);
353 engine->fSending = false;
354 engine->fKind = THttpWSEngine::kNone;
355 }
356
357 return CompleteSend(engine);
358}
359
360
361////////////////////////////////////////////////////////////////////////////////
362/// Complete current send operation
363
364Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
365{
366 fSendCnt++;
367 engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
368 CompleteWSSend(engine->GetId());
369 return 0; // indicates that operation is completed
370}
371
372
373////////////////////////////////////////////////////////////////////////////////
374/// Send binary data via given websocket id
375/// Returns -1 - in case of error
376/// 0 - when operation was executed immediately
377/// 1 - when send operation will be performed in different thread
378
379Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
380{
381 auto engine = FindEngine(wsid, kTRUE);
382 if (!engine) return -1;
383
384 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
385 engine->Send(buf, len);
386 return CompleteSend(engine);
387 }
388
389 bool notify = false;
390
391 // now we indicate that there is data and any thread can access it
392 {
393 std::lock_guard<std::mutex> grd(engine->fMutex);
394
395 if (engine->fKind != THttpWSEngine::kNone) {
396 Error("SendWS", "Data kind is not empty - something screwed up");
397 return -1;
398 }
399
400 notify = engine->fWaiting;
401
402 engine->fKind = THttpWSEngine::kData;
403
404 engine->fData.resize(len);
405 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
406 }
407
408 if (engine->fHasSendThrd) {
409 if (notify) engine->fCond.notify_all();
410 return 1;
411 }
412
413 return RunSendingThrd(engine);
414}
415
416
417////////////////////////////////////////////////////////////////////////////////
418/// Send binary data with text header via given websocket id
419/// Returns -1 - in case of error,
420/// 0 - when operation was executed immediately,
421/// 1 - when send operation will be performed in different thread,
422
423Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
424{
425 auto engine = FindEngine(wsid, kTRUE);
426 if (!engine) return -1;
427
428 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
429 engine->SendHeader(hdr, buf, len);
430 return CompleteSend(engine);
431 }
432
433 bool notify = false;
434
435 // now we indicate that there is data and any thread can access it
436 {
437 std::lock_guard<std::mutex> grd(engine->fMutex);
438
439 if (engine->fKind != THttpWSEngine::kNone) {
440 Error("SendWS", "Data kind is not empty - something screwed up");
441 return -1;
442 }
443
444 notify = engine->fWaiting;
445
446 engine->fKind = THttpWSEngine::kHeader;
447
448 engine->fHdr = hdr;
449 engine->fData.resize(len);
450 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
451 }
452
453 if (engine->fHasSendThrd) {
454 if (notify) engine->fCond.notify_all();
455 return 1;
456 }
457
458 return RunSendingThrd(engine);
459}
460
461////////////////////////////////////////////////////////////////////////////////
462/// Send string via given websocket id
463/// Returns -1 - in case of error,
464/// 0 - when operation was executed immediately,
465/// 1 - when send operation will be performed in different thread,
466
468{
469 auto engine = FindEngine(wsid, kTRUE);
470 if (!engine) return -1;
471
472 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
473 engine->SendCharStar(str);
474 return CompleteSend(engine);
475 }
476
477 bool notify = false;
478
479 // now we indicate that there is data and any thread can access it
480 {
481 std::lock_guard<std::mutex> grd(engine->fMutex);
482
483 if (engine->fKind != THttpWSEngine::kNone) {
484 Error("SendWS", "Data kind is not empty - something screwed up");
485 return -1;
486 }
487
488 notify = engine->fWaiting;
489
490 engine->fKind = THttpWSEngine::kText;
491 engine->fData = str;
492 }
493
494 if (engine->fHasSendThrd) {
495 if (notify) engine->fCond.notify_all();
496 return 1;
497 }
498
499 return RunSendingThrd(engine);
500}
const Bool_t kFALSE
Definition: RtypesCore.h:90
const Bool_t kTRUE
Definition: RtypesCore.h:89
#define ClassImp(name)
Definition: Rtypes.h:361
char name[80]
Definition: TGX11.cxx:109
R__EXTERN TSystem * gSystem
Definition: TSystem.h:556
enum THttpWSEngine::@149 kNone
! kind of operation
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket Different kind of requests coded into THttpCallArg::Method "WS_CONNECT" ...
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id Returns -1 - in case of error,...
Bool_t IsSyncMode() const
Returns processing mode of WS handler If sync mode is TRUE (default), all event processing and data s...
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler need to be destroy...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number Number of websockets returned with GetNumWS() method...
virtual ~THttpWSHandler()
destructor Make sure that all sending threads are stopped correctly
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id Returns -1 - in case of error, 0 - when operation was executed imm...
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
THttpWSHandler.
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id Returns -1 - in case of error 0 - when operation was executed...
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer Returns 0 - when operation was executed immediately 1 - when send oper...
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
Int_t fSendCnt
! counter for completed send operations
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::mutex fMutex
! protect list of engines
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id If book_send parameter specified, have to book send op...
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:891
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).
Definition: TSystem.cxx:414
void swap(RDirectoryEntry &e1, RDirectoryEntry &e2) noexcept