RESTinio
connection.hpp
Go to the documentation of this file.
1 /*
2  restinio
3 */
4 
5 /*!
6  HTTP-connection routine.
7 */
8 
9 #pragma once
10 
11 #include <restinio/asio_include.hpp>
12 
13 #include <llhttp.h>
14 
15 #include <restinio/impl/include_fmtlib.hpp>
16 
17 #include <restinio/exception.hpp>
18 #include <restinio/http_headers.hpp>
19 #include <restinio/request_handler.hpp>
20 #include <restinio/connection_count_limiter.hpp>
21 #include <restinio/impl/connection_base.hpp>
22 #include <restinio/impl/header_helpers.hpp>
23 #include <restinio/impl/response_coordinator.hpp>
24 #include <restinio/impl/connection_settings.hpp>
25 #include <restinio/impl/fixed_buffer.hpp>
26 #include <restinio/impl/write_group_output_ctx.hpp>
27 #include <restinio/impl/executor_wrapper.hpp>
28 #include <restinio/impl/sendfile_operation.hpp>
29 
30 #include <restinio/utils/impl/safe_uint_truncate.hpp>
31 #include <restinio/utils/at_scope_exit.hpp>
32 
33 namespace restinio
34 {
35 
36 namespace impl
37 {
38 
39 //
40 // http_parser_ctx_t
41 //
42 
43 //! Parsing result context for using in parser callbacks.
44 /*!
45  All data is used as temps, and is usable only
46  after parsing completes new requests then it is moved out.
47 */
49 {
50  //! Request data.
51  //! \{
54  //! \}
55 
56  //! Parser context temp values and flags.
57  //! \{
60 
61  /*!
62  * @since v.0.6.9
63  */
65 
66  /*!
67  * @since v.0.6.9
68  */
70 
71  /*!
72  * @brief Chunk extnsion's params if any.
73  *
74  * @since v.0.7.0
75  */
77 
78  /*!
79  * @brief How many bytes were parsed for current request.
80  *
81  * @since v.0.7.0
82  */
84 
85  //! \}
86 
87  //! Flag: is http message parsed completely.
88  bool m_message_complete{ false };
89 
90  /*!
91  * @brief Total number of parsed HTTP-fields.
92  *
93  * This number includes the number of leading HTTP-fields and the number
94  * of trailing HTTP-fields (in the case of chunked encoding).
95  *
96  * @since v.0.6.12
97  */
99 
100  /*!
101  * @brief Limits for the incoming message.
102  *
103  * @since v.0.6.12
104  */
106 
107  /*!
108  * @brief The main constructor.
109  *
110  * @since v.0.6.12
111  */
113  incoming_http_msg_limits_t limits )
114  : m_limits{ limits }
115  {}
116 
117  //! Prepare context to handle new request.
118  void
120  {
121  m_header = http_request_header_t{};
122  m_body.clear();
123  m_current_field_name.clear();
124  m_last_value_total_size = 0u;
126  m_bytes_parsed = 0;
127  m_message_complete = false;
128  m_total_field_count = 0u;
129  }
130 
131  //! Creates an instance of chunked_input_info if there is an info
132  //! about chunks in the body.
133  /*!
134  * @since v.0.6.9
135  */
136  [[nodiscard]]
139  {
140  chunked_input_info_unique_ptr_t result;
141 
142  if( !m_chunked_info_block.m_chunks.empty() ||
143  0u != m_chunked_info_block.m_trailing_fields.fields_count() )
144  {
145  result = std::make_unique< chunked_input_info_t >(
146  std::move( m_chunked_info_block ) );
147  }
148 
149  return result;
150  }
151 };
152 
153 //! Include parser callbacks.
154 #include "parser_callbacks.ipp"
155 
156 //
157 // create_parser_settings()
158 //
159 
160 //! Helper for setting parser settings.
161 /*!
162  Is used to initialize const value in connection_settings_t ctor.
163 */
164 template< typename Http_Methods >
165 inline llhttp_settings_t
167 {
170 
172  []( llhttp_t * parser, const char * at, size_t length ) -> int {
173  return restinio_url_cb( parser, at, length );
174  };
175 
177  []( llhttp_t * parser, const char * at, size_t length ) -> int {
179  };
180 
182  []( llhttp_t * parser ) -> int {
184  };
185 
187  []( llhttp_t * parser, const char * at, size_t length ) -> int {
189  };
190 
192  []( llhttp_t * parser ) -> int {
194  };
195 
197  []( llhttp_t * parser ) -> int {
199  };
200 
202  []( llhttp_t * parser, const char * at, size_t length ) -> int {
203  return restinio_body_cb( parser, at, length );
204  };
205 
207  []( llhttp_t * parser ) -> int {
209  };
210 
212  []( llhttp_t * parser ) -> int {
214  };
215 
217  []( llhttp_t * parser ) -> int {
219  };
220 
222  []( llhttp_t * parser, const char * at, size_t length ) -> int {
224  };
225 
227  []( llhttp_t * parser, const char * at, size_t length ) -> int {
229  };
230 
232  []( llhttp_t * parser ) -> int {
234  };
237  []( llhttp_t * parser ) -> int {
239  };
240 
241  return parser_settings;
242 }
244 //
245 // connection_upgrade_stage_t
246 //
247 
248 //! Enum for a flag specifying that connection is going to upgrade or not.
250 {
251  //! No connection request in progress
252  none,
253  //! Request with connection-upgrade header came and waits for
254  //! request handler to be called in non pipelined fashion
255  //! (it must be the only request that is handled at the moment).
257  //! Handler for request with connection-upgrade header was called
258  //! so any response data comming is for that request.
259  //! If connection transforms to websocket connection
260  //! then no further operations are expected.
262 };
263 
264 //
265 // connection_input_t
266 //
267 
268 //! Data associated with connection read routine.
270 {
272  std::size_t buffer_size,
273  incoming_http_msg_limits_t limits,
274  const llhttp_settings_t* settings )
275  : m_parser_ctx{ limits }
276  , m_buf{ buffer_size }
277  {
278  llhttp_init( &m_parser, llhttp_type_t::HTTP_REQUEST, settings );
279  m_parser.data = &m_parser_ctx;
280  }
281 
282  //! HTTP-parser.
283  //! \{
286  //! \}
287 
288  //! Input buffer.
290 
291  //! Connection upgrade request stage.
294 
295  //! Flag to track whether read operation is performed now.
297 
298  //! Prepare parser for reading new http-message.
299  void
301  {
302  // Reinit parser.
303  llhttp_reset( &m_parser);
304 
305  // Reset context and attach it to parser.
306  m_parser_ctx.reset();
307  }
308 };
309 
310 template < typename Connection, typename Start_Read_CB, typename Failed_CB >
311 void
313  asio_ns::ip::tcp::socket & ,
314  Connection & ,
315  Start_Read_CB start_read_cb,
316  Failed_CB )
317 {
318  // No preparation is needed, start
319  start_read_cb();
320 }
321 
322 // An overload for the case of non-TLS-connection.
323 inline tls_socket_t *
325  asio_ns::ip::tcp::socket & ) noexcept
326 {
327  return nullptr;
328 }
329 
330 //
331 // connection_t
332 //
333 
334 //! Context for handling http connections.
335 /*
336  Working circle consists of the following steps:
337  * wait for request -- reading from socket and parsing header and body;
338  * handling request -- once the request is completely obtained it's handling
339  is deligated to a handler chosen by handler factory;
340  * writing response -- writing response to socket;
341  * back to first step o close connection -- depending on keep-alive property
342  of the last response the connection goes back to first step or
343  shutdowns.
344 
345  Each step is controlled by timer (\see schedule_operation_timeout_callback())
346 
347  In case of errors connection closes itself.
348 */
349 template < typename Traits >
351  : public connection_base_t
352  , public executor_wrapper_t< typename Traits::strand_t >
353 {
355 
356  public:
357  using timer_manager_t = typename Traits::timer_manager_t;
358  using timer_guard_t = typename timer_manager_t::timer_guard_t;
361  using logger_t = typename Traits::logger_t;
362  using strand_t = typename Traits::strand_t;
363  using stream_socket_t = typename Traits::stream_socket_t;
364  using lifetime_monitor_t =
366 
368  //! Connection id.
369  connection_id_t conn_id,
370  //! Connection socket.
371  stream_socket_t && socket,
372  //! Settings that are common for connections.
373  connection_settings_handle_t< Traits > settings,
374  //! Remote endpoint for that connection.
375  endpoint_t remote_endpoint,
376  //! Lifetime monitor to be used for handling connection count.
377  lifetime_monitor_t lifetime_monitor )
380  , m_socket{ std::move( socket ) }
381  , m_settings{ std::move( settings ) }
383  , m_input{
387  }
391  , m_logger{ *( m_settings->m_logger ) }
393  {
394  // Notify of a new connection instance.
395  m_logger.trace( [&]{
396  return fmt::format(
398  "[connection:{}] start connection with {}" ),
399  connection_id(),
401  } );
402  }
403 
404  // Disable copy/move.
405  connection_t( const connection_t & ) = delete;
406  connection_t( connection_t && ) = delete;
407  connection_t & operator = ( const connection_t & ) = delete;
408  connection_t & operator = ( connection_t && ) = delete;
409 
411  {
413  [&]{
414  return fmt::format(
416  "[connection:{}] destructor called" ),
417  connection_id() );
418  } );
419  }
420 
421  void
423  {
425  m_socket,
426  *this,
427  [ & ]{
428  // Inform state listener if it used.
429  m_settings->call_state_listener( [this]() noexcept {
430  return connection_state::notice_t{
431  this->connection_id(),
432  this->m_remote_endpoint,
435  m_socket )
436  }
437  };
438  } );
439 
440  // Start timeout checking.
443 
444  // Start reading request.
446  },
447  [ & ]( const asio_ns::error_code & ec ){
449  return fmt::format(
451  "[connection:{}] prepare connection error: {}" ),
452  connection_id(),
453  ec.message() );
454  } );
455  } );
456  }
457 
458  //! Start reading next htttp-message.
459  void
461  {
462  m_logger.trace( [&]{
463  return fmt::format(
465  "[connection:{}] start waiting for request" ),
466  connection_id() );
467  } );
468 
469  // Prepare parser for consuming new request message.
471 
472  // Guard total time for a request to be read.
473  // guarding here makes the total read process
474  // to run in read_next_http_message_timelimit.
476 
477  if( 0 != m_input.m_buf.length() )
478  {
479  // If a pipeline requests were sent by client
480  // then the biginning (or even entire request) of it
481  // is in the buffer obtained from socket in previous
482  // read operation.
484  }
485  else
486  {
487  // Next request (if any) must be obtained from socket.
488  consume_message();
489  }
490  }
491 
492  //! Internals that are necessary for upgrade.
494  {
496  upgrade_internals_t && ) = default;
497 
499  connection_settings_handle_t< Traits > settings,
500  stream_socket_t socket,
501  lifetime_monitor_t lifetime_monitor )
502  : m_settings{ std::move(settings) }
503  , m_socket{ std::move( socket ) }
505  {}
506 
508  stream_socket_t m_socket;
510  };
511 
512  //! Move socket out of connection.
515  {
516  return upgrade_internals_t{
517  m_settings,
518  std::move(m_socket),
520  };
521  }
522 
523  private:
524  //! Start (continue) a chain of read-parse-read-... operations.
525  inline void
527  {
529  {
530  m_logger.trace( [&]{
531  return fmt::format(
533  "[connection:{}] continue reading request" ),
534  connection_id() );
535  } );
536 
537 
542  this->get_executor(),
543  [this, ctx = shared_from_this()]
544  // NOTE: this lambda is noexcept since v.0.6.0.
545  ( const asio_ns::error_code & ec,
546  std::size_t length ) noexcept {
549  } ) );
550  }
551  else
552  {
553  m_logger.trace( [&]{
554  return fmt::format(
556  "[connection:{}] skip read operation: already running" ),
557  connection_id() );
558  } );
559  }
560  }
561 
562  //! Handle read operation result.
563  inline void
564  after_read( const asio_ns::error_code & ec, std::size_t length ) noexcept
565  {
566  if( !ec )
567  {
568  // Exceptions shouldn't go out of `after_read`.
569  // So intercept them and close the connection in the case
570  // of an exception.
571  try
572  {
573  m_logger.trace( [&]{
574  return fmt::format(
576  "[connection:{}] received {} bytes" ),
577  this->connection_id(),
578  length );
579  } );
580 
582 
584  }
585  catch( const std::exception & x )
586  {
588  return fmt::format(
590  "[connection:{}] unexpected exception during the "
591  "handling of incoming data: {}" ),
592  connection_id(),
593  x.what() );
594  } );
595  }
596  }
597  else
598  {
599  // Well, if it is actually an error
600  // then close connection.
602  {
605  return fmt::format(
607  "[connection:{}] read socket error: {}; "
608  "parsed bytes: {}" ),
609  connection_id(),
610  ec.message(),
612  } );
613  else
614  {
615  // A case that is not such an error:
616  // on a connection (most probably keeped alive
617  // after previous request, but a new also applied)
618  // no bytes were consumed and remote peer closes connection.
620  [&]{
621  return fmt::format(
623  "[connection:{}] EOF and no request, "
624  "close connection" ),
625  connection_id() );
626  } );
627 
629  }
630  }
631  // else: read operation was cancelled.
632  }
633  }
634 
635  //! Parse some data.
636  void
637  consume_data( const char * data, std::size_t length )
638  {
639  auto * parser = &m_input.m_parser;
640 
641  const auto parse_err =
643 
644  const auto nparsed = [&]{
645  if( !parser->error_pos )
646  return length;
647  return static_cast< std::size_t >( parser->error_pos - data );
648  }();
649 
650  if( nparsed > length )
651  {
652  // Parser is in the unreliable state,
653  // so we done with this connection.
655  return fmt::format(
657  "[connection:{}] unexpected parser behavior: "
658  "llhttp_execute() reports parsed bytes number ({}) "
659  "is greater than the size of a buffer ({})"
660  "that was fed to the parser" ),
661  connection_id(),
662  nparsed,
663  length );
664  } );
665  return;
666  }
667 
669 
670  // If entire http-message was obtained,
671  // parser is stopped and the might be a part of consecutive request
672  // left in buffer, so we mark how many bytes were obtained.
673  // and next message read (if any) will be started from already existing
674  // data left in buffer.
676 
677  if( HPE_OK != parse_err &&
678  HPE_PAUSED != parse_err &&
680  {
681  // Parser error must be the one defined by llhttp_errno.
682  // It is possible to get a random error value
683  // (e.g. providing parser-callbacks that return
684  // unconventional errors), here we put an assert
685  // to quickly highlight the problems.
686  // As all the parser-callbacks are implemented by
687  // restinio the assert bellow MUST pass.
690 
691  // TODO: handle case when there are some request in process.
693  return fmt::format(
695  "[connection:{}] parser error {}: {}" ),
696  connection_id(),
699  } );
700 
701  // nothing to do.
702  return;
703  }
704 
706  {
708  }
709  else
710  consume_message();
711  }
712 
713  //! Handle a given request message.
714  void
716  {
717  try
718  {
719  auto & parser = m_input.m_parser;
721 
722  if( m_input.m_parser.upgrade )
723  {
724  // Start upgrade connection operation.
725 
726  // The first thing is to make sure
727  // that upgrade request will be handled in
728  // a non pipelined fashion.
731  }
732 
735  {
736  // Run ordinary HTTP logic.
738 
739  m_logger.trace( [&]{
740  return fmt::format(
742  "[connection:{}] request received (#{}): {} {}" ),
743  connection_id(),
744  request_id,
746  static_cast<llhttp_method>( parser.method ) ),
748  } );
749 
750  // TODO: mb there is a way to
751  // track if response was emmited immediately in handler
752  // or it was delegated
753  // so it is possible to omit this timer scheduling.
755 
756  const auto handling_result =
759  request_id,
766 
767  switch( handling_result )
768  {
771  // If handler refused request, say not implemented.
773  request_id,
778  break;
779 
782  {
783  // Request was accepted,
784  // didn't create immediate response that closes connection after,
785  // and it is possible to receive more requests
786  // then start consuming yet another request.
788  }
789  break;
790  }
791  }
792  else
793  {
794  m_logger.trace( [&]{
795  const std::string default_value{};
796 
797  return fmt::format(
799  "[connection:{}] upgrade request received: {} {}; "
800  "Upgrade: '{}';" ),
801  connection_id(),
803  static_cast<llhttp_method>( parser.method ) ),
807  } );
808 
810  {
811  // There are no requests in handling
812  // So the current request with upgrade
813  // is the only one and can be handled directly.
814  // It is safe to call a handler for it.
816  }
817  else
818  {
819  // There are pipelined request
820  m_logger.trace( [&]{
821  return fmt::format(
823  "[connection:{}] upgrade request happened to "
824  "be a pipelined one, "
825  "and will be handled after previous requests "
826  "are handled" ),
827  connection_id() );
828  } );
829  }
830 
831  // No further actions (like continue reading) in both cases are needed.
832  }
833 
834  }
835  catch( const std::exception & ex )
836  {
838  return fmt::format(
840  "[connection:{}] error while handling request: {}" ),
841  this->connection_id(),
842  ex.what() );
843  } );
844  }
845  }
846 
847  //! Calls handler for upgrade request.
848  /*!
849  Request data must be in input context (m_input).
850  */
851  void
853  {
854  auto & parser = m_input.m_parser;
856 
857  // If user responses with error
858  // then connection must be able to send
859  // (hence to receive) response.
860 
862 
863  m_logger.info( [&]{
864  return fmt::format(
866  "[connection:{}] handle upgrade request (#{}): {} {}" ),
867  connection_id(),
868  request_id,
870  static_cast<llhttp_method>( parser.method ) ),
872  } );
873 
874  // Do not guard upgrade request.
876 
877  // After calling handler we expect the results or
878  // no further operations with connection
881 
882  const auto handling_result = m_request_handler(
884  request_id,
891  switch( handling_result )
892  {
895  if( m_socket.is_open() )
896  {
897  // Request is rejected, so our socket
898  // must not be moved out to websocket connection.
899 
900  // If handler refused request, say not implemented.
902  request_id,
907  }
908  else
909  {
910  // Request is rejected, but the socket
911  // was moved out to somewhere else???
912 
913  m_logger.error( [&]{
914  return fmt::format(
916  "[connection:{}] upgrade request handler rejects "
917  "request, but socket was moved out from connection" ),
918  connection_id() );
919  } );
920  }
921  break;
922 
924  /* nothing to do */
925  break;
926  }
927 
928  // Else 2 cases:
929  // 1. request is handled asynchronously, so
930  // what happens next depends on handling.
931  // 2. handling was immediate, so further destiny
932  // of a connection was already determined.
933  //
934  // In both cases: here do nothing.
935  // We can't even do read-only access because
936  // upgrade handling might take place
937  // in distinct execution context.
938  // So no even a log messages here.
939  }
940 
941  //! Write parts for specified request.
942  virtual void
944  //! Request id.
945  request_id_t request_id,
946  //! Resp output flag.
947  response_output_flags_t response_output_flags,
948  //! Part of the response data.
949  write_group_t wg ) override
950  {
951  //! Run write message on io_context loop if possible.
952  asio_ns::dispatch(
953  this->get_executor(),
954  [ this,
955  request_id,
957  actual_wg = std::move( wg ),
958  ctx = shared_from_this() ]
959  // NOTE that this lambda is noexcept since v.0.6.0.
960  () mutable noexcept
961  {
962  try
963  {
965  request_id,
967  std::move( actual_wg ) );
968  }
969  catch( const std::exception & ex )
970  {
972  return fmt::format(
974  "[connection:{}] unable to handle response: {}" ),
975  connection_id(),
976  ex.what() );
977  } );
978  }
979  } );
980  }
981 
982  //! Write parts for specified request.
983  void
985  //! Request id.
986  request_id_t request_id,
987  //! Resp output flag.
988  response_output_flags_t response_output_flags,
989  //! Part of the response data.
990  write_group_t wg )
991  {
993  try
994  {
998  }
999  catch( const std::exception & ex )
1000  {
1001  m_logger.error( [&]{
1002  return fmt::format(
1004  "[connection:{}] notificator error: {}" ),
1005  connection_id(),
1006  ex.what() );
1007  } );
1008  }
1009  };
1010 
1011  if( m_socket.is_open() )
1012  {
1016  {
1017  // It is response for a connection-upgrade request.
1018  // If we receive it here then it is constructed via
1019  // message builder and so connection was not transformed
1020  // to websocket connection.
1021  // So it is necessary to resume pipeline logic that was stopped
1022  // for upgrade-request to be handled as the only request
1023  // on the connection for that moment.
1025  {
1027  }
1028  }
1029 
1031  {
1032  m_logger.trace( [&]{
1033  return fmt::format(
1035  "[connection:{}] append response (#{}), "
1036  "flags: {}, write group size: {}" ),
1037  connection_id(),
1038  request_id,
1040  wg.items_count() );
1041  } );
1042 
1044  request_id,
1046  std::move( wg ) );
1047 
1049  }
1050  else
1051  {
1052  m_logger.warn( [&]{
1053  return fmt::format(
1055  "[connection:{}] receive response parts for "
1056  "request (#{}), but response with connection-close "
1057  "attribute happened before" ),
1058  connection_id(),
1059  request_id );
1060  } );
1062  }
1063  }
1064  else
1065  {
1066  m_logger.warn( [&]{
1067  return fmt::format(
1069  "[connection:{}] try to write response, "
1070  "while socket is closed" ),
1071  connection_id() );
1072  } );
1074  }
1075  }
1076 
1077  // Check if there is something to write,
1078  // and if so starts write operation.
1079  void
1081  {
1083 
1085  {
1086  init_write();
1087  }
1088  }
1089 
1090  //! Initiate write operation.
1091  void
1093  {
1094  // Here: not writing anything to socket, so
1095  // write operation can be initiated.
1096 
1097  // Remember if all response cells were busy.
1100 
1102 
1103  if( next_write_group )
1104  {
1105  m_logger.trace( [&]{
1106  return fmt::format(
1108  "[connection:{}] start next write group for response (#{}), "
1109  "size: {}" ),
1110  this->connection_id(),
1113  } );
1114 
1115  // Check if all response cells busy:
1116  const bool response_coordinator_full_after =
1118 
1119  // Whether we need to resume read after this group is written?
1123 
1125  {
1126  // We need to extract status line out of the first buffer
1127  assert(
1130 
1131  m_logger.trace( [&]{
1132  // Get status line:
1133  const string_view_t status_line{
1134  asio_ns::buffer_cast< const char * >(
1137 
1138  return fmt::format(
1140  "[connection:{}] start response (#{}): {}" ),
1141  this->connection_id(),
1144  } );
1145  }
1146 
1147  // Initialize write context with a new write group.
1149  std::move( next_write_group->first ) );
1150 
1151  // Start the loop of sending data from current write group.
1153  }
1154  else
1155  {
1157  }
1158  }
1159 
1160  // Use aliases for shorter names.
1164 
1165  //! Start/continue/continue handling output data of current write group.
1166  /*!
1167  This function is a starting point of a loop process of sending data
1168  from a given write group.
1169  It extracts the next bunch of trivial buffers or a
1170  sendfile-runner and starts an appropriate write operation.
1171  In data of a given write group finishes,
1172  finish_handling_current_write_ctx() is invoked thus breaking the loop.
1173 
1174  @note
1175  Since v.0.6.0 this method is noexcept.
1176  */
1177  void
1179  {
1180  try
1181  {
1183 
1185  {
1187  }
1189  {
1191  }
1192  else
1193  {
1196  }
1197  }
1198  catch( const std::exception & ex )
1199  {
1201  return fmt::format(
1203  "[connection:{}] handle_current_write_ctx failed: {}" ),
1204  connection_id(),
1205  ex.what() );
1206  } );
1207  }
1208  }
1209 
1210  //! Run trivial buffers write operation.
1211  void
1212  handle_trivial_write_operation( const trivial_write_operation_t & op )
1213  {
1214  // Asio buffers (param for async write):
1215  auto & bufs = op.get_trivial_bufs();
1216 
1218  {
1219  m_logger.trace( [&]{
1220  return fmt::format(
1222  "[connection:{}] sending resp data with "
1223  "connection-close attribute "
1224  "buf count: {}, "
1225  "total size: {}" ),
1226  connection_id(),
1227  bufs.size(),
1228  op.size() );
1229  } );
1230 
1231  // Reading new requests is useless.
1234  }
1235  else
1236  {
1237  m_logger.trace( [&]{
1238  return fmt::format(
1240  "[connection:{}] sending resp data, "
1241  "buf count: {}, "
1242  "total size: {}" ),
1243  connection_id(),
1244  bufs.size(),
1245  op.size() ); } );
1246  }
1247 
1248  // There is somethig to write.
1250  m_socket,
1251  bufs,
1253  this->get_executor(),
1254  [this, ctx = shared_from_this()]
1255  // NOTE: since v.0.6.0 this lambda is noexcept.
1256  ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1257  {
1258  if( !ec )
1259  {
1261  [&]{
1262  return fmt::format(
1264  "[connection:{}] outgoing data was "
1265  "sent: {} bytes" ),
1266  connection_id(),
1267  written );
1268  } );
1269  }
1270 
1272  } ) );
1273 
1275  }
1276 
1277  //! Run sendfile write operation.
1278  void
1279  handle_file_write_operation( file_write_operation_t & op )
1280  {
1282  {
1283  m_logger.trace( [&]{
1284  return fmt::format(
1286  "[connection:{}] sending resp file data with "
1287  "connection-close attribute, "
1288  "total size: {}" ),
1289  connection_id(),
1290  op.size() );
1291  } );
1292 
1293  // Reading new requests is useless.
1296  }
1297  else
1298  {
1299  m_logger.trace( [&]{
1300  return fmt::format(
1302  "[connection:{}] sending resp file data, total size: {}" ),
1303  connection_id(),
1304  op.size() );
1305  } );
1306  }
1307 
1309 
1310  auto op_ctx = op;
1311 
1313  this->get_executor(),
1314  m_socket,
1316  this->get_executor(),
1317  [this, ctx = shared_from_this(),
1318  // Store operation context till the end
1319  op_ctx ]
1320  // NOTE: since v.0.6.0 this lambda is noexcept
1321  (const asio_ns::error_code & ec, file_size_t written ) mutable noexcept
1322  {
1323  // NOTE: op_ctx should be reset just before return from
1324  // that lambda. We can't call reset() until the end of
1325  // the lambda because lambda object itself will be
1326  // destroyed.
1328  [&op_ctx] {
1329  // Reset sendfile operation context.
1331  } );
1332 
1333  if( !ec )
1334  {
1336  [&]{
1337  return fmt::format(
1339  "[connection:{}] file data was sent: "
1340  "{} bytes" ),
1341  connection_id(),
1342  written );
1343  } );
1344  }
1345  else
1346  {
1348  [&]{
1349  return fmt::format(
1351  "[connection:{}] send file data error: "
1352  "{} ({}) bytes" ),
1353  connection_id(),
1354  ec.value(),
1355  ec.message() );
1356  } );
1357  }
1358 
1360  } ) );
1361  }
1362 
1363  //! Do post write actions for current write group.
1364  void
1366  {
1367  // Finishing writing this group.
1368  m_logger.trace( [&]{
1369  return fmt::format(
1371  "[connection:{}] finishing current write group" ),
1372  this->connection_id() );
1373  } );
1374 
1375  // Group notificators are called from here (if exist):
1377 
1379  {
1380  m_logger.trace( [&]{
1381  return fmt::format(
1383  "[connection:{}] should keep alive" ),
1384  this->connection_id() );
1385  } );
1386 
1389  {
1390  // Run ordinary HTTP logic.
1392  {
1394  }
1395 
1396  // Start another write opertion
1397  // if there is something to send.
1399  }
1400  else
1401  {
1403  {
1404  // Here upgrade req is the only request
1405  // to be handled by this connection.
1406  // So it is safe to call a handler for it.
1408  }
1409  else
1410  {
1411  // Do not start reading in any case,
1412  // but if there is at least one request preceding
1413  // upgrade-req, logic must continue http interaction.
1415  }
1416  }
1417  }
1418  else
1419  {
1420  // No keep-alive, close connection.
1421  close();
1422  }
1423  }
1424 
1425  void
1427  {
1429  {
1430  // Bufs empty but there happened to
1431  // be a response context marked as complete
1432  // (final_parts) and having connection-close attr.
1433  // It is because `init_write_if_necessary()`
1434  // is called only under `!m_response_coordinator.closed()`
1435  // condition, so if no bufs were obtained
1436  // and response coordinator is closed means
1437  // that a first response stored by
1438  // response coordinator was marked as complete
1439  // without data.
1440 
1441  m_logger.trace( [&]{
1442  return fmt::format(
1444  "[connection:{}] last sent response was marked "
1445  "as complete" ),
1446  connection_id() ); } );
1447  close();
1448  }
1449  else
1450  {
1451  // Not writing anything, so need to deal with timouts.
1453  {
1454  // No requests in processing.
1455  // So set read next request timeout.
1457  }
1458  else
1459  {
1460  // Have requests in process.
1461  // So take control over request handling.
1463  }
1464  }
1465  }
1466 
1467  //! Handle write response finished.
1468  /*!
1469  * @note
1470  * Since v.0.6.0 this method is noexcept.
1471  */
1472  void
1473  after_write( const asio_ns::error_code & ec ) noexcept
1474  {
1475  if( !ec )
1476  {
1478  }
1479  else
1480  {
1481  if( !error_is_operation_aborted( ec ) )
1482  {
1484  return fmt::format(
1486  "[connection:{}] unable to write: {}" ),
1487  connection_id(),
1488  ec.message() );
1489  } );
1490  }
1491  // else: Operation aborted only in case of close was called.
1492 
1493  try
1494  {
1496  }
1497  catch( const std::exception & ex )
1498  {
1500  [&]{
1501  return fmt::format(
1503  "[connection:{}] notificator error: {}" ),
1504  connection_id(),
1505  ex.what() );
1506  } );
1507  }
1508  }
1509  }
1510 
1511  //! Close connection functions.
1512  //! \{
1513 
1514  //! Standard close routine.
1515  void
1516  close() noexcept
1517  {
1519  [&]{
1520  return fmt::format(
1521  RESTINIO_FMT_FORMAT_STRING( "[connection:{}] close" ),
1522  connection_id() );
1523  } );
1524 
1525  // shutdown() and close() should be called regardless of
1526  // possible exceptions.
1528  m_logger,
1529  "connection.socket.shutdown",
1530  [this] {
1534  ignored_ec );
1535  } );
1537  m_logger,
1538  "connection.socket.close",
1539  [this] {
1540  m_socket.close();
1541  } );
1542 
1544  [&]{
1545  return fmt::format(
1547  "[connection:{}] close: close socket" ),
1548  connection_id() );
1549  } );
1550 
1551  // Clear stuff.
1553 
1555  [&]{
1556  return fmt::format(
1558  "[connection:{}] close: timer canceled" ),
1559  connection_id() );
1560  } );
1561 
1563 
1565  [&]{
1566  return fmt::format(
1568  "[connection:{}] close: reset responses data" ),
1569  connection_id() );
1570  } );
1571 
1572  // Inform state listener if it used.
1574  [this]() noexcept {
1575  return connection_state::notice_t{
1576  this->connection_id(),
1577  this->m_remote_endpoint,
1579  };
1580  } );
1581  }
1582 
1583  //! Trigger an error.
1584  /*!
1585  Closes the connection and write to log
1586  an error message.
1587  */
1588  template< typename Message_Builder >
1589  void
1590  trigger_error_and_close( Message_Builder msg_builder ) noexcept
1591  {
1592  // An exception from logger/msg_builder shouldn't prevent
1593  // a call to close().
1596 
1598  }
1599  //! \}
1600 
1601  //! Connection.
1602  stream_socket_t m_socket;
1603 
1604  //! Common paramaters of a connection.
1606 
1607  //! Remote endpoint for this connection.
1609 
1610  //! Input routine.
1612 
1613  //! Write to socket operation context.
1615 
1616  // Memo flag: whether we need to resume read after this group is written
1618 
1619  //! Response coordinator.
1621 
1622  //! Timer to controll operations.
1623  //! \{
1624 
1625  //! Check timeouts for all activities.
1626  static connection_t &
1627  cast_to_self( tcp_connection_ctx_base_t & base )
1628  {
1629  return static_cast< connection_t & >( base );
1630  }
1631 
1632  //! Schedules real timedout operations check on
1633  //! the executer of a connection.
1634  virtual void
1635  check_timeout( tcp_connection_ctx_handle_t & self ) override
1636  {
1637  asio_ns::dispatch(
1638  this->get_executor(),
1639  [ ctx = std::move( self ) ]
1640  // NOTE: this lambda is noexcept since v.0.6.0.
1641  () noexcept {
1642  auto & conn_object = cast_to_self( *ctx );
1643  // If an exception will be thrown we can only
1644  // close the connection.
1645  try
1646  {
1648  }
1649  catch( const std::exception & x )
1650  {
1652  return fmt::format(
1654  "[connection: {}] unexpected "
1655  "error during timeout handling: {}" ),
1657  x.what() );
1658  } );
1659  }
1660  } );
1661  }
1662 
1663  //! Callback type for timedout operations.
1664  using timout_cb_t = void (connection_t::* )( void );
1665 
1666  //! Callback to all if timeout happened.
1667  timout_cb_t m_current_timeout_cb{ nullptr };
1668 
1669  //! Timeout point of a current guarded operation.
1671  //! Timer guard.
1672  timer_guard_t m_timer_guard;
1673  //! A prepared weak handle for passing it to timer guard.
1675 
1676  //! Check timed out operation.
1677  void
1679  {
1681  {
1682  if( m_current_timeout_cb )
1683  (this->*m_current_timeout_cb)();
1684  }
1685  else
1686  {
1688  }
1689  }
1690 
1691  //! Schedule next timeout checking.
1692  void
1694  {
1696  }
1697 
1698  //! Stop timout guarding.
1699  void
1701  {
1702  m_current_timeout_cb = nullptr;
1704  }
1705 
1706  //! Helper function to work with timer guard.
1707  void
1709  std::chrono::steady_clock::time_point timeout_after,
1710  timout_cb_t timout_cb )
1711  {
1714  }
1715 
1716  void
1718  std::chrono::steady_clock::duration timeout,
1719  timout_cb_t timout_cb )
1720  {
1723  timout_cb );
1724  }
1725 
1726  void
1727  handle_xxx_timeout( const char * operation_name )
1728  {
1729  m_logger.trace( [&]{
1730  return fmt::format(
1731  RESTINIO_FMT_FORMAT_STRING( "[connection:{}] {} timed out" ),
1732  connection_id(),
1733  operation_name );
1734  } );
1735 
1736  close();
1737  }
1738 
1739  void
1741  {
1742  handle_xxx_timeout( "wait for request" );
1743  }
1744 
1745  //! Statr guard read operation if necessary.
1746  void
1748  {
1750  {
1754  }
1755  }
1756 
1757  void
1759  {
1760  handle_xxx_timeout( "handle request" );
1761  }
1762 
1763  //! Start guard request handling operation if necessary.
1764  void
1766  {
1768  {
1772  }
1773  }
1774 
1775  void
1777  {
1778  handle_xxx_timeout( "writing response" );
1779  }
1780 
1781  //! Start guard write operation if necessary.
1782  void
1784  {
1788  }
1789 
1790  void
1792  {
1793  handle_xxx_timeout( "writing response (sendfile)" );
1794  }
1795 
1796  void
1797  guard_sendfile_operation( std::chrono::steady_clock::duration timelimit )
1798  {
1799  if( std::chrono::steady_clock::duration::zero() == timelimit )
1801 
1803  timelimit,
1805  }
1806  //! \}
1807 
1808  //! Request handler.
1810 
1811  //! Logger for operation
1812  logger_t & m_logger;
1813 
1814  /*!
1815  * @brief Monitor of the connection lifetime.
1816  *
1817  * It's required for controlling the count of active parallel
1818  * connections.
1819  *
1820  * @since v.0.6.12
1821  */
1823 };
1824 
1825 //
1826 // connection_factory_t
1827 //
1828 
1829 //! Factory for connections.
1830 template < typename Traits >
1832 {
1833  public:
1834  using logger_t = typename Traits::logger_t;
1835  using stream_socket_t = typename Traits::stream_socket_t;
1836  using lifetime_monitor_t =
1838 
1840  connection_settings_handle_t< Traits > connection_settings,
1841  std::unique_ptr< socket_options_setter_t > socket_options_setter )
1845  {}
1846 
1847  // NOTE: since v.0.6.3 it returns non-empty
1848  // shared_ptr<connection_t<Traits>> or an exception is thrown in
1849  // the case of an error.
1850  // NOTE: since v.0.6.12 it accepts yet another parameter: lifetime_monitor.
1851  auto
1853  stream_socket_t socket,
1854  endpoint_t remote_endpoint,
1855  lifetime_monitor_t lifetime_monitor )
1856  {
1857  using connection_type_t = connection_t< Traits >;
1858 
1859  {
1860  socket_options_t options{ socket.lowest_layer() };
1861  (*m_socket_options_setter)( options );
1862  }
1863 
1864  return std::make_shared< connection_type_t >(
1865  m_connection_id_counter++,
1866  std::move( socket ),
1867  m_connection_settings,
1868  std::move( remote_endpoint ),
1869  std::move( lifetime_monitor ) );
1870  }
1871 
1872  private:
1874 
1876 
1878 
1879  logger_t & m_logger;
1880 };
1881 
1882 } /* namespace impl */
1883 
1884 } /* namespace restinio */
http_request_header_t m_header
Request data.
Definition: connection.hpp:52
void check_timeout_impl()
Check timed out operation.
upgrade_internals_t move_upgrade_internals()
Move socket out of connection.
Definition: connection.hpp:514
chunk_ext_params_unique_ptr_t m_chunk_ext_params
Chunk extnsion&#39;s params if any.
Definition: connection.hpp:76
static connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timer to controll operations.
std::size_t m_total_field_count
Total number of parsed HTTP-fields.
Definition: connection.hpp:98
void consume_message()
Start (continue) a chain of read-parse-read-... operations.
Definition: connection.hpp:526
connection_settings_handle_t< Traits > m_settings
Common paramaters of a connection.
std::string m_current_field_name
Parser context temp values and flags.
Definition: connection.hpp:58
response_coordinator_t m_response_coordinator
Response coordinator.
bool m_message_complete
Flag: is http message parsed completely.
Definition: connection.hpp:88
Request with connection-upgrade header came and waits for request handler to be called in non pipelin...
void after_write(const asio_ns::error_code &ec) noexcept
Handle write response finished.
write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
void guard_sendfile_operation(std::chrono::steady_clock::duration timelimit)
timout_cb_t m_current_timeout_cb
Callback to all if timeout happened.
tls_socket_t * make_tls_socket_pointer_for_state_listener(tls_socket_t &socket) noexcept
Definition: tls.hpp:325
upgrade_internals_t(upgrade_internals_t &&)=default
connection_t & operator=(connection_t &&)=delete
void handle_current_write_ctx() noexcept
Start/continue/continue handling output data of current write group.
connection_t & operator=(const connection_t &)=delete
std::unique_ptr< socket_options_setter_t > m_socket_options_setter
logger_t & m_logger
Logger for operation.
connection_settings_handle_t< Traits > m_connection_settings
stream_socket_t m_socket
Connection.
void cancel_timeout_checking() noexcept
Stop timout guarding.
connection_t(connection_id_t conn_id, stream_socket_t &&socket, connection_settings_handle_t< Traits > settings, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
Definition: connection.hpp:367
void finish_handling_current_write_ctx()
Do post write actions for current write group.
timer_guard_t m_timer_guard
Timer guard.
connection_input_t(std::size_t buffer_size, incoming_http_msg_limits_t limits, const llhttp_settings_t *settings)
Definition: connection.hpp:271
void consume_data(const char *data, std::size_t length)
Parse some data.
Definition: connection.hpp:637
std::enable_if< std::is_same< Parameter_Container, query_string_params_t >::value||std::is_same< Parameter_Container, router::route_params_t >::value, std::optional< Value_Type > >::type opt_value(const Parameter_Container &params, string_view_t key)
Gets the value of a parameter specified by key wrapped in std::optional<Value_Type> if parameter exis...
Definition: value_or.hpp:64
chunked_input_info_block_t m_chunked_info_block
Definition: connection.hpp:69
virtual void write_response_parts(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg) override
Write parts for specified request.
Definition: connection.hpp:943
void prepare_connection_and_start_read(asio_ns::ip::tcp::socket &, Connection &, Start_Read_CB start_read_cb, Failed_CB)
Definition: connection.hpp:312
void guard_write_operation()
Start guard write operation if necessary.
void close() noexcept
Close connection functions.
connection_upgrade_stage_t m_connection_upgrade_stage
Connection upgrade request stage.
Definition: connection.hpp:292
connection_input_t m_input
Input routine.
void reset()
Prepare context to handle new request.
Definition: connection.hpp:119
void handle_xxx_timeout(const char *operation_name)
void after_read(const asio_ns::error_code &ec, std::size_t length) noexcept
Handle read operation result.
Definition: connection.hpp:564
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
llhttp_settings_t create_parser_settings() noexcept
Helper for setting parser settings.
Definition: connection.hpp:166
connection_settings_handle_t< Traits > m_settings
Definition: connection.hpp:507
Handler for request with connection-upgrade header was called so any response data comming is for tha...
connection_factory_t(connection_settings_handle_t< Traits > connection_settings, std::unique_ptr< socket_options_setter_t > socket_options_setter)
chunked_input_info_unique_ptr_t make_chunked_input_info_if_necessary()
Creates an instance of chunked_input_info if there is an info about chunks in the body...
Definition: connection.hpp:138
connection_t(const connection_t &)=delete
void reset_parser()
Prepare parser for reading new http-message.
Definition: connection.hpp:300
std::size_t m_bytes_parsed
How many bytes were parsed for current request.
Definition: connection.hpp:83
http_parser_ctx_t(incoming_http_msg_limits_t limits)
The main constructor.
Definition: connection.hpp:112
void on_request_message_complete()
Handle a given request message.
Definition: connection.hpp:715
Internals that are necessary for upgrade.
Definition: connection.hpp:493
std::chrono::steady_clock::time_point m_current_timeout_after
Timeout point of a current guarded operation.
void schedule_operation_timeout_callback(std::chrono::steady_clock::duration timeout, timout_cb_t timout_cb)
connection_t(connection_t &&)=delete
void write_response_parts_impl(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg)
Write parts for specified request.
Definition: connection.hpp:984
No connection request in progress.
const endpoint_t m_remote_endpoint
Remote endpoint for this connection.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
A prepared weak handle for passing it to timer guard.
fixed_buffer_t m_buf
Input buffer.
Definition: connection.hpp:289
bool m_read_operation_is_running
Flag to track whether read operation is performed now.
Definition: connection.hpp:296
void init_next_timeout_checking()
Schedule next timeout checking.
upgrade_internals_t(connection_settings_handle_t< Traits > settings, stream_socket_t socket, lifetime_monitor_t lifetime_monitor)
Definition: connection.hpp:498
void handle_file_write_operation(file_write_operation_t &op)
Run sendfile write operation.
void trigger_error_and_close(Message_Builder msg_builder) noexcept
Trigger an error.
request_handler_t & m_request_handler
Request handler.
void guard_request_handling_operation()
Start guard request handling operation if necessary.
Parsing result context for using in parser callbacks.
Definition: connection.hpp:48
void handle_upgrade_request()
Calls handler for upgrade request.
Definition: connection.hpp:852
const incoming_http_msg_limits_t m_limits
Limits for the incoming message.
Definition: connection.hpp:105
llhttp_t m_parser
HTTP-parser.
Definition: connection.hpp:284
void init_write()
Initiate write operation.
Data associated with connection read routine.
Definition: connection.hpp:269
void wait_for_http_message()
Start reading next htttp-message.
Definition: connection.hpp:460
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
Schedules real timedout operations check on the executer of a connection.
auto create_new_connection(stream_socket_t socket, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
Context for handling http connections.
Definition: connection.hpp:350
Factory for connections.
void guard_read_operation()
Statr guard read operation if necessary.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
Run trivial buffers write operation.
connection_upgrade_stage_t
Enum for a flag specifying that connection is going to upgrade or not.
Definition: connection.hpp:249