2
3
6
7
8
9
10
11
13#include <so_5/disp/nef_one_thread/pub.hpp>
15#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
16#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
17#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
19#include <so_5/impl/thread_join_stuff.hpp>
21#include <so_5/stats/repository.hpp>
22#include <so_5/stats/messages.hpp>
23#include <so_5/stats/std_names.hpp>
25#include <so_5/send_functions.hpp>
43
44
45
46
51
52
53
54
59
60
61
62
66
67
68
69
70
83
84
85
86
114 queue_traits::lock_unique_ptr_t lock )
142 push( demand_unique_ptr_t tail_demand )
150 m_head = tail_demand.release();
158 m_tail->m_next = tail_demand.release();
167 demand_unique_ptr_t result;
179 result = remove_head();
189 return m_size.load( std::memory_order_acquire );
197 demand_unique_ptr_t to_be_deleted{ m_head };
199 to_be_deleted->m_next =
nullptr;
203 return to_be_deleted;
211
212
213
214
225 demand_unique_ptr_t evt_start_demand,
226 demand_unique_ptr_t evt_finish_demand )
243 m_dest_queue.get().push(
244 std::make_unique< demand_t >( std::move(demand) ) );
252 m_evt_start_demand->m_execution_demand = std::move(demand);
253 m_dest_queue.get().push( std::move(m_evt_start_demand) );
263 m_evt_finish_demand->m_execution_demand = std::move(demand);
264 m_dest_queue.get().push( std::move(m_evt_finish_demand) );
274
275
276
277
288
289
290
296 queue_traits::lock_unique_ptr_t lock,
308
309
310
311
317 queue_traits::lock_unique_ptr_t lock,
344
345
346
347
353 queue_traits::lock_unique_ptr_t lock,
367 result.m_working_stats = m_working_stats.take_stats();
368 result.m_waiting_stats = m_waiting_stats.take_stats();
403
404
405
406
407template<
typename Work_Thread >
410 using base_type_t = Work_Thread;
416 queue_traits::lock_unique_ptr_t lock,
504using work_thread_no_activity_tracking_t =
510using work_thread_with_activity_tracking_t =
518 const so_5::mbox_t &,
520 work_thread_no_activity_tracking_t & )
526send_thread_activity_stats(
527 const so_5::mbox_t & mbox,
529 work_thread_with_activity_tracking_t & wt )
531 so_5::send< stats::messages::work_thread_activity >(
534 stats::suffixes::work_thread_activity(),
536 wt.take_activity_stats() );
543
544
545
546
547
548
549template<
typename Work_Thread >
552 friend class disp_data_source_t;
557 const std::string_view name_base,
560 params.queue_params().lock_factory()(),
561 so_5::disp::reuse::acquire_work_thread( params, env.get() ),
564 outliving_mutable(env.get().stats_repository()),
566 outliving_mutable(*
this)
569 m_work_thread.start();
574 m_work_thread.stop();
575 m_work_thread.join();
583 auto evt_start_demand = std::make_unique< demand_t >();
584 auto evt_finish_demand = std::make_unique< demand_t >();
586 auto queue = std::make_unique< agent_queue_t >(
587 m_work_thread.demand_queue(),
588 std::move(evt_start_demand),
589 std::move(evt_finish_demand)
593 std::lock_guard< std::mutex > lock{ m_agent_map_lock };
596 std::addressof(agent),
602 agent_t & agent )
noexcept override
604 std::lock_guard< std::mutex > lock{ m_agent_map_lock };
606 m_agents.erase( std::addressof(agent) );
611 agent_t & agent )
noexcept override
613 event_queue_t & queue = [&]() -> event_queue_t &
615 std::lock_guard< std::mutex > lock{ m_agent_map_lock };
617 auto it = m_agents.find( std::addressof(agent) );
622 if( it == m_agents.end() )
624 rc_no_preallocated_resources_for_agent,
625 "nef_one_thread dispatcher has no info about an agent "
626 "in bind() method" );
628 return *(it->second);
631 agent.so_bind_to_dispatcher( queue );
636 agent_t & agent )
noexcept override
639 this->undo_preallocation( agent );
644
645
646
647
658 const std::string_view name_base,
698 using agent_map_t = std::map< agent_t *, std::unique_ptr<agent_queue_t> >;
721 make( disp_binder_shptr_t binder )
noexcept
723 return { std::move( binder ) };
735 const std::string_view data_sources_name_base,
740 using dispatcher_no_activity_tracking_t =
741 impl::dispatcher_template_t<
742 impl::work_thread_no_activity_tracking_t >;
744 using dispatcher_with_activity_tracking_t =
745 impl::dispatcher_template_t<
746 impl::work_thread_with_activity_tracking_t >;
748 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
750 dispatcher_no_activity_tracking_t,
751 dispatcher_with_activity_tracking_t >(
752 outliving_mutable(env),
753 data_sources_name_base,
756 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
An analog of std::lock_guard for MPSC queue lock.
void notify_one() noexcept
An analog of std::unique_lock for MPSC queue lock.
void wait_for_notify() noexcept
Alias for namespace with traits of event queue.
A handle for nef_one_thread dispatcher.
agent_queue_t(const agent_queue_t &)=delete
agent_queue_t(agent_queue_t &&o)=delete
void push_evt_finish(execution_demand_t demand) noexcept override
Enqueue a demand for evt_finish event.
void push_evt_start(execution_demand_t demand) override
Enqueue a demand for evt_start event.
std::reference_wrapper< demand_queue_t > m_dest_queue
demand_unique_ptr_t m_evt_start_demand
void push(execution_demand_t demand) override
Enqueue new event to the queue.
agent_queue_t & operator=(const agent_queue_t &)=delete
demand_unique_ptr_t m_evt_finish_demand
agent_queue_t(demand_queue_t &dest_queue, demand_unique_ptr_t evt_start_demand, demand_unique_ptr_t evt_finish_demand)
agent_queue_t & operator=(agent_queue_t &&o)=delete
demand_unique_ptr_t remove_head() noexcept
Helper method for deleting queue's head object.
std::size_t size() const
Get the current size of the queue.
~demand_queue_t() noexcept
void push(demand_unique_ptr_t tail_demand)
demand_t * m_head
Head of the queue.
std::atomic< std::size_t > m_size
Current size of the queue.
demand_queue_t(queue_traits::lock_unique_ptr_t lock)
Initializing constructor.
demand_t * m_tail
Tail of the queue.
void stop()
Set the shutdown signal.
demand_unique_ptr_t pop()
bool m_shutdown
Shutdown flag.
queue_traits::lock_unique_ptr_t m_lock
Queue lock.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
Data source for run-time monitoring of whole dispatcher.
stats::prefix_t m_base_prefix
Basic prefix for data sources.
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
std::mutex m_agent_map_lock
Lock for agent_map protection.
~dispatcher_template_t() noexcept override
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
Work_Thread m_work_thread
Worker thread for the dispatcher.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
agent_map_t m_agents
Agents for which resources are allocated by the dispatcher.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
A part of implementation of work thread without activity tracking.
no_activity_tracking_impl_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
A part of implementation of work thread with activity tracking.
so_5::stats::work_thread_activity_stats_t take_activity_stats()
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_waiting_stats
Statistics for wait activity.
with_activity_tracking_impl_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_working_stats
Statistics for work activity.
A worker thread for nef_one_thread dispatcher.
void call_handler(so_5::execution_demand_t &demand)
demand_queue_t & demand_queue() noexcept
demand_unique_ptr_t pop_demand()
so_5::current_thread_id_t thread_id() const
work_thread_template_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
Initializing constructor.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Base for the case of internal stats lock.
Helper for collecting activity stats.
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
An interface of data source.
#define SO_5_THROW_EXCEPTION(error_code, desc)
Various stuff related to MPSC event queue implementation and tuning.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread_no_activity_tracking_t &)
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of nef_one_thread dispatcher.
Reusable components for dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
A single execution demand.
demand_t * m_next
Next demand in the queue.
demand_t()=default
Default constructor.
execution_demand_t m_execution_demand
Execution demand to be used.
demand_t(execution_demand_t &&source)
Initializing constructor.
A common data for all work thread implementations.
demand_queue_t m_queue
Demands queue to work for.
common_data_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::current_thread_id_t m_thread_id
ID of the work thread.
work_thread_holder_t m_thread_holder
Thread object.
A description of event execution demand.
Stats for a work thread activity.