SObjectizer-5 Extra
inflight_limit.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of proxy mbox with inflight limit.
4  *
5  * \since v.1.5.2
6  */
7 
8 #pragma once
9 
10 #include <so_5_extra/mboxes/proxy.hpp>
11 
12 #include <so_5_extra/error_ranges.hpp>
13 
14 #include <so_5_extra/enveloped_msg/just_envelope.hpp>
15 
16 #include <so_5/impl/msg_tracing_helpers.hpp>
17 
18 #include <so_5/environment.hpp>
19 
20 #include <atomic>
21 
22 namespace so_5 {
23 
25 
26 /*!
27  * \brief Type to be used for limit and counter of inflight messages.
28  *
29  * \since v.1.5.2
30  */
31 using underlying_counter_t = unsigned int;
32 
33 } /* namespace extra::mboxes::inflight_limit */
34 
35 namespace impl::msg_tracing_helpers::details {
36 
37 // Special extension for inflight_limit specific data.
39 
//! Pair of counters that is written into a trace record in the form
//! "[inflight_limit=<m_limit>,inflight_current=<m_current_number>]".
40 struct limit_info
41  {
//! Value printed after "inflight_limit=".
42  so_5::extra::mboxes::inflight_limit::underlying_counter_t m_limit;
//! Value printed after "inflight_current=".
43  so_5::extra::mboxes::inflight_limit::underlying_counter_t m_current_number;
44  };
45 
46 inline void
48  {
49  s << "[inflight_limit=" << info.m_limit << ",inflight_current="
50  << info.m_current_number << "]";
51  }
52 
53 inline void
55  actual_trace_data_t & /*d*/,
57  {
58  // Nothing to do.
59  }
60 
61 } /* namespace extra_inflight_limit_specifics */
62 
63 } /* namespace impl::msg_tracing_helpers::details */
64 
65 namespace extra {
66 
67 namespace mboxes {
68 
69 namespace inflight_limit {
70 
71 namespace errors {
72 
73 /*!
74  * \brief An attempt to use a message type that differs from mbox's message
75  * type.
76  *
77  * Type of message to be used with inflight_limit_mbox
78  * is fixed as part of inflight_limit_mbox type.
79  * An attempt to use different message type (for subscription, delivery or
80  * setting delivery filter) will lead to an exception with this error code.
81  *
82  * \since v.1.5.2
83  */
86 
87 /*!
88  * \brief Null pointer to underlying mbox.
89  *
90  * An inflight_limit_mbox uses an underlying mbox and delegates all actions to that mbox.
91  * Because of that underlying mbox can't be nullptr.
92  *
93  * \since v.1.5.2
94  */
97 
98 } /* namespace errors */
99 
100 namespace impl {
101 
102 /*!
103  * \brief Separate type for holding inflight message counter as a separate object.
104  *
105  * It's expected that an instance of instances_counter_t will be created in
106  * dynamic memory and shared between entities via shared_ptr.
107  *
108  * \since v.1.5.2
109  */
111  {
112  //! Counter of inflight instances.
114  };
115 
116 /*!
117  * \brief An alias for shared_ptr to instances_counter_t.
118  *
119  * \since v.1.5.2
120  */
122 
123 /*!
124  * \brief Helper class for incrementing/decrementing number of messages in
125  * RAII style.
126  *
127  * An instance always increments the counter in the constructor. The result
128  * value is stored inside counter_incrementer_t instance. That value is available
129  * via value() method.
130  *
131  * The destructor decrements the counter if there wasn't a call to
132  * do_not_decrement_in_destructor().
133  *
134  * The intended usage scenario is:
135  *
136  * - create an instance of counter_incrementer_t;
137  * - check the counter via value() method;
138  * - if limit wasn't exceeded then create an appropriate envelope for a message and
139  * call do_not_decrement_in_destructor(). In such a case the envelope will
140  * decrement the number of inflight messages;
141  * - if limit was exceeded then just stop the operation and the destructor of
142  * counter_incrementer_t will decrement number of messages automatically.
143  *
144  * \since v.1.5.2
145  */
147  {
149  const underlying_counter_t m_value;
150 
152 
153  public:
155  outliving_reference_t< instances_counter_t > counter ) noexcept
156  : m_counter{ counter.get() }
157  , m_value{ ++(m_counter.m_instances) }
158  {}
159 
161  {
162  if( m_should_decrement_in_destructor )
163  (m_counter.m_instances)--;
164  }
165 
166  void
168  {
170  }
171 
//! Get the counter value obtained by the increment in the constructor.
/*!
 * The value is stored once at construction time (m_value is const),
 * so this method does not re-read the shared counter.
 */
172  [[nodiscard]]
173  underlying_counter_t
174  value() const noexcept
175  {
176  return m_value;
177  }
178  };
179 
180 /*!
181  * \brief Type of envelope to be used by inflight_limit_mbox.
182  *
183  * \attention
184  * The envelope expects that the number of messages is already incremented before
185  * the creation of the envelope. That number is always decremented in the
186  * destructor.
187  *
188  * \since v.1.5.2
189  */
190 class special_envelope_t final : public so_5::extra::enveloped_msg::just_envelope_t
191  {
192  using base_type_t = so_5::extra::enveloped_msg::just_envelope_t;
193 
195 
196  public:
197  //! Initializing constructor.
199  message_ref_t payload,
200  instances_counter_shptr_t counter )
201  : base_type_t{ std::move(payload) }
202  , m_counter{ std::move(counter) }
203  {}
204 
206  {
207  // Counter should always be decremented because it was
208  // incremented before the creation of envelope instance.
209  (m_counter->m_instances)--;
210  }
211  };
212 
213 /*!
214  * \brief Helper type that tells that underlying mbox isn't nullptr.
215  *
216  * \since v.1.5.2
217  */
219  {
222 
224 
//! Initializing constructor.
/*!
 * Moves \a value into m_value. No check for nullptr is performed here.
 */
225  not_null_underlying_mbox_t( so_5::mbox_t value )
226  : m_value{ std::move(value) }
227  {}
228 
229  public:
//! Get a const reference to the stored mbox.
230  [[nodiscard]]
231  const so_5::mbox_t &
232  value() const noexcept { return m_value; }
233  };
234 
235 //! Ensure that underlying mbox is not nullptr.
236 /*!
237  * \throw so_5::exception_t if \a mbox is nullptr.
238  */
239 [[nodiscard]]
242  so_5::mbox_t mbox )
243  {
244  if( !mbox )
245  SO_5_THROW_EXCEPTION(
246  errors::rc_nullptr_as_underlying_mbox,
247  "nullptr is used as underlying mbox" );
248 
249  return { std::move(mbox) };
250  }
251 
252 /*!
253  * \brief Actual implementation of inflight_limit_mbox.
254  *
255  * \tparam Tracing_Base base class with implementation of message
256  * delivery tracing methods.
257  *
258  * \since v.1.5.2
259  */
260 template< typename Tracing_Base >
262  : public so_5::abstract_message_box_t
263  , private Tracing_Base
264  {
265  //! Actual underlying mbox to be used for all calls.
266  /*!
267  * \attention Should not be nullptr.
268  */
270 
271  //! Type of a message for which this mbox is created.
273 
274  //! The limit of inflight messages.
275  const underlying_counter_t m_limit;
276 
277  //! Counter for inflight instances.
279 
280  public:
281  /*!
282  * \brief Initializing constructor.
283  *
284  * \tparam Tracing_Args parameters for Tracing_Base constructor
285  * (can be an empty list if Tracing_Base has only the default constructor).
286  */
287  template< typename... Tracing_Args >
289  //! Destination mbox.
290  const not_null_underlying_mbox_t & dest_mbox,
291  //! Type of a message for that mbox.
292  std::type_index msg_type,
293  //! The limit of inflight messages.
294  underlying_counter_t limit,
295  //! Optional parameters for Tracing_Base's constructor.
296  Tracing_Args &&... args )
299  , m_msg_type{ std::move(msg_type) }
300  , m_limit{ limit }
302  {}
303 
//! Get the ID of the mbox.
/*!
 * The call is delegated to the underlying mbox, so the returned ID is
 * the ID reported by that mbox.
 */
304  mbox_id_t
305  id() const override
306  {
307  return m_underlying_mbox->id();
308  }
309 
310  void
312  const std::type_index & msg_type,
313  const ::so_5::message_limit::control_block_t * limit,
314  ::so_5::agent_t & subscriber ) override
315  {
317  msg_type,
318  "an attempt to subscribe with different message type" );
319 
322  }
323 
324  void
326  const std::type_index & msg_type,
327  ::so_5::agent_t & subscriber ) override
328  {
330  msg_type,
331  "an attempt to drop subscription for different message type" );
332 
334  msg_type, subscriber );
335  }
336 
//! Get the name of the mbox.
/*!
 * The call is delegated to the underlying mbox, so the returned name is
 * the name reported by that mbox.
 */
