libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

67.2% Lines (317/472) 91.7% Functions (33/36) 50.4% Branches (134/266)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 void
34 105 epoll_op::canceller::
35 operator()() const noexcept
36 {
37 105 op->cancel();
38 105 }
39
40 void
41 epoll_connect_op::
42 cancel() noexcept
43 {
44 if (socket_impl_)
45 socket_impl_->cancel_single_op(*this);
46 else
47 request_cancel();
48 }
49
50 void
51 99 epoll_read_op::
52 cancel() noexcept
53 {
54
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 if (socket_impl_)
55 99 socket_impl_->cancel_single_op(*this);
56 else
57 request_cancel();
58 99 }
59
60 void
61 epoll_write_op::
62 cancel() noexcept
63 {
64 if (socket_impl_)
65 socket_impl_->cancel_single_op(*this);
66 else
67 request_cancel();
68 }
69
70 void
71 4365 epoll_connect_op::
72 operator()()
73 {
74 4365 stop_cb.reset();
75
76
3/4
✓ Branch 0 taken 4364 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 4364 times.
✗ Branch 4 not taken.
4365 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
77
78 // Cache endpoints on successful connect
79
3/4
✓ Branch 0 taken 4364 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 4364 times.
✗ Branch 3 not taken.
4365 if (success && socket_impl_)
80 {
81 // Query local endpoint via getsockname (may fail, but remote is always known)
82 4364 endpoint local_ep;
83 4364 sockaddr_in local_addr{};
84 4364 socklen_t local_len = sizeof(local_addr);
85
1/2
✓ Branch 1 taken 4364 times.
✗ Branch 2 not taken.
4364 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 4364 local_ep = from_sockaddr_in(local_addr);
87 // Always cache remote endpoint; local may be default if getsockname failed
88 4364 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
89 }
90
91
1/2
✓ Branch 0 taken 4365 times.
✗ Branch 1 not taken.
4365 if (ec_out)
92 {
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4365 times.
4365 if (cancelled.load(std::memory_order_acquire))
94 *ec_out = capy::error::canceled;
95
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 4364 times.
4365 else if (errn != 0)
96 1 *ec_out = make_err(errn);
97 else
98 4364 *ec_out = {};
99 }
100
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4365 times.
4365 if (bytes_out)
102 *bytes_out = bytes_transferred;
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 4365 capy::executor_ref saved_ex( std::move( ex ) );
106 4365 capy::coro saved_h( std::move( h ) );
107 4365 auto prevent_premature_destruction = std::move(impl_ptr);
108
1/1
✓ Branch 1 taken 4365 times.
4365 resume_coro(saved_ex, saved_h);
109 4365 }
110
111 8740 epoll_socket_impl::
112 8740 epoll_socket_impl(epoll_socket_service& svc) noexcept
113 8740 : svc_(svc)
114 {
115 8740 }
116
117 8740 epoll_socket_impl::
118 ~epoll_socket_impl() = default;
119
120 void
121 8740 epoll_socket_impl::
122 release()
123 {
124 8740 close_socket();
125 8740 svc_.destroy_impl(*this);
126 8740 }
127
128 std::coroutine_handle<>
129 4365 epoll_socket_impl::
130 connect(
131 std::coroutine_handle<> h,
132 capy::executor_ref ex,
133 endpoint ep,
134 std::stop_token token,
135 std::error_code* ec)
136 {
137 4365 auto& op = conn_;
138 4365 op.reset();
139 4365 op.h = h;
140 4365 op.ex = ex;
141 4365 op.ec_out = ec;
142 4365 op.fd = fd_;
143 4365 op.target_endpoint = ep; // Store target for endpoint caching
144 4365 op.start(token, this);
145
146 4365 sockaddr_in addr = detail::to_sockaddr_in(ep);
147
1/1
✓ Branch 1 taken 4365 times.
4365 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
148
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4365 times.
4365 if (result == 0)
150 {
151 // Sync success - cache endpoints immediately
152 // Remote is always known; local may fail but we still cache remote
153 sockaddr_in local_addr{};
154 socklen_t local_len = sizeof(local_addr);
155 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
156 local_endpoint_ = detail::from_sockaddr_in(local_addr);
157 remote_endpoint_ = ep;
158
159 op.complete(0, 0);
160 op.impl_ptr = shared_from_this();
161 svc_.post(&op);
162 // completion is always posted to scheduler queue, never inline.
163 return std::noop_coroutine();
164 }
165
166
1/2
✓ Branch 0 taken 4365 times.
✗ Branch 1 not taken.
4365 if (errno == EINPROGRESS)
167 {
168 4365 svc_.work_started();
169
1/1
✓ Branch 1 taken 4365 times.
4365 op.impl_ptr = shared_from_this();
170
171 4365 bool perform_now = false;
172 {
173
1/1
✓ Branch 1 taken 4365 times.
4365 std::lock_guard lock(desc_state_.mutex);
174
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4365 times.
4365 if (desc_state_.write_ready)
175 {
176 desc_state_.write_ready = false;
177 perform_now = true;
178 }
179 else
180 {
181 4365 desc_state_.connect_op = &op;
182 }
183 4365 }
184
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4365 times.
4365 if (perform_now)
186 {
187 op.perform_io();
188 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
189 {
190 op.errn = 0;
191 std::lock_guard lock(desc_state_.mutex);
192 desc_state_.connect_op = &op;
193 }
194 else
195 {
196 svc_.post(&op);
197 svc_.work_finished();
198 }
199 return std::noop_coroutine();
200 }
201
202
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4365 times.
4365 if (op.cancelled.load(std::memory_order_acquire))
203 {
204 epoll_op* claimed = nullptr;
205 {
206 std::lock_guard lock(desc_state_.mutex);
207 if (desc_state_.connect_op == &op)
208 claimed = std::exchange(desc_state_.connect_op, nullptr);
209 }
210 if (claimed)
211 {
212 svc_.post(claimed);
213 svc_.work_finished();
214 }
215 }
216 // completion is always posted to scheduler queue, never inline.
217 4365 return std::noop_coroutine();
218 }
219
220 op.complete(errno, 0);
221 op.impl_ptr = shared_from_this();
222 svc_.post(&op);
223 // completion is always posted to scheduler queue, never inline.
224 return std::noop_coroutine();
225 }
226
227 void
228 170 epoll_socket_impl::
229 do_read_io()
230 {
231 170 auto& op = rd_;
232
233 ssize_t n;
234 do {
235 170 n = ::readv(fd_, op.iovecs, op.iovec_count);
236
2/4
✓ Branch 0 taken 170 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 170 times.
170 } while (n < 0 && errno == EINTR);
237
238
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 170 times.
170 if (n > 0)
239 {
240 {
241 std::lock_guard lock(desc_state_.mutex);
242 desc_state_.read_ready = false;
243 }
244 op.complete(0, static_cast<std::size_t>(n));
245 svc_.post(&op);
246 return;
247 }
248
249
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 170 times.
170 if (n == 0)
250 {
251 {
252 std::lock_guard lock(desc_state_.mutex);
253 desc_state_.read_ready = false;
254 }
255 op.complete(0, 0);
256 svc_.post(&op);
257 return;
258 }
259
260
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 170 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
170 if (errno == EAGAIN || errno == EWOULDBLOCK)
261 {
262 170 svc_.work_started();
263
264 170 bool perform_now = false;
265 {
266
1/1
✓ Branch 1 taken 170 times.
170 std::lock_guard lock(desc_state_.mutex);
267
2/2
✓ Branch 0 taken 78 times.
✓ Branch 1 taken 92 times.
170 if (desc_state_.read_ready)
268 {
269 78 desc_state_.read_ready = false;
270 78 perform_now = true;
271 }
272 else
273 {
274 92 desc_state_.read_op = &op;
275 }
276 170 }
277
278
2/2
✓ Branch 0 taken 78 times.
✓ Branch 1 taken 92 times.
170 if (perform_now)
279 {
280 78 op.perform_io();
281
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
78 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
282 {
283 78 op.errn = 0;
284
1/1
✓ Branch 1 taken 78 times.
78 std::lock_guard lock(desc_state_.mutex);
285 78 desc_state_.read_op = &op;
286 78 }
287 else
288 {
289 svc_.post(&op);
290 svc_.work_finished();
291 }
292 78 return;
293 }
294
295
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 92 times.
92 if (op.cancelled.load(std::memory_order_acquire))
296 {
297 epoll_op* claimed = nullptr;
298 {
299 std::lock_guard lock(desc_state_.mutex);
300 if (desc_state_.read_op == &op)
301 claimed = std::exchange(desc_state_.read_op, nullptr);
302 }
303 if (claimed)
304 {
305 svc_.post(claimed);
306 svc_.work_finished();
307 }
308 }
309 92 return;
310 }
311
312 op.complete(errno, 0);
313 svc_.post(&op);
314 }
315
316 void
317 epoll_socket_impl::
318 do_write_io()
319 {
320 auto& op = wr_;
321
322 msghdr msg{};
323 msg.msg_iov = op.iovecs;
324 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
325
326 ssize_t n;
327 do {
328 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
329 } while (n < 0 && errno == EINTR);
330
331 if (n > 0)
332 {
333 {
334 std::lock_guard lock(desc_state_.mutex);
335 desc_state_.write_ready = false;
336 }
337 op.complete(0, static_cast<std::size_t>(n));
338 svc_.post(&op);
339 return;
340 }
341
342 if (errno == EAGAIN || errno == EWOULDBLOCK)
343 {
344 svc_.work_started();
345
346 bool perform_now = false;
347 {
348 std::lock_guard lock(desc_state_.mutex);
349 if (desc_state_.write_ready)
350 {
351 desc_state_.write_ready = false;
352 perform_now = true;
353 }
354 else
355 {
356 desc_state_.write_op = &op;
357 }
358 }
359
360 if (perform_now)
361 {
362 op.perform_io();
363 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
364 {
365 op.errn = 0;
366 std::lock_guard lock(desc_state_.mutex);
367 desc_state_.write_op = &op;
368 }
369 else
370 {
371 svc_.post(&op);
372 svc_.work_finished();
373 }
374 return;
375 }
376
377 if (op.cancelled.load(std::memory_order_acquire))
378 {
379 epoll_op* claimed = nullptr;
380 {
381 std::lock_guard lock(desc_state_.mutex);
382 if (desc_state_.write_op == &op)
383 claimed = std::exchange(desc_state_.write_op, nullptr);
384 }
385 if (claimed)
386 {
387 svc_.post(claimed);
388 svc_.work_finished();
389 }
390 }
391 return;
392 }
393
394 op.complete(errno ? errno : EIO, 0);
395 svc_.post(&op);
396 }
397
398 std::coroutine_handle<>
399 117498 epoll_socket_impl::
400 read_some(
401 std::coroutine_handle<> h,
402 capy::executor_ref ex,
403 io_buffer_param param,
404 std::stop_token token,
405 std::error_code* ec,
406 std::size_t* bytes_out)
407 {
408 117498 auto& op = rd_;
409 117498 op.reset();
410 117498 op.h = h;
411 117498 op.ex = ex;
412 117498 op.ec_out = ec;
413 117498 op.bytes_out = bytes_out;
414 117498 op.fd = fd_;
415 117498 op.start(token, this);
416
1/1
✓ Branch 1 taken 117498 times.
117498 op.impl_ptr = shared_from_this();
417
418 // Must prepare buffers before initiator runs
419 117498 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
420 117498 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
421
422
6/8
✓ Branch 0 taken 117497 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 117497 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 117497 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 117497 times.
117498 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
423 {
424 1 op.empty_buffer_read = true;
425 1 op.complete(0, 0);
426
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
427 1 return std::noop_coroutine();
428 }
429
430
2/2
✓ Branch 0 taken 117497 times.
✓ Branch 1 taken 117497 times.
234994 for (int i = 0; i < op.iovec_count; ++i)
431 {
432 117497 op.iovecs[i].iov_base = bufs[i].data();
433 117497 op.iovecs[i].iov_len = bufs[i].size();
434 }
435
436 // Speculative read: bypass initiator when data is ready
437 ssize_t n;
438 do {
439
1/1
✓ Branch 1 taken 117497 times.
117497 n = ::readv(fd_, op.iovecs, op.iovec_count);
440
3/4
✓ Branch 0 taken 170 times.
✓ Branch 1 taken 117327 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 170 times.
117497 } while (n < 0 && errno == EINTR);
441
442
2/2
✓ Branch 0 taken 117322 times.
✓ Branch 1 taken 175 times.
117497 if (n > 0)
443 {
444 117322 op.complete(0, static_cast<std::size_t>(n));
445
1/1
✓ Branch 1 taken 117322 times.
117322 svc_.post(&op);
446 117322 return std::noop_coroutine();
447 }
448
449
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 170 times.
175 if (n == 0)
450 {
451 5 op.complete(0, 0);
452
1/1
✓ Branch 1 taken 5 times.
5 svc_.post(&op);
453 5 return std::noop_coroutine();
454 }
455
456
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 170 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
170 if (errno != EAGAIN && errno != EWOULDBLOCK)
457 {
458 op.complete(errno, 0);
459 svc_.post(&op);
460 return std::noop_coroutine();
461 }
462
463 // EAGAIN — full async path
464
1/1
✓ Branch 1 taken 170 times.
170 return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
465 }
466
467 std::coroutine_handle<>
468 117376 epoll_socket_impl::
469 write_some(
470 std::coroutine_handle<> h,
471 capy::executor_ref ex,
472 io_buffer_param param,
473 std::stop_token token,
474 std::error_code* ec,
475 std::size_t* bytes_out)
476 {
477 117376 auto& op = wr_;
478 117376 op.reset();
479 117376 op.h = h;
480 117376 op.ex = ex;
481 117376 op.ec_out = ec;
482 117376 op.bytes_out = bytes_out;
483 117376 op.fd = fd_;
484 117376 op.start(token, this);
485
1/1
✓ Branch 1 taken 117376 times.
117376 op.impl_ptr = shared_from_this();
486
487 117376 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
488 117376 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
489
490
6/8
✓ Branch 0 taken 117375 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 117375 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 117375 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 117375 times.
117376 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
491 {
492 1 op.complete(0, 0);
493
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
494 1 return std::noop_coroutine();
495 }
496
497
2/2
✓ Branch 0 taken 117375 times.
✓ Branch 1 taken 117375 times.
234750 for (int i = 0; i < op.iovec_count; ++i)
498 {
499 117375 op.iovecs[i].iov_base = bufs[i].data();
500 117375 op.iovecs[i].iov_len = bufs[i].size();
501 }
502
503 // Speculative write: bypass initiator when buffer space is ready
504 117375 msghdr msg{};
505 117375 msg.msg_iov = op.iovecs;
506 117375 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
507
508 ssize_t n;
509 do {
510
1/1
✓ Branch 1 taken 117375 times.
117375 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
511
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 117374 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
117375 } while (n < 0 && errno == EINTR);
512
513
2/2
✓ Branch 0 taken 117374 times.
✓ Branch 1 taken 1 time.
117375 if (n > 0)
514 {
515 117374 op.complete(0, static_cast<std::size_t>(n));
516
1/1
✓ Branch 1 taken 117374 times.
117374 svc_.post(&op);
517 117374 return std::noop_coroutine();
518 }
519
520
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (n == 0)
521 {
522 op.complete(0, 0);
523 svc_.post(&op);
524 return std::noop_coroutine();
525 }
526
527
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
1 if (errno != EAGAIN && errno != EWOULDBLOCK)
528 {
529 1 op.complete(errno, 0);
530
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
531 1 return std::noop_coroutine();
532 }
533
534 // EAGAIN — full async path
535 return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
536 }
537
538 std::error_code
539 3 epoll_socket_impl::
540 shutdown(tcp_socket::shutdown_type what) noexcept
541 {
542 int how;
543
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
544 {
545 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
546 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
547 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
548 default:
549 return make_err(EINVAL);
550 }
551
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
552 return make_err(errno);
553 3 return {};
554 }
555
556 std::error_code
557 5 epoll_socket_impl::
558 set_no_delay(bool value) noexcept
559 {
560
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
561
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
562 return make_err(errno);
563 5 return {};
564 }
565
566 bool
567 5 epoll_socket_impl::
568 no_delay(std::error_code& ec) const noexcept
569 {
570 5 int flag = 0;
571 5 socklen_t len = sizeof(flag);
572
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
573 {
574 ec = make_err(errno);
575 return false;
576 }
577 5 ec = {};
578 5 return flag != 0;
579 }
580
581 std::error_code
582 4 epoll_socket_impl::
583 set_keep_alive(bool value) noexcept
584 {
585
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
586
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
587 return make_err(errno);
588 4 return {};
589 }
590
591 bool
592 4 epoll_socket_impl::
593 keep_alive(std::error_code& ec) const noexcept
594 {
595 4 int flag = 0;
596 4 socklen_t len = sizeof(flag);
597
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
598 {
599 ec = make_err(errno);
600 return false;
601 }
602 4 ec = {};
603 4 return flag != 0;
604 }
605
606 std::error_code
607 1 epoll_socket_impl::
608 set_receive_buffer_size(int size) noexcept
609 {
610
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
611 return make_err(errno);
612 1 return {};
613 }
614
615 int
616 3 epoll_socket_impl::
617 receive_buffer_size(std::error_code& ec) const noexcept
618 {
619 3 int size = 0;
620 3 socklen_t len = sizeof(size);
621
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
622 {
623 ec = make_err(errno);
624 return 0;
625 }
626 3 ec = {};
627 3 return size;
628 }
629
630 std::error_code
631 1 epoll_socket_impl::
632 set_send_buffer_size(int size) noexcept
633 {
634
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
635 return make_err(errno);
636 1 return {};
637 }
638
639 int
640 3 epoll_socket_impl::
641 send_buffer_size(std::error_code& ec) const noexcept
642 {
643 3 int size = 0;
644 3 socklen_t len = sizeof(size);
645
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
646 {
647 ec = make_err(errno);
648 return 0;
649 }
650 3 ec = {};
651 3 return size;
652 }
653
654 std::error_code
655 8 epoll_socket_impl::
656 set_linger(bool enabled, int timeout) noexcept
657 {
658
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 7 times.
8 if (timeout < 0)
659 1 return make_err(EINVAL);
660 struct ::linger lg;
661
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 time.
7 lg.l_onoff = enabled ? 1 : 0;
662 7 lg.l_linger = timeout;
663
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
664 return make_err(errno);
665 7 return {};
666 }
667
668 tcp_socket::linger_options
669 3 epoll_socket_impl::
670 linger(std::error_code& ec) const noexcept
671 {
672 3 struct ::linger lg{};
673 3 socklen_t len = sizeof(lg);
674
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
675 {
676 ec = make_err(errno);
677 return {};
678 }
679 3 ec = {};
680 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
681 }
682
683 void
684 13212 epoll_socket_impl::
685 cancel() noexcept
686 {
687 13212 std::shared_ptr<epoll_socket_impl> self;
688 try {
689
1/1
✓ Branch 1 taken 13212 times.
13212 self = shared_from_this();
690 } catch (const std::bad_weak_ptr&) {
691 return;
692 }
693
694 13212 conn_.request_cancel();
695 13212 rd_.request_cancel();
696 13212 wr_.request_cancel();
697
698 13212 epoll_op* conn_claimed = nullptr;
699 13212 epoll_op* rd_claimed = nullptr;
700 13212 epoll_op* wr_claimed = nullptr;
701 {
702 13212 std::lock_guard lock(desc_state_.mutex);
703
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13212 times.
13212 if (desc_state_.connect_op == &conn_)
704 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
705
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 13161 times.
13212 if (desc_state_.read_op == &rd_)
706 51 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
707
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13212 times.
13212 if (desc_state_.write_op == &wr_)
708 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
709 13212 }
710
711
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13212 times.
13212 if (conn_claimed)
712 {
713 conn_.impl_ptr = self;
714 svc_.post(&conn_);
715 svc_.work_finished();
716 }
717
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 13161 times.
13212 if (rd_claimed)
718 {
719 51 rd_.impl_ptr = self;
720 51 svc_.post(&rd_);
721 51 svc_.work_finished();
722 }
723
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13212 times.
13212 if (wr_claimed)
724 {
725 wr_.impl_ptr = self;
726 svc_.post(&wr_);
727 svc_.work_finished();
728 }
729 13212 }
730
731 void
732 99 epoll_socket_impl::
733 cancel_single_op(epoll_op& op) noexcept
734 {
735 99 op.request_cancel();
736
737 99 epoll_op** desc_op_ptr = nullptr;
738
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
739
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
740 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
741
742
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 if (desc_op_ptr)
743 {
744 99 epoll_op* claimed = nullptr;
745 {
746 99 std::lock_guard lock(desc_state_.mutex);
747
2/2
✓ Branch 0 taken 67 times.
✓ Branch 1 taken 32 times.
99 if (*desc_op_ptr == &op)
748 67 claimed = std::exchange(*desc_op_ptr, nullptr);
749 99 }
750
2/2
✓ Branch 0 taken 67 times.
✓ Branch 1 taken 32 times.
99 if (claimed)
751 {
752 try {
753
1/1
✓ Branch 1 taken 67 times.
67 op.impl_ptr = shared_from_this();
754 } catch (const std::bad_weak_ptr&) {}
755 67 svc_.post(&op);
756 67 svc_.work_finished();
757 }
758 }
759 99 }
760
761 void
762 13116 epoll_socket_impl::
763 close_socket() noexcept
764 {
765 13116 cancel();
766
767 // Keep impl alive if descriptor_state is queued in the scheduler.
768 // Without this, destroy_impl() drops the last shared_ptr while
769 // the queued descriptor_state node would become dangling.
770
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 13112 times.
13116 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
771 {
772 try {
773
1/1
✓ Branch 1 taken 4 times.
4 desc_state_.impl_ref_ = shared_from_this();
774 } catch (std::bad_weak_ptr const&) {}
775 }
776
777
2/2
✓ Branch 0 taken 8740 times.
✓ Branch 1 taken 4376 times.
13116 if (fd_ >= 0)
778 {
779
1/2
✓ Branch 0 taken 8740 times.
✗ Branch 1 not taken.
8740 if (desc_state_.registered_events != 0)
780 8740 svc_.scheduler().deregister_descriptor(fd_);
781 8740 ::close(fd_);
782 8740 fd_ = -1;
783 }
784
785 13116 desc_state_.fd = -1;
786 {
787 13116 std::lock_guard lock(desc_state_.mutex);
788 13116 desc_state_.read_op = nullptr;
789 13116 desc_state_.write_op = nullptr;
790 13116 desc_state_.connect_op = nullptr;
791 13116 desc_state_.read_ready = false;
792 13116 desc_state_.write_ready = false;
793 13116 }
794 13116 desc_state_.registered_events = 0;
795
796 13116 local_endpoint_ = endpoint{};
797 13116 remote_endpoint_ = endpoint{};
798 13116 }
799
800 189 epoll_socket_service::
801 189 epoll_socket_service(capy::execution_context& ctx)
802
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
803 {
804 189 }
805
806 378 epoll_socket_service::
807 189 ~epoll_socket_service()
808 {
809 378 }
810
811 void
812 189 epoll_socket_service::
813 shutdown()
814 {
815
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
816
817
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
818 impl->close_socket();
819
820 189 state_->socket_ptrs_.clear();
821 189 }
822
823 tcp_socket::socket_impl&
824 8740 epoll_socket_service::
825 create_impl()
826 {
827
1/1
✓ Branch 1 taken 8740 times.
8740 auto impl = std::make_shared<epoll_socket_impl>(*this);
828 8740 auto* raw = impl.get();
829
830 {
831
1/1
✓ Branch 2 taken 8740 times.
8740 std::lock_guard lock(state_->mutex_);
832 8740 state_->socket_list_.push_back(raw);
833
1/1
✓ Branch 3 taken 8740 times.
8740 state_->socket_ptrs_.emplace(raw, std::move(impl));
834 8740 }
835
836 8740 return *raw;
837 8740 }
838
839 void
840 8740 epoll_socket_service::
841 destroy_impl(tcp_socket::socket_impl& impl)
842 {
843 8740 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
844
1/1
✓ Branch 2 taken 8740 times.
8740 std::lock_guard lock(state_->mutex_);
845 8740 state_->socket_list_.remove(epoll_impl);
846
1/1
✓ Branch 2 taken 8740 times.
8740 state_->socket_ptrs_.erase(epoll_impl);
847 8740 }
848
849 std::error_code
850 4376 epoll_socket_service::
851 open_socket(tcp_socket::socket_impl& impl)
852 {
853 4376 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
854 4376 epoll_impl->close_socket();
855
856 4376 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
857
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4376 times.
4376 if (fd < 0)
858 return make_err(errno);
859
860 4376 epoll_impl->fd_ = fd;
861
862 // Register fd with epoll (edge-triggered mode)
863 4376 epoll_impl->desc_state_.fd = fd;
864 {
865
1/1
✓ Branch 1 taken 4376 times.
4376 std::lock_guard lock(epoll_impl->desc_state_.mutex);
866 4376 epoll_impl->desc_state_.read_op = nullptr;
867 4376 epoll_impl->desc_state_.write_op = nullptr;
868 4376 epoll_impl->desc_state_.connect_op = nullptr;
869 4376 }
870 4376 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
871
872 4376 return {};
873 }
874
875 void
876 234822 epoll_socket_service::
877 post(epoll_op* op)
878 {
879 234822 state_->sched_.post(op);
880 234822 }
881
882 void
883 4535 epoll_socket_service::
884 work_started() noexcept
885 {
886 4535 state_->sched_.work_started();
887 4535 }
888
889 void
890 118 epoll_socket_service::
891 work_finished() noexcept
892 {
893 118 state_->sched_.work_finished();
894 118 }
895
896 } // namespace boost::corosio::detail
897
898 #endif // BOOST_COROSIO_HAS_EPOLL
899