SObjectizer-5 Extra
round_robin.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of round-robin mbox.
4  */
5 
6 #pragma once
7 
8 #include <so_5_extra/error_ranges.hpp>
9 
10 #include <so_5/impl/msg_tracing_helpers.hpp>
11 #include <so_5/impl/message_limit_internals.hpp>
12 
13 namespace so_5 {
14 
15 namespace extra {
16 
17 namespace mboxes {
18 
19 namespace round_robin {
20 
21 namespace errors {
22 
23 /*!
24  * \brief An attempt to set a delivery filter on a round_robin mbox.
25  *
26  * \since
27  * v.1.0.1
28  */
31 
32 } /* namespace errors */
33 
34 namespace details {
35 
36 //
37 // subscriber_info_t
38 //
39 
40 /*!
41  * \brief An information block about one subscriber.
42  */
44 {
45  //! Subscriber.
47 
48  //! Optional message limit for that subscriber.
50 
51  //! Constructor for the case when subscriber info is being
52  //! created during event subscription.
// Both pointers are stored exactly as passed in: nothing is copied and
// no null check is performed here.
// NOTE(review): this numbered listing skips lines 46, 49 and 53; the page
// index places m_limit at line 49 and the constructor at line 53. Restore
// those declarations from the original header before compiling.
54  agent_t * agent,
55  const so_5::message_limit::control_block_t * limit )
56  : m_agent( agent )
57  , m_limit( limit )
58  {}
59 };
60 
61 //
62 // subscriber_container_t
63 //
64 
65 /*!
66  * \brief Type of container for holding subscribers for one message type.
67  */
69  {
70  public :
72 
// True when no subscriber is stored in this container.
73  bool
74  empty() const noexcept
75  {
76  return m_subscribers.empty();
77  }
78 
// Appends a new subscriber record constructed in place from
// (agent, limit). The page index lists this method as emplace_back;
// its name line (80) is absent from the numbered listing.
79  void
81  agent_t * agent,
82  const so_5::message_limit::control_block_t * limit )
83  {
84  m_subscribers.emplace_back( agent, limit );
85  }
86 
// Past-the-end iterator of the underlying storage; this is the value
// find() yields when nothing matches.
88  end() noexcept
89  {
90  return m_subscribers.end();
91  }
92 
// Linear search for the record whose m_agent pointer equals
// `subscriber` (comparison is by pointer identity).
94  find( agent_t * subscriber ) noexcept
95  {
96  return std::find_if( std::begin(m_subscribers), std::end(m_subscribers),
97  [&]( const auto & info ) {
98  return info.m_agent == subscriber;
99  } );
100  }
101 
// Removes the record referenced by `it`.
// NOTE(review): listing line 106 is missing right after the erase call;
// presumably it re-validates m_current_subscriber -- confirm against the
// original header.
102  void
103  erase( storage_t::iterator it ) noexcept
104  {
105  m_subscribers.erase(it);
107  }
108 
// Returns the record at index m_current_subscriber. No emptiness or
// bounds check is made here.
109  const subscriber_info_t &
110  current_subscriber() const noexcept
111  {
112  return m_subscribers[ m_current_subscriber ];
113  }
114 
// Advances the round-robin position by one.
// NOTE(review): the method name (line 116) and line 119 are missing
// from the listing; line 119 presumably wraps the index -- confirm.
115  void
117  {
118  ++m_current_subscriber;
120  }
121 
122  private :
125 
// Wraps the round-robin position back to 0 once it is no longer a
// valid index (i.e. it is >= the number of stored subscribers).
// NOTE(review): the method name (line 127) is missing from the listing.
126  void
128  {
129  if( m_current_subscriber >= m_subscribers.size() )
130  m_current_subscriber = 0;
131  }
132  };
133 
134 //
135 // data_t
136 //
137 
138 /*!
139  * \brief Common part of round-robin mbox implementation.
140  *
141  * This part depends only on Lock type but not on tracing facilities.
142  *
143  * \tparam Lock type of lock object to be used.
144  */
// Plain state holder for the mbox: environment reference, mbox ID,
// the lock object and the per-message-type subscriber table.
145 template< typename Lock >
146 struct data_t
147  {
148  //! Initializing constructor.
// Only m_env and m_id are set from the arguments; m_lock is
// default-constructed.
// NOTE(review): the constructor name line (149) is missing from this
// listing.
150  environment_t & env,
151  mbox_id_t id )
152  : m_env{ env }
153  , m_id{ id }
154  {}
155 
156  //! SObjectizer Environment to work in.
158 
159  //! ID of this mbox.
161 
162  //! Object lock.
163  Lock m_lock;
164 
165  /*!
166  * \brief Map from message type to subscribers.
167  */
168  using messages_table_t = std::map<
169  std::type_index,
171 
172  //! Subscribers grouped by message type.
// NOTE(review): lines 157, 160, 170 and 173 (member declarations and the
// mapped type of messages_table_t) are missing from this listing; the
// page index shows `messages_table_t m_subscribers`.
174  };
175 
176 //
177 // mbox_template_t
178 //
179 
180 //! A template with implementation of round-robin mbox.
181 /*!
182  * \tparam Lock_Type type of lock to be used for thread safety.
183  * \tparam Tracing_Base base class with implementation of message
184  * delivery tracing methods. Expected to be tracing_enabled_base or
185  * tracing_disabled_base from so_5::impl::msg_tracing_helpers namespace.
186  */
187 template<
188  typename Lock_Type,
189  typename Tracing_Base >
191  : public abstract_message_box_t
192  , private data_t< Lock_Type >
193  , private Tracing_Base
194  {
195  using data_type = data_t< Lock_Type >;
196 
197  public:
198  //! Initializing constructor.
// Passes (env, id) to the data_type base.
// NOTE(review): the constructor name (line 200) and line 208 are missing
// from this listing; line 208 presumably forwards `args` to Tracing_Base
// -- confirm against the original header.
199  template< typename... Tracing_Args >
201  //! SObjectizer Environment to work in.
202  environment_t & env,
203  //! ID of this mbox.
204  mbox_id_t id,
205  //! Optional parameters for Tracing_Base's constructor.
206  Tracing_Args &&... args )
207  : data_type{ env, id }
209  {}
210 
// Returns the mbox ID stored in the data_t base at construction.
// Reads m_id without taking m_lock.
211  mbox_id_t
212  id() const override
213  {
214  return this->m_id;
215  }
216 
// Registers `subscriber` for messages of type `type_wrapper`.
// The whole call runs under m_lock. A subscriber already present for
// that type is left untouched (the add branch is taken only when
// find() reports end()).
// NOTE(review): the method name (line 218) and the statements on lines
// 229-230, 232 and 242 are missing from this listing; the page index
// names this method subscribe_event_handler.
217  void
219  const std::type_index & type_wrapper,
220  const so_5::message_limit::control_block_t * limit,
221  agent_t & subscriber ) override
222  {
223  std::lock_guard< Lock_Type > lock( this->m_lock );
224 
225  auto it = this->m_subscribers.find( type_wrapper );
226  if( it == this->m_subscribers.end() )
227  {
228  // No subscribers are registered for this message type yet.
231 
233  }
234  else
235  {
236  auto & agents = it->second;
237 
238  auto pos = agents.find( &subscriber );
239  if( pos == agents.end() )
240  // There is no subscriber in the container.
241  // It must be added.
243  }
244  }
245 
// Removes `subscriber` from the subscribers of `type_wrapper`.
// Runs under m_lock. An unknown message type or an unknown subscriber
// is silently ignored. When the per-type container becomes empty, the
// whole map entry for that message type is erased.
// NOTE(review): the method name (line 247) is missing from this listing;
// the page index names it unsubscribe_event_handlers.
246  void
248  const std::type_index & type_wrapper,
249  agent_t & subscriber ) override
250  {
251  std::lock_guard< Lock_Type > lock( this->m_lock );
252 
253  auto it = this->m_subscribers.find( type_wrapper );
254  if( it != this->m_subscribers.end() )
255  {
256  auto & agents = it->second;
257 
258  auto pos = agents.find( &subscriber );
259  if( pos != agents.end() )
260  {
261  agents.erase( pos );
262  }
263 
// Drop the map entry so empty containers do not accumulate.
264  if( agents.empty() )
265  this->m_subscribers.erase( it );
266  }
267  }
268 
// Returns a textual name of the form "<mbox:type=RRMPSC:id=N>",
// where N is this mbox's m_id.
// NOTE(review): line 272, which declares `s`, is missing from this
// listing; `s.str()` suggests a string stream -- confirm.
269  std::string
270  query_name() const override
271  {
273  s << "<mbox:type=RRMPSC:id=" << this->m_id << ">";
274 
275  return s.str();
276  }
277 
// NOTE(review): only the shell of this override is visible; the return
// type (line 278) and the single body line (281) are missing from this
// listing. Presumably it reports the kind of this mbox -- confirm
// against the original header.
279  type() const override
280  {
282  }
283 
// Public delivery entry point (listed as do_deliver_message in the
// page index). Prepares a tracer object labelled "deliver_message",
// using *this both as Tracing_Base and as abstract_message_box_t, and
// then passes tracer, msg_type and message on to a further call.
// NOTE(review): lines 285, 290, 294, 296 and 300 are missing from this
// listing, including the callee name; it is presumably
// do_deliver_message_impl -- confirm.
284  void
286  const std::type_index & msg_type,
287  const message_ref_t & message,
288  unsigned int overlimit_reaction_deep ) override
289  {
291  *this, // as Tracing_Base
292  *this, // as abstract_message_box_t
293  "deliver_message",
295 
297  tracer,
298  msg_type,
299  message,
301  }
302 
// Delivery filters are not supported by this mbox: all three arguments
// are unused and the only visible statement builds an error with the
// message below.
// NOTE(review): the method name (line 304) and lines 311-312 are missing
// from this listing; they presumably raise
// rc_delivery_filter_cannot_be_used_on_round_robin_mbox (listed in the
// page index) -- confirm.
303  void
305  const std::type_index & /*msg_type*/,
306  const delivery_filter_t & /*filter*/,
307  agent_t & /*subscriber*/ ) override
308  {
309  using namespace so_5::extra::mboxes::round_robin::errors;
310 
313  "set_delivery_filter is called for round_robin-mbox" );
314  }
315 
// Intentional no-op: both arguments are ignored and the body is empty.
// NOTE(review): the method name (line 317) is missing from this listing;
// the page index names it drop_delivery_filter.
316  void
318  const std::type_index & /*msg_type*/,
319  agent_t & /*subscriber*/ ) noexcept override
320  {
321  // Nothing to do.
322  }
323 
// Returns the environment reference stored in the data_t base.
// NOTE(review): the return-type line (324) is missing from this listing;
// the page index shows `so_5::environment_t &`.
325  environment() const noexcept override
326  {
327  return this->m_env;
328  }
329 
330  private :
// Delivery step performed under m_lock: looks up the subscribers of
// msg_type and, when there are some, takes the container's current
// subscriber and passes it on together with tracer, msg_type and
// message.
// `agent_info` is a reference into the container, so it is only valid
// while the lock is held and the container is not modified.
// NOTE(review): lines 332, 344, 346, 351 and 354 are missing from this
// listing (method name, the statement after current_subscriber(), the
// callee name, the last argument and the whole else-branch); the page
// index names this method do_deliver_message_impl.
331  void
333  typename Tracing_Base::deliver_op_tracer const & tracer,
334  const std::type_index & msg_type,
335  const message_ref_t & message,
336  unsigned int overlimit_reaction_deep )
337  {
338  std::lock_guard< Lock_Type > lock( this->m_lock );
339 
340  auto it = this->m_subscribers.find( msg_type );
341  if( it != this->m_subscribers.end() )
342  {
343  const auto & agent_info = it->second.current_subscriber();
345 
347  agent_info,
348  tracer,
349  msg_type,
350  message,
352  }
353  else
355  }
356 
// Hands one message to the single subscriber described by agent_info.
// The visible code makes an outer call that receives this mbox's ID,
// the target agent, msg_type, message and a lambda; inside the lambda a
// second call receives the agent, this mbox's ID, msg_type and message.
// The lambda captures by reference.
// NOTE(review): lines 358, 367, 370, 373-374, 376, 378 and 380 are
// missing from this listing (method name, both callee names and several
// arguments). Given the using-directive below, the outer call is
// presumably a message-limit helper -- confirm against the original
// header.
357  void
359  const subscriber_info_t & agent_info,
360  typename Tracing_Base::deliver_op_tracer const & tracer,
361  const std::type_index & msg_type,
362  const message_ref_t & message,
363  unsigned int overlimit_reaction_deep )
364  {
365  using namespace so_5::message_limit::impl;
366 
368  this->m_id,
369  *(agent_info.m_agent),
371  msg_type,
372  message,
375  [&] {
377 
379  *(agent_info.m_agent),
381  this->m_id,
382  msg_type,
383  message );
384  } );
385  }
386  };
386  };
387 
388 } /* namespace details */
389 
390 //
391 // make_mbox
392 //
393 /*!
394  * \brief Create an implementation of round-robin mbox.
395  *
396  * Usage example:
397  * \code
398  so_5::environment_t & env = ...;
399  const so_5::mbox_t rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>( env );
400  ...
401  so_5::send< some_message >( rrmbox, ... );
402  * \endcode
403  *
404  * \tparam Lock_Type type of lock to be used for thread safety.
405  */
// Factory: asks the environment for a custom mbox and supplies a lambda
// that builds the object from the creation data. The lambda has two
// branches that differ only in the constructor arguments: the first
// also passes data.m_tracer.get(), the second passes just the
// environment and the ID.
// NOTE(review): lines 408, 414, 418, 420, 430 and 432 are missing from
// this listing (function name/parameter, the branch condition, the
// Tracing_Base template arguments and the statements assigning
// `result`); the page index shows `mbox_t make_mbox(environment_t &env)`.
406 template< typename Lock_Type = std::mutex >
407 mbox_t
409  {
410  return env.make_custom_mbox(
411  []( const mbox_creation_data_t & data ) {
412  mbox_t result;
413 
415  {
416  using T = details::mbox_template_t<
417  Lock_Type,
419 
421  data.m_env.get(),
422  data.m_id,
423  data.m_tracer.get() )
424  };
425  }
426  else
427  {
428  using T = details::mbox_template_t<
429  Lock_Type,
431 
433  data.m_env.get(),
434  data.m_id )
435  };
436  }
437 
438  return result;
439  } );
440  }
441 
442 } /* namespace round_robin */
443 
444 } /* namespace mboxes */
445 
446 } /* namespace extra */
447 
448 } /* namespace so_5 */
void subscribe_event_handler(const std::type_index &type_wrapper, const so_5::message_limit::control_block_t *limit, agent_t &subscriber) override
environment_t & m_env
SObjectizer Environment to work in.
const subscriber_info_t & current_subscriber() const noexcept
void do_deliver_message_to_subscriber(const subscriber_info_t &agent_info, typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep)
Ranges for error codes of each submodule.
Definition: details.hpp:13
void set_delivery_filter(const std::type_index &, const delivery_filter_t &, agent_t &) override
void do_deliver_message(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) override
void unsubscribe_event_handlers(const std::type_index &type_wrapper, agent_t &subscriber) override
A template with implementation of round-robin mbox.
subscriber_info_t(agent_t *agent, const so_5::message_limit::control_block_t *limit)
Constructor for the case when subscriber info is being created during event subscription.
Definition: round_robin.hpp:53
so_5::environment_t & environment() const noexcept override
mbox_t make_mbox(environment_t &env)
Create an implementation of round-robin mbox.
Common part of round-robin mbox implementation.
Type of container for holding subscribers for one message type.
Definition: round_robin.hpp:68
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep)
void emplace_back(agent_t *agent, const so_5::message_limit::control_block_t *limit)
Definition: round_robin.hpp:80
const int rc_delivery_filter_cannot_be_used_on_round_robin_mbox
An attempt to set a delivery filter on a round_robin mbox.
Definition: round_robin.hpp:29
storage_t::iterator find(agent_t *subscriber) noexcept
Definition: round_robin.hpp:94
const so_5::message_limit::control_block_t * m_limit
Optional message limit for that subscriber.
Definition: round_robin.hpp:49
messages_table_t m_subscribers
Map of subscribers to messages.
data_t(environment_t &env, mbox_id_t id)
Initializing constructor.
mbox_template_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
void drop_delivery_filter(const std::type_index &, agent_t &) noexcept override
An information block about one subscriber.
Definition: round_robin.hpp:43