libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp

81.8% Lines (193/236) 100.0% Functions (18/18) 52.7% Branches (69/131)
libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <utility>
21
22 #include <errno.h>
23 #include <netinet/in.h>
24 #include <sys/epoll.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27
28 namespace boost::corosio::detail {
29
30 void
31 6 epoll_accept_op::
32 cancel() noexcept
33 {
34
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
35 6 acceptor_impl_->cancel_single_op(*this);
36 else
37 request_cancel();
38 6 }
39
40 void
41 2680 epoll_accept_op::
42 operator()()
43 {
44 2680 stop_cb.reset();
45
46
3/4
✓ Branch 0 taken 2680 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2671 times.
✓ Branch 4 taken 9 times.
2680 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
47
48
1/2
✓ Branch 0 taken 2680 times.
✗ Branch 1 not taken.
2680 if (ec_out)
49 {
50
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2671 times.
2680 if (cancelled.load(std::memory_order_acquire))
51 9 *ec_out = capy::error::canceled;
52
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2671 times.
2671 else if (errn != 0)
53 *ec_out = make_err(errn);
54 else
55 2671 *ec_out = {};
56 }
57
58
3/4
✓ Branch 0 taken 2671 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2671 times.
✗ Branch 3 not taken.
2680 if (success && accepted_fd >= 0)
59 {
60
1/2
✓ Branch 0 taken 2671 times.
✗ Branch 1 not taken.
2671 if (acceptor_impl_)
61 {
62 2671 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
63 2671 ->service().socket_service();
64
1/2
✓ Branch 0 taken 2671 times.
✗ Branch 1 not taken.
2671 if (socket_svc)
65 {
66
1/1
✓ Branch 1 taken 2671 times.
2671 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
67 2671 impl.set_socket(accepted_fd);
68
69 // Register accepted socket with epoll (edge-triggered mode)
70 2671 impl.desc_state_.fd = accepted_fd;
71 {
72
1/1
✓ Branch 1 taken 2671 times.
2671 std::lock_guard lock(impl.desc_state_.mutex);
73 2671 impl.desc_state_.read_op = nullptr;
74 2671 impl.desc_state_.write_op = nullptr;
75 2671 impl.desc_state_.connect_op = nullptr;
76 2671 }
77
1/1
✓ Branch 2 taken 2671 times.
2671 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
78
79 2671 sockaddr_in local_addr{};
80 2671 socklen_t local_len = sizeof(local_addr);
81 2671 sockaddr_in remote_addr{};
82 2671 socklen_t remote_len = sizeof(remote_addr);
83
84 2671 endpoint local_ep, remote_ep;
85
1/2
✓ Branch 1 taken 2671 times.
✗ Branch 2 not taken.
2671 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 2671 local_ep = from_sockaddr_in(local_addr);
87
1/2
✓ Branch 1 taken 2671 times.
✗ Branch 2 not taken.
2671 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
88 2671 remote_ep = from_sockaddr_in(remote_addr);
89
90 2671 impl.set_endpoints(local_ep, remote_ep);
91
92
1/2
✓ Branch 0 taken 2671 times.
✗ Branch 1 not taken.
2671 if (impl_out)
93 2671 *impl_out = &impl;
94
95 2671 accepted_fd = -1;
96 }
97 else
98 {
99 if (ec_out && !*ec_out)
100 *ec_out = make_err(ENOENT);
101 ::close(accepted_fd);
102 accepted_fd = -1;
103 if (impl_out)
104 *impl_out = nullptr;
105 }
106 }
107 else
108 {
109 ::close(accepted_fd);
110 accepted_fd = -1;
111 if (impl_out)
112 *impl_out = nullptr;
113 }
114 2671 }
115 else
116 {
117
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
118 {
119 ::close(accepted_fd);
120 accepted_fd = -1;
121 }
122
123
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (peer_impl)
124 {
125 peer_impl->release();
126 peer_impl = nullptr;
127 }
128
129
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
130 9 *impl_out = nullptr;
131 }
132
133 // Move to stack before resuming. See epoll_op::operator()() for rationale.
134 2680 capy::executor_ref saved_ex( std::move( ex ) );
135 2680 std::coroutine_handle<> saved_h( std::move( h ) );
136 2680 auto prevent_premature_destruction = std::move(impl_ptr);
137
2/2
✓ Branch 1 taken 2680 times.
✓ Branch 4 taken 2680 times.
2680 dispatch_coro(saved_ex, saved_h).resume();
138 2680 }
139
140 64 epoll_acceptor_impl::
141 64 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
142 64 : svc_(svc)
143 {
144 64 }
145
146 void
147 64 epoll_acceptor_impl::
148 release()
149 {
150 64 close_socket();
151 64 svc_.destroy_acceptor_impl(*this);
152 64 }
153
154 std::coroutine_handle<>
155 2680 epoll_acceptor_impl::
156 accept(
157 std::coroutine_handle<> h,
158 capy::executor_ref ex,
159 std::stop_token token,
160 std::error_code* ec,
161 io_object::io_object_impl** impl_out)
162 {
163 2680 auto& op = acc_;
164 2680 op.reset();
165 2680 op.h = h;
166 2680 op.ex = ex;
167 2680 op.ec_out = ec;
168 2680 op.impl_out = impl_out;
169 2680 op.fd = fd_;
170 2680 op.start(token, this);
171
172 2680 sockaddr_in addr{};
173 2680 socklen_t addrlen = sizeof(addr);
174 int accepted;
175 do {
176
1/1
✓ Branch 1 taken 2680 times.
2680 accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
177 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
178
3/4
✓ Branch 0 taken 2678 times.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2678 times.
2680 } while (accepted < 0 && errno == EINTR);
179
180
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2678 times.
2680 if (accepted >= 0)
181 {
182 {
183
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(desc_state_.mutex);
184 2 desc_state_.read_ready = false;
185 2 }
186 2 op.accepted_fd = accepted;
187 2 op.complete(0, 0);
188
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
189
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
190 // completion is always posted to scheduler queue, never inline.
191 2 return std::noop_coroutine();
192 }
193
194
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2678 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2678 if (errno == EAGAIN || errno == EWOULDBLOCK)
195 {
196 2678 svc_.work_started();
197
1/1
✓ Branch 1 taken 2678 times.
2678 op.impl_ptr = shared_from_this();
198
199
1/1
✓ Branch 1 taken 2678 times.
2678 std::lock_guard lock(desc_state_.mutex);
200
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2678 times.
2678 if (desc_state_.read_ready)
201 {
202 desc_state_.read_ready = false;
203 op.perform_io();
204 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
205 {
206 op.errn = 0;
207 if (op.cancelled.load(std::memory_order_acquire))
208 {
209 svc_.post(&op);
210 svc_.work_finished();
211 }
212 else
213 {
214 desc_state_.read_op = &op;
215 }
216 }
217 else
218 {
219 svc_.post(&op);
220 svc_.work_finished();
221 }
222 }
223 else
224 {
225
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2678 times.
2678 if (op.cancelled.load(std::memory_order_acquire))
226 {
227 svc_.post(&op);
228 svc_.work_finished();
229 }
230 else
231 {
232 2678 desc_state_.read_op = &op;
233 }
234 }
235 2678 return std::noop_coroutine();
236 2678 }
237
238 op.complete(errno, 0);
239 op.impl_ptr = shared_from_this();
240 svc_.post(&op);
241 // completion is always posted to scheduler queue, never inline.
242 return std::noop_coroutine();
243 }
244
245 void
246 129 epoll_acceptor_impl::
247 cancel() noexcept
248 {
249 129 std::shared_ptr<epoll_acceptor_impl> self;
250 try {
251
1/1
✓ Branch 1 taken 129 times.
129 self = shared_from_this();
252 } catch (const std::bad_weak_ptr&) {
253 return;
254 }
255
256 129 acc_.request_cancel();
257
258 129 epoll_op* claimed = nullptr;
259 {
260 129 std::lock_guard lock(desc_state_.mutex);
261
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 126 times.
129 if (desc_state_.read_op == &acc_)
262 3 claimed = std::exchange(desc_state_.read_op, nullptr);
263 129 }
264
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 126 times.
129 if (claimed)
265 {
266 3 acc_.impl_ptr = self;
267 3 svc_.post(&acc_);
268 3 svc_.work_finished();
269 }
270 129 }
271
272 void
273 6 epoll_acceptor_impl::
274 cancel_single_op(epoll_op& op) noexcept
275 {
276 6 op.request_cancel();
277
278 6 epoll_op* claimed = nullptr;
279 {
280 6 std::lock_guard lock(desc_state_.mutex);
281
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (desc_state_.read_op == &op)
282 6 claimed = std::exchange(desc_state_.read_op, nullptr);
283 6 }
284
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (claimed)
285 {
286 try {
287
1/1
✓ Branch 1 taken 6 times.
6 op.impl_ptr = shared_from_this();
288 } catch (const std::bad_weak_ptr&) {}
289 6 svc_.post(&op);
290 6 svc_.work_finished();
291 }
292 6 }
293
294 void
295 128 epoll_acceptor_impl::
296 close_socket() noexcept
297 {
298 128 cancel();
299
300
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 128 times.
128 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
301 {
302 try {
303 desc_state_.impl_ref_ = shared_from_this();
304 } catch (std::bad_weak_ptr const&) {}
305 }
306
307
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 66 times.
128 if (fd_ >= 0)
308 {
309
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if (desc_state_.registered_events != 0)
310 62 svc_.scheduler().deregister_descriptor(fd_);
311 62 ::close(fd_);
312 62 fd_ = -1;
313 }
314
315 128 desc_state_.fd = -1;
316 {
317 128 std::lock_guard lock(desc_state_.mutex);
318 128 desc_state_.read_op = nullptr;
319 128 desc_state_.read_ready = false;
320 128 desc_state_.write_ready = false;
321 128 }
322 128 desc_state_.registered_events = 0;
323
324 // Clear cached endpoint
325 128 local_endpoint_ = endpoint{};
326 128 }
327
328 189 epoll_acceptor_service::
329 189 epoll_acceptor_service(capy::execution_context& ctx)
330 189 : ctx_(ctx)
331
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
332 {
333 189 }
334
335 378 epoll_acceptor_service::
336 189 ~epoll_acceptor_service()
337 {
338 378 }
339
340 void
341 189 epoll_acceptor_service::
342 shutdown()
343 {
344
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
345
346
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->acceptor_list_.pop_front())
347 impl->close_socket();
348
349 // Don't clear acceptor_ptrs_ here — same rationale as
350 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
351 // after scheduler shutdown has drained all queued ops.
352 189 }
353
354 tcp_acceptor::acceptor_impl&
355 64 epoll_acceptor_service::
356 create_acceptor_impl()
357 {
358
1/1
✓ Branch 1 taken 64 times.
64 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
359 64 auto* raw = impl.get();
360
361
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
362 64 state_->acceptor_list_.push_back(raw);
363
1/1
✓ Branch 3 taken 64 times.
64 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
364
365 64 return *raw;
366 64 }
367
368 void
369 64 epoll_acceptor_service::
370 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
371 {
372 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
373
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
374 64 state_->acceptor_list_.remove(epoll_impl);
375
1/1
✓ Branch 2 taken 64 times.
64 state_->acceptor_ptrs_.erase(epoll_impl);
376 64 }
377
378 std::error_code
379 64 epoll_acceptor_service::
380 open_acceptor(
381 tcp_acceptor::acceptor_impl& impl,
382 endpoint ep,
383 int backlog)
384 {
385 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
386 64 epoll_impl->close_socket();
387
388 64 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
389
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64 if (fd < 0)
390 return make_err(errno);
391
392 64 int reuse = 1;
393 64 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
394
395 64 sockaddr_in addr = detail::to_sockaddr_in(ep);
396
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 62 times.
64 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
397 {
398 2 int errn = errno;
399
1/1
✓ Branch 1 taken 2 times.
2 ::close(fd);
400 2 return make_err(errn);
401 }
402
403
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (::listen(fd, backlog) < 0)
404 {
405 int errn = errno;
406 ::close(fd);
407 return make_err(errn);
408 }
409
410 62 epoll_impl->fd_ = fd;
411
412 // Register fd with epoll (edge-triggered mode)
413 62 epoll_impl->desc_state_.fd = fd;
414 {
415
1/1
✓ Branch 1 taken 62 times.
62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
416 62 epoll_impl->desc_state_.read_op = nullptr;
417 62 }
418
1/1
✓ Branch 2 taken 62 times.
62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
419
420 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
421 62 sockaddr_in local_addr{};
422 62 socklen_t local_len = sizeof(local_addr);
423
1/2
✓ Branch 1 taken 62 times.
✗ Branch 2 not taken.
62 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
424 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
425
426 62 return {};
427 }
428
429 void
430 11 epoll_acceptor_service::
431 post(epoll_op* op)
432 {
433 11 state_->sched_.post(op);
434 11 }
435
436 void
437 2678 epoll_acceptor_service::
438 work_started() noexcept
439 {
440 2678 state_->sched_.work_started();
441 2678 }
442
443 void
444 9 epoll_acceptor_service::
445 work_finished() noexcept
446 {
447 9 state_->sched_.work_finished();
448 9 }
449
450 epoll_socket_service*
451 2671 epoll_acceptor_service::
452 socket_service() const noexcept
453 {
454 2671 auto* svc = ctx_.find_service<detail::socket_service>();
455
2/4
✓ Branch 0 taken 2671 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2671 times.
✗ Branch 3 not taken.
2671 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
456 }
457
458 } // namespace boost::corosio::detail
459
460 #endif // BOOST_COROSIO_HAS_EPOLL
461