337  std::string
338  query_name() const override
339  {
340  return m_underlying_mbox->query_name();
341  }
342 
344  type() const override
345  {
346  return m_underlying_mbox->type();
347  }
348 
349  void
351  const std::type_index & msg_type,
352  const ::so_5::message_ref_t & message,
353  unsigned int overlimit_reaction_deep ) override
354  {
356  msg_type,
357  "an attempt to deliver message of a different message type" );
358 
360  *this, // as Tracing_base
361  *this, // as abstract_message_box_t
362  "deliver_message",
364 
365  // Step 1: increment the counter and check that the limit
366  // isn't exceeded yet.
369  };
370  if( incrementer.value() <= m_limit )
371  {
372  // NOTE: if an exception is thrown then the number
373  // of instances will be decremented by incrementer.
376  std::move(message),
378  };
379 
380  // incrementer shouldn't control the number of instances
381  // anymore.
383 
384  // Our envelope object has to be sent.
386  msg_type,
387  our_envelope,
389  }
390  else
391  {
392  using namespace so_5::impl::msg_tracing_helpers::details::
394 
396  "too_many_inflight_messages",
398  }
399  }
400 
401  void
403  const std::type_index & msg_type,
404  const ::so_5::delivery_filter_t & filter,
405  ::so_5::agent_t & subscriber ) override
406  {
408  msg_type,
409  "an attempt to set delivery_filter for different "
410  "message type" );
411 
413  msg_type,
414  filter,
415  subscriber );
416  }
417 
418  void
420  const std::type_index & msg_type,
421  ::so_5::agent_t & subscriber ) noexcept override
422  {
423  // Because drop_delivery_filter is noexcept we just ignore
424  // an erroneous call with a different message type.
425  if( msg_type == m_msg_type )
426  {
428  msg_type,
429  subscriber );
430  }
431  }
432 
434  environment() const noexcept override
435  {
436  return m_underlying_mbox->environment();
437  }
438 
439  private:
440  /*!
441  * Throws an error if msg_type differs from expected message type.
442  */
443  void
445  const std::type_index & msg_type,
446  std::string_view error_description ) const
447  {
448  if( msg_type != m_msg_type )
451  //FIXME: we have to create std::string object because
452  //so_5::exception_t::raise expects std::string.
453  //This should be fixed after resolving:
454  //https://github.com/Stiffstream/sobjectizer/issues/46
456  }
457  };
458 
459 /*!
460  * \brief Check for compatibility between mbox type and message type.
461  *
462  * Throws if mutable message is used with MPMC mbox.
463  *
464  * \since v.1.5.2
465  */
466 template< typename Msg_Type >
467 void
469  //! NOTE: it's expected to be not-null.
470  const so_5::mbox_t & underlying_mbox )
471  {
472  // Use of mutable message type for MPMC mbox should be prohibited.
473  if constexpr( is_mutable_message< Msg_Type >::value )
474  {
475  switch( underlying_mbox->type() )
476  {
480  "an attempt to make MPMC mbox for mutable message, "
481  "msg_type=" + std::string(typeid(Msg_Type).name()) );
482  break;
483 
485  break;
486  }
487  }
488  }
489 
490 } /* namespace impl */
491 
492 /*!
493  * \brief Create an instance of inflight_limit_mbox.
494  *
495  * Usage example:
496  *
497  * \code
498  * namespace mbox_ns = so_5::extra::mboxes::inflight_limit;
499  *
500  * so_5::environment_t & env = ...;
501  *
502  * // Create an inflight_limit_mbox with underlying MPMC mbox for immutable message.
503  * auto my_mbox = mbox_ns::make_mbox<my_msg>(env.create_mbox(), 15u);
504  *
505  * // Create an inflight_limit_mbox with underlying MPSC mbox for mutable message.
506  * class demo_agent : public so_5::agent_t
507  * {
508  * const so_5::mbox_t my_mbox_;
509  * public:
510  * demo_agent(context_t ctx)
511  * : so_5::agent_t{std::move(ctx)}
512  * , my_mbox_{ mbox_ns::make_mbox< so_5::mutable_msg<my_msg> >(so_direct_mbox(), 4u) }
513  * {...}
514  * ...
515  * };
516  * \endcode
517  *
518  * \tparam Msg_Type type of message to be used with a new mbox.
519  *
520  * \since v.1.5.2
521  */
522 template< typename Msg_Type >
523 [[nodiscard]]
524 mbox_t
526  //! Actual destination mbox.
528  //! The limit of inflight messages.
530  {
532  std::move(dest_mbox) );
533 
534  // Use of mutable message type for MPMC mbox should be prohibited.
537 
538  auto & env = underlying_mbox.value()->environment();
539 
540  return env.make_custom_mbox(
542  {
543  mbox_t result;
544 
546  {
547  using T = impl::actual_mbox_t<
549 
550  result = mbox_t{ new T{
554  data.m_tracer.get()
555  } };
556  }
557  else
558  {
559  using T = impl::actual_mbox_t<
561 
562  result = mbox_t{ new T{
566  } };
567  }
568 
569  return result;
570  } );
571  }
572 
573 } /* namespace inflight_limit */
574 
575 } /* namespace mboxes */
576 
577 } /* namespace extra */
578 
579 } /* namespace so_5 */
special_envelope_t(message_ref_t payload, instances_counter_shptr_t counter)
Initializing constructor.
void subscribe_event_handler(const std::type_index &msg_type, const ::so_5::message_limit::control_block_t *limit, ::so_5::agent_t &subscriber) override
void fill_trace_data_1(actual_trace_data_t &, extra_inflight_limit_specifics::limit_info)
so_5::extra::mboxes::inflight_limit::underlying_counter_t m_limit
void ensure_expected_msg_type(const std::type_index &msg_type, std::string_view error_description) const
const int rc_nullptr_as_underlying_mbox
Null pointer to underlying mbox.
mbox_t make_mbox(mbox_t dest_mbox, underlying_counter_t inflight_limit)
Create an instance of inflight_limit_mbox.
const int rc_different_message_type
An attempt to use a message type that differs from mbox's message type.
counter_incrementer_t(outliving_reference_t< instances_counter_t > counter) noexcept
std::atomic< underlying_counter_t > m_instances
Counter of inflight instances.
void set_delivery_filter(const std::type_index &msg_type, const ::so_5::delivery_filter_t &filter, ::so_5::agent_t &subscriber) override
Separate type for holding inflight message counter as a separate object.
Ranges for error codes of each submodules.
Definition: details.hpp:13
so_5::environment_t & environment() const noexcept override
const std::type_index m_msg_type
Type of a message for that mbox is created.
void drop_delivery_filter(const std::type_index &msg_type, ::so_5::agent_t &subscriber) noexcept override
actual_mbox_t(const not_null_underlying_mbox_t &dest_mbox, std::type_index msg_type, underlying_counter_t limit, Tracing_Args &&... args)
Initializing constructor.
Helper class for incrementing/decrementing number of messages in RAII style.
not_null_underlying_mbox_t ensure_underlying_mbox_not_null(so_5::mbox_t mbox)
Ensure that underlying mbox is not nullptr.
so_5::extra::mboxes::inflight_limit::underlying_counter_t m_current_number
instances_counter_shptr_t m_instances_counter
Counter for inflight instances.
void ensure_valid_message_type_for_underlying_mbox(const so_5::mbox_t &underlying_mbox)
Check for compatibility between mbox type and message type.
void unsubscribe_event_handlers(const std::type_index &msg_type, ::so_5::agent_t &subscriber) override
void do_deliver_message(const std::type_index &msg_type, const ::so_5::message_ref_t &message, unsigned int overlimit_reaction_deep) override
Helper type that tells that underlying mbox isn't nullptr.
const underlying_counter_t m_limit
The limit of inflight messages.
void make_trace_to_1(std::ostream &s, extra_inflight_limit_specifics::limit_info info)
const so_5::mbox_t m_underlying_mbox
Actual underlying mbox to be used for all calls.
Actual implementation of inflight_limit_mbox.