include/boost/corosio/io_stream.hpp

97.1% Lines (34/35) 100.0% Functions (27/27) 83.3% Branches (5/6)
include/boost/corosio/io_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_IO_STREAM_HPP
11 #define BOOST_COROSIO_IO_STREAM_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/io_object.hpp>
15 #include <boost/capy/io_result.hpp>
16 #include <boost/corosio/io_buffer_param.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/io_env.hpp>
19 #include <system_error>
20
21 #include <coroutine>
22 #include <cstddef>
23 #include <stop_token>
24
25 namespace boost::corosio {
26
27 /** Platform stream with read/write operations.
28
29 This base class provides the fundamental async read and write
30 operations for kernel-level stream I/O. Derived classes wrap
31 OS-specific stream implementations (sockets, pipes, etc.) and
32 satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
33
34 @par Semantics
35 Concrete classes wrap direct platform I/O completed by the kernel.
36 Functions taking `io_stream&` signal "platform implementation
37 required" - use this when you need actual kernel I/O rather than
38 a mock or test double.
39
40 For generic stream algorithms that work with test mocks,
41 use `template<capy::Stream S>` instead of `io_stream&`.
42
43 @par Thread Safety
44 Distinct objects: Safe.
45 Shared objects: Unsafe. All calls to a single stream must be made
46 from the same implicit or explicit serialization context.
47
48 @par Example
49 @code
50 // Read until buffer full or EOF
51 capy::task<> read_all( io_stream& stream, std::span<char> buf )
52 {
53 std::size_t total = 0;
54 while( total < buf.size() )
55 {
56 auto [ec, n] = co_await stream.read_some(
57 capy::buffer( buf.data() + total, buf.size() - total ) );
58 if( ec == capy::cond::eof )
59 break;
60 if( ec.failed() )
61 capy::detail::throw_system_error( ec );
62 total += n;
63 }
64 }
65 @endcode
66
67 @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
68 */
69 class BOOST_COROSIO_DECL io_stream : public io_object
70 {
71 public:
72 /** Asynchronously read data from the stream.
73
74 This operation suspends the calling coroutine and initiates a
75 kernel-level read. The coroutine resumes when the operation
76 completes.
77
78 @li The operation completes when:
79 @li At least one byte has been read into the buffer sequence
80 @li The peer closes the connection (EOF)
81 @li An error occurs
82 @li The operation is cancelled via stop token or `cancel()`
83
84 @par Concurrency
85 At most one write operation may be in flight concurrently with
86 this read. No other read operations may be in flight until this
87 operation completes. Note that concurrent in-flight operations
88 does not imply the initiating calls may be made concurrently;
89 all calls must be serialized.
90
91 @par Cancellation
92 Supports cancellation via `std::stop_token` propagated through
93 the IoAwaitable protocol, or via the I/O object's `cancel()`
94 member. When cancelled, the operation completes with an error
95 that compares equal to `capy::cond::canceled`.
96
97 @par Preconditions
98 The stream must be open and connected.
99
100 @param buffers The buffer sequence to read data into. The caller
101 retains ownership and must ensure validity until the
102 operation completes.
103
104 @return An awaitable yielding `(error_code, std::size_t)`.
105 On success, `bytes_transferred` contains the number of bytes
106 read. Compare error codes to conditions, not specific values:
107 @li `capy::cond::eof` - Peer closed connection (TCP FIN)
108 @li `capy::cond::canceled` - Operation was cancelled
109
110 @par Example
111 @code
112 // Simple read with error handling
113 auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
114 if( ec == capy::cond::eof )
115 co_return; // Connection closed gracefully
116 if( ec.failed() )
117 capy::detail::throw_system_error( ec );
118 process( buf, n );
119 @endcode
120
121 @note This operation may read fewer bytes than the buffer
122 capacity. Use a loop or `capy::async_read` to read an
123 exact amount.
124
125 @see write_some, capy::async_read
126 */
127 template<capy::MutableBufferSequence MB>
128 223138 auto read_some(MB const& buffers)
129 {
130 223138 return read_some_awaitable<MB>(*this, buffers);
131 }
132
133 /** Asynchronously write data to the stream.
134
135 This operation suspends the calling coroutine and initiates a
136 kernel-level write. The coroutine resumes when the operation
137 completes.
138
139 @li The operation completes when:
140 @li At least one byte has been written from the buffer sequence
141 @li An error occurs (including connection reset by peer)
142 @li The operation is cancelled via stop token or `cancel()`
143
144 @par Concurrency
145 At most one read operation may be in flight concurrently with
146 this write. No other write operations may be in flight until
147 this operation completes. Note that concurrent in-flight
148 operations does not imply the initiating calls may be made
149 concurrently; all calls must be serialized.
150
151 @par Cancellation
152 Supports cancellation via `std::stop_token` propagated through
153 the IoAwaitable protocol, or via the I/O object's `cancel()`
154 member. When cancelled, the operation completes with an error
155 that compares equal to `capy::cond::canceled`.
156
157 @par Preconditions
158 The stream must be open and connected.
159
160 @param buffers The buffer sequence containing data to write.
161 The caller retains ownership and must ensure validity
162 until the operation completes.
163
164 @return An awaitable yielding `(error_code, std::size_t)`.
165 On success, `bytes_transferred` contains the number of bytes
166 written. Compare error codes to conditions, not specific
167 values:
168 @li `capy::cond::canceled` - Operation was cancelled
169 @li `std::errc::broken_pipe` - Peer closed connection
170
171 @par Example
172 @code
173 // Write all data
174 std::string_view data = "Hello, World!";
175 std::size_t written = 0;
176 while( written < data.size() )
177 {
178 auto [ec, n] = co_await stream.write_some(
179 capy::buffer( data.data() + written,
180 data.size() - written ) );
181 if( ec.failed() )
182 capy::detail::throw_system_error( ec );
183 written += n;
184 }
185 @endcode
186
187 @note This operation may write fewer bytes than the buffer
188 contains. Use a loop or `capy::async_write` to write
189 all data.
190
191 @see read_some, capy::async_write
192 */
193 template<capy::ConstBufferSequence CB>
194 222775 auto write_some(CB const& buffers)
195 {
196 222775 return write_some_awaitable<CB>(*this, buffers);
197 }
198
199 protected:
200 /// Awaitable for async read operations.
201 template<class MutableBufferSequence>
202 struct read_some_awaitable
203 {
204 io_stream& ios_;
205 MutableBufferSequence buffers_;
206 std::stop_token token_;
207 mutable std::error_code ec_;
208 mutable std::size_t bytes_transferred_ = 0;
209
210 223138 read_some_awaitable(
211 io_stream& ios,
212 MutableBufferSequence buffers) noexcept
213 223138 : ios_(ios)
214 223138 , buffers_(std::move(buffers))
215 {
216 223138 }
217
218 223138 bool await_ready() const noexcept
219 {
220 223138 return token_.stop_requested();
221 }
222
223 223138 capy::io_result<std::size_t> await_resume() const noexcept
224 {
225
2/2
✓ Branch 1 taken 196 times.
✓ Branch 2 taken 222942 times.
223138 if (token_.stop_requested())
226 196 return {make_error_code(std::errc::operation_canceled), 0};
227 222942 return {ec_, bytes_transferred_};
228 }
229
230 223138 auto await_suspend(
231 std::coroutine_handle<> h,
232 capy::io_env const* env) -> std::coroutine_handle<>
233 {
234 223138 token_ = env->stop_token;
235
1/1
✓ Branch 4 taken 223138 times.
223138 return ios_.get().read_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
236 }
237 };
238
239 /// Awaitable for async write operations.
240 template<class ConstBufferSequence>
241 struct write_some_awaitable
242 {
243 io_stream& ios_;
244 ConstBufferSequence buffers_;
245 std::stop_token token_;
246 mutable std::error_code ec_;
247 mutable std::size_t bytes_transferred_ = 0;
248
249 222775 write_some_awaitable(
250 io_stream& ios,
251 ConstBufferSequence buffers) noexcept
252 222775 : ios_(ios)
253 222775 , buffers_(std::move(buffers))
254 {
255 222775 }
256
257 222775 bool await_ready() const noexcept
258 {
259 222775 return token_.stop_requested();
260 }
261
262 222775 capy::io_result<std::size_t> await_resume() const noexcept
263 {
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 222775 times.
222775 if (token_.stop_requested())
265 return {make_error_code(std::errc::operation_canceled), 0};
266 222775 return {ec_, bytes_transferred_};
267 }
268
269 222775 auto await_suspend(
270 std::coroutine_handle<> h,
271 capy::io_env const* env) -> std::coroutine_handle<>
272 {
273 222775 token_ = env->stop_token;
274
1/1
✓ Branch 4 taken 222775 times.
222775 return ios_.get().write_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
275 }
276 };
277
278 public:
279 /** Platform-specific stream implementation interface.
280
281 Derived classes implement this interface to provide kernel-level
282 read and write operations for each supported platform (IOCP,
283 epoll, kqueue, io_uring).
284 */
285 struct implementation : io_object::implementation
286 {
287 /// Initiate platform read operation.
288 virtual std::coroutine_handle<> read_some(
289 std::coroutine_handle<>,
290 capy::executor_ref,
291 io_buffer_param,
292 std::stop_token,
293 std::error_code*,
294 std::size_t*) = 0;
295
296 /// Initiate platform write operation.
297 virtual std::coroutine_handle<> write_some(
298 std::coroutine_handle<>,
299 capy::executor_ref,
300 io_buffer_param,
301 std::stop_token,
302 std::error_code*,
303 std::size_t*) = 0;
304 };
305
306 protected:
307 /// Construct stream from a handle.
308 explicit
309 16343 io_stream(handle h) noexcept
310 16343 : io_object(std::move(h))
311 {
312 16343 }
313
314 private:
315 /// Return implementation downcasted to stream interface.
316 445913 implementation& get() const noexcept
317 {
318 445913 return *static_cast<implementation*>(h_.get());
319 }
320 };
321
322 } // namespace boost::corosio
323
324 #endif
325