src/corosio/src/detail/timer_service.cpp

85.4% Lines (298/349) 92.5% Functions (37/40) 69.8% Branches (120/172)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include "src/detail/timer_service.hpp"
11 #include "src/detail/scheduler_impl.hpp"
12
13 #include <boost/corosio/basic_io_context.hpp>
14 #include <boost/corosio/detail/thread_local_ptr.hpp>
15 #include "src/detail/scheduler_op.hpp"
16 #include "src/detail/intrusive.hpp"
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/executor_ref.hpp>
19 #include <system_error>
20
21 #include <atomic>
22 #include <coroutine>
23 #include <limits>
24 #include <mutex>
25 #include <optional>
26 #include <stop_token>
27 #include <vector>
28
29 /*
30 Timer Service
31 =============
32
33 The public timer class holds an opaque implementation* and forwards
34 all operations through extern free functions defined at the bottom
35 of this file.
36
37 Data Structures
38 ---------------
39 waiter_node holds per-waiter state: coroutine handle, executor,
40 error output, stop_token, and an embedded completion_op. Each
41 concurrent co_await t.wait() allocates one waiter_node.
42
43 implementation holds per-timer state: expiry, heap index, and an
44 intrusive_list of waiter_nodes. Multiple coroutines can wait on
45 the same timer simultaneously.
46
47 timer_service_impl owns a min-heap of active timers, a free list
48 of recycled impls, and a free list of recycled waiter_nodes. The
49 heap is ordered by expiry time; the scheduler queries
50 nearest_expiry() to set the epoll/timerfd timeout.
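
For orientation, a sketch of how a scheduler might turn these two
queries into a poll timeout. This is illustrative only: the real
scheduler_impl lives elsewhere, and poll_timeout_ms is a
hypothetical helper.

    int poll_timeout_ms(timer_service& timers)
    {
        if (timers.empty())
            return -1;   // no armed timers: block indefinitely
        auto now = std::chrono::steady_clock::now();
        auto dt  = timers.nearest_expiry() - now;
        if (dt <= std::chrono::steady_clock::duration::zero())
            return 0;    // nearest timer already due: poll without blocking
        return static_cast<int>(
            std::chrono::ceil<std::chrono::milliseconds>(dt).count());
    }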
51
52 Optimization Strategy
53 ---------------------
54 The common timer lifecycle is: construct, set expiry, cancel or
55 wait, destroy. Several optimizations target this path:
56
57 1. Deferred heap insertion — expires_after() stores the expiry
58 but does not insert into the heap. Insertion happens in
59 wait(). If the timer is cancelled or destroyed before wait(),
60 the heap is never touched and no mutex is taken. This also
61 enables the already-expired fast path: when wait() sees
62 expiry <= now before inserting, it posts the coroutine
63 handle to the executor and returns noop_coroutine — no
64 heap, no mutex, no epoll. This is only possible because
65 the coroutine API guarantees wait() always follows
66 expires_after(); callback APIs cannot assume this call
67 order.
68
69 2. Thread-local impl cache — A single-slot per-thread cache of
70 implementation avoids the mutex on create/destroy for the common
71 create-then-destroy-on-same-thread pattern. On pop, if the
72 cached impl's svc_ doesn't match the current service, the
73 stale impl is deleted eagerly rather than reused.
74
75 3. Embedded completion_op — Each waiter_node embeds a
76 scheduler_op subclass, eliminating heap allocation per
77 fire/cancel. Its destroy() is a no-op since the waiter_node
78 owns the lifetime.
79
80 4. Cached nearest expiry — An atomic<int64_t> mirrors the heap
81 root's time, updated under the lock. nearest_expiry() and
82 empty() read the atomic without locking.
83
84 5. might_have_pending_waits_ flag — Set on wait(), cleared on
85 cancel or expiry. Lets cancel_timer() return without locking
86 when no wait was ever issued.
87
88 6. Thread-local waiter cache — Single-slot per-thread cache of
89 waiter_node avoids the free-list mutex for the common
90 wait-then-complete-on-same-thread pattern.
91
92 With all fast paths hit (idle timer, same thread), the
93 schedule/cancel cycle takes zero mutex locks.
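
From the caller's side, the lifecycle those fast paths target looks
roughly like this (spellings abbreviated and the constructor form
assumed; see timer.cpp and the public headers for the exact API):

    corosio::timer t(ioc);                          // construct(): TL impl cache
    t.expires_after(std::chrono::milliseconds(5));  // stores expiry, heap untouched
    co_await t.wait();                              // heap insert + one waiter_node
    // ...or cancel()/destroy before wait(): heap and mutex never touched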
94
95 Concurrency
96 -----------
97 stop_token callbacks can fire from any thread. The impl_
98 pointer on waiter_node is used as a "still in list" marker:
99 set to nullptr under the mutex when a waiter is removed by
100 cancel_timer() or process_expired(). cancel_waiter() checks
101 this under the mutex to avoid double-removal races.
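
Condensed from cancel_waiter() below (the empty-list cleanup and
cached-nearest refresh are omitted), the removal handshake is:

    std::lock_guard lock(mutex_);
    if (!w->impl_)          // already removed by cancel_timer/process_expired
        return;
    auto* impl = w->impl_;
    w->impl_ = nullptr;     // claim the waiter exactly once, under the mutex
    impl->waiters_.remove(w);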
102
103 Multiple io_contexts in the same program are safe. The
104 service pointer is obtained directly from the scheduler,
105 and TL-cached impls are validated by comparing svc_ against
106 the current service pointer. Waiter nodes have no service
107 affinity and can safely migrate between contexts.
108 */
109
110 namespace boost::corosio::detail {
111
112 class timer_service_impl;
113 struct implementation;
114 struct waiter_node;
115
116 void timer_service_invalidate_cache() noexcept;
117
118 struct waiter_node
119 : intrusive_list<waiter_node>::node
120 {
121 // Embedded completion op — avoids heap allocation per fire/cancel
122 struct completion_op final : scheduler_op
123 {
124 waiter_node* waiter_ = nullptr;
125
126 static void do_complete(
127 void* owner,
128 scheduler_op* base,
129 std::uint32_t,
130 std::uint32_t);
131
132 142 completion_op() noexcept
133 142 : scheduler_op(&do_complete)
134 {
135 142 }
136
137 void operator()() override;
138 // No-op — lifetime owned by waiter_node, not the scheduler queue
139 void destroy() override {}
140 };
141
142 // Per-waiter stop_token cancellation
143 struct canceller
144 {
145 waiter_node* waiter_;
146 void operator()() const;
147 };
148
149 // nullptr once removed from timer's waiter list (concurrency marker)
150 implementation* impl_ = nullptr;
151 timer_service_impl* svc_ = nullptr;
152 std::coroutine_handle<> h_;
153 capy::executor_ref d_;
154 std::error_code* ec_out_ = nullptr;
155 std::stop_token token_;
156 std::optional<std::stop_callback<canceller>> stop_cb_;
157 completion_op op_;
158 std::error_code ec_value_;
159 waiter_node* next_free_ = nullptr;
160
161 142 waiter_node() noexcept
162 142 {
163 142 op_.waiter_ = this;
164 142 }
165 };
166
167 struct implementation
168 : timer::implementation
169 {
170 using clock_type = std::chrono::steady_clock;
171 using time_point = clock_type::time_point;
172 using duration = clock_type::duration;
173
174 timer_service_impl* svc_ = nullptr;
175 intrusive_list<waiter_node> waiters_;
176
177 // Free list linkage (reused when impl is on free_list)
178 implementation* next_free_ = nullptr;
179
180 explicit implementation(timer_service_impl& svc) noexcept;
181
182 std::coroutine_handle<> wait(
183 std::coroutine_handle<>,
184 capy::executor_ref,
185 std::stop_token,
186 std::error_code*) override;
187 };
188
189 implementation* try_pop_tl_cache(timer_service_impl*) noexcept;
190 bool try_push_tl_cache(implementation*) noexcept;
191 waiter_node* try_pop_waiter_tl_cache() noexcept;
192 bool try_push_waiter_tl_cache(waiter_node*) noexcept;
193
194 class timer_service_impl : public timer_service
195 {
196 public:
197 using clock_type = std::chrono::steady_clock;
198 using time_point = clock_type::time_point;
199 using key_type = timer_service;
200
201 private:
202 struct heap_entry
203 {
204 time_point time_;
205 implementation* timer_;
206 };
207
208 scheduler* sched_ = nullptr;
209 mutable std::mutex mutex_;
210 std::vector<heap_entry> heap_;
211 implementation* free_list_ = nullptr;
212 waiter_node* waiter_free_list_ = nullptr;
213 callback on_earliest_changed_;
214 // Avoids mutex in nearest_expiry() and empty()
215 mutable std::atomic<std::int64_t> cached_nearest_ns_{
216 (std::numeric_limits<std::int64_t>::max)()};
217
218 public:
219 336 timer_service_impl(capy::execution_context&, scheduler& sched)
220 336 : timer_service()
221 336 , sched_(&sched)
222 {
223 336 }
224
225 17188 scheduler& get_scheduler() noexcept { return *sched_; }
226
227 672 ~timer_service_impl() = default;
228
229 timer_service_impl(timer_service_impl const&) = delete;
230 timer_service_impl& operator=(timer_service_impl const&) = delete;
231
232 336 void set_on_earliest_changed(callback cb) override
233 {
234 336 on_earliest_changed_ = cb;
235 336 }
236
237 336 void shutdown() override
238 {
239 336 timer_service_invalidate_cache();
240
241 // Tear down timers still in the heap; waiting coroutines are destroyed, not resumed
242
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 336 times.
336 for (auto& entry : heap_)
243 {
244 auto* impl = entry.timer_;
245 while (auto* w = impl->waiters_.pop_front())
246 {
247 w->stop_cb_.reset();
248 w->h_.destroy();
249 sched_->on_work_finished();
250 delete w;
251 }
252 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
253 delete impl;
254 }
255 336 heap_.clear();
256 336 cached_nearest_ns_.store(
257 (std::numeric_limits<std::int64_t>::max)(),
258 std::memory_order_release);
259
260 // Delete free-listed impls
261
2/2
✓ Branch 0 taken 48 times.
✓ Branch 1 taken 336 times.
384 while (free_list_)
262 {
263 48 auto* next = free_list_->next_free_;
264
1/2
✓ Branch 0 taken 48 times.
✗ Branch 1 not taken.
48 delete free_list_;
265 48 free_list_ = next;
266 }
267
268 // Delete free-listed waiters
269
2/2
✓ Branch 0 taken 58 times.
✓ Branch 1 taken 336 times.
394 while (waiter_free_list_)
270 {
271 58 auto* next = waiter_free_list_->next_free_;
272
1/2
✓ Branch 0 taken 58 times.
✗ Branch 1 not taken.
58 delete waiter_free_list_;
273 58 waiter_free_list_ = next;
274 }
275 336 }
276
277 8878 io_object::implementation* construct() override
278 {
279 8878 implementation* impl = try_pop_tl_cache(this);
280
2/2
✓ Branch 0 taken 8705 times.
✓ Branch 1 taken 173 times.
8878 if (impl)
281 {
282 8705 impl->svc_ = this;
283 8705 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
284 8705 impl->might_have_pending_waits_ = false;
285 8705 return impl;
286 }
287
288
1/1
✓ Branch 1 taken 173 times.
173 std::lock_guard lock(mutex_);
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 173 times.
173 if (free_list_)
290 {
291 impl = free_list_;
292 free_list_ = impl->next_free_;
293 impl->next_free_ = nullptr;
294 impl->svc_ = this;
295 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
296 impl->might_have_pending_waits_ = false;
297 }
298 else
299 {
300
1/1
✓ Branch 1 taken 173 times.
173 impl = new implementation(*this);
301 }
302 173 return impl;
303 173 }
304
305 8878 void destroy(io_object::implementation* p) override
306 {
307 8878 destroy_impl(static_cast<implementation&>(*p));
308 8878 }
309
310 8878 void destroy_impl(implementation& impl)
311 {
312
1/1
✓ Branch 1 taken 8878 times.
8878 cancel_timer(impl);
313
314
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8878 times.
8878 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
315 {
316 std::lock_guard lock(mutex_);
317 remove_timer_impl(impl);
318 refresh_cached_nearest();
319 }
320
321
2/2
✓ Branch 1 taken 8830 times.
✓ Branch 2 taken 48 times.
8878 if (try_push_tl_cache(&impl))
322 8830 return;
323
324
1/1
✓ Branch 1 taken 48 times.
48 std::lock_guard lock(mutex_);
325 48 impl.next_free_ = free_list_;
326 48 free_list_ = &impl;
327 48 }
328
329 8594 waiter_node* create_waiter()
330 {
331
2/2
✓ Branch 1 taken 8452 times.
✓ Branch 2 taken 142 times.
8594 if (auto* w = try_pop_waiter_tl_cache())
332 8452 return w;
333
334
1/1
✓ Branch 1 taken 142 times.
142 std::lock_guard lock(mutex_);
335
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 142 times.
142 if (waiter_free_list_)
336 {
337 auto* w = waiter_free_list_;
338 waiter_free_list_ = w->next_free_;
339 w->next_free_ = nullptr;
340 return w;
341 }
342
343
1/1
✓ Branch 1 taken 142 times.
142 return new waiter_node();
344 142 }
345
346 8594 void destroy_waiter(waiter_node* w)
347 {
348
2/2
✓ Branch 1 taken 8536 times.
✓ Branch 2 taken 58 times.
8594 if (try_push_waiter_tl_cache(w))
349 8536 return;
350
351
1/1
✓ Branch 1 taken 58 times.
58 std::lock_guard lock(mutex_);
352 58 w->next_free_ = waiter_free_list_;
353 58 waiter_free_list_ = w;
354 58 }
355
356 // Heap insertion deferred to wait() — avoids lock when timer is idle
357 6 std::size_t update_timer(implementation& impl, time_point new_time)
358 {
359 bool in_heap =
360 6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
361
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 6 times.
6 if (!in_heap && impl.waiters_.empty())
362 return 0;
363
364 6 bool notify = false;
365 6 intrusive_list<waiter_node> canceled;
366
367 {
368
1/1
✓ Branch 1 taken 6 times.
6 std::lock_guard lock(mutex_);
369
370
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 6 times.
16 while (auto* w = impl.waiters_.pop_front())
371 {
372 10 w->impl_ = nullptr;
373 10 canceled.push_back(w);
374 10 }
375
376
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 if (impl.heap_index_ < heap_.size())
377 {
378 6 time_point old_time = heap_[impl.heap_index_].time_;
379 6 heap_[impl.heap_index_].time_ = new_time;
380
381
2/3
✓ Branch 1 taken 6 times.
✓ Branch 4 taken 6 times.
✗ Branch 5 not taken.
6 if (new_time < old_time)
382
1/1
✓ Branch 1 taken 6 times.
6 up_heap(impl.heap_index_);
383 else
384 down_heap(impl.heap_index_);
385
386 6 notify = (impl.heap_index_ == 0);
387 }
388
389 6 refresh_cached_nearest();
390 6 }
391
392 6 std::size_t count = 0;
393
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 6 times.
16 while (auto* w = canceled.pop_front())
394 {
395 10 w->ec_value_ = make_error_code(capy::error::canceled);
396
1/1
✓ Branch 1 taken 10 times.
10 sched_->post(&w->op_);
397 10 ++count;
398 10 }
399
400
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (notify)
401
1/1
✓ Branch 1 taken 6 times.
6 on_earliest_changed_();
402
403 6 return count;
404 }
405
406 // Inserts timer into heap if needed and pushes waiter, all under
407 // one lock to prevent races with cancel_waiter/process_expired
408 8594 void insert_waiter(implementation& impl, waiter_node* w)
409 {
410 8594 bool notify = false;
411 {
412
1/1
✓ Branch 1 taken 8594 times.
8594 std::lock_guard lock(mutex_);
413
2/2
✓ Branch 1 taken 8572 times.
✓ Branch 2 taken 22 times.
8594 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
414 {
415 8572 impl.heap_index_ = heap_.size();
416
1/1
✓ Branch 1 taken 8572 times.
8572 heap_.push_back({impl.expiry_, &impl});
417
1/1
✓ Branch 2 taken 8572 times.
8572 up_heap(heap_.size() - 1);
418 8572 notify = (impl.heap_index_ == 0);
419 8572 refresh_cached_nearest();
420 }
421 8594 impl.waiters_.push_back(w);
422 8594 }
423
2/2
✓ Branch 0 taken 8560 times.
✓ Branch 1 taken 34 times.
8594 if (notify)
424 8560 on_earliest_changed_();
425 8594 }
426
427 8886 std::size_t cancel_timer(implementation& impl)
428 {
429
2/2
✓ Branch 0 taken 8870 times.
✓ Branch 1 taken 16 times.
8886 if (!impl.might_have_pending_waits_)
430 8870 return 0;
431
432 // Not in heap and no waiters — just clear the flag
433 16 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()
434
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 16 times.
16 && impl.waiters_.empty())
435 {
436 impl.might_have_pending_waits_ = false;
437 return 0;
438 }
439
440 16 intrusive_list<waiter_node> canceled;
441
442 {
443
1/1
✓ Branch 1 taken 16 times.
16 std::lock_guard lock(mutex_);
444
1/1
✓ Branch 1 taken 16 times.
16 remove_timer_impl(impl);
445
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 16 times.
36 while (auto* w = impl.waiters_.pop_front())
446 {
447 20 w->impl_ = nullptr;
448 20 canceled.push_back(w);
449 20 }
450 16 refresh_cached_nearest();
451 16 }
452
453 16 impl.might_have_pending_waits_ = false;
454
455 16 std::size_t count = 0;
456
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 16 times.
36 while (auto* w = canceled.pop_front())
457 {
458 20 w->ec_value_ = make_error_code(capy::error::canceled);
459
1/1
✓ Branch 1 taken 20 times.
20 sched_->post(&w->op_);
460 20 ++count;
461 20 }
462
463 16 return count;
464 }
465
466 // Cancel a single waiter (called from stop_token callback, any thread)
467 4 void cancel_waiter(waiter_node* w)
468 {
469 {
470
1/1
✓ Branch 1 taken 4 times.
4 std::lock_guard lock(mutex_);
471 // Already removed by cancel_timer or process_expired
472
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (!w->impl_)
473 return;
474 4 auto* impl = w->impl_;
475 4 w->impl_ = nullptr;
476 4 impl->waiters_.remove(w);
477
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4 if (impl->waiters_.empty())
478 {
479
1/1
✓ Branch 1 taken 2 times.
2 remove_timer_impl(*impl);
480 2 impl->might_have_pending_waits_ = false;
481 }
482 4 refresh_cached_nearest();
483 4 }
484
485 4 w->ec_value_ = make_error_code(capy::error::canceled);
486 4 sched_->post(&w->op_);
487 }
488
489 // Cancel front waiter only (FIFO), return 0 or 1
490 2 std::size_t cancel_one_waiter(implementation& impl)
491 {
492
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!impl.might_have_pending_waits_)
493 return 0;
494
495 2 waiter_node* w = nullptr;
496
497 {
498
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(mutex_);
499 2 w = impl.waiters_.pop_front();
500
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!w)
501 return 0;
502 2 w->impl_ = nullptr;
503
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (impl.waiters_.empty())
504 {
505 remove_timer_impl(impl);
506 impl.might_have_pending_waits_ = false;
507 }
508 2 refresh_cached_nearest();
509 2 }
510
511 2 w->ec_value_ = make_error_code(capy::error::canceled);
512 2 sched_->post(&w->op_);
513 2 return 1;
514 }
515
516 bool empty() const noexcept override
517 {
518 return cached_nearest_ns_.load(std::memory_order_acquire)
519 == (std::numeric_limits<std::int64_t>::max)();
520 }
521
522 20370 time_point nearest_expiry() const noexcept override
523 {
524 20370 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
525 20370 return time_point(time_point::duration(ns));
526 }
527
528 101622 std::size_t process_expired() override
529 {
530 101622 intrusive_list<waiter_node> expired;
531
532 {
533
1/1
✓ Branch 1 taken 101622 times.
101622 std::lock_guard lock(mutex_);
534 101622 auto now = clock_type::now();
535
536
7/7
✓ Branch 1 taken 109809 times.
✓ Branch 2 taken 367 times.
✓ Branch 5 taken 109809 times.
✓ Branch 8 taken 8554 times.
✓ Branch 9 taken 101255 times.
✓ Branch 10 taken 8554 times.
✓ Branch 11 taken 101622 times.
110176 while (!heap_.empty() && heap_[0].time_ <= now)
537 {
538 8554 implementation* t = heap_[0].timer_;
539
1/1
✓ Branch 1 taken 8554 times.
8554 remove_timer_impl(*t);
540
2/2
✓ Branch 1 taken 8558 times.
✓ Branch 2 taken 8554 times.
17112 while (auto* w = t->waiters_.pop_front())
541 {
542 8558 w->impl_ = nullptr;
543 8558 w->ec_value_ = {};
544 8558 expired.push_back(w);
545 8558 }
546 8554 t->might_have_pending_waits_ = false;
547 }
548
549 101622 refresh_cached_nearest();
550 101622 }
551
552 101622 std::size_t count = 0;
553
2/2
✓ Branch 1 taken 8558 times.
✓ Branch 2 taken 101622 times.
110180 while (auto* w = expired.pop_front())
554 {
555
1/1
✓ Branch 1 taken 8558 times.
8558 sched_->post(&w->op_);
556 8558 ++count;
557 8558 }
558
559 101622 return count;
560 }
561
562 private:
563 110222 void refresh_cached_nearest() noexcept
564 {
565 110222 auto ns = heap_.empty()
566
2/2
✓ Branch 0 taken 383 times.
✓ Branch 1 taken 109839 times.
110222 ? (std::numeric_limits<std::int64_t>::max)()
567 109839 : heap_[0].time_.time_since_epoch().count();
568 110222 cached_nearest_ns_.store(ns, std::memory_order_release);
569 110222 }
570
571 8572 void remove_timer_impl(implementation& impl)
572 {
573 8572 std::size_t index = impl.heap_index_;
574
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8572 times.
8572 if (index >= heap_.size())
575 return; // Not in heap
576
577
2/2
✓ Branch 1 taken 103 times.
✓ Branch 2 taken 8469 times.
8572 if (index == heap_.size() - 1)
578 {
579 // Last element, just pop
580 103 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
581 103 heap_.pop_back();
582 }
583 else
584 {
585 // Swap with last and reheapify
586 8469 swap_heap(index, heap_.size() - 1);
587 8469 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
588 8469 heap_.pop_back();
589
590
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 8469 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 8469 times.
8469 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
591 up_heap(index);
592 else
593 8469 down_heap(index);
594 }
595 }
596
597 8578 void up_heap(std::size_t index)
598 {
599
2/2
✓ Branch 0 taken 8471 times.
✓ Branch 1 taken 8566 times.
17037 while (index > 0)
600 {
601 8471 std::size_t parent = (index - 1) / 2;
602
2/2
✓ Branch 4 taken 12 times.
✓ Branch 5 taken 8459 times.
8471 if (!(heap_[index].time_ < heap_[parent].time_))
603 12 break;
604 8459 swap_heap(index, parent);
605 8459 index = parent;
606 }
607 8578 }
608
609 8469 void down_heap(std::size_t index)
610 {
611 8469 std::size_t child = index * 2 + 1;
612
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 8465 times.
8469 while (child < heap_.size())
613 {
614 4 std::size_t min_child = (child + 1 == heap_.size() ||
615 heap_[child].time_ < heap_[child + 1].time_)
616
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 ? child : child + 1;
617
618
1/2
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 if (heap_[index].time_ < heap_[min_child].time_)
619 4 break;
620
621 swap_heap(index, min_child);
622 index = min_child;
623 child = index * 2 + 1;
624 }
625 8469 }
626
627 16928 void swap_heap(std::size_t i1, std::size_t i2)
628 {
629 16928 heap_entry tmp = heap_[i1];
630 16928 heap_[i1] = heap_[i2];
631 16928 heap_[i2] = tmp;
632 16928 heap_[i1].timer_->heap_index_ = i1;
633 16928 heap_[i2].timer_->heap_index_ = i2;
634 16928 }
635 };
636
637 173 implementation::
638 173 implementation(timer_service_impl& svc) noexcept
639 173 : svc_(&svc)
640 {
641 173 }
642
643 void
644 4 waiter_node::canceller::
645 operator()() const
646 {
647 4 waiter_->svc_->cancel_waiter(waiter_);
648 4 }
649
650 void
651 waiter_node::completion_op::
652 do_complete(
653 void* owner,
654 scheduler_op* base,
655 std::uint32_t,
656 std::uint32_t)
657 {
658 if (!owner)
659 return;
660 static_cast<completion_op*>(base)->operator()();
661 }
662
663 void
664 8594 waiter_node::completion_op::
665 operator()()
666 {
667 8594 auto* w = waiter_;
668 8594 w->stop_cb_.reset();
669
1/2
✓ Branch 0 taken 8594 times.
✗ Branch 1 not taken.
8594 if (w->ec_out_)
670 8594 *w->ec_out_ = w->ec_value_;
671
672 8594 auto h = w->h_;
673 8594 auto d = w->d_;
674 8594 auto* svc = w->svc_;
675 8594 auto& sched = svc->get_scheduler();
676
677
1/1
✓ Branch 1 taken 8594 times.
8594 svc->destroy_waiter(w);
678
679
1/1
✓ Branch 1 taken 8594 times.
8594 d.post(h);
680 8594 sched.on_work_finished();
681 8594 }
682
683 std::coroutine_handle<>
684 8594 implementation::
685 wait(
686 std::coroutine_handle<> h,
687 capy::executor_ref d,
688 std::stop_token token,
689 std::error_code* ec)
690 {
691 // Already-expired fast path — no waiter_node, no mutex.
692 // Post instead of dispatch so the coroutine yields to the
693 // scheduler, allowing other queued work to run.
694
2/2
✓ Branch 1 taken 8572 times.
✓ Branch 2 taken 22 times.
8594 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
695 {
696
3/5
✓ Branch 2 taken 8572 times.
✓ Branch 4 taken 8572 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8572 times.
17144 if (expiry_ == (time_point::min)() ||
697
2/3
✓ Branch 2 taken 8572 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 8572 times.
17144 expiry_ <= clock_type::now())
698 {
699 if (ec)
700 *ec = {};
701 d.post(h);
702 return std::noop_coroutine();
703 }
704 }
705
706 8594 auto* w = svc_->create_waiter();
707 8594 w->impl_ = this;
708 8594 w->svc_ = svc_;
709 8594 w->h_ = h;
710 8594 w->d_ = std::move(d);
711 8594 w->token_ = std::move(token);
712 8594 w->ec_out_ = ec;
713
714 8594 svc_->insert_waiter(*this, w);
715 8594 might_have_pending_waits_ = true;
716 8594 svc_->get_scheduler().on_work_started();
717
718
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 8590 times.
8594 if (w->token_.stop_possible())
719 4 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
720
721 8594 return std::noop_coroutine();
722 }
723
724 // Extern free functions called from timer.cpp
725 //
726 // Two thread-local caches avoid hot-path mutex acquisitions:
727 //
728 // 1. Impl cache — single-slot, validated by comparing svc_ on the
729 // impl against the current service pointer.
730 //
731 // 2. Waiter cache — single-slot, no service affinity.
732 //
733 // The service pointer is obtained from the scheduler_impl's
734 // timer_svc_ member, avoiding find_service() on the hot path.
735 // All caches are cleared by timer_service_invalidate_cache()
736 // during shutdown.
737
738 thread_local_ptr<implementation> tl_cached_impl;
739 thread_local_ptr<waiter_node> tl_cached_waiter;
740
741 implementation*
742 8878 try_pop_tl_cache(timer_service_impl* svc) noexcept
743 {
744 8878 auto* impl = tl_cached_impl.get();
745
2/2
✓ Branch 0 taken 8705 times.
✓ Branch 1 taken 173 times.
8878 if (impl)
746 {
747 8705 tl_cached_impl.set(nullptr);
748
1/2
✓ Branch 0 taken 8705 times.
✗ Branch 1 not taken.
8705 if (impl->svc_ == svc)
749 8705 return impl;
750 // Stale impl from a different (possibly destroyed) service
751 delete impl;
752 }
753 173 return nullptr;
754 }
755
756 bool
757 8878 try_push_tl_cache(implementation* impl) noexcept
758 {
759
2/2
✓ Branch 1 taken 8830 times.
✓ Branch 2 taken 48 times.
8878 if (!tl_cached_impl.get())
760 {
761 8830 tl_cached_impl.set(impl);
762 8830 return true;
763 }
764 48 return false;
765 }
766
767 waiter_node*
768 8594 try_pop_waiter_tl_cache() noexcept
769 {
770 8594 auto* w = tl_cached_waiter.get();
771
2/2
✓ Branch 0 taken 8452 times.
✓ Branch 1 taken 142 times.
8594 if (w)
772 {
773 8452 tl_cached_waiter.set(nullptr);
774 8452 return w;
775 }
776 142 return nullptr;
777 }
778
779 bool
780 8594 try_push_waiter_tl_cache(waiter_node* w) noexcept
781 {
782
2/2
✓ Branch 1 taken 8536 times.
✓ Branch 2 taken 58 times.
8594 if (!tl_cached_waiter.get())
783 {
784 8536 tl_cached_waiter.set(w);
785 8536 return true;
786 }
787 58 return false;
788 }
789
790 void
791 336 timer_service_invalidate_cache() noexcept
792 {
793
2/2
✓ Branch 1 taken 125 times.
✓ Branch 2 taken 211 times.
336 delete tl_cached_impl.get();
794 336 tl_cached_impl.set(nullptr);
795
796
2/2
✓ Branch 1 taken 84 times.
✓ Branch 2 taken 252 times.
336 delete tl_cached_waiter.get();
797 336 tl_cached_waiter.set(nullptr);
798 336 }
799
800 struct timer_service_access
801 {
802 static scheduler_impl& get_scheduler(basic_io_context& ctx) noexcept
803 {
804 return static_cast<scheduler_impl&>(*ctx.sched_);
805 }
806 };
807
808 std::size_t
809 6 timer_service_update_expiry(timer::implementation& base)
810 {
811 6 auto& impl = static_cast<implementation&>(base);
812 6 return impl.svc_->update_timer(impl, impl.expiry_);
813 }
814
815 std::size_t
816 8 timer_service_cancel(timer::implementation& base) noexcept
817 {
818 8 auto& impl = static_cast<implementation&>(base);
819 8 return impl.svc_->cancel_timer(impl);
820 }
821
822 std::size_t
823 2 timer_service_cancel_one(timer::implementation& base) noexcept
824 {
825 2 auto& impl = static_cast<implementation&>(base);
826 2 return impl.svc_->cancel_one_waiter(impl);
827 }
828
829 timer_service&
830 336 get_timer_service(capy::execution_context& ctx, scheduler& sched)
831 {
832 336 return ctx.make_service<timer_service_impl>(sched);
833 }
834
835 } // namespace boost::corosio::detail
836