libs/corosio/src/corosio/src/detail/epoll/op.hpp

84.3% Lines (97/115) 81.0% Functions (17/21) 65.0% Branches (26/40)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/dispatch_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 Persistent Registration
54 -----------------------
55 File descriptors are registered with epoll once (via descriptor_state) and
56 stay registered until closed. The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
81 // Forward declarations
82 class epoll_socket_impl;
83 class epoll_acceptor_impl;
84 struct epoll_op;
85
86 // Forward declaration
87 class epoll_scheduler;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
struct descriptor_state : scheduler_op
{
    /// Serializes access to the op pointers and ready flags below while
    /// deferred I/O is performed by operator().
    std::mutex mutex;

    // Pending operations, one fixed slot per direction. Protected by mutex.
    epoll_op* read_op = nullptr;
    epoll_op* write_op = nullptr;
    epoll_op* connect_op = nullptr;

    // Caches edge events that arrived before an op was registered, so a
    // later registration can proceed without waiting for another event.
    // Protected by mutex.
    bool read_ready = false;
    bool write_ready = false;

    // Set during registration only (no mutex needed).
    std::uint32_t registered_events = 0;
    int fd = -1;

    // For deferred I/O - set by reactor, read by scheduler. Atomic so the
    // reactor hot path never takes the mutex.
    std::atomic<std::uint32_t> ready_events_{0};
    std::atomic<bool> is_enqueued_{false};
    epoll_scheduler const* scheduler_ = nullptr;

    // Prevents impl destruction while this descriptor_state is queued.
    // Set by close_socket() when is_enqueued_ is true, cleared by operator().
    std::shared_ptr<void> impl_ref_;

    /// Accumulate ready events atomically (lock-free; reactor side).
    /// relaxed ordering suffices: publication happens via the queue handoff.
    void add_ready_events(std::uint32_t ev) noexcept
    {
        ready_events_.fetch_or(ev, std::memory_order_relaxed);
    }

    /// Perform deferred I/O and queue completions (defined out-of-line).
    void operator()() override;

    /// Destroy without invoking.
    /// Called during scheduler::shutdown() drain. Clears impl_ref_ to break
    /// the self-referential cycle set by close_socket().
    void destroy() override { impl_ref_.reset(); }
};
151
struct epoll_op : scheduler_op
{
    /// Stop-callback functor installed on the awaiter's stop_token.
    /// operator() is defined out-of-line because it calls back into the
    /// owning socket/acceptor impl to perform the actual cancellation.
    struct canceller
    {
        epoll_op* op;
        void operator()() const noexcept;
    };

    std::coroutine_handle<> h;          // coroutine to resume on completion
    capy::executor_ref ex;              // executor the coroutine resumes on
    std::error_code* ec_out = nullptr;  // where the result error is written
    std::size_t* bytes_out = nullptr;   // where the byte count is written

    int fd = -1;
    int errn = 0;                       // captured errno, 0 on success
    std::size_t bytes_transferred = 0;

    std::atomic<bool> cancelled{false};
    std::optional<std::stop_callback<canceller>> stop_cb;

    // Prevents use-after-free when socket is closed with pending ops.
    // See "Impl Lifetime Management" in file header.
    std::shared_ptr<void> impl_ptr;

    // For stop_token cancellation - pointer to owning socket/acceptor impl.
    // When stop is requested, we call back to the impl to perform actual
    // I/O cancellation. start() sets one of these and clears the other.
    epoll_socket_impl* socket_impl_ = nullptr;
    epoll_acceptor_impl* acceptor_impl_ = nullptr;

    epoll_op() = default;

    /// Return all per-operation state to defaults so this fixed slot can
    /// be reused by the next operation on the socket.
    void reset() noexcept
    {
        fd = -1;
        errn = 0;
        bytes_transferred = 0;
        cancelled.store(false, std::memory_order_relaxed);
        impl_ptr.reset();
        socket_impl_ = nullptr;
        acceptor_impl_ = nullptr;
    }

    /// Completion path: translate the captured result into *ec_out /
    /// *bytes_out, then resume the awaiting coroutine on its executor.
    void operator()() override
    {
        stop_cb.reset();

        if (ec_out)
        {
            // Precedence: cancellation wins over any I/O error; a zero-byte
            // read on a non-empty buffer means the peer closed (EOF) — see
            // "EOF Detection" in the file header.
            if (cancelled.load(std::memory_order_acquire))
                *ec_out = capy::error::canceled;
            else if (errn != 0)
                *ec_out = make_err(errn);
            else if (is_read_operation() && bytes_transferred == 0)
                *ec_out = capy::error::eof;
            else
                *ec_out = {};
        }

        if (bytes_out)
            *bytes_out = bytes_transferred;

        // Move to stack before resuming coroutine. The coroutine might close
        // the socket, releasing the last wrapper ref. If impl_ptr were the
        // last ref and we destroyed it while still in operator(), we'd have
        // use-after-free. Moving to local ensures destruction happens at
        // function exit, after all member accesses are complete.
        capy::executor_ref saved_ex( std::move( ex ) );
        std::coroutine_handle<> saved_h( std::move( h ) );
        auto prevent_premature_destruction = std::move(impl_ptr);
        dispatch_coro(saved_ex, saved_h).resume();
    }

    /// Overridden by read ops (unless the user buffer was empty) so that
    /// operator() can apply the EOF rule above.
    virtual bool is_read_operation() const noexcept { return false; }

