libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

80.2% Lines (393/490) 89.1% Functions (41/46) 67.5% Branches (204/302)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <chrono>
24 #include <limits>
25 #include <utility>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler coordinates its threads to provide handler parallelism
40 while avoiding the thundering-herd problem.
41 Instead of every thread blocking in epoll_wait(), one thread becomes the
42 "reactor" while the others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on cond_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
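
A minimal sketch of how callers might drive this (hypothetical driver
code, not part of this file; assumes a scheduler instance `sched`):

    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&] { sched.run(); }); // one thread becomes the
                                                 // reactor, the rest wait
    for (auto& t : pool)                         // on cond_ for handlers
        t.join();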
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The task_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
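
In outline, do_one() (a simplified sketch of the function below; locking
and work accounting are omitted) looks like:

    for (;;) {
        op = completed_ops_.pop();
        if (op == &task_op_) {              // reactor sentinel
            if (more handlers queued)
                wake one waiting thread;
            run_task();                     // epoll_wait + queue completions
            completed_ops_.push(&task_op_); // re-arm sentinel
            continue;                       // prefer handlers over polling
        }
        if (op) { (*op)(); return 1; }      // execute one handler
        wait on cond_ for work;
    }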
62
63 Signaling State (state_)
64 ------------------------
65 The state_ variable encodes two pieces of information:
66 - Bit 0: signaled flag (1 = signaled, persists until cleared)
67 - Upper bits: waiter count (each waiter adds 2 before blocking)
68
69 This allows efficient coordination:
70 - Signalers only call notify when waiters exist (state_ > 1)
71 - Waiters check if already signaled before blocking (fast path)
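
Worked example: state_ == 0 is idle; state_ == 1 is signaled with no
waiters; state_ == 4 is two waiters, not signaled; state_ == 5 is
signaled with two waiters. A post arriving at state_ == 4 sets bit 0
(state_ becomes 5) and calls notify_one(), waking exactly one waiter.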
72
73 Wake Coordination (wake_one_thread_and_unlock)
74 ----------------------------------------------
75 When posting work:
76 - If waiters exist (state_ > 1): signal and notify_one()
77 - Else if reactor running: interrupt via eventfd write
78 - Else: no-op (thread will find work when it checks queue)
79
80 This avoids waking threads unnecessarily. With cascading wakes,
81 each handler execution wakes at most one additional thread if
82 more work exists in the queue.
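
Condensed, the decision (a sketch of wake_one_thread_and_unlock below):

    state_ |= 1;                                  // set signaled bit
    if (state_ > 1)                               // waiters exist
        { unlock; cond_.notify_one(); }
    else if (task_running_ && !task_interrupted_) // reactor in epoll_wait
        { task_interrupted_ = true; unlock; interrupt_reactor(); }
    else
        unlock;                                   // nothing to wake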
83
84 Work Counting
85 -------------
86 outstanding_work_ tracks pending operations. When it hits zero, run()
87 returns. Each operation increments on start, decrements on completion.
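
For example, starting an async read calls work_started() (say 0 -> 1);
when its completion handler finishes, work_cleanup calls work_finished()
(1 -> 0), and the transition to zero stops the scheduler so run() returns.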
88
89 Timer Integration
90 -----------------
91 Timers are handled by timer_service. The reactor arms a timerfd in the
92 epoll set for the nearest timer expiry. When a new timer is scheduled
93 earlier than the current nearest expiry, timer_service marks the timerfd
94 stale and calls interrupt_reactor() so the reactor re-arms it.
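
Roughly, the re-arm (see update_timerfd below; to_ns() is shorthand here):

    itimerspec ts{};                              // all-zero value disarms
    if (nearest != time_point::max()) {
        auto ns = std::max<int64_t>(1, to_ns(nearest - now)); // never 0
        ts.it_value.tv_sec  = ns / 1'000'000'000;
        ts.it_value.tv_nsec = ns % 1'000'000'000;
    }
    ::timerfd_settime(timer_fd_, 0, &ts, nullptr);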
95 */
96
97 namespace boost::corosio::detail {
98
99 struct scheduler_context
100 {
101 epoll_scheduler const* key;
102 scheduler_context* next;
103 op_queue private_queue;
104 long private_outstanding_work;
105
106 171 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
107 171 : key(k)
108 171 , next(n)
109 171 , private_outstanding_work(0)
110 {
111 171 }
112 };
113
114 namespace {
115
116 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
117
118 struct thread_context_guard
119 {
120 scheduler_context frame_;
121
122 171 explicit thread_context_guard(
123 epoll_scheduler const* ctx) noexcept
124 171 : frame_(ctx, context_stack.get())
125 {
126 171 context_stack.set(&frame_);
127 171 }
128
129 171 ~thread_context_guard() noexcept
130 {
131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 171 times.
171 if (!frame_.private_queue.empty())
132 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
133 171 context_stack.set(frame_.next);
134 171 }
135 };
136
137 scheduler_context*
138 363075 find_context(epoll_scheduler const* self) noexcept
139 {
140
2/2
✓ Branch 1 taken 361426 times.
✓ Branch 2 taken 1649 times.
363075 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
141
1/2
✓ Branch 0 taken 361426 times.
✗ Branch 1 not taken.
361426 if (c->key == self)
142 361426 return c;
143 1649 return nullptr;
144 }
145
146 } // namespace
147
148 void
149 126182 descriptor_state::
150 operator()()
151 {
152 126182 is_enqueued_.store(false, std::memory_order_relaxed);
153
154 // Take ownership of impl ref set by close_socket() to prevent
155 // the owning impl from being freed while we're executing
156 126182 auto prevent_impl_destruction = std::move(impl_ref_);
157
158 126182 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
159
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 126182 times.
126182 if (ev == 0)
160 {
161 scheduler_->compensating_work_started();
162 return;
163 }
164
165 126182 op_queue local_ops;
166
167 126182 int err = 0;
168
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 126181 times.
126182 if (ev & EPOLLERR)
169 {
170 1 socklen_t len = sizeof(err);
171
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
172 err = errno;
173
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
174 1 err = EIO;
175 }
176
177 126182 epoll_op* rd = nullptr;
178 126182 epoll_op* wr = nullptr;
179 126182 epoll_op* cn = nullptr;
180 {
181
1/1
✓ Branch 1 taken 126182 times.
126182 std::lock_guard lock(mutex);
182
2/2
✓ Branch 0 taken 60525 times.
✓ Branch 1 taken 65657 times.
126182 if (ev & EPOLLIN)
183 {
184 60525 rd = std::exchange(read_op, nullptr);
185
2/2
✓ Branch 0 taken 56111 times.
✓ Branch 1 taken 4414 times.
60525 if (!rd)
186 56111 read_ready = true;
187 }
188
2/2
✓ Branch 0 taken 121820 times.
✓ Branch 1 taken 4362 times.
126182 if (ev & EPOLLOUT)
189 {
190 121820 cn = std::exchange(connect_op, nullptr);
191 121820 wr = std::exchange(write_op, nullptr);
192
3/4
✓ Branch 0 taken 117455 times.
✓ Branch 1 taken 4365 times.
✓ Branch 2 taken 117455 times.
✗ Branch 3 not taken.
121820 if (!cn && !wr)
193 117455 write_ready = true;
194 }
195
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 126181 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
126182 if (err && !(ev & (EPOLLIN | EPOLLOUT)))
196 {
197 rd = std::exchange(read_op, nullptr);
198 wr = std::exchange(write_op, nullptr);
199 cn = std::exchange(connect_op, nullptr);
200 }
201 126182 }
202
203 // Non-null after I/O means EAGAIN; re-register under lock below
204
2/2
✓ Branch 0 taken 4414 times.
✓ Branch 1 taken 121768 times.
126182 if (rd)
205 {
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4414 times.
4414 if (err)
207 rd->complete(err, 0);
208 else
209 4414 rd->perform_io();
210
211
2/4
✓ Branch 0 taken 4414 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4414 times.
4414 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
212 {
213 rd->errn = 0;
214 }
215 else
216 {
217 4414 local_ops.push(rd);
218 4414 rd = nullptr;
219 }
220 }
221
222
2/2
✓ Branch 0 taken 4365 times.
✓ Branch 1 taken 121817 times.
126182 if (cn)
223 {
224
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4365 times.
4365 if (err)
225 cn->complete(err, 0);
226 else
227 4365 cn->perform_io();
228 4365 local_ops.push(cn);
229 4365 cn = nullptr;
230 }
231
232
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 126182 times.
126182 if (wr)
233 {
234 if (err)
235 wr->complete(err, 0);
236 else
237 wr->perform_io();
238
239 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
240 {
241 wr->errn = 0;
242 }
243 else
244 {
245 local_ops.push(wr);
246 wr = nullptr;
247 }
248 }
249
250
2/4
✓ Branch 0 taken 126182 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 126182 times.
126182 if (rd || wr)
251 {
252 std::lock_guard lock(mutex);
253 if (rd)
254 read_op = rd;
255 if (wr)
256 write_op = wr;
257 }
258
259 // Execute first handler inline; the scheduler's work_cleanup
260 // accounts for this as the "consumed" work item
261 126182 scheduler_op* first = local_ops.pop();
262
2/2
✓ Branch 0 taken 8779 times.
✓ Branch 1 taken 117403 times.
126182 if (first)
263 {
264
1/1
✓ Branch 1 taken 8779 times.
8779 scheduler_->post_deferred_completions(local_ops);
265
1/1
✓ Branch 1 taken 8779 times.
8779 (*first)();
266 }
267 else
268 {
269 117403 scheduler_->compensating_work_started();
270 }
271 126182 }
272
273 189 epoll_scheduler::
274 epoll_scheduler(
275 capy::execution_context& ctx,
276 189 int)
277 189 : epoll_fd_(-1)
278 189 , event_fd_(-1)
279 189 , timer_fd_(-1)
280 189 , outstanding_work_(0)
281 189 , stopped_(false)
282 189 , shutdown_(false)
283 189 , task_running_{false}
284 189 , task_interrupted_(false)
285 378 , state_(0)
286 {
287 189 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
288
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (epoll_fd_ < 0)
289 detail::throw_system_error(make_err(errno), "epoll_create1");
290
291 189 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
292
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (event_fd_ < 0)
293 {
294 int errn = errno;
295 ::close(epoll_fd_);
296 detail::throw_system_error(make_err(errn), "eventfd");
297 }
298
299 189 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
300
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (timer_fd_ < 0)
301 {
302 int errn = errno;
303 ::close(event_fd_);
304 ::close(epoll_fd_);
305 detail::throw_system_error(make_err(errn), "timerfd_create");
306 }
307
308 189 epoll_event ev{};
309 189 ev.events = EPOLLIN | EPOLLET;
310 189 ev.data.ptr = nullptr;
311
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
312 {
313 int errn = errno;
314 ::close(timer_fd_);
315 ::close(event_fd_);
316 ::close(epoll_fd_);
317 detail::throw_system_error(make_err(errn), "epoll_ctl");
318 }
319
320 189 epoll_event timer_ev{};
321 189 timer_ev.events = EPOLLIN | EPOLLERR;
322 189 timer_ev.data.ptr = &timer_fd_;
323
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
324 {
325 int errn = errno;
326 ::close(timer_fd_);
327 ::close(event_fd_);
328 ::close(epoll_fd_);
329 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
330 }
331
332
1/1
✓ Branch 1 taken 189 times.
189 timer_svc_ = &get_timer_service(ctx, *this);
333
1/1
✓ Branch 3 taken 189 times.
189 timer_svc_->set_on_earliest_changed(
334 timer_service::callback(
335 this,
336 [](void* p) {
337 4557 auto* self = static_cast<epoll_scheduler*>(p);
338 4557 self->timerfd_stale_.store(true, std::memory_order_release);
339
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4557 times.
4557 if (self->task_running_.load(std::memory_order_acquire))
340 self->interrupt_reactor();
341 4557 }));
342
343 // Initialize resolver service
344
1/1
✓ Branch 1 taken 189 times.
189 get_resolver_service(ctx, *this);
345
346 // Initialize signal service
347
1/1
✓ Branch 1 taken 189 times.
189 get_signal_service(ctx, *this);
348
349 // Push task sentinel to interleave reactor runs with handler execution
350 189 completed_ops_.push(&task_op_);
351 189 }
352
353 378 epoll_scheduler::
354 189 ~epoll_scheduler()
355 {
356
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
357 189 ::close(timer_fd_);
358
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
359 189 ::close(event_fd_);
360
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
361 189 ::close(epoll_fd_);
362 378 }
363
364 void
365 189 epoll_scheduler::
366 shutdown()
367 {
368 {
369
1/1
✓ Branch 1 taken 189 times.
189 std::unique_lock lock(mutex_);
370 189 shutdown_ = true;
371
372
2/2
✓ Branch 1 taken 189 times.
✓ Branch 2 taken 189 times.
378 while (auto* h = completed_ops_.pop())
373 {
374
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (h == &task_op_)
375 189 continue;
376 lock.unlock();
377 h->destroy();
378 lock.lock();
379 189 }
380
381 189 signal_all(lock);
382 189 }
383
384 189 outstanding_work_.store(0, std::memory_order_release);
385
386
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
387 189 interrupt_reactor();
388 189 }
389
390 void
391 6239 epoll_scheduler::
392 post(capy::coro h) const
393 {
394 struct post_handler final
395 : scheduler_op
396 {
397 capy::coro h_;
398
399 explicit
400 6239 post_handler(capy::coro h)
401 6239 : h_(h)
402 {
403 6239 }
404
405 12478 ~post_handler() = default;
406
407 6239 void operator()() override
408 {
409 6239 auto h = h_;
410
1/2
✓ Branch 0 taken 6239 times.
✗ Branch 1 not taken.
6239 delete this;
411
1/1
✓ Branch 1 taken 6239 times.
6239 h.resume();
412 6239 }
413
414 void destroy() override
415 {
416 delete this;
417 }
418 };
419
420
1/1
✓ Branch 1 taken 6239 times.
6239 auto ph = std::make_unique<post_handler>(h);
421
422 // Fast path: same thread posts to private queue
423 // Only count locally; work_cleanup batches to global counter
424
2/2
✓ Branch 1 taken 4616 times.
✓ Branch 2 taken 1623 times.
6239 if (auto* ctx = find_context(this))
425 {
426 4616 ++ctx->private_outstanding_work;
427 4616 ctx->private_queue.push(ph.release());
428 4616 return;
429 }
430
431 // Slow path: cross-thread post requires mutex
432 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
433
434
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
435 1623 completed_ops_.push(ph.release());
436
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
437 6239 }
438
439 void
440 239433 epoll_scheduler::
441 post(scheduler_op* h) const
442 {
443 // Fast path: same thread posts to private queue
444 // Only count locally; work_cleanup batches to global counter
445
2/2
✓ Branch 1 taken 239407 times.
✓ Branch 2 taken 26 times.
239433 if (auto* ctx = find_context(this))
446 {
447 239407 ++ctx->private_outstanding_work;
448 239407 ctx->private_queue.push(h);
449 239407 return;
450 }
451
452 // Slow path: cross-thread post requires mutex
453 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
454
455
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
456 26 completed_ops_.push(h);
457
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
458 26 }
459
460 void
461 5058 epoll_scheduler::
462 on_work_started() noexcept
463 {
464 5058 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
465 5058 }
466
467 void
468 5026 epoll_scheduler::
469 on_work_finished() noexcept
470 {
471
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5026 times.
10052 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
472 stop();
473 5026 }
474
475 bool
476 4845 epoll_scheduler::
477 running_in_this_thread() const noexcept
478 {
479
2/2
✓ Branch 1 taken 4635 times.
✓ Branch 2 taken 210 times.
4845 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
480
1/2
✓ Branch 0 taken 4635 times.
✗ Branch 1 not taken.
4635 if (c->key == this)
481 4635 return true;
482 210 return false;
483 }
484
485 void
486 27 epoll_scheduler::
487 stop()
488 {
489
1/1
✓ Branch 1 taken 27 times.
27 std::unique_lock lock(mutex_);
490
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 9 times.
27 if (!stopped_)
491 {
492 18 stopped_ = true;
493 18 signal_all(lock);
494
1/1
✓ Branch 1 taken 18 times.
18 interrupt_reactor();
495 }
496 27 }
497
498 bool
499 16 epoll_scheduler::
500 stopped() const noexcept
501 {
502 16 std::unique_lock lock(mutex_);
503 32 return stopped_;
504 16 }
505
506 void
507 49 epoll_scheduler::
508 restart()
509 {
510
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
511 49 stopped_ = false;
512 49 }
513
514 std::size_t
515 175 epoll_scheduler::
516 run()
517 {
518
2/2
✓ Branch 1 taken 18 times.
✓ Branch 2 taken 157 times.
350 if (outstanding_work_.load(std::memory_order_acquire) == 0)
519 {
520
1/1
✓ Branch 1 taken 18 times.
18 stop();
521 18 return 0;
522 }
523
524 157 thread_context_guard ctx(this);
525
1/1
✓ Branch 1 taken 157 times.
157 std::unique_lock lock(mutex_);
526
527 157 std::size_t n = 0;
528 for (;;)
529 {
530
3/3
✓ Branch 1 taken 371996 times.
✓ Branch 3 taken 157 times.
✓ Branch 4 taken 371839 times.
371996 if (!do_one(lock, -1, &ctx.frame_))
531 157 break;
532
1/2
✓ Branch 1 taken 371839 times.
✗ Branch 2 not taken.
371839 if (n != (std::numeric_limits<std::size_t>::max)())
533 371839 ++n;
534
2/2
✓ Branch 1 taken 132426 times.
✓ Branch 2 taken 239413 times.
371839 if (!lock.owns_lock())
535
1/1
✓ Branch 1 taken 132426 times.
132426 lock.lock();
536 }
537 157 return n;
538 157 }
539
540 std::size_t
541 2 epoll_scheduler::
542 run_one()
543 {
544
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
545 {
546 stop();
547 return 0;
548 }
549
550 2 thread_context_guard ctx(this);
551
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
552
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
553 2 }
554
555 std::size_t
556 14 epoll_scheduler::
557 wait_one(long usec)
558 {
559
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 9 times.
28 if (outstanding_work_.load(std::memory_order_acquire) == 0)
560 {
561
1/1
✓ Branch 1 taken 5 times.
5 stop();
562 5 return 0;
563 }
564
565 9 thread_context_guard ctx(this);
566
1/1
✓ Branch 1 taken 9 times.
9 std::unique_lock lock(mutex_);
567
1/1
✓ Branch 1 taken 9 times.
9 return do_one(lock, usec, &ctx.frame_);
568 9 }
569
570 std::size_t
571 2 epoll_scheduler::
572 poll()
573 {
574
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
575 {
576
1/1
✓ Branch 1 taken 1 time.
1 stop();
577 1 return 0;
578 }
579
580 1 thread_context_guard ctx(this);
581
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
582
583 1 std::size_t n = 0;
584 for (;;)
585 {
586
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
587 1 break;
588
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
589 2 ++n;
590
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
591
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
592 }
593 1 return n;
594 1 }
595
596 std::size_t
597 4 epoll_scheduler::
598 poll_one()
599 {
600
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
601 {
602
1/1
✓ Branch 1 taken 2 times.
2 stop();
603 2 return 0;
604 }
605
606 2 thread_context_guard ctx(this);
607
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
608
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
609 2 }
610
611 void
612 8802 epoll_scheduler::
613 register_descriptor(int fd, descriptor_state* desc) const
614 {
615 8802 epoll_event ev{};
616 8802 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
617 8802 ev.data.ptr = desc;
618
619
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8802 times.
8802 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
620 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
621
622 8802 desc->registered_events = ev.events;
623 8802 desc->fd = fd;
624 8802 desc->scheduler_ = this;
625
626
1/1
✓ Branch 1 taken 8802 times.
8802 std::lock_guard lock(desc->mutex);
627 8802 desc->read_ready = false;
628 8802 desc->write_ready = false;
629 8802 }
630
631 void
632 8802 epoll_scheduler::
633 deregister_descriptor(int fd) const
634 {
635 8802 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
636 8802 }
637
638 void
639 8906 epoll_scheduler::
640 work_started() const noexcept
641 {
642 8906 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
643 8906 }
644
645 void
646 15194 epoll_scheduler::
647 work_finished() const noexcept
648 {
649
2/2
✓ Branch 0 taken 148 times.
✓ Branch 1 taken 15046 times.
30388 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
650 {
651 // Last work item completed - wake all threads so they can exit.
652 // signal_all() wakes threads waiting on the condvar.
653 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
654 // Both are needed because they target different blocking mechanisms.
655 148 std::unique_lock lock(mutex_);
656 148 signal_all(lock);
657
5/6
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 145 times.
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 3 times.
✓ Branch 6 taken 145 times.
148 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
658 {
659 3 task_interrupted_ = true;
660 3 lock.unlock();
661 3 interrupt_reactor();
662 }
663 148 }
664 15194 }
665
666 void
667 117403 epoll_scheduler::
668 compensating_work_started() const noexcept
669 {
670 117403 auto* ctx = find_context(this);
671
1/2
✓ Branch 0 taken 117403 times.
✗ Branch 1 not taken.
117403 if (ctx)
672 117403 ++ctx->private_outstanding_work;
673 117403 }
674
675 void
676 epoll_scheduler::
677 drain_thread_queue(op_queue& queue, long count) const
678 {
679 // Note: outstanding_work_ was already incremented when posting
680 std::unique_lock lock(mutex_);
681 completed_ops_.splice(queue);
682 if (count > 0)
683 maybe_unlock_and_signal_one(lock);
684 }
685
686 void
687 8779 epoll_scheduler::
688 post_deferred_completions(op_queue& ops) const
689 {
690
1/2
✓ Branch 1 taken 8779 times.
✗ Branch 2 not taken.
8779 if (ops.empty())
691 8779 return;
692
693 // Fast path: if on scheduler thread, use private queue
694 if (auto* ctx = find_context(this))
695 {
696 ctx->private_queue.splice(ops);
697 return;
698 }
699
700 // Slow path: add to global queue and wake a thread
701 std::unique_lock lock(mutex_);
702 completed_ops_.splice(ops);
703 wake_one_thread_and_unlock(lock);
704 }
705
706 void
707 236 epoll_scheduler::
708 interrupt_reactor() const
709 {
710 // Only write if not already armed to avoid redundant writes
711 236 bool expected = false;
712
2/2
✓ Branch 1 taken 224 times.
✓ Branch 2 taken 12 times.
236 if (eventfd_armed_.compare_exchange_strong(expected, true,
713 std::memory_order_release, std::memory_order_relaxed))
714 {
715 224 std::uint64_t val = 1;
716
1/1
✓ Branch 1 taken 224 times.
224 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
717 }
718 236 }
719
720 void
721 355 epoll_scheduler::
722 signal_all(std::unique_lock<std::mutex>&) const
723 {
724 355 state_ |= 1;
725 355 cond_.notify_all();
726 355 }
727
728 bool
729 1649 epoll_scheduler::
730 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
731 {
732 1649 state_ |= 1;
733
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1649 times.
1649 if (state_ > 1)
734 {
735 lock.unlock();
736 cond_.notify_one();
737 return true;
738 }
739 1649 return false;
740 }
741
742 void
743 504275 epoll_scheduler::
744 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
745 {
746 504275 state_ |= 1;
747 504275 bool have_waiters = state_ > 1;
748 504275 lock.unlock();
749
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 504275 times.
504275 if (have_waiters)
750 cond_.notify_one();
751 504275 }
752
753 void
754 epoll_scheduler::
755 clear_signal() const
756 {
757 state_ &= ~std::size_t(1);
758 }
759
760 void
761 epoll_scheduler::
762 wait_for_signal(std::unique_lock<std::mutex>& lock) const
763 {
764 while ((state_ & 1) == 0)
765 {
766 state_ += 2;
767 cond_.wait(lock);
768 state_ -= 2;
769 }
770 }
771
772 void
773 epoll_scheduler::
774 wait_for_signal_for(
775 std::unique_lock<std::mutex>& lock,
776 long timeout_us) const
777 {
778 if ((state_ & 1) == 0)
779 {
780 state_ += 2;
781 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
782 state_ -= 2;
783 }
784 }
785
786 void
787 1649 epoll_scheduler::
788 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
789 {
790
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1649 times.
1649 if (maybe_unlock_and_signal_one(lock))
791 return;
792
793
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1623 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1623 times.
1649 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
794 {
795 26 task_interrupted_ = true;
796 26 lock.unlock();
797 26 interrupt_reactor();
798 }
799 else
800 {
801 1623 lock.unlock();
802 }
803 }
804
805 /** RAII guard for handler execution work accounting.
806
807 A handler consumes 1 work item and may produce N new items via fast-path posts.
808 Net change = N - 1:
809 - If N > 1: add (N-1) to global (more work produced than consumed)
810 - If N == 1: net zero, do nothing
811 - If N < 1: call work_finished() (work consumed, may trigger stop)
812
813 Also drains the private queue to the global queue for other threads to process.
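
Example: a handler that fast-path posts three continuations leaves N == 3,
so its work_cleanup adds 2 to outstanding_work_; a handler that posts
nothing leaves N == 0 and calls work_finished(), which may be the
transition to zero that stops the run.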
814 */
815 struct work_cleanup
816 {
817 epoll_scheduler const* scheduler;
818 std::unique_lock<std::mutex>* lock;
819 scheduler_context* ctx;
820
821 371854 ~work_cleanup()
822 {
823
1/2
✓ Branch 0 taken 371854 times.
✗ Branch 1 not taken.
371854 if (ctx)
824 {
825 371854 long produced = ctx->private_outstanding_work;
826
2/2
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 371807 times.
371854 if (produced > 1)
827 47 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
828
2/2
✓ Branch 0 taken 15035 times.
✓ Branch 1 taken 356772 times.
371807 else if (produced < 1)
829 15035 scheduler->work_finished();
830 // produced == 1: net zero, handler consumed what it produced
831 371854 ctx->private_outstanding_work = 0;
832
833
2/2
✓ Branch 1 taken 239416 times.
✓ Branch 2 taken 132438 times.
371854 if (!ctx->private_queue.empty())
834 {
835 239416 lock->lock();
836 239416 scheduler->completed_ops_.splice(ctx->private_queue);
837 }
838 }
839 else
840 {
841 // No thread context - slow-path op was already counted globally
842 scheduler->work_finished();
843 }
844 371854 }
845 };
846
847 /** RAII guard for reactor work accounting.
848
849 The reactor only produces work, via timer/signal callbacks posting handlers.
850 Unlike handler execution, which consumes one work item, the reactor
851 consumes nothing, so all produced work must be flushed to the global counter.
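
Example: if one reactor pass expires timers that post three handlers,
private_outstanding_work reaches 3; the destructor adds all 3 to
outstanding_work_ and splices the private queue onto completed_ops_.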
852 */
853 struct task_cleanup
854 {
855 epoll_scheduler const* scheduler;
856 std::unique_lock<std::mutex>* lock;
857 scheduler_context* ctx;
858
859 141370 ~task_cleanup()
860 141370 {
861
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 141370 times.
141370 if (!ctx)
862 return;
863
864
2/2
✓ Branch 0 taken 4559 times.
✓ Branch 1 taken 136811 times.
141370 if (ctx->private_outstanding_work > 0)
865 {
866 4559 scheduler->outstanding_work_.fetch_add(
867 4559 ctx->private_outstanding_work, std::memory_order_relaxed);
868 4559 ctx->private_outstanding_work = 0;
869 }
870
871
2/2
✓ Branch 1 taken 4559 times.
✓ Branch 2 taken 136811 times.
141370 if (!ctx->private_queue.empty())
872 {
873
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4559 times.
4559 if (!lock->owns_lock())
874 lock->lock();
875 4559 scheduler->completed_ops_.splice(ctx->private_queue);
876 }
877 141370 }
878 };
879
880 void
881 9112 epoll_scheduler::
882 update_timerfd() const
883 {
884 9112 auto nearest = timer_svc_->nearest_expiry();
885
886 9112 itimerspec ts{};
887 9112 int flags = 0;
888
889
3/3
✓ Branch 2 taken 9112 times.
✓ Branch 4 taken 9072 times.
✓ Branch 5 taken 40 times.
9112 if (nearest == timer_service::time_point::max())
890 {
891 // No timers - disarm by setting to 0 (relative)
892 }
893 else
894 {
895 9072 auto now = std::chrono::steady_clock::now();
896
3/3
✓ Branch 1 taken 9072 times.
✓ Branch 4 taken 128 times.
✓ Branch 5 taken 8944 times.
9072 if (nearest <= now)
897 {
898 // Use 1ns instead of 0 - zero disarms the timerfd
899 128 ts.it_value.tv_nsec = 1;
900 }
901 else
902 {
903 8944 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
904
1/1
✓ Branch 1 taken 8944 times.
17888 nearest - now).count();
905 8944 ts.it_value.tv_sec = nsec / 1000000000;
906 8944 ts.it_value.tv_nsec = nsec % 1000000000;
907 // Ensure non-zero to avoid disarming if duration rounds to 0
908
3/4
✓ Branch 0 taken 8940 times.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8940 times.
8944 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
909 ts.it_value.tv_nsec = 1;
910 }
911 }
912
913
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9112 times.
9112 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
914 detail::throw_system_error(make_err(errno), "timerfd_settime");
915 9112 }
916
917 void
918 141370 epoll_scheduler::
919 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
920 {
921
2/2
✓ Branch 0 taken 132421 times.
✓ Branch 1 taken 8949 times.
141370 int timeout_ms = task_interrupted_ ? 0 : -1;
922
923
2/2
✓ Branch 1 taken 8949 times.
✓ Branch 2 taken 132421 times.
141370 if (lock.owns_lock())
924
1/1
✓ Branch 1 taken 8949 times.
8949 lock.unlock();
925
926 141370 task_cleanup on_exit{this, &lock, ctx};
927
928 // Flush deferred timerfd programming before blocking
929
2/2
✓ Branch 1 taken 4553 times.
✓ Branch 2 taken 136817 times.
141370 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
930
1/1
✓ Branch 1 taken 4553 times.
4553 update_timerfd();
931
932 // Event loop runs without mutex held
933 epoll_event events[128];
934
1/1
✓ Branch 1 taken 141370 times.
141370 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
935
936
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 141370 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
141370 if (nfds < 0 && errno != EINTR)
937 detail::throw_system_error(make_err(errno), "epoll_wait");
938
939 141370 bool check_timers = false;
940 141370 op_queue local_ops;
941
942 // Process events without holding the mutex
943
2/2
✓ Branch 0 taken 130776 times.
✓ Branch 1 taken 141370 times.
272146 for (int i = 0; i < nfds; ++i)
944 {
945
2/2
✓ Branch 0 taken 35 times.
✓ Branch 1 taken 130741 times.
130776 if (events[i].data.ptr == nullptr)
946 {
947 std::uint64_t val;
948
1/1
✓ Branch 1 taken 35 times.
35 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
949 35 eventfd_armed_.store(false, std::memory_order_relaxed);
950 35 continue;
951 35 }
952
953
2/2
✓ Branch 0 taken 4559 times.
✓ Branch 1 taken 126182 times.
130741 if (events[i].data.ptr == &timer_fd_)
954 {
955 std::uint64_t expirations;
956
1/1
✓ Branch 1 taken 4559 times.
4559 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
957 4559 check_timers = true;
958 4559 continue;
959 4559 }
960
961 // Deferred I/O: just set ready events and enqueue descriptor
962 // No per-descriptor mutex locking in reactor hot path!
963 126182 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
964 126182 desc->add_ready_events(events[i].events);
965
966 // Only enqueue if not already enqueued
967 126182 bool expected = false;
968
1/2
✓ Branch 1 taken 126182 times.
✗ Branch 2 not taken.
126182 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
969 std::memory_order_release, std::memory_order_relaxed))
970 {
971 126182 local_ops.push(desc);
972 }
973 }
974
975 // Process timers only when timerfd fires
976
2/2
✓ Branch 0 taken 4559 times.
✓ Branch 1 taken 136811 times.
141370 if (check_timers)
977 {
978
1/1
✓ Branch 1 taken 4559 times.
4559 timer_svc_->process_expired();
979
1/1
✓ Branch 1 taken 4559 times.
4559 update_timerfd();
980 }
981
982
1/1
✓ Branch 1 taken 141370 times.
141370 lock.lock();
983
984
2/2
✓ Branch 1 taken 75902 times.
✓ Branch 2 taken 65468 times.
141370 if (!local_ops.empty())
985 75902 completed_ops_.splice(local_ops);
986 141370 }
987
988 std::size_t
989 372012 epoll_scheduler::
990 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
991 {
992 for (;;)
993 {
994
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 513376 times.
513382 if (stopped_)
995 6 return 0;
996
997 513376 scheduler_op* op = completed_ops_.pop();
998
999 // Handle reactor sentinel - time to poll for I/O
1000
2/2
✓ Branch 0 taken 141519 times.
✓ Branch 1 taken 371857 times.
513376 if (op == &task_op_)
1001 {
1002 141519 bool more_handlers = !completed_ops_.empty();
1003
1004 // Nothing to run the reactor for: no pending work to wait on,
1005 // or caller requested a non-blocking poll
1006
4/4
✓ Branch 0 taken 9098 times.
✓ Branch 1 taken 132421 times.
✓ Branch 2 taken 149 times.
✓ Branch 3 taken 141370 times.
150617 if (!more_handlers &&
1007
3/4
✓ Branch 1 taken 8949 times.
✓ Branch 2 taken 149 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 8949 times.
18196 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1008 timeout_us == 0))
1009 {
1010 149 completed_ops_.push(&task_op_);
1011 149 return 0;
1012 }
1013
1014
3/4
✓ Branch 0 taken 8949 times.
✓ Branch 1 taken 132421 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8949 times.
141370 task_interrupted_ = more_handlers || timeout_us == 0;
1015 141370 task_running_.store(true, std::memory_order_release);
1016
1017
2/2
✓ Branch 0 taken 132421 times.
✓ Branch 1 taken 8949 times.
141370 if (more_handlers)
1018 132421 unlock_and_signal_one(lock);
1019
1020 141370 run_task(lock, ctx);
1021
1022 141370 task_running_.store(false, std::memory_order_relaxed);
1023 141370 completed_ops_.push(&task_op_);
1024 141370 continue;
1025 141370 }
1026
1027 // Handle operation
1028
2/2
✓ Branch 0 taken 371854 times.
✓ Branch 1 taken 3 times.
371857 if (op != nullptr)
1029 {
1030
1/2
✓ Branch 1 taken 371854 times.
✗ Branch 2 not taken.
371854 if (!completed_ops_.empty())
1031
1/1
✓ Branch 1 taken 371854 times.
371854 unlock_and_signal_one(lock);
1032 else
1033 lock.unlock();
1034
1035 371854 work_cleanup on_exit{this, &lock, ctx};
1036
1037
1/1
✓ Branch 1 taken 371854 times.
371854 (*op)();
1038 371854 return 1;
1039 371854 }
1040
1041 // No pending work to wait on, or caller requested non-blocking poll
1042
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
6 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1043 timeout_us == 0)
1044 3 return 0;
1045
1046 clear_signal();
1047 if (timeout_us < 0)
1048 wait_for_signal(lock);
1049 else
1050 wait_for_signal_for(lock, timeout_us);
1051 141370 }
1052 }
1053
1054 } // namespace boost::corosio::detail
1055
1056 #endif
1057