libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

Lines: 80.5% (391/486)   Functions: 89.1% (41/46)   Branches: 68.7% (204/297)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <chrono>
24 #include <limits>
25 #include <utility>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler coordinates its threads to provide handler parallelism
40 while avoiding the thundering-herd problem.
41 Instead of all threads blocking on epoll_wait(), one thread becomes the
42 "reactor" while others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on cond_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The task_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
62
63 Signaling State (state_)
64 ------------------------
65 The state_ variable encodes two pieces of information:
66 - Bit 0: signaled flag (1 = signaled, persists until cleared)
67 - Upper bits: waiter count (each waiter adds 2 before blocking)
68
69 This allows efficient coordination:
70 - Signalers only call notify when waiters exist (state_ > 1)
71 - Waiters check if already signaled before blocking (fast-path)
72
73 Wake Coordination (wake_one_thread_and_unlock)
74 ----------------------------------------------
75 When posting work:
76 - If waiters exist (state_ > 1): signal and notify_one()
77 - Else if reactor running: interrupt via eventfd write
78 - Else: no-op (thread will find work when it checks queue)
79
80 This avoids waking threads unnecessarily. With cascading wakes, each
81 handler execution wakes at most one more thread if more work exists in
82 the queue. (A standalone sketch of this protocol follows this comment.)
83
84 Work Counting
85 -------------
86 outstanding_work_ tracks pending operations. When it hits zero, run()
87 returns. Each operation increments on start, decrements on completion.
88
89 Timer Integration
90 -----------------
91 Timers are handled by timer_service. A timerfd registered with epoll is
92 programmed for the nearest expiry (update_timerfd). When a new timer
93 becomes the earliest, the on-earliest-changed callback marks the timerfd
94 stale and interrupts the reactor so it re-programs the timerfd.
95 */
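
The wait/signal protocol described under "Signaling State" and "Wake
Coordination", extracted into a standalone sketch for clarity. This is
illustrative only and not part of the scheduler: the names are hypothetical,
and the lock passed in is assumed to guard state, just as mutex_ guards
state_ in the code below.

#include <condition_variable>
#include <cstddef>
#include <mutex>

struct signal_state
{
    std::condition_variable cv;
    std::size_t state = 0;    // bit 0: signaled; upper bits: 2 per blocked waiter

    // Waiter side: return immediately if already signaled, otherwise
    // advertise ourselves in the count before blocking.
    void wait(std::unique_lock<std::mutex>& lock)
    {
        while ((state & 1) == 0)
        {
            state += 2;
            cv.wait(lock);
            state -= 2;
        }
    }

    // Signaler side: set the signal bit, but only pay for notify_one()
    // when at least one waiter is actually blocked (state > 1).
    bool signal_one(std::unique_lock<std::mutex>& lock)
    {
        state |= 1;
        if (state > 1)
        {
            lock.unlock();    // unlock first so the woken thread is not blocked on the mutex
            cv.notify_one();
            return true;
        }
        return false;
    }
};
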
96
97 namespace boost::corosio::detail {
98
99 struct scheduler_context
100 {
101 epoll_scheduler const* key;
102 scheduler_context* next;
103 op_queue private_queue;
104 long private_outstanding_work;
105
106 160 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
107 160 : key(k)
108 160 , next(n)
109 160 , private_outstanding_work(0)
110 {
111 160 }
112 };
113
114 namespace {
115
116 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
117
118 struct thread_context_guard
119 {
120 scheduler_context frame_;
121
122 160 explicit thread_context_guard(
123 epoll_scheduler const* ctx) noexcept
124 160 : frame_(ctx, context_stack.get())
125 {
126 160 context_stack.set(&frame_);
127 160 }
128
129 160 ~thread_context_guard() noexcept
130 {
131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 160 times.
160 if (!frame_.private_queue.empty())
132 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
133 160 context_stack.set(frame_.next);
134 160 }
135 };
136
137 scheduler_context*
138 258773 find_context(epoll_scheduler const* self) noexcept
139 {
140
2/2
✓ Branch 1 taken 257124 times.
✓ Branch 2 taken 1649 times.
258773 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
141
1/2
✓ Branch 0 taken 257124 times.
✗ Branch 1 not taken.
257124 if (c->key == self)
142 257124 return c;
143 1649 return nullptr;
144 }
145
146 } // namespace
147
148 void
149 89143 descriptor_state::
150 operator()()
151 {
152 89143 is_enqueued_.store(false, std::memory_order_relaxed);
153
154 // Take ownership of impl ref set by close_socket() to prevent
155 // the owning impl from being freed while we're executing
156 89143 auto prevent_impl_destruction = std::move(impl_ref_);
157
158 89143 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
159
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 89143 times.
89143 if (ev == 0)
160 {
161 scheduler_->compensating_work_started();
162 return;
163 }
164
165 89143 op_queue local_ops;
166
167 89143 int err = 0;
168
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 89142 times.
89143 if (ev & EPOLLERR)
169 {
170 1 socklen_t len = sizeof(err);
171
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
172 err = errno;
173
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
174 1 err = EIO;
175 }
176
177 {
178
1/1
✓ Branch 1 taken 89143 times.
89143 std::lock_guard lock(mutex);
179
2/2
✓ Branch 0 taken 34599 times.
✓ Branch 1 taken 54544 times.
89143 if (ev & EPOLLIN)
180 {
181
2/2
✓ Branch 0 taken 2762 times.
✓ Branch 1 taken 31837 times.
34599 if (read_op)
182 {
183 2762 auto* rd = read_op;
184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2762 times.
2762 if (err)
185 rd->complete(err, 0);
186 else
187 2762 rd->perform_io();
188
189
2/4
✓ Branch 0 taken 2762 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2762 times.
2762 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
190 {
191 rd->errn = 0;
192 }
193 else
194 {
195 2762 read_op = nullptr;
196 2762 local_ops.push(rd);
197 }
198 }
199 else
200 {
201 31837 read_ready = true;
202 }
203 }
204
2/2
✓ Branch 0 taken 86474 times.
✓ Branch 1 taken 2669 times.
89143 if (ev & EPOLLOUT)
205 {
206
3/4
✓ Branch 0 taken 83802 times.
✓ Branch 1 taken 2672 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 83802 times.
86474 bool had_write_op = (connect_op || write_op);
207
2/2
✓ Branch 0 taken 2672 times.
✓ Branch 1 taken 83802 times.
86474 if (connect_op)
208 {
209 2672 auto* cn = connect_op;
210
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2672 times.
2672 if (err)
211 cn->complete(err, 0);
212 else
213 2672 cn->perform_io();
214 2672 connect_op = nullptr;
215 2672 local_ops.push(cn);
216 }
217
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 86474 times.
86474 if (write_op)
218 {
219 auto* wr = write_op;
220 if (err)
221 wr->complete(err, 0);
222 else
223 wr->perform_io();
224
225 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
226 {
227 wr->errn = 0;
228 }
229 else
230 {
231 write_op = nullptr;
232 local_ops.push(wr);
233 }
234 }
235
2/2
✓ Branch 0 taken 83802 times.
✓ Branch 1 taken 2672 times.
86474 if (!had_write_op)
236 83802 write_ready = true;
237 }
238
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 89142 times.
89143 if (err)
239 {
240
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (read_op)
241 {
242 read_op->complete(err, 0);
243 local_ops.push(std::exchange(read_op, nullptr));
244 }
245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (write_op)
246 {
247 write_op->complete(err, 0);
248 local_ops.push(std::exchange(write_op, nullptr));
249 }
250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (connect_op)
251 {
252 connect_op->complete(err, 0);
253 local_ops.push(std::exchange(connect_op, nullptr));
254 }
255 }
256 89143 }
257
258 // Execute first handler inline — the scheduler's work_cleanup
259 // accounts for this as the "consumed" work item
260 89143 scheduler_op* first = local_ops.pop();
261
2/2
✓ Branch 0 taken 5434 times.
✓ Branch 1 taken 83709 times.
89143 if (first)
262 {
263
1/1
✓ Branch 1 taken 5434 times.
5434 scheduler_->post_deferred_completions(local_ops);
264
1/1
✓ Branch 1 taken 5434 times.
5434 (*first)();
265 }
266 else
267 {
268 83709 scheduler_->compensating_work_started();
269 }
270 89143 }
271
272 189 epoll_scheduler::
273 epoll_scheduler(
274 capy::execution_context& ctx,
275 189 int)
276 189 : epoll_fd_(-1)
277 189 , event_fd_(-1)
278 189 , timer_fd_(-1)
279 189 , outstanding_work_(0)
280 189 , stopped_(false)
281 189 , shutdown_(false)
282 189 , task_running_{false}
283 189 , task_interrupted_(false)
284 378 , state_(0)
285 {
286 189 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
287
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (epoll_fd_ < 0)
288 detail::throw_system_error(make_err(errno), "epoll_create1");
289
290 189 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
291
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (event_fd_ < 0)
292 {
293 int errn = errno;
294 ::close(epoll_fd_);
295 detail::throw_system_error(make_err(errn), "eventfd");
296 }
297
298 189 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (timer_fd_ < 0)
300 {
301 int errn = errno;
302 ::close(event_fd_);
303 ::close(epoll_fd_);
304 detail::throw_system_error(make_err(errn), "timerfd_create");
305 }
306
307 189 epoll_event ev{};
308 189 ev.events = EPOLLIN | EPOLLET;
309 189 ev.data.ptr = nullptr;
310
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
311 {
312 int errn = errno;
313 ::close(timer_fd_);
314 ::close(event_fd_);
315 ::close(epoll_fd_);
316 detail::throw_system_error(make_err(errn), "epoll_ctl");
317 }
318
319 189 epoll_event timer_ev{};
320 189 timer_ev.events = EPOLLIN | EPOLLERR;
321 189 timer_ev.data.ptr = &timer_fd_;
322
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
323 {
324 int errn = errno;
325 ::close(timer_fd_);
326 ::close(event_fd_);
327 ::close(epoll_fd_);
328 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
329 }
330
331
1/1
✓ Branch 1 taken 189 times.
189 timer_svc_ = &get_timer_service(ctx, *this);
332
1/1
✓ Branch 3 taken 189 times.
189 timer_svc_->set_on_earliest_changed(
333 timer_service::callback(
334 this,
335 [](void* p) {
336 2864 auto* self = static_cast<epoll_scheduler*>(p);
337 2864 self->timerfd_stale_.store(true, std::memory_order_release);
338
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2864 times.
2864 if (self->task_running_.load(std::memory_order_acquire))
339 self->interrupt_reactor();
340 2864 }));
341
342 // Initialize resolver service
343
1/1
✓ Branch 1 taken 189 times.
189 get_resolver_service(ctx, *this);
344
345 // Initialize signal service
346
1/1
✓ Branch 1 taken 189 times.
189 get_signal_service(ctx, *this);
347
348 // Push task sentinel to interleave reactor runs with handler execution
349 189 completed_ops_.push(&task_op_);
350 189 }
351
352 378 epoll_scheduler::
353 189 ~epoll_scheduler()
354 {
355
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
356 189 ::close(timer_fd_);
357
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
358 189 ::close(event_fd_);
359
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
360 189 ::close(epoll_fd_);
361 378 }
362
363 void
364 189 epoll_scheduler::
365 shutdown()
366 {
367 {
368
1/1
✓ Branch 1 taken 189 times.
189 std::unique_lock lock(mutex_);
369 189 shutdown_ = true;
370
371
2/2
✓ Branch 1 taken 189 times.
✓ Branch 2 taken 189 times.
378 while (auto* h = completed_ops_.pop())
372 {
373
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (h == &task_op_)
374 189 continue;
375 lock.unlock();
376 h->destroy();
377 lock.lock();
378 189 }
379
380 189 signal_all(lock);
381 189 }
382
383 189 outstanding_work_.store(0, std::memory_order_release);
384
385
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
386 189 interrupt_reactor();
387 189 }
388
389 void
390 4632 epoll_scheduler::
391 post(std::coroutine_handle<> h) const
392 {
393 struct post_handler final
394 : scheduler_op
395 {
396 std::coroutine_handle<> h_;
397
398 explicit
399 4632 post_handler(std::coroutine_handle<> h)
400 4632 : h_(h)
401 {
402 4632 }
403
404 9264 ~post_handler() = default;
405
406 4632 void operator()() override
407 {
408 4632 auto h = h_;
409
1/2
✓ Branch 0 taken 4632 times.
✗ Branch 1 not taken.
4632 delete this;
410
1/1
✓ Branch 1 taken 4632 times.
4632 h.resume();
411 4632 }
412
413 void destroy() override
414 {
415 delete this;
416 }
417 };
418
419
1/1
✓ Branch 1 taken 4632 times.
4632 auto ph = std::make_unique<post_handler>(h);
420
421 // Fast path: same thread posts to private queue
422 // Only count locally; work_cleanup batches to global counter
423
2/2
✓ Branch 1 taken 3009 times.
✓ Branch 2 taken 1623 times.
4632 if (auto* ctx = find_context(this))
424 {
425 3009 ++ctx->private_outstanding_work;
426 3009 ctx->private_queue.push(ph.release());
427 3009 return;
428 }
429
430 // Slow path: cross-thread post requires mutex
431 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
432
433
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
434 1623 completed_ops_.push(ph.release());
435
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
436 4632 }
437
438 void
439 170432 epoll_scheduler::
440 post(scheduler_op* h) const
441 {
442 // Fast path: same thread posts to private queue
443 // Only count locally; work_cleanup batches to global counter
444
2/2
✓ Branch 1 taken 170406 times.
✓ Branch 2 taken 26 times.
170432 if (auto* ctx = find_context(this))
445 {
446 170406 ++ctx->private_outstanding_work;
447 170406 ctx->private_queue.push(h);
448 170406 return;
449 }
450
451 // Slow path: cross-thread post requires mutex
452 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
453
454
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
455 26 completed_ops_.push(h);
456
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
457 26 }
458
459 void
460 3483 epoll_scheduler::
461 on_work_started() noexcept
462 {
463 3483 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
464 3483 }
465
466 void
467 3451 epoll_scheduler::
468 on_work_finished() noexcept
469 {
470
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3451 times.
6902 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
471 stop();
472 3451 }
473
474 bool
475 600 epoll_scheduler::
476 running_in_this_thread() const noexcept
477 {
478
2/2
✓ Branch 1 taken 390 times.
✓ Branch 2 taken 210 times.
600 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
479
1/2
✓ Branch 0 taken 390 times.
✗ Branch 1 not taken.
390 if (c->key == this)
480 390 return true;
481 210 return false;
482 }
483
484 void
485 38 epoll_scheduler::
486 stop()
487 {
488
1/1
✓ Branch 1 taken 38 times.
38 std::unique_lock lock(mutex_);
489
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 18 times.
38 if (!stopped_)
490 {
491 20 stopped_ = true;
492 20 signal_all(lock);
493
1/1
✓ Branch 1 taken 20 times.
20 interrupt_reactor();
494 }
495 38 }
496
497 bool
498 16 epoll_scheduler::
499 stopped() const noexcept
500 {
501 16 std::unique_lock lock(mutex_);
502 32 return stopped_;
503 16 }
504
505 void
506 49 epoll_scheduler::
507 restart()
508 {
509
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
510 49 stopped_ = false;
511 49 }
512
513 std::size_t
514 175 epoll_scheduler::
515 run()
516 {
517
2/2
✓ Branch 1 taken 29 times.
✓ Branch 2 taken 146 times.
350 if (outstanding_work_.load(std::memory_order_acquire) == 0)
518 {
519
1/1
✓ Branch 1 taken 29 times.
29 stop();
520 29 return 0;
521 }
522
523 146 thread_context_guard ctx(this);
524
1/1
✓ Branch 1 taken 146 times.
146 std::unique_lock lock(mutex_);
525
526 146 std::size_t n = 0;
527 for (;;)
528 {
529
3/3
✓ Branch 1 taken 264338 times.
✓ Branch 3 taken 146 times.
✓ Branch 4 taken 264192 times.
264338 if (!do_one(lock, -1, &ctx.frame_))
530 146 break;
531
1/2
✓ Branch 1 taken 264192 times.
✗ Branch 2 not taken.
264192 if (n != (std::numeric_limits<std::size_t>::max)())
532 264192 ++n;
533
2/2
✓ Branch 1 taken 93815 times.
✓ Branch 2 taken 170377 times.
264192 if (!lock.owns_lock())
534
1/1
✓ Branch 1 taken 93815 times.
93815 lock.lock();
535 }
536 146 return n;
537 146 }
538
539 std::size_t
540 2 epoll_scheduler::
541 run_one()
542 {
543
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
544 {
545 stop();
546 return 0;
547 }
548
549 2 thread_context_guard ctx(this);
550
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
551
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
552 2 }
553
554 std::size_t
555 14 epoll_scheduler::
556 wait_one(long usec)
557 {
558
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 9 times.
28 if (outstanding_work_.load(std::memory_order_acquire) == 0)
559 {
560
1/1
✓ Branch 1 taken 5 times.
5 stop();
561 5 return 0;
562 }
563
564 9 thread_context_guard ctx(this);
565
1/1
✓ Branch 1 taken 9 times.
9 std::unique_lock lock(mutex_);
566
1/1
✓ Branch 1 taken 9 times.
9 return do_one(lock, usec, &ctx.frame_);
567 9 }
568
569 std::size_t
570 2 epoll_scheduler::
571 poll()
572 {
573
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
574 {
575
1/1
✓ Branch 1 taken 1 time.
1 stop();
576 1 return 0;
577 }
578
579 1 thread_context_guard ctx(this);
580
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
581
582 1 std::size_t n = 0;
583 for (;;)
584 {
585
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
586 1 break;
587
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
588 2 ++n;
589
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
590
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
591 }
592 1 return n;
593 1 }
594
595 std::size_t
596 4 epoll_scheduler::
597 poll_one()
598 {
599
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
600 {
601
1/1
✓ Branch 1 taken 2 times.
2 stop();
602 2 return 0;
603 }
604
605 2 thread_context_guard ctx(this);
606
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
607
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
608 2 }
609
610 void
611 5416 epoll_scheduler::
612 register_descriptor(int fd, descriptor_state* desc) const
613 {
614 5416 epoll_event ev{};
615 5416 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
616 5416 ev.data.ptr = desc;
617
618
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5416 times.
5416 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
619 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
620
621 5416 desc->registered_events = ev.events;
622 5416 desc->fd = fd;
623 5416 desc->scheduler_ = this;
624
625
1/1
✓ Branch 1 taken 5416 times.
5416 std::lock_guard lock(desc->mutex);
626 5416 desc->read_ready = false;
627 5416 desc->write_ready = false;
628 5416 }
629
630 void
631 5416 epoll_scheduler::
632 deregister_descriptor(int fd) const
633 {
634 5416 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
635 5416 }
636
637 void
638 5601 epoll_scheduler::
639 work_started() const noexcept
640 {
641 5601 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
642 5601 }
643
644 void
645 10317 epoll_scheduler::
646 work_finished() const noexcept
647 {
648
2/2
✓ Branch 0 taken 148 times.
✓ Branch 1 taken 10169 times.
20634 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
649 {
650 // Last work item completed - wake all threads so they can exit.
651 // signal_all() wakes threads waiting on the condvar.
652 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
653 // Both are needed because they target different blocking mechanisms.
654 148 std::unique_lock lock(mutex_);
655 148 signal_all(lock);
656
5/6
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 146 times.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 146 times.
148 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
657 {
658 2 task_interrupted_ = true;
659 2 lock.unlock();
660 2 interrupt_reactor();
661 }
662 148 }
663 10317 }
664
665 void
666 83709 epoll_scheduler::
667 compensating_work_started() const noexcept
668 {
669 83709 auto* ctx = find_context(this);
670
1/2
✓ Branch 0 taken 83709 times.
✗ Branch 1 not taken.
83709 if (ctx)
671 83709 ++ctx->private_outstanding_work;
672 83709 }
673
674 void
675 epoll_scheduler::
676 drain_thread_queue(op_queue& queue, long count) const
677 {
678 // Note: outstanding_work_ was already incremented when posting
679 std::unique_lock lock(mutex_);
680 completed_ops_.splice(queue);
681 if (count > 0)
682 maybe_unlock_and_signal_one(lock);
683 }
684
685 void
686 5434 epoll_scheduler::
687 post_deferred_completions(op_queue& ops) const
688 {
689
1/2
✓ Branch 1 taken 5434 times.
✗ Branch 2 not taken.
5434 if (ops.empty())
690 5434 return;
691
692 // Fast path: if on scheduler thread, use private queue
693 if (auto* ctx = find_context(this))
694 {
695 ctx->private_queue.splice(ops);
696 return;
697 }
698
699 // Slow path: add to global queue and wake a thread
700 std::unique_lock lock(mutex_);
701 completed_ops_.splice(ops);
702 wake_one_thread_and_unlock(lock);
703 }
704
705 void
706 237 epoll_scheduler::
707 interrupt_reactor() const
708 {
709 // Only write if not already armed to avoid redundant writes
710 237 bool expected = false;
711
2/2
✓ Branch 1 taken 223 times.
✓ Branch 2 taken 14 times.
237 if (eventfd_armed_.compare_exchange_strong(expected, true,
712 std::memory_order_release, std::memory_order_relaxed))
713 {
714 223 std::uint64_t val = 1;
715
1/1
✓ Branch 1 taken 223 times.
223 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
716 }
717 237 }
718
719 void
720 357 epoll_scheduler::
721 signal_all(std::unique_lock<std::mutex>&) const
722 {
723 357 state_ |= 1;
724 357 cond_.notify_all();
725 357 }
726
727 bool
728 1649 epoll_scheduler::
729 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
730 {
731 1649 state_ |= 1;
732
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1649 times.
1649 if (state_ > 1)
733 {
734 lock.unlock();
735 cond_.notify_one();
736 return true;
737 }
738 1649 return false;
739 }
740
741 void
742 344077 epoll_scheduler::
743 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
744 {
745 344077 state_ |= 1;
746 344077 bool have_waiters = state_ > 1;
747 344077 lock.unlock();
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 344077 times.
344077 if (have_waiters)
749 cond_.notify_one();
750 344077 }
751
752 void
753 epoll_scheduler::
754 clear_signal() const
755 {
756 state_ &= ~std::size_t(1);
757 }
758
759 void
760 epoll_scheduler::
761 wait_for_signal(std::unique_lock<std::mutex>& lock) const
762 {
763 while ((state_ & 1) == 0)
764 {
765 state_ += 2;
766 cond_.wait(lock);
767 state_ -= 2;
768 }
769 }
770
771 void
772 epoll_scheduler::
773 wait_for_signal_for(
774 std::unique_lock<std::mutex>& lock,
775 long timeout_us) const
776 {
777 if ((state_ & 1) == 0)
778 {
779 state_ += 2;
780 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
781 state_ -= 2;
782 }
783 }
784
785 void
786 1649 epoll_scheduler::
787 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
788 {
789
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1649 times.
1649 if (maybe_unlock_and_signal_one(lock))
790 return;
791
792
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1623 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1623 times.
1649 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
793 {
794 26 task_interrupted_ = true;
795 26 lock.unlock();
796 26 interrupt_reactor();
797 }
798 else
799 {
800 1623 lock.unlock();
801 }
802 }
803
804 /** RAII guard for handler execution work accounting.
805
806 Handler consumes 1 work item, may produce N new items via fast-path posts.
807 Net change = N - 1:
808 - If N > 1: add (N-1) to global (more work produced than consumed)
809 - If N == 1: net zero, do nothing
810 - If N < 1: call work_finished() (work consumed, may trigger stop)
811
812 Also drains private queue to global for other threads to process.
813 */
814 struct work_cleanup
815 {
816 epoll_scheduler const* scheduler;
817 std::unique_lock<std::mutex>* lock;
818 scheduler_context* ctx;
819
820 264207 ~work_cleanup()
821 {
822
1/2
✓ Branch 0 taken 264207 times.
✗ Branch 1 not taken.
264207 if (ctx)
823 {
824 264207 long produced = ctx->private_outstanding_work;
825
2/2
✓ Branch 0 taken 169 times.
✓ Branch 1 taken 264038 times.
264207 if (produced > 1)
826 169 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
827
2/2
✓ Branch 0 taken 10118 times.
✓ Branch 1 taken 253920 times.
264038 else if (produced < 1)
828 10118 scheduler->work_finished();
829 // produced == 1: net zero, handler consumed what it produced
830 264207 ctx->private_outstanding_work = 0;
831
832
2/2
✓ Branch 1 taken 170380 times.
✓ Branch 2 taken 93827 times.
264207 if (!ctx->private_queue.empty())
833 {
834 170380 lock->lock();
835 170380 scheduler->completed_ops_.splice(ctx->private_queue);
836 }
837 }
838 else
839 {
840 // No thread context - slow-path op was already counted globally
841 scheduler->work_finished();
842 }
843 264207 }
844 };
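
A quick worked example of the accounting above (illustrative only, using the
produced count held in private_outstanding_work):

    produced = 3  ->  outstanding_work_ += 2    (three items posted, one consumed)
    produced = 1  ->  no change                 (the post replaces the consumed item)
    produced = 0  ->  work_finished()           (net -1; may stop run() when the count reaches zero)

In each case any remaining private-queue entries are then spliced onto
completed_ops_ under the lock so other threads can pick them up.
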
845
846 /** RAII guard for reactor work accounting.
847
848 Reactor only produces work via timer/signal callbacks posting handlers.
849 Unlike handler execution which consumes 1, the reactor consumes nothing.
850 All produced work must be flushed to global counter.
851 */
852 struct task_cleanup
853 {
854 epoll_scheduler const* scheduler;
855 std::unique_lock<std::mutex>* lock;
856 scheduler_context* ctx;
857
858 85429 ~task_cleanup()
859 85429 {
860
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 85429 times.
85429 if (!ctx)
861 return;
862
863
2/2
✓ Branch 0 taken 2865 times.
✓ Branch 1 taken 82564 times.
85429 if (ctx->private_outstanding_work > 0)
864 {
865 2865 scheduler->outstanding_work_.fetch_add(
866 2865 ctx->private_outstanding_work, std::memory_order_relaxed);
867 2865 ctx->private_outstanding_work = 0;
868 }
869
870
2/2
✓ Branch 1 taken 2865 times.
✓ Branch 2 taken 82564 times.
85429 if (!ctx->private_queue.empty())
871 {
872
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2865 times.
2865 if (!lock->owns_lock())
873 lock->lock();
874 2865 scheduler->completed_ops_.splice(ctx->private_queue);
875 }
876 85429 }
877 };
878
879 void
880 5725 epoll_scheduler::
881 update_timerfd() const
882 {
883 5725 auto nearest = timer_svc_->nearest_expiry();
884
885 5725 itimerspec ts{};
886 5725 int flags = 0;
887
888
3/3
✓ Branch 2 taken 5725 times.
✓ Branch 4 taken 5684 times.
✓ Branch 5 taken 41 times.
5725 if (nearest == timer_service::time_point::max())
889 {
890 // No timers - disarm by setting to 0 (relative)
891 }
892 else
893 {
894 5684 auto now = std::chrono::steady_clock::now();
895
3/3
✓ Branch 1 taken 5684 times.
✓ Branch 4 taken 12 times.
✓ Branch 5 taken 5672 times.
5684 if (nearest <= now)
896 {
897 // Use 1ns instead of 0 - zero disarms the timerfd
898 12 ts.it_value.tv_nsec = 1;
899 }
900 else
901 {
902 5672 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
903
1/1
✓ Branch 1 taken 5672 times.
11344 nearest - now).count();
904 5672 ts.it_value.tv_sec = nsec / 1000000000;
905 5672 ts.it_value.tv_nsec = nsec % 1000000000;
906 // Ensure non-zero to avoid disarming if duration rounds to 0
907
3/4
✓ Branch 0 taken 5668 times.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5668 times.
5672 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
908 ts.it_value.tv_nsec = 1;
909 }
910 }
911
912
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5725 times.
5725 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
913 detail::throw_system_error(make_err(errno), "timerfd_settime");
914 5725 }
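
A quick worked example of the split above (illustrative only): with the
nearest expiry 1.25 s away, nsec = 1,250,000,000, so

    ts.it_value.tv_sec  = 1,250,000,000 / 1,000,000,000 = 1
    ts.it_value.tv_nsec = 1,250,000,000 % 1,000,000,000 = 250,000,000

An expiry that is already due is programmed as 1 ns rather than 0, and a
duration that rounds down to zero is bumped to 1 ns, because an all-zero
itimerspec would disarm the timerfd instead of firing immediately.
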
915
916 void
917 85429 epoll_scheduler::
918 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
919 {
920
2/2
✓ Branch 0 taken 79870 times.
✓ Branch 1 taken 5559 times.
85429 int timeout_ms = task_interrupted_ ? 0 : -1;
921
922
2/2
✓ Branch 1 taken 5559 times.
✓ Branch 2 taken 79870 times.
85429 if (lock.owns_lock())
923
1/1
✓ Branch 1 taken 5559 times.
5559 lock.unlock();
924
925 85429 task_cleanup on_exit{this, &lock, ctx};
926
927 // Flush deferred timerfd programming before blocking
928
2/2
✓ Branch 1 taken 2860 times.
✓ Branch 2 taken 82569 times.
85429 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
929
1/1
✓ Branch 1 taken 2860 times.
2860 update_timerfd();
930
931 // Event loop runs without mutex held
932 epoll_event events[128];
933
1/1
✓ Branch 1 taken 85429 times.
85429 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
934
935
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 85429 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
85429 if (nfds < 0 && errno != EINTR)
936 detail::throw_system_error(make_err(errno), "epoll_wait");
937
938 85429 bool check_timers = false;
939 85429 op_queue local_ops;
940
941 // Process events without holding the mutex
942
2/2
✓ Branch 0 taken 92042 times.
✓ Branch 1 taken 85429 times.
177471 for (int i = 0; i < nfds; ++i)
943 {
944
2/2
✓ Branch 0 taken 34 times.
✓ Branch 1 taken 92008 times.
92042 if (events[i].data.ptr == nullptr)
945 {
946 std::uint64_t val;
947
1/1
✓ Branch 1 taken 34 times.
34 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
948 34 eventfd_armed_.store(false, std::memory_order_relaxed);
949 34 continue;
950 34 }
951
952
2/2
✓ Branch 0 taken 2865 times.
✓ Branch 1 taken 89143 times.
92008 if (events[i].data.ptr == &timer_fd_)
953 {
954 std::uint64_t expirations;
955
1/1
✓ Branch 1 taken 2865 times.
2865 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
956 2865 check_timers = true;
957 2865 continue;
958 2865 }
959
960 // Deferred I/O: just set ready events and enqueue descriptor
961 // No per-descriptor mutex locking in reactor hot path!
962 89143 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
963 89143 desc->add_ready_events(events[i].events);
964
965 // Only enqueue if not already enqueued
966 89143 bool expected = false;
967
1/2
✓ Branch 1 taken 89143 times.
✗ Branch 2 not taken.
89143 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
968 std::memory_order_release, std::memory_order_relaxed))
969 {
970 89143 local_ops.push(desc);
971 }
972 }
973
974 // Process timers only when timerfd fires
975
2/2
✓ Branch 0 taken 2865 times.
✓ Branch 1 taken 82564 times.
85429 if (check_timers)
976 {
977
1/1
✓ Branch 1 taken 2865 times.
2865 timer_svc_->process_expired();
978
1/1
✓ Branch 1 taken 2865 times.
2865 update_timerfd();
979 }
980
981
1/1
✓ Branch 1 taken 85429 times.
85429 lock.lock();
982
983
2/2
✓ Branch 1 taken 47621 times.
✓ Branch 2 taken 37808 times.
85429 if (!local_ops.empty())
984 47621 completed_ops_.splice(local_ops);
985 85429 }
986
987 std::size_t
988 264354 epoll_scheduler::
989 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
990 {
991 for (;;)
992 {
993
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 349780 times.
349783 if (stopped_)
994 3 return 0;
995
996 349780 scheduler_op* op = completed_ops_.pop();
997
998 // Handle reactor sentinel - time to poll for I/O
999
2/2
✓ Branch 0 taken 85571 times.
✓ Branch 1 taken 264209 times.
349780 if (op == &task_op_)
1000 {
1001 85571 bool more_handlers = !completed_ops_.empty();
1002
1003 // Nothing to run the reactor for: no pending work to wait on,
1004 // or caller requested a non-blocking poll
1005
4/4
✓ Branch 0 taken 5701 times.
✓ Branch 1 taken 79870 times.
✓ Branch 2 taken 142 times.
✓ Branch 3 taken 85429 times.
91272 if (!more_handlers &&
1006
3/4
✓ Branch 1 taken 5559 times.
✓ Branch 2 taken 142 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 5559 times.
11402 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1007 timeout_us == 0))
1008 {
1009 142 completed_ops_.push(&task_op_);
1010 142 return 0;
1011 }
1012
1013
3/4
✓ Branch 0 taken 5559 times.
✓ Branch 1 taken 79870 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5559 times.
85429 task_interrupted_ = more_handlers || timeout_us == 0;
1014 85429 task_running_.store(true, std::memory_order_release);
1015
1016
2/2
✓ Branch 0 taken 79870 times.
✓ Branch 1 taken 5559 times.
85429 if (more_handlers)
1017 79870 unlock_and_signal_one(lock);
1018
1019 85429 run_task(lock, ctx);
1020
1021 85429 task_running_.store(false, std::memory_order_relaxed);
1022 85429 completed_ops_.push(&task_op_);
1023 85429 continue;
1024 85429 }
1025
1026 // Handle operation
1027
2/2
✓ Branch 0 taken 264207 times.
✓ Branch 1 taken 2 times.
264209 if (op != nullptr)
1028 {
1029
1/2
✓ Branch 1 taken 264207 times.
✗ Branch 2 not taken.
264207 if (!completed_ops_.empty())
1030
1/1
✓ Branch 1 taken 264207 times.
264207 unlock_and_signal_one(lock);
1031 else
1032 lock.unlock();
1033
1034 264207 work_cleanup on_exit{this, &lock, ctx};
1035
1036
1/1
✓ Branch 1 taken 264207 times.
264207 (*op)();
1037 264207 return 1;
1038 264207 }
1039
1040 // No pending work to wait on, or caller requested non-blocking poll
1041
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1042 timeout_us == 0)
1043 2 return 0;
1044
1045 clear_signal();
1046 if (timeout_us < 0)
1047 wait_for_signal(lock);
1048 else
1049 wait_for_signal_for(lock, timeout_us);
1050 85429 }
1051 }
1052
1053 } // namespace boost::corosio::detail
1054
1055 #endif
1056