SObjectizer  5.8
Loading...
Searching...
No Matches
prio_one_thread/quoted_round_robin/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief Functions for creating and binding of the single thread dispatcher
8 * with priority support (quoted round robin policy).
9 *
10 * \since
11 * v.5.5.8
12 */
13
14#include <so_5/disp/prio_one_thread/quoted_round_robin/pub.hpp>
15
16#include <so_5/disp/prio_one_thread/quoted_round_robin/impl/demand_queue.hpp>
17#include <so_5/disp/prio_one_thread/reuse/work_thread.hpp>
18
19#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
20#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
21#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
22
23#include <so_5/stats/repository.hpp>
24#include <so_5/stats/messages.hpp>
25#include <so_5/stats/std_names.hpp>
26
27#include <so_5/send_functions.hpp>
28
29namespace so_5 {
30
31namespace disp {
32
33namespace prio_one_thread {
34
36
37namespace impl {
38
39namespace stats = so_5::stats;
40
41namespace {
42
43void
45 const so_5::mbox_t &,
46 const stats::prefix_t &,
48 demand_queue_t > & )
49 {
50 /* Nothing to do */
51 }
52
53void
55 const so_5::mbox_t & mbox,
56 const stats::prefix_t & prefix,
57 so_5::disp::prio_one_thread::reuse::work_thread_with_activity_tracking_t<
58 demand_queue_t > & wt )
59 {
60 so_5::send< stats::messages::work_thread_activity >(
61 mbox,
62 prefix,
63 stats::suffixes::work_thread_activity(),
64 wt.thread_id(),
65 wt.take_activity_stats() );
66 }
67
68} /* namespace anonymous */
69
70//
71// dispatcher_template_t
72//
73/*!
74 * \brief An implementation of dispatcher with one working thread and support
75 * of demand priorities (quoted round robin policy) in form of template class.
76 *
77 * \since
78 * v.5.5.8, v.5.5.18, v.5.6.0
79 */
80template< typename Work_Thread >
81class dispatcher_template_t final : public disp_binder_t
82 {
83 friend class disp_data_source_t;
84
85 public:
88 const std::string_view name_base,
89 disp_params_t params,
90 const quotes_t & quotes )
91 : m_demand_queue{
92 params.queue_params().lock_factory()(),
93 quotes }
94 , m_work_thread{
95 so_5::disp::reuse::acquire_work_thread( params, env.get() ),
96 m_demand_queue }
97 , m_data_source{
98 outliving_mutable(env.get().stats_repository()),
99 name_base,
100 outliving_mutable(*this)
101 }
102 {
103 m_work_thread.start();
104 }
105
106 ~dispatcher_template_t() noexcept override
107 {
108 m_demand_queue.stop();
109 m_work_thread.join();
110 }
111
112 void
114 agent_t & /*agent*/ ) override
115 {
116 // Nothing to do.
117 }
118
119 void
121 agent_t & /*agent*/ ) noexcept override
122 {
123 // Nothing to do.
124 }
125
126 void
128 agent_t & agent ) noexcept override
129 {
130 const auto priority = agent.so_priority();
131
132 agent.so_bind_to_dispatcher(
133 m_demand_queue.event_queue_by_priority( priority ) );
134
135 m_demand_queue.agent_bound( priority );
136 }
137
138 void
140 agent_t & agent ) noexcept override
141 {
142 const auto priority = agent.so_priority();
143
144 m_demand_queue.agent_unbound( priority );
145 }
146
147 private:
148 /*!
149 * \brief Data source for run-time monitoring of whole dispatcher.
150 *
151 * \since
152 * v.5.5.8
153 */
154 class disp_data_source_t : public stats::source_t
155 {
156 //! Dispatcher to work with.
157 outliving_reference_t< dispatcher_template_t > m_dispatcher;
158
159 //! Basic prefix for data sources.
161
162 public :
164 const std::string_view name_base,
165 outliving_reference_t< dispatcher_template_t > disp )
166 : m_dispatcher{ disp }
168 "pot-qrr",
169 name_base,
170 &(disp.get()) )
171 }
172 {}
173
174 void
175 distribute( const mbox_t & mbox ) override
176 {
177 auto & disp = m_dispatcher.get();
178
180
182 [&]( const demand_queue_t::queue_stats_t & stat ) {
184 mbox,
189
191 } );
192
194 mbox,
197 agents_count );
198
200 mbox,
203 }
204
205 private:
206 void
208 const mbox_t & mbox,
209 priority_t priority,
210 std::size_t quote,
211 std::size_t agents_count,
212 std::size_t demands_count )
213 {
215 ss << m_base_prefix.c_str() << "/p" << to_size_t(priority);
216
217 const stats::prefix_t prefix{ ss.str() };
218
220 mbox,
221 prefix,
223 quote );
224
226 mbox,
227 prefix,
229 agents_count );
230
232 mbox,
233 prefix,
236 }
237 };
238
239 //! Demand queue for the dispatcher.
241
242 //! Working thread for the dispatcher.
243 Work_Thread m_work_thread;
244
245 //! Data source for run-time monitoring.
248 };
249
250//
251// dispatcher_handle_maker_t
252//
254 {
255 public :
257 make( disp_binder_shptr_t binder ) noexcept
258 {
259 return { std::move( binder ) };
260 }
261 };
262
263} /* namespace impl */
264
265//
266// make_dispatcher
267//
270 environment_t & env,
271 const std::string_view data_sources_name_base,
272 const quotes_t & quotes,
273 disp_params_t params )
274 {
275 using namespace so_5::disp::reuse;
276 using namespace so_5::disp::prio_one_thread::reuse;
277
278 using dispatcher_no_activity_tracking_t =
279 impl::dispatcher_template_t<
280 work_thread_no_activity_tracking_t< impl::demand_queue_t > >;
281
282 using dispatcher_with_activity_tracking_t =
283 impl::dispatcher_template_t<
284 work_thread_with_activity_tracking_t< impl::demand_queue_t > >;
285
286 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
287 disp_binder_t,
288 dispatcher_no_activity_tracking_t,
289 dispatcher_with_activity_tracking_t >(
290 outliving_mutable(env),
291 data_sources_name_base,
292 std::move(params),
293 quotes );
294
295 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
296 }
297
298} /* namespace quoted_round_robin */
299
300} /* namespace prio_one_thread */
301
302} /* namespace disp */
303
304} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
A demand queue for dispatcher with one common working thread and round-robin processing of prioritise...
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute_value_for_priority(const mbox_t &mbox, priority_t priority, std::size_t quote, std::size_t agents_count, std::size_t demands_count)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params, const quotes_t &quotes)
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
Interface for dispatcher binders.
SObjectizer Environment.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t< demand_queue_t > &)
Implementation details for dispatcher with round-robin policy of handling prioritized events.
Dispatcher which handles events of different priorities in round-robin maner.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t &quotes, disp_params_t params)
Create an instance of quoted_round_robin dispatcher.
Reusable code for dispatchers with one working thread for events of all priorities.
Dispatcher with one working thread for events of all priorities.
Reusable components for dispatchers.
Event dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
priority_t
Definition of supported priorities.
Definition priority.hpp:28