//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>
#include <boost/corosio/detail/socket_service.hpp>

#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>

#include <boost/corosio/native/detail/endpoint_convert.hpp>
#include <boost/corosio/native/detail/make_err.hpp>
#include <boost/corosio/detail/dispatch_coro.hpp>
#include <boost/corosio/detail/except.hpp>
#include <boost/capy/buffers.hpp>

#include <coroutine>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
41  

41  

/*
    epoll Socket Implementation
    ===========================

    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait

    This "try first" approach avoids unnecessary epoll round-trips for
    operations that can complete immediately (common for small reads/writes
    on fast local connections).

    One-Shot Registration
    ---------------------
    We use one-shot epoll registration: each operation registers, waits for
    one event, then unregisters. This simplifies the state machine since we
    don't need to track whether an fd is currently registered or handle
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
    simplicity is worth it.

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with cancelled flag) so coroutines waiting on them can resume.
    close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    epoll_socket_service owns all socket impls. destroy_impl() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/
90  

90  

91  
namespace boost::corosio::detail {
91  
namespace boost::corosio::detail {
92  

92  

93  
/** State for epoll socket service. */
93  
/** State for epoll socket service. */
94  
class epoll_socket_state
94  
class epoll_socket_state
95  
{
95  
{
96  
public:
96  
public:
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98  
    {
98  
    {
99  
    }
99  
    }
100  

100  

101  
    epoll_scheduler& sched_;
101  
    epoll_scheduler& sched_;
102  
    std::mutex mutex_;
102  
    std::mutex mutex_;
103  
    intrusive_list<epoll_socket> socket_list_;
103  
    intrusive_list<epoll_socket> socket_list_;
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105  
        socket_ptrs_;
105  
        socket_ptrs_;
106  
};
106  
};
107  

107  

108  
/** epoll socket service implementation.
108  
/** epoll socket service implementation.
109  

109  

110  
    Inherits from socket_service to enable runtime polymorphism.
110  
    Inherits from socket_service to enable runtime polymorphism.
111  
    Uses key_type = socket_service for service lookup.
111  
    Uses key_type = socket_service for service lookup.
112  
*/
112  
*/
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114  
{
114  
{
115  
public:
115  
public:
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
117  
    ~epoll_socket_service() override;
117  
    ~epoll_socket_service() override;
118  

118  

119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121  

121  

122  
    void shutdown() override;
122  
    void shutdown() override;
123  

123  

124  
    io_object::implementation* construct() override;
124  
    io_object::implementation* construct() override;
125  
    void destroy(io_object::implementation*) override;
125  
    void destroy(io_object::implementation*) override;
126  
    void close(io_object::handle&) override;
126  
    void close(io_object::handle&) override;
127  
    std::error_code open_socket(
127  
    std::error_code open_socket(
128  
        tcp_socket::implementation& impl,
128  
        tcp_socket::implementation& impl,
129  
        int family,
129  
        int family,
130  
        int type,
130  
        int type,
131  
        int protocol) override;
131  
        int protocol) override;
132  

132  

133  
    epoll_scheduler& scheduler() const noexcept
133  
    epoll_scheduler& scheduler() const noexcept
134  
    {
134  
    {
135  
        return state_->sched_;
135  
        return state_->sched_;
136  
    }
136  
    }
137  
    void post(epoll_op* op);
137  
    void post(epoll_op* op);
138  
    void work_started() noexcept;
138  
    void work_started() noexcept;
139  
    void work_finished() noexcept;
139  
    void work_finished() noexcept;
140  

140  

141  
private:
141  
private:
142  
    std::unique_ptr<epoll_socket_state> state_;
142  
    std::unique_ptr<epoll_socket_state> state_;
143  
};
143  
};
144  

144  

145  
//--------------------------------------------------------------------------
145  
//--------------------------------------------------------------------------
146  
//
146  
//
147  
// Implementation
147  
// Implementation
148  
//
148  
//
149  
//--------------------------------------------------------------------------
149  
//--------------------------------------------------------------------------
150  

150  

151  
// Register an op with the reactor, handling cached edge events.
151  
// Register an op with the reactor, handling cached edge events.
152  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
152  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
153  
inline void
153  
inline void
154  
epoll_socket::register_op(
154  
epoll_socket::register_op(
155  
    epoll_op& op,
155  
    epoll_op& op,
156  
    epoll_op*& desc_slot,
156  
    epoll_op*& desc_slot,
157  
    bool& ready_flag,
157  
    bool& ready_flag,
158  
    bool& cancel_flag) noexcept
158  
    bool& cancel_flag) noexcept
159  
{
159  
{
160  
    svc_.work_started();
160  
    svc_.work_started();
161  

161  

162  
    std::lock_guard lock(desc_state_.mutex);
162  
    std::lock_guard lock(desc_state_.mutex);
163  
    bool io_done = false;
163  
    bool io_done = false;
164  
    if (ready_flag)
164  
    if (ready_flag)
165  
    {
165  
    {
166  
        ready_flag = false;
166  
        ready_flag = false;
167  
        op.perform_io();
167  
        op.perform_io();
168  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
168  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
169  
        if (!io_done)
169  
        if (!io_done)
170  
            op.errn = 0;
170  
            op.errn = 0;
171  
    }
171  
    }
172  

172  

173  
    if (cancel_flag)
173  
    if (cancel_flag)
174  
    {
174  
    {
175  
        cancel_flag = false;
175  
        cancel_flag = false;
176  
        op.cancelled.store(true, std::memory_order_relaxed);
176  
        op.cancelled.store(true, std::memory_order_relaxed);
177  
    }
177  
    }
178  

178  

179  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
179  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
180  
    {
180  
    {
181  
        svc_.post(&op);
181  
        svc_.post(&op);
182  
        svc_.work_finished();
182  
        svc_.work_finished();
183  
    }
183  
    }
184  
    else
184  
    else
185  
    {
185  
    {
186  
        desc_slot = &op;
186  
        desc_slot = &op;
187  
    }
187  
    }
188  
}
188  
}
189  

189  

190  
inline void
190  
inline void
191  
epoll_op::canceller::operator()() const noexcept
191  
epoll_op::canceller::operator()() const noexcept
192  
{
192  
{
193  
    op->cancel();
193  
    op->cancel();
194  
}
194  
}
195  

195  

196  
inline void
196  
inline void
197  
epoll_connect_op::cancel() noexcept
197  
epoll_connect_op::cancel() noexcept
198  
{
198  
{
199  
    if (socket_impl_)
199  
    if (socket_impl_)
200  
        socket_impl_->cancel_single_op(*this);
200  
        socket_impl_->cancel_single_op(*this);
201  
    else
201  
    else
202  
        request_cancel();
202  
        request_cancel();
203  
}
203  
}
204  

204  

205  
inline void
205  
inline void
206  
epoll_read_op::cancel() noexcept
206  
epoll_read_op::cancel() noexcept
207  
{
207  
{
208  
    if (socket_impl_)
208  
    if (socket_impl_)
209  
        socket_impl_->cancel_single_op(*this);
209  
        socket_impl_->cancel_single_op(*this);
210  
    else
210  
    else
211  
        request_cancel();
211  
        request_cancel();
212  
}
212  
}
213  

213  

214  
inline void
214  
inline void
215  
epoll_write_op::cancel() noexcept
215  
epoll_write_op::cancel() noexcept
216  
{
216  
{
217  
    if (socket_impl_)
217  
    if (socket_impl_)
218  
        socket_impl_->cancel_single_op(*this);
218  
        socket_impl_->cancel_single_op(*this);
219  
    else
219  
    else
220  
        request_cancel();
220  
        request_cancel();
221  
}
221  
}
222  

222  

223  
inline void
223  
inline void
224  
epoll_op::operator()()
224  
epoll_op::operator()()
225  
{
225  
{
226  
    stop_cb.reset();
226  
    stop_cb.reset();
227  

227  

228  
    socket_impl_->svc_.scheduler().reset_inline_budget();
228  
    socket_impl_->svc_.scheduler().reset_inline_budget();
229  

229  

230  
    if (cancelled.load(std::memory_order_acquire))
230  
    if (cancelled.load(std::memory_order_acquire))
231  
        *ec_out = capy::error::canceled;
231  
        *ec_out = capy::error::canceled;
232  
    else if (errn != 0)
232  
    else if (errn != 0)
233  
        *ec_out = make_err(errn);
233  
        *ec_out = make_err(errn);
234  
    else if (is_read_operation() && bytes_transferred == 0)
234  
    else if (is_read_operation() && bytes_transferred == 0)
235  
        *ec_out = capy::error::eof;
235  
        *ec_out = capy::error::eof;
236  
    else
236  
    else
237  
        *ec_out = {};
237  
        *ec_out = {};
238  

238  

239  
    *bytes_out = bytes_transferred;
239  
    *bytes_out = bytes_transferred;
240  

240  

241  
    // Move to stack before resuming coroutine. The coroutine might close
241  
    // Move to stack before resuming coroutine. The coroutine might close
242  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
242  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
243  
    // last ref and we destroyed it while still in operator(), we'd have
243  
    // last ref and we destroyed it while still in operator(), we'd have
244  
    // use-after-free. Moving to local ensures destruction happens at
244  
    // use-after-free. Moving to local ensures destruction happens at
245  
    // function exit, after all member accesses are complete.
245  
    // function exit, after all member accesses are complete.
246  
    capy::executor_ref saved_ex(ex);
246  
    capy::executor_ref saved_ex(ex);
247  
    std::coroutine_handle<> saved_h(h);
247  
    std::coroutine_handle<> saved_h(h);
248  
    auto prevent_premature_destruction = std::move(impl_ptr);
248  
    auto prevent_premature_destruction = std::move(impl_ptr);
249  
    dispatch_coro(saved_ex, saved_h).resume();
249  
    dispatch_coro(saved_ex, saved_h).resume();
250  
}
250  
}
251  

