2
3
6
7
8
9
10
11
15#include <so_5/environment_infrastructure.hpp>
17#include <so_5/impl/coop_repository_basis.hpp>
18#include <so_5/impl/mbox_iface_for_timers.hpp>
20#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
22#include <so_5/stats/impl/activity_tracking.hpp>
23#include <so_5/stats/impl/st_env_stuff.hpp>
24#include <so_5/stats/controller.hpp>
25#include <so_5/stats/repository.hpp>
26#include <so_5/stats/prefix.hpp>
27#include <so_5/stats/messages.hpp>
28#include <so_5/stats/std_names.hpp>
30#include <so_5/send_functions.hpp>
31#include <so_5/env_infrastructures.hpp>
33#include <so_5/timers.hpp>
47
48
49
50
67
68
69
70
71
72struct fake_activity_tracker_t
final
86
87
88
89
90
91
92
93
94
95
96class real_activity_tracker_t
final
119 result.m_working_stats = m_working.take_stats();
120 result.m_waiting_stats = m_waiting.take_stats();
131 fake_activity_tracker_t & )
138 const so_5::mbox_t & mbox,
140 const current_thread_id_t & thread_id,
141 real_activity_tracker_t & activity_tracker )
143 so_5::send< stats::messages::work_thread_activity >(
146 stats::suffixes::work_thread_activity(),
148 activity_tracker.take_activity_stats() );
155
156
157
158
159
160class coop_repo_t
final
169 coop_listener_unique_ptr_t coop_listener )
180 std::lock_guard< std::mutex > l{ m_lock };
181 return 0u != m_registrations_in_progress ||
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204template<
typename Event_Queue_Type >
249 agent_t & agent )
noexcept override
274
275
276
277
278
279
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
318 typename Event_Queue_Type,
319 typename Activity_Tracker,
320 typename Data_Source_Name_Parts >
347
348
349
350
351 class disp_data_source_t
final :
public stats::
source_t
362 : m_dispatcher{ disp }
365 using namespace so_5::disp::reuse;
367 m_base_prefix = make_disp_prefix(
368 Data_Source_Name_Parts::disp_type_part(),
370 &m_dispatcher.get() );
376 so_5::send< stats::messages::quantity< std::size_t > >(
379 stats::suffixes::agent_count(),
380 m_dispatcher.get().agents_bound() );
382 const auto evt_queue_stats =
383 m_dispatcher.get().event_queue().query_stats();
384 so_5::send< stats::messages::quantity< std::size_t > >(
387 stats::suffixes::work_thread_queue_size(),
388 evt_queue_stats.m_demands_count );
390 send_thread_activity_stats(
393 m_dispatcher.get().thread_id(),
394 m_dispatcher.get().activity_tracker() );
410
411
412
413
414class actual_elapsed_timers_collector_t
final
425 std::type_index type_index,
427 message_ref_t message )
435 using demands_container_t = std::deque< demand_t >;
443 std::type_index type_index,
445 message_ref_t msg )
override
447 m_demands.emplace_back(
448 std::move(type_index),
454
455
459 return m_demands.empty();
463
464
468 for(
auto & d : m_demands )
470 so_5::impl::mbox_iface_for_timers_t{ d.m_mbox }
471 .deliver_message_from_timer( d.m_type_index, d.m_message );
475 if( m_demands.size() < 1000 )
481 demands_container_t demands;
482 swap( demands, m_demands );
491
492
493
494
495
496
497
498
499
500
501
502class direct_delivery_elapsed_timers_collector_t
final
508 std::type_index type_index,
510 message_ref_t msg )
override
512 so_5::impl::mbox_iface_for_timers_t{ mbox }
513 .deliver_message_from_timer( type_index, msg );
521
522
523
524
525
526
527
528
529
530template<
typename Lock_Holder >
531class stats_controller_t
final
535 ,
protected Lock_Holder
541 mbox_t distribution_mbox,
543 mbox_t next_turn_mbox )
544 : m_distribution_mbox( std::move(distribution_mbox) )
545 , m_next_turn_mbox( std::move(next_turn_mbox) )
549 virtual const mbox_t &
552 return m_distribution_mbox;
558 this->lock_and_perform( [&]{
559 if( status_t::off == m_status )
562 const auto run_id = m_run_id + 1;
564 send_next_message( very_small_timeout(), run_id );
567 m_status = status_t::on;
576 this->lock_and_perform( [&] {
577 m_status = status_t::off;
585 return this->lock_and_perform( [&] {
586 auto ret_value = m_distribution_period;
588 m_distribution_period = period;
598 this->lock_and_perform( [&] {
599 source_list_add( what, m_head, m_tail );
606 this->lock_and_perform( [&] {
607 source_list_remove( what, m_head, m_tail );
615 this->lock_and_perform( [&] {
616 if( status_t::on == m_status && run_id == m_run_id )
618 const auto actual_duration = distribute_current_data();
620 if( actual_duration < m_distribution_period )
623 m_distribution_period - actual_duration,
628 send_next_message( very_small_timeout(), m_run_id );
648
649
657
658
662
663
664
671 default_distribution_period() };
673
674
679 return std::chrono::milliseconds{1};
686 auto started_at = std::chrono::steady_clock::now();
688 send< so_5::stats::messages::distribution_started >(
689 m_distribution_mbox );
694 s->distribute( m_distribution_mbox );
696 s = source_list_next( *s );
699 send< so_5::stats::messages::distribution_finished >(
700 m_distribution_mbox );
702 return std::chrono::steady_clock::now() - started_at;
708 std::chrono::steady_clock::duration pause,
711 send_delayed< next_turn >(
714 outliving_mutable( *
this ),
Type of smart handle for a cooperation.
A special type that plays role of unique_ptr for coop.
Interface for dispatcher binders.
Default implementation of multithreaded environment infrastructure.
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
stats::repository_t & stats_repository() noexcept override
Get stats repository for the environment.
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
Initiate a delayed message.
reusable::actual_elapsed_timers_collector_t m_timers_collector
A collector for elapsed timers.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
Initiate a timer (delayed or periodic message).
shutdown_status_t m_shutdown_status
Status of shutdown procedure.
void launch(env_init_t init_fn) override
Do actual launch of SObjectizer's Environment.
coop_handle_t register_coop(coop_unique_holder_t coop) override
Register new cooperation.
void stop() noexcept override
Initiate a signal for shutdown of Environment.
so_5::impl::final_dereg_chain_holder_t m_final_dereg_chain
The chain of coops for the final deregistration.
void perform_shutdown_related_actions_if_needed(std::unique_lock< std::mutex > &acquired_lock) noexcept
main_thread_sync_objects_t m_sync_objects
All sync objects to be shared between different parts.
void run_user_supplied_init_and_do_main_loop(env_init_t init_fn)
void try_handle_next_demand(std::unique_lock< std::mutex > &acquired_lock) noexcept
timer_thread_stats_t query_timer_thread_stats() override
Query run-time statistics for timer (thread or manager).
void ready_to_deregister_notify(coop_shptr_t coop) noexcept override
void process_final_deregs_if_any(std::unique_lock< std::mutex > &acquired_lock) noexcept
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
Query run-time statistics for cooperation repository.
void run_main_loop() noexcept
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
Create an instance of a new coop.
void run_default_dispatcher_and_go_further(env_init_t init_fn)
bool final_deregister_coop(coop_shptr_t coop) noexcept override
Do final actions of the cooperation deregistration.
timer_manager_unique_ptr_t m_timer_manager
A timer manager to be used.
stats_controller_t m_stats_controller
Stats controller for this environment.
event_queue_impl_t m_event_queue
Queue for execution_demands which must be handled on the main thread.
env_infrastructure_t(environment_t &env, timer_manager_factory_t timer_factory, error_logger_shptr_t error_logger, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
disp_binder_shptr_t make_default_disp_binder() override
Create a binder for the default dispatcher.
void handle_expired_timers_if_any(std::unique_lock< std::mutex > &acquired_lock) noexcept
coop_repo_t m_coop_repo
Repository of registered coops.
stats::controller_t & stats_controller() noexcept override
Get stats controller for the environment.
Activity_Tracker m_activity_tracker
Actual activity tracker for main working thread.
pop_result_t
Type for result of extraction operation.
void push(execution_demand_t demand) override
event_queue_impl_t(main_thread_sync_objects_t &sync_objects)
void push_evt_start(execution_demand_t demand) override
pop_result_t pop(execution_demand_t &receiver) noexcept
stats_t query_stats() const
std::deque< execution_demand_t > m_demands
main_thread_sync_objects_t & m_sync_objects
void push_evt_finish(execution_demand_t demand) noexcept override
bool empty() const noexcept
demands_container_t m_demands
Collected demands.
virtual void accept(std::type_index type_index, mbox_t mbox, message_ref_t msg) override
Accept and store info about elapsed timer.
bool has_live_coop()
Is there any live coop?
coop_repo_t(outliving_reference_t< environment_t > env, coop_listener_unique_ptr_t coop_listener)
Initializing constructor.
A basic part of implementation of dispatcher to be used in places where default dispatcher is needed.
std::atomic< std::size_t > m_agents_bound
Counter of agents bound to that dispatcher.
outliving_reference_t< Event_Queue_Type > m_event_queue
Event queue for that dispatcher.
default_dispatcher_basis_t(outliving_reference_t< Event_Queue_Type > event_queue)
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
Event_Queue_Type & event_queue() const noexcept
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
std::size_t agents_bound() const noexcept
void handle_demand(execution_demand_t &demand)
current_thread_id_t thread_id() const noexcept
current_thread_id_t m_thread_id
ID of the main thread.
void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
outliving_reference_t< default_dispatcher_t > m_dispatcher
Dispatcher to work with.
stats::prefix_t m_base_prefix
Basic prefix for data sources.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(outliving_reference_t< default_dispatcher_t > disp)
An implementation of dispatcher to be used in places where default dispatcher is needed.
outliving_reference_t< Activity_Tracker > m_activity_tracker
Activity tracker.
Activity_Tracker & activity_tracker() noexcept
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for speading run-time stats.
default_dispatcher_t(outliving_reference_t< environment_t > env, outliving_reference_t< Event_Queue_Type > event_queue, outliving_reference_t< Activity_Tracker > activity_tracker)
virtual void accept(std::type_index type_index, mbox_t mbox, message_ref_t msg) override
Accept and store info about elapsed timer.
stats::work_thread_activity_stats_t take_activity_stats()
stats::activity_tracking_stuff::stats_collector_t< stats::activity_tracking_stuff::null_lock > m_waiting
void wait_start_if_not_started()
stats::activity_tracking_stuff::stats_collector_t< stats::activity_tracking_stuff::null_lock > m_working
static std::chrono::steady_clock::duration very_small_timeout()
const mbox_t m_next_turn_mbox
Mbox for delayed messages for initiation of next turn.
const mbox_t m_distribution_mbox
Mbox for sending messages with run-time statistics.
status_t m_status
Current status of stats_controller.
virtual void add(stats::source_t &what) override
Registration of new data source.
virtual void turn_on() override
Turn the monitoring on.
virtual std::chrono::steady_clock::duration set_distribution_period(std::chrono::steady_clock::duration period) override
Set distribution period.
std::mutex m_lock
Object's lock.
status_t
Status of stats_controller.
void send_next_message(std::chrono::steady_clock::duration pause, const int run_id)
Helper method for sending next instance of next_turn message.
virtual const mbox_t & mbox() const override
Get the mbox for receiving monitoring information.
stats_controller_t(mbox_t distribution_mbox, mbox_t next_turn_mbox)
Initializing constructor.
virtual void on_next_turn(int run_id) override
std::chrono::steady_clock::duration distribute_current_data()
Actual distribution of the current statistics.
virtual void turn_off() override
Turn the monitoring off.
int m_run_id
ID of stats distribution.
virtual void remove(stats::source_t &what) noexcept override
Deregistration of previously registered data source.
std::chrono::steady_clock::duration m_distribution_period
stats::source_t * m_tail
Tail of data sources list.
stats::source_t * m_head
Head of data sources list.
An interface for environment_infrastructure entity.
An interface of event queue for agent.
A basic part for various implementations of coop_repository.
Helper class for holding the current chain of coops for the final deregistration.
Helper class for indication of long-lived reference via its type.
Helper for collecting activity stats.
A holder for data-souce that should be automatically registered and deregistered in registry.
A public interface for control SObjectizer monitoring options.
An interface for initiation of next turn in stats distribution.
A type for storing prefix of data_source name.
An interface of data sources repository.
An interface of data source.
An indentificator for the timer.
An interface for collector of elapsed timers.
auto unlock_do_and_lock_again(std::unique_lock< std::mutex > &acquired_lock, Action &&action) -> decltype(action())
main_thread_status_t
A short name for namespace with run-time stats stuff.
void wakeup_if_waiting(main_thread_sync_objects_t &sync_objects)
Simple single-threaded environment infrastructure with thread safety.
SO_5_FUNC environment_infrastructure_factory_t factory(params_t &¶ms)
A factory for creation of simple thread-safe single-thread environment infrastructure object.
Various reusable stuff which can be used in implementation of single-threaded environment infrastruct...
shutdown_status_t
A short name for namespace with run-time stats stuff.
@ must_be_started
Shutdown must be started as soon as possible.
@ completed
Shutdown completed and work of environment must be finished.
@ not_started
Shutdown is not started yet.
@ in_progress
Shutdown is initiated but not finished yet.
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, const current_thread_id_t &thread_id, real_activity_tracker_t &activity_tracker)
void send_thread_activity_stats(const mbox_t &, const stats::prefix_t &, const current_thread_id_t &, fake_activity_tracker_t &)
Various implementations of environment_infrastructure.
Details of SObjectizer run-time implementations.
Internal implementation of run-time monitoring and statistics related stuff.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
A special class for generation of names for dispatcher data sources.
static constexpr const char * disp_type_part() noexcept
Type for representation of statistical data for this event queue.
std::size_t m_demands_count
The current size of the demands queue.
A bunch of sync objects which need to be shared between various parts of env_infrastructure.
main_thread_status_t m_status
The current status of the main thread.
std::condition_variable m_wakeup_condition
A condition to sleep on when no activities to handle.
std::mutex m_lock
Main lock for environment infrastructure.
Type of demand created from elapsed timer.
demand_t(std::type_index type_index, mbox_t mbox, message_ref_t message)
std::type_index m_type_index
void wait_start_if_not_started()
Statistical data for run-time monitoring of coop repository content.
A description of event execution demand.
A special class for cases where lock is not needed at all.
Stats for a work thread activity.
Statistics for run-time monitoring.