    /// Perform the I/O-level cancellation; defined by each concrete op.
    virtual void cancel() noexcept = 0;

    /// Destroy without invoking (scheduler shutdown drain).
    void destroy() override
    {
        stop_cb.reset();
        impl_ptr.reset();
    }

    /// Mark the op as cancelled; observed by operator() with acquire.
    void request_cancel() noexcept
    {
        cancelled.store(true, std::memory_order_release);
    }

    /// Arm the op for a socket operation: clear the cancelled flag and
    /// install the stop callback if cancellation is possible.
    void start(std::stop_token token, epoll_socket_impl* impl)
    {
        cancelled.store(false, std::memory_order_release);
        stop_cb.reset();
        socket_impl_ = impl;
        acceptor_impl_ = nullptr;

        if (token.stop_possible())
            stop_cb.emplace(token, canceller{this});
    }

    /// Arm the op for an acceptor operation (mirror of the socket overload).
    void start(std::stop_token token, epoll_acceptor_impl* impl)
    {
        cancelled.store(false, std::memory_order_release);
        stop_cb.reset();
        socket_impl_ = nullptr;
        acceptor_impl_ = impl;

        if (token.stop_possible())
            stop_cb.emplace(token, canceller{this});
    }

    /// Record the raw result; translated to error_code later in operator().
    void complete(int err, std::size_t bytes) noexcept
    {
        errn = err;
        bytes_transferred = bytes;
    }

    /// Perform the actual (non-blocking) syscall; overridden per op type.
    virtual void perform_io() noexcept {}
};
268
269
270 struct epoll_connect_op : epoll_op
271 {
272 endpoint target_endpoint;
273
274 2672 void reset() noexcept
275 {
276 2672 epoll_op::reset();
277 2672 target_endpoint = endpoint{};
278 2672 }
279
280 2672 void perform_io() noexcept override
281 {
282 // connect() completion status is retrieved via SO_ERROR, not return value
283 2672 int err = 0;
284 2672 socklen_t len = sizeof(err);
285
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2672 times.
2672 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
286 err = errno;
287 2672 complete(err, 0);
288 2672 }
289
290 // Defined in sockets.cpp where epoll_socket_impl is complete
291 void operator()() override;
292 void cancel() noexcept override;
293 };
294
295
296 struct epoll_read_op : epoll_op
297 {
298 static constexpr std::size_t max_buffers = 16;
299 iovec iovecs[max_buffers];
300 int iovec_count = 0;
301 bool empty_buffer_read = false;
302
303 83691 bool is_read_operation() const noexcept override
304 {
305 83691 return !empty_buffer_read;
306 }
307
308 83885 void reset() noexcept
309 {
310 83885 epoll_op::reset();
311 83885 iovec_count = 0;
312 83885 empty_buffer_read = false;
313 83885 }
314
315 208 void perform_io() noexcept override
316 {
317 ssize_t n;
318 do {
319 208 n = ::readv(fd, iovecs, iovec_count);
320
3/4
✓ Branch 0 taken 115 times.
✓ Branch 1 taken 93 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 115 times.
208 } while (n < 0 && errno == EINTR);
321
322
2/2
✓ Branch 0 taken 93 times.
✓ Branch 1 taken 115 times.
208 if (n >= 0)
323 93 complete(0, static_cast<std::size_t>(n));
324 else
325 115 complete(errno, 0);
326 208 }
327
328 void cancel() noexcept override;
329 };
330
331
332 struct epoll_write_op : epoll_op
333 {
334 static constexpr std::size_t max_buffers = 16;
335 iovec iovecs[max_buffers];
336 int iovec_count = 0;
337
338 83723 void reset() noexcept
339 {
340 83723 epoll_op::reset();
341 83723 iovec_count = 0;
342 83723 }
343
344 void perform_io() noexcept override
345 {
346 msghdr msg{};
347 msg.msg_iov = iovecs;
348 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
349
350 ssize_t n;
351 do {
352 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
353 } while (n < 0 && errno == EINTR);
354
355 if (n >= 0)
356 complete(0, static_cast<std::size_t>(n));
357 else
358 complete(errno, 0);
359 }
360
361 void cancel() noexcept override;
362 };
363
364
365 struct epoll_accept_op : epoll_op
366 {
367 int accepted_fd = -1;
368 io_object::io_object_impl* peer_impl = nullptr;
369 io_object::io_object_impl** impl_out = nullptr;
370
371 2680 void reset() noexcept
372 {
373 2680 epoll_op::reset();
374 2680 accepted_fd = -1;
375 2680 peer_impl = nullptr;
376 2680 impl_out = nullptr;
377 2680 }
378
379 2669 void perform_io() noexcept override
380 {
381 2669 sockaddr_in addr{};
382 2669 socklen_t addrlen = sizeof(addr);
383 int new_fd;
384 do {
385 2669 new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
386 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
387
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2669 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2669 } while (new_fd < 0 && errno == EINTR);
388
389
1/2
✓ Branch 0 taken 2669 times.
✗ Branch 1 not taken.
2669 if (new_fd >= 0)
390 {
391 2669 accepted_fd = new_fd;
392 2669 complete(0, 0);
393 }
394 else
395 {
396 complete(errno, 0);
397 }
398 2669 }
399
400 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
401 void operator()() override;
402 void cancel() noexcept override;
403 };
404
405 } // namespace boost::corosio::detail
406
407 #endif // BOOST_COROSIO_HAS_EPOLL
408
409 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
410