include/boost/corosio/native/detail/select/select_socket_service.hpp

75.7% Lines (262/346) 93.1% Functions (27/29)
include/boost/corosio/native/detail/select/select_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/native/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() runs the same cancellation steps before closing the fd.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), close_socket() cancels pending ops
65 and posts them to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets but leaves socket_ptrs_
78 intact; the map is released at service destruction, after queued ops drain.
79 */
80
81 namespace boost::corosio::detail {
82
83 /** State for select socket service. */
84 class select_socket_state
85 {
86 public:
87 168 explicit select_socket_state(select_scheduler& sched) noexcept
88 168 : sched_(sched)
89 {
90 168 }
91
92 select_scheduler& sched_;
93 std::mutex mutex_;
94 intrusive_list<select_socket> socket_list_;
95 std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
96 socket_ptrs_;
97 };
98
99 /** select socket service implementation.
100
101 Inherits from socket_service to enable runtime polymorphism.
102 Uses key_type = socket_service for service lookup.
103 */
104 class BOOST_COROSIO_DECL select_socket_service final : public socket_service
105 {
106 public:
107 explicit select_socket_service(capy::execution_context& ctx);
108 ~select_socket_service() override;
109
110 select_socket_service(select_socket_service const&) = delete;
111 select_socket_service& operator=(select_socket_service const&) = delete;
112
113 void shutdown() override;
114
115 io_object::implementation* construct() override;
116 void destroy(io_object::implementation*) override;
117 void close(io_object::handle&) override;
118 std::error_code open_socket(
119 tcp_socket::implementation& impl,
120 int family,
121 int type,
122 int protocol) override;
123
124 10941 select_scheduler& scheduler() const noexcept
125 {
126 10941 return state_->sched_;
127 }
128 void post(select_op* op);
129 void work_started() noexcept;
130 void work_finished() noexcept;
131
132 private:
133 std::unique_ptr<select_socket_state> state_;
134 };
135
136 // Backward compatibility alias
137 using select_sockets = select_socket_service;
138
139 inline void
140 98 select_op::canceller::operator()() const noexcept
141 {
142 98 op->cancel();
143 98 }
144
145 inline void
146 select_connect_op::cancel() noexcept
147 {
148 if (socket_impl_)
149 socket_impl_->cancel_single_op(*this);
150 else
151 request_cancel();
152 }
153
154 inline void
155 98 select_read_op::cancel() noexcept
156 {
157 98 if (socket_impl_)
158 98 socket_impl_->cancel_single_op(*this);
159 else
160 request_cancel();
161 98 }
162
163 inline void
164 select_write_op::cancel() noexcept
165 {
166 if (socket_impl_)
167 socket_impl_->cancel_single_op(*this);
168 else
169 request_cancel();
170 }
171
172 inline void
173 3496 select_connect_op::operator()()
174 {
175 3496 stop_cb.reset();
176
177 3496 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
178
179 // Cache endpoints on successful connect
180 3496 if (success && socket_impl_)
181 {
182 3493 endpoint local_ep;
183 3493 sockaddr_storage local_storage{};
184 3493 socklen_t local_len = sizeof(local_storage);
185 3493 if (::getsockname(
186 3493 fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
187 0)
188 3493 local_ep = from_sockaddr(local_storage);
189 3493 static_cast<select_socket*>(socket_impl_)
190 3493 ->set_endpoints(local_ep, target_endpoint);
191 }
192
193 3496 if (ec_out)
194 {
195 3496 if (cancelled.load(std::memory_order_acquire))
196 *ec_out = capy::error::canceled;
197 3496 else if (errn != 0)
198 3 *ec_out = make_err(errn);
199 else
200 3493 *ec_out = {};
201 }
202
203 3496 if (bytes_out)
204 *bytes_out = bytes_transferred;
205
206 // Move to stack before destroying the frame
207 3496 capy::executor_ref saved_ex(ex);
208 3496 std::coroutine_handle<> saved_h(h);
209 3496 impl_ptr.reset();
210 3496 dispatch_coro(saved_ex, saved_h).resume();
211 3496 }
212
213 10506 inline select_socket::select_socket(select_socket_service& svc) noexcept
214 10506 : svc_(svc)
215 {
216 10506 }
217
218 inline std::coroutine_handle<>
219 3496 select_socket::connect(
220 std::coroutine_handle<> h,
221 capy::executor_ref ex,
222 endpoint ep,
223 std::stop_token token,
224 std::error_code* ec)
225 {
226 3496 auto& op = conn_;
227 3496 op.reset();
228 3496 op.h = h;
229 3496 op.ex = ex;
230 3496 op.ec_out = ec;
231 3496 op.fd = fd_;
232 3496 op.target_endpoint = ep; // Store target for endpoint caching
233 3496 op.start(token, this);
234
235 3496 sockaddr_storage storage{};
236 socklen_t addrlen =
237 3496 detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
238 3496 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
239
240 3496 if (result == 0)
241 {
242 // Sync success — cache endpoints immediately
243 sockaddr_storage local_storage{};
244 socklen_t local_len = sizeof(local_storage);
245 if (::getsockname(
246 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
247 0)
248 local_endpoint_ = detail::from_sockaddr(local_storage);
249 remote_endpoint_ = ep;
250
251 op.complete(0, 0);
252 op.impl_ptr = shared_from_this();
253 svc_.post(&op);
254 // completion is always posted to scheduler queue, never inline.
255 return std::noop_coroutine();
256 }
257
258 3496 if (errno == EINPROGRESS)
259 {
260 3496 svc_.work_started();
261 3496 op.impl_ptr = shared_from_this();
262
263 // Set registering BEFORE register_fd to close the race window where
264 // reactor sees an event before we set registered. The reactor treats
265 // registering the same as registered when claiming the op.
266 3496 op.registered.store(
267 select_registration_state::registering, std::memory_order_release);
268 3496 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
269
270 // Transition to registered. If this fails, reactor or cancel already
271 // claimed the op (state is now unregistered), so we're done. However,
272 // we must still deregister the fd because cancel's deregister_fd may
273 // have run before our register_fd, leaving the fd orphaned.
274 3496 auto expected = select_registration_state::registering;
275 3496 if (!op.registered.compare_exchange_strong(
276 expected, select_registration_state::registered,
277 std::memory_order_acq_rel))
278 {
279 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
280 // completion is always posted to scheduler queue, never inline.
281 return std::noop_coroutine();
282 }
283
284 // If cancelled was set before we registered, handle it now.
285 3496 if (op.cancelled.load(std::memory_order_acquire))
286 {
287 auto prev = op.registered.exchange(
288 select_registration_state::unregistered,
289 std::memory_order_acq_rel);
290 if (prev != select_registration_state::unregistered)
291 {
292 svc_.scheduler().deregister_fd(
293 fd_, select_scheduler::event_write);
294 op.impl_ptr = shared_from_this();
295 svc_.post(&op);
296 svc_.work_finished();
297 }
298 }
299 // completion is always posted to scheduler queue, never inline.
300 3496 return std::noop_coroutine();
301 }
302
303 op.complete(errno, 0);
304 op.impl_ptr = shared_from_this();
305 svc_.post(&op);
306 // completion is always posted to scheduler queue, never inline.
307 return std::noop_coroutine();
308 }
309
310 inline std::coroutine_handle<>
311 82460 select_socket::read_some(
312 std::coroutine_handle<> h,
313 capy::executor_ref ex,
314 buffer_param param,
315 std::stop_token token,
316 std::error_code* ec,
317 std::size_t* bytes_out)
318 {
319 82460 auto& op = rd_;
320 82460 op.reset();
321 82460 op.h = h;
322 82460 op.ex = ex;
323 82460 op.ec_out = ec;
324 82460 op.bytes_out = bytes_out;
325 82460 op.fd = fd_;
326 82460 op.start(token, this);
327
328 82460 capy::mutable_buffer bufs[select_read_op::max_buffers];
329 82460 op.iovec_count =
330 82460 static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
331
332 82460 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
333 {
334 1 op.empty_buffer_read = true;
335 1 op.complete(0, 0);
336 1 op.impl_ptr = shared_from_this();
337 1 svc_.post(&op);
338 1 return std::noop_coroutine();
339 }
340
341 164918 for (int i = 0; i < op.iovec_count; ++i)
342 {
343 82459 op.iovecs[i].iov_base = bufs[i].data();
344 82459 op.iovecs[i].iov_len = bufs[i].size();
345 }
346
347 82459 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
348
349 82459 if (n > 0)
350 {
351 82172 op.complete(0, static_cast<std::size_t>(n));
352 82172 op.impl_ptr = shared_from_this();
353 82172 svc_.post(&op);
354 82172 return std::noop_coroutine();
355 }
356
357 287 if (n == 0)
358 {
359 5 op.complete(0, 0);
360 5 op.impl_ptr = shared_from_this();
361 5 svc_.post(&op);
362 5 return std::noop_coroutine();
363 }
364
365 282 if (errno == EAGAIN || errno == EWOULDBLOCK)
366 {
367 282 svc_.work_started();
368 282 op.impl_ptr = shared_from_this();
369
370 // Set registering BEFORE register_fd to close the race window where
371 // reactor sees an event before we set registered.
372 282 op.registered.store(
373 select_registration_state::registering, std::memory_order_release);
374 282 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
375
376 // Transition to registered. If this fails, reactor or cancel already
377 // claimed the op (state is now unregistered), so we're done. However,
378 // we must still deregister the fd because cancel's deregister_fd may
379 // have run before our register_fd, leaving the fd orphaned.
380 282 auto expected = select_registration_state::registering;
381 282 if (!op.registered.compare_exchange_strong(
382 expected, select_registration_state::registered,
383 std::memory_order_acq_rel))
384 {
385 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
386 return std::noop_coroutine();
387 }
388
389 // If cancelled was set before we registered, handle it now.
390 282 if (op.cancelled.load(std::memory_order_acquire))
391 {
392 auto prev = op.registered.exchange(
393 select_registration_state::unregistered,
394 std::memory_order_acq_rel);
395 if (prev != select_registration_state::unregistered)
396 {
397 svc_.scheduler().deregister_fd(
398 fd_, select_scheduler::event_read);
399 op.impl_ptr = shared_from_this();
400 svc_.post(&op);
401 svc_.work_finished();
402 }
403 }
404 282 return std::noop_coroutine();
405 }
406
407 op.complete(errno, 0);
408 op.impl_ptr = shared_from_this();
409 svc_.post(&op);
410 return std::noop_coroutine();
411 }
412
413 inline std::coroutine_handle<>
414 82297 select_socket::write_some(
415 std::coroutine_handle<> h,
416 capy::executor_ref ex,
417 buffer_param param,
418 std::stop_token token,
419 std::error_code* ec,
420 std::size_t* bytes_out)
421 {
422 82297 auto& op = wr_;
423 82297 op.reset();
424 82297 op.h = h;
425 82297 op.ex = ex;
426 82297 op.ec_out = ec;
427 82297 op.bytes_out = bytes_out;
428 82297 op.fd = fd_;
429 82297 op.start(token, this);
430
431 82297 capy::mutable_buffer bufs[select_write_op::max_buffers];
432 82297 op.iovec_count =
433 82297 static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
434
435 82297 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
436 {
437 1 op.complete(0, 0);
438 1 op.impl_ptr = shared_from_this();
439 1 svc_.post(&op);
440 1 return std::noop_coroutine();
441 }
442
443 164592 for (int i = 0; i < op.iovec_count; ++i)
444 {
445 82296 op.iovecs[i].iov_base = bufs[i].data();
446 82296 op.iovecs[i].iov_len = bufs[i].size();
447 }
448
449 82296 msghdr msg{};
450 82296 msg.msg_iov = op.iovecs;
451 82296 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
452
453 82296 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
454
455 82296 if (n > 0)
456 {
457 82295 op.complete(0, static_cast<std::size_t>(n));
458 82295 op.impl_ptr = shared_from_this();
459 82295 svc_.post(&op);
460 82295 return std::noop_coroutine();
461 }
462
463 1 if (errno == EAGAIN || errno == EWOULDBLOCK)
464 {
465 svc_.work_started();
466 op.impl_ptr = shared_from_this();
467
468 // Set registering BEFORE register_fd to close the race window where
469 // reactor sees an event before we set registered.
470 op.registered.store(
471 select_registration_state::registering, std::memory_order_release);
472 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
473
474 // Transition to registered. If this fails, reactor or cancel already
475 // claimed the op (state is now unregistered), so we're done. However,
476 // we must still deregister the fd because cancel's deregister_fd may
477 // have run before our register_fd, leaving the fd orphaned.
478 auto expected = select_registration_state::registering;
479 if (!op.registered.compare_exchange_strong(
480 expected, select_registration_state::registered,
481 std::memory_order_acq_rel))
482 {
483 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
484 return std::noop_coroutine();
485 }
486
487 // If cancelled was set before we registered, handle it now.
488 if (op.cancelled.load(std::memory_order_acquire))
489 {
490 auto prev = op.registered.exchange(
491 select_registration_state::unregistered,
492 std::memory_order_acq_rel);
493 if (prev != select_registration_state::unregistered)
494 {
495 svc_.scheduler().deregister_fd(
496 fd_, select_scheduler::event_write);
497 op.impl_ptr = shared_from_this();
498 svc_.post(&op);
499 svc_.work_finished();
500 }
501 }
502 return std::noop_coroutine();
503 }
504
505 1 op.complete(errno ? errno : EIO, 0);
506 1 op.impl_ptr = shared_from_this();
507 1 svc_.post(&op);
508 1 return std::noop_coroutine();
509 }
510
511 inline std::error_code
512 3 select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
513 {
514 int how;
515 3 switch (what)
516 {
517 1 case tcp_socket::shutdown_receive:
518 1 how = SHUT_RD;
519 1 break;
520 1 case tcp_socket::shutdown_send:
521 1 how = SHUT_WR;
522 1 break;
523 1 case tcp_socket::shutdown_both:
524 1 how = SHUT_RDWR;
525 1 break;
526 default:
527 return make_err(EINVAL);
528 }
529 3 if (::shutdown(fd_, how) != 0)
530 return make_err(errno);
531 3 return {};
532 }
533
534 inline std::error_code
535 28 select_socket::set_option(
536 int level, int optname, void const* data, std::size_t size) noexcept
537 {
538 28 if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
539 0)
540 return make_err(errno);
541 28 return {};
542 }
543
544 inline std::error_code
545 31 select_socket::get_option(
546 int level, int optname, void* data, std::size_t* size) const noexcept
547 {
548 31 socklen_t len = static_cast<socklen_t>(*size);
549 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
550 return make_err(errno);
551 31 *size = static_cast<std::size_t>(len);
552 31 return {};
553 }
554
555 inline void
556 177 select_socket::cancel() noexcept
557 {
558 177 auto self = weak_from_this().lock();
559 177 if (!self)
560 return;
561
562 531 auto cancel_op = [this, &self](select_op& op, int events) {
563 531 auto prev = op.registered.exchange(
564 select_registration_state::unregistered, std::memory_order_acq_rel);
565 531 op.request_cancel();
566 531 if (prev != select_registration_state::unregistered)
567 {
568 92 svc_.scheduler().deregister_fd(fd_, events);
569 92 op.impl_ptr = self;
570 92 svc_.post(&op);
571 92 svc_.work_finished();
572 }
573 708 };
574
575 177 cancel_op(conn_, select_scheduler::event_write);
576 177 cancel_op(rd_, select_scheduler::event_read);
577 177 cancel_op(wr_, select_scheduler::event_write);
578 177 }
579
580 inline void
581 98 select_socket::cancel_single_op(select_op& op) noexcept
582 {
583 98 auto self = weak_from_this().lock();
584 98 if (!self)
585 return;
586
587 // Called from stop_token callback to cancel a specific pending operation.
588 98 auto prev = op.registered.exchange(
589 select_registration_state::unregistered, std::memory_order_acq_rel);
590 98 op.request_cancel();
591
592 98 if (prev != select_registration_state::unregistered)
593 {
594 // Determine which event type to deregister
595 66 int events = 0;
596 66 if (&op == &conn_ || &op == &wr_)
597 events = select_scheduler::event_write;
598 66 else if (&op == &rd_)
599 66 events = select_scheduler::event_read;
600
601 66 svc_.scheduler().deregister_fd(fd_, events);
602
603 66 op.impl_ptr = self;
604 66 svc_.post(&op);
605 66 svc_.work_finished();
606 }
607 98 }
608
609 inline void
610 31527 select_socket::close_socket() noexcept
611 {
612 31527 auto self = weak_from_this().lock();
613 31527 if (self)
614 {
615 94581 auto cancel_op = [this, &self](select_op& op, int events) {
616 94581 auto prev = op.registered.exchange(
617 select_registration_state::unregistered,
618 std::memory_order_acq_rel);
619 94581 op.request_cancel();
620 94581 if (prev != select_registration_state::unregistered)
621 {
622 1 svc_.scheduler().deregister_fd(fd_, events);
623 1 op.impl_ptr = self;
624 1 svc_.post(&op);
625 1 svc_.work_finished();
626 }
627 126108 };
628
629 31527 cancel_op(conn_, select_scheduler::event_write);
630 31527 cancel_op(rd_, select_scheduler::event_read);
631 31527 cancel_op(wr_, select_scheduler::event_write);
632 }
633
634 31527 if (fd_ >= 0)
635 {
636 7004 svc_.scheduler().deregister_fd(
637 fd_, select_scheduler::event_read | select_scheduler::event_write);
638 7004 ::close(fd_);
639 7004 fd_ = -1;
640 }
641
642 31527 local_endpoint_ = endpoint{};
643 31527 remote_endpoint_ = endpoint{};
644 31527 }
645
646 168 inline select_socket_service::select_socket_service(
647 168 capy::execution_context& ctx)
648 168 : state_(
649 std::make_unique<select_socket_state>(
650 168 ctx.use_service<select_scheduler>()))
651 {
652 168 }
653
654 336 inline select_socket_service::~select_socket_service() {}
655
656 inline void
657 168 select_socket_service::shutdown()
658 {
659 168 std::lock_guard lock(state_->mutex_);
660
661 168 while (auto* impl = state_->socket_list_.pop_front())
662 impl->close_socket();
663
664 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
665 // drains completed_ops_, calling destroy() on each queued op. Letting
666 // ~state_ release the ptrs (during service destruction, after scheduler
667 // shutdown) keeps every impl alive until all ops have been drained.
668 168 }
669
670 inline io_object::implementation*
671 10506 select_socket_service::construct()
672 {
673 10506 auto impl = std::make_shared<select_socket>(*this);
674 10506 auto* raw = impl.get();
675
676 {
677 10506 std::lock_guard lock(state_->mutex_);
678 10506 state_->socket_list_.push_back(raw);
679 10506 state_->socket_ptrs_.emplace(raw, std::move(impl));
680 10506 }
681
682 10506 return raw;
683 10506 }
684
685 inline void
686 10506 select_socket_service::destroy(io_object::implementation* impl)
687 {
688 10506 auto* select_impl = static_cast<select_socket*>(impl);
689 10506 select_impl->close_socket();
690 10506 std::lock_guard lock(state_->mutex_);
691 10506 state_->socket_list_.remove(select_impl);
692 10506 state_->socket_ptrs_.erase(select_impl);
693 10506 }
694
695 inline std::error_code
696 3511 select_socket_service::open_socket(
697 tcp_socket::implementation& impl, int family, int type, int protocol)
698 {
699 3511 auto* select_impl = static_cast<select_socket*>(&impl);
700 3511 select_impl->close_socket();
701
702 3511 int fd = ::socket(family, type, protocol);
703 3511 if (fd < 0)
704 return make_err(errno);
705
706 3511 if (family == AF_INET6)
707 {
708 5 int one = 1;
709 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710 }
711
712 // Set non-blocking and close-on-exec
713 3511 int flags = ::fcntl(fd, F_GETFL, 0);
714 3511 if (flags == -1)
715 {
716 int errn = errno;
717 ::close(fd);
718 return make_err(errn);
719 }
720 3511 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721 {
722 int errn = errno;
723 ::close(fd);
724 return make_err(errn);
725 }
726 3511 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727 {
728 int errn = errno;
729 ::close(fd);
730 return make_err(errn);
731 }
732
733 // Check fd is within select() limits
734 3511 if (fd >= FD_SETSIZE)
735 {
736 ::close(fd);
737 return make_err(EMFILE); // Too many open files
738 }
739
740 3511 select_impl->fd_ = fd;
741 3511 return {};
742 }
743
744 inline void
745 17510 select_socket_service::close(io_object::handle& h)
746 {
747 17510 static_cast<select_socket*>(h.get())->close_socket();
748 17510 }
749
750 inline void
751 164634 select_socket_service::post(select_op* op)
752 {
753 164634 state_->sched_.post(op);
754 164634 }
755
756 inline void
757 3778 select_socket_service::work_started() noexcept
758 {
759 3778 state_->sched_.work_started();
760 3778 }
761
762 inline void
763 159 select_socket_service::work_finished() noexcept
764 {
765 159 state_->sched_.work_finished();
766 159 }
767
768 } // namespace boost::corosio::detail
769
770 #endif // BOOST_COROSIO_HAS_SELECT
771
772 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
773