RESTinio
ws_connection.hpp
Go to the documentation of this file.
1 /*
2  restinio
3 */
4 
5 /*!
6  WebSocket connection routine.
7 */
8 
9 #pragma once
10 
11 #include <queue>
12 
13 #include <restinio/asio_include.hpp>
14 
15 #include <llhttp.h>
16 
17 #include <restinio/impl/include_fmtlib.hpp>
18 
19 #include <restinio/core.hpp>
20 #include <restinio/impl/executor_wrapper.hpp>
21 #include <restinio/impl/write_group_output_ctx.hpp>
22 #include <restinio/websocket/message.hpp>
23 #include <restinio/websocket/impl/ws_parser.hpp>
24 #include <restinio/websocket/impl/ws_protocol_validator.hpp>
25 
26 #include <restinio/utils/impl/safe_uint_truncate.hpp>
27 
28 #include <restinio/compiler_features.hpp>
29 
30 namespace restinio
31 {
32 
33 namespace websocket
34 {
35 
36 namespace basic
37 {
38 
39 namespace impl
40 {
41 
43 
44 //! Max possible size of websocket frame header (a part before payload).
45 constexpr size_t
47 {
48  return 14;
49 }
50 
51 //
52 // ws_outgoing_data_t
53 //
54 
55 //! A queue for outgoing buffers.
57 {
58  public:
59  //! Add a write group to the queue of outgoing data.
    //! The by-value parameter is moved into m_awaiting_write_groups
    //! rather than copied. pop_ready_buffers() later takes items from
    //! front(); presumably the container is a FIFO std::queue -- the
    //! queue type's definition is not visible here, TODO confirm.
60  void
61  append( write_group_t wg )
62  {
63  m_awaiting_write_groups.emplace( std::move( wg ) );
64  }
65 
68  {
69  std::optional< write_group_t > result;
70 
71  if( !m_awaiting_write_groups.empty() )
72  {
73  result = std::move( m_awaiting_write_groups.front() );
74  m_awaiting_write_groups.pop();
75  }
76 
77  return result;
78  }
79 
80  private:
81  //! A queue of buffers.
83 };
84 
85 //
86 // connection_input_t
87 //
88 
89 //! Websocket input stuff.
91 {
92  connection_input_t( std::size_t buffer_size )
    // Only m_buf is initialized explicitly here: buffer_size is forwarded
    // to it. NOTE(review): presumably buffer_size is the capacity of the
    // input buffer in bytes -- confirm against fixed_buffer_t.
93  : m_buf{ buffer_size }
94  {}
95 
96  //! websocket parser.
98 
99  //! Input buffer.
101 
102  //! Current payload.
104 
105  //! Reset the parser and clear the payload before reading a new websocket message.
106  void
108  {
109  m_parser.reset();
110  m_payload.clear();
111  }
112 };
113 
114 //
115 // ws_connection_t
116 //
117 
118 //! Context for handling websocket connections.
119 template <
120  typename Traits,
121  typename WS_Message_Handler >
123  : public ws_connection_base_t
124  , public restinio::impl::executor_wrapper_t< typename Traits::strand_t >
125 {
127 
128  public:
129  using message_handler_t = WS_Message_Handler;
130 
131  using timer_manager_t = typename Traits::timer_manager_t;
133  using timer_guard_t = typename timer_manager_t::timer_guard_t;
134  using logger_t = typename Traits::logger_t;
135  using strand_t = typename Traits::strand_t;
136  using stream_socket_t = typename Traits::stream_socket_t;
137  using lifetime_monitor_t =
139 
141 
142  ws_connection_t(
143  //! Connection id.
144  connection_id_t conn_id,
145  //! Data inherited from http-connection.
146  //! \{
147  restinio::impl::connection_settings_handle_t< Traits > settings,
148  stream_socket_t socket,
149  lifetime_monitor_t lifetime_monitor,
150  //! \}
151  message_handler_t msg_handler )
154  , m_settings{ std::move( settings ) }
155  , m_socket{ std::move( socket ) }
160  , m_logger{ *( m_settings->m_logger ) }
161  {
162  // Notify of a new connection instance.
163  m_logger.trace( [&]{
164  return fmt::format(
166  "[connection:{}] move socket to [ws_connection:{}]" ),
167  connection_id(),
168  connection_id() );
169  } );
170 
171  m_logger.trace( [&]{
172  return fmt::format(
174  "[ws_connection:{}] start connection with {}" ),
175  connection_id(),
177  } );
178 
179  // Inform state listener if it is used.
180  m_settings->call_state_listener( [this]() noexcept {
181  return connection_state::notice_t {
182  connection_id(),
185  };
186  } );
187  }
188 
189  ws_connection_t( const ws_connection_t & ) = delete;
190  ws_connection_t( ws_connection_t && ) = delete;
191  ws_connection_t & operator = ( const ws_connection_t & ) = delete;
192  ws_connection_t & operator = ( ws_connection_t && ) = delete;
     // All four copy/move operations above are deleted: a ws_connection_t
     // object can be neither copied nor moved.
193 
195  {
196  try
197  {
198  // Log that this connection instance is being destroyed.
199  m_logger.trace( [&]{
200  return fmt::format(
202  "[ws_connection:{}] destructor called" ),
203  connection_id() );
204  } );
205  }
206  catch( ... )
207  {}
208  }
209 
210  //! Shutdown websocket.
211  virtual void
212  shutdown() override
213  {
214  asio_ns::dispatch(
215  this->get_executor(),
216  [ this, ctx = shared_from_this() ]
217  // NOTE: this lambda is noexcept since v.0.6.0.
218  () noexcept {
219  try
220  {
221  // An exception from logger shouldn't prevent
222  // main shutdown actions.
224  [&]{
225  return fmt::format(
227  "[ws_connection:{}] shutdown" ),
228  connection_id() );
229  } );
230 
232  graceful_close();
233  }
234  catch( const std::exception & ex )
235  {
237  [&]{
238  return fmt::format(
240  "[ws_connection:{}] shutdown operation error: {}" ),
241  connection_id(),
242  ex.what() );
243  } );
244  }
245  } );
246  }
247 
248  //! Kill websocket.
249  virtual void
250  kill() override
251  {
252  asio_ns::dispatch(
253  this->get_executor(),
254  [ this, ctx = shared_from_this() ]
255  // NOTE: this lambda is noexcept since v.0.6.0.
256  () noexcept
257  {
258  try
259  {
260  // An exception from logger shouldn't prevent
261  // main kill actions.
263  [&]{
264  return fmt::format(
266  "[ws_connection:{}] kill" ),
267  connection_id() );
268  } );
269 
272 
273  close_impl();
274  }
275  catch( const std::exception & ex )
276  {
278  [&]{
279  return fmt::format(
281  "[ws_connection:{}] kill operation error: {}" ),
282  connection_id(),
283  ex.what() );
284  } );
285  }
286  } );
287  }
288 
289  //! Start reading ws-messages.
290  void
291  init_read( ws_handle_t wsh ) override
292  {
294 
295  // Run read initialization on io_context loop (direct invocation if possible).
296  asio_ns::dispatch(
297  this->get_executor(),
298  [ this, ctx = shared_from_this(), wswh = std::move( wswh ) ]
299  // NOTE: this lambda is noexcept since v.0.6.0.
300  () noexcept
301  {
302  try
303  {
304  // Start timeout checking.
307 
311  }
312  catch( const std::exception & ex )
313  {
316  [&]{
317  return fmt::format(
319  "[ws_connection:{}] unable to init read: {}" ),
320  connection_id(),
321  ex.what() );
322  } );
323  }
324  } );
325  }
326 
327  //! Write pieces of outgoing data.
328  virtual void
330  write_group_t wg,
331  bool is_close_frame ) override
332  {
333  //! Run write message on io_context loop if possible.
334  asio_ns::dispatch(
335  this->get_executor(),
336  [ this,
337  actual_wg = std::move( wg ),
338  ctx = shared_from_this(),
340  // NOTE: this lambda is noexcept since v.0.6.0.
341  () mutable noexcept
342  {
343  try
344  {
347  std::move( actual_wg ),
348  is_close_frame );
349  else
350  {
351  m_logger.warn( [&]{
352  return fmt::format(
354  "[ws_connection:{}] cannot write to websocket: "
355  "write operations disabled" ),
356  connection_id() );
357  } );
358  }
359  }
360  catch( const std::exception & ex )
361  {
364  [&]{
365  return fmt::format(
367  "[ws_connection:{}] unable to write data: {}" ),
368  connection_id(),
369  ex.what() );
370  } );
371  }
372  } );
373  }
374  private:
375  //! Standard close routine.
376  /*!
377  * @note
378  * This method is noexcept since v.0.6.0.
379  */
380  void
381  close_impl() noexcept
382  {
384  [&]() noexcept {
386  [&]{
387  return fmt::format(
389  "[ws_connection:{}] close socket" ),
390  connection_id() );
391  } );
392 
393  // These actions can throw and because of that we have
394  // to wrap them...
396  m_logger,
397  "ws_connection.close_impl.socket.shutdown",
398  [&] {
402  ignored_ec );
403  } );
404 
406  m_logger,
407  "ws_connection.close_impl.socket.close",
408  [&] {
409  m_socket.close();
410  } );
411  } );
412  }
413 
414  //! Start waiting for close-frame.
415  void
417  {
420  }
421 
422  //! Close WebSocket connection in a graceful manner.
423  void
425  {
427  [&]{
430  } );
431  }
432 
433  //! Send close frame to peer.
434  void
435  send_close_frame_to_peer( std::string payload )
436  {
438  bufs.reserve( 2 );
439 
442  final_frame,
444  payload.size() ) );
445 
448 
450 
451  // No more data must be written.
453  }
454 
455  //! Send close frame to peer.
456  void
458  status_code_t code,
459  std::string desc = std::string{} )
460  {
462  }
463 
464  //! Trigger an error.
465  /*!
466  Writes error message to log,
467  closes socket,
468  and sends close frame to user if necessary.
469 
470  @note
471  This method is noexcept since v.0.6.0
472  */
473  template< typename MSG_BUILDER >
474  void
476  status_code_t status,
477  MSG_BUILDER msg_builder ) noexcept
478  {
479  // An exception in logger shouldn't prevent the main actions.
481  m_logger, std::move( msg_builder ) );
482 
483  // This can throw but we have to suppress any exceptions.
485  m_logger, "ws_connection.call_close_handler_if_necessary",
486  [this, status] {
488  } );
489 
491  }
492 
493 
494  //! Start the process of reading ws messages from socket.
495  void
497  {
498  m_logger.trace( [&]{
499  return fmt::format(
501  "[ws_connection:{}] start reading header" ),
502  connection_id() );
503  } );
504 
505  // Prepare parser for consuming new message.
507 
508  if( 0 == m_input.m_buf.length() )
509  {
511  }
512  else
513  {
514  // Has something to read from m_input.m_buf.
517  }
518  }
519 
520  //! Initiate read operation on socket to receive bytes for header.
521  void
523  {
524  m_logger.trace( [&]{
525  return fmt::format(
527  "[ws_connection:{}] continue reading message" ),
528  connection_id() );
529  } );
530 
534  this->get_executor(),
535  [ this, ctx = shared_from_this() ]
536  // NOTE: this lambda is noexcept since v.0.6.0.
537  ( const asio_ns::error_code & ec, std::size_t length ) noexcept
538  {
539  try
540  {
542  }
543  catch( const std::exception & ex )
544  {
547  [&]{
548  return fmt::format(
550  "[ws_connection:{}] after read header "
551  "callback error: {}" ),
552  connection_id(),
553  ex.what() );
554  } );
555  }
556  } ) );
557  }
558 
559  //! Handle read error (reading header or payload)
560  void
561  handle_read_error( const char * desc, const asio_ns::error_code & ec )
562  {
563  // Assume that connection is lost.
566  [&]{
567  return fmt::format(
568  RESTINIO_FMT_FORMAT_STRING( "[ws_connection:{}] {}: {}" ),
569  connection_id(),
570  desc,
571  ec.message() );
572  } );
573  }
574 
575  //! Handle read operation result, when reading header.
576  void
578  const asio_ns::error_code & ec,
579  std::size_t length )
580  {
581  if( !ec )
582  {
583  m_logger.trace( [&]{
584  return fmt::format(
586  "[ws_connection:{}] received {} bytes" ),
587  this->connection_id(),
588  length );
589  } );
590 
593  }
594  else
595  {
596  handle_read_error( "reading message header error", ec );
597  }
598  }
599 
600  //! Parse header from internal buffer.
601  void
602  consume_header_from_buffer( const char * data, std::size_t length )
603  {
605 
607 
609  {
611  }
612  else
613  {
614  assert( nparsed == length );
616  }
617  }
618 
619  //! Handle parsed header.
620  void
621  handle_parsed_header( const message_details_t & md )
622  {
623  m_logger.trace( [&]{
624  return fmt::format(
626  "[ws_connection:{}] start handling {} ({:#x})" ),
627  connection_id(),
629  static_cast<std::uint16_t>(md.m_opcode) );
630  } );
631 
632  const auto validation_result =
634 
636  {
637  m_logger.error( [&]{
638  return fmt::format(
640  "[ws_connection:{}] invalid header" ),
641  connection_id() );
642  } );
643 
645  {
647  [&]{
649  // Do not wait anything in return, because
650  // protocol is violated.
651  } );
652 
654  }
656  {
657  // Wait for close frame cannot be done.
658  close_impl();
659  }
660 
661  return;
662  }
663 
665  }
666 
667  //! Handle parsed and valid header.
668  void
669  handle_parsed_and_valid_header( const message_details_t & md )
670  {
671  const auto payload_length =
673 
675 
676  if( payload_length == 0 )
677  {
678  // Callback for message with 0-size payload.
680  }
681  else
682  {
683  const auto payload_part_size =
685 
686  std::memcpy(
688  m_input.m_buf.bytes(),
690 
692 
693  const std::size_t length_remaining =
695 
699  length_remaining ) )
700  {
701  if( 0 == length_remaining )
702  {
703  // All message is obtained.
705  }
706  else
707  {
708  // Read the rest of payload:
712  }
713  }
714  // Else payload is invalid and validate_payload_part()
715  // has handled the case so do nothing.
716  }
717  }
718 
719  //! Start reading message payload.
720  void
722  //! A pointer to the remainder of unfetched payload.
723  char * payload_data,
724  //! The size of the remainder of unfetched payload.
725  std::size_t length_remaining,
726  //! Validate payload and call handler.
727  bool do_validate_payload_and_call_msg_handler = true )
728  {
732  this->get_executor(),
733  [ this,
734  ctx = shared_from_this(),
735  payload_data,
738  // NOTE: this lambda is noexcept since v.0.6.0.
739  ( const asio_ns::error_code & ec, std::size_t length ) noexcept
740  {
741  try
742  {
744  payload_data,
746  ec,
747  length,
749  }
750  catch( const std::exception & ex )
751  {
754  [&]{
755  return fmt::format(
757  "[ws_connection:{}] after read payload "
758  "callback error: {}" ),
759  connection_id(),
760  ex.what() );
761  } );
762  }
763  } ) );
764  }
765 
766  //! Handle read operation result, when reading payload.
767  void
769  char * payload_data,
770  std::size_t length_remaining,
771  const asio_ns::error_code & ec,
772  std::size_t length,
773  bool do_validate_payload_and_call_msg_handler = true )
774  {
775  if( !ec )
776  {
777  m_logger.trace( [&]{
778  return fmt::format(
780  "[ws_connection:{}] received {} bytes" ),
781  this->connection_id(),
782  length );
783  } );
784 
786 
789 
791  {
793  {
794  if( 0 == next_length_remaining )
795  {
796  // Here: all the payload is ready.
797 
798  // All message is obtained.
800  }
801  else
802  {
803  //Here: not all payload is obtained,
804  // so initiate read once again:
809  }
810  }
811  // Else payload is invalid and validate_payload_part()
812  // has handled the case so do nothing.
813  }
814  else
815  {
816  if( 0 == next_length_remaining )
817  {
819  }
820  else
821  {
826  }
827  }
828  }
829  else
830  {
831  handle_read_error( "reading message payload error", ec );
832  }
833  }
834 
835  //! Call user message handler with current message.
836  void
837  call_message_handler( message_handle_t close_frame )
838  {
839  if( auto wsh = m_websocket_weak_handle.lock() )
840  {
841  try
842  {
844  std::move( wsh ),
845  std::move( close_frame ) );
846  }
847  catch( const std::exception & ex )
848  {
849  m_logger.error( [&]{
850  return fmt::format(
852  "[ws_connection:{}] execute handler error: {}" ),
853  connection_id(),
854  ex.what() );
855  } );
856  }
857  }
858  }
859 
860  //! Validates a part of received payload.
861  bool
863  char * payload_data,
864  std::size_t length,
865  std::size_t next_length_remaining )
866  {
867  const auto validation_result =
869 
871  {
873 
875  {
876  // Can skip this payload because it was not a bad close frame.
877 
878  // It is the case we are expecting close frame
879  // so validator must be ready to receive more headers
880  // and payloads after this frame.
882 
883  if( 0 == next_length_remaining )
884  {
886  }
887  else
888  {
889  // Skip checking payload for this frame:
890  const bool do_validate_payload_and_call_msg_handler = false;
895  }
896  }
897  return false;
898  }
899 
900  return true;
901  }
902 
903  //! Handle payload errors.
904  void
905  handle_invalid_payload( validation_state_t validation_result )
906  {
907  m_logger.error( [&]{
908  return fmt::format(
910  "[ws_connection:{}] invalid paload" ),
911  connection_id() );
912  } );
913 
915  {
916  // A corner case: invalid payload in close frame.
917 
919  {
920  // Case: close frame was not expected.
921 
922  // This actually must be executed:
924  [&]{
926  // Do not wait anything in return, because
927  // protocol is violated.
928  } );
929 
930  // Notify user of a close but use a correct close code.
932  }
934  {
935  // Case: close frame was expected.
936 
937  // We got a close frame but it is incorrect,
938  // so just close (there is not too much we can do).
939  close_impl();
940  }
941  }
942  else
943  {
945  {
947  [&]{
950  } );
951 
953  }
954  }
955  }
956 
957  void
959  {
960  auto & md = m_input.m_parser.current_message();
961 
964  {
966  {
968  {
969  m_logger.trace( [&]{
970  return fmt::format(
972  "[ws_connection:{}] got close frame from "
973  "peer, status: {}" ),
974  connection_id(),
975  static_cast<std::uint16_t>(
977  } );
978 
981  [&]{
983  } );
984 
986  }
987 
991  md.m_opcode,
992  std::move( m_input.m_payload ) ) );
993 
996  }
997  else
998  {
1000 
1002  {
1003  // Got it!
1005 
1006  close_impl();
1007 
1008  m_logger.trace( [&]{
1009  return fmt::format(
1011  "[ws_connection:{}] expected close frame came" ),
1012  connection_id() );
1013  } );
1014  }
1015  else
1016  {
1017  // Wait for next frame.
1019  }
1020  }
1021  }
1022  else
1023  {
1025  }
1026  }
1027 
1028  void
1029  call_close_handler_if_necessary( status_code_t status )
1030  {
1032  [&]{
1035  final_frame,
1037  status_code_to_bin( status ) ) );
1038  } );
1039  }
1040 
1041  //! Implementation of writing data performed on the asio_ns::io_context.
1042  void
1043  write_data_impl( write_group_t wg, bool is_close_frame )
1044  {
1045  if( m_socket.is_open() )
1046  {
1047  if( is_close_frame )
1048  {
1049  m_logger.trace( [&]{
1050  return fmt::format(
1052  "[ws_connection:{}] user sends close frame" ),
1053  connection_id() );
1054  } );
1055 
1056  m_close_frame_to_peer.disable(); // It is formed and sent by user
1057  m_close_frame_to_user.disable(); // And user knows that websocket is closed.
1058  // No more writes.
1060 
1061  // Start waiting only close-frame.
1063  }
1064 
1065  // Push write_group to queue.
1066  m_outgoing_data.append( std::move( wg ) );
1067 
1069  }
1070  else
1071  {
1072  m_logger.warn( [&]{
1073  return fmt::format(
1075  "[ws_connection:{}] try to write while "
1076  "socket is closed" ),
1077  connection_id() );
1078  } );
1079 
1080  try
1081  {
1085  }
1086  catch( ... )
1087  {}
1088  }
1089  }
1090 
1091  //! Checks if there is something to write,
1092  //! and if so starts write operation.
1093  void
1095  {
1097  {
1098  init_write();
1099  }
1100  }
1101 
1102  //! Initiate write operation.
1103  void
1105  {
1106  // Here: not writing anything to socket, so
1107  // write operation can be initiated.
1109 
1110  if( next_write_group )
1111  {
1112  m_logger.trace( [&]{
1113  return fmt::format(
1115  "[ws_connection:{}] start next write group, "
1116  "size: {}" ),
1117  this->connection_id(),
1119  } );
1120 
1121  // Initialize write context with a new write group.
1123  std::move( next_write_group ) );
1124 
1125  // Start the loop of sending data from current write group.
1127  }
1128  }
1129 
1130  // Use aliases for shorter names.
1134 
1135  void
1137  {
1138  try
1139  {
1141 
1143  {
1145  }
1147  {
1149  }
1150  else
1151  {
1153  throw exception_t{ "sendfile write operation not implemented" };
1154  }
1155  }
1156  catch( const std::exception & ex )
1157  {
1160  [&]{
1161  return fmt::format(
1163  "[ws_connection:{}] handle_current_write_ctx failed: {}" ),
1164  connection_id(),
1165  ex.what() );
1166  } );
1167  }
1168  }
1169 
1170  void
1171  handle_trivial_write_operation( const trivial_write_operation_t & op )
1172  {
1173  // Asio buffers (param for async write):
1174  auto & bufs = op.get_trivial_bufs();
1175 
1176  m_logger.trace( [&]{
1177  return fmt::format(
1179  "[ws_connection:{}] sending data with "
1180  "buf count: {}, "
1181  "total size: {}" ),
1182  connection_id(),
1183  bufs.size(),
1184  op.size() ); } );
1185 
1187 
1188  // There is something to write.
1190  m_socket,
1191  bufs,
1193  this->get_executor(),
1194  [ this,
1195  ctx = shared_from_this() ]
1196  // NOTE: this lambda is noexcept since v.0.6.0.
1197  ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1198  {
1199  try
1200  {
1201  if( !ec )
1202  {
1203  m_logger.trace( [&]{
1204  return fmt::format(
1206  "[ws_connection:{}] outgoing data was "
1207  "sent: {} bytes" ),
1208  connection_id(),
1209  written );
1210  } );
1211  }
1212 
1213  after_write( ec );
1214  }
1215  catch( const std::exception & ex )
1216  {
1219  [&]{
1220  return fmt::format(
1222  "[ws_connection:{}] after write "
1223  "callback error: {}" ),
1224  connection_id(),
1225  ex.what() );
1226  } );
1227  }
1228  } ) );
1229  }
1230 
1231  //! Do post write actions for current write group.
1232  void
1234  {
1235  // Finishing writing this group.
1236  m_logger.trace( [&]{
1237  return fmt::format(
1239  "[ws_connection:{}] finishing current write group" ),
1240  this->connection_id() );
1241  } );
1242 
1243  // Group notificators are called from here (if exist):
1245 
1246  // Start another write operation
1247  // if there is something to send.
1249  }
1250 
1251  //! Handle write response finished.
1252  void
1253  after_write( const asio_ns::error_code & ec )
1254  {
1255  if( !ec )
1256  {
1258  }
1259  else
1260  {
1263  [&]{
1264  return fmt::format(
1266  "[ws_connection:{}] unable to write: {}" ),
1267  connection_id(),
1268  ec.message() );
1269  } );
1270 
1271  try
1272  {
1274  }
1275  catch( const std::exception & ex )
1276  {
1277  m_logger.error( [&]{
1278  return fmt::format(
1280  "[ws_connection:{}] notificator error: {}" ),
1281  connection_id(),
1282  ex.what() );
1283  } );
1284  }
1285  }
1286  }
1287 
1288  //! Common parameters of a connection.
1290 
1291  //! Connection.
1292  stream_socket_t m_socket;
1293 
1294  /*!
1295  * @brief Monitor of the connection lifetime.
1296  *
1297  * @since v.0.6.12
1298  */
1300 
1301  //! Timers.
1302  //! \{
1303  static ws_connection_t &
1304  cast_to_self( tcp_connection_ctx_base_t & base )
1305  {
      // Downcast via static_cast: no run-time type check is performed, so
      // base must actually refer to a ws_connection_t object.
1306  return static_cast< ws_connection_t & >( base );
1307  }
1308 
1309  virtual void
1310  check_timeout( tcp_connection_ctx_handle_t & self ) override
1311  {
1312  asio_ns::dispatch(
1313  this->get_executor(),
1314  [ ctx = std::move( self ) ]
1315  // NOTE: this lambda is noexcept since v.0.6.0.
1316  () noexcept
1317  {
1318  auto & conn_object = cast_to_self( *ctx );
1319  // If an exception will be thrown we can only
1320  // close the connection.
1321  try
1322  {
1324  }
1325  catch( const std::exception & x )
1326  {
1329  [&] {
1330  return fmt::format(
1332  "[connection: {}] unexpected "
1333  "error during timeout handling: {}" ),
1335  x.what() );
1336  } );
1337  }
1338  } );
1339  }
1340 
1345  timer_guard_t m_timer_guard;
1346 
1347  void
1349  {
1350  const auto now = std::chrono::steady_clock::now();
1352  {
1353  m_logger.trace( [&]{
1354  return fmt::format(
1356  "[wd_connection:{}] write operation timed out" ),
1357  connection_id() );
1358  } );
1361  close_impl();
1362  }
1364  {
1365  m_logger.trace( [&]{
1366  return fmt::format(
1368  "[wd_connection:{}] waiting for close-frame "
1369  "from peer timed out" ),
1370  connection_id() );
1371  } );
1372  close_impl();
1373  }
1374  else
1375  {
1377  }
1378  }
1379 
1380  //! schedule next timeout checking.
1381  void
1383  {
1385  }
1386 
1387  //! Start guard write operation if necessary.
1388  void
1390  {
1393  }
1394 
1395  void
1397  {
1400  }
1401  //! \}
1402 
1403  //! Input routine.
1405 
1406  //! Helper for validating protocol.
1408 
1409  //! Websocket message handler provided by user.
1410  message_handler_t m_msg_handler;
1411 
1412  //! Logger for operation
1413  logger_t & m_logger;
1414 
1415  //! Write to socket operation context.
1417 
1418  //! Output buffers queue.
1420 
1421  //! A weak handle to the owning ws_t, used when calling the message handler.
1423 
1424  //! Websocket output states.
1425  enum class write_state_t
1426  {
1427  //! Able to append outgoing data.
1428  write_enabled,
1429  //! No more outgoing data can be added (e.g. close-frame was sent).
1431  };
1432 
1433  //! A state of a websocket output.
1435 
1436  //! Websocket input states.
1437  enum class read_state_t
1438  {
1439  //! Reads any type of frame and serve it to user.
1441  //! Reads only close frame: skip all frames until close-frame.
1443  //! Do not read anything (before activation).
1444  read_nothing
1445  };
1446 
1447  //! A state of a websocket input.
1449 
1450  //! A helper class for running exclusive action.
1451  //! Only a first action will run.
1453  {
1454  public:
1455  template < typename Action >
1456  void
1457  run_if_first( Action && action ) noexcept(noexcept(action()))
1458  {
      // Invokes action() only while m_not_executed_yet is still true;
      // any later call is a no-op. The noexcept specification mirrors
      // that of action() itself.
1459  if( m_not_executed_yet )
1460  {
      // The flag is cleared before the call, so the action is not
      // retried on a later call even if action() throws.
1461  m_not_executed_yet = false;
1462  action();
1463  }
1464  }
1465 
1466  //! Disable action: action will not be executed even on a first shot.
1467  void
1469  {
1470  m_not_executed_yet = false;
1471  }
1472 
1473  private:
1474  bool m_not_executed_yet{ true };
1475  };
1476 
1480 };
1481 
1482 } /* namespace impl */
1483 
1484 } /* namespace basic */
1485 
1486 } /* namespace websocket */
1487 
1488 } /* namespace restinio */
read_state_t m_read_state
A state of a websocket input.
void consume_header_from_buffer(const char *data, std::size_t length)
Parse header from internal buffer.
Context for handling websocket connections.
std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after
restinio::impl::fixed_buffer_t m_buf
Input buffer.
ws_weak_handle_t m_websocket_weak_handle
A weak handle to the owning ws_t, used when calling the message handler.
void close_impl() noexcept
Standard close routine.
void after_read_payload(char *payload_data, std::size_t length_remaining, const asio_ns::error_code &ec, std::size_t length, bool do_validate_payload_and_call_msg_handler=true)
Handle read operation result, when reading payload.
void run_if_first(Action &&action) noexcept(noexcept(action()))
virtual void kill() override
Kill websocket.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
void consume_header_from_socket()
Initiate read operation on socket to receive bytes for header.
void handle_invalid_payload(validation_state_t validation_result)
Handle payload errors.
write_groups_queue_t m_awaiting_write_groups
A queue of buffers.
std::optional< write_group_t > pop_ready_buffers()
ws_connection_t & operator=(const ws_connection_t &)=delete
constexpr size_t websocket_header_max_size()
Max possible size of websocket frame header (a part before payload).
void write_data_impl(write_group_t wg, bool is_close_frame)
Implementation of writing data performed on the asio_ns::io_context.
void send_close_frame_to_peer(std::string payload)
Send close frame to peer.
restinio::impl::write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
message_handler_t m_msg_handler
Websocket message handler provided by user.
ws_outgoing_data_t m_outgoing_data
Output buffers queue.
ws_connection_t(const ws_connection_t &)=delete
void handle_read_error(const char *desc, const asio_ns::error_code &ec)
Handle read error (reading header or payload)
std::enable_if< std::is_same< Parameter_Container, query_string_params_t >::value||std::is_same< Parameter_Container, router::route_params_t >::value, std::optional< Value_Type > >::type opt_value(const Parameter_Container &params, string_view_t key)
Gets the value of a parameter specified by key wrapped in std::optional<Value_Type> if parameter exis...
Definition: value_or.hpp:64
void append(write_group_t wg)
Add buffers to queue.
bool validate_payload_part(char *payload_data, std::size_t length, std::size_t next_length_remaining)
Validates a part of received payload.
void disable()
Disable action: action will not be executed even on a first shot.
No more outgoing data can be added (e.g. close-frame was sent).
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
void init_read(ws_handle_t wsh) override
Start reading ws-messages.
static ws_connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timers.
Reads only close frame: skip all frames until close-frame.
void trigger_error_and_close(status_code_t status, MSG_BUILDER msg_builder) noexcept
Trigger an error.
ws_protocol_validator_t m_protocol_validator
Helper for validating protocol.
void call_close_handler_if_necessary(status_code_t status)
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
void start_waiting_close_frame_only()
Start waiting for close-frame.
void start_read_header()
Start the process of reading ws messages from socket.
restinio::impl::connection_settings_handle_t< Traits > m_settings
Common parameters of a connection.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
void handle_parsed_header(const message_details_t &md)
Handle parsed header.
void start_read_payload(char *payload_data, std::size_t length_remaining, bool do_validate_payload_and_call_msg_handler=true)
Start reading message payload.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
void init_write_if_necessary()
Checks if there is something to write, and if so starts write operation.
ws_connection_t & operator=(ws_connection_t &&)=delete
void init_next_timeout_checking()
schedule next timeout checking.
virtual void shutdown() override
Shutdown websocket.
void send_close_frame_to_peer(status_code_t code, std::string desc=std::string{})
Send close frame to peer.
void after_read_header(const asio_ns::error_code &ec, std::size_t length)
Handle read operation result, when reading header.
write_state_t m_write_state
A state of a websocket output.
virtual void write_data(write_group_t wg, bool is_close_frame) override
Write pieces of outgoing data.
void after_write(const asio_ns::error_code &ec)
Handle write response finished.
void guard_write_operation()
Start guard write operation if necessary.
void graceful_close()
Close WebSocket connection in a graceful manner.
void reset_parser_and_payload()
Reset the parser and clear the payload before reading a new websocket message.
void call_message_handler(message_handle_t close_frame)
Call user message handler with current message.
std::chrono::steady_clock::time_point m_write_operation_timeout_after
void handle_parsed_and_valid_header(const message_details_t &md)
Handle parsed and valid header.
A helper class for running exclusive action. Only a first action will run.