src/ex/detail/strand_service.cpp

97.8% Lines (89/91) 95.5% List of functions (21/22) 90.6% Branches (29/32)
f(x) Functions (22)
Function Calls Lines Branches Blocks
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, boost::capy::detail::strand_impl&) :53 0 100.0% 75.0% boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long) :70 0 87.5% 50.0% boost::capy::detail::strand_invoker::promise_type::get_return_object() :84 0 100.0% boost::capy::detail::strand_invoker::promise_type::initial_suspend() :87 0 100.0% boost::capy::detail::strand_invoker::promise_type::final_suspend() :88 0 100.0% boost::capy::detail::strand_invoker::promise_type::return_void() :89 0 100.0% boost::capy::detail::strand_invoker::promise_type::unhandled_exception() :90 0 0.0% boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&) :112 0 100.0% boost::capy::detail::strand_service_impl::get_implementation() :117 0 100.0% 100.0% boost::capy::detail::strand_service_impl::shutdown() :127 0 100.0% 100.0% boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>) :143 0 100.0% 100.0% boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&) :156 0 100.0% 100.0% boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&) :167 0 100.0% 100.0% boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&) :179 0 100.0% boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&) :185 0 100.0% boost::capy::detail::strand_service_impl::make_invoker(boost::capy::detail::strand_impl&) :193 0 100.0% 100.0% boost::capy::detail::strand_service::strand_service() :213 0 100.0% boost::capy::detail::strand_service::~strand_service() :219 0 100.0% boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&) :223 0 100.0% boost::capy::detail::strand_service::dispatch(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :230 0 100.0% 83.3% boost::capy::detail::strand_service::post(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :242 0 100.0% 100.0% boost::capy::detail::get_strand_service(boost::capy::execution_context&) :250 0 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <atomic>
13 #include <coroutine>
14 #include <mutex>
15 #include <thread>
16 #include <utility>
17
18 namespace boost {
19 namespace capy {
20 namespace detail {
21
22 //----------------------------------------------------------
23
24 /** Implementation state for a strand.
25
26 Each strand_impl provides serialization for coroutines
27 dispatched through strands that share it.
28 */
29 // Sentinel stored in cached_frame_ after shutdown to prevent
30 // in-flight invokers from repopulating a freed cache slot.
31 inline void* const kCacheClosed = reinterpret_cast<void*>(1);
32
33 struct strand_impl
34 {
35 std::mutex mutex_;
36 strand_queue pending_;
37 bool locked_ = false;
38 std::atomic<std::thread::id> dispatch_thread_{};
39 std::atomic<void*> cached_frame_{nullptr};
40 };
41
42 //----------------------------------------------------------
43
44 /** Invoker coroutine for strand dispatch.
45
46 Uses custom allocator to recycle frame - one allocation
47 per strand_impl lifetime, stored in trailer for recovery.
48 */
49 struct strand_invoker
50 {
51 struct promise_type
52 {
53 13x void* operator new(std::size_t n, strand_impl& impl)
54 {
55 13x constexpr auto A = alignof(strand_impl*);
56 13x std::size_t padded = (n + A - 1) & ~(A - 1);
57 13x std::size_t total = padded + sizeof(strand_impl*);
58
59 13x void* p = impl.cached_frame_.exchange(
60 nullptr, std::memory_order_acquire);
61
3/4
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
13x if(!p || p == kCacheClosed)
62 9x p = ::operator new(total);
63
64 // Trailer lets delete recover impl
65 13x *reinterpret_cast<strand_impl**>(
66 13x static_cast<char*>(p) + padded) = &impl;
67 13x return p;
68 }
69
70 13x void operator delete(void* p, std::size_t n) noexcept
71 {
72 13x constexpr auto A = alignof(strand_impl*);
73 13x std::size_t padded = (n + A - 1) & ~(A - 1);
74
75 13x auto* impl = *reinterpret_cast<strand_impl**>(
76 static_cast<char*>(p) + padded);
77
78 13x void* expected = nullptr;
79
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 13 times.
13x if(!impl->cached_frame_.compare_exchange_strong(
80 expected, p, std::memory_order_release))
81 ::operator delete(p);
82 13x }
83
84 13x strand_invoker get_return_object() noexcept
85 13x { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
86
87 13x std::suspend_always initial_suspend() noexcept { return {}; }
88 13x std::suspend_never final_suspend() noexcept { return {}; }
89 13x void return_void() noexcept {}
90 void unhandled_exception() { std::terminate(); }
91 };
92
93 std::coroutine_handle<promise_type> h_;
94 };
95
96 //----------------------------------------------------------
97
98 /** Concrete implementation of strand_service.
99
100 Holds the fixed pool of strand_impl objects.
101 */
102 class strand_service_impl : public strand_service
103 {
104 static constexpr std::size_t num_impls = 211;
105
106 strand_impl impls_[num_impls];
107 std::size_t salt_ = 0;
108 std::mutex mutex_;
109
110 public:
111 explicit
112 21x strand_service_impl(execution_context&)
113 4452x {
114 21x }
115
116 strand_impl*
117 25x get_implementation() override
118 {
119
1/1
✓ Branch 1 taken 25 times.
25x std::lock_guard<std::mutex> lock(mutex_);
120 25x std::size_t index = salt_++;
121 25x index = index % num_impls;
122 25x return &impls_[index];
123 25x }
124
125 protected:
126 void
127 21x shutdown() override
128 {
129
2/2
✓ Branch 0 taken 4431 times.
✓ Branch 1 taken 21 times.
4452x for(std::size_t i = 0; i < num_impls; ++i)
130 {
131
1/1
✓ Branch 1 taken 4431 times.
4431x std::lock_guard<std::mutex> lock(impls_[i].mutex_);
132 4431x impls_[i].locked_ = true;
133
134 4431x void* p = impls_[i].cached_frame_.exchange(
135 kCacheClosed, std::memory_order_acquire);
136
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 4422 times.
4431x if(p)
137 9x ::operator delete(p);
138 4431x }
139 21x }
140
141 private:
142 static bool
143 328x enqueue(strand_impl& impl, std::coroutine_handle<> h)
144 {
145
1/1
✓ Branch 1 taken 328 times.
328x std::lock_guard<std::mutex> lock(impl.mutex_);
146
1/1
✓ Branch 1 taken 328 times.
328x impl.pending_.push(h);
147
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 315 times.
328x if(!impl.locked_)
148 {
149 13x impl.locked_ = true;
150 13x return true;
151 }
152 315x return false;
153 328x }
154
155 static void
156 23x dispatch_pending(strand_impl& impl)
157 {
158 23x strand_queue::taken_batch batch;
159 {
160
1/1
✓ Branch 1 taken 23 times.
23x std::lock_guard<std::mutex> lock(impl.mutex_);
161 23x batch = impl.pending_.take_all();
162 23x }
163
1/1
✓ Branch 1 taken 23 times.
23x impl.pending_.dispatch_batch(batch);
164 23x }
165
166 static bool
167 23x try_unlock(strand_impl& impl)
168 {
169
1/1
✓ Branch 1 taken 23 times.
23x std::lock_guard<std::mutex> lock(impl.mutex_);
170
2/2
✓ Branch 1 taken 13 times.
✓ Branch 2 taken 10 times.
23x if(impl.pending_.empty())
171 {
172 13x impl.locked_ = false;
173 13x return true;
174 }
175 10x return false;
176 23x }
177
178 static void
179 23x set_dispatch_thread(strand_impl& impl) noexcept
180 {
181 23x impl.dispatch_thread_.store(std::this_thread::get_id());
182 23x }
183
184 static void
185 13x clear_dispatch_thread(strand_impl& impl) noexcept
186 {
187 13x impl.dispatch_thread_.store(std::thread::id{});
188 13x }
189
190 // Loops until queue empty (aggressive). Alternative: per-batch fairness
191 // (repost after each batch to let other work run) - explore if starvation observed.
192 static strand_invoker
193
1/1
✓ Branch 1 taken 13 times.
13x make_invoker(strand_impl& impl)
194 {
195 strand_impl* p = &impl;
196 for(;;)
197 {
198 set_dispatch_thread(*p);
199 dispatch_pending(*p);
200 if(try_unlock(*p))
201 {
202 clear_dispatch_thread(*p);
203 co_return;
204 }
205 }
206 26x }
207
208 friend class strand_service;
209 };
210
211 //----------------------------------------------------------
212
213 21x strand_service::
214 21x strand_service()
215 21x : service()
216 {
217 21x }
218
219 21x strand_service::
220 ~strand_service() = default;
221
222 bool
223 6x strand_service::
224 running_in_this_thread(strand_impl& impl) noexcept
225 {
226 6x return impl.dispatch_thread_.load() == std::this_thread::get_id();
227 }
228
229 std::coroutine_handle<>
230 5x strand_service::
231 dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
232 {
233
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 3 times.
5x if(running_in_this_thread(impl))
234 2x return h;
235
236
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3x if(strand_service_impl::enqueue(impl, h))
237
2/2
✓ Branch 1 taken 3 times.
✓ Branch 5 taken 3 times.
3x ex.post(strand_service_impl::make_invoker(impl).h_);
238 3x return std::noop_coroutine();
239 }
240
241 void
242 325x strand_service::
243 post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
244 {
245
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 315 times.
325x if(strand_service_impl::enqueue(impl, h))
246
2/2
✓ Branch 1 taken 10 times.
✓ Branch 5 taken 10 times.
10x ex.post(strand_service_impl::make_invoker(impl).h_);
247 325x }
248
249 strand_service&
250 25x get_strand_service(execution_context& ctx)
251 {
252 25x return ctx.use_service<strand_service_impl>();
253 }
254
255 } // namespace detail
256 } // namespace capy
257 } // namespace boost
258