diff --git a/lib/runtime/src/fs.rs b/lib/runtime/src/fs.rs index be7edcf..8b8514d 100644 --- a/lib/runtime/src/fs.rs +++ b/lib/runtime/src/fs.rs @@ -414,7 +414,6 @@ pub async fn metadata(path: impl AsRef) -> io::Result { pub async fn create_dir(path: impl AsRef) -> io::Result<()> { sys_fs::create_dir(FsOp::CreateDir { path: path.as_ref().to_path_buf(), - recursive: false, mode: 0o777, }) .await diff --git a/lib/runtime/src/op/fs.rs b/lib/runtime/src/op/fs.rs index 78264a5..ee16070 100644 --- a/lib/runtime/src/op/fs.rs +++ b/lib/runtime/src/op/fs.rs @@ -83,7 +83,6 @@ pub enum FsOp { }, CreateDir { path: PathBuf, - recursive: bool, mode: u32, }, RemoveFile { diff --git a/lib/runtime/src/platform/linux_x86_64/driver.rs b/lib/runtime/src/platform/linux_x86_64/driver.rs index e320238..344ae9a 100644 --- a/lib/runtime/src/platform/linux_x86_64/driver.rs +++ b/lib/runtime/src/platform/linux_x86_64/driver.rs @@ -24,6 +24,10 @@ enum CompletionKind { NotifySend = 3, Operation = 4, OperationCancel = 5, + /// CQE produced by the IORING_OP_LINK_TIMEOUT SQE that accompanies a + /// linked operation. We register it so the token range is claimed, but + /// the completion itself carries no useful information and is discarded. + LinkedTimeout = 6, } type CompletionHandler = Box; @@ -224,6 +228,49 @@ impl Driver { Ok(token) } + /// Submits a main operation linked to a timeout. + /// + /// Internally two SQEs are enqueued atomically: the main op (with + /// `IOSQE_IO_LINK`) and an `IORING_OP_LINK_TIMEOUT` SQE. If `timeout` + /// elapses before the main op completes, the kernel cancels the main op and + /// its CQE will carry `-ECANCELED`. The timeout's own CQE is silently + /// discarded in `process_cqe`. + /// + /// Returns the token for the main operation, which can be used with + /// `cancel_operation` to cancel it early. 
+ pub(crate) fn submit_operation_with_linked_timeout( + &self, + fill: impl FnOnce(&mut IoUringSqe), + timeout: Duration, + on_complete: impl FnOnce(IoUringCqe) + Send + 'static, + ) -> io::Result { + let main_token = self.next_token(CompletionKind::Operation); + let timeout_token = self.next_token(CompletionKind::LinkedTimeout); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::ASYNC, + event = "submit_operation_with_linked_timeout", + main_token, + timeout_token, + timeout_ns = timeout.as_nanos() as u64, + "submitting async driver operation with linked timeout" + ); + self.completions + .borrow_mut() + .insert(main_token, Box::new(on_complete)); + + if let Err(error) = + self.ring + .submit_linked_with_timeout(main_token, fill, timeout_token, timeout) + { + let mut completions = self.completions.borrow_mut(); + let _ = completions.remove(&main_token); + return Err(error); + } + + Ok(main_token) + } + pub(crate) fn cancel_operation(&self, token: u64) -> io::Result<()> { #[cfg(debug_assertions)] tracing::trace!( @@ -302,6 +349,7 @@ impl Driver { Some(CompletionKind::TimerRemove) | Some(CompletionKind::NotifySend) | Some(CompletionKind::OperationCancel) + | Some(CompletionKind::LinkedTimeout) | None => {} } } @@ -347,6 +395,7 @@ fn decode_token_kind(token: u64) -> Option { 3 => Some(CompletionKind::NotifySend), 4 => Some(CompletionKind::Operation), 5 => Some(CompletionKind::OperationCancel), + 6 => Some(CompletionKind::LinkedTimeout), _ => None, } } diff --git a/lib/runtime/src/platform/linux_x86_64/uring.rs b/lib/runtime/src/platform/linux_x86_64/uring.rs index f00902d..60a3f6c 100644 --- a/lib/runtime/src/platform/linux_x86_64/uring.rs +++ b/lib/runtime/src/platform/linux_x86_64/uring.rs @@ -17,7 +17,10 @@ const IORING_FEAT_SINGLE_MMAP: u32 = 1 << 0; pub(crate) const IORING_OP_FSYNC: u8 = 3; pub(crate) const IORING_OP_POLL_ADD: u8 = 6; +pub(crate) const IORING_OP_SENDMSG: u8 = 9; +pub(crate) const IORING_OP_RECVMSG: u8 = 10; pub(crate) const 
IORING_OP_TIMEOUT: u8 = 11; +pub(crate) const IORING_OP_LINK_TIMEOUT: u8 = 15; pub(crate) const IORING_OP_TIMEOUT_REMOVE: u8 = 12; pub(crate) const IORING_OP_ACCEPT: u8 = 13; pub(crate) const IORING_OP_ASYNC_CANCEL: u8 = 14; @@ -43,6 +46,7 @@ const IORING_MSG_DATA: u64 = 0; pub(crate) const IORING_FSYNC_DATASYNC: u32 = 1 << 0; pub(crate) const IORING_TIMEOUT_ABS: u32 = 1 << 0; pub(crate) const IORING_TIMEOUT_UPDATE: u32 = 1 << 1; +pub(crate) const IOSQE_IO_LINK: u8 = 1 << 2; pub(crate) const IOSQE_CQE_SKIP_SUCCESS: u8 = 1 << 6; thread_local! { @@ -328,6 +332,44 @@ impl IoUring { self.submit_pending().map(|_| ()) } + /// Submits a main SQE linked to an `IORING_OP_LINK_TIMEOUT` SQE. + /// + /// The main SQE is submitted with `IOSQE_IO_LINK` set so that if `timeout` + /// elapses before the main op completes, the kernel cancels the main op + /// (`-ECANCELED`) and the timeout CQE arrives with `-ETIME`. If the main op + /// completes first, the timeout CQE arrives with `-ECANCELED`. + /// + /// # Safety of the stack-allocated timespec + /// + /// `KernelTimespec` is stack-allocated and its address is passed to the kernel + /// in the timeout SQE. This is safe for the same reason as `submit_timeout`: + /// both SQEs are flushed via the synchronous `io_uring_enter` call inside + /// `submit_pending` before this function returns. The kernel copies the + /// timespec value at submission time; no lifetime extension beyond this call + /// is required. 
+ pub(crate) fn submit_linked_with_timeout( + &self, + main_token: u64, + fill: impl FnOnce(&mut IoUringSqe), + timeout_token: u64, + timeout: Duration, + ) -> io::Result<()> { + let timespec = duration_to_kernel_timespec(timeout); + self.push_sqe(|sqe| { + fill(sqe); + sqe.flags |= IOSQE_IO_LINK; + sqe.user_data = main_token; + })?; + self.push_sqe(|sqe| { + sqe.opcode = IORING_OP_LINK_TIMEOUT; + sqe.fd = -1; + sqe.addr = (&timespec as *const KernelTimespec) as u64; + sqe.len = 1; + sqe.user_data = timeout_token; + })?; + self.submit_pending().map(|_| ()) + } + pub(crate) fn drain_completions(&self, mut f: impl FnMut(IoUringCqe)) -> bool { let mut head = load_u32(self.cq_head); let tail = load_u32(self.cq_tail); diff --git a/lib/runtime/src/sys/linux/fs.rs b/lib/runtime/src/sys/linux/fs.rs index 3c91d46..5837be8 100644 --- a/lib/runtime/src/sys/linux/fs.rs +++ b/lib/runtime/src/sys/linux/fs.rs @@ -213,12 +213,7 @@ pub async fn try_clone(op: FsOp) -> io::Result { } pub async fn create_dir(op: FsOp) -> io::Result<()> { - let FsOp::CreateDir { - path, - recursive: _, - mode, - } = op - else { + let FsOp::CreateDir { path, mode } = op else { unreachable!("create_dir backend called with non-create_dir op"); }; @@ -314,10 +309,15 @@ impl ReadDirStream { let state = Arc::new(ReadDirState::new(current_thread_handle())); let producer = Arc::clone(&state); - thread::Builder::new() + if let Err(error) = thread::Builder::new() .name("ruin-runtime-read-dir".into()) .spawn(move || produce_dir_entries(path, producer)) - .map_err(io::Error::other)?; + { + // Spawn failed: the producer thread will never call finish(), so + // pending_ops would leak without an explicit decrement here. + state.release_pending(); + return Err(io::Error::other(error)); + } Ok(Self { state }) } diff --git a/lib/runtime/src/sys/linux/net.rs b/lib/runtime/src/sys/linux/net.rs index df81b87..930dca8 100644 --- a/lib/runtime/src/sys/linux/net.rs +++ b/lib/runtime/src/sys/linux/net.rs @@ -1,5 +1,6 @@ //!
Linux networking backend. +use std::cell::Cell; use std::ffi::c_void; use std::future::Future; use std::io; @@ -12,12 +13,20 @@ use std::pin::Pin; use std::thread; use std::time::Duration; +thread_local! { + // None = untested, Some(true) = io_uring works, Some(false) = use offload. + // After the first successful IORING_OP_SEND, the clone in send() is skipped + // for all subsequent calls on the same thread. + static SEND_URING_SUPPORTED: Cell> = const { Cell::new(None) }; +} + use crate::op::completion::completion_for_current_thread; use crate::op::net::{AcceptedSocket, NetOp, ReceivedDatagram}; use crate::platform::linux_x86_64::runtime::with_current_driver; use crate::platform::linux_x86_64::uring::{ IORING_OP_ACCEPT, IORING_OP_BIND, IORING_OP_CLOSE, IORING_OP_CONNECT, IORING_OP_LISTEN, - IORING_OP_RECV, IORING_OP_SEND, IORING_OP_SHUTDOWN, IORING_OP_SOCKET, IoUringCqe, IoUringSqe, + IORING_OP_RECV, IORING_OP_RECVMSG, IORING_OP_SEND, IORING_OP_SENDMSG, IORING_OP_SHUTDOWN, + IORING_OP_SOCKET, IoUringCqe, IoUringSqe, }; const DEFAULT_LISTENER_BACKLOG: i32 = 1024; @@ -39,7 +48,7 @@ pub fn execution_path(op: &NetOp) -> ExecutionPath { | NetOp::Recv { .. } | NetOp::Shutdown { .. } | NetOp::Close { .. } => ExecutionPath::IoUring, - NetOp::SendTo { .. } | NetOp::RecvFrom { .. } => ExecutionPath::Offload, + NetOp::SendTo { .. } | NetOp::RecvFrom { .. } => ExecutionPath::IoUring, } } @@ -212,6 +221,32 @@ pub async fn send(op: NetOp) -> io::Result { unreachable!("send backend called with non-send op"); }; + // Capability known: io_uring SEND is not supported — skip to offload directly. + if SEND_URING_SUPPORTED.with(|c| c.get()) == Some(false) { + return offload(move || send_sync(fd, data, flags)).await; + } + + // Capability known: io_uring SEND works — submit without a fallback clone. 
+ if SEND_URING_SUPPORTED.with(|c| c.get()) == Some(true) { + let data_ptr = data.as_ptr(); + let data_len = data.len(); + return submit_uring::( + move |sqe| { + sqe.opcode = IORING_OP_SEND; + sqe.fd = fd; + sqe.addr = data_ptr as u64; + sqe.len = data_len as u32; + sqe.op_flags = flags as u32; + }, + move |cqe| { + let _data = data; + cqe_to_result(cqe).map(|written| written as usize) + }, + ) + .await; + } + + // Capability unknown: probe with a one-time clone. Cache the result. let fallback_data = data.clone(); let data_ptr = data.as_ptr(); let data_len = data.len(); @@ -231,9 +266,15 @@ pub async fn send(op: NetOp) -> io::Result { .await { Err(error) if should_fallback_to_offload(&error) => { + SEND_URING_SUPPORTED.with(|c| c.set(Some(false))); offload(move || send_sync(fd, fallback_data, flags)).await } - result => result, + result => { + if result.is_ok() { + SEND_URING_SUPPORTED.with(|c| c.set(Some(true))); + } + result + } } } @@ -248,8 +289,36 @@ pub async fn send_to(op: NetOp) -> io::Result { unreachable!("send_to backend called with non-send_to op"); }; - let raw_addr = RawSocketAddr::from_socket_addr(target); - offload(move || send_to_sync(fd, data, raw_addr, flags)).await + let raw_addr = Box::new(RawSocketAddr::from_socket_addr(target)); + let mut iov = Box::new(libc::iovec { + iov_base: data.as_ptr() as *mut c_void, + iov_len: data.len(), + }); + let mut msg = Box::new(unsafe { std::mem::zeroed::() }); + msg.msg_name = raw_addr.as_ptr() as *mut c_void; + msg.msg_namelen = raw_addr.len(); + msg.msg_iov = iov.as_mut() as *mut libc::iovec; + msg.msg_iovlen = 1; + let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64; + let iov = SendIovec(iov); + let msg = SendMsghdr(msg); + + submit_uring::( + move |sqe| { + sqe.opcode = IORING_OP_SENDMSG; + sqe.fd = fd; + sqe.addr = msg_ptr; + sqe.op_flags = flags as u32; + }, + move |cqe| { + let _raw_addr = raw_addr; + let _iov = iov; + let _msg = msg; + let _data = data; + cqe_to_result(cqe).map(|written| written 
as usize) + }, + ) + .await } pub async fn recv(op: NetOp) -> io::Result> { @@ -288,7 +357,46 @@ pub async fn recv_from(op: NetOp) -> io::Result { unreachable!("recv_from backend called with non-recv_from op"); }; - offload(move || recv_from_sync(fd, len, flags)).await + let mut data = vec![0u8; len]; + let mut storage = Box::new(MaybeUninit::::zeroed()); + let mut iov = Box::new(libc::iovec { + iov_base: data.as_mut_ptr() as *mut c_void, + iov_len: data.len(), + }); + let mut msg = Box::new(unsafe { std::mem::zeroed::() }); + msg.msg_name = storage.as_mut_ptr() as *mut c_void; + msg.msg_namelen = std::mem::size_of::() as libc::socklen_t; + msg.msg_iov = iov.as_mut() as *mut libc::iovec; + msg.msg_iovlen = 1; + let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64; + let iov = SendIovec(iov); + let msg = SendMsghdr(msg); + + match submit_uring::( + move |sqe| { + sqe.opcode = IORING_OP_RECVMSG; + sqe.fd = fd; + sqe.addr = msg_ptr; + sqe.op_flags = flags as u32; + }, + move |cqe| { + let _iov = iov; + let addr_len = msg.0.msg_namelen; + drop(msg); + let read = cqe_to_result(cqe)? 
as usize; + data.truncate(read); + let storage = unsafe { storage.assume_init() }; + let peer_addr = socket_addr_from_storage(&storage, addr_len)?; + Ok(ReceivedDatagram { data, peer_addr }) + }, + ) + .await + { + Err(error) if should_fallback_to_offload(&error) => { + offload(move || recv_from_sync(fd, len, flags)).await + } + result => result, + } } pub async fn shutdown(op: NetOp) -> io::Result<()> { @@ -408,10 +516,24 @@ pub async fn recv_timeout( flags: i32, timeout: Duration, ) -> io::Result> { - offload(move || { - wait_socket(fd, libc::POLLIN, timeout)?; - recv_sync(fd, len, flags) - }) + let mut buffer = vec![0u8; len]; + let buffer_ptr = buffer.as_mut_ptr(); + let buffer_len = buffer.len(); + submit_uring_with_linked_timeout::, _>( + move |sqe| { + sqe.opcode = IORING_OP_RECV; + sqe.fd = fd; + sqe.addr = buffer_ptr as u64; + sqe.len = buffer_len as u32; + sqe.op_flags = flags as u32; + }, + timeout, + move |cqe| { + let read = cqe_to_timed_result(cqe)? as usize; + buffer.truncate(read); + Ok(buffer) + }, + ) .await } @@ -421,10 +543,22 @@ pub async fn send_timeout( flags: i32, timeout: Duration, ) -> io::Result { - offload(move || { - wait_socket(fd, libc::POLLOUT, timeout)?; - send_sync(fd, data, flags) - }) + let data_ptr = data.as_ptr(); + let data_len = data.len(); + submit_uring_with_linked_timeout::( + move |sqe| { + sqe.opcode = IORING_OP_SEND; + sqe.fd = fd; + sqe.addr = data_ptr as u64; + sqe.len = data_len as u32; + sqe.op_flags = flags as u32; + }, + timeout, + move |cqe| { + let _data = data; + cqe_to_timed_result(cqe).map(|written| written as usize) + }, + ) .await } @@ -434,10 +568,40 @@ pub async fn recv_from_timeout( flags: i32, timeout: Duration, ) -> io::Result { - offload(move || { - wait_socket(fd, libc::POLLIN, timeout)?; - recv_from_sync(fd, len, flags) - }) + let mut data = vec![0u8; len]; + let mut storage = Box::new(MaybeUninit::::zeroed()); + let mut iov = Box::new(libc::iovec { + iov_base: data.as_mut_ptr() as *mut c_void, + 
iov_len: data.len(), + }); + let mut msg = Box::new(unsafe { std::mem::zeroed::() }); + msg.msg_name = storage.as_mut_ptr() as *mut c_void; + msg.msg_namelen = std::mem::size_of::() as libc::socklen_t; + msg.msg_iov = iov.as_mut() as *mut libc::iovec; + msg.msg_iovlen = 1; + let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64; + let iov = SendIovec(iov); + let msg = SendMsghdr(msg); + + submit_uring_with_linked_timeout::( + move |sqe| { + sqe.opcode = IORING_OP_RECVMSG; + sqe.fd = fd; + sqe.addr = msg_ptr; + sqe.op_flags = flags as u32; + }, + timeout, + move |cqe| { + let _iov = iov; + let addr_len = msg.0.msg_namelen; + drop(msg); + let read = cqe_to_timed_result(cqe)? as usize; + data.truncate(read); + let storage = unsafe { storage.assume_init() }; + let peer_addr = socket_addr_from_storage(&storage, addr_len)?; + Ok(ReceivedDatagram { data, peer_addr }) + }, + ) .await } @@ -448,15 +612,69 @@ pub async fn send_to_timeout( flags: i32, timeout: Duration, ) -> io::Result { - offload(move || { - wait_socket(fd, libc::POLLOUT, timeout)?; - send_to_sync(fd, data, RawSocketAddr::from_socket_addr(target), flags) - }) + let raw_addr = Box::new(RawSocketAddr::from_socket_addr(target)); + let mut iov = Box::new(libc::iovec { + iov_base: data.as_ptr() as *mut c_void, + iov_len: data.len(), + }); + let mut msg = Box::new(unsafe { std::mem::zeroed::() }); + msg.msg_name = raw_addr.as_ptr() as *mut c_void; + msg.msg_namelen = raw_addr.len(); + msg.msg_iov = iov.as_mut() as *mut libc::iovec; + msg.msg_iovlen = 1; + let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64; + let iov = SendIovec(iov); + let msg = SendMsghdr(msg); + + submit_uring_with_linked_timeout::( + move |sqe| { + sqe.opcode = IORING_OP_SENDMSG; + sqe.fd = fd; + sqe.addr = msg_ptr; + sqe.op_flags = flags as u32; + }, + timeout, + move |cqe| { + let _raw_addr = raw_addr; + let _iov = iov; + let _msg = msg; + let _data = data; + cqe_to_timed_result(cqe).map(|written| written as usize) + }, + ) .await } pub 
async fn connect_stream_timeout(addr: SocketAddr, timeout: Duration) -> io::Result { - offload(move || connect_stream_timeout_sync(addr, timeout)).await + let socket = socket(NetOp::Socket { + domain: socket_domain(addr), + socket_type: libc::SOCK_STREAM, + protocol: 0, + flags: libc::SOCK_CLOEXEC as u32, + }) + .await?; + + let fd = socket.as_raw_fd(); + let raw_addr = RawSocketAddr::from_socket_addr(addr); + let addr_ptr = raw_addr.as_ptr(); + let addr_len = raw_addr.len(); + + submit_uring_with_linked_timeout::<(), _>( + move |sqe| { + sqe.opcode = IORING_OP_CONNECT; + sqe.fd = fd; + sqe.addr = addr_ptr as u64; + sqe.off = addr_len as u64; + }, + timeout, + move |cqe| { + let _raw_addr = raw_addr; + cqe_to_timed_result(cqe).map(|_| ()) + }, + ) + .await?; + + Ok(socket) } pub fn local_addr(fd: RawFd) -> io::Result { @@ -569,6 +787,38 @@ where future.await } +/// Like [`submit_uring`] but pairs the main SQE with an `IORING_OP_LINK_TIMEOUT`. +/// +/// If the timeout elapses before the main op completes, the completion callback +/// receives a CQE with `res = -ECANCELED`. Callers should map `-ECANCELED` to +/// `io::ErrorKind::TimedOut`. 
+async fn submit_uring_with_linked_timeout( + fill: impl FnOnce(&mut IoUringSqe), + timeout: Duration, + map: M, +) -> io::Result +where + M: FnOnce(IoUringCqe) -> io::Result + Send + 'static, +{ + let (future, handle) = completion_for_current_thread::>(); + let callback_handle = handle.clone(); + let token = with_current_driver(|driver| { + driver.submit_operation_with_linked_timeout( + fill, + timeout, + move |cqe| { + callback_handle.complete(map(cqe)); + }, + ) + })?; + + handle.set_cancel(move || { + let _ = with_current_driver(|driver| driver.cancel_operation(token)); + }); + + future.await +} + async fn offload( task: impl FnOnce() -> io::Result + Send + 'static, ) -> io::Result { @@ -762,6 +1012,17 @@ fn cqe_to_result(cqe: IoUringCqe) -> io::Result { } } +/// Like [`cqe_to_result`] but maps `-ECANCELED` (timeout fired) to `TimedOut`. +fn cqe_to_timed_result(cqe: IoUringCqe) -> io::Result { + if cqe.res == -libc::ECANCELED { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + "socket operation timed out", + )); + } + cqe_to_result(cqe) +} + fn cvt(value: libc::c_int) -> io::Result { if value == -1 { Err(io::Error::last_os_error()) @@ -818,19 +1079,6 @@ fn send_sync(fd: RawFd, data: Vec, flags: i32) -> io::Result { cvt_long(written).map(|written| written as usize) } -fn send_to_sync(fd: RawFd, data: Vec, target: RawSocketAddr, flags: i32) -> io::Result { - let written = unsafe { - libc::sendto( - fd, - data.as_ptr().cast::(), - data.len(), - flags, - target.as_ptr(), - target.len(), - ) - }; - cvt_long(written).map(|written| written as usize) -} fn recv_sync(fd: RawFd, len: usize, flags: i32) -> io::Result> { let mut buffer = vec![0; len]; @@ -879,91 +1127,21 @@ fn close_sync(fd: RawFd) -> io::Result<()> { cvt(unsafe { libc::close(fd) }).map(|_| ()) } -fn connect_stream_timeout_sync(addr: SocketAddr, timeout: Duration) -> io::Result { - let fd = cvt(unsafe { - libc::socket( - socket_domain(addr), - libc::SOCK_STREAM | libc::SOCK_CLOEXEC | 
libc::SOCK_NONBLOCK, - 0, - ) - })?; - let raw_addr = RawSocketAddr::from_socket_addr(addr); - let connect_result = unsafe { libc::connect(fd, raw_addr.as_ptr(), raw_addr.len()) }; - if connect_result == 0 { - set_nonblocking(fd, false)?; - return Ok(unsafe { OwnedFd::from_raw_fd(fd) }); - } - let error = io::Error::last_os_error(); - if error.raw_os_error() != Some(libc::EINPROGRESS) { - let _ = close_sync(fd); - return Err(error); - } +/// Wrapper making `Box` sendable across the async CQE boundary. +/// +/// Safety: `iov_base` points into a `Vec` that is owned by the same +/// closure, so the pointer is valid until the CQE fires and the closure drops. +struct SendIovec(#[allow(dead_code)] Box); +unsafe impl Send for SendIovec {} - let completion = wait_socket(fd, libc::POLLOUT, timeout) - .and_then(|_| getsockopt_int(fd, libc::SOL_SOCKET, libc::SO_ERROR)); - match completion { - Ok(0) => { - set_nonblocking(fd, false)?; - Ok(unsafe { OwnedFd::from_raw_fd(fd) }) - } - Ok(code) => { - let _ = close_sync(fd); - Err(io::Error::from_raw_os_error(code)) - } - Err(error) => { - let _ = close_sync(fd); - Err(error) - } - } -} - -fn set_nonblocking(fd: RawFd, enabled: bool) -> io::Result<()> { - let flags = cvt(unsafe { libc::fcntl(fd, libc::F_GETFL) })?; - let new_flags = if enabled { - flags | libc::O_NONBLOCK - } else { - flags & !libc::O_NONBLOCK - }; - cvt(unsafe { libc::fcntl(fd, libc::F_SETFL, new_flags) }).map(|_| ()) -} - -fn wait_socket(fd: RawFd, events: i16, timeout: Duration) -> io::Result<()> { - let timeout_ms = timeout - .as_millis() - .min(i32::MAX as u128) - .try_into() - .unwrap_or(i32::MAX); - - loop { - let mut poll_fd = libc::pollfd { - fd, - events, - revents: 0, - }; - let result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) }; - if result == 0 { - return Err(io::Error::new( - io::ErrorKind::TimedOut, - "socket operation timed out", - )); - } - if result < 0 { - let error = io::Error::last_os_error(); - if error.kind() == 
io::ErrorKind::Interrupted { - continue; - } - return Err(error); - } - if poll_fd.revents & (libc::POLLERR | libc::POLLHUP | libc::POLLNVAL) != 0 { - let socket_error = getsockopt_int(fd, libc::SOL_SOCKET, libc::SO_ERROR).unwrap_or(0); - if socket_error != 0 { - return Err(io::Error::from_raw_os_error(socket_error)); - } - } - return Ok(()); - } -} +/// Wrapper making `Box` sendable across the async CQE boundary. +/// +/// Safety: all raw pointers inside the `msghdr` (`msg_name`, `msg_iov`) point +/// into heap storage owned by the same closure, so they are valid until the +/// CQE fires and the closure drops. +struct SendMsghdr(Box); +unsafe impl Send for SendMsghdr {} fn cvt_long(value: libc::ssize_t) -> io::Result { if value == -1 {