2
3
5#include <so_5/disp/active_group/pub.hpp>
7#include <so_5/send_functions.hpp>
9#include <so_5/details/rollback_on_exception.hpp>
11#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
12#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
13#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
15#include <so_5/disp/reuse/work_thread/work_thread.hpp>
17#include <so_5/stats/repository.hpp>
18#include <so_5/stats/messages.hpp>
19#include <so_5/stats/std_names.hpp>
44
45
46
47
48
61 work_thread::work_thread_no_activity_tracking_t & )
68 const so_5::mbox_t & mbox,
70 work_thread::work_thread_with_activity_tracking_t & wt )
72 so_5::send< stats::messages::work_thread_activity >(
75 stats::suffixes::work_thread_activity(),
77 wt.take_activity_stats() );
86
87
88
89
90
95
96
97
98
99
100
101
102
103
108
109
110
111
112
117
118
119
120
121
122
123
138
139
140
141
142
152 actual_dispatcher_iface_shptr_t disp,
162 m_disp->allocate_thread_for_group( m_group_name );
169 m_disp->release_thread_for_group( m_group_name );
174 agent_t & agent )
noexcept override
176 auto queue = m_disp->query_thread_for_group( m_group_name );
177 agent.so_bind_to_dispatcher( *queue );
184 m_disp->release_thread_for_group( m_group_name );
193
194
195template<
typename Work_Thread >
203 const std::string_view name_base,
207 , m_params{ std::move(params) }
209 outliving_mutable(m_env.get().stats_repository()),
211 outliving_mutable( *
this )
218 for(
auto & p: m_groups )
219 p.second.m_thread->shutdown();
222 for(
auto & p: m_groups )
223 p.second.m_thread->wait();
229 return std::make_shared< actual_binder_t >(
230 this->shared_from_this(),
231 std::move(group_name) );
237 std::lock_guard< std::mutex > lock{ m_lock };
239 auto it = m_groups.find( group_name );
241 if( m_groups.end() == it )
244 auto thread = std::make_shared< Work_Thread >(
245 acquire_work_thread( m_params, m_env.get() ),
246 m_params.queue_params().lock_factory() );
250 so_5::details::do_with_rollback_on_exception(
254 thread_with_refcounter_t{ thread, 1u } );
256 [&thread] { shutdown_and_wait( *thread ); } );
261 it->second.m_user_agent += 1u;
268 std::lock_guard< std::mutex > lock{ m_lock };
270 return m_groups.find( group_name )->second.m_thread->
277 auto thread = search_and_try_remove_group_from_map( group_name );
279 shutdown_and_wait( *thread );
283 friend class disp_data_source_t;
286 using work_thread_shptr_t = std::shared_ptr< Work_Thread >;
297 using active_group_map_t =
298 std::map< std::string, thread_with_refcounter_t >;
301
302
303
304
305
306 class disp_data_source_t
final :
public stats::
source_t
316 const std::string_view name_base,
318 : m_dispatcher{ disp }
320 using namespace so_5::disp::reuse;
322 m_base_prefix = make_disp_prefix(
331 auto & disp = m_dispatcher.get();
333 std::lock_guard< std::mutex > lock{ disp.m_lock };
335 so_5::send< stats::messages::quantity< std::size_t > >(
338 stats::suffixes::disp_active_group_count(),
339 disp.m_groups.size() );
341 std::size_t agent_count = 0;
342 for(
const auto & p : disp.m_groups )
344 distribute_value_for_work_thread(
349 agent_count += p.second.m_user_agent;
352 so_5::send< stats::messages::quantity< std::size_t > >(
355 stats::suffixes::agent_count(),
362 const so_5::mbox_t & mbox,
363 const std::string & group_name,
366 std::ostringstream ss;
367 ss << m_base_prefix.c_str() <<
"/wt-" << group_name;
369 const stats::prefix_t prefix{ ss.str() };
371 so_5::send< stats::messages::quantity< std::size_t > >(
374 stats::suffixes::agent_count(),
377 so_5::send< stats::messages::quantity< std::size_t > >(
380 stats::suffixes::work_thread_queue_size(),
381 wt.m_thread->demands_count() );
383 send_thread_activity_stats(
391
392
393
394
395
396
409
410
411
412
413
418
419
420
421
422
423
424
425
426
427
428
433 work_thread_shptr_t result;
435 std::lock_guard< std::mutex > lock{ m_lock };
437 auto it = m_groups.find( group_name );
439 if( m_groups.end() != it && 0u == --(it->second.m_user_agent) )
441 result = it->second.m_thread;
442 m_groups.erase( it );
456 make( actual_dispatcher_iface_shptr_t disp )
noexcept
458 return { std::move( disp ) };
470 const std::string_view data_sources_name_base,
475 using dispatcher_no_activity_tracking_t =
476 impl::dispatcher_template_t<
479 using dispatcher_with_activity_tracking_t =
480 impl::dispatcher_template_t<
481 work_thread::work_thread_with_activity_tracking_t >;
483 auto binder = so_5::disp::reuse::make_actual_dispatcher<
484 impl::actual_dispatcher_iface_t,
485 dispatcher_no_activity_tracking_t,
486 dispatcher_with_activity_tracking_t >(
487 outliving_mutable(env),
488 data_sources_name_base,
491 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
Alias for namespace with traits of event queue.
A handle for active_group dispatcher.
const std::string m_group_name
Name of group for new agents.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, nonempty_name_t group_name) noexcept
An actual interface of active group dispatcher.
virtual so_5::event_queue_t * query_thread_for_group(const std::string &group_name) noexcept=0
Get the event_queue for the specified active group.
virtual void release_thread_for_group(const std::string &group_name) noexcept=0
Release the thread for the specified active group.
virtual void allocate_thread_for_group(const std::string &group_name)=0
Create a new thread for a group if it is necessary.
The very basic interface of active_group dispatcher.
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void distribute_value_for_work_thread(const so_5::mbox_t &mbox, const std::string &group_name, const thread_with_refcounter_t &wt)
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
stats::prefix_t m_base_prefix
Basic prefix for data sources.
void distribute(const so_5::mbox_t &mbox) override
Send appropriate notification about the current value.
void release_thread_for_group(const std::string &group_name) noexcept override
Release the thread for the specified active group.
void allocate_thread_for_group(const std::string &group_name) override
Create a new thread for a group if it is necessary.
outliving_reference_t< environment_t > m_env
SObjectizer Environment to work in.
std::mutex m_lock
This object lock.
so_5::event_queue_t * query_thread_for_group(const std::string &group_name) noexcept override
Get the event_queue for the specified active group.
active_group_map_t m_groups
A map of dispatchers for active groups.
const disp_params_t m_params
Parameters for the dispatcher.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
~dispatcher_template_t() noexcept override
work_thread_shptr_t search_and_try_remove_group_from_map(const std::string &group_name) noexcept
Helper function for searching and erasing agent's thread from map of active threads.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
disp_binder_shptr_t binder(nonempty_name_t group_name) override
Interface for dispatcher binders.
An interface of event queue for agent.
A class for the name which cannot be empty.
Helper class for indication of long-lived reference via its type.
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
An interface of data source.
void shutdown_and_wait(T &w)
Just a helper function for consecutive calls to shutdown and wait.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
Active groups dispatcher implementation details.
Active groups dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_group dispatcher.
Implementation details of the dispatcher's working thread.
Reusable components for dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Auxiliary class for the working agent counting.
work_thread_shptr_t m_thread