src/corosio/src/detail/select/acceptors.cpp

62.2% Lines (156/251) 88.9% Functions (16/18) 43.9% Branches (65/148)
src/corosio/src/detail/select/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/acceptors.hpp"
15 #include "src/detail/select/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <netinet/in.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25
26 namespace boost::corosio::detail {
27
28 void
29 select_accept_op::
30 cancel() noexcept
31 {
32 if (acceptor_impl_)
33 acceptor_impl_->cancel_single_op(*this);
34 else
35 request_cancel();
36 }
37
38 void
39 3437 select_accept_op::
40 operator()()
41 {
42 3437 stop_cb.reset();
43
44
3/4
✓ Branch 0 taken 3437 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 3434 times.
✓ Branch 4 taken 3 times.
3437 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
45
46
1/2
✓ Branch 0 taken 3437 times.
✗ Branch 1 not taken.
3437 if (ec_out)
47 {
48
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3434 times.
3437 if (cancelled.load(std::memory_order_acquire))
49 3 *ec_out = capy::error::canceled;
50
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3434 times.
3434 else if (errn != 0)
51 *ec_out = make_err(errn);
52 else
53 3434 *ec_out = {};
54 }
55
56
3/4
✓ Branch 0 taken 3434 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3434 times.
✗ Branch 3 not taken.
3437 if (success && accepted_fd >= 0)
57 {
58
1/2
✓ Branch 0 taken 3434 times.
✗ Branch 1 not taken.
3434 if (acceptor_impl_)
59 {
60 3434 auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
61 3434 ->service().socket_service();
62
1/2
✓ Branch 0 taken 3434 times.
✗ Branch 1 not taken.
3434 if (socket_svc)
63 {
64
1/1
✓ Branch 1 taken 3434 times.
3434 auto& impl = static_cast<select_socket_impl&>(*socket_svc->construct());
65 3434 impl.set_socket(accepted_fd);
66
67 3434 sockaddr_in local_addr{};
68 3434 socklen_t local_len = sizeof(local_addr);
69 3434 sockaddr_in remote_addr{};
70 3434 socklen_t remote_len = sizeof(remote_addr);
71
72 3434 endpoint local_ep, remote_ep;
73
1/2
✓ Branch 1 taken 3434 times.
✗ Branch 2 not taken.
3434 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
74 3434 local_ep = from_sockaddr_in(local_addr);
75
1/2
✓ Branch 1 taken 3434 times.
✗ Branch 2 not taken.
3434 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
76 3434 remote_ep = from_sockaddr_in(remote_addr);
77
78 3434 impl.set_endpoints(local_ep, remote_ep);
79
80
1/2
✓ Branch 0 taken 3434 times.
✗ Branch 1 not taken.
3434 if (impl_out)
81 3434 *impl_out = &impl;
82
83 3434 accepted_fd = -1;
84 }
85 else
86 {
87 if (ec_out && !*ec_out)
88 *ec_out = make_err(ENOENT);
89 ::close(accepted_fd);
90 accepted_fd = -1;
91 if (impl_out)
92 *impl_out = nullptr;
93 }
94 }
95 else
96 {
97 ::close(accepted_fd);
98 accepted_fd = -1;
99 if (impl_out)
100 *impl_out = nullptr;
101 }
102 3434 }
103 else
104 {
105
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (accepted_fd >= 0)
106 {
107 ::close(accepted_fd);
108 accepted_fd = -1;
109 }
110
111
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (peer_impl)
112 {
113 auto* socket_svc_cleanup = static_cast<select_acceptor_impl*>(acceptor_impl_)
114 ->service().socket_service();
115 if (socket_svc_cleanup)
116 socket_svc_cleanup->destroy(peer_impl);
117 peer_impl = nullptr;
118 }
119
120
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (impl_out)
121 3 *impl_out = nullptr;
122 }
123
124 // Move to stack before destroying the frame
125 3437 capy::executor_ref saved_ex( std::move( ex ) );
126 3437 std::coroutine_handle<> saved_h( std::move( h ) );
127 3437 impl_ptr.reset();
128
2/2
✓ Branch 1 taken 3437 times.
✓ Branch 4 taken 3437 times.
3437 dispatch_coro(saved_ex, saved_h).resume();
129 3437 }
130
131 44 select_acceptor_impl::
132 44 select_acceptor_impl(select_acceptor_service& svc) noexcept
133 44 : svc_(svc)
134 {
135 44 }
136
137 std::coroutine_handle<>
138 3437 select_acceptor_impl::
139 accept(
140 std::coroutine_handle<> h,
141 capy::executor_ref ex,
142 std::stop_token token,
143 std::error_code* ec,
144 io_object::implementation** impl_out)
145 {
146 3437 auto& op = acc_;
147 3437 op.reset();
148 3437 op.h = h;
149 3437 op.ex = ex;
150 3437 op.ec_out = ec;
151 3437 op.impl_out = impl_out;
152 3437 op.fd = fd_;
153 3437 op.start(token, this);
154
155 3437 sockaddr_in addr{};
156 3437 socklen_t addrlen = sizeof(addr);
157
1/1
✓ Branch 1 taken 3437 times.
3437 int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
158
159
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3435 times.
3437 if (accepted >= 0)
160 {
161 // Reject fds that exceed select()'s FD_SETSIZE limit.
162 // Better to fail now than during later async operations.
163
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (accepted >= FD_SETSIZE)
164 {
165 ::close(accepted);
166 op.accepted_fd = -1;
167 op.complete(EINVAL, 0);
168 op.impl_ptr = shared_from_this();
169 svc_.post(&op);
170 // completion is always posted to scheduler queue, never inline.
171 return std::noop_coroutine();
172 }
173
174 // Set non-blocking and close-on-exec flags.
175 // A non-blocking socket is essential for the async reactor;
176 // if we can't configure it, fail rather than risk blocking.
177
1/1
✓ Branch 1 taken 2 times.
2 int flags = ::fcntl(accepted, F_GETFL, 0);
178
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (flags == -1)
179 {
180 int err = errno;
181 ::close(accepted);
182 op.accepted_fd = -1;
183 op.complete(err, 0);
184 op.impl_ptr = shared_from_this();
185 svc_.post(&op);
186 // completion is always posted to scheduler queue, never inline.
187 return std::noop_coroutine();
188 }
189
190
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
191 {
192 int err = errno;
193 ::close(accepted);
194 op.accepted_fd = -1;
195 op.complete(err, 0);
196 op.impl_ptr = shared_from_this();
197 svc_.post(&op);
198 // completion is always posted to scheduler queue, never inline.
199 return std::noop_coroutine();
200 }
201
202
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
203 {
204 int err = errno;
205 ::close(accepted);
206 op.accepted_fd = -1;
207 op.complete(err, 0);
208 op.impl_ptr = shared_from_this();
209 svc_.post(&op);
210 // completion is always posted to scheduler queue, never inline.
211 return std::noop_coroutine();
212 }
213
214 2 op.accepted_fd = accepted;
215 2 op.complete(0, 0);
216
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
217
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
218 // completion is always posted to scheduler queue, never inline.
219 2 return std::noop_coroutine();
220 }
221
222
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 3435 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
3435 if (errno == EAGAIN || errno == EWOULDBLOCK)
223 {
224 3435 svc_.work_started();
225
1/1
✓ Branch 1 taken 3435 times.
3435 op.impl_ptr = shared_from_this();
226
227 // Set registering BEFORE register_fd to close the race window where
228 // reactor sees an event before we set registered.
229 3435 op.registered.store(select_registration_state::registering, std::memory_order_release);
230
1/1
✓ Branch 2 taken 3435 times.
3435 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
231
232 // Transition to registered. If this fails, reactor or cancel already
233 // claimed the op (state is now unregistered), so we're done. However,
234 // we must still deregister the fd because cancel's deregister_fd may
235 // have run before our register_fd, leaving the fd orphaned.
236 3435 auto expected = select_registration_state::registering;
237
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3435 times.
3435 if (!op.registered.compare_exchange_strong(
238 expected, select_registration_state::registered, std::memory_order_acq_rel))
239 {
240 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
241 // completion is always posted to scheduler queue, never inline.
242 return std::noop_coroutine();
243 }
244
245 // If cancelled was set before we registered, handle it now.
246
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3435 times.
3435 if (op.cancelled.load(std::memory_order_acquire))
247 {
248 auto prev = op.registered.exchange(
249 select_registration_state::unregistered, std::memory_order_acq_rel);
250 if (prev != select_registration_state::unregistered)
251 {
252 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
253 op.impl_ptr = shared_from_this();
254 svc_.post(&op);
255 svc_.work_finished();
256 }
257 }
258 // completion is always posted to scheduler queue, never inline.
259 3435 return std::noop_coroutine();
260 }
261
262 op.complete(errno, 0);
263 op.impl_ptr = shared_from_this();
264 svc_.post(&op);
265 // completion is always posted to scheduler queue, never inline.
266 return std::noop_coroutine();
267 }
268
269 void
270 173 select_acceptor_impl::
271 cancel() noexcept
272 {
273 173 std::shared_ptr<select_acceptor_impl> self;
274 try {
275
1/1
✓ Branch 1 taken 173 times.
173 self = shared_from_this();
276 } catch (const std::bad_weak_ptr&) {
277 return;
278 }
279
280 173 auto prev = acc_.registered.exchange(
281 select_registration_state::unregistered, std::memory_order_acq_rel);
282 173 acc_.request_cancel();
283
284
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 170 times.
173 if (prev != select_registration_state::unregistered)
285 {
286 3 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
287 3 acc_.impl_ptr = self;
288 3 svc_.post(&acc_);
289 3 svc_.work_finished();
290 }
291 173 }
292
293 void
294 select_acceptor_impl::
295 cancel_single_op(select_op& op) noexcept
296 {
297 // Called from stop_token callback to cancel a specific pending operation.
298 auto prev = op.registered.exchange(
299 select_registration_state::unregistered, std::memory_order_acq_rel);
300 op.request_cancel();
301
302 if (prev != select_registration_state::unregistered)
303 {
304 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
305
306 // Keep impl alive until op completes
307 try {
308 op.impl_ptr = shared_from_this();
309 } catch (const std::bad_weak_ptr&) {
310 // Impl is being destroyed, op will be orphaned but that's ok
311 }
312
313 svc_.post(&op);
314 svc_.work_finished();
315 }
316 }
317
318 void
319 172 select_acceptor_impl::
320 close_socket() noexcept
321 {
322 172 cancel();
323
324
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 130 times.
172 if (fd_ >= 0)
325 {
326 // Unconditionally remove from registered_fds_ to handle edge cases
327 42 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
328 42 ::close(fd_);
329 42 fd_ = -1;
330 }
331
332 // Clear cached endpoint
333 172 local_endpoint_ = endpoint{};
334 172 }
335
336 133 select_acceptor_service::
337 133 select_acceptor_service(capy::execution_context& ctx)
338 133 : ctx_(ctx)
339
2/2
✓ Branch 2 taken 133 times.
✓ Branch 5 taken 133 times.
133 , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
340 {
341 133 }
342
343 266 select_acceptor_service::
344 133 ~select_acceptor_service()
345 {
346 266 }
347
348 void
349 133 select_acceptor_service::
350 shutdown()
351 {
352
1/1
✓ Branch 2 taken 133 times.
133 std::lock_guard lock(state_->mutex_);
353
354
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 133 times.
133 while (auto* impl = state_->acceptor_list_.pop_front())
355 impl->close_socket();
356
357 // Don't clear acceptor_ptrs_ here — same rationale as
358 // select_socket_service::shutdown(). Let ~state_ release ptrs
359 // after scheduler shutdown has drained all queued ops.
360 133 }
361
362 io_object::implementation*
363 44 select_acceptor_service::
364 construct()
365 {
366
1/1
✓ Branch 1 taken 44 times.
44 auto impl = std::make_shared<select_acceptor_impl>(*this);
367 44 auto* raw = impl.get();
368
369
1/1
✓ Branch 2 taken 44 times.
44 std::lock_guard lock(state_->mutex_);
370 44 state_->acceptor_list_.push_back(raw);
371
1/1
✓ Branch 3 taken 44 times.
44 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
372
373 44 return raw;
374 44 }
375
376 void
377 44 select_acceptor_service::
378 destroy(io_object::implementation* impl)
379 {
380 44 auto* select_impl = static_cast<select_acceptor_impl*>(impl);
381 44 select_impl->close_socket();
382
1/1
✓ Branch 2 taken 44 times.
44 std::lock_guard lock(state_->mutex_);
383 44 state_->acceptor_list_.remove(select_impl);
384
1/1
✓ Branch 2 taken 44 times.
44 state_->acceptor_ptrs_.erase(select_impl);
385 44 }
386
387 void
388 86 select_acceptor_service::
389 close(io_object::handle& h)
390 {
391 86 static_cast<select_acceptor_impl*>(h.get())->close_socket();
392 86 }
393
394 std::error_code
395 42 select_acceptor_service::
396 open_acceptor(
397 tcp_acceptor::implementation& impl,
398 endpoint ep,
399 int backlog)
400 {
401 42 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
402 42 select_impl->close_socket();
403
404 42 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
405
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 if (fd < 0)
406 return make_err(errno);
407
408 // Set non-blocking and close-on-exec
409
1/1
✓ Branch 1 taken 42 times.
42 int flags = ::fcntl(fd, F_GETFL, 0);
410
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 if (flags == -1)
411 {
412 int errn = errno;
413 ::close(fd);
414 return make_err(errn);
415 }
416
2/3
✓ Branch 1 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 42 times.
42 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
417 {
418 int errn = errno;
419 ::close(fd);
420 return make_err(errn);
421 }
422
2/3
✓ Branch 1 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 42 times.
42 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
423 {
424 int errn = errno;
425 ::close(fd);
426 return make_err(errn);
427 }
428
429 // Check fd is within select() limits
430
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 if (fd >= FD_SETSIZE)
431 {
432 ::close(fd);
433 return make_err(EMFILE);
434 }
435
436 42 int reuse = 1;
437 42 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
438
439 42 sockaddr_in addr = detail::to_sockaddr_in(ep);
440
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
441 {
442 int errn = errno;
443 ::close(fd);
444 return make_err(errn);
445 }
446
447
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (::listen(fd, backlog) < 0)
448 {
449 int errn = errno;
450 ::close(fd);
451 return make_err(errn);
452 }
453
454 42 select_impl->fd_ = fd;
455
456 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
457 42 sockaddr_in local_addr{};
458 42 socklen_t local_len = sizeof(local_addr);
459
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
460 42 select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
461
462 42 return {};
463 }
464
465 void
466 5 select_acceptor_service::
467 post(select_op* op)
468 {
469 5 state_->sched_.post(op);
470 5 }
471
472 void
473 3435 select_acceptor_service::
474 work_started() noexcept
475 {
476 3435 state_->sched_.work_started();
477 3435 }
478
479 void
480 3 select_acceptor_service::
481 work_finished() noexcept
482 {
483 3 state_->sched_.work_finished();
484 3 }
485
486 select_socket_service*
487 3434 select_acceptor_service::
488 socket_service() const noexcept
489 {
490 3434 auto* svc = ctx_.find_service<detail::socket_service>();
491
2/4
✓ Branch 0 taken 3434 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3434 times.
✗ Branch 3 not taken.
3434 return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
492 }
493
494 } // namespace boost::corosio::detail
495
496 #endif // BOOST_COROSIO_HAS_SELECT
497