SObjectizer  5.8
Loading...
Searching...
No Matches
adv_thread_pool/impl/disp.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief An implementation of advanced thread pool dispatcher.
8 *
9 * \since
10 * v.5.4.0
11 */
12
13#pragma once
14
15#include <so_5/spinlocks.hpp>
16#include <so_5/atomic_refcounted.hpp>
17
18#include <so_5/event_queue.hpp>
19
20#include <so_5/stats/impl/activity_tracking.hpp>
21
22#include <so_5/disp/reuse/queue_of_queues.hpp>
23
24#include <so_5/disp/thread_pool/impl/common_implementation.hpp>
25
26#include <so_5/impl/thread_join_stuff.hpp>
27
28#include <forward_list>
29
30#if 0
31 #define SO_5_CHECK_INVARIANT_IMPL(what, data, file, line)
32 if( !(what) ) {
33 std::cerr << file << ":" << line << ": FAILED INVARIANT: " << #what << "; data: " << data << std::endl;
34 std::abort();
35 }
36 #define SO_5_CHECK_INVARIANT(what, data) SO_5_CHECK_INVARIANT_IMPL(what, data, __FILE__, __LINE__)
37#else
38 #define SO_5_CHECK_INVARIANT(what, data)
39#endif
40
41
42namespace so_5
43{
44
45namespace disp
46{
47
48namespace adv_thread_pool
49{
50
51namespace impl
52{
53
54using spinlock_t = so_5::default_spinlock_t;
55
56class agent_queue_t;
57
58namespace stats = so_5::stats;
59namespace tp_stats = so_5::disp::reuse::thread_pool_stats;
60
61//
62// dispatcher_queue_t
63//
64using dispatcher_queue_t = so_5::disp::reuse::queue_of_queues_t< agent_queue_t >;
65
66//
67// agent_queue_t
68//
69/*!
70 * \brief Event queue for the agent (or cooperation).
71 *
72 * \since
73 * v.5.4.0
74 */
75class agent_queue_t final
76 : public event_queue_t
77 , private so_5::atomic_refcounted_t
78 {
79 friend class so_5::intrusive_ptr_t< agent_queue_t >;
80
81 private :
82 //! Actual demand in event queue.
83 struct demand_t
84 {
85 //! Actual demand.
87
88 //! Next item in queue.
90
92 : m_next( nullptr )
93 {}
95 : m_demand( std::move( original ) )
96 , m_next( nullptr )
97 {}
98 };
99
100 public :
101 static constexpr const unsigned int thread_safe_worker = 2;
102 static constexpr const unsigned int not_thread_safe_worker = 1;
103
104 //! Constructor.
106 //! Dispatcher queue to work with.
107 outliving_reference_t< dispatcher_queue_t > disp_queue,
108 //! Dummy argument. It is necessary here because of
109 //! common implementation for thread-pool and
110 //! adv-thread-pool dispatchers.
111 const bind_params_t & )
114 , m_active( false )
115 , m_workers( 0 )
116 {}
117
118 ~agent_queue_t() override
119 {
120 while( m_head_demand.m_next )
121 delete_head();
122 }
123
124 //! Access to the queue's lock.
125 [[nodiscard]]
126 spinlock_t &
127 lock() noexcept
128 {
129 return m_lock;
130 }
131
132 //! Push next demand to queue.
133 void
134 push( execution_demand_t demand ) override
135 {
136 bool need_schedule = false;
137 {
138 // Do memory allocation before spinlock locking.
139 auto new_demand = new demand_t( std::move( demand ) );
140
141 std::lock_guard< spinlock_t > lock( m_lock );
142
143 m_tail_demand->m_next = new_demand;
145
146 ++m_size;
147
148 if( m_head_demand.m_next == m_tail_demand )
149 {
150 // Queue was empty. Need to detect
151 // necessity of queue activation.
152 if( !m_active )
154 {
155 need_schedule = true;
156 m_active = true;
157 }
158 }
159
160 SO_5_CHECK_INVARIANT( !empty(), this )
161 SO_5_CHECK_INVARIANT( m_active || is_there_any_worker(), this )
162 SO_5_CHECK_INVARIANT( !(need_schedule && !m_active), this )
163 }
164
165 if( need_schedule )
166 m_disp_queue.schedule( this );
167 }
168
169 /*!
170 * \note
171 * Delegates the work to the push() method.
172 */
173 void
175 {
176 this->push( std::move(demand) );
177 }
178
179 /*!
180 * \note
181 * Delegates the work to the push() method.
182 *
183 * \attention
184 * Terminates the whole application if the push() throws.
185 */
186 void
187 push_evt_finish( execution_demand_t demand ) noexcept override
188 {
189 this->push( std::move(demand) );
190 }
191
192 //! Get the information about the front demand.
193 /*!
194 * \attention This method must be called only on non-empty queue.
195 */
196 [[nodiscard]]
199 {
200 SO_5_CHECK_INVARIANT( !empty(), this )
201 SO_5_CHECK_INVARIANT( m_active, this )
202
203 m_active = false;
204
205 return m_head_demand.m_next->m_demand;
206 }
207
208 //! Remove the front demand.
209 /*!
210 * \retval true queue must be activated.
211 * \retval false queue must not be activated.
212 */
213 [[nodiscard]]
214 bool
216 //! Type of worker.
217 //! Must be thread_safe_worker or not_thread_safe_worker.
218 unsigned int type_of_worker )
219 {
220 SO_5_CHECK_INVARIANT( !empty(), this );
221 SO_5_CHECK_INVARIANT( !m_active, this );
222
224 if( !m_head_demand.m_next )
225 m_tail_demand = &m_head_demand;
226
227 m_workers += type_of_worker;
228
229 // Queue must be activated only if queue is not empty
230 // and current worker is a thread safe worker.
231 m_active = ( !empty() &&
232 thread_safe_worker == type_of_worker );
233
234 return m_active;
235 }
236
237 //! Signal about finishing of worker of the specified type.
238 /*!
239 * \retval true queue must be activated.
240 * \retval false queue must not be activated.
241 */
242 [[nodiscard]]
243 bool
245 //! Type of worker.
246 //! Must be thread_safe_worker or not_thread_safe_worker.
247 unsigned int type_of_worker )
248 {
249 m_workers -= type_of_worker;
250
251 bool old_active = m_active;
252 if( !m_active )
253 m_active = !empty();
254
255 SO_5_CHECK_INVARIANT( !(m_active && empty()), this )
257 !old_active || m_active, this );
258
259 return old_active != m_active;
260 }
261
262 //! Check the presence of any worker at the moment.
263 [[nodiscard]]
264 bool
266 {
267 return 0 != m_workers;
268 }
269
270 //! Check the presence of thread unsafe worker.
271 [[nodiscard]]
272 bool
274 {
275 return 0 != (m_workers & not_thread_safe_worker );
276 }
277
278 //! Is empty queue?
279 [[nodiscard]]
280 bool
281 empty() const { return nullptr == m_head_demand.m_next; }
282
283 //! Is active queue?
284 [[nodiscard]]
285 bool
286 active() const { return m_active; }
287
288 /*!
289 * \brief Get the current size of the queue.
290 *
291 * \since
292 * v.5.5.4
293 */
294 [[nodiscard]]
295 std::size_t
296 size() const noexcept
297 {
298 return m_size.load( std::memory_order_acquire );
299 }
300
301 /*!
302 * \brief Give away a pointer to the next agent_queue.
303 *
304 * \note
305 * This method is a part of interface required by
306 * so_5::disp::reuse::queue_of_queues_t.
307 *
308 * \since v.5.8.0
309 */
310 [[nodiscard]]
311 agent_queue_t *
313 {
314 auto * r = m_intrusive_queue_next;
315 m_intrusive_queue_next = nullptr;
316 return r;
317 }
318
319 /*!
320 * \brief Set a pointer to the next agent_queue.
321 *
322 * \note
323 * This method is a part of interface required by
324 * so_5::disp::reuse::queue_of_queues_t.
325 *
326 * \since v.5.8.0
327 */
328 void
329 intrusive_queue_set_next( agent_queue_t * next ) noexcept
330 {
332 }
333
334 private :
335 //! Dispatcher queue for scheduling processing of events from
336 //! this queue.
337 dispatcher_queue_t & m_disp_queue;
338
339 //! Object's lock.
340 spinlock_t m_lock;
341
342 //! Head of the demand's queue.
343 /*!
344 * Never contains actual demand. Only m_next field is used.
345 */
347 //! Tail of the demand's queue.
348 /*!
349 * Must point to m_head_demand if queue is empty or to the very
350 * last queue item otherwise.
351 */
353
354 //! Is this queue activated?
355 /*!
356 * Queue is activated if it is scheduled to dispatcher queue.
357 */
359
360 //! Count of active workers.
361 unsigned int m_workers;
362
363 /*!
364 * \brief Current size of the queue.
365 *
366 * \since
367 * v.5.5.4
368 */
369 std::atomic< std::size_t > m_size = { 0 };
370
371 /*!
372 * \brief The next item in intrusive queue of agent_queues.
373 *
374 * This field is necessary to implement interface required by
375 * so_5::disp::reuse::queue_of_queues_t.
376 *
377 * \since v.5.8.0
378 */
379 agent_queue_t * m_intrusive_queue_next{ nullptr };
380
381 //! Helper method for deleting queue's head object.
382 inline void
383 delete_head() noexcept
384 {
385 auto to_be_deleted = m_head_demand.m_next;
386 m_head_demand.m_next = m_head_demand.m_next->m_next;
387
388 --m_size;
389
390 delete to_be_deleted;
391 }
392 };
393
394//
395// agent_queue_ref_t
396//
397/*!
398 * \brief A typedef of smart pointer for agent_queue.
399 *
400 * \since
401 * v.5.4.0
402 */
403using agent_queue_ref_t = so_5::intrusive_ptr_t< agent_queue_t >;
404
406
407/*!
408 * \brief Main data for work_thread.
409 *
410 * \since
411 * v.5.5.18
412 */
414 {
415 //! Dispatcher's queue.
416 dispatcher_queue_t * m_disp_queue;
417
418 //! ID of thread.
419 /*!
420 * Receives actual value inside body().
421 */
423
424 //! Actual thread.
426
427 //! Waiting object for long wait.
429
431 outliving_reference_t< dispatcher_queue_t > queue,
432 work_thread_holder_t thread_holder )
436 {}
437 };
438
439/*!
440 * \brief Part of implementation of work thread without activity tracking.
441 *
442 * \since
443 * v.5.5.18
444 */
446 {
447 public :
448 //! Initializing constructor.
450 outliving_reference_t< dispatcher_queue_t > queue,
451 work_thread_holder_t thread_holder )
453 {}
454
455 template< typename L >
456 void
457 take_activity_stats( L ) { /* Nothing to do */ }
458
459 protected :
460 void
462
463 void
465
466 void
468
469 void
471 };
472
473/*!
474 * \brief Part of implementation of work thread with activity tracking.
475 *
476 * \since
477 * v.5.5.18
478 */
480 {
481 using activity_tracking_traits = so_5::stats::activity_tracking_stuff::traits;
482
483 public :
484 //! Initializing constructor.
486 outliving_reference_t< dispatcher_queue_t > queue,
487 work_thread_holder_t thread_holder )
489 {}
490
491 template< typename L >
492 void
502
503 protected :
504 //! Lock for activity statistics.
505 activity_tracking_traits::lock_t m_stats_lock;
506
507 //! A collector for work activity.
511
512 //! A collector for waiting stats.
516
517 void
519 {
520 m_work_activity_collector.start();
521 }
522
523 void
525 {
526 m_work_activity_collector.stop();
527 }
528
529 void
531 {
532 m_waiting_stats_collector.start();
533 }
534
535 void
537 {
538 m_waiting_stats_collector.stop();
539 }
540 };
541
542//
543// work_thread_template_t
544//
545/*!
546 * \brief Implementation of work_thread in form of template class.
547 *
548 * \tparam Impl no_activity_tracking_impl_t or with_activity_tracking_impl_t.
549 *
550 * \since
551 * v.5.5.18
552 */
553template< typename Impl >
554class work_thread_template_t final : public Impl
555 {
556 public :
557 //! Initializing constructor.
559 outliving_reference_t< dispatcher_queue_t > queue,
560 work_thread_holder_t thread_holder )
561 : Impl( queue, std::move(thread_holder) )
562 {}
563
564 void
566 {
567 so_5::impl::ensure_join_from_different_thread( this->m_thread_id );
568 this->m_thread_holder.unchecked_get().join();
569 }
570
571 //! Launch work thread.
572 void
574 {
575 this->m_thread_holder.unchecked_get().start( [this]() { body(); } );
576 }
577
578 /*!
579 * \brief Get ID of work thread.
580 *
581 * \note This method returns correct value only after start
582 * of the thread.
583 *
584 * \since
585 * v.5.5.18
586 */
588 thread_id() const
589 {
590 return this->m_thread_id;
591 }
592
593 private :
594 //! Thread body method.
595 void
597 {
598 this->m_thread_id = so_5::query_current_thread_id();
599
600 agent_queue_t * agent_queue;
601 while( nullptr != (agent_queue = this->pop_agent_queue()) )
602 {
603 // This guard is necessary to ensure that queue
604 // will exist until processing of queue finished.
605 agent_queue_ref_t agent_queue_guard( agent_queue );
606
607 process_queue( *agent_queue );
608 }
609 }
610
611 /*!
612 * \since
613 * v.5.5.18
614 *
615 * \brief An attempt of extraction of non-empty agent queue.
616 *
617 * \note This is a noexcept method because its logic can't survive
618 * an exception from m_disp_queue->pop.
619 */
620 agent_queue_t *
622 {
623 agent_queue_t * result = nullptr;
624
625 this->wait_started();
626
627 result = this->m_disp_queue->pop( *(this->m_condition) );
628
629 this->wait_finished();
630
631 return result;
632 }
633
634 //! Processing of demands from agent queue.
635 void
636 process_queue( agent_queue_t & queue )
637 {
638 std::unique_lock< spinlock_t > lock( queue.lock() );
639
640 auto demand = queue.peek_front();
641 if( queue.is_there_not_thread_safe_worker() )
642 // We can't process any demand while a thread-unsafe
643 // worker is working.
644 return;
645
646 auto hint = demand.m_receiver->so_create_execution_hint( demand );
647
648 bool need_schedule = true;
649 if( !hint.is_thread_safe() )
650 {
651 if( queue.is_there_any_worker() )
652 // We can't process a not-thread-safe demand while
653 // there are some other workers.
654 return;
655 else
656 need_schedule = queue.worker_started(
657 agent_queue_t::not_thread_safe_worker );
658 }
659 else
660 // Thread-safe worker can be started.
661 need_schedule = queue.worker_started(
662 agent_queue_t::thread_safe_worker );
663
664 SO_5_CHECK_INVARIANT( !(need_schedule && queue.empty()), &queue )
666 !need_schedule || hint.is_thread_safe(), &queue );
667 SO_5_CHECK_INVARIANT( !need_schedule || queue.active(), &queue );
668
669 // Next few actions must be done on unlocked queue.
670 lock.unlock();
671
672 if( need_schedule )
673 this->m_disp_queue->schedule( &queue );
674
675 // For activity tracking if it is turned on.
676 this->work_started();
677
678 // Processing of event.
679 hint.exec( this->m_thread_id );
680
681 this->work_finished();
682
683 // Next actions must be done on locked queue.
684 lock.lock();
685
686 need_schedule = queue.worker_finished(
687 hint.is_thread_safe() ?
688 agent_queue_t::thread_safe_worker :
689 agent_queue_t::not_thread_safe_worker );
690
692 !need_schedule || queue.active(), &queue );
693
694 lock.unlock();
695
696 if( need_schedule )
697 this->m_disp_queue->schedule( &queue );
698 }
699 };
700
701} /* namespace work_thread_details */
702
703//
704// work_thread_no_activity_tracking_t
705//
706/*!
707 * \brief Type of work thread without activity tracking.
708 *
709 * \since
710 * v.5.5.18
711 */
712using work_thread_no_activity_tracking_t =
713 work_thread_details::work_thread_template_t<
715
716//
717// work_thread_with_activity_tracking_t
718//
719/*!
720 * \brief Type of work thread with activity tracking.
721 *
722 * \since
723 * v.5.5.18
724 */
725using work_thread_with_activity_tracking_t =
726 work_thread_details::work_thread_template_t<
728
729//
730// adaptation_t
731//
732/*!
733 * \brief Adaptation of common implementation of thread-pool-like dispatcher
734 * to the specific of this thread-pool dispatcher.
735 *
736 * \since
737 * v.5.5.4
738 */
740 {
741 [[nodiscard]]
742 static constexpr std::string_view
744 {
745 return { "atp" }; // adv_thread_pool.
746 }
747
748 [[nodiscard]]
749 static bool
750 is_individual_fifo( const bind_params_t & params ) noexcept
751 {
752 return fifo_t::individual == params.query_fifo();
753 }
754
755 static void
756 wait_for_queue_emptyness( agent_queue_t & /*queue*/ ) noexcept
757 {
758 // This type of agent_queue doesn't require waiting for emptiness.
759 }
760 };
761
762//
763// dispatcher_template_t
764//
765/*!
766 * \brief Template for dispatcher.
767 *
768 * This template depends on work_thread type (with or without activity
769 * tracking).
770 *
771 * \since
772 * v.5.5.18
773 */
774template< typename Work_Thread >
775using dispatcher_template_t =
777 Work_Thread,
778 dispatcher_queue_t,
780 adaptation_t >;
781
782} /* namespace impl */
783
784} /* namespace adv_thread_pool */
785
786} /* namespace disp */
787
788} /* namespace so_5 */
#define SO_5_CHECK_INVARIANT(what, data)
A base class for agents.
Definition agent.hpp:673
The base class for the object with a reference counting.
Parameters for binding agents to adv_thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
Alias for namespace with traits of event queue.
std::size_t thread_count() const
Getter for thread count.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
std::size_t m_thread_count
Count of working threads.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for adv_thread_pool dispatcher.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
bool empty() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
const bind_params_t m_params
Binding parameters.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
execution_demand_t peek_front()
Get the information about the front demand.
agent_queue_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
void push(execution_demand_t demand) override
Push next demand to queue.
std::size_t size() const noexcept
Get the current size of the queue.
agent_queue_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
void intrusive_queue_set_next(agent_queue_t *next) noexcept
Set a pointer to the next agent_queue.
agent_queue_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &)
Constructor.
bool worker_finished(unsigned int type_of_worker)
Signal about finishing of worker of the specified type.
void push_evt_finish(execution_demand_t demand) noexcept override
std::atomic< std::size_t > m_size
Current size of the queue.
bool is_there_not_thread_safe_worker() const
Check the presence of thread unsafe worker.
void delete_head() noexcept
Helper method for deleting queue's head object.
bool worker_started(unsigned int type_of_worker)
Remove the front demand.
spinlock_t & lock() noexcept
Access to the queue's lock.
void push_evt_start(execution_demand_t demand) override
bool is_there_any_worker() const
Check the presence of any worker at the moment.
The very basic interface of adv_thread_pool dispatcher.
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
no_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_waiting_stats_collector
A collector for waiting stats.
with_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_work_activity_collector
A collector for work activity.
void process_queue(agent_queue_t &queue)
Processing of demands from agent queue.
work_thread_template_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
agent_queue_t * pop_agent_queue() noexcept
An attempt of extraction of non-empty agent queue.
Container for storing parameters for MPMC queue.
Multi-producer/Multi-consumer queue of pointers to event queues.
Mixin that holds optional work thread factory.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Pool of work threads.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Template class for smart reference wrapper on the atomic_refcounted_t.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
#define SO_5_FUNC
Definition declspec.hpp:48
void adjust_thread_count(disp_params_t &params)
Sets the thread count to the default value if the user does not specify an actual thread count.
Internal implementation details of advanced thread pool dispatcher.
Advanced thread pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of adv_thread_pool dispatcher with the default count of work threads.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of adv_thread_pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
Various stuff related to MPMC event queue implementation and tuning.
Helper tools for implementation of run-time monitoring for thread-pool-like dispatchers.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Thread pool dispatcher.
Event dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t &params) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_t &) noexcept
common_data_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t m_condition
Waiting object for long wait.
A description of event execution demand.
Various traits of activity tracking implementation.