src/corosio/src/detail/epoll/acceptors.cpp

80.2% Lines (178/222) 100.0% Functions (18/18) 57.4% Branches (70/122)
src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <utility>
21
22 #include <errno.h>
23 #include <netinet/in.h>
24 #include <sys/epoll.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27
28 namespace boost::corosio::detail {
29
30 void
31 6 epoll_accept_op::
32 cancel() noexcept
33 {
34
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
35 6 acceptor_impl_->cancel_single_op(*this);
36 else
37 request_cancel();
38 6 }
39
40 void
41 4708 epoll_accept_op::
42 operator()()
43 {
44 4708 stop_cb.reset();
45
46 4708 static_cast<epoll_acceptor_impl*>(acceptor_impl_)
47 4708 ->service().scheduler().reset_inline_budget();
48
49
3/4
✓ Branch 0 taken 4708 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 4699 times.
✓ Branch 4 taken 9 times.
4708 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
50
51
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 4699 times.
4708 if (cancelled.load(std::memory_order_acquire))
52 9 *ec_out = capy::error::canceled;
53
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4699 times.
4699 else if (errn != 0)
54 *ec_out = make_err(errn);
55 else
56 4699 *ec_out = {};
57
58 // Set up the peer socket on success
59
4/6
✓ Branch 0 taken 4699 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 4699 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4699 times.
✗ Branch 5 not taken.
4708 if (success && accepted_fd >= 0 && acceptor_impl_)
60 {
61 4699 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 4699 ->service().socket_service();
63
1/2
✓ Branch 0 taken 4699 times.
✗ Branch 1 not taken.
4699 if (socket_svc)
64 {
65
1/1
✓ Branch 1 taken 4699 times.
4699 auto& impl = static_cast<epoll_socket_impl&>(*socket_svc->construct());
66 4699 impl.set_socket(accepted_fd);
67
68 4699 impl.desc_state_.fd = accepted_fd;
69 {
70
1/1
✓ Branch 1 taken 4699 times.
4699 std::lock_guard lock(impl.desc_state_.mutex);
71 4699 impl.desc_state_.read_op = nullptr;
72 4699 impl.desc_state_.write_op = nullptr;
73 4699 impl.desc_state_.connect_op = nullptr;
74 4699 }
75
1/1
✓ Branch 2 taken 4699 times.
4699 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
76
77 4699 impl.set_endpoints(
78 4699 static_cast<epoll_acceptor_impl*>(acceptor_impl_)->local_endpoint(),
79 4699 from_sockaddr_in(peer_addr));
80
81
1/2
✓ Branch 0 taken 4699 times.
✗ Branch 1 not taken.
4699 if (impl_out)
82 4699 *impl_out = &impl;
83 4699 accepted_fd = -1;
84 }
85 else
86 {
87 // No socket service — treat as error
88 *ec_out = make_err(ENOENT);
89 success = false;
90 }
91 }
92
93
3/4
✓ Branch 0 taken 4699 times.
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4699 times.
4708 if (!success || !acceptor_impl_)
94 {
95
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
96 {
97 ::close(accepted_fd);
98 accepted_fd = -1;
99 }
100
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
101 9 *impl_out = nullptr;
102 }
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 4708 capy::executor_ref saved_ex( std::move( ex ) );
106 4708 std::coroutine_handle<> saved_h( std::move( h ) );
107 4708 auto prevent_premature_destruction = std::move(impl_ptr);
108
2/2
✓ Branch 1 taken 4708 times.
✓ Branch 4 taken 4708 times.
4708 dispatch_coro(saved_ex, saved_h).resume();
109 4708 }
110
111 65 epoll_acceptor_impl::
112 65 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
113 65 : svc_(svc)
114 {
115 65 }
116
117 std::coroutine_handle<>
118 4708 epoll_acceptor_impl::
119 accept(
120 std::coroutine_handle<> h,
121 capy::executor_ref ex,
122 std::stop_token token,
123 std::error_code* ec,
124 io_object::implementation** impl_out)
125 {
126 4708 auto& op = acc_;
127 4708 op.reset();
128 4708 op.h = h;
129 4708 op.ex = ex;
130 4708 op.ec_out = ec;
131 4708 op.impl_out = impl_out;
132 4708 op.fd = fd_;
133 4708 op.start(token, this);
134
135 4708 sockaddr_in addr{};
136 4708 socklen_t addrlen = sizeof(addr);
137 int accepted;
138 do {
139
1/1
✓ Branch 1 taken 4708 times.
4708 accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
140 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
141
3/4
✓ Branch 0 taken 4706 times.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4706 times.
4708 } while (accepted < 0 && errno == EINTR);
142
143
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 4706 times.
4708 if (accepted >= 0)
144 {
145 {
146
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(desc_state_.mutex);
147 2 desc_state_.read_ready = false;
148 2 }
149
150
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
2 if (svc_.scheduler().try_consume_inline_budget())
151 {
152 auto* socket_svc = svc_.socket_service();
153 if (socket_svc)
154 {
155 auto& impl = static_cast<epoll_socket_impl&>(*socket_svc->construct());
156 impl.set_socket(accepted);
157
158 impl.desc_state_.fd = accepted;
159 {
160 std::lock_guard lock(impl.desc_state_.mutex);
161 impl.desc_state_.read_op = nullptr;
162 impl.desc_state_.write_op = nullptr;
163 impl.desc_state_.connect_op = nullptr;
164 }
165 socket_svc->scheduler().register_descriptor(accepted, &impl.desc_state_);
166
167 impl.set_endpoints(local_endpoint_, from_sockaddr_in(addr));
168
169 *ec = {};
170 if (impl_out)
171 *impl_out = &impl;
172 }
173 else
174 {
175 ::close(accepted);
176 *ec = make_err(ENOENT);
177 if (impl_out)
178 *impl_out = nullptr;
179 }
180 return dispatch_coro(ex, h);
181 }
182
183 2 op.accepted_fd = accepted;
184 2 op.peer_addr = addr;
185 2 op.complete(0, 0);
186
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
187
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
188 2 return std::noop_coroutine();
189 }
190
191
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 4706 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
4706 if (errno == EAGAIN || errno == EWOULDBLOCK)
192 {
193
1/1
✓ Branch 1 taken 4706 times.
4706 op.impl_ptr = shared_from_this();
194 4706 svc_.work_started();
195
196
1/1
✓ Branch 1 taken 4706 times.
4706 std::lock_guard lock(desc_state_.mutex);
197 4706 bool io_done = false;
198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4706 times.
4706 if (desc_state_.read_ready)
199 {
200 desc_state_.read_ready = false;
201 op.perform_io();
202 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
203 if (!io_done)
204 op.errn = 0;
205 }
206
207
3/6
✓ Branch 0 taken 4706 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 4706 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 4706 times.
4706 if (io_done || op.cancelled.load(std::memory_order_acquire))
208 {
209 svc_.post(&op);
210 svc_.work_finished();
211 }
212 else
213 {
214 4706 desc_state_.read_op = &op;
215 }
216 4706 return std::noop_coroutine();
217 4706 }
218
219 op.complete(errno, 0);
220 op.impl_ptr = shared_from_this();
221 svc_.post(&op);
222 // completion is always posted to scheduler queue, never inline.
223 return std::noop_coroutine();
224 }
225
226 void
227 257 epoll_acceptor_impl::
228 cancel() noexcept
229 {
230 257 cancel_single_op(acc_);
231 257 }
232
233 void
234 263 epoll_acceptor_impl::
235 cancel_single_op(epoll_op& op) noexcept
236 {
237 263 op.request_cancel();
238
239 263 epoll_op* claimed = nullptr;
240 {
241 263 std::lock_guard lock(desc_state_.mutex);
242
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 254 times.
263 if (desc_state_.read_op == &op)
243 9 claimed = std::exchange(desc_state_.read_op, nullptr);
244 263 }
245
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 254 times.
263 if (claimed)
246 {
247 try {
248
1/1
✓ Branch 1 taken 9 times.
9 op.impl_ptr = shared_from_this();
249 } catch (const std::bad_weak_ptr&) {}
250 9 svc_.post(&op);
251 9 svc_.work_finished();
252 }
253 263 }
254
255 void
256 256 epoll_acceptor_impl::
257 close_socket() noexcept
258 {
259 256 cancel();
260
261
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 255 times.
256 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
262 {
263 try {
264
1/1
✓ Branch 1 taken 1 time.
1 desc_state_.impl_ref_ = shared_from_this();
265 } catch (std::bad_weak_ptr const&) {}
266 }
267
268
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 194 times.
256 if (fd_ >= 0)
269 {
270
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if (desc_state_.registered_events != 0)
271 62 svc_.scheduler().deregister_descriptor(fd_);
272 62 ::close(fd_);
273 62 fd_ = -1;
274 }
275
276 256 desc_state_.fd = -1;
277 {
278 256 std::lock_guard lock(desc_state_.mutex);
279 256 desc_state_.read_op = nullptr;
280 256 desc_state_.read_ready = false;
281 256 desc_state_.write_ready = false;
282 256 }
283 256 desc_state_.registered_events = 0;
284
285 // Clear cached endpoint
286 256 local_endpoint_ = endpoint{};
287 256 }
288
289 203 epoll_acceptor_service::
290 203 epoll_acceptor_service(capy::execution_context& ctx)
291 203 : ctx_(ctx)
292
2/2
✓ Branch 2 taken 203 times.
✓ Branch 5 taken 203 times.
203 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
293 {
294 203 }
295
296 406 epoll_acceptor_service::
297 203 ~epoll_acceptor_service()
298 {
299 406 }
300
301 void
302 203 epoll_acceptor_service::
303 shutdown()
304 {
305
1/1
✓ Branch 2 taken 203 times.
203 std::lock_guard lock(state_->mutex_);
306
307
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 203 times.
203 while (auto* impl = state_->acceptor_list_.pop_front())
308 impl->close_socket();
309
310 // Don't clear acceptor_ptrs_ here — same rationale as
311 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
312 // after scheduler shutdown has drained all queued ops.
313 203 }
314
315 io_object::implementation*
316 65 epoll_acceptor_service::
317 construct()
318 {
319
1/1
✓ Branch 1 taken 65 times.
65 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
320 65 auto* raw = impl.get();
321
322
1/1
✓ Branch 2 taken 65 times.
65 std::lock_guard lock(state_->mutex_);
323 65 state_->acceptor_list_.push_back(raw);
324
1/1
✓ Branch 3 taken 65 times.
65 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
325
326 65 return raw;
327 65 }
328
329 void
330 65 epoll_acceptor_service::
331 destroy(io_object::implementation* impl)
332 {
333 65 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(impl);
334 65 epoll_impl->close_socket();
335
1/1
✓ Branch 2 taken 65 times.
65 std::lock_guard lock(state_->mutex_);
336 65 state_->acceptor_list_.remove(epoll_impl);
337
1/1
✓ Branch 2 taken 65 times.
65 state_->acceptor_ptrs_.erase(epoll_impl);
338 65 }
339
340 void
341 127 epoll_acceptor_service::
342 close(io_object::handle& h)
343 {
344 127 static_cast<epoll_acceptor_impl*>(h.get())->close_socket();
345 127 }
346
347 std::error_code
348 64 epoll_acceptor_service::
349 open_acceptor(
350 tcp_acceptor::implementation& impl,
351 endpoint ep,
352 int backlog)
353 {
354 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
355 64 epoll_impl->close_socket();
356
357 64 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
358
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64 if (fd < 0)
359 return make_err(errno);
360
361 64 int reuse = 1;
362 64 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
363
364 64 sockaddr_in addr = detail::to_sockaddr_in(ep);
365
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 62 times.
64 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
366 {
367 2 int errn = errno;
368
1/1
✓ Branch 1 taken 2 times.
2 ::close(fd);
369 2 return make_err(errn);
370 }
371
372
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (::listen(fd, backlog) < 0)
373 {
374 int errn = errno;
375 ::close(fd);
376 return make_err(errn);
377 }
378
379 62 epoll_impl->fd_ = fd;
380
381 // Register fd with epoll (edge-triggered mode)
382 62 epoll_impl->desc_state_.fd = fd;
383 {
384
1/1
✓ Branch 1 taken 62 times.
62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
385 62 epoll_impl->desc_state_.read_op = nullptr;
386 62 }
387
1/1
✓ Branch 2 taken 62 times.
62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
388
389 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
390 62 sockaddr_in local_addr{};
391 62 socklen_t local_len = sizeof(local_addr);
392
1/2
✓ Branch 1 taken 62 times.
✗ Branch 2 not taken.
62 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
393 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
394
395 62 return {};
396 }
397
398 void
399 11 epoll_acceptor_service::
400 post(epoll_op* op)
401 {
402 11 state_->sched_.post(op);
403 11 }
404
405 void
406 4706 epoll_acceptor_service::
407 work_started() noexcept
408 {
409 4706 state_->sched_.work_started();
410 4706 }
411
412 void
413 9 epoll_acceptor_service::
414 work_finished() noexcept
415 {
416 9 state_->sched_.work_finished();
417 9 }
418
419 epoll_socket_service*
420 4699 epoll_acceptor_service::
421 socket_service() const noexcept
422 {
423 4699 auto* svc = ctx_.find_service<detail::socket_service>();
424
2/4
✓ Branch 0 taken 4699 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4699 times.
✗ Branch 3 not taken.
4699 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
425 }
426
427 } // namespace boost::corosio::detail
428
429 #endif // BOOST_COROSIO_HAS_EPOLL
430