2
3
6
7
8
9
10
11
15#include <so_5/spinlocks.hpp>
16#include <so_5/atomic_refcounted.hpp>
18#include <so_5/event_queue.hpp>
20#include <so_5/stats/impl/activity_tracking.hpp>
22#include <so_5/disp/reuse/queue_of_queues.hpp>
24#include <so_5/disp/thread_pool/impl/common_implementation.hpp>
26#include <so_5/impl/thread_join_stuff.hpp>
28#include <forward_list>
31 #define SO_5_CHECK_INVARIANT_IMPL(what, data, file, line)
33 std::cerr << file << ":" << line << ": FAILED INVARIANT: " << #what << "; data: " << data << std::endl;
36 #define SO_5_CHECK_INVARIANT(what, data) SO_5_CHECK_INVARIANT_IMPL(what, data, __FILE__, __LINE__)
38 #define SO_5_CHECK_INVARIANT(what, data)
54using spinlock_t =
so_5::default_spinlock_t;
70
71
72
73
74
75class agent_queue_t
final
120 while( m_head_demand.m_next )
136 bool need_schedule =
false;
139 auto new_demand =
new demand_t( std::move( demand ) );
141 std::lock_guard< spinlock_t > lock( m_lock );
143 m_tail_demand->m_next = new_demand;
148 if( m_head_demand.m_next == m_tail_demand )
155 need_schedule =
true;
166 m_disp_queue.schedule(
this );
170
171
172
176 this->push( std::move(demand) );
180
181
182
183
184
185
189 this->push( std::move(demand) );
194
195
205 return m_head_demand.m_next->m_demand;
210
211
212
218 unsigned int type_of_worker )
224 if( !m_head_demand.m_next )
225 m_tail_demand = &m_head_demand;
239
240
241
247 unsigned int type_of_worker )
257 !old_active || m_active,
this );
281 empty()
const {
return nullptr == m_head_demand.m_next; }
289
290
291
292
293
298 return m_size.load( std::memory_order_acquire );
302
303
304
305
306
307
308
309
320
321
322
323
324
325
326
327
344
345
349
350
351
356
357
364
365
366
367
368
372
373
374
375
376
377
378
385 auto to_be_deleted = m_head_demand.m_next;
386 m_head_demand.m_next = m_head_demand.m_next->m_next;
390 delete to_be_deleted;
398
399
400
401
402
408
409
410
411
412
440
441
442
443
444
455 template<
typename L >
474
475
476
477
478
491 template<
typename L >
520 m_work_activity_collector.start();
526 m_work_activity_collector.stop();
532 m_waiting_stats_collector.start();
538 m_waiting_stats_collector.stop();
546
547
548
549
550
551
552
553template<
typename Impl >
554class work_thread_template_t
final :
public Impl
561 : Impl( queue, std::move(thread_holder) )
567 so_5::impl::ensure_join_from_different_thread(
this->m_thread_id );
568 this->m_thread_holder.unchecked_get().join();
575 this->m_thread_holder.unchecked_get().start( [
this]() { body(); } );
579
580
581
582
583
584
585
586
590 return this->m_thread_id;
598 this->m_thread_id = so_5::query_current_thread_id();
600 agent_queue_t * agent_queue;
601 while(
nullptr != (agent_queue =
this->pop_agent_queue()) )
605 agent_queue_ref_t agent_queue_guard( agent_queue );
607 process_queue( *agent_queue );
612
613
614
615
616
617
618
619
623 agent_queue_t * result =
nullptr;
625 this->wait_started();
627 result =
this->m_disp_queue->pop( *(
this->m_condition) );
629 this->wait_finished();
638 std::unique_lock< spinlock_t > lock( queue.lock() );
640 auto demand = queue.peek_front();
641 if( queue.is_there_not_thread_safe_worker() )
646 auto hint = demand.m_receiver->so_create_execution_hint( demand );
648 bool need_schedule =
true;
649 if( !hint.is_thread_safe() )
651 if( queue.is_there_any_worker() )
656 need_schedule = queue.worker_started(
657 agent_queue_t::not_thread_safe_worker );
661 need_schedule = queue.worker_started(
662 agent_queue_t::thread_safe_worker );
666 !need_schedule || hint.is_thread_safe(), &queue );
673 this->m_disp_queue->schedule( &queue );
676 this->work_started();
679 hint.exec(
this->m_thread_id );
681 this->work_finished();
686 need_schedule = queue.worker_finished(
687 hint.is_thread_safe() ?
688 agent_queue_t::thread_safe_worker :
689 agent_queue_t::not_thread_safe_worker );
692 !need_schedule || queue.active(), &queue );
697 this->m_disp_queue->schedule( &queue );
707
708
709
710
711
712using work_thread_no_activity_tracking_t =
720
721
722
723
724
725using work_thread_with_activity_tracking_t =
733
734
735
736
737
738
766
767
768
769
770
771
772
773
774template<
typename Work_Thread >
775using dispatcher_template_t =
#define SO_5_CHECK_INVARIANT(what, data)
The base class for the object with a reference counting.
Parameters for binding agents to adv_thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
fifo_t query_fifo() const
Get FIFO type.
Alias for namespace with traits of event queue.
std::size_t thread_count() const
Getter for thread count.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
std::size_t m_thread_count
Count of working threads.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t()
Default constructor.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for adv_thread_pool dispatcher.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
bool empty() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
dispatcher_handle_t() noexcept=default
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
const bind_params_t m_params
Binding parameters.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
An actual interface of thread-pool dispatcher.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t ¶ms)=0
Preallocate all necessary resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
disp_binder_shptr_t binder(bind_params_t params) override
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t ¶ms) override
Preallocate all necessary resources for a new agent.
~actual_dispatcher_implementation_t() noexcept override
execution_demand_t peek_front()
Get the information about the front demand.
agent_queue_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
void push(execution_demand_t demand) override
Push next demand to queue.
std::size_t size() const noexcept
Get the current size of the queue.
bool empty() const
Is empty queue?
agent_queue_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
void intrusive_queue_set_next(agent_queue_t *next) noexcept
Set a pointer to the next agent_queue.
static constexpr const unsigned int thread_safe_worker
spinlock_t m_lock
Object's lock.
bool active() const
Is active queue?
agent_queue_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &)
Constructor.
unsigned int m_workers
Count of active workers.
demand_t * m_tail_demand
Tail of the demand's queue.
bool worker_finished(unsigned int type_of_worker)
Signal about finishing of worker of the specified type.
void push_evt_finish(execution_demand_t demand) noexcept override
std::atomic< std::size_t > m_size
Current size of the queue.
~agent_queue_t() override
static constexpr const unsigned int not_thread_safe_worker
bool is_there_not_thread_safe_worker() const
Check the presence of thread unsafe worker.
void delete_head() noexcept
Helper method for deleting queue's head object.
bool worker_started(unsigned int type_of_worker)
Remove the front demand.
bool m_active
Is this queue activated?
demand_t m_head_demand
Head of the demand's queue.
spinlock_t & lock() noexcept
Access to the queue's lock.
void push_evt_start(execution_demand_t demand) override
dispatcher_queue_t & m_disp_queue
bool is_there_any_worker() const
Check the presence of any worker at the moment.
The very basic interface of adv_thread_pool dispatcher.
virtual ~basic_dispatcher_iface_t() noexcept=default
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Part of implementation of work thread without activity tracing.
no_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
void take_activity_stats(L)
Part of implementation of work thread with activity tracing.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_waiting_stats_collector
A collector for waiting stats.
activity_tracking_traits::lock_t m_stats_lock
Lock for activity statistics.
void take_activity_stats(L lambda)
with_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_work_activity_collector
A collector for work activity.
void process_queue(agent_queue_t &queue)
Processing of demands from agent queue.
so_5::current_thread_id_t thread_id() const
Get ID of work thread.
void start()
Launch work thread.
work_thread_template_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
agent_queue_t * pop_agent_queue() noexcept
An attempt of extraction of non-empty agent queue.
void body()
Thread body method.
Container for storing parameters for MPMC queue.
Multi-producer/Multi-consumer queue of pointers to event queues.
Mixin with thread activity tracking flag.
Mixin that holds optional work thread factory.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Pool of work threads.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
An interface of event queue for agent.
Template class for smart reference wrapper on the atomic_refcounted_t.
Helper class for indication of long-lived reference via its type.
void adjust_thread_count(disp_params_t ¶ms)
Sets the thread count to default value if used do not specify actual thread count.
Internal implementation details of advanced thread pool dispatcher.
Advanced thread pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of adv_thread_pool dispatcher with the default count of work threads.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of adv_thread_pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
Various stuff related to MPMC event queue implementation and tuning.
Helper tools for implementation of run-time monitoring for thread-pool-like dispatchers.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t ¶ms) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_t &) noexcept
Actual demand in event queue.
demand_t * m_next
Next item in queue.
execution_demand_t m_demand
Actual demand.
demand_t(execution_demand_t &&original)
Main data for work_thread.
common_data_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
work_thread_holder_t m_thread_holder
Actual thread.
so_5::current_thread_id_t m_thread_id
ID of thread.
dispatcher_queue_t * m_disp_queue
Dispatcher's queue.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t m_condition
Waiting object for long wait.
A description of event execution demand.
Various traits of activity tracking implementation.