src/corosio/src/detail/select/sockets.hpp

100.0% Lines (12/12) 100.0% Functions (7/7) -% Branches (0/0)
src/corosio/src/detail/select/sockets.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/tcp_socket.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/ex/execution_context.hpp>
21 #include "src/detail/intrusive.hpp"
22 #include "src/detail/socket_service.hpp"
23
24 #include "src/detail/select/op.hpp"
25 #include "src/detail/select/scheduler.hpp"
26
27 #include <memory>
28 #include <mutex>
29 #include <unordered_map>
30
31 /*
32 select Socket Implementation
33 ============================
34
35 This mirrors the epoll_sockets design for behavioral consistency.
36 Each I/O operation follows the same pattern:
37 1. Try the syscall immediately (non-blocking socket)
38 2. If it succeeds or fails with a real error, post to completion queue
39 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
40
41 Cancellation
42 ------------
43 See op.hpp for the completion/cancellation race handling via the
44 `registered` atomic. cancel() must complete pending operations (post
45 them with cancelled flag) so coroutines waiting on them can resume.
46 close_socket() calls cancel() first to ensure this.
47
48 Impl Lifetime with shared_ptr
49 -----------------------------
50 Socket impls use enable_shared_from_this. The service owns impls via
51 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
52 removal. When a user calls close(), we call cancel() which posts pending
53 ops to the scheduler.
54
55 CRITICAL: The posted ops must keep the impl alive until they complete.
56 Otherwise the scheduler would process a freed op (use-after-free). The
57 cancel() method captures shared_from_this() into op.impl_ptr before
58 posting. When the op completes, impl_ptr is cleared, allowing the impl
59 to be destroyed if no other references exist.
60
61 Service Ownership
62 -----------------
63 select_socket_service owns all socket impls. destroy() removes the
64 shared_ptr from the map, but the impl may survive if ops still hold
65 impl_ptr refs. shutdown() closes all sockets and clears the map; any
66 in-flight ops will complete and release their refs.
67 */
68
69 namespace boost::corosio::detail {
70
71 class select_socket_service;
72 class select_socket_impl;
73
74 /// Socket implementation for select backend.
75 class select_socket_impl
76 : public tcp_socket::implementation
77 , public std::enable_shared_from_this<select_socket_impl>
78 , public intrusive_list<select_socket_impl>::node
79 {
80 friend class select_socket_service;
81
82 public:
83 explicit select_socket_impl(select_socket_service& svc) noexcept;
84
85 std::coroutine_handle<> connect(
86 std::coroutine_handle<>,
87 capy::executor_ref,
88 endpoint,
89 std::stop_token,
90 std::error_code*) override;
91
92 std::coroutine_handle<> read_some(
93 std::coroutine_handle<>,
94 capy::executor_ref,
95 io_buffer_param,
96 std::stop_token,
97 std::error_code*,
98 std::size_t*) override;
99
100 std::coroutine_handle<> write_some(
101 std::coroutine_handle<>,
102 capy::executor_ref,
103 io_buffer_param,
104 std::stop_token,
105 std::error_code*,
106 std::size_t*) override;
107
108 std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
109
110 20919 native_handle_type native_handle() const noexcept override { return fd_; }
111
112 // Socket options
113 std::error_code set_no_delay(bool value) noexcept override;
114 bool no_delay(std::error_code& ec) const noexcept override;
115
116 std::error_code set_keep_alive(bool value) noexcept override;
117 bool keep_alive(std::error_code& ec) const noexcept override;
118
119 std::error_code set_receive_buffer_size(int size) noexcept override;
120 int receive_buffer_size(std::error_code& ec) const noexcept override;
121
122 std::error_code set_send_buffer_size(int size) noexcept override;
123 int send_buffer_size(std::error_code& ec) const noexcept override;
124
125 std::error_code set_linger(bool enabled, int timeout) noexcept override;
126 tcp_socket::linger_options linger(std::error_code& ec) const noexcept override;
127
128 16 endpoint local_endpoint() const noexcept override { return local_endpoint_; }
129 16 endpoint remote_endpoint() const noexcept override { return remote_endpoint_; }
130 bool is_open() const noexcept { return fd_ >= 0; }
131 void cancel() noexcept override;
132 void cancel_single_op(select_op& op) noexcept;
133 void close_socket() noexcept;
134 3434 void set_socket(int fd) noexcept { fd_ = fd; }
135 6868 void set_endpoints(endpoint local, endpoint remote) noexcept
136 {
137 6868 local_endpoint_ = local;
138 6868 remote_endpoint_ = remote;
139 6868 }
140
141 select_connect_op conn_;
142 select_read_op rd_;
143 select_write_op wr_;
144
145 private:
146 select_socket_service& svc_;
147 int fd_ = -1;
148 endpoint local_endpoint_;
149 endpoint remote_endpoint_;
150 };
151
152 /** State for select socket service. */
153 class select_socket_state
154 {
155 public:
156 133 explicit select_socket_state(select_scheduler& sched) noexcept
157 133 : sched_(sched)
158 {
159 133 }
160
161 select_scheduler& sched_;
162 std::mutex mutex_;
163 intrusive_list<select_socket_impl> socket_list_;
164 std::unordered_map<select_socket_impl*, std::shared_ptr<select_socket_impl>> socket_ptrs_;
165 };
166
167 /** select socket service implementation.
168
169 Inherits from socket_service to enable runtime polymorphism.
170 Uses key_type = socket_service for service lookup.
171 */
172 class select_socket_service : public socket_service
173 {
174 public:
175 explicit select_socket_service(capy::execution_context& ctx);
176 ~select_socket_service();
177
178 select_socket_service(select_socket_service const&) = delete;
179 select_socket_service& operator=(select_socket_service const&) = delete;
180
181 void shutdown() override;
182
183 io_object::implementation* construct() override;
184 void destroy(io_object::implementation*) override;
185 void close(io_object::handle&) override;
186 std::error_code open_socket(tcp_socket::implementation& impl) override;
187
188 10758 select_scheduler& scheduler() const noexcept { return state_->sched_; }
189 void post(select_op* op);
190 void work_started() noexcept;
191 void work_finished() noexcept;
192
193 private:
194 std::unique_ptr<select_socket_state> state_;
195 };
196
197 // Backward compatibility alias
198 using select_sockets = select_socket_service;
199
200 } // namespace boost::corosio::detail
201
202 #endif // BOOST_COROSIO_HAS_SELECT
203
204 #endif // BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
205