src/corosio/src/detail/epoll/scheduler.cpp

80.1% Lines (407/508) 89.6% Functions (43/48) 67.6% Branches (209/309)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler uses a single-reactor coordination strategy that gives
41 handler parallelism while avoiding the thundering herd problem.
42 Instead of every thread blocking in epoll_wait(), one thread becomes the
43 "reactor" while the others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
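
For illustration (thread and handler counts assumed): with four threads
calling run() and three handlers posted, one thread sits in epoll_wait()
as the reactor while three others are woken by notify_one(), one per
posted handler, and each runs one handler.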
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back and tries to
62 dequeue a handler, giving handler execution priority over further I/O polling.
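
A condensed sketch of the loop (simplified: the task_op_ sentinel,
stop checks, and timed waits are omitted, and run_handler() is a
stand-in for the unlock/execute/cleanup sequence):

    for (;;)
    {
        if (auto* op = completed_ops_.pop())   // steps 1-2
            return run_handler(op);            // runs unlocked
        if (!task_running_)                    // step 3: become reactor
        {
            run_task(lock, ctx);               // epoll_wait, queue I/O
            continue;
        }
        wait_for_signal(lock);                 // step 4: park
    }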
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
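
A worked example of the encoding (values illustrative):

    state_ == 0   idle: not signaled, no waiters
    state_ == 1   signaled, no waiters (the next waiter returns fast)
    state_ == 4   two waiters blocked, not signaled
    state_ == 5   two waiters blocked, signal pending

i.e. the waiter count is state_ >> 1 and the signaled flag is state_ & 1.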
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
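
In pseudocode (condensed from wake_one_thread_and_unlock below):

    state_ |= 1;
    if (state_ > 1)                     // a waiter is parked on cond_
        { unlock(); cond_.notify_one(); }
    else if (task_running_ && !task_interrupted_)
        { task_interrupted_ = true; unlock(); interrupt_reactor(); }
    else
        unlock();                       // next queue check finds the work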
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations; when it hits zero, run()
88 returns. Each operation increments the count on start and decrements it on completion.
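
For example (counts assumed): posting two handlers moves
outstanding_work_ from 0 to 2; each completed handler decrements it,
and the decrement that reaches 0 wakes every thread so run() can
return.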
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. A timerfd registered with epoll
93 is armed for the nearest timer expiry. When a new timer is scheduled
94 earlier than the current nearest, timer_service's callback marks the
95 timerfd stale and calls interrupt_reactor() so the reactor re-arms it.
96 */
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106 int inline_budget;
107 int inline_budget_max;
108 bool unassisted;
109
110 189 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
111 189 : key(k)
112 189 , next(n)
113 189 , private_outstanding_work(0)
114 189 , inline_budget(0)
115 189 , inline_budget_max(2)
116 189 , unassisted(false)
117 {
118 189 }
119 };
120
121 namespace {
122
123 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
124
125 struct thread_context_guard
126 {
127 scheduler_context frame_;
128
129 189 explicit thread_context_guard(
130 epoll_scheduler const* ctx) noexcept
131 189 : frame_(ctx, context_stack.get())
132 {
133 189 context_stack.set(&frame_);
134 189 }
135
136 189 ~thread_context_guard() noexcept
137 {
138
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (!frame_.private_queue.empty())
139 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
140 189 context_stack.set(frame_.next);
141 189 }
142 };
143
144 scheduler_context*
145 449322 find_context(epoll_scheduler const* self) noexcept
146 {
147
2/2
✓ Branch 1 taken 447643 times.
✓ Branch 2 taken 1679 times.
449322 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
148
1/2
✓ Branch 0 taken 447643 times.
✗ Branch 1 not taken.
447643 if (c->key == self)
149 447643 return c;
150 1679 return nullptr;
151 }
152
153 } // namespace
154
155 void
156 65263 epoll_scheduler::
157 reset_inline_budget() const noexcept
158 {
159
1/2
✓ Branch 1 taken 65263 times.
✗ Branch 2 not taken.
65263 if (auto* ctx = find_context(this))
160 {
161 // Cap when no other thread absorbed queued work. A moderate
162 // cap (4) amortizes scheduling for small buffers while avoiding
163 // bursty I/O that fills socket buffers and stalls large transfers.
164
1/2
✓ Branch 0 taken 65263 times.
✗ Branch 1 not taken.
65263 if (ctx->unassisted)
165 {
166 65263 ctx->inline_budget_max = 4;
167 65263 ctx->inline_budget = 4;
168 65263 return;
169 }
170 // Ramp up when previous cycle fully consumed budget.
171 // Reset on partial consumption (EAGAIN hit or peer got scheduled).
172 if (ctx->inline_budget == 0)
173 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
174 else if (ctx->inline_budget < ctx->inline_budget_max)
175 ctx->inline_budget_max = 2;
176 ctx->inline_budget = ctx->inline_budget_max;
177 }
178 }
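
// Illustrative budget evolution under the rules above (workload
// assumed): an assisted thread that fully consumes its budget each
// cycle ramps inline_budget_max 2 -> 4 -> 8 -> 16 and stays capped at
// 16; the first cycle that ends with the budget only partly consumed
// (EAGAIN hit, or a peer thread took the work) drops it back to 2.
// An unassisted thread pins both the budget and the cap at 4.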
179
180 bool
181 278641 epoll_scheduler::
182 try_consume_inline_budget() const noexcept
183 {
184
1/2
✓ Branch 1 taken 278641 times.
✗ Branch 2 not taken.
278641 if (auto* ctx = find_context(this))
185 {
186
2/2
✓ Branch 0 taken 222988 times.
✓ Branch 1 taken 55653 times.
278641 if (ctx->inline_budget > 0)
187 {
188 222988 --ctx->inline_budget;
189 222988 return true;
190 }
191 }
192 55653 return false;
193 }
194
195 void
196 47247 descriptor_state::
197 operator()()
198 {
199 47247 is_enqueued_.store(false, std::memory_order_relaxed);
200
201 // Take ownership of impl ref set by close_socket() to prevent
202 // the owning impl from being freed while we're executing
203 47247 auto prevent_impl_destruction = std::move(impl_ref_);
204
205 47247 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 47247 times.
47247 if (ev == 0)
207 {
208 scheduler_->compensating_work_started();
209 return;
210 }
211
212 47247 op_queue local_ops;
213
214 47247 int err = 0;
215
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 47246 times.
47247 if (ev & EPOLLERR)
216 {
217 1 socklen_t len = sizeof(err);
218
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
219 err = errno;
220
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
221 1 err = EIO;
222 }
223
224 {
225
1/1
✓ Branch 1 taken 47247 times.
47247 std::lock_guard lock(mutex);
226
2/2
✓ Branch 0 taken 14571 times.
✓ Branch 1 taken 32676 times.
47247 if (ev & EPOLLIN)
227 {
228
2/2
✓ Branch 0 taken 4701 times.
✓ Branch 1 taken 9870 times.
14571 if (read_op)
229 {
230 4701 auto* rd = read_op;
231
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4701 times.
4701 if (err)
232 rd->complete(err, 0);
233 else
234 4701 rd->perform_io();
235
236
2/4
✓ Branch 0 taken 4701 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4701 times.
4701 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
237 {
238 rd->errn = 0;
239 }
240 else
241 {
242 4701 read_op = nullptr;
243 4701 local_ops.push(rd);
244 }
245 }
246 else
247 {
248 9870 read_ready = true;
249 }
250 }
251
2/2
✓ Branch 0 taken 42549 times.
✓ Branch 1 taken 4698 times.
47247 if (ev & EPOLLOUT)
252 {
253
3/4
✓ Branch 0 taken 37848 times.
✓ Branch 1 taken 4701 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 37848 times.
42549 bool had_write_op = (connect_op || write_op);
254
2/2
✓ Branch 0 taken 4701 times.
✓ Branch 1 taken 37848 times.
42549 if (connect_op)
255 {
256 4701 auto* cn = connect_op;
257
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4701 times.
4701 if (err)
258 cn->complete(err, 0);
259 else
260 4701 cn->perform_io();
261 4701 connect_op = nullptr;
262 4701 local_ops.push(cn);
263 }
264
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42549 times.
42549 if (write_op)
265 {
266 auto* wr = write_op;
267 if (err)
268 wr->complete(err, 0);
269 else
270 wr->perform_io();
271
272 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
273 {
274 wr->errn = 0;
275 }
276 else
277 {
278 write_op = nullptr;
279 local_ops.push(wr);
280 }
281 }
282
2/2
✓ Branch 0 taken 37848 times.
✓ Branch 1 taken 4701 times.
42549 if (!had_write_op)
283 37848 write_ready = true;
284 }
285
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 47246 times.
47247 if (err)
286 {
287
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (read_op)
288 {
289 read_op->complete(err, 0);
290 local_ops.push(std::exchange(read_op, nullptr));
291 }
292
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (write_op)
293 {
294 write_op->complete(err, 0);
295 local_ops.push(std::exchange(write_op, nullptr));
296 }
297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (connect_op)
298 {
299 connect_op->complete(err, 0);
300 local_ops.push(std::exchange(connect_op, nullptr));
301 }
302 }
303 47247 }
304
305 // Execute first handler inline — the scheduler's work_cleanup
306 // accounts for this as the "consumed" work item
307 47247 scheduler_op* first = local_ops.pop();
308
2/2
✓ Branch 0 taken 9402 times.
✓ Branch 1 taken 37845 times.
47247 if (first)
309 {
310
1/1
✓ Branch 1 taken 9402 times.
9402 scheduler_->post_deferred_completions(local_ops);
311
1/1
✓ Branch 1 taken 9402 times.
9402 (*first)();
312 }
313 else
314 {
315 37845 scheduler_->compensating_work_started();
316 }
317 47247 }
318
319 203 epoll_scheduler::
320 epoll_scheduler(
321 capy::execution_context& ctx,
322 203 int)
323 203 : epoll_fd_(-1)
324 203 , event_fd_(-1)
325 203 , timer_fd_(-1)
326 203 , outstanding_work_(0)
327 203 , stopped_(false)
328 203 , shutdown_(false)
329 203 , task_running_{false}
330 203 , task_interrupted_(false)
331 406 , state_(0)
332 {
333 203 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
334
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (epoll_fd_ < 0)
335 detail::throw_system_error(make_err(errno), "epoll_create1");
336
337 203 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
338
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (event_fd_ < 0)
339 {
340 int errn = errno;
341 ::close(epoll_fd_);
342 detail::throw_system_error(make_err(errn), "eventfd");
343 }
344
345 203 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
346
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (timer_fd_ < 0)
347 {
348 int errn = errno;
349 ::close(event_fd_);
350 ::close(epoll_fd_);
351 detail::throw_system_error(make_err(errn), "timerfd_create");
352 }
353
354 203 epoll_event ev{};
355 203 ev.events = EPOLLIN | EPOLLET;
356 203 ev.data.ptr = nullptr;
357
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
358 {
359 int errn = errno;
360 ::close(timer_fd_);
361 ::close(event_fd_);
362 ::close(epoll_fd_);
363 detail::throw_system_error(make_err(errn), "epoll_ctl");
364 }
365
366 203 epoll_event timer_ev{};
367 203 timer_ev.events = EPOLLIN | EPOLLERR;
368 203 timer_ev.data.ptr = &timer_fd_;
369
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
370 {
371 int errn = errno;
372 ::close(timer_fd_);
373 ::close(event_fd_);
374 ::close(epoll_fd_);
375 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
376 }
377
378
1/1
✓ Branch 1 taken 203 times.
203 timer_svc_ = &get_timer_service(ctx, *this);
379
1/1
✓ Branch 3 taken 203 times.
203 timer_svc_->set_on_earliest_changed(
380 timer_service::callback(
381 this,
382 [](void* p) {
383 4914 auto* self = static_cast<epoll_scheduler*>(p);
384 4914 self->timerfd_stale_.store(true, std::memory_order_release);
385
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4914 times.
4914 if (self->task_running_.load(std::memory_order_acquire))
386 self->interrupt_reactor();
387 4914 }));
388
389 // Initialize resolver service
390
1/1
✓ Branch 1 taken 203 times.
203 get_resolver_service(ctx, *this);
391
392 // Initialize signal service
393
1/1
✓ Branch 1 taken 203 times.
203 get_signal_service(ctx, *this);
394
395 // Push task sentinel to interleave reactor runs with handler execution
396 203 completed_ops_.push(&task_op_);
397 203 }
398
399 406 epoll_scheduler::
400 203 ~epoll_scheduler()
401 {
402
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (timer_fd_ >= 0)
403 203 ::close(timer_fd_);
404
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
405 203 ::close(event_fd_);
406
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (epoll_fd_ >= 0)
407 203 ::close(epoll_fd_);
408 406 }
409
410 void
411 203 epoll_scheduler::
412 shutdown()
413 {
414 {
415
1/1
✓ Branch 1 taken 203 times.
203 std::unique_lock lock(mutex_);
416 203 shutdown_ = true;
417
418
2/2
✓ Branch 1 taken 203 times.
✓ Branch 2 taken 203 times.
406 while (auto* h = completed_ops_.pop())
419 {
420
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (h == &task_op_)
421 203 continue;
422 lock.unlock();
423 h->destroy();
424 lock.lock();
425 203 }
426
427 203 signal_all(lock);
428 203 }
429
430 203 outstanding_work_.store(0, std::memory_order_release);
431
432
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
433 203 interrupt_reactor();
434 203 }
435
436 void
437 6747 epoll_scheduler::
438 post(std::coroutine_handle<> h) const
439 {
440 struct post_handler final
441 : scheduler_op
442 {
443 std::coroutine_handle<> h_;
444
445 explicit
446 6747 post_handler(std::coroutine_handle<> h)
447 6747 : h_(h)
448 {
449 6747 }
450
451 13494 ~post_handler() = default;
452
453 6747 void operator()() override
454 {
455 6747 auto h = h_;
456
1/2
✓ Branch 0 taken 6747 times.
✗ Branch 1 not taken.
6747 delete this;
457
1/1
✓ Branch 1 taken 6747 times.
6747 h.resume();
458 6747 }
459
460 void destroy() override
461 {
462 delete this;
463 }
464 };
465
466
1/1
✓ Branch 1 taken 6747 times.
6747 auto ph = std::make_unique<post_handler>(h);
467
468 // Fast path: same thread posts to private queue
469 // Only count locally; work_cleanup batches to global counter
470
2/2
✓ Branch 1 taken 5094 times.
✓ Branch 2 taken 1653 times.
6747 if (auto* ctx = find_context(this))
471 {
472 5094 ++ctx->private_outstanding_work;
473 5094 ctx->private_queue.push(ph.release());
474 5094 return;
475 }
476
477 // Slow path: cross-thread post requires mutex
478 1653 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
479
480
1/1
✓ Branch 1 taken 1653 times.
1653 std::unique_lock lock(mutex_);
481 1653 completed_ops_.push(ph.release());
482
1/1
✓ Branch 1 taken 1653 times.
1653 wake_one_thread_and_unlock(lock);
483 6747 }
484
485 void
486 60826 epoll_scheduler::
487 post(scheduler_op* h) const
488 {
489 // Fast path: same thread posts to private queue
490 // Only count locally; work_cleanup batches to global counter
491
2/2
✓ Branch 1 taken 60800 times.
✓ Branch 2 taken 26 times.
60826 if (auto* ctx = find_context(this))
492 {
493 60800 ++ctx->private_outstanding_work;
494 60800 ctx->private_queue.push(h);
495 60800 return;
496 }
497
498 // Slow path: cross-thread post requires mutex
499 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
500
501
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
502 26 completed_ops_.push(h);
503
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
504 26 }
505
506 void
507 5637 epoll_scheduler::
508 on_work_started() noexcept
509 {
510 5637 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
511 5637 }
512
513 void
514 5605 epoll_scheduler::
515 on_work_finished() noexcept
516 {
517
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5605 times.
11210 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
518 stop();
519 5605 }
520
521 bool
522 696 epoll_scheduler::
523 running_in_this_thread() const noexcept
524 {
525
2/2
✓ Branch 1 taken 456 times.
✓ Branch 2 taken 240 times.
696 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
526
1/2
✓ Branch 0 taken 456 times.
✗ Branch 1 not taken.
456 if (c->key == this)
527 456 return true;
528 240 return false;
529 }
530
531 void
532 37 epoll_scheduler::
533 stop()
534 {
535
1/1
✓ Branch 1 taken 37 times.
37 std::unique_lock lock(mutex_);
536
2/2
✓ Branch 0 taken 21 times.
✓ Branch 1 taken 16 times.
37 if (!stopped_)
537 {
538 21 stopped_ = true;
539 21 signal_all(lock);
540
1/1
✓ Branch 1 taken 21 times.
21 interrupt_reactor();
541 }
542 37 }
543
544 bool
545 18 epoll_scheduler::
546 stopped() const noexcept
547 {
548 18 std::unique_lock lock(mutex_);
549 36 return stopped_;
550 18 }
551
552 void
553 49 epoll_scheduler::
554 restart()
555 {
556
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
557 49 stopped_ = false;
558 49 }
559
560 std::size_t
561 183 epoll_scheduler::
562 run()
563 {
564
2/2
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 157 times.
366 if (outstanding_work_.load(std::memory_order_acquire) == 0)
565 {
566
1/1
✓ Branch 1 taken 26 times.
26 stop();
567 26 return 0;
568 }
569
570 157 thread_context_guard ctx(this);
571
1/1
✓ Branch 1 taken 157 times.
157 std::unique_lock lock(mutex_);
572
573 157 std::size_t n = 0;
574 for (;;)
575 {
576
3/3
✓ Branch 1 taken 114944 times.
✓ Branch 3 taken 157 times.
✓ Branch 4 taken 114787 times.
114944 if (!do_one(lock, -1, &ctx.frame_))
577 157 break;
578
1/2
✓ Branch 1 taken 114787 times.
✗ Branch 2 not taken.
114787 if (n != (std::numeric_limits<std::size_t>::max)())
579 114787 ++n;
580
2/2
✓ Branch 1 taken 53823 times.
✓ Branch 2 taken 60964 times.
114787 if (!lock.owns_lock())
581
1/1
✓ Branch 1 taken 53823 times.
53823 lock.lock();
582 }
583 157 return n;
584 157 }
585
586 std::size_t
587 2 epoll_scheduler::
588 run_one()
589 {
590
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
591 {
592 stop();
593 return 0;
594 }
595
596 2 thread_context_guard ctx(this);
597
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
598
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
599 2 }
600
601 std::size_t
602 34 epoll_scheduler::
603 wait_one(long usec)
604 {
605
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 27 times.
68 if (outstanding_work_.load(std::memory_order_acquire) == 0)
606 {
607
1/1
✓ Branch 1 taken 7 times.
7 stop();
608 7 return 0;
609 }
610
611 27 thread_context_guard ctx(this);
612
1/1
✓ Branch 1 taken 27 times.
27 std::unique_lock lock(mutex_);
613
1/1
✓ Branch 1 taken 27 times.
27 return do_one(lock, usec, &ctx.frame_);
614 27 }
615
616 std::size_t
617 2 epoll_scheduler::
618 poll()
619 {
620
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
621 {
622
1/1
✓ Branch 1 taken 1 time.
1 stop();
623 1 return 0;
624 }
625
626 1 thread_context_guard ctx(this);
627
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
628
629 1 std::size_t n = 0;
630 for (;;)
631 {
632
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
633 1 break;
634
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
635 2 ++n;
636
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
637
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
638 }
639 1 return n;
640 1 }
641
642 std::size_t
643 4 epoll_scheduler::
644 poll_one()
645 {
646
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
647 {
648
1/1
✓ Branch 1 taken 2 times.
2 stop();
649 2 return 0;
650 }
651
652 2 thread_context_guard ctx(this);
653
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
654
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
655 2 }
656
657 void
658 9473 epoll_scheduler::
659 register_descriptor(int fd, descriptor_state* desc) const
660 {
661 9473 epoll_event ev{};
662 9473 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
663 9473 ev.data.ptr = desc;
664
665
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9473 times.
9473 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
666 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
667
668 9473 desc->registered_events = ev.events;
669 9473 desc->fd = fd;
670 9473 desc->scheduler_ = this;
671
672
1/1
✓ Branch 1 taken 9473 times.
9473 std::lock_guard lock(desc->mutex);
673 9473 desc->read_ready = false;
674 9473 desc->write_ready = false;
675 9473 }
676
677 void
678 9473 epoll_scheduler::
679 deregister_descriptor(int fd) const
680 {
681 9473 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
682 9473 }
683
684 void
685 9608 epoll_scheduler::
686 work_started() const noexcept
687 {
688 9608 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
689 9608 }
690
691 void
692 16238 epoll_scheduler::
693 work_finished() const noexcept
694 {
695
2/2
✓ Branch 0 taken 158 times.
✓ Branch 1 taken 16080 times.
32476 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
696 {
697 // Last work item completed - wake all threads so they can exit.
698 // signal_all() wakes threads waiting on the condvar.
699 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
700 // Both are needed because they target different blocking mechanisms.
701 158 std::unique_lock lock(mutex_);
702 158 signal_all(lock);
703
5/6
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 157 times.
✓ Branch 3 taken 1 time.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 time.
✓ Branch 6 taken 157 times.
158 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
704 {
705 1 task_interrupted_ = true;
706 1 lock.unlock();
707 1 interrupt_reactor();
708 }
709 158 }
710 16238 }
711
712 void
713 37845 epoll_scheduler::
714 compensating_work_started() const noexcept
715 {
716 37845 auto* ctx = find_context(this);
717
1/2
✓ Branch 0 taken 37845 times.
✗ Branch 1 not taken.
37845 if (ctx)
718 37845 ++ctx->private_outstanding_work;
719 37845 }
720
721 void
722 epoll_scheduler::
723 drain_thread_queue(op_queue& queue, long count) const
724 {
725 // Note: outstanding_work_ was already incremented when posting
726 std::unique_lock lock(mutex_);
727 completed_ops_.splice(queue);
728 if (count > 0)
729 maybe_unlock_and_signal_one(lock);
730 }
731
732 void
733 9402 epoll_scheduler::
734 post_deferred_completions(op_queue& ops) const
735 {
736
1/2
✓ Branch 1 taken 9402 times.
✗ Branch 2 not taken.
9402 if (ops.empty())
737 9402 return;
738
739 // Fast path: if on scheduler thread, use private queue
740 if (auto* ctx = find_context(this))
741 {
742 ctx->private_queue.splice(ops);
743 return;
744 }
745
746 // Slow path: add to global queue and wake a thread
747 std::unique_lock lock(mutex_);
748 completed_ops_.splice(ops);
749 wake_one_thread_and_unlock(lock);
750 }
751
752 void
753 251 epoll_scheduler::
754 interrupt_reactor() const
755 {
756 // Only write if not already armed to avoid redundant writes
757 251 bool expected = false;
758
2/2
✓ Branch 1 taken 236 times.
✓ Branch 2 taken 15 times.
251 if (eventfd_armed_.compare_exchange_strong(expected, true,
759 std::memory_order_release, std::memory_order_relaxed))
760 {
761 236 std::uint64_t val = 1;
762
1/1
✓ Branch 1 taken 236 times.
236 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
763 }
764 251 }
765
766 void
767 382 epoll_scheduler::
768 signal_all(std::unique_lock<std::mutex>&) const
769 {
770 382 state_ |= 1;
771 382 cond_.notify_all();
772 382 }
773
774 bool
775 1679 epoll_scheduler::
776 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
777 {
778 1679 state_ |= 1;
779
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1679 times.
1679 if (state_ > 1)
780 {
781 lock.unlock();
782 cond_.notify_one();
783 return true;
784 }
785 1679 return false;
786 }
787
788 bool
789 144236 epoll_scheduler::
790 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
791 {
792 144236 state_ |= 1;
793 144236 bool have_waiters = state_ > 1;
794 144236 lock.unlock();
795
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 144236 times.
144236 if (have_waiters)
796 cond_.notify_one();
797 144236 return have_waiters;
798 }
799
800 void
801 epoll_scheduler::
802 clear_signal() const
803 {
804 state_ &= ~std::size_t(1);
805 }
806
807 void
808 epoll_scheduler::
809 wait_for_signal(std::unique_lock<std::mutex>& lock) const
810 {
811 while ((state_ & 1) == 0)
812 {
813 state_ += 2;
814 cond_.wait(lock);
815 state_ -= 2;
816 }
817 }
818
819 void
820 epoll_scheduler::
821 wait_for_signal_for(
822 std::unique_lock<std::mutex>& lock,
823 long timeout_us) const
824 {
825 if ((state_ & 1) == 0)
826 {
827 state_ += 2;
828 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
829 state_ -= 2;
830 }
831 }
832
833 void
834 1679 epoll_scheduler::
835 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
836 {
837
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1679 times.
1679 if (maybe_unlock_and_signal_one(lock))
838 return;
839
840
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1653 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1653 times.
1679 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
841 {
842 26 task_interrupted_ = true;
843 26 lock.unlock();
844 26 interrupt_reactor();
845 }
846 else
847 {
848 1653 lock.unlock();
849 }
850 }
851
852 /** RAII guard for handler execution work accounting.
853
854 A handler consumes 1 work item and may produce N new items via fast-path posts.
855 Net change = N - 1:
856 - If N > 1: add (N-1) to global (more work produced than consumed)
857 - If N == 1: net zero, do nothing
858 - If N < 1: call work_finished() (work consumed, may trigger stop)
859
860 Also drains private queue to global for other threads to process.
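
    Worked example (post counts assumed): a handler that posts two
    continuations has N == 2, so 1 is added to the global counter; a
    handler that posts nothing has N == 0, so work_finished() runs and
    may wake threads to exit.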
861 */
862 struct work_cleanup
863 {
864 epoll_scheduler const* scheduler;
865 std::unique_lock<std::mutex>* lock;
866 scheduler_context* ctx;
867
868 114820 ~work_cleanup()
869 {
870
1/2
✓ Branch 0 taken 114820 times.
✗ Branch 1 not taken.
114820 if (ctx)
871 {
872 114820 long produced = ctx->private_outstanding_work;
873
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 114813 times.
114820 if (produced > 1)
874 7 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
875
2/2
✓ Branch 0 taken 16000 times.
✓ Branch 1 taken 98813 times.
114813 else if (produced < 1)
876 16000 scheduler->work_finished();
877 // produced == 1: net zero, handler consumed what it produced
878 114820 ctx->private_outstanding_work = 0;
879
880
2/2
✓ Branch 1 taken 60975 times.
✓ Branch 2 taken 53845 times.
114820 if (!ctx->private_queue.empty())
881 {
882 60975 lock->lock();
883 60975 scheduler->completed_ops_.splice(ctx->private_queue);
884 }
885 }
886 else
887 {
888 // No thread context - slow-path op was already counted globally
889 scheduler->work_finished();
890 }
891 114820 }
892 };
893
894 /** RAII guard for reactor work accounting.
895
896 The reactor only produces work, via timer/signal callbacks posting handlers.
897 Unlike handler execution, which consumes 1, the reactor consumes nothing.
898 All produced work must be flushed to the global counter.
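
    For example (counts assumed): a timerfd tick that expires three
    timers posts three handlers from the reactor thread; the reactor
    consumed nothing, so all three are added to outstanding_work_ and
    the private queue is spliced onto completed_ops_.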
899 */
900 struct task_cleanup
901 {
902 epoll_scheduler const* scheduler;
903 std::unique_lock<std::mutex>* lock;
904 scheduler_context* ctx;
905
906 39049 ~task_cleanup()
907 39049 {
908
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39049 times.
39049 if (!ctx)
909 return;
910
911
2/2
✓ Branch 0 taken 4907 times.
✓ Branch 1 taken 34142 times.
39049 if (ctx->private_outstanding_work > 0)
912 {
913 4907 scheduler->outstanding_work_.fetch_add(
914 4907 ctx->private_outstanding_work, std::memory_order_relaxed);
915 4907 ctx->private_outstanding_work = 0;
916 }
917
918
2/2
✓ Branch 1 taken 4907 times.
✓ Branch 2 taken 34142 times.
39049 if (!ctx->private_queue.empty())
919 {
920
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4907 times.
4907 if (!lock->owns_lock())
921 lock->lock();
922 4907 scheduler->completed_ops_.splice(ctx->private_queue);
923 }
924 39049 }
925 };
926
927 void
928 9809 epoll_scheduler::
929 update_timerfd() const
930 {
931 9809 auto nearest = timer_svc_->nearest_expiry();
932
933 9809 itimerspec ts{};
934 9809 int flags = 0;
935
936
3/3
✓ Branch 2 taken 9809 times.
✓ Branch 4 taken 9764 times.
✓ Branch 5 taken 45 times.
9809 if (nearest == timer_service::time_point::max())
937 {
938 // No timers - disarm by setting to 0 (relative)
939 }
940 else
941 {
942 9764 auto now = std::chrono::steady_clock::now();
943
3/3
✓ Branch 1 taken 9764 times.
✓ Branch 4 taken 115 times.
✓ Branch 5 taken 9649 times.
9764 if (nearest <= now)
944 {
945 // Use 1ns instead of 0 - zero disarms the timerfd
946 115 ts.it_value.tv_nsec = 1;
947 }
948 else
949 {
950 9649 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
951
1/1
✓ Branch 1 taken 9649 times.
19298 nearest - now).count();
952 9649 ts.it_value.tv_sec = nsec / 1000000000;
953 9649 ts.it_value.tv_nsec = nsec % 1000000000;
954 // Ensure non-zero to avoid disarming if duration rounds to 0
955
3/4
✓ Branch 0 taken 9638 times.
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9638 times.
9649 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
956 ts.it_value.tv_nsec = 1;
957 }
958 }
959
960
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9809 times.
9809 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
961 detail::throw_system_error(make_err(errno), "timerfd_settime");
962 9809 }
963
964 void
965 39049 epoll_scheduler::
966 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
967 {
968
2/2
✓ Branch 0 taken 29416 times.
✓ Branch 1 taken 9633 times.
39049 int timeout_ms = task_interrupted_ ? 0 : -1;
969
970
2/2
✓ Branch 1 taken 9633 times.
✓ Branch 2 taken 29416 times.
39049 if (lock.owns_lock())
971
1/1
✓ Branch 1 taken 9633 times.
9633 lock.unlock();
972
973 39049 task_cleanup on_exit{this, &lock, ctx};
974
975 // Flush deferred timerfd programming before blocking
976
2/2
✓ Branch 1 taken 4902 times.
✓ Branch 2 taken 34147 times.
39049 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
977
1/1
✓ Branch 1 taken 4902 times.
4902 update_timerfd();
978
979 // Event loop runs without mutex held
980 epoll_event events[128];
981
1/1
✓ Branch 1 taken 39049 times.
39049 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
982
983
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 39049 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
39049 if (nfds < 0 && errno != EINTR)
984 detail::throw_system_error(make_err(errno), "epoll_wait");
985
986 39049 bool check_timers = false;
987 39049 op_queue local_ops;
988
989 // Process events without holding the mutex
990
2/2
✓ Branch 0 taken 52187 times.
✓ Branch 1 taken 39049 times.
91236 for (int i = 0; i < nfds; ++i)
991 {
992
2/2
✓ Branch 0 taken 33 times.
✓ Branch 1 taken 52154 times.
52187 if (events[i].data.ptr == nullptr)
993 {
994 std::uint64_t val;
995
1/1
✓ Branch 1 taken 33 times.
33 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
996 33 eventfd_armed_.store(false, std::memory_order_relaxed);
997 33 continue;
998 33 }
999
1000
2/2
✓ Branch 0 taken 4907 times.
✓ Branch 1 taken 47247 times.
52154 if (events[i].data.ptr == &timer_fd_)
1001 {
1002 std::uint64_t expirations;
1003
1/1
✓ Branch 1 taken 4907 times.
4907 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
1004 4907 check_timers = true;
1005 4907 continue;
1006 4907 }
1007
1008 // Deferred I/O: just set ready events and enqueue descriptor
1009 // No per-descriptor mutex locking in reactor hot path!
1010 47247 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
1011 47247 desc->add_ready_events(events[i].events);
1012
1013 // Only enqueue if not already enqueued
1014 47247 bool expected = false;
1015
1/2
✓ Branch 1 taken 47247 times.
✗ Branch 2 not taken.
47247 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
1016 std::memory_order_release, std::memory_order_relaxed))
1017 {
1018 47247 local_ops.push(desc);
1019 }
1020 }
1021
1022 // Process timers only when timerfd fires
1023
2/2
✓ Branch 0 taken 4907 times.
✓ Branch 1 taken 34142 times.
39049 if (check_timers)
1024 {
1025
1/1
✓ Branch 1 taken 4907 times.
4907 timer_svc_->process_expired();
1026
1/1
✓ Branch 1 taken 4907 times.
4907 update_timerfd();
1027 }
1028
1029
1/1
✓ Branch 1 taken 39049 times.
39049 lock.lock();
1030
1031
2/2
✓ Branch 1 taken 28960 times.
✓ Branch 2 taken 10089 times.
39049 if (!local_ops.empty())
1032 28960 completed_ops_.splice(local_ops);
1033 39049 }
1034
1035 std::size_t
1036 114978 epoll_scheduler::
1037 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
1038 {
1039 for (;;)
1040 {
1041
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 154026 times.
154027 if (stopped_)
1042 1 return 0;
1043
1044 154026 scheduler_op* op = completed_ops_.pop();
1045
1046 // Handle reactor sentinel - time to poll for I/O
1047
2/2
✓ Branch 0 taken 39204 times.
✓ Branch 1 taken 114822 times.
154026 if (op == &task_op_)
1048 {
1049 39204 bool more_handlers = !completed_ops_.empty();
1050
1051 // Nothing to run the reactor for: no pending work to wait on,
1052 // or caller requested a non-blocking poll
1053
4/4
✓ Branch 0 taken 9788 times.
✓ Branch 1 taken 29416 times.
✓ Branch 2 taken 155 times.
✓ Branch 3 taken 39049 times.
48992 if (!more_handlers &&
1054
3/4
✓ Branch 1 taken 9633 times.
✓ Branch 2 taken 155 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 9633 times.
19576 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1055 timeout_us == 0))
1056 {
1057 155 completed_ops_.push(&task_op_);
1058 155 return 0;
1059 }
1060
1061
3/4
✓ Branch 0 taken 9633 times.
✓ Branch 1 taken 29416 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9633 times.
39049 task_interrupted_ = more_handlers || timeout_us == 0;
1062 39049 task_running_.store(true, std::memory_order_release);
1063
1064
2/2
✓ Branch 0 taken 29416 times.
✓ Branch 1 taken 9633 times.
39049 if (more_handlers)
1065 29416 unlock_and_signal_one(lock);
1066
1067 39049 run_task(lock, ctx);
1068
1069 39049 task_running_.store(false, std::memory_order_relaxed);
1070 39049 completed_ops_.push(&task_op_);
1071 39049 continue;
1072 39049 }
1073
1074 // Handle operation
1075
2/2
✓ Branch 0 taken 114820 times.
✓ Branch 1 taken 2 times.
114822 if (op != nullptr)
1076 {
1077 114820 bool more = !completed_ops_.empty();
1078
1079
1/2
✓ Branch 0 taken 114820 times.
✗ Branch 1 not taken.
114820 if (more)
1080
1/1
✓ Branch 1 taken 114820 times.
114820 ctx->unassisted = !unlock_and_signal_one(lock);
1081 else
1082 {
1083 ctx->unassisted = false;
1084 lock.unlock();
1085 }
1086
1087 114820 work_cleanup on_exit{this, &lock, ctx};
1088
1089
1/1
✓ Branch 1 taken 114820 times.
114820 (*op)();
1090 114820 return 1;
1091 114820 }
1092
1093 // No pending work to wait on, or caller requested non-blocking poll
1094
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1095 timeout_us == 0)
1096 2 return 0;
1097
1098 clear_signal();
1099 if (timeout_us < 0)
1100 wait_for_signal(lock);
1101 else
1102 wait_for_signal_for(lock, timeout_us);
1103 39049 }
1104 }
1105
1106 } // namespace boost::corosio::detail
1107
1108 #endif
1109