Logo ROOT   6.21/01
Reference Guide
THttpWSHandler.cxx
Go to the documentation of this file.
1 // $Id$
2 // Author: Sergey Linev 20/10/2017
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "THttpWSHandler.h"
13 
14 #include "THttpWSEngine.h"
15 #include "THttpCallArg.h"
16 #include "TSystem.h"
17 
18 #include <thread>
19 #include <chrono>
20 
21 /////////////////////////////////////////////////////////////////////////
22 ///
23 /// THttpWSHandler
24 ///
25 /// Class for user-side handling of websocket with THttpServer
26 /// 1. Create derived from THttpWSHandler class and implement
27 /// ProcessWS() method, where all web sockets request handled.
28 /// 2. Register instance of derived class to running THttpServer
29 ///
30 /// TUserWSHandler *handler = new TUserWSHandler("name1","title");
31 /// THttpServer *server = new THttpServer("http:8090");
32 /// server->Register("/subfolder", handler)
33 ///
34 /// 3. Now server can accept web socket connection from outside.
35 /// For instance, from JavaScirpt one can connect to it with code:
36 ///
37 /// var ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket")
38 ///
39 /// 4. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
40 ///
41 /// if (arg->IsMethod("WS_CONNECT")) {
42 /// return true; // to accept incoming request
43 /// }
44 ///
45 /// if (arg->IsMethod("WS_READY")) {
46 /// fWSId = arg->GetWSId(); // fWSId should be member of the user class
47 /// return true; // connection established
48 /// }
49 ///
50 /// if (arg->IsMethod("WS_CLOSE")) {
51 /// fWSId = 0;
52 /// return true; // confirm close of socket
53 /// }
54 ///
55 /// if (arg->IsMethod("WS_DATA")) {
56 /// // received data stored as POST data
57 /// std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
58 /// std::cout << "got string " << str << std::endl;
59 /// // immediately send data back using websocket id
60 /// SendCharStarWS(fWSId, "our reply");
61 /// return true;
62 /// }
63 ///
64 ///////////////////////////////////////////////////////////////////////////
65 
67 
68 ////////////////////////////////////////////////////////////////////////////////
69 /// normal constructor
70 
71 THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
72 {
73 }
74 
75 ////////////////////////////////////////////////////////////////////////////////
76 /// destructor
77 /// Make sure that all sending threads are stopped correctly
78 
80 {
81  SetDisabled();
82 
83  std::vector<std::shared_ptr<THttpWSEngine>> clr;
84 
85  {
86  std::lock_guard<std::mutex> grd(fMutex);
87  std::swap(clr, fEngines);
88  }
89 
90  for (auto &eng : clr) {
91  eng->fDisabled = true;
92  if (eng->fHasSendThrd) {
93  eng->fHasSendThrd = false;
94  if (eng->fWaiting)
95  eng->fCond.notify_all();
96  eng->fSendThrd.join();
97  }
98  eng->ClearHandle(kTRUE); // terminate connection before starting destructor
99  }
100 }
101 
102 /// Returns current number of websocket connections
104 {
105  std::lock_guard<std::mutex> grd(fMutex);
106  return fEngines.size();
107 }
108 
109 ////////////////////////////////////////////////////////////////////////////////
110 /// Return websocket id with given sequential number
111 /// Number of websockets returned with GetNumWS() method
112 
114 {
115  std::lock_guard<std::mutex> grd(fMutex);
116  auto iter = fEngines.begin() + num;
117  return (*iter)->GetId();
118 }
119 
120 ////////////////////////////////////////////////////////////////////////////////
121 /// Find websocket connection handle with given id
122 /// If book_send parameter specified, have to book send operation under the mutex
123 
124 std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
125 {
126  if (IsDisabled())
127  return nullptr;
128 
129  std::lock_guard<std::mutex> grd(fMutex);
130 
131  for (auto &eng : fEngines)
132  if (eng->GetId() == wsid) {
133 
134  // not allow to work with disabled engine
135  if (eng->fDisabled)
136  return nullptr;
137 
138  if (book_send) {
139  if (eng->fMTSend) {
140  Error("FindEngine", "Try to book next send operation before previous completed");
141  return nullptr;
142  }
143  eng->fMTSend = kTRUE;
144  }
145  return eng;
146  }
147 
148  return nullptr;
149 }
150 
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Remove and destroy WS connection
153 
154 void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
155 {
156  if (!engine) return;
157 
158  {
159  std::lock_guard<std::mutex> grd(fMutex);
160 
161  for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
162  if (*iter == engine) {
163  if (engine->fMTSend)
164  Error("RemoveEngine", "Trying to remove WS engine during send operation");
165 
166  engine->fDisabled = true;
167  fEngines.erase(iter);
168  break;
169  }
170  }
171 
172  engine->ClearHandle(terminate);
173 
174  if (engine->fHasSendThrd) {
175  engine->fHasSendThrd = false;
176  if (engine->fWaiting)
177  engine->fCond.notify_all();
178  engine->fSendThrd.join();
179  }
180 }
181 
182 ////////////////////////////////////////////////////////////////////////////////
183 /// Process request to websocket
184 /// Different kind of requests coded into THttpCallArg::Method
185 /// "WS_CONNECT" - connection request
186 /// "WS_READY" - connection ready
187 /// "WS_CLOSE" - connection closed
188 /// All other are normal data, which are delivered to users
189 
190 Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
191 {
192  if (IsDisabled())
193  return kFALSE;
194 
195  if (!arg->GetWSId())
196  return ProcessWS(arg.get());
197 
198  // normally here one accept or reject connection requests
199  if (arg->IsMethod("WS_CONNECT"))
200  return ProcessWS(arg.get());
201 
202  auto engine = FindEngine(arg->GetWSId());
203 
204  if (arg->IsMethod("WS_READY")) {
205 
206  if (engine) {
207  Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
208  RemoveEngine(engine, kTRUE);
209  }
210 
211  engine = arg->TakeWSEngine();
212  {
213  std::lock_guard<std::mutex> grd(fMutex);
214  fEngines.emplace_back(engine);
215  }
216 
217  if (!ProcessWS(arg.get())) {
218  // if connection refused, remove engine again
219  RemoveEngine(engine, kTRUE);
220  return kFALSE;
221  }
222 
223  return kTRUE;
224  }
225 
226  if (arg->IsMethod("WS_CLOSE")) {
227  // connection is closed, one can remove handle
228 
229  RemoveEngine(engine);
230 
231  return ProcessWS(arg.get());
232  }
233 
234  if (engine && engine->PreProcess(arg)) {
235  PerformSend(engine);
236  return kTRUE;
237  }
238 
239  Bool_t res = ProcessWS(arg.get());
240 
241  if (engine)
242  engine->PostProcess(arg);
243 
244  return res;
245 }
246 
247 ////////////////////////////////////////////////////////////////////////////////
248 /// Close connection with given websocket id
249 
251 {
252  auto engine = FindEngine(wsid);
253 
254  RemoveEngine(engine, kTRUE);
255 }
256 
257 ////////////////////////////////////////////////////////////////////////////////
258 /// Send data stored in the buffer
259 /// Returns 0 - when operation was executed immediately
260 /// 1 - when send operation will be performed in different thread
261 
262 Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
263 {
264  if (IsSyncMode() || !engine->SupportSendThrd()) {
265  // this is case of longpoll engine, no extra thread is required for it
266  if (engine->CanSendDirectly())
267  return PerformSend(engine);
268 
269  // handling will be performed in following http request handler
270 
271  if (!IsSyncMode()) return 1;
272 
273  // now we should wait until next polling requests is processed
274  // or when connection is closed or handler is shutdown
275 
276  Int_t sendcnt = fSendCnt, loopcnt(0);
277 
278  while (!IsDisabled() && !engine->fDisabled) {
280  // if send counter changed - current send operation is completed
281  if (sendcnt != fSendCnt)
282  return 0;
283  if (loopcnt++ > 1000) {
284  loopcnt = 0;
285  std::this_thread::sleep_for(std::chrono::milliseconds(1));
286  }
287  }
288 
289  return -1;
290  }
291 
292  // probably this thread can continuously run
293  std::thread thrd([this, engine] {
294  while (!IsDisabled() && !engine->fDisabled) {
295  PerformSend(engine);
296  if (IsDisabled() || engine->fDisabled) break;
297  std::unique_lock<std::mutex> lk(engine->fMutex);
298  if (engine->fKind == THttpWSEngine::kNone) {
299  engine->fWaiting = true;
300  engine->fCond.wait(lk);
301  engine->fWaiting = false;
302  }
303  }
304  });
305 
306  engine->fSendThrd.swap(thrd);
307 
308  engine->fHasSendThrd = true;
309 
310  return 1;
311 }
312 
313 
314 ////////////////////////////////////////////////////////////////////////////////
315 /// Perform send operation, stored in buffer
316 
317 Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
318 {
319  {
320  std::lock_guard<std::mutex> grd(engine->fMutex);
321 
322  // no need to do something - operation was processed already by somebody else
323  if (engine->fKind == THttpWSEngine::kNone)
324  return 0;
325 
326  if (engine->fSending)
327  return 1;
328  engine->fSending = true;
329  }
330 
331  if (IsDisabled() || engine->fDisabled)
332  return 0;
333 
334  switch (engine->fKind) {
336  engine->Send(engine->fData.data(), engine->fData.length());
337  break;
339  engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
340  break;
342  engine->SendCharStar(engine->fData.c_str());
343  break;
344  default:
345  break;
346  }
347 
348  engine->fData.clear();
349  engine->fHdr.clear();
350 
351  {
352  std::lock_guard<std::mutex> grd(engine->fMutex);
353  engine->fSending = false;
354  engine->fKind = THttpWSEngine::kNone;
355  }
356 
357  return CompleteSend(engine);
358 }
359 
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 /// Complete current send operation
363 
364 Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
365 {
366  fSendCnt++;
367  engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
368  CompleteWSSend(engine->GetId());
369  return 0; // indicates that operation is completed
370 }
371 
372 
373 ////////////////////////////////////////////////////////////////////////////////
374 /// Send binary data via given websocket id
375 /// Returns -1 - in case of error
376 /// 0 - when operation was executed immediately
377 /// 1 - when send operation will be performed in different thread
378 
379 Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
380 {
381  auto engine = FindEngine(wsid, kTRUE);
382  if (!engine) return -1;
383 
384  if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
385  engine->Send(buf, len);
386  return CompleteSend(engine);
387  }
388 
389  bool notify = false;
390 
391  // now we indicate that there is data and any thread can access it
392  {
393  std::lock_guard<std::mutex> grd(engine->fMutex);
394 
395  if (engine->fKind != THttpWSEngine::kNone) {
396  Error("SendWS", "Data kind is not empty - something screwed up");
397  return -1;
398  }
399 
400  notify = engine->fWaiting;
401 
402  engine->fKind = THttpWSEngine::kData;
403 
404  engine->fData.resize(len);
405  std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
406  }
407 
408  if (engine->fHasSendThrd) {
409  if (notify) engine->fCond.notify_all();
410  return 1;
411  }
412 
413  return RunSendingThrd(engine);
414 }
415 
416 
417 ////////////////////////////////////////////////////////////////////////////////
418 /// Send binary data with text header via given websocket id
419 /// Returns -1 - in case of error,
420 /// 0 - when operation was executed immediately,
421 /// 1 - when send operation will be performed in different thread,
422 
423 Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
424 {
425  auto engine = FindEngine(wsid, kTRUE);
426  if (!engine) return -1;
427 
428  if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
429  engine->SendHeader(hdr, buf, len);
430  return CompleteSend(engine);
431  }
432 
433  bool notify = false;
434 
435  // now we indicate that there is data and any thread can access it
436  {
437  std::lock_guard<std::mutex> grd(engine->fMutex);
438 
439  if (engine->fKind != THttpWSEngine::kNone) {
440  Error("SendWS", "Data kind is not empty - something screwed up");
441  return -1;
442  }
443 
444  notify = engine->fWaiting;
445 
446  engine->fKind = THttpWSEngine::kHeader;
447 
448  engine->fHdr = hdr;
449  engine->fData.resize(len);
450  std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
451  }
452 
453  if (engine->fHasSendThrd) {
454  if (notify) engine->fCond.notify_all();
455  return 1;
456  }
457 
458  return RunSendingThrd(engine);
459 }
460 
461 ////////////////////////////////////////////////////////////////////////////////
462 /// Send string via given websocket id
463 /// Returns -1 - in case of error,
464 /// 0 - when operation was executed immediately,
465 /// 1 - when send operation will be performed in different thread,
466 
468 {
469  auto engine = FindEngine(wsid, kTRUE);
470  if (!engine) return -1;
471 
472  if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
473  engine->SendCharStar(str);
474  return CompleteSend(engine);
475  }
476 
477  bool notify = false;
478 
479  // now we indicate that there is data and any thread can access it
480  {
481  std::lock_guard<std::mutex> grd(engine->fMutex);
482 
483  if (engine->fKind != THttpWSEngine::kNone) {
484  Error("SendWS", "Data kind is not empty - something screwed up");
485  return -1;
486  }
487 
488  notify = engine->fWaiting;
489 
490  engine->fKind = THttpWSEngine::kText;
491  engine->fData = str;
492  }
493 
494  if (engine->fHasSendThrd) {
495  if (notify) engine->fCond.notify_all();
496  return 1;
497  }
498 
499  return RunSendingThrd(engine);
500 }
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id Returns -1 - in case of error 0 - when operation was executed...
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).
Definition: TSystem.cxx:410
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id Returns -1 - in case of error, 0 - when operation was executed imm...
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket Different kind of requests coded into THttpCallArg::Method "WS_CONNECT" ...
virtual ~THttpWSHandler()
destructor Make sure that all sending threads are stopped correctly
Bool_t IsSyncMode() const
Returns processing mode of WS handler If sync mode is TRUE (default), all event processing and data s...
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t fSendCnt
! counter for completed send operations
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
enum THttpWSEngine::@146 kNone
! kind of operation
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer Returns 0 - when operation was executed immediately 1 - when send oper...
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id If book_send parameter specified, have to book send op...
R__EXTERN TSystem * gSystem
Definition: TSystem.h:557
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
THttpWSHandler.
void swap(RDirectoryEntry &e1, RDirectoryEntry &e2) noexcept
unsigned int UInt_t
Definition: RtypesCore.h:42
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number Number of websockets returned with GetNumWS() method...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
Bool_t fSyncMode
! is handler runs in synchronous mode (default, no multi-threading)
const Bool_t kFALSE
Definition: RtypesCore.h:88
#define ClassImp(name)
Definition: Rtypes.h:365
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler need to be destroy...
std::mutex fMutex
! protect list of engines
const Bool_t kTRUE
Definition: RtypesCore.h:87
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id Returns -1 - in case of error, 0 - when operation was executed immediately, 1 - when send operation will be performed in different thread,.
char name[80]
Definition: TGX11.cxx:109
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.