src/corosio/src/detail/epoll/scheduler.hpp

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include "src/detail/scheduler_impl.hpp"
#include "src/detail/scheduler_op.hpp"

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
struct scheduler_context;

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single-reactor model
    where one thread runs epoll_wait while other threads wait on a
    condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: the condition variable wakes exactly one thread
    - IOCP parity: behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. The other threads wait on the
    condition variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
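
    @par Example
    A minimal usage sketch of the single-reactor model. How the
    scheduler is obtained from the owning execution_context is
    illustrative here; use_service is a hypothetical accessor, not
    confirmed by this header.
    @code
    capy::execution_context ctx;
    auto& sched = capy::use_service<epoll_scheduler>(ctx); // hypothetical accessor
    std::vector<std::thread> pool;
    for(int i = 0; i < 4; ++i)
        pool.emplace_back([&]{ sched.run(); }); // one thread becomes the reactor
    for(auto& t : pool)
        t.join();
    @endcode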
*/
class epoll_scheduler
    : public scheduler_impl
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, an eventfd for reactor interruption,
        and a timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(
        capy::execution_context& ctx,
        int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler();

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    void on_work_started() noexcept override;
    void on_work_finished() noexcept override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept { return epoll_fd_; }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
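
        @par Example
        Illustrative use by a completion path; run_ready_op() is a
        hypothetical helper, not part of this interface.
        @code
        reset_inline_budget();        // fresh budget for this handler
        while(try_consume_inline_budget())
        {
            if(! run_ready_op())      // hypothetical: complete one ready op inline
                break;
        }
        @endcode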
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state, which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
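
        @par Example
        A hedged sketch of the registration this call is expected to
        perform internally. Storing @p desc in epoll_event.data.ptr is
        documented above; the exact event mask is an assumption.
        @code
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLOUT | EPOLLET; // assumed mask
        ev.data.ptr = desc;                       // dispatched via descriptor_state
        ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
        @endcode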
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** For use by I/O operations to track pending work. */
    void work_started() const noexcept override;

    /** For use by I/O operations to track completed work. */
    void work_finished() const noexcept override;

    /** Offset the forthcoming work_finished() issued by work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from a thread context's private queue to the global queue.

        Called by the thread_context_guard destructor when a thread exits
        run(). Transfers pending work to the global queue under mutex
        protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under the mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
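
        @par Example
        An illustrative sketch of the two paths; the thread-local lookup
        and queue operations shown use hypothetical names.
        @code
        if(scheduler_context* ctx = current_thread_context()) // hypothetical TLS lookup
        {
            ctx->private_ops.splice(ops);                     // fast path: no lock
        }
        else
        {
            std::unique_lock<std::mutex> lock(mutex_);
            completed_ops_.splice(ops);
            wake_one_thread_and_unlock(lock);                 // signal a waiter
        }
        @endcode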
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    friend struct work_cleanup;
    friend struct task_cleanup;

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        scheduler_context* ctx);
    void run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if the lock is still held.
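
        @par Example
        An illustrative caller-side fallback pattern:
        @code
        if(! maybe_unlock_and_signal_one(lock))
        {
            // No waiters: the lock is still held, so fall back to
            // interrupting the reactor instead.
            lock.unlock();
            interrupt_reactor();
        }
        @endcode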
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if a waiter was signaled, `false` otherwise.
    */
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
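
        @par Sketch
        A hedged outline of the waiter-count protocol described above,
        using the state_ encoding documented below (illustrative, not
        the verbatim implementation):
        @code
        while((state_ & 1) == 0)    // not signaled
        {
            state_ += 2;            // register as a waiter
            cond_.wait(lock);
            state_ -= 2;            // deregister upon waking
        }
        @endcode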
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or the timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock,
        long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // for interrupting the reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count
    // (incremented by 2 per waiter).
    mutable std::size_t state_ = 0;
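    // For example, two waiters with the signal set encode as
    //   state_ == (2 << 1) | 1 == 5.
    // wait_for_signal adds 2 on entry and subtracts 2 on wakeup;
    // the signal_* members set bit 0 before notifying.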

    // Edge-triggered eventfd state.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without ever being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler
    // execution. Ensures the reactor runs periodically even when handlers
    // are continuously posted, preventing starvation of I/O events,
    // timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP