include/boost/capy/io/write_now.hpp

89.4% Lines (84/94) 96.2% List of functions (25/26) 58.6% Branches (17/29)
f(x) Functions (26)
Function Calls Lines Branches Blocks
boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::get_return_object() :101 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::initial_suspend() :108 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::final_suspend() :117 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::final_suspend()::awaiter::await_ready() const :123 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::final_suspend()::awaiter::await_suspend(std::__n4861::coroutine_handle<void>) const :128 0 80.0% 50.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::final_suspend()::awaiter::await_resume() const :136 0 33.3% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::return_value(boost::capy::io_result<unsigned long>) :143 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::unhandled_exception() :149 0 100.0% auto boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::await_transform<boost::capy::test::write_stream::write_some<boost::capy::consuming_buffers<boost::capy::const_buffer> >(boost::capy::consuming_buffers<boost::capy::const_buffer>)::awaitable>(boost::capy::test::write_stream::write_some<boost::capy::consuming_buffers<boost::capy::const_buffer> >(boost::capy::consuming_buffers<boost::capy::const_buffer>)::awaitable&&) :160 0 100.0% auto boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::await_transform<boost::capy::test::write_stream::write_some<boost::capy::consuming_buffers<boost::capy::mutable_buffer> >(boost::capy::consuming_buffers<boost::capy::mutable_buffer>)::awaitable>(boost::capy::test::write_stream::write_some<boost::capy::consuming_buffers<boost::capy::mutable_buffer> >(boost::capy::consuming_buffers<boost::capy::mutable_buffer>)::awaitable&&) :160 0 100.0% auto boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::await_transform<boost::capy::test::write_stream::write_some<boost::capy::consuming_buffers<std::array<boost::capy::const_buffer, 2ul> > >(boost::capy::consuming_buffers<std::array<boost::capy::const_buffer, 2ul> >)::awaitable>(boost::capy::test::write_stream::write_some<boost::capy::consuming_buffers<std::array<boost::capy::const_buffer, 2ul> > >(boost::capy::consuming_buffers<std::array<boost::capy::const_buffer, 2ul> >)::awaitable&&) :160 0 100.0% void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::const_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::const_buffer&) :197 0 90.0% 66.7% void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::mutable_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::mutable_buffer&) :197 0 70.0% 33.3% void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<std::array<boost::capy::const_buffer, 2ul> >(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, std::array<boost::capy::const_buffer, 2ul>&) :197 0 70.0% 33.3% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator delete(void*, unsigned long) :214 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::~op_type() :221 0 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::op_type(boost::capy::write_now<boost::capy::test::write_stream>::op_type&&) :230 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::await_ready() const :237 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :242 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::await_resume() :252 0 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::op_type(std::__n4861::coroutine_handle<boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type>) :261 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::~write_now() :270 0 100.0% 50.0% boost::capy::write_now<boost::capy::test::write_stream>::write_now(boost::capy::test::write_stream&) :281 0 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<boost::capy::const_buffer>(boost::capy::const_buffer) :335 0 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<boost::capy::mutable_buffer>(boost::capy::mutable_buffer) :335 0 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<std::array<boost::capy::const_buffer, 2ul> >(std::array<boost::capy::const_buffer, 2ul>) :335 0 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
11 #define BOOST_CAPY_IO_WRITE_NOW_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/consuming_buffers.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <coroutine>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23
24 #include <cstddef>
25 #include <exception>
26 #include <new>
27 #include <stop_token>
28 #include <utility>
29
30 #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
31 # if defined(__GNUC__) && !defined(__clang__)
32 # define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
33 # else
34 # define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
35 # endif
36 #endif
37
38 namespace boost {
39 namespace capy {
40
41 /** Eagerly writes complete buffer sequences with frame caching.
42
43 This class wraps a @ref WriteStream and provides an `operator()`
44 that writes an entire buffer sequence, attempting to complete
45 synchronously. If every `write_some` completes without suspending,
46 the entire operation finishes in `await_ready` with no coroutine
47 suspension.
48
49 The class maintains a one-element coroutine frame cache. After
50 the first call, subsequent calls reuse the cached frame memory,
51 avoiding repeated allocation for the internal coroutine.
52
53 @tparam Stream The stream type, must satisfy @ref WriteStream.
54
55 @par Thread Safety
56 Distinct objects: Safe.
57 Shared objects: Unsafe.
58
59 @par Preconditions
60 Only one operation may be outstanding at a time. A new call to
61 `operator()` must not be made until the previous operation has
62 completed (i.e., the returned awaitable has been fully consumed).
63
64 @par Example
65
66 @code
67 template< WriteStream Stream >
68 task<> send_messages( Stream& stream )
69 {
70 write_now wn( stream );
71 auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
72 if( ec1 )
73 detail::throw_system_error( ec1 );
74 auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
75 if( ec2 )
76 detail::throw_system_error( ec2 );
77 }
78 @endcode
79
80 @see write, write_some, WriteStream, ConstBufferSequence
81 */
82 template<class Stream>
83 requires WriteStream<Stream>
84 class write_now
85 {
86 Stream& stream_;
87 void* cached_frame_ = nullptr;
88 std::size_t cached_size_ = 0;
89
90 struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
91 op_type
92 {
93 struct promise_type
94 {
95 io_result<std::size_t> result_;
96 std::exception_ptr ep_;
97 std::coroutine_handle<> cont_{nullptr};
98 io_env const* env_ = nullptr;
99 bool done_ = false;
100
101 68x op_type get_return_object()
102 {
103 return op_type{
104 std::coroutine_handle<
105 68x promise_type>::from_promise(*this)};
106 }
107
108 68x auto initial_suspend() noexcept
109 {
110 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
111 68x return std::suspend_always{};
112 #else
113 return std::suspend_never{};
114 #endif
115 }
116
117 68x auto final_suspend() noexcept
118 {
119 struct awaiter
120 {
121 promise_type* p_;
122
123 68x bool await_ready() const noexcept
124 {
125 68x return false;
126 }
127
128 68x std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
129 {
130 68x p_->done_ = true;
131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 68 times.
68x if(!p_->cont_)
132 return std::noop_coroutine();
133 68x return p_->cont_;
134 }
135
136 void await_resume() const noexcept
137 {
138 }
139 };
140 68x return awaiter{this};
141 }
142
143 46x void return_value(
144 io_result<std::size_t> r) noexcept
145 {
146 46x result_ = r;
147 46x }
148
149 22x void unhandled_exception()
150 {
151 22x ep_ = std::current_exception();
152 22x }
153
154 std::suspend_always yield_value(int) noexcept
155 {
156 return {};
157 }
158
159 template<class A>
160 84x auto await_transform(A&& a)
161 {
162 using decayed = std::decay_t<A>;
163 if constexpr (IoAwaitable<decayed>)
164 {
165 struct wrapper
166 {
167 decayed inner_;
168 promise_type* p_;
169
170 bool await_ready()
171 {
172 return inner_.await_ready();
173 }
174
175 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
176 {
177 return detail::call_await_suspend(
178 &inner_, h,
179 p_->env_);
180 }
181
182 decltype(auto) await_resume()
183 {
184 return inner_.await_resume();
185 }
186 };
187 return wrapper{
188 84x std::forward<A>(a), this};
189 }
190 else
191 {
192 return std::forward<A>(a);
193 }
194 }
195
196 static void*
197 68x operator new(
198 std::size_t size,
199 write_now& self,
200 auto&)
201 {
202
4/6
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::const_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::const_buffer&):
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 40 times.
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::mutable_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::mutable_buffer&):
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<std::array<boost::capy::const_buffer, 2ul> >(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, std::array<boost::capy::const_buffer, 2ul>&):
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
68x if(self.cached_frame_ &&
203
1/6
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::const_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::const_buffer&):
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::mutable_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::mutable_buffer&):
✗ Branch 0 not taken.
✗ Branch 1 not taken.
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<std::array<boost::capy::const_buffer, 2ul> >(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, std::array<boost::capy::const_buffer, 2ul>&):
✗ Branch 0 not taken.
✗ Branch 1 not taken.
4x self.cached_size_ >= size)
204 4x return self.cached_frame_;
205 64x void* p = ::operator new(size);
206
3/6
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::const_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::const_buffer&):
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::mutable_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::mutable_buffer&):
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<std::array<boost::capy::const_buffer, 2ul> >(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, std::array<boost::capy::const_buffer, 2ul>&):
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
64x if(self.cached_frame_)
207 ::operator delete(self.cached_frame_);
208 64x self.cached_frame_ = p;
209 64x self.cached_size_ = size;
210 64x return p;
211 }
212
213 static void
214 68x operator delete(void*, std::size_t) noexcept
215 {
216 68x }
217 };
218
219 std::coroutine_handle<promise_type> h_;
220
221 136x ~op_type()
222 {
223
2/2
✓ Branch 1 taken 68 times.
✓ Branch 2 taken 68 times.
136x if(h_)
224 68x h_.destroy();
225 136x }
226
227 op_type(op_type const&) = delete;
228 op_type& operator=(op_type const&) = delete;
229
230 68x op_type(op_type&& other) noexcept
231 68x : h_(std::exchange(other.h_, nullptr))
232 {
233 68x }
234
235 op_type& operator=(op_type&&) = delete;
236
237 68x bool await_ready() const noexcept
238 {
239 68x return h_.promise().done_;
240 }
241
242 68x std::coroutine_handle<> await_suspend(
243 std::coroutine_handle<> cont,
244 io_env const* env)
245 {
246 68x auto& p = h_.promise();
247 68x p.cont_ = cont;
248 68x p.env_ = env;
249 68x return h_;
250 }
251
252 68x io_result<std::size_t> await_resume()
253 {
254 68x auto& p = h_.promise();
255
2/2
✓ Branch 1 taken 22 times.
✓ Branch 2 taken 46 times.
68x if(p.ep_)
256 22x std::rethrow_exception(p.ep_);
257 46x return p.result_;
258 }
259
260 private:
261 68x explicit op_type(
262 std::coroutine_handle<promise_type> h)
263 68x : h_(h)
264 {
265 68x }
266 };
267
268 public:
269 /** Destructor. Frees the cached coroutine frame. */
270 64x ~write_now()
271 {
272
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64x if(cached_frame_)
273 64x ::operator delete(cached_frame_);
274 64x }
275
276 /** Construct from a stream reference.
277
278 @param s The stream to write to. Must outlive this object.
279 */
280 explicit
281 64x write_now(Stream& s) noexcept
282 64x : stream_(s)
283 {
284 64x }
285
286 write_now(write_now const&) = delete;
287 write_now& operator=(write_now const&) = delete;
288
289 /** Eagerly write the entire buffer sequence.
290
291 Writes data to the stream by calling `write_some` repeatedly
292 until the entire buffer sequence is written or an error
293 occurs. The operation attempts to complete synchronously:
294 if every `write_some` completes without suspending, the
295 entire operation finishes in `await_ready`.
296
297 When the fast path cannot complete, the coroutine suspends
298 and continues asynchronously. The internal coroutine frame
299 is cached and reused across calls.
300
301 @param buffers The buffer sequence to write. Passed by
302 value to ensure the sequence lives in the coroutine
303 frame across suspension points.
304
305 @return An awaitable that await-returns `(error_code,std::size_t)`.
306 On success, `n` equals `buffer_size(buffers)`. On
307 error, `n` is the number of bytes written before the
308 error. Compare error codes to conditions:
309 @li `cond::canceled` - Operation was cancelled
310 @li `std::errc::broken_pipe` - Peer closed connection
311
312 @par Example
313
314 @code
315 write_now wn( stream );
316 auto [ec, n] = co_await wn( make_buffer( body ) );
317 if( ec )
318 detail::throw_system_error( ec );
319 @endcode
320
321 @see write, write_some, WriteStream
322 */
323 // GCC falsely warns that the coroutine promise's
324 // placement operator new(size_t, write_now&, auto&)
325 // mismatches operator delete(void*, size_t). Per the
326 // standard, coroutine deallocation lookup is separate.
327 #if defined(__GNUC__) && !defined(__clang__)
328 #pragma GCC diagnostic push
329 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
330 #endif
331
332 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
333 template<ConstBufferSequence Buffers>
334 op_type
335
3/3
boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<boost::capy::const_buffer>(boost::capy::const_buffer):
✓ Branch 1 taken 44 times.
boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<boost::capy::mutable_buffer>(boost::capy::mutable_buffer):
✓ Branch 1 taken 6 times.
boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<std::array<boost::capy::const_buffer, 2ul> >(std::array<boost::capy::const_buffer, 2ul>):
✓ Branch 1 taken 18 times.
68x operator()(Buffers buffers)
336 {
337 std::size_t const total_size = buffer_size(buffers);
338 std::size_t total_written = 0;
339 consuming_buffers cb(buffers);
340 while(total_written < total_size)
341 {
342 auto r =
343 co_await stream_.write_some(cb);
344 cb.consume(r.t1);
345 total_written += r.t1;
346 if(r.ec)
347 co_return io_result<std::size_t>{
348 r.ec, total_written};
349 }
350 co_return io_result<std::size_t>{
351 {}, total_written};
352 136x }
353 #else
354 template<ConstBufferSequence Buffers>
355 op_type
356 operator()(Buffers buffers)
357 {
358 std::size_t const total_size = buffer_size(buffers);
359 std::size_t total_written = 0;
360
361 // GCC ICE in expand_expr_real_1 (expr.cc:11376)
362 // when consuming_buffers spans a co_yield, so
363 // the GCC path uses a separate simple coroutine.
364 consuming_buffers cb(buffers);
365 while(total_written < total_size)
366 {
367 auto inner = stream_.write_some(cb);
368 if(!inner.await_ready())
369 break;
370 auto r = inner.await_resume();
371 if(r.ec)
372 co_return io_result<std::size_t>{
373 r.ec, total_written};
374 cb.consume(r.t1);
375 total_written += r.t1;
376 }
377
378 if(total_written >= total_size)
379 co_return io_result<std::size_t>{
380 {}, total_written};
381
382 co_yield 0;
383
384 while(total_written < total_size)
385 {
386 auto r =
387 co_await stream_.write_some(cb);
388 cb.consume(r.t1);
389 total_written += r.t1;
390 if(r.ec)
391 co_return io_result<std::size_t>{
392 r.ec, total_written};
393 }
394 co_return io_result<std::size_t>{
395 {}, total_written};
396 }
397 #endif
398
399 #if defined(__GNUC__) && !defined(__clang__)
400 #pragma GCC diagnostic pop
401 #endif
402 };
403
404 template<WriteStream S>
405 write_now(S&) -> write_now<S>;
406
407 } // namespace capy
408 } // namespace boost
409
410 #endif
411