include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp

90.3% Lines (131/145) 100.0% List of functions (16/16)
posix_stream_file_service.hpp
f(x) Functions (16)
Function Calls Lines Blocks
boost::corosio::detail::posix_stream_file_service::posix_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :36 1018x 100.0% 88.0% boost::corosio::detail::posix_stream_file_service::~posix_stream_file_service() :43 2036x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::construct() :48 43x 100.0% 71.0% boost::corosio::detail::posix_stream_file_service::destroy(boost::corosio::io_object::implementation*) :62 43x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::close(boost::corosio::io_object::handle&) :70 74x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::open_file(boost::corosio::stream_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :80 34x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::shutdown() :90 1018x 62.5% 70.0% boost::corosio::detail::posix_stream_file_service::destroy_impl(boost::corosio::detail::posix_stream_file&) :102 43x 100.0% 67.0% boost::corosio::detail::posix_stream_file_service::post(boost::corosio::detail::scheduler_op*) :109 19x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::pool() :124 19x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::get_or_create_pool(boost::capy::execution_context&) :130 1018x 80.0% 67.0% boost::corosio::detail::get_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :148 1018x 100.0% 100.0% boost::corosio::detail::posix_stream_file::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :158 11x 89.3% 83.0% boost::corosio::detail::posix_stream_file::do_read_work(boost::corosio::detail::pool_work_item*) :208 10x 88.2% 83.0% boost::corosio::detail::posix_stream_file::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :242 10x 89.3% 83.0% boost::corosio::detail::posix_stream_file::do_write_work(boost::corosio::detail::pool_work_item*) :292 9x 88.2% 83.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
18 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
19 #include <boost/corosio/detail/file_service.hpp>
20 #include <boost/corosio/detail/thread_pool.hpp>
21
22 #include <mutex>
23 #include <unordered_map>
24
25 namespace boost::corosio::detail {
26
27 /** Stream file service for POSIX backends.
28
29 Owns all posix_stream_file instances. Thread lifecycle is
30 managed by the thread_pool service (shared with resolver).
31 */
32 class BOOST_COROSIO_DECL posix_stream_file_service final
33 : public file_service
34 {
35 public:
36 1018x posix_stream_file_service(
37 capy::execution_context& ctx, scheduler& sched)
38 2036x : sched_(&sched)
39 1018x , pool_(get_or_create_pool(ctx))
40 {
41 1018x }
42
43 2036x ~posix_stream_file_service() override = default;
44
45 posix_stream_file_service(posix_stream_file_service const&) = delete;
46 posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
47
48 43x io_object::implementation* construct() override
49 {
50 43x auto ptr = std::make_shared<posix_stream_file>(*this);
51 43x auto* impl = ptr.get();
52
53 {
54 43x std::lock_guard<std::mutex> lock(mutex_);
55 43x file_list_.push_back(impl);
56 43x file_ptrs_[impl] = std::move(ptr);
57 43x }
58
59 43x return impl;
60 43x }
61
62 43x void destroy(io_object::implementation* p) override
63 {
64 43x auto& impl = static_cast<posix_stream_file&>(*p);
65 43x impl.cancel();
66 43x impl.close_file();
67 43x destroy_impl(impl);
68 43x }
69
70 74x void close(io_object::handle& h) override
71 {
72 74x if (h.get())
73 {
74 74x auto& impl = static_cast<posix_stream_file&>(*h.get());
75 74x impl.cancel();
76 74x impl.close_file();
77 }
78 74x }
79
80 34x std::error_code open_file(
81 stream_file::implementation& impl,
82 std::filesystem::path const& path,
83 file_base::flags mode) override
84 {
85 34x if (static_cast<reactor_scheduler const*>(sched_)->is_single_threaded())
86 1x return std::make_error_code(std::errc::operation_not_supported);
87 33x return static_cast<posix_stream_file&>(impl).open_file(path, mode);
88 }
89
90 1018x void shutdown() override
91 {
92 1018x std::lock_guard<std::mutex> lock(mutex_);
93 1018x for (auto* impl = file_list_.pop_front(); impl != nullptr;
94 impl = file_list_.pop_front())
95 {
96 impl->cancel();
97 impl->close_file();
98 }
99 1018x file_ptrs_.clear();
100 1018x }
101
102 43x void destroy_impl(posix_stream_file& impl)
103 {
104 43x std::lock_guard<std::mutex> lock(mutex_);
105 43x file_list_.remove(&impl);
106 43x file_ptrs_.erase(&impl);
107 43x }
108
109 19x void post(scheduler_op* op)
110 {
111 19x sched_->post(op);
112 19x }
113
114 void work_started() noexcept
115 {
116 sched_->work_started();
117 }
118
119 void work_finished() noexcept
120 {
121 sched_->work_finished();
122 }
123
124 19x thread_pool& pool() noexcept
125 {
126 19x return pool_;
127 }
128
129 private:
130 1018x static thread_pool& get_or_create_pool(capy::execution_context& ctx)
131 {
132 1018x auto* p = ctx.find_service<thread_pool>();
133 1018x if (p)
134 1018x return *p;
135 return ctx.make_service<thread_pool>();
136 }
137
138 scheduler* sched_;
139 thread_pool& pool_;
140 std::mutex mutex_;
141 intrusive_list<posix_stream_file> file_list_;
142 std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
143 file_ptrs_;
144 };
145
146 /** Get or create the stream file service for the given context. */
147 inline posix_stream_file_service&
148 1018x get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
149 {
150 1018x return ctx.make_service<posix_stream_file_service>(sched);
151 }
152
153 // ---------------------------------------------------------------------------
154 // posix_stream_file inline implementations (require complete service type)
155 // ---------------------------------------------------------------------------
156
157 inline std::coroutine_handle<>
158 11x posix_stream_file::read_some(
159 std::coroutine_handle<> h,
160 capy::executor_ref ex,
161 buffer_param param,
162 std::stop_token token,
163 std::error_code* ec,
164 std::size_t* bytes_out)
165 {
166 11x auto& op = read_op_;
167 11x op.reset();
168 11x op.is_read = true;
169
170 11x capy::mutable_buffer bufs[max_buffers];
171 11x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
172
173 11x if (op.iovec_count == 0)
174 {
175 1x *ec = {};
176 1x *bytes_out = 0;
177 1x op.cont_op.cont.h = h;
178 1x return dispatch_coro(ex, op.cont_op.cont);
179 }
180
181 20x for (int i = 0; i < op.iovec_count; ++i)
182 {
183 10x op.iovecs[i].iov_base = bufs[i].data();
184 10x op.iovecs[i].iov_len = bufs[i].size();
185 }
186
187 10x op.h = h;
188 10x op.ex = ex;
189 10x op.ec_out = ec;
190 10x op.bytes_out = bytes_out;
191 10x op.start(token);
192
193 10x op.ex.on_work_started();
194
195 10x read_pool_op_.file_ = this;
196 10x read_pool_op_.ref_ = this->shared_from_this();
197 10x read_pool_op_.func_ = &posix_stream_file::do_read_work;
198 10x if (!svc_.pool().post(&read_pool_op_))
199 {
200 op.impl_ref = std::move(read_pool_op_.ref_);
201 op.cancelled.store(true, std::memory_order_release);
202 svc_.post(&read_op_);
203 }
204 10x return std::noop_coroutine();
205 }
206
207 inline void
208 10x posix_stream_file::do_read_work(pool_work_item* w) noexcept
209 {
210 10x auto* pw = static_cast<pool_op*>(w);
211 10x auto* self = pw->file_;
212 10x auto& op = self->read_op_;
213
214 10x if (!op.cancelled.load(std::memory_order_acquire))
215 {
216 ssize_t n;
217 do
218 {
219 18x n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
220 9x static_cast<off_t>(self->offset_));
221 }
222 9x while (n < 0 && errno == EINTR);
223
224 9x if (n >= 0)
225 {
226 9x op.errn = 0;
227 9x op.bytes_transferred = static_cast<std::size_t>(n);
228 9x self->offset_ += static_cast<std::uint64_t>(n);
229 }
230 else
231 {
232 op.errn = errno;
233 op.bytes_transferred = 0;
234 }
235 }
236
237 10x op.impl_ref = std::move(pw->ref_);
238 10x self->svc_.post(&op);
239 10x }
240
241 inline std::coroutine_handle<>
242 10x posix_stream_file::write_some(
243 std::coroutine_handle<> h,
244 capy::executor_ref ex,
245 buffer_param param,
246 std::stop_token token,
247 std::error_code* ec,
248 std::size_t* bytes_out)
249 {
250 10x auto& op = write_op_;
251 10x op.reset();
252 10x op.is_read = false;
253
254 10x capy::mutable_buffer bufs[max_buffers];
255 10x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
256
257 10x if (op.iovec_count == 0)
258 {
259 1x *ec = {};
260 1x *bytes_out = 0;
261 1x op.cont_op.cont.h = h;
262 1x return dispatch_coro(ex, op.cont_op.cont);
263 }
264
265 18x for (int i = 0; i < op.iovec_count; ++i)
266 {
267 9x op.iovecs[i].iov_base = bufs[i].data();
268 9x op.iovecs[i].iov_len = bufs[i].size();
269 }
270
271 9x op.h = h;
272 9x op.ex = ex;
273 9x op.ec_out = ec;
274 9x op.bytes_out = bytes_out;
275 9x op.start(token);
276
277 9x op.ex.on_work_started();
278
279 9x write_pool_op_.file_ = this;
280 9x write_pool_op_.ref_ = this->shared_from_this();
281 9x write_pool_op_.func_ = &posix_stream_file::do_write_work;
282 9x if (!svc_.pool().post(&write_pool_op_))
283 {
284 op.impl_ref = std::move(write_pool_op_.ref_);
285 op.cancelled.store(true, std::memory_order_release);
286 svc_.post(&write_op_);
287 }
288 9x return std::noop_coroutine();
289 }
290
291 inline void
292 9x posix_stream_file::do_write_work(pool_work_item* w) noexcept
293 {
294 9x auto* pw = static_cast<pool_op*>(w);
295 9x auto* self = pw->file_;
296 9x auto& op = self->write_op_;
297
298 9x if (!op.cancelled.load(std::memory_order_acquire))
299 {
300 ssize_t n;
301 do
302 {
303 18x n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
304 9x static_cast<off_t>(self->offset_));
305 }
306 9x while (n < 0 && errno == EINTR);
307
308 9x if (n >= 0)
309 {
310 9x op.errn = 0;
311 9x op.bytes_transferred = static_cast<std::size_t>(n);
312 9x self->offset_ += static_cast<std::uint64_t>(n);
313 }
314 else
315 {
316 op.errn = errno;
317 op.bytes_transferred = 0;
318 }
319 }
320
321 9x op.impl_ref = std::move(pw->ref_);
322 9x self->svc_.post(&op);
323 9x }
324
325 } // namespace boost::corosio::detail
326
327 #endif // BOOST_COROSIO_POSIX
328
329 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
330