SObjectizer  5.8
Loading...
Searching...
No Matches
active_group/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5#include <so_5/disp/active_group/pub.hpp>
6
7#include <so_5/send_functions.hpp>
8
9#include <so_5/details/rollback_on_exception.hpp>
10
11#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
12#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
13#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
14
15#include <so_5/disp/reuse/work_thread/work_thread.hpp>
16
17#include <so_5/stats/repository.hpp>
18#include <so_5/stats/messages.hpp>
19#include <so_5/stats/std_names.hpp>
20
21#include <map>
22#include <mutex>
23#include <algorithm>
24
25namespace so_5
26{
27
28namespace disp
29{
30
31namespace active_group
32{
33
34namespace impl
35{
36
37namespace work_thread = so_5::disp::reuse::work_thread;
38namespace stats = so_5::stats;
39
40namespace
41{
42
43/*!
44 * \brief Just a helper function for consequetive call to shutdown and wait.
45 *
46 * \since
47 * v.5.5.4
48 */
49template< class T >
50void
52 {
53 w.shutdown();
54 w.wait();
55 }
56
57void
59 const so_5::mbox_t &,
60 const stats::prefix_t &,
61 work_thread::work_thread_no_activity_tracking_t & )
62 {
63 /* Nothing to do */
64 }
65
66void
68 const so_5::mbox_t & mbox,
69 const stats::prefix_t & prefix,
70 work_thread::work_thread_with_activity_tracking_t & wt )
71 {
72 so_5::send< stats::messages::work_thread_activity >(
73 mbox,
74 prefix,
75 stats::suffixes::work_thread_activity(),
76 wt.thread_id(),
77 wt.take_activity_stats() );
78 }
79
80} /* anonymous */
81
82//
83// actual_dispatcher_iface_t
84//
85/*!
86 * \brief An actual interface of active group dispatcher.
87 *
88 * \since
89 * v.5.6.0
90 */
92 {
93 public :
94 /*!
95 * \brief Create a new thread for a group if it necessary.
96 *
97 * If name \a group_name is unknown then a new work
98 * thread is started. This thread is marked as it has one
99 * working agent on it.
100 *
101 * If there already is a thread for \a group_name then the
102 * counter of working agents is incremented.
103 */
104 virtual void
105 allocate_thread_for_group( const std::string & group_name ) = 0;
106
107 /*!
108 * \brief Get the event_queue for the specified active group.
109 *
110 * It is expected that thread for the group is already
111 * created by calling allocate_thread_for_group() method.
112 */
113 virtual so_5::event_queue_t *
114 query_thread_for_group( const std::string & group_name ) noexcept = 0;
115
116 /*!
117 * \brief Release the thread for the specified active group.
118 *
119 * Method decrements the working agent count for the thread of
120 * \a group_name. If there no more working agents left then
121 * the event_queue and working thread for that group will be
122 * destroyed.
123 */
124 virtual void
125 release_thread_for_group( const std::string & group_name ) noexcept = 0;
126 };
127
128//
129// actual_dispatcher_iface_shptr_t
130//
133
134//
135// actual_binder_t
136//
137/*!
138 * \brief Implementation of binder interface for %active_group dispatcher.
139 *
140 * \since
141 * v.5.6.0
142 */
143class actual_binder_t final : public disp_binder_t
144 {
145 //! Dispatcher to be used.
147 //! Name of group for new agents.
149
150 public :
152 actual_dispatcher_iface_shptr_t disp,
153 nonempty_name_t group_name ) noexcept
154 : m_disp{ std::move(disp) }
156 {}
157
158 void
160 agent_t & /*agent*/ ) override
161 {
162 m_disp->allocate_thread_for_group( m_group_name );
163 }
164
165 void
167 agent_t & /*agent*/ ) noexcept override
168 {
169 m_disp->release_thread_for_group( m_group_name );
170 }
171
172 void
174 agent_t & agent ) noexcept override
175 {
176 auto queue = m_disp->query_thread_for_group( m_group_name );
177 agent.so_bind_to_dispatcher( *queue );
178 }
179
180 void
182 agent_t & /*agent*/ ) noexcept override
183 {
184 m_disp->release_thread_for_group( m_group_name );
185 }
186 };
187
188//
189// dispatcher_template_t
190//
191
192/*!
193 * \brief Implementation of active object dispatcher in form of template class.
194 */
195template< typename Work_Thread >
196class dispatcher_template_t final : public actual_dispatcher_iface_t
197 {
198 public:
200 //! SObjectizer Environment to work in.
202 //! Base part of data sources names.
203 const std::string_view name_base,
204 //! Dispatcher's parameters.
205 disp_params_t params )
206 : m_env{ env }
207 , m_params{ std::move(params) }
208 , m_data_source{
209 outliving_mutable(m_env.get().stats_repository()),
210 name_base,
211 outliving_mutable( *this )
212 }
213 {}
214
215 ~dispatcher_template_t() noexcept override
216 {
217 // All working threads should receive stop signal.
218 for( auto & p: m_groups )
219 p.second.m_thread->shutdown();
220
221 // All working threads should be joined.
222 for( auto & p: m_groups )
223 p.second.m_thread->wait();
224 }
225
228 {
229 return std::make_shared< actual_binder_t >(
230 this->shared_from_this(),
231 std::move(group_name) );
232 }
233
234 void
235 allocate_thread_for_group( const std::string & group_name ) override
236 {
237 std::lock_guard< std::mutex > lock{ m_lock };
238
239 auto it = m_groups.find( group_name );
240
241 if( m_groups.end() == it )
242 {
243 // New thread should be created.
244 auto thread = std::make_shared< Work_Thread >(
245 acquire_work_thread( m_params, m_env.get() ),
246 m_params.queue_params().lock_factory() );
247
248 thread->start();
249
250 so_5::details::do_with_rollback_on_exception(
251 [&] {
252 m_groups.emplace(
253 group_name,
254 thread_with_refcounter_t{ thread, 1u } );
255 },
256 [&thread] { shutdown_and_wait( *thread ); } );
257 }
258 else
259 {
260 // Number of agents bound has to be incremented now.
261 it->second.m_user_agent += 1u;
262 }
263 }
264
266 query_thread_for_group( const std::string & group_name ) noexcept override
267 {
268 std::lock_guard< std::mutex > lock{ m_lock };
269
270 return m_groups.find( group_name )->second.m_thread->
271 get_agent_binding();
272 }
273
274 void
275 release_thread_for_group( const std::string & group_name ) noexcept override
276 {
277 auto thread = search_and_try_remove_group_from_map( group_name );
278 if( thread )
279 shutdown_and_wait( *thread );
280 }
281
282 private:
283 friend class disp_data_source_t;
284
285 //! An alias for shared pointer to work thread.
286 using work_thread_shptr_t = std::shared_ptr< Work_Thread >;
287
288 //! Auxiliary class for the working agent counting.
294
295 //! Typedef for mapping from group names to a single thread
296 //! dispatcher.
297 using active_group_map_t =
298 std::map< std::string, thread_with_refcounter_t >;
299
300 /*!
301 * \brief Data source for run-time monitoring of whole dispatcher.
302 *
303 * \since
304 * v.5.5.4
305 */
306 class disp_data_source_t final : public stats::source_t
307 {
308 //! Dispatcher to work with.
309 outliving_reference_t< dispatcher_template_t > m_dispatcher;
310
311 //! Basic prefix for data sources.
313
314 public :
316 const std::string_view name_base,
317 outliving_reference_t< dispatcher_template_t > disp )
318 : m_dispatcher{ disp }
319 {
320 using namespace so_5::disp::reuse;
321
322 m_base_prefix = make_disp_prefix(
323 "ag", // ao -- active_groups
324 name_base,
325 &m_dispatcher );
326 }
327
328 void
329 distribute( const so_5::mbox_t & mbox ) override
330 {
331 auto & disp = m_dispatcher.get();
332
333 std::lock_guard< std::mutex > lock{ disp.m_lock };
334
335 so_5::send< stats::messages::quantity< std::size_t > >(
336 mbox,
337 m_base_prefix,
338 stats::suffixes::disp_active_group_count(),
339 disp.m_groups.size() );
340
341 std::size_t agent_count = 0;
342 for( const auto & p : disp.m_groups )
343 {
344 distribute_value_for_work_thread(
345 mbox,
346 p.first,
347 p.second );
348
349 agent_count += p.second.m_user_agent;
350 }
351
352 so_5::send< stats::messages::quantity< std::size_t > >(
353 mbox,
354 m_base_prefix,
355 stats::suffixes::agent_count(),
356 agent_count );
357 }
358
359 private:
360 void
362 const so_5::mbox_t & mbox,
363 const std::string & group_name,
364 const thread_with_refcounter_t & wt )
365 {
366 std::ostringstream ss;
367 ss << m_base_prefix.c_str() << "/wt-" << group_name;
368
369 const stats::prefix_t prefix{ ss.str() };
370
371 so_5::send< stats::messages::quantity< std::size_t > >(
372 mbox,
373 prefix,
374 stats::suffixes::agent_count(),
375 wt.m_user_agent );
376
377 so_5::send< stats::messages::quantity< std::size_t > >(
378 mbox,
379 prefix,
380 stats::suffixes::work_thread_queue_size(),
381 wt.m_thread->demands_count() );
382
383 send_thread_activity_stats(
384 mbox,
385 prefix,
386 *(wt.m_thread) );
387 }
388 };
389
390 /*!
391 * \brief SObjectizer Environment to work in.
392 *
393 * It is necessary for calling work_thread_factory.
394 *
395 * \since v.5.7.3
396 */
398
399 //! Parameters for the dispatcher.
401
402 //! A map of dispatchers for active groups.
404
405 //! This object lock.
407
408 /*!
409 * \brief Data source for run-time monitoring.
410 *
411 * \since
412 * v.5.5.4, v.5.6.0
413 */
414 stats::auto_registered_source_holder_t< disp_data_source_t >
416
417 /*!
418 * \brief Helper function for searching and erasing agent's
419 * thread from map of active threads.
420 *
421 * \since
422 * v.5.5.4
423 *
424 * \note Does all actions on locked object.
425 *
426 * \return nullptr if thread for the group is not found
427 * or there are still some agents on it.
428 */
431 const std::string & group_name ) noexcept
432 {
433 work_thread_shptr_t result;
434
435 std::lock_guard< std::mutex > lock{ m_lock };
436
437 auto it = m_groups.find( group_name );
438
439 if( m_groups.end() != it && 0u == --(it->second.m_user_agent) )
440 {
441 result = it->second.m_thread;
442 m_groups.erase( it );
443 }
444
445 return result;
446 }
447};
448
449//
450// dispatcher_handle_maker_t
451//
453 {
454 public :
456 make( actual_dispatcher_iface_shptr_t disp ) noexcept
457 {
458 return { std::move( disp ) };
459 }
460 };
461
462} /* namespace impl */
463
464//
465// make_dispatcher
466//
469 environment_t & env,
470 const std::string_view data_sources_name_base,
471 disp_params_t params )
472 {
473 using namespace so_5::disp::reuse;
474
475 using dispatcher_no_activity_tracking_t =
476 impl::dispatcher_template_t<
477 work_thread::work_thread_no_activity_tracking_t >;
478
479 using dispatcher_with_activity_tracking_t =
480 impl::dispatcher_template_t<
481 work_thread::work_thread_with_activity_tracking_t >;
482
483 auto binder = so_5::disp::reuse::make_actual_dispatcher<
484 impl::actual_dispatcher_iface_t,
485 dispatcher_no_activity_tracking_t,
486 dispatcher_with_activity_tracking_t >(
487 outliving_mutable(env),
488 data_sources_name_base,
489 std::move(params) );
490
491 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
492 }
493
494} /* namespace active_group */
495
496} /* namespace disp */
497
498} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
Alias for namespace with traits of event queue.
A handle for active_group dispatcher.
const std::string m_group_name
Name of group for new agents.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, nonempty_name_t group_name) noexcept
An actual interface of active group dispatcher.
virtual so_5::event_queue_t * query_thread_for_group(const std::string &group_name) noexcept=0
Get the event_queue for the specified active group.
virtual void release_thread_for_group(const std::string &group_name) noexcept=0
Release the thread for the specified active group.
virtual void allocate_thread_for_group(const std::string &group_name)=0
Create a new thread for a group if it necessary.
The very basic interface of active_group dispatcher.
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void distribute_value_for_work_thread(const so_5::mbox_t &mbox, const std::string &group_name, const thread_with_refcounter_t &wt)
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute(const so_5::mbox_t &mbox) override
Send appropriate notification about the current value.
void release_thread_for_group(const std::string &group_name) noexcept override
Release the thread for the specified active group.
void allocate_thread_for_group(const std::string &group_name) override
Create a new thread for a group if it necessary.
outliving_reference_t< environment_t > m_env
SObjectizer Environment to work in.
so_5::event_queue_t * query_thread_for_group(const std::string &group_name) noexcept override
Get the event_queue for the specified active group.
active_group_map_t m_groups
A map of dispatchers for active groups.
const disp_params_t m_params
Parameters for the dispatcher.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
work_thread_shptr_t search_and_try_remove_group_from_map(const std::string &group_name) noexcept
Helper function for searching and erasing agent's thread from map of active threads.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
disp_binder_shptr_t binder(nonempty_name_t group_name) override
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
A class for the name which cannot be empty.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
void shutdown_and_wait(T &w)
Just a helper function for consequetive call to shutdown and wait.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
Active groups dispatcher implemetation details.
Active groups dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_group dispatcher.
Implemetation details of dispatcher's working thread.
Reusable components for dispatchers.
Event dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33