src/ex/thread_pool.cpp

100.0% Lines (119/119) 100.0% List of functions (24/24) 85.2% Branches (46/54)
f(x) Functions (24)
Function Calls Lines Branches Blocks
boost::capy::thread_pool::impl::work::work(std::__n4861::coroutine_handle<void>) :55 0 100.0% boost::capy::thread_pool::impl::work::run() :60 0 100.0% 66.7% boost::capy::thread_pool::impl::work::destroy() :67 0 100.0% 77.8% boost::capy::thread_pool::impl::~impl() :88 0 100.0% 100.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :94 0 100.0% 100.0% boost::capy::thread_pool::impl::post(std::__n4861::coroutine_handle<void>) :107 0 100.0% 100.0% boost::capy::thread_pool::impl::on_work_started() :119 0 100.0% boost::capy::thread_pool::impl::on_work_finished() :125 0 100.0% 100.0% boost::capy::thread_pool::impl::join() :138 0 100.0% 100.0% boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :154 0 100.0% 75.0% boost::capy::thread_pool::impl::stop() :166 0 100.0% boost::capy::thread_pool::impl::ensure_started() :177 0 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :179 0 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :182 0 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :187 0 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :199 0 100.0% 90.0% boost::capy::thread_pool::~thread_pool() :215 0 100.0% 50.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :225 0 100.0% 60.0% boost::capy::thread_pool::join() :233 0 100.0% boost::capy::thread_pool::stop() :240 0 100.0% boost::capy::thread_pool::get_executor() const :249 0 100.0% boost::capy::thread_pool::executor_type::on_work_started() const :257 0 100.0% boost::capy::thread_pool::executor_type::on_work_finished() const :264 0 100.0% boost::capy::thread_pool::executor_type::post(std::__n4861::coroutine_handle<void>) const :271 0 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <algorithm>
15 #include <atomic>
16 #include <condition_variable>
17 #include <cstdio>
18 #include <mutex>
19 #include <thread>
20 #include <vector>
21
22 /*
23 Thread pool implementation using a shared work queue.
24
25 Work items are coroutine handles wrapped in intrusive list nodes, stored
26 in a single queue protected by a mutex. Worker threads wait on a
27 condition_variable until work is available or stop is requested.
28
29 Threads are started lazily on first post() via std::call_once to avoid
30 spawning threads for pools that are constructed but never used. Each
31 thread is named with a configurable prefix plus index for debugger
32 visibility.
33
34 Work tracking: on_work_started/on_work_finished maintain an atomic
35 outstanding_work_ counter. join() blocks until this counter reaches
36 zero, then signals workers to stop and joins threads.
37
38 Two shutdown paths:
39 - join(): waits for outstanding work to drain, then stops workers.
40 - stop(): immediately signals workers to exit; queued work is abandoned.
41 - Destructor: stop() then join() (abandon + wait for threads).
42 */
43
44 namespace boost {
45 namespace capy {
46
47 //------------------------------------------------------------------------------
48
49 class thread_pool::impl
50 {
51 struct work : detail::intrusive_queue<work>::node
52 {
53 std::coroutine_handle<> h_;
54
55 788x explicit work(std::coroutine_handle<> h) noexcept
56 788x : h_(h)
57 {
58 788x }
59
60 541x void run()
61 {
62 541x auto h = h_;
63
1/2
✓ Branch 0 taken 541 times.
✗ Branch 1 not taken.
541x delete this;
64
1/1
✓ Branch 1 taken 541 times.
541x h.resume();
65 541x }
66
67 247x void destroy()
68 {
69 247x auto h = h_;
70
1/2
✓ Branch 0 taken 247 times.
✗ Branch 1 not taken.
247x delete this;
71
5/6
✓ Branch 1 taken 247 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 115 times.
✓ Branch 7 taken 132 times.
✓ Branch 8 taken 115 times.
✓ Branch 9 taken 132 times.
247x if(h && h != std::noop_coroutine())
72
1/1
✓ Branch 1 taken 115 times.
115x h.destroy();
73 247x }
74 };
75
76 std::mutex mutex_;
77 std::condition_variable cv_;
78 detail::intrusive_queue<work> q_;
79 std::vector<std::thread> threads_;
80 std::atomic<std::size_t> outstanding_work_{0};
81 bool stop_{false};
82 bool joined_{false};
83 std::size_t num_threads_;
84 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
85 std::once_flag start_flag_;
86
87 public:
88 149x ~impl()
89 {
90
2/2
✓ Branch 1 taken 247 times.
✓ Branch 2 taken 149 times.
396x while(auto* w = q_.pop())
91 247x w->destroy();
92 149x }
93
94 149x impl(std::size_t num_threads, std::string_view thread_name_prefix)
95 149x : num_threads_(num_threads)
96 {
97
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 147 times.
149x if(num_threads_ == 0)
98 4x num_threads_ = std::max(
99 2x std::thread::hardware_concurrency(), 1u);
100
101 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102
1/1
✓ Branch 1 taken 149 times.
149x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103 149x thread_name_prefix_[n] = '\0';
104 149x }
105
106 void
107 788x post(std::coroutine_handle<> h)
108 {
109 788x ensure_started();
110 788x auto* w = new work(h);
111 {
112
1/1
✓ Branch 1 taken 788 times.
788x std::lock_guard<std::mutex> lock(mutex_);
113 788x q_.push(w);
114 788x }
115 788x cv_.notify_one();
116 788x }
117
118 void
119 328x on_work_started() noexcept
120 {
121 328x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
122 328x }
123
124 void
125 328x on_work_finished() noexcept
126 {
127 328x if(outstanding_work_.fetch_sub(
128
2/2
✓ Branch 0 taken 77 times.
✓ Branch 1 taken 251 times.
328x 1, std::memory_order_acq_rel) == 1)
129 {
130 77x std::lock_guard<std::mutex> lock(mutex_);
131
4/4
✓ Branch 0 taken 53 times.
✓ Branch 1 taken 24 times.
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 49 times.
77x if(joined_ && !stop_)
132 4x stop_ = true;
133 77x cv_.notify_all();
134 77x }
135 328x }
136
137 void
138 159x join() noexcept
139 {
140 {
141 159x std::unique_lock<std::mutex> lock(mutex_);
142
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 149 times.
159x if(joined_)
143 10x return;
144 149x joined_ = true;
145
146 149x if(outstanding_work_.load(
147
2/2
✓ Branch 0 taken 97 times.
✓ Branch 1 taken 52 times.
149x std::memory_order_acquire) == 0)
148 {
149 97x stop_ = true;
150 97x cv_.notify_all();
151 }
152 else
153 {
154 52x cv_.wait(lock, [this]{
155 57x return stop_;
156 });
157 }
158 159x }
159
160
2/2
✓ Branch 5 taken 163 times.
✓ Branch 6 taken 149 times.
312x for(auto& t : threads_)
161
1/2
✓ Branch 1 taken 163 times.
✗ Branch 2 not taken.
163x if(t.joinable())
162 163x t.join();
163 }
164
165 void
166 151x stop() noexcept
167 {
168 {
169 151x std::lock_guard<std::mutex> lock(mutex_);
170 151x stop_ = true;
171 151x }
172 151x cv_.notify_all();
173 151x }
174
175 private:
176 void
177 788x ensure_started()
178 {
179
1/1
✓ Branch 1 taken 788 times.
788x std::call_once(start_flag_, [this]{
180 93x threads_.reserve(num_threads_);
181
2/2
✓ Branch 0 taken 163 times.
✓ Branch 1 taken 93 times.
256x for(std::size_t i = 0; i < num_threads_; ++i)
182
1/1
✓ Branch 2 taken 163 times.
326x threads_.emplace_back([this, i]{ run(i); });
183 93x });
184 788x }
185
186 void
187 163x run(std::size_t index)
188 {
189 // Build name; set_current_thread_name truncates to platform limits.
190 char name[16];
191 163x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192 163x set_current_thread_name(name);
193
194 for(;;)
195 {
196 704x work* w = nullptr;
197 {
198
1/1
✓ Branch 1 taken 704 times.
704x std::unique_lock<std::mutex> lock(mutex_);
199
1/1
✓ Branch 1 taken 704 times.
704x cv_.wait(lock, [this]{
200
2/2
✓ Branch 1 taken 245 times.
✓ Branch 2 taken 635 times.
1125x return !q_.empty() ||
201
2/2
✓ Branch 0 taken 69 times.
✓ Branch 1 taken 176 times.
1125x stop_;
202 });
203
2/2
✓ Branch 0 taken 163 times.
✓ Branch 1 taken 541 times.
704x if(stop_)
204 326x return;
205 541x w = q_.pop();
206 704x }
207
1/2
✓ Branch 0 taken 541 times.
✗ Branch 1 not taken.
541x if(w)
208
1/1
✓ Branch 1 taken 541 times.
541x w->run();
209 541x }
210 }
211 };
212
213 //------------------------------------------------------------------------------
214
215 149x thread_pool::
216 ~thread_pool()
217 {
218 149x impl_->stop();
219 149x impl_->join();
220 149x shutdown();
221 149x destroy();
222
1/2
✓ Branch 0 taken 149 times.
✗ Branch 1 not taken.
149x delete impl_;
223 149x }
224
225 149x thread_pool::
226 149x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
227
2/4
✓ Branch 2 taken 149 times.
✓ Branch 5 taken 149 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
149x : impl_(new impl(num_threads, thread_name_prefix))
228 {
229
1/1
✓ Branch 1 taken 149 times.
149x this->set_frame_allocator(std::allocator<void>{});
230 149x }
231
232 void
233 10x thread_pool::
234 join() noexcept
235 {
236 10x impl_->join();
237 10x }
238
239 void
240 2x thread_pool::
241 stop() noexcept
242 {
243 2x impl_->stop();
244 2x }
245
246 //------------------------------------------------------------------------------
247
248 thread_pool::executor_type
249 145x thread_pool::
250 get_executor() const noexcept
251 {
252 145x return executor_type(
253 145x const_cast<thread_pool&>(*this));
254 }
255
256 void
257 328x thread_pool::executor_type::
258 on_work_started() const noexcept
259 {
260 328x pool_->impl_->on_work_started();
261 328x }
262
263 void
264 328x thread_pool::executor_type::
265 on_work_finished() const noexcept
266 {
267 328x pool_->impl_->on_work_finished();
268 328x }
269
270 void
271 788x thread_pool::executor_type::
272 post(std::coroutine_handle<> h) const
273 {
274 788x pool_->impl_->post(h);
275 788x }
276
277 } // capy
278 } // boost
279