251  

252  
inline void
252  
inline void
253  
epoll_connect_op::operator()()
253  
epoll_connect_op::operator()()
254  
{
254  
{
255  
    stop_cb.reset();
255  
    stop_cb.reset();
256  

256  

257  
    socket_impl_->svc_.scheduler().reset_inline_budget();
257  
    socket_impl_->svc_.scheduler().reset_inline_budget();
258  

258  

259  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
259  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
260  

260  

261  
    // Cache endpoints on successful connect
261  
    // Cache endpoints on successful connect
262  
    if (success && socket_impl_)
262  
    if (success && socket_impl_)
263  
    {
263  
    {
264  
        endpoint local_ep;
264  
        endpoint local_ep;
265  
        sockaddr_storage local_storage{};
265  
        sockaddr_storage local_storage{};
266  
        socklen_t local_len = sizeof(local_storage);
266  
        socklen_t local_len = sizeof(local_storage);
267  
        if (::getsockname(
267  
        if (::getsockname(
268  
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
268  
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
269  
            0)
269  
            0)
270  
            local_ep = from_sockaddr(local_storage);
270  
            local_ep = from_sockaddr(local_storage);
271  
        static_cast<epoll_socket*>(socket_impl_)
271  
        static_cast<epoll_socket*>(socket_impl_)
272  
            ->set_endpoints(local_ep, target_endpoint);
272  
            ->set_endpoints(local_ep, target_endpoint);
273  
    }
273  
    }
274  

274  

275  
    if (cancelled.load(std::memory_order_acquire))
275  
    if (cancelled.load(std::memory_order_acquire))
276  
        *ec_out = capy::error::canceled;
276  
        *ec_out = capy::error::canceled;
277  
    else if (errn != 0)
277  
    else if (errn != 0)
278  
        *ec_out = make_err(errn);
278  
        *ec_out = make_err(errn);
279  
    else
279  
    else
280  
        *ec_out = {};
280  
        *ec_out = {};
281  

281  

282  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
282  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
283  
    capy::executor_ref saved_ex(ex);
283  
    capy::executor_ref saved_ex(ex);
284  
    std::coroutine_handle<> saved_h(h);
284  
    std::coroutine_handle<> saved_h(h);
285  
    auto prevent_premature_destruction = std::move(impl_ptr);
285  
    auto prevent_premature_destruction = std::move(impl_ptr);
286  
    dispatch_coro(saved_ex, saved_h).resume();
286  
    dispatch_coro(saved_ex, saved_h).resume();
287  
}
287  
}
288  

288  

289  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
289  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
290  
    : svc_(svc)
290  
    : svc_(svc)
291  
{
291  
{
292  
}
292  
}
293  

293  

294  
inline epoll_socket::~epoll_socket() = default;
294  
inline epoll_socket::~epoll_socket() = default;
295  

295  

296  
inline std::coroutine_handle<>
296  
inline std::coroutine_handle<>
297  
epoll_socket::connect(
297  
epoll_socket::connect(
298  
    std::coroutine_handle<> h,
298  
    std::coroutine_handle<> h,
299  
    capy::executor_ref ex,
299  
    capy::executor_ref ex,
300  
    endpoint ep,
300  
    endpoint ep,
301  
    std::stop_token token,
301  
    std::stop_token token,
302  
    std::error_code* ec)
302  
    std::error_code* ec)
303  
{
303  
{
304  
    auto& op = conn_;
304  
    auto& op = conn_;
305  

305  

306  
    sockaddr_storage storage{};
306  
    sockaddr_storage storage{};
307  
    socklen_t addrlen =
307  
    socklen_t addrlen =
308  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
308  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
309  
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
309  
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
310  

310  

311  
    if (result == 0)
311  
    if (result == 0)
312  
    {
312  
    {
313  
        sockaddr_storage local_storage{};
313  
        sockaddr_storage local_storage{};
314  
        socklen_t local_len = sizeof(local_storage);
314  
        socklen_t local_len = sizeof(local_storage);
315  
        if (::getsockname(
315  
        if (::getsockname(
316  
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
316  
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
317  
            0)
317  
            0)
318  
            local_endpoint_ = detail::from_sockaddr(local_storage);
318  
            local_endpoint_ = detail::from_sockaddr(local_storage);
319  
        remote_endpoint_ = ep;
319  
        remote_endpoint_ = ep;
320  
    }
320  
    }
321  

321  

322  
    if (result == 0 || errno != EINPROGRESS)
322  
    if (result == 0 || errno != EINPROGRESS)
323  
    {
323  
    {
324  
        int err = (result < 0) ? errno : 0;
324  
        int err = (result < 0) ? errno : 0;
325  
        if (svc_.scheduler().try_consume_inline_budget())
325  
        if (svc_.scheduler().try_consume_inline_budget())
326  
        {
326  
        {
327  
            *ec = err ? make_err(err) : std::error_code{};
327  
            *ec = err ? make_err(err) : std::error_code{};
328  
            return dispatch_coro(ex, h);
328  
            return dispatch_coro(ex, h);
329  
        }
329  
        }
330  
        op.reset();
330  
        op.reset();
331  
        op.h               = h;
331  
        op.h               = h;
332  
        op.ex              = ex;
332  
        op.ex              = ex;
333  
        op.ec_out          = ec;
333  
        op.ec_out          = ec;
334  
        op.fd              = fd_;
334  
        op.fd              = fd_;
335  
        op.target_endpoint = ep;
335  
        op.target_endpoint = ep;
336  
        op.start(token, this);
336  
        op.start(token, this);
337  
        op.impl_ptr = shared_from_this();
337  
        op.impl_ptr = shared_from_this();
338  
        op.complete(err, 0);
338  
        op.complete(err, 0);
339  
        svc_.post(&op);
339  
        svc_.post(&op);
340  
        return std::noop_coroutine();
340  
        return std::noop_coroutine();
341  
    }
341  
    }
342  

342  

343  
    // EINPROGRESS — register with reactor
343  
    // EINPROGRESS — register with reactor
344  
    op.reset();
344  
    op.reset();
345  
    op.h               = h;
345  
    op.h               = h;
346  
    op.ex              = ex;
346  
    op.ex              = ex;
347  
    op.ec_out          = ec;
347  
    op.ec_out          = ec;
348  
    op.fd              = fd_;
348  
    op.fd              = fd_;
349  
    op.target_endpoint = ep;
349  
    op.target_endpoint = ep;
350  
    op.start(token, this);
350  
    op.start(token, this);
351  
    op.impl_ptr = shared_from_this();
351  
    op.impl_ptr = shared_from_this();
352  

352  

353  
    register_op(
353  
    register_op(
354  
        op, desc_state_.connect_op, desc_state_.write_ready,
354  
        op, desc_state_.connect_op, desc_state_.write_ready,
355  
        desc_state_.connect_cancel_pending);
355  
        desc_state_.connect_cancel_pending);
356  
    return std::noop_coroutine();
356  
    return std::noop_coroutine();
357  
}
357  
}
358  

358  

359  
inline std::coroutine_handle<>
359  
inline std::coroutine_handle<>
360  
epoll_socket::read_some(
360  
epoll_socket::read_some(
361  
    std::coroutine_handle<> h,
361  
    std::coroutine_handle<> h,
362  
    capy::executor_ref ex,
362  
    capy::executor_ref ex,
363  
    buffer_param param,
363  
    buffer_param param,
364  
    std::stop_token token,
364  
    std::stop_token token,
365  
    std::error_code* ec,
365  
    std::error_code* ec,
366  
    std::size_t* bytes_out)
366  
    std::size_t* bytes_out)
367  
{
367  
{
368  
    auto& op = rd_;
368  
    auto& op = rd_;
369  
    op.reset();
369  
    op.reset();
370  

370  

371  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
371  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
372  
    op.iovec_count =
372  
    op.iovec_count =
373  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
373  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
374  

374  

375  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
375  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
376  
    {
376  
    {
377  
        op.empty_buffer_read = true;
377  
        op.empty_buffer_read = true;
378  
        op.h                 = h;
378  
        op.h                 = h;
379  
        op.ex                = ex;
379  
        op.ex                = ex;
380  
        op.ec_out            = ec;
380  
        op.ec_out            = ec;
381  
        op.bytes_out         = bytes_out;
381  
        op.bytes_out         = bytes_out;
382  
        op.start(token, this);
382  
        op.start(token, this);
383  
        op.impl_ptr = shared_from_this();
383  
        op.impl_ptr = shared_from_this();
384  
        op.complete(0, 0);
384  
        op.complete(0, 0);
385  
        svc_.post(&op);
385  
        svc_.post(&op);
386  
        return std::noop_coroutine();
386  
        return std::noop_coroutine();
387  
    }
387  
    }
388  

388  

389  
    for (int i = 0; i < op.iovec_count; ++i)
389  
    for (int i = 0; i < op.iovec_count; ++i)
390  
    {
390  
    {
391  
        op.iovecs[i].iov_base = bufs[i].data();
391  
        op.iovecs[i].iov_base = bufs[i].data();
392  
        op.iovecs[i].iov_len  = bufs[i].size();
392  
        op.iovecs[i].iov_len  = bufs[i].size();
393  
    }
393  
    }
394  

394  

395  
    // Speculative read
395  
    // Speculative read
396  
    ssize_t n;
396  
    ssize_t n;
397  
    do
397  
    do
398  
    {
398  
    {
399  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
399  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
400  
    }
400  
    }
401  
    while (n < 0 && errno == EINTR);
401  
    while (n < 0 && errno == EINTR);
402  

402  

403  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
403  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
404  
    {
404  
    {
405  
        int err    = (n < 0) ? errno : 0;
405  
        int err    = (n < 0) ? errno : 0;
406  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
406  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
407  

407  

408  
        if (svc_.scheduler().try_consume_inline_budget())
408  
        if (svc_.scheduler().try_consume_inline_budget())
409  
        {
409  
        {
410  
            if (err)
410  
            if (err)
411  
                *ec = make_err(err);
411  
                *ec = make_err(err);
412  
            else if (n == 0)
412  
            else if (n == 0)
413  
                *ec = capy::error::eof;
413  
                *ec = capy::error::eof;
414  
            else
414  
            else
415  
                *ec = {};
415  
                *ec = {};
416  
            *bytes_out = bytes;
416  
            *bytes_out = bytes;
417  
            return dispatch_coro(ex, h);
417  
            return dispatch_coro(ex, h);
418  
        }
418  
        }
419  
        op.h         = h;
419  
        op.h         = h;
420  
        op.ex        = ex;
420  
        op.ex        = ex;
421  
        op.ec_out    = ec;
421  
        op.ec_out    = ec;
422  
        op.bytes_out = bytes_out;
422  
        op.bytes_out = bytes_out;
423  
        op.start(token, this);
423  
        op.start(token, this);
424  
        op.impl_ptr = shared_from_this();
424  
        op.impl_ptr = shared_from_this();
425  
        op.complete(err, bytes);
425  
        op.complete(err, bytes);
426  
        svc_.post(&op);
426  
        svc_.post(&op);
427  
        return std::noop_coroutine();
427  
        return std::noop_coroutine();
428  
    }
428  
    }
429  

429  

430  
    // EAGAIN — register with reactor
430  
    // EAGAIN — register with reactor
431  
    op.h         = h;
431  
    op.h         = h;
432  
    op.ex        = ex;
432  
    op.ex        = ex;
433  
    op.ec_out    = ec;
433  
    op.ec_out    = ec;
434  
    op.bytes_out = bytes_out;
434  
    op.bytes_out = bytes_out;
435  
    op.fd        = fd_;
435  
    op.fd        = fd_;
436  
    op.start(token, this);
436  
    op.start(token, this);
437  
    op.impl_ptr = shared_from_this();
437  
    op.impl_ptr = shared_from_this();
438  

438  

439  
    register_op(
439  
    register_op(
440  
        op, desc_state_.read_op, desc_state_.read_ready,
440  
        op, desc_state_.read_op, desc_state_.read_ready,
441  
        desc_state_.read_cancel_pending);
441  
        desc_state_.read_cancel_pending);
442  
    return std::noop_coroutine();
442  
    return std::noop_coroutine();
443  
}
443  
}
444  

444  

445  
// Initiates (and possibly completes) one non-blocking write on the socket.
//
// Paths, tried in order:
//   1. Empty buffer sequence — complete immediately with zero bytes.
//   2. Speculative ::sendmsg succeeds or fails with a hard error — either
//      resume the awaiting coroutine inline (when the scheduler's inline
//      budget allows) or post the already-completed op to the scheduler.
//   3. EAGAIN/EWOULDBLOCK — park the op in the reactor via register_op and
//      wait for write readiness.
//
// Returns the caller's continuation only on the inline-dispatch path;
// otherwise returns std::noop_coroutine() and resumption happens later
// through the scheduler.
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    // Reuse the single embedded write op; at most one write is in flight.
    auto& op = wr_;
    op.reset();

    // Gather the caller's buffers into a bounded local array.
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    // Zero-length write: complete without touching the socket.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.h         = h;
        op.ex        = ex;
        op.ec_out    = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        // Keep the implementation alive until the op is drained.
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // Translate buffers into the op's iovec array for sendmsg.
    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    // Speculative write
    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        // MSG_NOSIGNAL: surface EPIPE as an error instead of SIGPIPE.
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed (successfully or with a hard error) without blocking.
        int err    = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the awaiting coroutine directly on this thread.
            *ec        = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Inline budget exhausted: defer resumption through the scheduler.
        op.h         = h;
        op.ex        = ex;
        op.ec_out    = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Park the op until the descriptor reports write readiness (or a
    // cancel arrives via write_cancel_pending).
    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
528  

528  

529  
inline std::error_code
529  
inline std::error_code
530  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
531  
{
531  
{
532  
    int how;
532  
    int how;
533  
    switch (what)
533  
    switch (what)
534  
    {
534  
    {
535  
    case tcp_socket::shutdown_receive:
535  
    case tcp_socket::shutdown_receive:
536  
        how = SHUT_RD;
536  
        how = SHUT_RD;
537  
        break;
537  
        break;
538  
    case tcp_socket::shutdown_send:
538  
    case tcp_socket::shutdown_send:
539  
        how = SHUT_WR;
539  
        how = SHUT_WR;
540  
        break;
540  
        break;
541  
    case tcp_socket::shutdown_both:
541  
    case tcp_socket::shutdown_both:
542  
        how = SHUT_RDWR;
542  
        how = SHUT_RDWR;
543  
        break;
543  
        break;
544  
    default:
544  
    default:
545  
        return make_err(EINVAL);
545  
        return make_err(EINVAL);
546  
    }
546  
    }
547  
    if (::shutdown(fd_, how) != 0)
547  
    if (::shutdown(fd_, how) != 0)
548  
        return make_err(errno);
548  
        return make_err(errno);
549  
    return {};
549  
    return {};
550  
}
550  
}
551  

551  

552  
inline std::error_code
552  
inline std::error_code
553  
epoll_socket::set_option(
553  
epoll_socket::set_option(
554  
    int level, int optname, void const* data, std::size_t size) noexcept
554  
    int level, int optname, void const* data, std::size_t size) noexcept
555  
{
555  
{
556  
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
556  
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
557  
        0)
557  
        0)
558  
        return make_err(errno);
558  
        return make_err(errno);
559  
    return {};
559  
    return {};
560  
}
560  
}
561  

561  

562  
inline std::error_code
562  
inline std::error_code
563  
epoll_socket::get_option(
563  
epoll_socket::get_option(
564  
    int level, int optname, void* data, std::size_t* size) const noexcept
564  
    int level, int optname, void* data, std::size_t* size) const noexcept
565  
{
565  
{
566  
    socklen_t len = static_cast<socklen_t>(*size);
566  
    socklen_t len = static_cast<socklen_t>(*size);
567  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
567  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
568  
        return make_err(errno);
568  
        return make_err(errno);
569  
    *size = static_cast<std::size_t>(len);
569  
    *size = static_cast<std::size_t>(len);
570  
    return {};
570  
    return {};
571  
}
571  
}
572  

572  

573  
inline void
573  
inline void
574  
epoll_socket::cancel() noexcept
574  
epoll_socket::cancel() noexcept
575  
{
575  
{
576  
    auto self = weak_from_this().lock();
576  
    auto self = weak_from_this().lock();
577  
    if (!self)
577  
    if (!self)
578  
        return;
578  
        return;
579  

579  

580  
    conn_.request_cancel();
580  
    conn_.request_cancel();
581  
    rd_.request_cancel();
581  
    rd_.request_cancel();
582  
    wr_.request_cancel();
582  
    wr_.request_cancel();
583  

583  

584  
    epoll_op* conn_claimed = nullptr;
584  
    epoll_op* conn_claimed = nullptr;
585  
    epoll_op* rd_claimed   = nullptr;
585  
    epoll_op* rd_claimed   = nullptr;
586  
    epoll_op* wr_claimed   = nullptr;
586  
    epoll_op* wr_claimed   = nullptr;
587  
    {
587  
    {
588  
        std::lock_guard lock(desc_state_.mutex);
588  
        std::lock_guard lock(desc_state_.mutex);
589  
        if (desc_state_.connect_op == &conn_)
589  
        if (desc_state_.connect_op == &conn_)
590  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
590  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
591  
        else
591  
        else
592  
            desc_state_.connect_cancel_pending = true;
592  
            desc_state_.connect_cancel_pending = true;
593  
        if (desc_state_.read_op == &rd_)
593  
        if (desc_state_.read_op == &rd_)
594  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
594  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
595  
        else
595  
        else
596  
            desc_state_.read_cancel_pending = true;
596  
            desc_state_.read_cancel_pending = true;
597  
        if (desc_state_.write_op == &wr_)
597  
        if (desc_state_.write_op == &wr_)
598  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
598  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
599  
        else
599  
        else
600  
            desc_state_.write_cancel_pending = true;
600  
            desc_state_.write_cancel_pending = true;
601  
    }
601  
    }
602  

602  

603  
    if (conn_claimed)
603  
    if (conn_claimed)
604  
    {
604  
    {
605  
        conn_.impl_ptr = self;
605  
        conn_.impl_ptr = self;
606  
        svc_.post(&conn_);
606  
        svc_.post(&conn_);
607  
        svc_.work_finished();
607  
        svc_.work_finished();
608  
    }
608  
    }
609  
    if (rd_claimed)
609  
    if (rd_claimed)
610  
    {
610  
    {
611  
        rd_.impl_ptr = self;
611  
        rd_.impl_ptr = self;
612  
        svc_.post(&rd_);
612  
        svc_.post(&rd_);
613  
        svc_.work_finished();
613  
        svc_.work_finished();
614  
    }
614  
    }
615  
    if (wr_claimed)
615  
    if (wr_claimed)
616  
    {
616  
    {
617  
        wr_.impl_ptr = self;
617  
        wr_.impl_ptr = self;
618  
        svc_.post(&wr_);
618  
        svc_.post(&wr_);
619  
        svc_.work_finished();
619  
        svc_.work_finished();
620  
    }
620  
    }
621  
}
621  
}
622  

622  

623  
inline void
623  
inline void
624  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
624  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
625  
{
625  
{
626  
    auto self = weak_from_this().lock();
626  
    auto self = weak_from_this().lock();
627  
    if (!self)
627  
    if (!self)
628  
        return;
628  
        return;
629  

629  

630  
    op.request_cancel();
630  
    op.request_cancel();
631  

631  

632  
    epoll_op** desc_op_ptr = nullptr;
632  
    epoll_op** desc_op_ptr = nullptr;
633  
    if (&op == &conn_)
633  
    if (&op == &conn_)
634  
        desc_op_ptr = &desc_state_.connect_op;
634  
        desc_op_ptr = &desc_state_.connect_op;
635  
    else if (&op == &rd_)
635  
    else if (&op == &rd_)
636  
        desc_op_ptr = &desc_state_.read_op;
636  
        desc_op_ptr = &desc_state_.read_op;
637  
    else if (&op == &wr_)
637  
    else if (&op == &wr_)
638  
        desc_op_ptr = &desc_state_.write_op;
638  
        desc_op_ptr = &desc_state_.write_op;
639  

639  

640  
    if (desc_op_ptr)
640  
    if (desc_op_ptr)
641  
    {
641  
    {
642  
        epoll_op* claimed = nullptr;
642  
        epoll_op* claimed = nullptr;
643  
        {
643  
        {
644  
            std::lock_guard lock(desc_state_.mutex);
644  
            std::lock_guard lock(desc_state_.mutex);
645  
            if (*desc_op_ptr == &op)
645  
            if (*desc_op_ptr == &op)
646  
                claimed = std::exchange(*desc_op_ptr, nullptr);
646  
                claimed = std::exchange(*desc_op_ptr, nullptr);
647  
            else if (&op == &conn_)
647  
            else if (&op == &conn_)
648  
                desc_state_.connect_cancel_pending = true;
648  
                desc_state_.connect_cancel_pending = true;
649  
            else if (&op == &rd_)
649  
            else if (&op == &rd_)
650  
                desc_state_.read_cancel_pending = true;
650  
                desc_state_.read_cancel_pending = true;
651  
            else if (&op == &wr_)
651  
            else if (&op == &wr_)
652  
                desc_state_.write_cancel_pending = true;
652  
                desc_state_.write_cancel_pending = true;
653  
        }
653  
        }
654  
        if (claimed)
654  
        if (claimed)
655  
        {
655  
        {
656  
            op.impl_ptr = self;
656  
            op.impl_ptr = self;
657  
            svc_.post(&op);
657  
            svc_.post(&op);
658  
            svc_.work_finished();
658  
            svc_.work_finished();
659  
        }
659  
        }
660  
    }
660  
    }
661  
}
661  
}
662  

662  

663  
// Tears down the socket: claims and flushes every parked op, detaches the
// descriptor from epoll, closes the fd, and resets cached endpoint state.
// Safe to call repeatedly; also safe during destruction, when
// weak_from_this() can no longer be locked (the op-flushing phase is then
// skipped and only the fd/desc_state cleanup runs).
inline void
epoll_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        // Flag all ops cancelled before claiming them.
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed   = nullptr;
        epoll_op* wr_claimed   = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Unconditionally take ownership of whatever is parked and
            // clear all readiness / pending-cancel state: the fd is
            // about to disappear.
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready             = false;
            desc_state_.write_ready            = false;
            desc_state_.read_cancel_pending    = false;
            desc_state_.write_cancel_pending   = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post each claimed op for completion outside the lock; impl_ptr
        // keeps this implementation alive until the op is drained.
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still sitting in a scheduler queue,
        // give it a strong ref so the impl outlives that queue entry.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        // Only deregister if we actually registered interest earlier.
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    // Reset descriptor bookkeeping for potential reuse via open_socket.
    desc_state_.fd                = -1;
    desc_state_.registered_events = 0;

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
725  

725  

726  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
726  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
727  
    : state_(
727  
    : state_(
728  
          std::make_unique<epoll_socket_state>(
728  
          std::make_unique<epoll_socket_state>(
729  
              ctx.use_service<epoll_scheduler>()))
729  
              ctx.use_service<epoll_scheduler>()))
730  
{
730  
{
731  
}
731  
}
732  

732  

733  
/// Out-of-line destructor; releases state_ (and its retained socket ptrs).
inline epoll_socket_service::~epoll_socket_service() = default;
734  

734  

735  
inline void
735  
inline void
736  
epoll_socket_service::shutdown()
736  
epoll_socket_service::shutdown()
737  
{
737  
{
738  
    std::lock_guard lock(state_->mutex_);
738  
    std::lock_guard lock(state_->mutex_);
739  

739  

740  
    while (auto* impl = state_->socket_list_.pop_front())
740  
    while (auto* impl = state_->socket_list_.pop_front())
741  
        impl->close_socket();
741  
        impl->close_socket();
742  

742  

743  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
743  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
744  
    // drains completed_ops_, calling destroy() on each queued op. If we
744  
    // drains completed_ops_, calling destroy() on each queued op. If we
745  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
745  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
746  
    // last ref to an impl whose embedded descriptor_state is still linked
746  
    // last ref to an impl whose embedded descriptor_state is still linked
747  
    // in the queue — use-after-free on the next pop(). Letting ~state_
747  
    // in the queue — use-after-free on the next pop(). Letting ~state_
748  
    // release the ptrs (during service destruction, after scheduler
748  
    // release the ptrs (during service destruction, after scheduler
749  
    // shutdown) keeps every impl alive until all ops have been drained.
749  
    // shutdown) keeps every impl alive until all ops have been drained.
750  
}
750  
}
751  

751  

752  
/// Create a new socket implementation.
///
/// The service retains shared ownership in socket_ptrs_ (keyed by raw
/// pointer) and links the impl into socket_list_; the caller receives a
/// non-owning pointer.
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto owner   = std::make_shared<epoll_socket>(*this);
    auto* result = owner.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(result);
        state_->socket_ptrs_.emplace(result, std::move(owner));
    }

    return result;
}
766  

766  

767  
inline void
767  
inline void
768  
epoll_socket_service::destroy(io_object::implementation* impl)
768  
epoll_socket_service::destroy(io_object::implementation* impl)
769  
{
769  
{
770  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
770  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
771  
    epoll_impl->close_socket();
771  
    epoll_impl->close_socket();
772  
    std::lock_guard lock(state_->mutex_);
772  
    std::lock_guard lock(state_->mutex_);
773  
    state_->socket_list_.remove(epoll_impl);
773  
    state_->socket_list_.remove(epoll_impl);
774  
    state_->socket_ptrs_.erase(epoll_impl);
774  
    state_->socket_ptrs_.erase(epoll_impl);
775  
}
775  
}
776  

776  

777  
inline std::error_code
777  
inline std::error_code
778  
epoll_socket_service::open_socket(
778  
epoll_socket_service::open_socket(
779  
    tcp_socket::implementation& impl, int family, int type, int protocol)
779  
    tcp_socket::implementation& impl, int family, int type, int protocol)
780  
{
780  
{
781  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
781  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
782  
    epoll_impl->close_socket();
782  
    epoll_impl->close_socket();
783  

783  

784  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
784  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
785  
    if (fd < 0)
785  
    if (fd < 0)
786  
        return make_err(errno);
786  
        return make_err(errno);
787  

787  

788  
    if (family == AF_INET6)
788  
    if (family == AF_INET6)
789  
    {
789  
    {
790  
        int one = 1;
790  
        int one = 1;
791  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
791  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
792  
    }
792  
    }
793  

793  

794  
    epoll_impl->fd_ = fd;
794  
    epoll_impl->fd_ = fd;
795  

795  

796  
    // Register fd with epoll (edge-triggered mode)
796  
    // Register fd with epoll (edge-triggered mode)
797  
    epoll_impl->desc_state_.fd = fd;
797  
    epoll_impl->desc_state_.fd = fd;
798  
    {
798  
    {
799  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
799  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
800  
        epoll_impl->desc_state_.read_op    = nullptr;
800  
        epoll_impl->desc_state_.read_op    = nullptr;
801  
        epoll_impl->desc_state_.write_op   = nullptr;
801  
        epoll_impl->desc_state_.write_op   = nullptr;
802  
        epoll_impl->desc_state_.connect_op = nullptr;
802  
        epoll_impl->desc_state_.connect_op = nullptr;
803  
    }
803  
    }
804  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
804  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
805  

805  

806  
    return {};
806  
    return {};
807  
}
807  
}
808  

808  

809  
inline void
809  
inline void
810  
epoll_socket_service::close(io_object::handle& h)
810  
epoll_socket_service::close(io_object::handle& h)
811  
{
811  
{
812  
    static_cast<epoll_socket*>(h.get())->close_socket();
812  
    static_cast<epoll_socket*>(h.get())->close_socket();
813  
}
813  
}
814  

814  

815  
inline void
815  
inline void
816  
epoll_socket_service::post(epoll_op* op)
816  
epoll_socket_service::post(epoll_op* op)
817  
{
817  
{
818  
    state_->sched_.post(op);
818  
    state_->sched_.post(op);
819  
}
819  
}
820  

820  

821  
inline void
821  
inline void
822  
epoll_socket_service::work_started() noexcept
822  
epoll_socket_service::work_started() noexcept
823  
{
823  
{
824  
    state_->sched_.work_started();
824  
    state_->sched_.work_started();
825  
}
825  
}
826  

826  

827  
inline void
827  
inline void
828  
epoll_socket_service::work_finished() noexcept
828  
epoll_socket_service::work_finished() noexcept
829  
{
829  
{
830  
    state_->sched_.work_finished();
830  
    state_->sched_.work_finished();
831  
}
831  
}
832  

832  

833  
} // namespace boost::corosio::detail
833  
} // namespace boost::corosio::detail
834  

834  

835  
#endif // BOOST_COROSIO_HAS_EPOLL
835  
#endif // BOOST_COROSIO_HAS_EPOLL
836  

836  

837  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
837  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP