1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/native/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/native/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/native/detail/make_err.hpp>
26  
#include <boost/corosio/native/detail/make_err.hpp>
27  

27  

28  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
29  

29  

30  
#include <boost/capy/buffers.hpp>
30  
#include <boost/capy/buffers.hpp>
31  

31  

32  
#include <errno.h>
32  
#include <errno.h>
33  
#include <fcntl.h>
33  
#include <fcntl.h>
34  
#include <netinet/in.h>
34  
#include <netinet/in.h>
35  
#include <netinet/tcp.h>
35  
#include <netinet/tcp.h>
36  
#include <sys/socket.h>
36  
#include <sys/socket.h>
37  
#include <unistd.h>
37  
#include <unistd.h>
38  

38  

39  
#include <memory>
39  
#include <memory>
40  
#include <mutex>
40  
#include <mutex>
41  
#include <unordered_map>
41  
#include <unordered_map>
42  

42  

43  
/*
43  
/*
44  
    select Socket Implementation
44  
    select Socket Implementation
45  
    ============================
45  
    ============================
46  

46  

47  
    This mirrors the epoll_sockets design for behavioral consistency.
47  
    This mirrors the epoll_sockets design for behavioral consistency.
48  
    Each I/O operation follows the same pattern:
48  
    Each I/O operation follows the same pattern:
49  
      1. Try the syscall immediately (non-blocking socket)
49  
      1. Try the syscall immediately (non-blocking socket)
50  
      2. If it succeeds or fails with a real error, post to completion queue
50  
      2. If it succeeds or fails with a real error, post to completion queue
51  
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
51  
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52  

52  

53  
    Cancellation
53  
    Cancellation
54  
    ------------
54  
    ------------
55  
    See op.hpp for the completion/cancellation race handling via the
55  
    See op.hpp for the completion/cancellation race handling via the
56  
    `registered` atomic. cancel() must complete pending operations (post
56  
    `registered` atomic. cancel() must complete pending operations (post
57  
    them with cancelled flag) so coroutines waiting on them can resume.
57  
    them with cancelled flag) so coroutines waiting on them can resume.
58  
    close_socket() calls cancel() first to ensure this.
58  
    close_socket() calls cancel() first to ensure this.
59  

59  

60  
    Impl Lifetime with shared_ptr
60  
    Impl Lifetime with shared_ptr
61  
    -----------------------------
61  
    -----------------------------
62  
    Socket impls use enable_shared_from_this. The service owns impls via
62  
    Socket impls use enable_shared_from_this. The service owns impls via
63  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
63  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64  
    removal. When a user calls close(), we call cancel() which posts pending
64  
    removal. When a user calls close(), we call cancel() which posts pending
65  
    ops to the scheduler.
65  
    ops to the scheduler.
66  

66  

67  
    CRITICAL: The posted ops must keep the impl alive until they complete.
67  
    CRITICAL: The posted ops must keep the impl alive until they complete.
68  
    Otherwise the scheduler would process a freed op (use-after-free). The
68  
    Otherwise the scheduler would process a freed op (use-after-free). The
69  
    cancel() method captures shared_from_this() into op.impl_ptr before
69  
    cancel() method captures shared_from_this() into op.impl_ptr before
70  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
70  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
71  
    to be destroyed if no other references exist.
71  
    to be destroyed if no other references exist.
72  

72  

73  
    Service Ownership
73  
    Service Ownership
74  
    -----------------
74  
    -----------------
75  
    select_socket_service owns all socket impls. destroy() removes the
75  
    select_socket_service owns all socket impls. destroy() removes the
76  
    shared_ptr from the map, but the impl may survive if ops still hold
76  
    shared_ptr from the map, but the impl may survive if ops still hold
77  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
77  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
78  
    in-flight ops will complete and release their refs.
78  
    in-flight ops will complete and release their refs.
79  
*/
79  
*/
80  

80  

81  
namespace boost::corosio::detail {
81  
namespace boost::corosio::detail {
82  

82  

83  
/** State for select socket service. */
83  
/** State for select socket service. */
/** State for select socket service.

    Bundles everything the service shares with its socket impls:
    the reactor, the lock, and the two ownership containers
    described in the file header ("Impl Lifetime with shared_ptr").
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Reactor that drives fd readiness for all sockets of this service.
    select_scheduler& sched_;
    // NOTE(review): presumably guards socket_list_ / socket_ptrs_ below —
    // the locking discipline is not visible in this header; confirm in the
    // service's .cpp.
    std::mutex mutex_;
    intrusive_list<select_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup/removal (see file
    // header comment).
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98  

98  

99  
/** select socket service implementation.
99  
/** select socket service implementation.
100  

100  

101  
    Inherits from socket_service to enable runtime polymorphism.
101  
    Inherits from socket_service to enable runtime polymorphism.
102  
    Uses key_type = socket_service for service lookup.
102  
    Uses key_type = socket_service for service lookup.
103  
*/
103  
*/
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.

    Owns all socket impls via select_socket_state; see the file header
    ("Service Ownership") for lifetime rules.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    // Non-copyable: owns unique state and is registered with the context.
    select_socket_service(select_socket_service const&)            = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    // Closes all sockets and clears the ownership map; in-flight ops
    // complete and release their impl_ptr refs (see file header).
    void shutdown() override;

    // Creates a new socket impl owned by this service.
    io_object::implementation* construct() override;
    // Removes the service's shared_ptr; the impl may outlive this call
    // while posted ops still hold impl_ptr refs.
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// Reactor shared by all sockets of this service.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    // Enqueues a completed op on the scheduler's completion queue.
    void post(select_op* op);
    // Outstanding-work accounting for pending (EAGAIN-deferred) ops;
    // definitions live in the .cpp.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
135  

135  

// Backward compatibility alias for code written against the old class name.
using select_sockets = select_socket_service;
138  

138  

/** Stop-callback adapter: forwards a std::stop_token request to the
    owning op's virtual cancel().
*/
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
144  

144  

145  
inline void
145  
inline void
146  
select_connect_op::cancel() noexcept
146  
select_connect_op::cancel() noexcept
147  
{
147  
{
148  
    if (socket_impl_)
148  
    if (socket_impl_)
149  
        socket_impl_->cancel_single_op(*this);
149  
        socket_impl_->cancel_single_op(*this);
150  
    else
150  
    else
151  
        request_cancel();
151  
        request_cancel();
152  
}
152  
}
153  

153  

154  
inline void
154  
inline void
155  
select_read_op::cancel() noexcept
155  
select_read_op::cancel() noexcept
156  
{
156  
{
157  
    if (socket_impl_)
157  
    if (socket_impl_)
158  
        socket_impl_->cancel_single_op(*this);
158  
        socket_impl_->cancel_single_op(*this);
159  
    else
159  
    else
160  
        request_cancel();
160  
        request_cancel();
161  
}
161  
}
162  

162  

163  
inline void
163  
inline void
164  
select_write_op::cancel() noexcept
164  
select_write_op::cancel() noexcept
165  
{
165  
{
166  
    if (socket_impl_)
166  
    if (socket_impl_)
167  
        socket_impl_->cancel_single_op(*this);
167  
        socket_impl_->cancel_single_op(*this);
168  
    else
168  
    else
169  
        request_cancel();
169  
        request_cancel();
170  
}
170  
}
171  

171  

/** Connect completion handler, run from the scheduler queue.

    Publishes the result to the awaiter's out-params, caches endpoints on
    success, then resumes the waiting coroutine. Cancellation takes
    precedence over errn when reporting the error code.
*/
inline void
select_connect_op::operator()()
{
    // Disarm the stop callback first so a late stop request cannot race
    // with completion.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // Best-effort: on getsockname failure local_ep stays
        // default-constructed and is cached anyway.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        // Report cancellation even if the syscall also produced an errno.
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    // (resuming the coroutine may destroy this op's storage, and
    // releasing impl_ptr may destroy the socket impl that owns it).
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
212  

212  

/// Binds the socket impl to its owning service; fd setup happens later
/// via the service's open_socket().
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
217  

217  

/** Begin an async connect to `ep`.

    Follows the try-first pattern from the file header: issue the
    non-blocking ::connect, and either post the result immediately or
    register with the select scheduler on EINPROGRESS. Completion is
    always delivered through the scheduler queue, so this always returns
    std::noop_coroutine() — the caller's coroutine resumes later via
    select_connect_op::operator()().

    @param h     Coroutine to resume on completion.
    @param ex    Executor the resumption is dispatched on.
    @param ep    Target endpoint.
    @param token Stop token wired to the op's canceller.
    @param ec    Out-param receiving the result code (may be null).
*/
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h               = h;
    op.ex              = ex;
    op.ec_out          = ec;
    op.fd              = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        // impl_ptr must be set before post so the queued op keeps this
        // impl alive (see file header, "Impl Lifetime with shared_ptr").
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Whoever moves the state to unregistered owns delivering the
            // completion; exchange ensures exactly one party does so.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate failure: post the errno-carrying completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
309  

309  

/** Begin an async scatter read.

    Tries ::readv immediately on the non-blocking fd; posts the result on
    success, EOF, or hard error, and registers with the select scheduler
    on EAGAIN/EWOULDBLOCK. Completion is always delivered through the
    scheduler queue, so this always returns std::noop_coroutine().

    @param h         Coroutine to resume on completion.
    @param ex        Executor the resumption is dispatched on.
    @param param     Buffer sequence to read into.
    @param token     Stop token wired to the op's canceller.
    @param ec        Out-param receiving the result code (may be null).
    @param bytes_out Out-param receiving bytes read (may be null).
*/
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Empty buffer sequence: complete with zero bytes without touching the
    // fd. empty_buffer_read lets the completion distinguish this from EOF
    // — presumably; the op's operator() is not visible here, confirm.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        // Data available right away — post the completion.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        // Peer closed (EOF): reported as errn 0 with zero bytes and
        // empty_buffer_read unset.
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Exchange ensures exactly one party (us, reactor, or cancel)
            // claims the op and delivers the completion.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error: post the errno-carrying completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
412  

412  

// Initiates a scatter/gather send on the non-blocking socket.
//
// The send is attempted inline with ::sendmsg() first. If the socket is not
// currently writable (EAGAIN/EWOULDBLOCK), the op is registered with the
// select reactor for a write event and completes later. In every path the
// completion is delivered through svc_.post(), so this function always
// returns std::noop_coroutine() to the awaiting machinery.
//
// h         - coroutine handle stored on the op for later resumption
// ex        - executor stored on the op for completion dispatch
// param     - caller's buffer sequence, copied into op.iovecs
// token     - stop token wired to the op via op.start()
// ec        - out-pointer receiving the operation's error code
// bytes_out - out-pointer receiving the byte count transferred
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    // The write op is a per-socket singleton; reset clears prior state.
    auto& op = wr_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    // Gather the caller's buffers into a bounded local array, then mirror
    // them into the op's iovec array for sendmsg().
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length writes complete immediately with success (0 bytes).
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        // impl_ptr keeps this socket alive until the posted op runs.
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // Speculative inline send; MSG_NOSIGNAL suppresses SIGPIPE on a
    // broken connection so the error is reported via errno instead.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Partial or full write: report whatever was transferred.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Not writable yet: register with the reactor. work_started()
        // holds the scheduler open until the op completes; it is balanced
        // by work_finished() on whichever path finally posts the op.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Claim the op back with an exchange: only the party that saw
            // a non-unregistered previous value deregisters and posts.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error from sendmsg().
    // NOTE(review): n == 0 also lands here; errno may then still be 0 and
    // is mapped to EIO below — confirm that is the intended behavior.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
510  

510  

511  
inline std::error_code
511  
inline std::error_code
512  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
512  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
513  
{
513  
{
514  
    int how;
514  
    int how;
515  
    switch (what)
515  
    switch (what)
516  
    {
516  
    {
517  
    case tcp_socket::shutdown_receive:
517  
    case tcp_socket::shutdown_receive:
518  
        how = SHUT_RD;
518  
        how = SHUT_RD;
519  
        break;
519  
        break;
520  
    case tcp_socket::shutdown_send:
520  
    case tcp_socket::shutdown_send:
521  
        how = SHUT_WR;
521  
        how = SHUT_WR;
522  
        break;
522  
        break;
523  
    case tcp_socket::shutdown_both:
523  
    case tcp_socket::shutdown_both:
524  
        how = SHUT_RDWR;
524  
        how = SHUT_RDWR;
525  
        break;
525  
        break;
526  
    default:
526  
    default:
527  
        return make_err(EINVAL);
527  
        return make_err(EINVAL);
528  
    }
528  
    }
529  
    if (::shutdown(fd_, how) != 0)
529  
    if (::shutdown(fd_, how) != 0)
530  
        return make_err(errno);
530  
        return make_err(errno);
531  
    return {};
531  
    return {};
532  
}
532  
}
533  

533  

534  
inline std::error_code
534  
inline std::error_code
535  
select_socket::set_option(
535  
select_socket::set_option(
536  
    int level, int optname, void const* data, std::size_t size) noexcept
536  
    int level, int optname, void const* data, std::size_t size) noexcept
537  
{
537  
{
538  
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
538  
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
539  
        0)
539  
        0)
540  
        return make_err(errno);
540  
        return make_err(errno);
541  
    return {};
541  
    return {};
542  
}
542  
}
543  

543  

544  
inline std::error_code
544  
inline std::error_code
545  
select_socket::get_option(
545  
select_socket::get_option(
546  
    int level, int optname, void* data, std::size_t* size) const noexcept
546  
    int level, int optname, void* data, std::size_t* size) const noexcept
547  
{
547  
{
548  
    socklen_t len = static_cast<socklen_t>(*size);
548  
    socklen_t len = static_cast<socklen_t>(*size);
549  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
549  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
550  
        return make_err(errno);
550  
        return make_err(errno);
551  
    *size = static_cast<std::size_t>(len);
551  
    *size = static_cast<std::size_t>(len);
552  
    return {};
552  
    return {};
553  
}
553  
}
554  

554  

// Cancels all pending operations (connect, read, write) on this socket.
//
// Each op is claimed with an atomic exchange of its registration state:
// only the party that observes a previous value other than `unregistered`
// owns the follow-up work (deregister the fd, post the op for completion,
// and balance the work_started() taken when the op was registered).
inline void
select_socket::cancel() noexcept
{
    // If no shared_ptr owner remains the socket is being torn down and ops
    // cannot be safely posted; do nothing.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Atomically take ownership of the op's registration.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        // Mark the op cancelled regardless of who wins the claim.
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            // We won the claim: remove the fd from the reactor, keep the
            // impl alive through the posted completion, and release the
            // outstanding work count.
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    // connect and write ops wait for writability; read waits for readability.
    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
579  

579  

// Cancels one specific pending operation.
//
// Uses the same claim-by-exchange protocol as cancel(): whoever swaps the
// registration state away from a non-unregistered value owns the
// deregister/post/work_finished follow-up.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // Bail out if the impl has no owners left (being destroyed).
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted op has run, then balance the
        // work_started() performed when the op was registered.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
608  

608  

// Closes the socket: cancels in-flight operations, deregisters and closes
// the descriptor, and clears the cached endpoints. Safe to call again after
// a prior close (fd_ is set to -1 on the first call).
inline void
select_socket::close_socket() noexcept
{
    // Cancellation posts ops, which requires a live shared_ptr owner; if
    // none remains, skip straight to closing the descriptor.
    auto self = weak_from_this().lock();
    if (self)
    {
        // Same claim-by-exchange protocol as cancel(): the winner of the
        // exchange owns deregister/post/work_finished for the op.
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                // impl_ptr keeps this socket alive until the op runs.
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Remove both event registrations before closing so the reactor
        // never selects on a dead descriptor.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    // Reset cached endpoint info now that the connection is gone.
    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
645  

645  

// Constructs the service. All mutable service state (the scheduler
// reference, mutex, and socket registries) is bundled into a heap-allocated
// select_socket_state owned through state_.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
653  

653  

654  
inline select_socket_service::~select_socket_service() {}
654  
inline select_socket_service::~select_socket_service() {}
655  

655  

656  
inline void
656  
inline void
657  
select_socket_service::shutdown()
657  
select_socket_service::shutdown()
658  
{
658  
{
659  
    std::lock_guard lock(state_->mutex_);
659  
    std::lock_guard lock(state_->mutex_);
660  

660  

661  
    while (auto* impl = state_->socket_list_.pop_front())
661  
    while (auto* impl = state_->socket_list_.pop_front())
662  
        impl->close_socket();
662  
        impl->close_socket();
663  

663  

664  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
664  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
665  
    // drains completed_ops_, calling destroy() on each queued op. Letting
665  
    // drains completed_ops_, calling destroy() on each queued op. Letting
666  
    // ~state_ release the ptrs (during service destruction, after scheduler
666  
    // ~state_ release the ptrs (during service destruction, after scheduler
667  
    // shutdown) keeps every impl alive until all ops have been drained.
667  
    // shutdown) keeps every impl alive until all ops have been drained.
668  
}
668  
}
669  

669  

670  
inline io_object::implementation*
670  
inline io_object::implementation*
671  
select_socket_service::construct()
671  
select_socket_service::construct()
672  
{
672  
{
673  
    auto impl = std::make_shared<select_socket>(*this);
673  
    auto impl = std::make_shared<select_socket>(*this);
674  
    auto* raw = impl.get();
674  
    auto* raw = impl.get();
675  

675  

676  
    {
676  
    {
677  
        std::lock_guard lock(state_->mutex_);
677  
        std::lock_guard lock(state_->mutex_);
678  
        state_->socket_list_.push_back(raw);
678  
        state_->socket_list_.push_back(raw);
679  
        state_->socket_ptrs_.emplace(raw, std::move(impl));
679  
        state_->socket_ptrs_.emplace(raw, std::move(impl));
680  
    }
680  
    }
681  

681  

682  
    return raw;
682  
    return raw;
683  
}
683  
}
684  

684  

685  
inline void
685  
inline void
686  
select_socket_service::destroy(io_object::implementation* impl)
686  
select_socket_service::destroy(io_object::implementation* impl)
687  
{
687  
{
688  
    auto* select_impl = static_cast<select_socket*>(impl);
688  
    auto* select_impl = static_cast<select_socket*>(impl);
689  
    select_impl->close_socket();
689  
    select_impl->close_socket();
690  
    std::lock_guard lock(state_->mutex_);
690  
    std::lock_guard lock(state_->mutex_);
691  
    state_->socket_list_.remove(select_impl);
691  
    state_->socket_list_.remove(select_impl);
692  
    state_->socket_ptrs_.erase(select_impl);
692  
    state_->socket_ptrs_.erase(select_impl);
693  
}
693  
}
694  

694  

695  
inline std::error_code
695  
inline std::error_code
696  
select_socket_service::open_socket(
696  
select_socket_service::open_socket(
697  
    tcp_socket::implementation& impl, int family, int type, int protocol)
697  
    tcp_socket::implementation& impl, int family, int type, int protocol)
698  
{
698  
{
699  
    auto* select_impl = static_cast<select_socket*>(&impl);
699  
    auto* select_impl = static_cast<select_socket*>(&impl);
700  
    select_impl->close_socket();
700  
    select_impl->close_socket();
701  

701  

702  
    int fd = ::socket(family, type, protocol);
702  
    int fd = ::socket(family, type, protocol);
703  
    if (fd < 0)
703  
    if (fd < 0)
704  
        return make_err(errno);
704  
        return make_err(errno);
705  

705  

706  
    if (family == AF_INET6)
706  
    if (family == AF_INET6)
707  
    {
707  
    {
708  
        int one = 1;
708  
        int one = 1;
709  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
709  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710  
    }
710  
    }
711  

711  

712  
    // Set non-blocking and close-on-exec
712  
    // Set non-blocking and close-on-exec
713  
    int flags = ::fcntl(fd, F_GETFL, 0);
713  
    int flags = ::fcntl(fd, F_GETFL, 0);
714  
    if (flags == -1)
714  
    if (flags == -1)
715  
    {
715  
    {
716  
        int errn = errno;
716  
        int errn = errno;
717  
        ::close(fd);
717  
        ::close(fd);
718  
        return make_err(errn);
718  
        return make_err(errn);
719  
    }
719  
    }
720  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
720  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721  
    {
721  
    {
722  
        int errn = errno;
722  
        int errn = errno;
723  
        ::close(fd);
723  
        ::close(fd);
724  
        return make_err(errn);
724  
        return make_err(errn);
725  
    }
725  
    }
726  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
726  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727  
    {
727  
    {
728  
        int errn = errno;
728  
        int errn = errno;
729  
        ::close(fd);
729  
        ::close(fd);
730  
        return make_err(errn);
730  
        return make_err(errn);
731  
    }
731  
    }
732  

732  

733  
    // Check fd is within select() limits
733  
    // Check fd is within select() limits
734  
    if (fd >= FD_SETSIZE)
734  
    if (fd >= FD_SETSIZE)
735  
    {
735  
    {
736  
        ::close(fd);
736  
        ::close(fd);
737  
        return make_err(EMFILE); // Too many open files
737  
        return make_err(EMFILE); // Too many open files
738  
    }
738  
    }
739  

739  

740  
    select_impl->fd_ = fd;
740  
    select_impl->fd_ = fd;
741  
    return {};
741  
    return {};
742  
}
742  
}
743  

743  

744  
inline void
744  
inline void
745  
select_socket_service::close(io_object::handle& h)
745  
select_socket_service::close(io_object::handle& h)
746  
{
746  
{
747  
    static_cast<select_socket*>(h.get())->close_socket();
747  
    static_cast<select_socket*>(h.get())->close_socket();
748  
}
748  
}
749  

749  

750  
inline void
750  
inline void
751  
select_socket_service::post(select_op* op)
751  
select_socket_service::post(select_op* op)
752  
{
752  
{
753  
    state_->sched_.post(op);
753  
    state_->sched_.post(op);
754  
}
754  
}
755  

755  

756  
inline void
756  
inline void
757  
select_socket_service::work_started() noexcept
757  
select_socket_service::work_started() noexcept
758  
{
758  
{
759  
    state_->sched_.work_started();
759  
    state_->sched_.work_started();
760  
}
760  
}
761  

761  

762  
inline void
762  
inline void
763  
select_socket_service::work_finished() noexcept
763  
select_socket_service::work_finished() noexcept
764  
{
764  
{
765  
    state_->sched_.work_finished();
765  
    state_->sched_.work_finished();
766  
}
766  
}
767  

767  

768  
} // namespace boost::corosio::detail
768  
} // namespace boost::corosio::detail
769  

769  

770  
#endif // BOOST_COROSIO_HAS_SELECT
770  
#endif // BOOST_COROSIO_HAS_SELECT
771  

771  

772  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
772  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP