SObjectizer-5 Extra
unique_subscribers.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of unique_subscribers mbox.
4  *
5  * \since
6  * v.1.5.0
7  */
8 
9 #pragma once
10 
11 #include <so_5/version.hpp>
12 
13 #if SO_5_VERSION < SO_5_VERSION_MAKE(5u, 7u, 4u)
14 #error "SObjectizer-5.7.4 of newest is required"
15 #endif
16 
17 #include <so_5_extra/error_ranges.hpp>
18 
19 #include <so_5/details/sync_helpers.hpp>
20 
21 #include <so_5/mbox.hpp>
22 
23 #include <so_5/impl/agent_ptr_compare.hpp>
24 #include <so_5/impl/message_limit_internals.hpp>
25 #include <so_5/impl/msg_tracing_helpers.hpp>
26 #include <so_5/impl/local_mbox_basic_subscription_info.hpp>
27 
28 #include <so_5/details/invoke_noexcept_code.hpp>
29 
30 #include <memory>
31 #include <tuple>
32 #include <utility>
33 
34 namespace so_5 {
35 
36 namespace extra {
37 
38 namespace mboxes {
39 
40 namespace unique_subscribers {
41 
42 namespace errors {
43 
44 /*!
45  * \brief An attempt to make another subscription to the same message type.
46  *
47  * This error is reported when a subscription to the same message type
48  * already exists and that subscription was made for another agent.
49  *
50  * \since
51  * v.1.5.0
52  */
55 
56 /*!
57  * \brief An attempt to set a delivery filter.
58  *
59  * \deprecated Since v.1.5.1 delivery filters are supported by
60  * unique_subscribers mbox.
61  *
62  * \since v.1.5.0
63  */
64 [[deprecated]]
67 
68 } /* namespace errors */
69 
70 namespace details {
71 
72 //
73 // subscriber_info_t
74 //
75 /*!
76  * \brief Description of a subscriber.
77  *
78  * \since v.1.5.0, v.1.5.1
79  */
82  {
84 
85  //! Subscriber.
86  /*!
87  * \attention
88  * It's assumed that this pointer can't be null.
89  */
91 
92  public:
93  //! Constructor for case when agent and limits are known.
95  agent_t & agent,
96  const so_5::message_limit::control_block_t * limit )
97  : base_type_t{ limit }
98  , m_agent{ &agent }
99  {}
100 
101  //! Constructor for case when agent and delivery filter are known.
103  agent_t & agent,
104  const so_5::delivery_filter_t & filter )
105  : base_type_t{ &filter }
106  , m_agent{ &agent }
107  {}
108 
//! Get the subscriber agent.
/*!
 * \return the pointer held in m_agent. The constructors visible here
 * initialize it with the address of the agent passed to them.
 */
109  [[nodiscard]]
110  agent_t *
111  receiver() const noexcept
112  {
113  return m_agent;
114  }
115  };
116 
117 //
118 // data_t
119 //
120 
121 /*!
122  * \brief A collection of data required for local mbox implementation.
123  *
124  * \since v.1.5.0
125  */
126 struct data_t
127  {
 //! Initializing constructor.
 /*!
  * Initializes m_id from \a id and m_env from \a env.
  */
128  data_t( mbox_id_t id, environment_t & env )
129  : m_id{ id }
130  , m_env{ env }
131  {}
132 
133  //! ID of this mbox.
135 
136  //! Environment for which the mbox is created.
138 
139  /*!
140  * \brief Map from message type to subscribers.
141  */
143 
144  //! Map of subscribers to messages.
146  };
147 
148 //
149 // actual_mbox_t
150 //
151 
152 //! Actual implementation of unique_subscribers mbox.
153 /*!
154  * \tparam Mutex type of lock to be used for thread safety.
155  * \tparam Tracing_Base base class with implementation of message
156  * delivery tracing methods.
157  *
158  * \since v.1.5.0
159  */
160 template<
161  typename Mutex,
162  typename Tracing_Base >
164  : public abstract_message_box_t
165  , private ::so_5::details::lock_holder_detector< Mutex >::type
166  , private data_t
167  , private Tracing_Base
168  {
169  public:
170  template< typename... Tracing_Args >
172  //! ID of this mbox.
173  mbox_id_t id,
174  //! Environment for which the mbox is created.
175  outliving_reference_t< environment_t > env,
176  //! Optional parameters for Tracing_Base's constructor.
177  Tracing_Args &&... args )
178  : data_t{ id, env.get() }
180  {}
181 
//! Get the ID of this mbox.
/*!
 * \return the value of m_id.
 */
182  mbox_id_t
183  id() const override
184  {
185  return this->m_id;
186  }
187 
188  void
190  const std::type_index & msg_type,
191  const so_5::message_limit::control_block_t * limit,
192  agent_t & subscriber ) override
193  {
195  msg_type,
196  subscriber,
197  [&] {
199  },
200  [&]( subscriber_info_t & info ) {
201  info.set_limit( limit );
202  } );
203  }
204 
205  void
207  const std::type_index & msg_type,
208  agent_t & subscriber ) override
209  {
211  msg_type,
212  subscriber,
213  []( subscriber_info_t & info ) {
214  info.drop_limit();
215  } );
216  }
217 
218  std::string
219  query_name() const override
220  {
222  s << "<mbox:type=UNIQUESUBSCRIBERS:id=" << m_id << ">";
223 
224  return s.str();
225  }
226 
228  type() const override
229  {
231  }
232 
233  void
235  const std::type_index & msg_type,
236  const message_ref_t & message,
237  unsigned int overlimit_reaction_deep ) override
238  {
240  *this, // as Tracing_base
241  *this, // as abstract_message_box_t
242  "deliver_message",
244 
246  tracer,
247  msg_type,
248  message,
250  }
251 
252  void
254  const std::type_index & msg_type,
255  const delivery_filter_t & filter,
256  agent_t & subscriber ) override
257  {
259  msg_type,
260  subscriber,
261  [&] {
263  },
264  [&]( subscriber_info_t & info ) {
266  } );
267  }
268 
269  void
271  const std::type_index & msg_type,
272  agent_t & subscriber ) noexcept override
273  {
275  msg_type,
276  subscriber,
277  []( subscriber_info_t & info ) {
278  info.drop_filter();
279  } );
280  }
281 
//! Get the environment for which the mbox is created.
/*!
 * \return the reference held in m_env.
 */
