src/corosio/src/detail/epoll/sockets.cpp

80.3% Lines (346/431) 94.4% Functions (34/36) 63.8% Branches (146/229)
src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 // Register an op with the reactor, handling cached edge events.
34 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
35 void
36 4902 epoll_socket_impl::
37 register_op(
38 epoll_op& op,
39 epoll_op*& desc_slot,
40 bool& ready_flag,
41 bool& cancel_flag) noexcept
42 {
43 4902 svc_.work_started();
44
45 4902 std::lock_guard lock(desc_state_.mutex);
46 4902 bool io_done = false;
47
2/2
✓ Branch 0 taken 142 times.
✓ Branch 1 taken 4760 times.
4902 if (ready_flag)
48 {
49 142 ready_flag = false;
50 142 op.perform_io();
51
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 142 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
142 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
52
1/2
✓ Branch 0 taken 142 times.
✗ Branch 1 not taken.
142 if (!io_done)
53 142 op.errn = 0;
54 }
55
56
2/2
✓ Branch 0 taken 95 times.
✓ Branch 1 taken 4807 times.
4902 if (cancel_flag)
57 {
58 95 cancel_flag = false;
59 95 op.cancelled.store(true, std::memory_order_relaxed);
60 }
61
62
5/6
✓ Branch 0 taken 4902 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 95 times.
✓ Branch 4 taken 4807 times.
✓ Branch 5 taken 95 times.
✓ Branch 6 taken 4807 times.
4902 if (io_done || op.cancelled.load(std::memory_order_acquire))
63 {
64 95 svc_.post(&op);
65 95 svc_.work_finished();
66 }
67 else
68 {
69 4807 desc_slot = &op;
70 }
71 4902 }
72
73 void
74 104 epoll_op::canceller::
75 operator()() const noexcept
76 {
77 104 op->cancel();
78 104 }
79
80 void
81 epoll_connect_op::
82 cancel() noexcept
83 {
84 if (socket_impl_)
85 socket_impl_->cancel_single_op(*this);
86 else
87 request_cancel();
88 }
89
90 void
91 98 epoll_read_op::
92 cancel() noexcept
93 {
94
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
95 98 socket_impl_->cancel_single_op(*this);
96 else
97 request_cancel();
98 98 }
99
100 void
101 epoll_write_op::
102 cancel() noexcept
103 {
104 if (socket_impl_)
105 socket_impl_->cancel_single_op(*this);
106 else
107 request_cancel();
108 }
109
110 void
111 55854 epoll_op::
112 operator()()
113 {
114 55854 stop_cb.reset();
115
116 55854 socket_impl_->svc_.scheduler().reset_inline_budget();
117
118
2/2
✓ Branch 1 taken 205 times.
✓ Branch 2 taken 55649 times.
55854 if (cancelled.load(std::memory_order_acquire))
119 205 *ec_out = capy::error::canceled;
120
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 55649 times.
55649 else if (errn != 0)
121 *ec_out = make_err(errn);
122
4/6
✓ Branch 1 taken 27816 times.
✓ Branch 2 taken 27833 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 27816 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 55649 times.
55649 else if (is_read_operation() && bytes_transferred == 0)
123 *ec_out = capy::error::eof;
124 else
125 55649 *ec_out = {};
126
127 55854 *bytes_out = bytes_transferred;
128
129 // Move to stack before resuming coroutine. The coroutine might close
130 // the socket, releasing the last wrapper ref. If impl_ptr were the
131 // last ref and we destroyed it while still in operator(), we'd have
132 // use-after-free. Moving to local ensures destruction happens at
133 // function exit, after all member accesses are complete.
134 55854 capy::executor_ref saved_ex( std::move( ex ) );
135 55854 std::coroutine_handle<> saved_h( std::move( h ) );
136 55854 auto prevent_premature_destruction = std::move(impl_ptr);
137
2/2
✓ Branch 1 taken 55854 times.
✓ Branch 4 taken 55854 times.
55854 dispatch_coro(saved_ex, saved_h).resume();
138 55854 }
139
140 void
141 4701 epoll_connect_op::
142 operator()()
143 {
144 4701 stop_cb.reset();
145
146 4701 socket_impl_->svc_.scheduler().reset_inline_budget();
147
148
3/4
✓ Branch 0 taken 4699 times.
✓ Branch 1 taken 2 times.
✓ Branch 3 taken 4699 times.
✗ Branch 4 not taken.
4701 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
149
150 // Cache endpoints on successful connect
151
3/4
✓ Branch 0 taken 4699 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 4699 times.
✗ Branch 3 not taken.
4701 if (success && socket_impl_)
152 {
153 // Query local endpoint via getsockname (may fail, but remote is always known)
154 4699 endpoint local_ep;
155 4699 sockaddr_in local_addr{};
156 4699 socklen_t local_len = sizeof(local_addr);
157
1/2
✓ Branch 1 taken 4699 times.
✗ Branch 2 not taken.
4699 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
158 4699 local_ep = from_sockaddr_in(local_addr);
159 // Always cache remote endpoint; local may be default if getsockname failed
160 4699 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
161 }
162
163
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4701 times.
4701 if (cancelled.load(std::memory_order_acquire))
164 *ec_out = capy::error::canceled;
165
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 4699 times.
4701 else if (errn != 0)
166 2 *ec_out = make_err(errn);
167 else
168 4699 *ec_out = {};
169
170 // Move to stack before resuming. See epoll_op::operator()() for rationale.
171 4701 capy::executor_ref saved_ex( std::move( ex ) );
172 4701 std::coroutine_handle<> saved_h( std::move( h ) );
173 4701 auto prevent_premature_destruction = std::move(impl_ptr);
174
2/2
✓ Branch 1 taken 4701 times.
✓ Branch 4 taken 4701 times.
4701 dispatch_coro(saved_ex, saved_h).resume();
175 4701 }
176
177 14154 epoll_socket_impl::
178 14154 epoll_socket_impl(epoll_socket_service& svc) noexcept
179 14154 : svc_(svc)
180 {
181 14154 }
182
183 14154 epoll_socket_impl::
184 ~epoll_socket_impl() = default;
185
186 std::coroutine_handle<>
187 4701 epoll_socket_impl::
188 connect(
189 std::coroutine_handle<> h,
190 capy::executor_ref ex,
191 endpoint ep,
192 std::stop_token token,
193 std::error_code* ec)
194 {
195 4701 auto& op = conn_;
196
197 4701 sockaddr_in addr = detail::to_sockaddr_in(ep);
198
1/1
✓ Branch 1 taken 4701 times.
4701 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
199
200
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4701 times.
4701 if (result == 0)
201 {
202 sockaddr_in local_addr{};
203 socklen_t local_len = sizeof(local_addr);
204 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
205 local_endpoint_ = detail::from_sockaddr_in(local_addr);
206 remote_endpoint_ = ep;
207 }
208
209
2/4
✓ Branch 0 taken 4701 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4701 times.
4701 if (result == 0 || errno != EINPROGRESS)
210 {
211 int err = (result < 0) ? errno : 0;
212 if (svc_.scheduler().try_consume_inline_budget())
213 {
214 *ec = err ? make_err(err) : std::error_code{};
215 return dispatch_coro(ex, h);
216 }
217 op.reset();
218 op.h = h;
219 op.ex = ex;
220 op.ec_out = ec;
221 op.fd = fd_;
222 op.target_endpoint = ep;
223 op.start(token, this);
224 op.impl_ptr = shared_from_this();
225 op.complete(err, 0);
226 svc_.post(&op);
227 return std::noop_coroutine();
228 }
229
230 // EINPROGRESS — register with reactor
231 4701 op.reset();
232 4701 op.h = h;
233 4701 op.ex = ex;
234 4701 op.ec_out = ec;
235 4701 op.fd = fd_;
236 4701 op.target_endpoint = ep;
237 4701 op.start(token, this);
238
1/1
✓ Branch 1 taken 4701 times.
4701 op.impl_ptr = shared_from_this();
239
240 4701 register_op(op, desc_state_.connect_op, desc_state_.write_ready,
241 4701 desc_state_.connect_cancel_pending);
242 4701 return std::noop_coroutine();
243 }
244
245 std::coroutine_handle<>
246 139521 epoll_socket_impl::
247 read_some(
248 std::coroutine_handle<> h,
249 capy::executor_ref ex,
250 io_buffer_param param,
251 std::stop_token token,
252 std::error_code* ec,
253 std::size_t* bytes_out)
254 {
255 139521 auto& op = rd_;
256 139521 op.reset();
257
258 139521 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
259 139521 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
260
261
6/8
✓ Branch 0 taken 139520 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 139520 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 139520 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 139520 times.
139521 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
262 {
263 1 op.empty_buffer_read = true;
264 1 op.h = h;
265 1 op.ex = ex;
266 1 op.ec_out = ec;
267 1 op.bytes_out = bytes_out;
268 1 op.start(token, this);
269
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
270 1 op.complete(0, 0);
271
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
272 1 return std::noop_coroutine();
273 }
274
275
2/2
✓ Branch 0 taken 139520 times.
✓ Branch 1 taken 139520 times.
279040 for (int i = 0; i < op.iovec_count; ++i)
276 {
277 139520 op.iovecs[i].iov_base = bufs[i].data();
278 139520 op.iovecs[i].iov_len = bufs[i].size();
279 }
280
281 // Speculative read
282 ssize_t n;
283 do {
284
1/1
✓ Branch 1 taken 139520 times.
139520 n = ::readv(fd_, op.iovecs, op.iovec_count);
285
3/4
✓ Branch 0 taken 201 times.
✓ Branch 1 taken 139319 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 201 times.
139520 } while (n < 0 && errno == EINTR);
286
287
3/6
✓ Branch 0 taken 201 times.
✓ Branch 1 taken 139319 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 201 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
139520 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
288 {
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 139319 times.
139319 int err = (n < 0) ? errno : 0;
290 139319 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
291
292
2/2
✓ Branch 2 taken 111503 times.
✓ Branch 3 taken 27816 times.
139319 if (svc_.scheduler().try_consume_inline_budget())
293 {
294
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 111503 times.
111503 if (err)
295 *ec = make_err(err);
296
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 111498 times.
111503 else if (n == 0)
297 5 *ec = capy::error::eof;
298 else
299 111498 *ec = {};
300 111503 *bytes_out = bytes;
301
1/1
✓ Branch 1 taken 111503 times.
111503 return dispatch_coro(ex, h);
302 }
303 27816 op.h = h;
304 27816 op.ex = ex;
305 27816 op.ec_out = ec;
306 27816 op.bytes_out = bytes_out;
307 27816 op.start(token, this);
308
1/1
✓ Branch 1 taken 27816 times.
27816 op.impl_ptr = shared_from_this();
309 27816 op.complete(err, bytes);
310
1/1
✓ Branch 1 taken 27816 times.
27816 svc_.post(&op);
311 27816 return std::noop_coroutine();
312 }
313
314 // EAGAIN — register with reactor
315 201 op.h = h;
316 201 op.ex = ex;
317 201 op.ec_out = ec;
318 201 op.bytes_out = bytes_out;
319 201 op.fd = fd_;
320 201 op.start(token, this);
321
1/1
✓ Branch 1 taken 201 times.
201 op.impl_ptr = shared_from_this();
322
323 201 register_op(op, desc_state_.read_op, desc_state_.read_ready,
324 201 desc_state_.read_cancel_pending);
325 201 return std::noop_coroutine();
326 }
327
328 std::coroutine_handle<>
329 139321 epoll_socket_impl::
330 write_some(
331 std::coroutine_handle<> h,
332 capy::executor_ref ex,
333 io_buffer_param param,
334 std::stop_token token,
335 std::error_code* ec,
336 std::size_t* bytes_out)
337 {
338 139321 auto& op = wr_;
339 139321 op.reset();
340
341 139321 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
342 139321 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
343
344
6/8
✓ Branch 0 taken 139320 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 139320 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 139320 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 139320 times.
139321 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
345 {
346 1 op.h = h;
347 1 op.ex = ex;
348 1 op.ec_out = ec;
349 1 op.bytes_out = bytes_out;
350 1 op.start(token, this);
351
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
352 1 op.complete(0, 0);
353
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
354 1 return std::noop_coroutine();
355 }
356
357
2/2
✓ Branch 0 taken 139320 times.
✓ Branch 1 taken 139320 times.
278640 for (int i = 0; i < op.iovec_count; ++i)
358 {
359 139320 op.iovecs[i].iov_base = bufs[i].data();
360 139320 op.iovecs[i].iov_len = bufs[i].size();
361 }
362
363 // Speculative write
364 139320 msghdr msg{};
365 139320 msg.msg_iov = op.iovecs;
366 139320 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
367
368 ssize_t n;
369 do {
370
1/1
✓ Branch 1 taken 139320 times.
139320 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
371
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 139319 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
139320 } while (n < 0 && errno == EINTR);
372
373
4/6
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 139319 times.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 time.
✗ Branch 5 not taken.
139320 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
374 {
375
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 139319 times.
139320 int err = (n < 0) ? errno : 0;
376 139320 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
377
378
2/2
✓ Branch 2 taken 111485 times.
✓ Branch 3 taken 27835 times.
139320 if (svc_.scheduler().try_consume_inline_budget())
379 {
380
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 111484 times.
111485 *ec = err ? make_err(err) : std::error_code{};
381 111485 *bytes_out = bytes;
382
1/1
✓ Branch 1 taken 111485 times.
111485 return dispatch_coro(ex, h);
383 }
384 27835 op.h = h;
385 27835 op.ex = ex;
386 27835 op.ec_out = ec;
387 27835 op.bytes_out = bytes_out;
388 27835 op.start(token, this);
389
1/1
✓ Branch 1 taken 27835 times.
27835 op.impl_ptr = shared_from_this();
390 27835 op.complete(err, bytes);
391
1/1
✓ Branch 1 taken 27835 times.
27835 svc_.post(&op);
392 27835 return std::noop_coroutine();
393 }
394
395 // EAGAIN — register with reactor
396 op.h = h;
397 op.ex = ex;
398 op.ec_out = ec;
399 op.bytes_out = bytes_out;
400 op.fd = fd_;
401 op.start(token, this);
402 op.impl_ptr = shared_from_this();
403
404 register_op(op, desc_state_.write_op, desc_state_.write_ready,
405 desc_state_.write_cancel_pending);
406 return std::noop_coroutine();
407 }
408
409 std::error_code
410 3 epoll_socket_impl::
411 shutdown(tcp_socket::shutdown_type what) noexcept
412 {
413 int how;
414
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
415 {
416 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
417 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
418 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
419 default:
420 return make_err(EINVAL);
421 }
422
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
423 return make_err(errno);
424 3 return {};
425 }
426
427 std::error_code
428 5 epoll_socket_impl::
429 set_no_delay(bool value) noexcept
430 {
431
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
432
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
433 return make_err(errno);
434 5 return {};
435 }
436
437 bool
438 5 epoll_socket_impl::
439 no_delay(std::error_code& ec) const noexcept
440 {
441 5 int flag = 0;
442 5 socklen_t len = sizeof(flag);
443
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
444 {
445 ec = make_err(errno);
446 return false;
447 }
448 5 ec = {};
449 5 return flag != 0;
450 }
451
452 std::error_code
453 4 epoll_socket_impl::
454 set_keep_alive(bool value) noexcept
455 {
456
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
457
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
458 return make_err(errno);
459 4 return {};
460 }
461
462 bool
463 4 epoll_socket_impl::
464 keep_alive(std::error_code& ec) const noexcept
465 {
466 4 int flag = 0;
467 4 socklen_t len = sizeof(flag);
468
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
469 {
470 ec = make_err(errno);
471 return false;
472 }
473 4 ec = {};
474 4 return flag != 0;
475 }
476
477 std::error_code
478 1 epoll_socket_impl::
479 set_receive_buffer_size(int size) noexcept
480 {
481
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
482 return make_err(errno);
483 1 return {};
484 }
485
486 int
487 3 epoll_socket_impl::
488 receive_buffer_size(std::error_code& ec) const noexcept
489 {
490 3 int size = 0;
491 3 socklen_t len = sizeof(size);
492
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
493 {
494 ec = make_err(errno);
495 return 0;
496 }
497 3 ec = {};
498 3 return size;
499 }
500
501 std::error_code
502 1 epoll_socket_impl::
503 set_send_buffer_size(int size) noexcept
504 {
505
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
506 return make_err(errno);
507 1 return {};
508 }
509
510 int
511 3 epoll_socket_impl::
512 send_buffer_size(std::error_code& ec) const noexcept
513 {
514 3 int size = 0;
515 3 socklen_t len = sizeof(size);
516
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
517 {
518 ec = make_err(errno);
519 return 0;
520 }
521 3 ec = {};
522 3 return size;
523 }
524
525 std::error_code
526 8 epoll_socket_impl::
527 set_linger(bool enabled, int timeout) noexcept
528 {
529
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 7 times.
8 if (timeout < 0)
530 1 return make_err(EINVAL);
531 struct ::linger lg;
532
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 time.
7 lg.l_onoff = enabled ? 1 : 0;
533 7 lg.l_linger = timeout;
534
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
535 return make_err(errno);
536 7 return {};
537 }
538
539 tcp_socket::linger_options
540 3 epoll_socket_impl::
541 linger(std::error_code& ec) const noexcept
542 {
543 3 struct ::linger lg{};
544 3 socklen_t len = sizeof(lg);
545
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
546 {
547 ec = make_err(errno);
548 return {};
549 }
550 3 ec = {};
551 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
552 }
553
554 void
555 42618 epoll_socket_impl::
556 cancel() noexcept
557 {
558 42618 std::shared_ptr<epoll_socket_impl> self;
559 try {
560
1/1
✓ Branch 1 taken 42618 times.
42618 self = shared_from_this();
561 } catch (const std::bad_weak_ptr&) {
562 return;
563 }
564
565 42618 conn_.request_cancel();
566 42618 rd_.request_cancel();
567 42618 wr_.request_cancel();
568
569 42618 epoll_op* conn_claimed = nullptr;
570 42618 epoll_op* rd_claimed = nullptr;
571 42618 epoll_op* wr_claimed = nullptr;
572 {
573 42618 std::lock_guard lock(desc_state_.mutex);
574
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42618 times.
42618 if (desc_state_.connect_op == &conn_)
575 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
576 else
577 42618 desc_state_.connect_cancel_pending = true;
578
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 42614 times.
42618 if (desc_state_.read_op == &rd_)
579 4 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
580 else
581 42614 desc_state_.read_cancel_pending = true;
582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42618 times.
42618 if (desc_state_.write_op == &wr_)
583 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
584 else
585 42618 desc_state_.write_cancel_pending = true;
586 42618 }
587
588
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42618 times.
42618 if (conn_claimed)
589 {
590 conn_.impl_ptr = self;
591 svc_.post(&conn_);
592 svc_.work_finished();
593 }
594
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 42614 times.
42618 if (rd_claimed)
595 {
596 4 rd_.impl_ptr = self;
597 4 svc_.post(&rd_);
598 4 svc_.work_finished();
599 }
600
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42618 times.
42618 if (wr_claimed)
601 {
602 wr_.impl_ptr = self;
603 svc_.post(&wr_);
604 svc_.work_finished();
605 }
606 42618 }
607
608 void
609 98 epoll_socket_impl::
610 cancel_single_op(epoll_op& op) noexcept
611 {
612 98 op.request_cancel();
613
614 98 epoll_op** desc_op_ptr = nullptr;
615
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
616
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
617 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
618
619
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
620 {
621 98 epoll_op* claimed = nullptr;
622 {
623 98 std::lock_guard lock(desc_state_.mutex);
624
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (*desc_op_ptr == &op)
625 98 claimed = std::exchange(*desc_op_ptr, nullptr);
626 else if (&op == &conn_)
627 desc_state_.connect_cancel_pending = true;
628 else if (&op == &rd_)
629 desc_state_.read_cancel_pending = true;
630 else if (&op == &wr_)
631 desc_state_.write_cancel_pending = true;
632 98 }
633
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (claimed)
634 {
635 try {
636
1/1
✓ Branch 1 taken 98 times.
98 op.impl_ptr = shared_from_this();
637 } catch (const std::bad_weak_ptr&) {}
638 98 svc_.post(&op);
639 98 svc_.work_finished();
640 }
641 }
642 98 }
643
644 void
645 42431 epoll_socket_impl::
646 close_socket() noexcept
647 {
648 42431 cancel();
649
650 // Keep impl alive if descriptor_state is queued in the scheduler.
651 // Without this, destroy_impl() drops the last shared_ptr while
652 // the queued descriptor_state node would become dangling.
653
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 42424 times.
42431 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
654 {
655 try {
656
1/1
✓ Branch 1 taken 7 times.
7 desc_state_.impl_ref_ = shared_from_this();
657 } catch (std::bad_weak_ptr const&) {}
658 }
659
660
2/2
✓ Branch 0 taken 9411 times.
✓ Branch 1 taken 33020 times.
42431 if (fd_ >= 0)
661 {
662
1/2
✓ Branch 0 taken 9411 times.
✗ Branch 1 not taken.
9411 if (desc_state_.registered_events != 0)
663 9411 svc_.scheduler().deregister_descriptor(fd_);
664 9411 ::close(fd_);
665 9411 fd_ = -1;
666 }
667
668 42431 desc_state_.fd = -1;
669 {
670 42431 std::lock_guard lock(desc_state_.mutex);
671 42431 desc_state_.read_op = nullptr;
672 42431 desc_state_.write_op = nullptr;
673 42431 desc_state_.connect_op = nullptr;
674 42431 desc_state_.read_ready = false;
675 42431 desc_state_.write_ready = false;
676 42431 desc_state_.read_cancel_pending = false;
677 42431 desc_state_.write_cancel_pending = false;
678 42431 desc_state_.connect_cancel_pending = false;
679 42431 }
680 42431 desc_state_.registered_events = 0;
681
682 42431 local_endpoint_ = endpoint{};
683 42431 remote_endpoint_ = endpoint{};
684 42431 }
685
686 203 epoll_socket_service::
687 203 epoll_socket_service(capy::execution_context& ctx)
688
2/2
✓ Branch 2 taken 203 times.
✓ Branch 5 taken 203 times.
203 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
689 {
690 203 }
691
692 406 epoll_socket_service::
693 203 ~epoll_socket_service()
694 {
695 406 }
696
697 void
698 203 epoll_socket_service::
699 shutdown()
700 {
701
1/1
✓ Branch 2 taken 203 times.
203 std::lock_guard lock(state_->mutex_);
702
703
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 203 times.
203 while (auto* impl = state_->socket_list_.pop_front())
704 impl->close_socket();
705
706 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
707 // drains completed_ops_, calling destroy() on each queued op. If we
708 // released our shared_ptrs now, an epoll_op::destroy() could free the
709 // last ref to an impl whose embedded descriptor_state is still linked
710 // in the queue — use-after-free on the next pop(). Letting ~state_
711 // release the ptrs (during service destruction, after scheduler
712 // shutdown) keeps every impl alive until all ops have been drained.
713 203 }
714
715 io_object::implementation*
716 14154 epoll_socket_service::
717 construct()
718 {
719
1/1
✓ Branch 1 taken 14154 times.
14154 auto impl = std::make_shared<epoll_socket_impl>(*this);
720 14154 auto* raw = impl.get();
721
722 {
723
1/1
✓ Branch 2 taken 14154 times.
14154 std::lock_guard lock(state_->mutex_);
724 14154 state_->socket_list_.push_back(raw);
725
1/1
✓ Branch 3 taken 14154 times.
14154 state_->socket_ptrs_.emplace(raw, std::move(impl));
726 14154 }
727
728 14154 return raw;
729 14154 }
730
731 void
732 14154 epoll_socket_service::
733 destroy(io_object::implementation* impl)
734 {
735 14154 auto* epoll_impl = static_cast<epoll_socket_impl*>(impl);
736 14154 epoll_impl->close_socket();
737
1/1
✓ Branch 2 taken 14154 times.
14154 std::lock_guard lock(state_->mutex_);
738 14154 state_->socket_list_.remove(epoll_impl);
739
1/1
✓ Branch 2 taken 14154 times.
14154 state_->socket_ptrs_.erase(epoll_impl);
740 14154 }
741
742 std::error_code
743 4712 epoll_socket_service::
744 open_socket(tcp_socket::implementation& impl)
745 {
746 4712 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
747 4712 epoll_impl->close_socket();
748
749 4712 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
750
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4712 times.
4712 if (fd < 0)
751 return make_err(errno);
752
753 4712 epoll_impl->fd_ = fd;
754
755 // Register fd with epoll (edge-triggered mode)
756 4712 epoll_impl->desc_state_.fd = fd;
757 {
758
1/1
✓ Branch 1 taken 4712 times.
4712 std::lock_guard lock(epoll_impl->desc_state_.mutex);
759 4712 epoll_impl->desc_state_.read_op = nullptr;
760 4712 epoll_impl->desc_state_.write_op = nullptr;
761 4712 epoll_impl->desc_state_.connect_op = nullptr;
762 4712 }
763 4712 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
764
765 4712 return {};
766 }
767
768 void
769 23565 epoll_socket_service::
770 close(io_object::handle& h)
771 {
772 23565 static_cast<epoll_socket_impl*>(h.get())->close_socket();
773 23565 }
774
775 void
776 55850 epoll_socket_service::
777 post(epoll_op* op)
778 {
779 55850 state_->sched_.post(op);
780 55850 }
781
782 void
783 4902 epoll_socket_service::
784 work_started() noexcept
785 {
786 4902 state_->sched_.work_started();
787 4902 }
788
789 void
790 197 epoll_socket_service::
791 work_finished() noexcept
792 {
793 197 state_->sched_.work_finished();
794 197 }
795
796 } // namespace boost::corosio::detail
797
798 #endif // BOOST_COROSIO_HAS_EPOLL
799