SObjectizer-5 Extra
Loading...
Searching...
No Matches
simple_mtsafe.hpp
Go to the documentation of this file.
1/*!
2 * \file
3 * \brief Implementation of Asio-based simple thread safe
4 * environment infrastructure.
5 */
6
7#pragma once
8
9#include <so_5_extra/env_infrastructures/asio/impl/common.hpp>
10
11#include <so_5/version.hpp>
12#if SO_5_VERSION < SO_5_VERSION_MAKE(5u, 8u, 0u)
13#error "SObjectizer-5.8.0 is required"
14#endif
15
16#include <so_5/impl/st_env_infrastructure_reuse.hpp>
17#include <so_5/impl/internal_env_iface.hpp>
18#include <so_5/details/sync_helpers.hpp>
19#include <so_5/details/at_scope_exit.hpp>
20#include <so_5/details/invoke_noexcept_code.hpp>
21
22#include <asio.hpp>
23
24#include <string_view>
25
26namespace so_5 {
27
28namespace extra {
29
30namespace env_infrastructures {
31
32namespace asio {
33
34namespace simple_mtsafe {
35
36namespace impl {
37
38//! A short name for namespace with reusable stuff.
40
41//! A short name for namespace with common stuff.
42namespace asio_common = ::so_5::extra::env_infrastructures::asio::common;
43
44//
45// shutdown_status_t
46//
48
49//
50// coop_repo_t
51//
52/*!
53 * \brief Implementation of coop_repository for
54 * simple thread-safe single-threaded environment infrastructure.
55 */
57
58//
59// stats_controller_t
60//
61/*!
62 * \brief Implementation of stats_controller for that type of
63 * single-threaded environment.
64 */
67
68//
69// event_queue_impl_t
70//
71/*!
72 * \brief Implementation of event_queue interface for the default dispatcher.
73 *
74 * \tparam Activity_Tracker A type for tracking work thread activity.
75 */
76template< typename Activity_Tracker >
78 {
80
81 public :
82 //! Type for representation of statistical data for this event queue.
83 struct stats_t
84 {
85 //! The current size of the demands queue.
87 };
88
89 //! Initializing constructor.
91 //! Asio's io_context to be used for dispatching.
93 //! Actual activity tracker.
97 , m_pending_demands{}
98 {
99 }
100
101 virtual void
103 {
104 // Statistics must be updated.
106
107 // Now we can schedule processing of the demand.
108 // If ::asio::post fails then statistics must be reverted.
110 [&] {
111 ::asio::post(
112 m_io_svc.get(),
113 [this, d = std::move(demand)]() mutable {
114 // Statistics must be updated.
116
117 // Update wait statistics.
119 const auto wait_starter = ::so_5::details::at_scope_exit(
120 [this]{ m_activity_tracker.get().wait_started(); } );
121
122 // The demand can be handled now.
123 // With working time tracking.
125 {
126 // For the case if call_handler will throw.
127 const auto stopper = ::so_5::details::at_scope_exit(
128 [this]{ m_activity_tracker.get().work_stopped(); });
129
131 }
132 } );
133 },
134 [this] {
136 } );
137 }
138
139 void
141 {
142 // Just delegate to the ordinary push().
143 this->push( std::move(demand) );
144 }
145
146 void
148 {
149 // Just delegate to the ordinary push() despite
150 // the fact that push() isn't noexcept.
151 this->push( std::move(demand) );
152 }
153
154 //! Notification that event queue work is started.
155 void
157 //! ID of the main working thread.
159 {
161
162 // There are no pending demands now. We can start counting
163 // the waiting time.
165 }
166
167 //! Get the current statistics.
168 stats_t
169 query_stats() const noexcept
170 {
172 }
173
174 private :
177
179
181 };
182
183//
184// disp_ds_name_parts_t
185//
186/*!
187 * \brief A class with the major part of the dispatcher name.
 *
 * Exposes a single constexpr, noexcept static accessor, disp_type_part(),
 * that returns the fixed string "asio_mtsafe" as a std::string_view.
 *
 * NOTE(review): presumably this value is used as the dispatcher-type
 * component when names for the default dispatcher are built; the consumer
 * is not visible in this listing -- confirm against the dispatcher template.
188 */
189struct disp_ds_name_parts_t final
190 {
191 static constexpr std::string_view
192 disp_type_part() noexcept { return { "asio_mtsafe" }; }
193 };
194
195//
196// default_dispatcher_t
197//
198/*!
199 * \brief An implementation of dispatcher to be used in
200 * places where default dispatcher is needed.
201 *
202 * \tparam Activity_Tracker a type of activity tracker to be used
203 * for run-time statistics.
204 *
205 * \since
206 * v.1.3.0
207 */
208template< typename Activity_Tracker >
214 {
219
220 public :
226 {
227 // Event queue should be started manually.
228 // We know that the default dispatcher is created on a thread
229 // that will be used for events dispatching.
230 event_queue.get().start( this->thread_id() );
231 }
232 };
233
234//
235// env_infrastructure_t
236//
237/*!
238 * \brief Default implementation of thread safe single-threaded environment
239 * infrastructure.
240 *
241 * \attention
242 * This object doesn't have any mutexes. All synchronization is done via
243 * delegating mutating operations to Asio's context (asio::post and
244 * asio::dispatch are used).
245 *
246 * \tparam Activity_Tracker A type of activity tracker to be used.
247 */
248template< typename Activity_Tracker >
251 {
252 public :
254 //! Asio's io_context to be used.
256 //! Environment to work in.
258 //! Cooperation action listener.
260 //! Mbox for distribution of run-time stats.
262
263 void
264 launch( env_init_t init_fn ) override;
265
266 /*!
267 * \attention
268 * Since SO-5.8.0 this method should be noexcept, but
269 * Asio's post() can throw. In that case the whole application
270 * will be terminated.
271 */
272 void
273 stop() noexcept override;
274
275 [[nodiscard]]
277 make_coop(
280
283 coop_unique_holder_t coop ) override;
284
285 /*!
286 * \attention
287 * This method should be noexcept, but it uses Asio's post() and the
288 * post() call can throw. In that case the whole application will be
289 * terminated.
290 */
291 void
293 coop_shptr_t coop ) noexcept override;
294
295 bool
297 coop_shptr_t coop_name ) noexcept override;
298
301 const std::type_index & type_wrapper,
302 const message_ref_t & msg,
303 const mbox_t & mbox,
305 std::chrono::steady_clock::duration period ) override;
306
307 void
309 const std::type_index & type_wrapper,
310 const message_ref_t & msg,
311 const mbox_t & mbox,
312 std::chrono::steady_clock::duration pause ) override;
313
315 stats_controller() noexcept override;
316
318 stats_repository() noexcept override;
319
322
324 query_timer_thread_stats() override;
325
327 make_default_disp_binder() override;
328
329 private :
330 //! Asio's io_context to be used.
332
333 //! Actual SObjectizer Environment.
335
336 //! Status of shutdown procedure.
338
339 //! Repository of registered coops.
341
342 //! Actual activity tracker.
343 Activity_Tracker m_activity_tracker;
344
345 //! Event queue which is necessary for the default dispatcher.
347
348 //! Dispatcher to be used as default dispatcher.
349 /*!
350 * \note
351 * Has an actual value only inside launch() method.
352 */
354
355 //! Stats controller for this environment.
357
358 //! Counter of cooperations which are waiting for final deregistration
359 //! step.
360 /*!
361 * It is necessary for building correct run-time stats.
362 */
364
365 //! The pointer to an exception that was thrown during init phase.
366 /*!
367 * This exception is stored inside a callback posted to Asio.
368 * And then this exception will be rethrown from launch() method
369 * after the shutdown of SObjectizer.
370 */
372
373 void
374 run_default_dispatcher_and_go_further( env_init_t init_fn );
375
376 /*!
377 * \note Calls m_io_svc.stop() and m_default_disp.shutdown() if necessary.
378 *
379 * \attention Must be called only for locked object!
380 */
381 void
383 };
384
385template< typename Activity_Tracker >
386env_infrastructure_t<Activity_Tracker>::env_infrastructure_t(
387 outliving_reference_t<::asio::io_context> io_svc,
388 environment_t & env,
389 coop_listener_unique_ptr_t coop_listener,
390 mbox_t stats_distribution_mbox )
391 : m_io_svc( io_svc )
392 , m_env( env )
400 {}
401
402template< typename Activity_Tracker >
403void
404env_infrastructure_t<Activity_Tracker>::launch( env_init_t init_fn )
405 {
406 // Post initial operation to Asio event loop.
407 ::asio::post( m_io_svc.get(), [this, init = std::move(init_fn)] {
409 } );
410
411 // Default dispatcher should be destroyed on exit from this function.
414 } );
415
416 // Tell that there is a work to do.
417 auto work = ::asio::make_work_guard( m_io_svc.get() );
418
419 // Launch Asio event loop.
420 m_io_svc.get().run();
421
423 // Some exception was thrown during initialization.
424 // It should be rethrown.
426 }
427
428template< typename Activity_Tracker >
429void
430env_infrastructure_t<Activity_Tracker>::stop() noexcept
431 {
436 {
437 // All registered cooperations must be deregistered now.
439 [this] {
441
443
445 } );
446 }
447 else
448 // Check for shutdown completeness must be performed only
449 // on the main Asio's thread.
450 ::asio::dispatch( m_io_svc.get(), [this] {
452 } );
453 }
454
455template< typename Activity_Tracker >
465
466template< typename Activity_Tracker >
470 {
472 }
473
474template< typename Activity_Tracker >
475void
477 coop_shptr_t coop_to_dereg ) noexcept
478 {
480
481 ::asio::post( m_io_svc.get(), [this, coop = std::move(coop_to_dereg)] {
485 } );
486 }
487
488template< typename Activity_Tracker >
489bool
491 coop_shptr_t coop ) noexcept
492 {
495 }
496
497template< typename Activity_Tracker >
500 const std::type_index & type_index,
501 const message_ref_t & msg,
502 const mbox_t & mbox,
505 {
506 using namespace asio_common;
507
508 // We do not control shutdown_status_t here. Because it seems
509 // to be safe to call schedule_timer after call to stop().
510 // New timer will simply be ignored during shutdown process.
513 {
516 m_io_svc.get(),
518 msg,
519 mbox,
520 period ) };
521
524
526 }
527 else
528 {
531 m_io_svc.get(),
533 msg,
534 mbox ) };
535
538
540 }
541
542 return result;
543 }
544
545template< typename Activity_Tracker >
546void
548 const std::type_index & type_index,
549 const message_ref_t & msg,
550 const mbox_t & mbox,
551 std::chrono::steady_clock::duration pause )
552 {
553 using namespace asio_common;
554
555 // We do not control shutdown_status_t here. Because it seems
556 // to be safe to call schedule_timer after call to stop().
557 // New timer will simply be ignored during shutdown process.
558
561 m_io_svc.get(),
563 msg,
564 mbox ) };
565
567 }
568
569template< typename Activity_Tracker >
571env_infrastructure_t<Activity_Tracker>::stats_controller() noexcept
572 {
573 return m_stats_controller;
574 }
575
576template< typename Activity_Tracker >
578env_infrastructure_t<Activity_Tracker>::stats_repository() noexcept
579 {
580 return m_stats_controller;
581 }
582
583template< typename Activity_Tracker >
595
596template< typename Activity_Tracker >
599 {
600 // NOTE: this type of environment_infrastructure doesn't support
601 // statistics for timers.
602 return { 0, 0 };
603 }
604
605template< typename Activity_Tracker >
608 {
609 return { m_default_disp };
610 }
611
612template< typename Activity_Tracker >
613void
615 env_init_t init_fn )
616 {
617 try
618 {
624
625 // User-supplied init can be called now.
626 init_fn();
627 }
628 catch(...)
629 {
630 // We can't recover if the following fragment throws an exception.
632 // The current exception should be stored to be
633 // rethrown later.
635
636 // SObjectizer's shutdown should be initiated.
637 stop();
638
639 // NOTE: pointer to the default dispatcher will be dropped
640 // in launch() method.
641 } );
642 }
643 }
644
645template< typename Activity_Tracker >
646void
648 {
650 {
651 // If there is no more live coops then shutdown must be
652 // completed.
654 {
656 // Asio's event loop must be broken here!
657 m_io_svc.get().stop();
658 }
659 }
660 }
661
662} /* namespace impl */
663
664//
665// factory
666//
667/*!
668 * \brief A factory for creation of environment infrastructure based on
669 * Asio's event loop.
670 *
671 * \attention
672 * This environment infrastructure is thread safe.
673 *
674 * Usage example:
675 * \code
676int main()
677{
678 asio::io_context io_svc;
679
680 so_5::launch( [](so_5::environment_t & env) {
681 ... // Some initialization stuff.
682 },
683 [&io_svc](so_5::environment_params_t & params) {
684 using asio_env = so_5::extra::env_infrastructures::asio::simple_mtsafe;
685
686 params.infrastructure_factory( asio_env::factory(io_svc) );
687 } );
688
689 return 0;
690}
691 * \endcode
692 */
695 {
696 using namespace impl;
697
698 return [&io_svc](
699 environment_t & env,
700 environment_params_t & env_params,
701 mbox_t stats_distribution_mbox )
702 {
703 environment_infrastructure_t * obj = nullptr;
704
705 // Create the environment infrastructure object depending on the
706 // work thread activity tracking flag.
707 const auto tracking = env_params.work_thread_activity_tracking();
708 if( work_thread_activity_tracking_t::on == tracking )
709 obj = new env_infrastructure_t< reusable::real_activity_tracker_t >(
710 outliving_mutable(io_svc),
711 env,
712 env_params.so5_giveout_coop_listener(),
713 std::move(stats_distribution_mbox) );
714 else
715 obj = new env_infrastructure_t< reusable::fake_activity_tracker_t >(
716 outliving_mutable(io_svc),
717 env,
718 env_params.so5_giveout_coop_listener(),
719 std::move(stats_distribution_mbox) );
720
721 return environment_infrastructure_unique_ptr_t(
722 obj,
723 environment_infrastructure_t::default_deleter() );
724 };
725 }
726
727} /* namespace simple_mtsafe */
728
729} /* namespace asio */
730
731} /* namespace env_infrastructures */
732
733} /* namespace extra */
734
735} /* namespace so_5 */
default_dispatcher_t(outliving_reference_t< environment_t > env, outliving_reference_t< event_queue_impl_t< Activity_Tracker > > event_queue, outliving_reference_t< Activity_Tracker > activity_tracker)
Default implementation of thread safe single-threaded environment infrastructure.
stats_controller_t m_stats_controller
Stats controller for this environment.
std::atomic< std::size_t > m_final_dereg_coop_count
Counter of cooperations which are waiting for final deregistration step.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
event_queue_impl_t< Activity_Tracker > m_event_queue
Event queue which is necessary for the default dispatcher.
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
std::atomic< shutdown_status_t > m_shutdown_status
Status of shutdown procedure.
std::exception_ptr m_exception_from_init
The pointer to an exception that was thrown during init phase.
outliving_reference_t< ::asio::io_context > m_io_svc
Asio's io_context to be used.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
event_queue_impl_t(outliving_reference_t<::asio::io_context > io_svc, outliving_reference_t< Activity_Tracker > activity_tracker)
Initializing constructor.
environment_infrastructure_factory_t factory(::asio::io_context &io_svc)
A factory for creation of environment infrastructure based on Asio's event loop.
Ranges for error codes of each submodule.
Definition details.hpp:13
Type for representation of statistical data for this event queue.