SObjectizer  5.8
Loading...
Searching...
No Matches
nef_one_thread/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief Functions for creating and binding of the single thread dispatcher
8 * that provides noexcept guarantee for scheduling evt_finish demand.
9 *
10 * \since v.5.5.8
11 */
12
13#include <so_5/disp/nef_one_thread/pub.hpp>
14
15#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
16#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
17#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
18
19#include <so_5/impl/thread_join_stuff.hpp>
20
21#include <so_5/stats/repository.hpp>
22#include <so_5/stats/messages.hpp>
23#include <so_5/stats/std_names.hpp>
24
25#include <so_5/send_functions.hpp>
26
27namespace so_5 {
28
29namespace disp {
30
31namespace nef_one_thread {
32
33namespace impl {
34
35namespace queue_traits = so_5::disp::mpsc_queue_traits;
36
37namespace stats = so_5::stats;
38
39//
40// demand_t
41//
42/*!
43 * \brief A single execution demand.
44 *
45 * \since v.5.8.0
46 */
48 {
49 //! Execution demand to be used.
50 /*!
51 * \note
52 * It may be empty (if the default constructor was used for
53 * preallocated evt_start/evt_finish demands).
54 */
56
57 //! Next demand in the queue.
58 /*!
59 * \note
60 * It's a dynamically allocated object that has to be deallocated
61 * manually during the destruction of the queue.
62 */
63 demand_t * m_next = nullptr;
64
65 /*!
66 * \brief Default constructor.
67 *
68 * It's necessary for preallocation of evt_start and evt_finish
69 * demands.
70 */
71 demand_t() = default;
72
73 //! Initializing constructor.
77 };
78
79//
80// demand_unique_ptr_t
81//
82/*!
83 * \brief An alias for unique_ptr to demand.
84 *
85 * \since v.5.8.0
86 */
88
89//
90// demand_queue_t
91//
93 {
94 //! Queue lock.
96
97 //! Shutdown flag.
98 bool m_shutdown = false;
99
100 //! Head of the queue.
101 /*! Null if queue is empty. */
102 demand_t * m_head = nullptr;
103 //! Tail of the queue.
104 /*! Null if queue is empty. */
105 demand_t * m_tail = nullptr;
106
107 //! Current size of the queue.
108 std::atomic< std::size_t > m_size = { 0 };
109
110 public:
111 //! Initializing constructor.
113 //! Lock to be used for queue protection.
114 queue_traits::lock_unique_ptr_t lock )
115 : m_lock{ std::move(lock) }
116 {}
117
118 ~demand_queue_t() noexcept
119 {
120 while( m_head )
121 {
122 remove_head();
123 }
124 }
125
126 //! Set the shutdown signal.
127 void
129 {
130 queue_traits::lock_guard_t lock{ *m_lock };
131
132 m_shutdown = true;
133
134 // If the queue is empty then someone may be waiting for a notification.
135 if( !m_head )
136 {
137 lock.notify_one();
138 }
139 }
140
141 void
142 push( demand_unique_ptr_t tail_demand )
143 {
144 queue_traits::lock_guard_t lock{ *m_lock };
145
146 ++m_size;
147
148 if( nullptr == m_head )
149 {
150 m_head = tail_demand.release();
151 m_tail = m_head;
152
153 // Someone might wait for the first demand.
154 lock.notify_one();
155 }
156 else
157 {
158 m_tail->m_next = tail_demand.release();
160 }
161 }
162
163 // Returns nullptr if shutdown flag is set.
166 {
167 demand_unique_ptr_t result;
168
169 queue_traits::unique_lock_t lock{ *m_lock };
170
171 while( !m_shutdown && (nullptr == m_head) )
172 {
174 }
175
176 if( !m_shutdown )
177 {
178 // Assume that m_head != nullptr.
179 result = remove_head();
180 }
181
182 return result;
183 }
184
185 //! Get the current size of the queue.
186 std::size_t
187 size() const
188 {
189 return m_size.load( std::memory_order_acquire );
190 }
191
192 private:
193 //! Helper method for deleting queue's head object.
195 remove_head() noexcept
196 {
197 demand_unique_ptr_t to_be_deleted{ m_head };
199 to_be_deleted->m_next = nullptr;
200
201 --m_size;
202
203 return to_be_deleted;
204 }
205 };
206
207//
208// agent_queue_t
209//
210/*!
211 *
212 * \note
213 * This class is neither Copyable nor Moveable.
214 */
215class agent_queue_t final : public event_queue_t
216 {
218
221
222 public:
224 demand_queue_t & dest_queue,
225 demand_unique_ptr_t evt_start_demand,
226 demand_unique_ptr_t evt_finish_demand )
230 {}
231
232 agent_queue_t( const agent_queue_t & ) = delete;
233 agent_queue_t &
234 operator=( const agent_queue_t & ) = delete;
235
236 agent_queue_t( agent_queue_t && o ) = delete;
237 agent_queue_t &
238 operator=( agent_queue_t && o ) = delete;
239
240 void
241 push( execution_demand_t demand ) override
242 {
243 m_dest_queue.get().push(
244 std::make_unique< demand_t >( std::move(demand) ) );
245 }
246
247 void
249 {
250 // ATTENTION: assume that m_evt_start_demand is valid.
251 // It's UB if that isn't true.
252 m_evt_start_demand->m_execution_demand = std::move(demand);
253 m_dest_queue.get().push( std::move(m_evt_start_demand) );
254 }
255
256 void
257 push_evt_finish( execution_demand_t demand ) noexcept override
258 {
259 // ATTENTION: assume that m_evt_finish_demand is valid.
260 // It's UB if that isn't true.
261 //
262 // Don't expect exceptions here.
263 m_evt_finish_demand->m_execution_demand = std::move(demand);
264 m_dest_queue.get().push( std::move(m_evt_finish_demand) );
265 }
266 };
267
269
270//
271// common_data_t
272//
273/*!
274 * \brief A common data for all work thread implementations.
275 *
276 * \since v.5.8.0
277 */
279 {
280 //! Demands queue to work for.
282
283 //! Thread object.
285
286 //! ID of the work thread.
287 /*!
288 * \note Receives actual value only after successful start
289 * of the thread.
290 */
292
293 //! Initializing constructor.
295 //! Lock to be used for queue protection.
296 queue_traits::lock_unique_ptr_t lock,
297 //! Worker thread to be used.
298 work_thread_holder_t thread_holder )
299 : m_queue{ std::move(lock) }
301 {}
302 };
303
304//
305// no_activity_tracking_impl_t
306//
307/*!
308 * \brief A part of implementation of work thread without activity tracking.
309 *
310 * \since v.5.8.0
311 */
313 {
314 public :
316 //! Lock to be used for queue protection.
317 queue_traits::lock_unique_ptr_t lock,
318 //! Worker thread to be used.
319 work_thread_holder_t thread_holder )
321 std::move(lock),
323 }
324 {}
325
326 protected :
327 void
328 work_started() { /* Nothing to do. */ }
329
330 void
331 work_finished() { /* Nothing to do. */ }
332
333 void
334 wait_started() { /* Nothing to do. */ }
335
336 void
337 wait_finished() { /* Nothing to do. */ }
338 };
339
340//
341// with_activity_tracking_impl_t
342//
343/*!
344 * \brief A part of implementation of work thread with activity tracking.
345 *
346 * \since v.5.5.18
347 */
349 {
350 public :
352 //! Lock to be used for queue protection.
353 queue_traits::lock_unique_ptr_t lock,
354 //! Worker thread to be used.
355 work_thread_holder_t thread_holder )
357 std::move(lock),
359 }
360 {}
361
364 {
366
367 result.m_working_stats = m_working_stats.take_stats();
368 result.m_waiting_stats = m_waiting_stats.take_stats();
369
370 return result;
371 }
372
373 protected :
374 //! Statistics for work activity.
378
379 //! Statistics for wait activity.
383
384 void
385 work_started() { m_working_stats.start(); }
386
387 void
388 work_finished() { m_working_stats.stop(); }
389
390 void
391 wait_started() { m_waiting_stats.start(); }
392
393 void
394 wait_finished() { m_waiting_stats.stop(); }
395 };
396
397} /* namespace work_thread_details */
398
399//
400// work_thread_template_t
401//
402/*!
403 * \brief A worker thread for nef_one_thread dispatcher.
404 *
405 * \since v.5.8.0
406 */
407template< typename Work_Thread >
408class work_thread_template_t : public Work_Thread
409 {
410 using base_type_t = Work_Thread;
411
412 public :
413 //! Initializing constructor.
415 //! Lock to be used for queue protection.
416 queue_traits::lock_unique_ptr_t lock,
417 //! Worker thread to be used.
418 work_thread_holder_t thread_holder )
419 : base_type_t{
420 std::move(lock),
422 }
423 {}
424
425 void
427 {
428 this->m_thread_holder.unchecked_get().start( [this]() { body(); } );
429 }
430
431 void
433 {
434 this->m_queue.stop();
435 }
436
437 void
443
444 [[nodiscard]]
446 thread_id() const
447 {
448 return this->m_thread_id;
449 }
450
451 [[nodiscard]]
453 demand_queue() noexcept
454 {
455 return this->m_queue;
456 }
457
458 private :
459 void
461 {
463
464 bool demand_extracted = true;
465 do
466 {
467 auto d = this->pop_demand();
468 if( d )
469 {
471 }
472 else
473 // No more demands, it's time to break the loop.
474 demand_extracted = false;
475 }
476 while( demand_extracted );
477 }
478
479 [[nodiscard]]
482 {
483 this->wait_started();
485 [this] { this->wait_finished(); } );
486
487 return this->m_queue.pop();
488 }
489
490 void
492 {
493 this->work_started();
495 [this] { this->work_finished(); } );
496
498 }
499 };
500
501//
502// work_thread_no_activity_tracking_t
503//
504using work_thread_no_activity_tracking_t =
506
507//
508// work_thread_with_activity_tracking_t
509//
510using work_thread_with_activity_tracking_t =
512
513//
514// send_thread_activity_stats
515//
516void
518 const so_5::mbox_t &,
519 const stats::prefix_t &,
520 work_thread_no_activity_tracking_t & )
521 {
522 /* Nothing to do */
523 }
524
525void
526send_thread_activity_stats(
527 const so_5::mbox_t & mbox,
528 const stats::prefix_t & prefix,
529 work_thread_with_activity_tracking_t & wt )
530 {
531 so_5::send< stats::messages::work_thread_activity >(
532 mbox,
533 prefix,
534 stats::suffixes::work_thread_activity(),
535 wt.thread_id(),
536 wt.take_activity_stats() );
537 }
538
539//
540// dispatcher_template_t
541//
542/*!
543 * \brief An implementation of dispatcher with one working
544 * thread and guarantee that evt_finish demands will be added to
545 * the queue without exceptions.
546 *
547 * \since v.5.8.0
548 */
549template< typename Work_Thread >
550class dispatcher_template_t final : public disp_binder_t
551 {
552 friend class disp_data_source_t;
553
554 public:
557 const std::string_view name_base,
558 disp_params_t params )
559 : m_work_thread{
560 params.queue_params().lock_factory()(),
561 so_5::disp::reuse::acquire_work_thread( params, env.get() ),
562 }
563 , m_data_source{
564 outliving_mutable(env.get().stats_repository()),
565 name_base,
566 outliving_mutable(*this)
567 }
568 {
569 m_work_thread.start();
570 }
571
572 ~dispatcher_template_t() noexcept override
573 {
574 m_work_thread.stop();
575 m_work_thread.join();
576 }
577
578 void
580 agent_t & agent ) override
581 {
582 // Assume that there is no pointer to the agent in the map yet.
583 auto evt_start_demand = std::make_unique< demand_t >();
584 auto evt_finish_demand = std::make_unique< demand_t >();
585
586 auto queue = std::make_unique< agent_queue_t >(
587 m_work_thread.demand_queue(),
588 std::move(evt_start_demand),
589 std::move(evt_finish_demand)
590 );
591
592 // All further operations have to be performed under the lock.
593 std::lock_guard< std::mutex > lock{ m_agent_map_lock };
594
595 m_agents.emplace(
596 std::addressof(agent),
597 std::move(queue) );
598 }
599
600 void
602 agent_t & agent ) noexcept override
603 {
604 std::lock_guard< std::mutex > lock{ m_agent_map_lock };
605
606 m_agents.erase( std::addressof(agent) );
607 }
608
609 void
611 agent_t & agent ) noexcept override
612 {
613 event_queue_t & queue = [&]() -> event_queue_t &
614 {
615 std::lock_guard< std::mutex > lock{ m_agent_map_lock };
616
617 auto it = m_agents.find( std::addressof(agent) );
618 // Just in case, to simplify debugging if something
619 // went very, very wrong.
620 // This will lead to the termination of the application,
621 // but it is better than accessing a random pointer.
622 if( it == m_agents.end() )
624 rc_no_preallocated_resources_for_agent,
625 "nef_one_thread dispatcher has no info about an agent "
626 "in bind() method" );
627
628 return *(it->second);
629 }();
630
631 agent.so_bind_to_dispatcher( queue );
632 }
633
634 void
636 agent_t & agent ) noexcept override
637 {
638 // Just reuse existing implementation.
639 this->undo_preallocation( agent );
640 }
641
642 private:
643 /*!
644 * \brief Data source for run-time monitoring of whole dispatcher.
645 *
646 * \since v.5.8.0
647 */
648 class disp_data_source_t : public stats::source_t
649 {
650 //! Dispatcher to work with.
651 outliving_reference_t< dispatcher_template_t > m_dispatcher;
652
653 //! Basic prefix for data sources.
655
656 public :
658 const std::string_view name_base,
659 outliving_reference_t< dispatcher_template_t > disp )
660 : m_dispatcher{ disp }
662 "nef-ot",
663 name_base,
664 &(disp.get()) )
665 }
666 {}
667
668 void
669 distribute( const mbox_t & mbox ) override
670 {
671 auto & disp = m_dispatcher.get();
672
673 const std::size_t agents_count = [&disp]() {
675 return disp.m_agents.size();
676 }();
677
679 mbox,
682 agents_count );
683
685 mbox,
689
691 mbox,
694 }
695 };
696
697 //! Type of map from agent pointer to an individual event_queue.
698 using agent_map_t = std::map< agent_t *, std::unique_ptr<agent_queue_t> >;
699
700 //! Worker thread for the dispatcher.
701 Work_Thread m_work_thread;
702
703 //! Data source for run-time monitoring.
706
707 //! Lock for agent_map protection.
709
710 //! Agents for which resources are allocated by the dispatcher.
712 };
713
714//
715// dispatcher_handle_maker_t
716//
718 {
719 public :
721 make( disp_binder_shptr_t binder ) noexcept
722 {
723 return { std::move( binder ) };
724 }
725 };
726
727} /* namespace impl */
728
729//
730// make_dispatcher
731//
734 environment_t & env,
735 const std::string_view data_sources_name_base,
736 disp_params_t params )
737 {
738 using namespace so_5::disp::reuse;
739
740 using dispatcher_no_activity_tracking_t =
741 impl::dispatcher_template_t<
742 impl::work_thread_no_activity_tracking_t >;
743
744 using dispatcher_with_activity_tracking_t =
745 impl::dispatcher_template_t<
746 impl::work_thread_with_activity_tracking_t >;
747
748 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
749 disp_binder_t,
750 dispatcher_no_activity_tracking_t,
751 dispatcher_with_activity_tracking_t >(
752 outliving_mutable(env),
753 data_sources_name_base,
754 std::move(params) );
755
756 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
757 }
758
759} /* namespace nef_one_thread */
760
761} /* namespace disp */
762
763} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
An analog of std::lock_guard for MPSC queue lock.
An analog of std::unique_lock for MPSC queue lock.
Alias for namespace with traits of event queue.
A handle for nef_one_thread dispatcher.
agent_queue_t(const agent_queue_t &)=delete
void push_evt_finish(execution_demand_t demand) noexcept override
Enqueue a demand for evt_finish event.
void push_evt_start(execution_demand_t demand) override
Enqueue a demand for evt_start event.
std::reference_wrapper< demand_queue_t > m_dest_queue
void push(execution_demand_t demand) override
Enqueue new event to the queue.
agent_queue_t & operator=(const agent_queue_t &)=delete
agent_queue_t(demand_queue_t &dest_queue, demand_unique_ptr_t evt_start_demand, demand_unique_ptr_t evt_finish_demand)
agent_queue_t & operator=(agent_queue_t &&o)=delete
demand_unique_ptr_t remove_head() noexcept
Helper method for deleting queue's head object.
std::size_t size() const
Get the current size of the queue.
std::atomic< std::size_t > m_size
Current size of the queue.
demand_queue_t(queue_traits::lock_unique_ptr_t lock)
Initializing constructor.
queue_traits::lock_unique_ptr_t m_lock
Queue lock.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
std::mutex m_agent_map_lock
Lock for agent_map protection.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
Work_Thread m_work_thread
Worker thread for the dispatcher.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
agent_map_t m_agents
Agents for which resources are allocated by the dispatcher.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
A part of implementation of work thread without activity tracking.
no_activity_tracking_impl_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
A part of implementation of work thread with activity tracking.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_waiting_stats
Statistics for wait activity.
with_activity_tracking_impl_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_working_stats
Statistics for work activity.
A worker thread for nef_one_thread dispatcher.
work_thread_template_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
Initializing constructor.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
Base for the case of internal stats lock.
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Various stuff related to MPSC event queue implementation and tuning.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread_no_activity_tracking_t &)
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of nef_one_thread dispatcher.
Reusable components for dispatchers.
Event dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
demand_t * m_next
Next demand in the queue.
demand_t()=default
Default constructor.
execution_demand_t m_execution_demand
Execution demand to be used.
demand_t(execution_demand_t &&source)
Initializing constructor.
common_data_t(queue_traits::lock_unique_ptr_t lock, work_thread_holder_t thread_holder)
Initializing constructor.
A description of event execution demand.