src/corosio/src/local_connect_pair.cpp

60.0% Lines (24/40) 100.0% List of functions (5/5)
local_connect_pair.cpp
f(x) Functions (5)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/local_connect_pair.hpp>
11 #include <boost/corosio/detail/platform.hpp>
12 #include <boost/corosio/native/detail/make_err.hpp>
13
14 #include <system_error>
15
16 #if BOOST_COROSIO_POSIX
17 #include <fcntl.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20 #include <unistd.h>
21 #elif BOOST_COROSIO_HAS_IOCP
22 #include <boost/corosio/native/detail/endpoint_convert.hpp>
23
#include <atomic>
#include <cstddef>
#include <cstring>
#include <filesystem>
#include <random>
#include <string>
#include <thread>
29
30 #ifndef WIN32_LEAN_AND_MEAN
31 #define WIN32_LEAN_AND_MEAN
32 #endif
33 #include <WinSock2.h>
34
35 #ifndef AF_UNIX
36 #define AF_UNIX 1
37 #endif
38 #endif
39
40 namespace boost::corosio {
41
42 namespace {
43
44 #if BOOST_COROSIO_POSIX
45
46 std::error_code
47 48x make_pair_fds(int type, int& a_fd, int& b_fd) noexcept
48 {
49 int fds[2];
50 48x if (::socketpair(AF_UNIX, type, 0, fds) != 0)
51 return detail::make_err(errno);
52
53 // assign() is documented "adopt-only" and will not mutate the fd;
54 // set O_NONBLOCK before transferring ownership.
55 144x for (int i = 0; i < 2; ++i)
56 {
57 96x int flags = ::fcntl(fds[i], F_GETFL, 0);
58 96x if (flags < 0 || ::fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0)
59 {
60 auto ec = detail::make_err(errno);
61 ::close(fds[0]);
62 ::close(fds[1]);
63 return ec;
64 }
65 }
66
67 48x a_fd = fds[0];
68 48x b_fd = fds[1];
69 48x return {};
70 }
71
// Hand the two descriptors over to the socket objects `a` and `b`.
//
// Returns a default-constructed error_code when both assign() calls
// succeed. If an assign() throws std::system_error, its code is returned
// and nothing is left open:
//   - a.assign() failed: both raw descriptors are closed directly;
//   - b.assign() failed: `a` already adopted a_fd, so it is released
//     through a.close(), and b_fd is closed directly.
template<class Socket>
std::error_code
assign_pair(Socket& a, Socket& b, int a_fd, int b_fd) noexcept
{
    // Tracks whether a_fd is still a raw descriptor or already adopted
    // by `a`, which decides how it has to be released on failure.
    bool a_adopted = false;

    try
    {
        a.assign(a_fd);
        a_adopted = true;
        b.assign(b_fd);
    }
    catch (std::system_error const& failure)
    {
        if (a_adopted)
            a.close();
        else
            ::close(a_fd);
        ::close(b_fd);
        return failure.code();
    }

    return {};
}
100
101 #elif BOOST_COROSIO_HAS_IOCP
102
// Build a unique sub-directory ("co_pair_<random>") under the system
// temp directory and return the full socket path ("<dir>/s") inside it.
//
// dir_out - receives the created directory on success so the caller can
//           remove it afterwards; left untouched on failure.
//
// Returns an empty string on failure: the temp directory cannot be
// determined, 16 consecutive candidates could not be created, or an
// exception was raised while building a candidate.
//
// This function never lets an exception escape. Its caller is noexcept,
// so a throwing temp_directory_path() / random_device / allocation would
// otherwise terminate the process instead of reporting an error.
std::string
pick_pair_path(std::filesystem::path& dir_out)
{
    namespace fs = std::filesystem;

    try
    {
        // Non-throwing overload, queried once instead of per attempt.
        std::error_code ec;
        fs::path const base = fs::temp_directory_path(ec);
        if (ec)
            return {};

        thread_local std::mt19937_64 gen{std::random_device{}()};

        for (int attempt = 0; attempt < 16; ++attempt)
        {
            fs::path candidate =
                base / ("co_pair_" + std::to_string(gen()));

            // Build the result before touching the filesystem so that
            // nothing can throw once the directory exists.
            std::string socket_path = (candidate / "s").string();

            // create_directory() returns false both on error and when
            // the name is already taken; either way try another name.
            if (fs::create_directory(candidate, ec))
            {
                dir_out.swap(candidate);
                return socket_path;
            }
        }
    }
    catch (...)
    {
        // Fall through to the documented "empty string" failure result.
    }
    return {};
}
126
// Best-effort cleanup of a rendezvous location: delete the socket file
// `path` first, then its containing directory `dir`. Failures are
// deliberately ignored (the error_code overloads are used and the code
// is discarded).
void
remove_pair_path(std::filesystem::path const& dir, std::string const& path)
{
    namespace fs = std::filesystem;

    std::error_code ignored;
    fs::path const socket_file{path};
    fs::remove(socket_file, ignored);
    fs::remove(dir, ignored);
}
134
135 // Synchronously rendezvous two AF_UNIX SOCK_STREAM sockets. The
136 // listener and accept happen on the caller's thread; the connect
137 // runs on a short-lived worker to avoid a deadlock. The returned
138 // sockets are created with WSA_FLAG_OVERLAPPED so they can be
139 // registered with IOCP by assign_socket().
140 std::error_code
141 make_pair_sockets(SOCKET& a_sock, SOCKET& b_sock) noexcept
142 {
143 namespace fs = std::filesystem;
144
145 a_sock = INVALID_SOCKET;
146 b_sock = INVALID_SOCKET;
147
148 fs::path dir;
149 std::string path = pick_pair_path(dir);
150 if (path.empty())
151 return detail::make_err(ERROR_PATH_NOT_FOUND);
152
153 SOCKET listen_sock = ::WSASocketW(
154 AF_UNIX, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED);
155 if (listen_sock == INVALID_SOCKET)
156 {
157 auto ec = detail::make_err(::WSAGetLastError());
158 remove_pair_path(dir, path);
159 return ec;
160 }
161
162 detail::un_sa_t addr{};
163 addr.sun_family = AF_UNIX;
164 std::strncpy(
165 addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);
166 int addr_len = static_cast<int>(
167 offsetof(detail::un_sa_t, sun_path) + path.size() + 1);
168
169 if (::bind(
170 listen_sock, reinterpret_cast<sockaddr*>(&addr), addr_len)
171 == SOCKET_ERROR)
172 {
173 auto ec = detail::make_err(::WSAGetLastError());
174 ::closesocket(listen_sock);
175 remove_pair_path(dir, path);
176 return ec;
177 }
178
179 if (::listen(listen_sock, 1) == SOCKET_ERROR)
180 {
181 auto ec = detail::make_err(::WSAGetLastError());
182 ::closesocket(listen_sock);
183 remove_pair_path(dir, path);
184 return ec;
185 }
186
187 SOCKET worker_sock = INVALID_SOCKET;
188 std::error_code worker_ec;
189
190 std::thread worker([&] {
191 worker_sock = ::WSASocketW(
192 AF_UNIX, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED);
193 if (worker_sock == INVALID_SOCKET)
194 {
195 worker_ec = detail::make_err(::WSAGetLastError());
196 return;
197 }
198
199 detail::un_sa_t caddr{};
200 caddr.sun_family = AF_UNIX;
201 std::strncpy(
202 caddr.sun_path, path.c_str(), sizeof(caddr.sun_path) - 1);
203 int caddr_len = static_cast<int>(
204 offsetof(detail::un_sa_t, sun_path) + path.size() + 1);
205
206 if (::connect(
207 worker_sock, reinterpret_cast<sockaddr*>(&caddr), caddr_len)
208 == SOCKET_ERROR)
209 {
210 worker_ec = detail::make_err(::WSAGetLastError());
211 ::closesocket(worker_sock);
212 worker_sock = INVALID_SOCKET;
213 }
214 });
215
216 SOCKET accept_sock = ::accept(listen_sock, nullptr, nullptr);
217 std::error_code accept_ec;
218 if (accept_sock == INVALID_SOCKET)
219 accept_ec = detail::make_err(::WSAGetLastError());
220
221 worker.join();
222
223 ::closesocket(listen_sock);
224 remove_pair_path(dir, path);
225
226 if (accept_ec)
227 {
228 if (worker_sock != INVALID_SOCKET)
229 ::closesocket(worker_sock);
230 return accept_ec;
231 }
232 if (worker_ec)
233 {
234 ::closesocket(accept_sock);
235 return worker_ec;
236 }
237
238 a_sock = accept_sock;
239 b_sock = worker_sock;
240 return {};
241 }
242
243 std::error_code
244 assign_pair(
245 local_stream_socket& a,
246 local_stream_socket& b,
247 SOCKET a_sock,
248 SOCKET b_sock) noexcept
249 {
250 try
251 {
252 a.assign(static_cast<native_handle_type>(a_sock));
253 }
254 catch (std::system_error const& e)
255 {
256 ::closesocket(a_sock);
257 ::closesocket(b_sock);
258 return e.code();
259 }
260
261 try
262 {
263 b.assign(static_cast<native_handle_type>(b_sock));
264 }
265 catch (std::system_error const& e)
266 {
267 a.close();
268 ::closesocket(b_sock);
269 return e.code();
270 }
271
272 return {};
273 }
274
275 #endif
276
277 } // namespace
278
279 std::error_code
280 22x connect_pair(local_stream_socket& a, local_stream_socket& b) noexcept
281 {
282 22x if (a.is_open() || b.is_open())
283 2x return detail::make_err(
284 #if BOOST_COROSIO_POSIX
285 EISCONN
286 #else
287 WSAEISCONN
288 #endif
289 2x );
290
291 #if BOOST_COROSIO_POSIX
292 20x int a_fd = -1, b_fd = -1;
293 20x if (auto ec = make_pair_fds(SOCK_STREAM, a_fd, b_fd))
294 return ec;
295 20x return assign_pair(a, b, a_fd, b_fd);
296 #elif BOOST_COROSIO_HAS_IOCP
297 SOCKET a_sock = INVALID_SOCKET, b_sock = INVALID_SOCKET;
298 if (auto ec = make_pair_sockets(a_sock, b_sock))
299 return ec;
300 return assign_pair(a, b, a_sock, b_sock);
301 #else
302 return detail::make_err(ENOSYS);
303 #endif
304 }
305
306 #if BOOST_COROSIO_POSIX
307
308 std::error_code
309 28x connect_pair(local_datagram_socket& a, local_datagram_socket& b) noexcept
310 {
311 28x if (a.is_open() || b.is_open())
312 return detail::make_err(EISCONN);
313
314 28x int a_fd = -1, b_fd = -1;
315 28x if (auto ec = make_pair_fds(SOCK_DGRAM, a_fd, b_fd))
316 return ec;
317 28x return assign_pair(a, b, a_fd, b_fd);
318 }
319
320 #endif
321
322 } // namespace boost::corosio
323