282  environment_t &
283  environment() const noexcept override
284  {
285  return m_env;
286  }
287 
288  private :
289  template< typename Info_Maker, typename Info_Changer >
290  void
292  const std::type_index & msg_type,
293  agent_t & subscriber,
294  Info_Maker maker,
295  Info_Changer changer )
296  {
297  this->lock_and_perform( [&] {
298  auto it = this->m_subscribers.find( msg_type );
299  if( it == this->m_subscribers.end() )
300  {
301  // There isn't such message type yet.
303  }
304  else
305  {
306  // If subscription or delivery filter is already set by
307  // a different agent then we can't continue.
308  if( it->second.receiver() != &subscriber )
311  std::string{ "subscription is already exists "
312  "for message type '" }
313  + msg_type.name()
314  + "'" );
315  else
316  changer( it->second );
317  }
318  } );
319  }
320 
321  template< typename Info_Changer >
322  void
324  const std::type_index & msg_type,
325  agent_t & subscriber,
326  Info_Changer changer )
327  {
328  this->lock_and_perform( [&] {
329  auto it = this->m_subscribers.find( msg_type );
330  if( it != this->m_subscribers.end() )
331  {
332  auto & subscriber_info = it->second;
333 
334  // Skip all other actions if the subscription is
335  // made for a different agent.
337  {
338  // Subscriber is found and must be modified.
340 
341  // If info about subscriber becomes empty after
342  // modification then subscriber info must be removed.
343  if( subscriber_info.empty() )
344  this->m_subscribers.erase( it );
345  }
346  }
347  } );
348  }
349 
350  void
352  const std::type_index & msg_type,
353  agent_t * subscriber )
354  {
355  this->lock_and_perform( [&] {
356  auto it = this->m_subscribers.find( msg_type );
357  if( it != this->m_subscribers.end() )
358  {
359  auto & subscriber_info = it->second;
360 
361  // Skip all other actions if the subscription is
362  // made for a different agent.
364  {
365  // Subscriber must be removed.
366  this->m_subscribers.erase( it );
367  }
368  }
369  } );
370  }
371 
372  void
374  typename Tracing_Base::deliver_op_tracer const & tracer,
375  const std::type_index & msg_type,
376  const message_ref_t & message,
377  unsigned int overlimit_reaction_deep )
378  {
379  this->lock_and_perform( [&] {
380  auto it = this->m_subscribers.find( msg_type );
381  if( it != this->m_subscribers.end() )
382  {
384  it->second,
385  tracer,
386  msg_type,
387  message,
389  }
390  else
392  } );
393  }
394 
395  void
397  const subscriber_info_t & subscriber_info,
398  typename Tracing_Base::deliver_op_tracer const & tracer,
399  const std::type_index & msg_type,
400  const message_ref_t & message,
401  unsigned int overlimit_reaction_deep ) const
402  {
403  const auto delivery_status =
406  message,
407  []( const message_ref_t & msg ) -> message_t & {
408  return *msg;
409  } );
410 
412  {
413  using namespace so_5::message_limit::impl;
414 
416  this->m_id,
419  msg_type,
420  message,
423  [&] {
425 
429  this->m_id,
430  msg_type,
431  message );
432  } );
433  }
434  else
437  }
438  };
439 
440 } /* namespace details */
441 
442 //
443 // make_mbox
444 //
445 /*!
446  * \brief Factory function for creation of a new instance of unique_subscribers
447  * mbox.
448  *
449  * Usage examples:
450  *
451  * Create a mbox with std::mutex as Lock_Type (this mbox can safely be
452  * used in multi-threaded environments):
453  * \code
454  * so_5::environment_t & env = ...;
455  * auto mbox = so_5::extra::mboxes::unique_subscribers::make_mbox(env);
456  * \endcode
457  *
458  * Create a mbox with so_5::null_mutex_t as Lock_Type (this mbox can only
459  * be used in single-threaded environments):
460  * \code
461  * so_5::environment_t & env = ...;
462  * auto mbox = so_5::extra::mboxes::unique_subscribers::make_mbox<so_5::null_mutex_t>(env);
463  * \endcode
464  *
465  * \tparam Lock_Type type of lock to be used for thread safety. It can be
466  * std::mutex or so_5::null_mutex_t (or any other type which can be used
467  * with std::lock_guard).
468  *
469  * \since v.1.5.0
470  */
471 template<
472  typename Lock_Type = std::mutex >
473 [[nodiscard]]
474 mbox_t
476  {
477  return env.make_custom_mbox(
478  [&]( const mbox_creation_data_t & data ) {
479  mbox_t result;
480 
482  {
483  using T = details::actual_mbox_t<
484  Lock_Type,
486 
487  result = mbox_t{ new T{
488  data.m_id,
489  data.m_env,
490  data.m_tracer.get()
491  } };
492  }
493  else
494  {
495  using T = details::actual_mbox_t<
496  Lock_Type,
498  result = mbox_t{ new T{
499  data.m_id,
500  data.m_env
501  } };
502  }
503 
504  return result;
505  } );
506  }
507 
508 } /* namespace unique_subscribers */
509 
510 } /* namespace mboxes */
511 
512 } /* namespace extra */
513 
514 } /* namespace so_5 */
void do_deliver_message_to_subscriber(const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) const
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, agent_t &subscriber) override
Actual implementation of unique_subscribers mbox.
const int rc_subscription_exists
An attempt to make another subscription to the same message type.
void modify_and_remove_subscriber_if_needed(const std::type_index &msg_type, agent_t &subscriber, Info_Changer changer)
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep)
void do_deliver_message(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) override
Ranges of error codes for each submodule.
Definition: details.hpp:13
void insert_or_modify_subscriber(const std::type_index &msg_type, agent_t &subscriber, Info_Maker maker, Info_Changer changer)
messages_table_t m_subscribers
Map of subscribers to messages.
subscriber_info_t(agent_t &agent, const so_5::message_limit::control_block_t *limit)
Constructor for case when agent and limits are known.
void subscribe_event_handler(const std::type_index &msg_type, const so_5::message_limit::control_block_t *limit, agent_t &subscriber) override
const int rc_delivery_filters_not_supported
An attempt to set a delivery filter.
mbox_t make_mbox(so_5::environment_t &env)
Factory function for creation of a new instance of unique_subscribers mbox.
void remove_subscriber_if_needed(const std::type_index &msg_type, agent_t *subscriber)
subscriber_info_t(agent_t &agent, const so_5::delivery_filter_t &filter)
Constructor for case when agent and delivery filter are known.
actual_mbox_t(mbox_id_t id, outliving_reference_t< environment_t > env, Tracing_Args &&... args)
A collection of data required for local mbox implementation.
void drop_delivery_filter(const std::type_index &msg_type, agent_t &subscriber) noexcept override
void unsubscribe_event_handlers(const std::type_index &msg_type, agent_t &subscriber) override
environment_t & m_env
Environment for which the mbox is created.