TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_socket.hpp>
22 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23 :
24 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/detail/dispatch_coro.hpp>
26 : #include <boost/corosio/native/detail/make_err.hpp>
27 :
28 : #include <boost/corosio/detail/except.hpp>
29 :
30 : #include <boost/capy/buffers.hpp>
31 :
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
38 :
39 : #include <memory>
40 : #include <mutex>
41 : #include <unordered_map>
42 :
43 : /*
44 : select Socket Implementation
45 : ============================
46 :
47 : This mirrors the epoll_sockets design for behavioral consistency.
48 : Each I/O operation follows the same pattern:
49 : 1. Try the syscall immediately (non-blocking socket)
50 : 2. If it succeeds or fails with a real error, post to completion queue
51 : 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52 :
53 : Cancellation
54 : ------------
55 : See op.hpp for the completion/cancellation race handling via the
56 : `registered` atomic. cancel() must complete pending operations (post
57 : them with cancelled flag) so coroutines waiting on them can resume.
58 : close_socket() calls cancel() first to ensure this.
59 :
60 : Impl Lifetime with shared_ptr
61 : -----------------------------
62 : Socket impls use enable_shared_from_this. The service owns impls via
63 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 : removal. When a user calls close(), we call cancel() which posts pending
65 : ops to the scheduler.
66 :
67 : CRITICAL: The posted ops must keep the impl alive until they complete.
68 : Otherwise the scheduler would process a freed op (use-after-free). The
69 : cancel() method captures shared_from_this() into op.impl_ptr before
70 : posting. When the op completes, impl_ptr is cleared, allowing the impl
71 : to be destroyed if no other references exist.
72 :
73 : Service Ownership
74 : -----------------
75 : select_socket_service owns all socket impls. destroy() removes the
76 : shared_ptr from the map, but the impl may survive if ops still hold
77 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 : in-flight ops will complete and release their refs.
79 : */
80 :
81 : namespace boost::corosio::detail {
82 :
/** State for select socket service.

    Bundles everything the service shares with its sockets: the
    reactor reference, the intrusive list of live impls, and the
    owning shared_ptr map keyed by raw pointer for O(1) lookup and
    removal. mutex_ guards socket_list_ and socket_ptrs_.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    select_scheduler& sched_;   // reactor driving select()
    std::mutex mutex_;          // guards the two containers below
    intrusive_list<select_socket> socket_list_;   // live impls (non-owning)
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;           // owning refs, keyed by raw pointer
};
98 :
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Closes every live socket; owning refs are released later by ~state_.
    void shutdown() override;

    /// Creates a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Closes the impl and drops the service's owning reference.
    void destroy(io_object::implementation*) override;
    /// Closes the impl's native socket without destroying the impl.
    void close(io_object::handle&) override;
    /// Opens a non-blocking, close-on-exec native socket for impl.
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// Reactor that drives readiness notification for this service.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queues a completed op on the scheduler.
    void post(select_op* op);
    /// Forwarded work counting; keeps the scheduler running while I/O is pending.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
135 :
// Backward compatibility alias for the pre-rename class name.
using select_sockets = select_socket_service;
138 :
/** Stop-token callback entry point: forwards to the op's cancel(). */
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
144 :
145 : inline void
146 MIS 0 : select_connect_op::cancel() noexcept
147 : {
148 0 : if (socket_impl_)
149 0 : socket_impl_->cancel_single_op(*this);
150 : else
151 0 : request_cancel();
152 0 : }
153 :
154 : inline void
155 HIT 98 : select_read_op::cancel() noexcept
156 : {
157 98 : if (socket_impl_)
158 98 : socket_impl_->cancel_single_op(*this);
159 : else
160 MIS 0 : request_cancel();
161 HIT 98 : }
162 :
163 : inline void
164 MIS 0 : select_write_op::cancel() noexcept
165 : {
166 0 : if (socket_impl_)
167 0 : socket_impl_->cancel_single_op(*this);
168 : else
169 0 : request_cancel();
170 0 : }
171 :
/** Completion handler for connect ops.

    Runs on the scheduler after the op finishes (success, error, or
    cancellation): caches endpoints, reports the result through the
    user's out-pointers, then resumes the awaiting coroutine.
*/
inline void
select_connect_op::operator()()
{
    // Disconnect the stop_token callback before completing.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        // If getsockname failed, local_ep stays default-constructed.
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Report the outcome: cancellation takes precedence over errno.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();   // release the impl ref; the op is done with it
    dispatch_coro(saved_ex, saved_h).resume();
}
212 :
/** Constructs a socket impl bound to its owning service. */
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
217 :
/** Begins an asynchronous connect to `ep`.

    Attempts ::connect() immediately on the non-blocking fd:
    - synchronous success or hard error: completion is posted to the
      scheduler queue;
    - EINPROGRESS: the op is registered for write readiness and the
      reactor posts the completion later.
    Always returns noop_coroutine(); the awaiting coroutine is resumed
    from the scheduler, never inline.

    @param h      Coroutine to resume on completion.
    @param ex     Executor used to dispatch the resumption.
    @param ep     Remote endpoint to connect to.
    @param token  Stop token wired to the op's canceller.
    @param ec     Receives the completion error code.
*/
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();   // keep impl alive until op runs
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate failure (e.g. ECONNREFUSED on some platforms).
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
309 :
/** Begins an asynchronous scatter read.

    Tries ::readv() immediately on the non-blocking fd:
    - n > 0: bytes read, completion posted;
    - n == 0: end of stream, completion posted with zero bytes;
    - EAGAIN/EWOULDBLOCK: op registered for read readiness with the
      reactor;
    - other errors: completion posted with errno.
    Always returns noop_coroutine(); resumption happens only from the
    scheduler queue.

    @param h         Coroutine to resume on completion.
    @param ex        Executor used to dispatch the resumption.
    @param param     Buffer sequence to fill.
    @param token     Stop token wired to the op's canceller.
    @param ec        Receives the completion error code.
    @param bytes_out Receives the number of bytes read.
*/
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Empty buffer sequence: complete immediately with zero bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // n == 0: peer closed the stream (EOF); report success, zero bytes.
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard read error: post completion with errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
412 :
/** Begins an asynchronous gather write.

    Tries ::sendmsg() immediately on the non-blocking fd (MSG_NOSIGNAL
    suppresses SIGPIPE):
    - n > 0: bytes written, completion posted;
    - EAGAIN/EWOULDBLOCK: op registered for write readiness with the
      reactor;
    - any other outcome: completion posted with errno, falling back to
      EIO when errno is unset (e.g. sendmsg returned 0).
    Always returns noop_coroutine(); resumption happens only from the
    scheduler queue.

    @param h         Coroutine to resume on completion.
    @param ex        Executor used to dispatch the resumption.
    @param param     Buffer sequence to send.
    @param token     Stop token wired to the op's canceller.
    @param ec        Receives the completion error code.
    @param bytes_out Receives the number of bytes written.
*/
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty buffer sequence: complete immediately with zero bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Error (or sendmsg returned 0 with errno unset — report EIO).
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
510 :
511 : inline std::error_code
512 3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
513 : {
514 : int how;
515 3 : switch (what)
516 : {
517 1 : case tcp_socket::shutdown_receive:
518 1 : how = SHUT_RD;
519 1 : break;
520 1 : case tcp_socket::shutdown_send:
521 1 : how = SHUT_WR;
522 1 : break;
523 1 : case tcp_socket::shutdown_both:
524 1 : how = SHUT_RDWR;
525 1 : break;
526 MIS 0 : default:
527 0 : return make_err(EINVAL);
528 : }
529 HIT 3 : if (::shutdown(fd_, how) != 0)
530 MIS 0 : return make_err(errno);
531 HIT 3 : return {};
532 : }
533 :
534 : inline std::error_code
535 28 : select_socket::set_option(
536 : int level, int optname, void const* data, std::size_t size) noexcept
537 : {
538 28 : if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
539 : 0)
540 MIS 0 : return make_err(errno);
541 HIT 28 : return {};
542 : }
543 :
544 : inline std::error_code
545 31 : select_socket::get_option(
546 : int level, int optname, void* data, std::size_t* size) const noexcept
547 : {
548 31 : socklen_t len = static_cast<socklen_t>(*size);
549 31 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
550 MIS 0 : return make_err(errno);
551 HIT 31 : *size = static_cast<std::size_t>(len);
552 31 : return {};
553 : }
554 :
/** Cancels all pending operations on this socket.

    Each op is claimed by atomically exchanging `registered` to
    unregistered; only the caller that observes a previously
    registered/registering state deregisters the fd and posts the op
    (with cancel requested) so the waiting coroutine resumes. `self`
    keeps the impl alive while ops are being posted. No-op if the impl
    is no longer shared-owned.
*/
inline void
select_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;   // keep impl alive until the op completes
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
579 :
/** Cancels one specific pending operation.

    Uses the same atomic-claim protocol as cancel(): only the caller
    that flips `registered` away from a registered state posts the op.
    The event mask to deregister is inferred from which member op was
    passed (connect/write -> write events, read -> read events).
*/
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        op.impl_ptr = self;   // keep impl alive until the op completes
        svc_.post(&op);
        svc_.work_finished();
    }
}
608 :
/** Cancels pending ops, releases the native fd, and clears cached
    endpoints.

    Safe to call repeatedly: fd_ is set to -1 after the first close.
    If the impl can no longer be shared-owned (weak_from_this() fails
    to lock, e.g. during teardown) the cancel pass is skipped and only
    the fd is released.
*/
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        // Same atomic-claim protocol as cancel(): only the claimant
        // that observes a registered state posts the op.
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Remove any remaining interest before closing the descriptor.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
645 :
/** Constructs the service, acquiring the context's select scheduler. */
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
653 :
654 336 : inline select_socket_service::~select_socket_service() {}
655 :
/** Service shutdown: closes every live socket impl.

    Pops each impl off the intrusive list (under the lock) and closes
    its native socket, which also cancels and posts any pending ops.
*/
inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
669 :
670 : inline io_object::implementation*
671 10506 : select_socket_service::construct()
672 : {
673 10506 : auto impl = std::make_shared<select_socket>(*this);
674 10506 : auto* raw = impl.get();
675 :
676 : {
677 10506 : std::lock_guard lock(state_->mutex_);
678 10506 : state_->socket_list_.push_back(raw);
679 10506 : state_->socket_ptrs_.emplace(raw, std::move(impl));
680 10506 : }
681 :
682 10506 : return raw;
683 10506 : }
684 :
685 : inline void
686 10506 : select_socket_service::destroy(io_object::implementation* impl)
687 : {
688 10506 : auto* select_impl = static_cast<select_socket*>(impl);
689 10506 : select_impl->close_socket();
690 10506 : std::lock_guard lock(state_->mutex_);
691 10506 : state_->socket_list_.remove(select_impl);
692 10506 : state_->socket_ptrs_.erase(select_impl);
693 10506 : }
694 :
695 : inline std::error_code
696 3511 : select_socket_service::open_socket(
697 : tcp_socket::implementation& impl, int family, int type, int protocol)
698 : {
699 3511 : auto* select_impl = static_cast<select_socket*>(&impl);
700 3511 : select_impl->close_socket();
701 :
702 3511 : int fd = ::socket(family, type, protocol);
703 3511 : if (fd < 0)
704 MIS 0 : return make_err(errno);
705 :
706 HIT 3511 : if (family == AF_INET6)
707 : {
708 5 : int one = 1;
709 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710 : }
711 :
712 : // Set non-blocking and close-on-exec
713 3511 : int flags = ::fcntl(fd, F_GETFL, 0);
714 3511 : if (flags == -1)
715 : {
716 MIS 0 : int errn = errno;
717 0 : ::close(fd);
718 0 : return make_err(errn);
719 : }
720 HIT 3511 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721 : {
722 MIS 0 : int errn = errno;
723 0 : ::close(fd);
724 0 : return make_err(errn);
725 : }
726 HIT 3511 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727 : {
728 MIS 0 : int errn = errno;
729 0 : ::close(fd);
730 0 : return make_err(errn);
731 : }
732 :
733 : // Check fd is within select() limits
734 HIT 3511 : if (fd >= FD_SETSIZE)
735 : {
736 MIS 0 : ::close(fd);
737 0 : return make_err(EMFILE); // Too many open files
738 : }
739 :
740 HIT 3511 : select_impl->fd_ = fd;
741 3511 : return {};
742 : }
743 :
744 : inline void
745 17510 : select_socket_service::close(io_object::handle& h)
746 : {
747 17510 : static_cast<select_socket*>(h.get())->close_socket();
748 17510 : }
749 :
750 : inline void
751 164634 : select_socket_service::post(select_op* op)
752 : {
753 164634 : state_->sched_.post(op);
754 164634 : }
755 :
756 : inline void
757 3778 : select_socket_service::work_started() noexcept
758 : {
759 3778 : state_->sched_.work_started();
760 3778 : }
761 :
762 : inline void
763 159 : select_socket_service::work_finished() noexcept
764 : {
765 159 : state_->sched_.work_finished();
766 159 : }
767 :
768 : } // namespace boost::corosio::detail
769 :
770 : #endif // BOOST_COROSIO_HAS_SELECT
771 :
772 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
|