Claude net/fs review
This commit is contained in:
@@ -414,7 +414,6 @@ pub async fn metadata(path: impl AsRef<Path>) -> io::Result<Metadata> {
|
||||
pub async fn create_dir(path: impl AsRef<Path>) -> io::Result<()> {
|
||||
sys_fs::create_dir(FsOp::CreateDir {
|
||||
path: path.as_ref().to_path_buf(),
|
||||
recursive: false,
|
||||
mode: 0o777,
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -83,7 +83,6 @@ pub enum FsOp {
|
||||
},
|
||||
CreateDir {
|
||||
path: PathBuf,
|
||||
recursive: bool,
|
||||
mode: u32,
|
||||
},
|
||||
RemoveFile {
|
||||
|
||||
@@ -24,6 +24,10 @@ enum CompletionKind {
|
||||
NotifySend = 3,
|
||||
Operation = 4,
|
||||
OperationCancel = 5,
|
||||
/// CQE produced by the IORING_OP_LINK_TIMEOUT SQE that accompanies a
|
||||
/// linked operation. We register it so the token range is claimed, but
|
||||
/// the completion itself carries no useful information and is discarded.
|
||||
LinkedTimeout = 6,
|
||||
}
|
||||
|
||||
type CompletionHandler = Box<dyn FnOnce(IoUringCqe) + Send + 'static>;
|
||||
@@ -224,6 +228,49 @@ impl Driver {
|
||||
Ok(token)
|
||||
}
|
||||
|
||||
/// Submits a main operation linked to a timeout.
|
||||
///
|
||||
/// Internally two SQEs are enqueued atomically: the main op (with
|
||||
/// `IOSQE_IO_LINK`) and an `IORING_OP_LINK_TIMEOUT` SQE. If `timeout`
|
||||
/// elapses before the main op completes, the kernel cancels the main op and
|
||||
/// its CQE will carry `-ECANCELED`. The timeout's own CQE is silently
|
||||
/// discarded in `process_cqe`.
|
||||
///
|
||||
/// Returns the token for the main operation, which can be used with
|
||||
/// `cancel_operation` to cancel it early.
|
||||
pub(crate) fn submit_operation_with_linked_timeout(
|
||||
&self,
|
||||
fill: impl FnOnce(&mut IoUringSqe),
|
||||
timeout: Duration,
|
||||
on_complete: impl FnOnce(IoUringCqe) + Send + 'static,
|
||||
) -> io::Result<u64> {
|
||||
let main_token = self.next_token(CompletionKind::Operation);
|
||||
let timeout_token = self.next_token(CompletionKind::LinkedTimeout);
|
||||
#[cfg(debug_assertions)]
|
||||
tracing::trace!(
|
||||
target: trace_targets::ASYNC,
|
||||
event = "submit_operation_with_linked_timeout",
|
||||
main_token,
|
||||
timeout_token,
|
||||
timeout_ns = timeout.as_nanos() as u64,
|
||||
"submitting async driver operation with linked timeout"
|
||||
);
|
||||
self.completions
|
||||
.borrow_mut()
|
||||
.insert(main_token, Box::new(on_complete));
|
||||
|
||||
if let Err(error) =
|
||||
self.ring
|
||||
.submit_linked_with_timeout(main_token, fill, timeout_token, timeout)
|
||||
{
|
||||
let mut completions = self.completions.borrow_mut();
|
||||
let _ = completions.remove(&main_token);
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
Ok(main_token)
|
||||
}
|
||||
|
||||
pub(crate) fn cancel_operation(&self, token: u64) -> io::Result<()> {
|
||||
#[cfg(debug_assertions)]
|
||||
tracing::trace!(
|
||||
@@ -302,6 +349,7 @@ impl Driver {
|
||||
Some(CompletionKind::TimerRemove)
|
||||
| Some(CompletionKind::NotifySend)
|
||||
| Some(CompletionKind::OperationCancel)
|
||||
| Some(CompletionKind::LinkedTimeout)
|
||||
| None => {}
|
||||
}
|
||||
}
|
||||
@@ -347,6 +395,7 @@ fn decode_token_kind(token: u64) -> Option<CompletionKind> {
|
||||
3 => Some(CompletionKind::NotifySend),
|
||||
4 => Some(CompletionKind::Operation),
|
||||
5 => Some(CompletionKind::OperationCancel),
|
||||
6 => Some(CompletionKind::LinkedTimeout),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,10 @@ const IORING_FEAT_SINGLE_MMAP: u32 = 1 << 0;
|
||||
|
||||
pub(crate) const IORING_OP_FSYNC: u8 = 3;
|
||||
pub(crate) const IORING_OP_POLL_ADD: u8 = 6;
|
||||
pub(crate) const IORING_OP_SENDMSG: u8 = 9;
|
||||
pub(crate) const IORING_OP_RECVMSG: u8 = 10;
|
||||
pub(crate) const IORING_OP_TIMEOUT: u8 = 11;
|
||||
pub(crate) const IORING_OP_LINK_TIMEOUT: u8 = 15;
|
||||
pub(crate) const IORING_OP_TIMEOUT_REMOVE: u8 = 12;
|
||||
pub(crate) const IORING_OP_ACCEPT: u8 = 13;
|
||||
pub(crate) const IORING_OP_ASYNC_CANCEL: u8 = 14;
|
||||
@@ -43,6 +46,7 @@ const IORING_MSG_DATA: u64 = 0;
|
||||
pub(crate) const IORING_FSYNC_DATASYNC: u32 = 1 << 0;
|
||||
pub(crate) const IORING_TIMEOUT_ABS: u32 = 1 << 0;
|
||||
pub(crate) const IORING_TIMEOUT_UPDATE: u32 = 1 << 1;
|
||||
pub(crate) const IOSQE_IO_LINK: u8 = 1 << 2;
|
||||
pub(crate) const IOSQE_CQE_SKIP_SUCCESS: u8 = 1 << 6;
|
||||
|
||||
thread_local! {
|
||||
@@ -328,6 +332,44 @@ impl IoUring {
|
||||
self.submit_pending().map(|_| ())
|
||||
}
|
||||
|
||||
/// Submits a main SQE linked to an `IORING_OP_LINK_TIMEOUT` SQE.
|
||||
///
|
||||
/// The main SQE is submitted with `IOSQE_IO_LINK` set so that if `timeout`
|
||||
/// elapses before the main op completes, the kernel cancels the main op
|
||||
/// (`-ECANCELED`) and the timeout CQE arrives with `-ETIME`. If the main op
|
||||
/// completes first, the timeout CQE arrives with `-ECANCELED`.
|
||||
///
|
||||
/// # Safety of the stack-allocated timespec
|
||||
///
|
||||
/// `KernelTimespec` is stack-allocated and its address is passed to the kernel
|
||||
/// in the timeout SQE. This is safe for the same reason as `submit_timeout`:
|
||||
/// both SQEs are flushed via the synchronous `io_uring_enter` call inside
|
||||
/// `submit_pending` before this function returns. The kernel copies the
|
||||
/// timespec value at submission time; no lifetime extension beyond this call
|
||||
/// is required.
|
||||
pub(crate) fn submit_linked_with_timeout(
|
||||
&self,
|
||||
main_token: u64,
|
||||
fill: impl FnOnce(&mut IoUringSqe),
|
||||
timeout_token: u64,
|
||||
timeout: Duration,
|
||||
) -> io::Result<()> {
|
||||
let timespec = duration_to_kernel_timespec(timeout);
|
||||
self.push_sqe(|sqe| {
|
||||
fill(sqe);
|
||||
sqe.flags |= IOSQE_IO_LINK;
|
||||
sqe.user_data = main_token;
|
||||
})?;
|
||||
self.push_sqe(|sqe| {
|
||||
sqe.opcode = IORING_OP_LINK_TIMEOUT;
|
||||
sqe.fd = -1;
|
||||
sqe.addr = (&timespec as *const KernelTimespec) as u64;
|
||||
sqe.len = 1;
|
||||
sqe.user_data = timeout_token;
|
||||
})?;
|
||||
self.submit_pending().map(|_| ())
|
||||
}
|
||||
|
||||
pub(crate) fn drain_completions(&self, mut f: impl FnMut(IoUringCqe)) -> bool {
|
||||
let mut head = load_u32(self.cq_head);
|
||||
let tail = load_u32(self.cq_tail);
|
||||
|
||||
@@ -213,12 +213,7 @@ pub async fn try_clone(op: FsOp) -> io::Result<OwnedFd> {
|
||||
}
|
||||
|
||||
pub async fn create_dir(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::CreateDir {
|
||||
path,
|
||||
recursive: _,
|
||||
mode,
|
||||
} = op
|
||||
else {
|
||||
let FsOp::CreateDir { path, mode } = op else {
|
||||
unreachable!("create_dir backend called with non-create_dir op");
|
||||
};
|
||||
|
||||
@@ -314,10 +309,15 @@ impl ReadDirStream {
|
||||
let state = Arc::new(ReadDirState::new(current_thread_handle()));
|
||||
let producer = Arc::clone(&state);
|
||||
|
||||
thread::Builder::new()
|
||||
if let Err(error) = thread::Builder::new()
|
||||
.name("ruin-runtime-read-dir".into())
|
||||
.spawn(move || produce_dir_entries(path, producer))
|
||||
.map_err(io::Error::other)?;
|
||||
{
|
||||
// Spawn failed: the producer thread will never call finish(), so
|
||||
// pending_ops would leak without an explicit decrement here.
|
||||
state.release_pending();
|
||||
return Err(io::Error::other(error));
|
||||
}
|
||||
|
||||
Ok(Self { state })
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Linux networking backend.
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::ffi::c_void;
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
@@ -12,12 +13,20 @@ use std::pin::Pin;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
thread_local! {
|
||||
// None = untested, Some(true) = io_uring works, Some(false) = use offload.
|
||||
// After the first successful IORING_OP_SEND, the clone in send() is skipped
|
||||
// for all subsequent calls on the same thread.
|
||||
static SEND_URING_SUPPORTED: Cell<Option<bool>> = const { Cell::new(None) };
|
||||
}
|
||||
|
||||
use crate::op::completion::completion_for_current_thread;
|
||||
use crate::op::net::{AcceptedSocket, NetOp, ReceivedDatagram};
|
||||
use crate::platform::linux_x86_64::runtime::with_current_driver;
|
||||
use crate::platform::linux_x86_64::uring::{
|
||||
IORING_OP_ACCEPT, IORING_OP_BIND, IORING_OP_CLOSE, IORING_OP_CONNECT, IORING_OP_LISTEN,
|
||||
IORING_OP_RECV, IORING_OP_SEND, IORING_OP_SHUTDOWN, IORING_OP_SOCKET, IoUringCqe, IoUringSqe,
|
||||
IORING_OP_RECV, IORING_OP_RECVMSG, IORING_OP_SEND, IORING_OP_SENDMSG, IORING_OP_SHUTDOWN,
|
||||
IORING_OP_SOCKET, IoUringCqe, IoUringSqe,
|
||||
};
|
||||
|
||||
const DEFAULT_LISTENER_BACKLOG: i32 = 1024;
|
||||
@@ -39,7 +48,7 @@ pub fn execution_path(op: &NetOp) -> ExecutionPath {
|
||||
| NetOp::Recv { .. }
|
||||
| NetOp::Shutdown { .. }
|
||||
| NetOp::Close { .. } => ExecutionPath::IoUring,
|
||||
NetOp::SendTo { .. } | NetOp::RecvFrom { .. } => ExecutionPath::Offload,
|
||||
NetOp::SendTo { .. } | NetOp::RecvFrom { .. } => ExecutionPath::IoUring,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,6 +221,32 @@ pub async fn send(op: NetOp) -> io::Result<usize> {
|
||||
unreachable!("send backend called with non-send op");
|
||||
};
|
||||
|
||||
// Capability known: io_uring SEND is not supported — skip to offload directly.
|
||||
if SEND_URING_SUPPORTED.with(|c| c.get()) == Some(false) {
|
||||
return offload(move || send_sync(fd, data, flags)).await;
|
||||
}
|
||||
|
||||
// Capability known: io_uring SEND works — submit without a fallback clone.
|
||||
if SEND_URING_SUPPORTED.with(|c| c.get()) == Some(true) {
|
||||
let data_ptr = data.as_ptr();
|
||||
let data_len = data.len();
|
||||
return submit_uring::<usize, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_SEND;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = data_ptr as u64;
|
||||
sqe.len = data_len as u32;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
move |cqe| {
|
||||
let _data = data;
|
||||
cqe_to_result(cqe).map(|written| written as usize)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Capability unknown: probe with a one-time clone. Cache the result.
|
||||
let fallback_data = data.clone();
|
||||
let data_ptr = data.as_ptr();
|
||||
let data_len = data.len();
|
||||
@@ -231,9 +266,15 @@ pub async fn send(op: NetOp) -> io::Result<usize> {
|
||||
.await
|
||||
{
|
||||
Err(error) if should_fallback_to_offload(&error) => {
|
||||
SEND_URING_SUPPORTED.with(|c| c.set(Some(false)));
|
||||
offload(move || send_sync(fd, fallback_data, flags)).await
|
||||
}
|
||||
result => result,
|
||||
result => {
|
||||
if result.is_ok() {
|
||||
SEND_URING_SUPPORTED.with(|c| c.set(Some(true)));
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,8 +289,36 @@ pub async fn send_to(op: NetOp) -> io::Result<usize> {
|
||||
unreachable!("send_to backend called with non-send_to op");
|
||||
};
|
||||
|
||||
let raw_addr = RawSocketAddr::from_socket_addr(target);
|
||||
offload(move || send_to_sync(fd, data, raw_addr, flags)).await
|
||||
let raw_addr = Box::new(RawSocketAddr::from_socket_addr(target));
|
||||
let mut iov = Box::new(libc::iovec {
|
||||
iov_base: data.as_ptr() as *mut c_void,
|
||||
iov_len: data.len(),
|
||||
});
|
||||
let mut msg = Box::new(unsafe { std::mem::zeroed::<libc::msghdr>() });
|
||||
msg.msg_name = raw_addr.as_ptr() as *mut c_void;
|
||||
msg.msg_namelen = raw_addr.len();
|
||||
msg.msg_iov = iov.as_mut() as *mut libc::iovec;
|
||||
msg.msg_iovlen = 1;
|
||||
let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64;
|
||||
let iov = SendIovec(iov);
|
||||
let msg = SendMsghdr(msg);
|
||||
|
||||
submit_uring::<usize, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_SENDMSG;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = msg_ptr;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
move |cqe| {
|
||||
let _raw_addr = raw_addr;
|
||||
let _iov = iov;
|
||||
let _msg = msg;
|
||||
let _data = data;
|
||||
cqe_to_result(cqe).map(|written| written as usize)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn recv(op: NetOp) -> io::Result<Vec<u8>> {
|
||||
@@ -288,7 +357,46 @@ pub async fn recv_from(op: NetOp) -> io::Result<ReceivedDatagram> {
|
||||
unreachable!("recv_from backend called with non-recv_from op");
|
||||
};
|
||||
|
||||
let mut data = vec![0u8; len];
|
||||
let mut storage = Box::new(MaybeUninit::<libc::sockaddr_storage>::zeroed());
|
||||
let mut iov = Box::new(libc::iovec {
|
||||
iov_base: data.as_mut_ptr() as *mut c_void,
|
||||
iov_len: data.len(),
|
||||
});
|
||||
let mut msg = Box::new(unsafe { std::mem::zeroed::<libc::msghdr>() });
|
||||
msg.msg_name = storage.as_mut_ptr() as *mut c_void;
|
||||
msg.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
|
||||
msg.msg_iov = iov.as_mut() as *mut libc::iovec;
|
||||
msg.msg_iovlen = 1;
|
||||
let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64;
|
||||
let iov = SendIovec(iov);
|
||||
let msg = SendMsghdr(msg);
|
||||
|
||||
match submit_uring::<ReceivedDatagram, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_RECVMSG;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = msg_ptr;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
move |cqe| {
|
||||
let _iov = iov;
|
||||
let addr_len = msg.0.msg_namelen;
|
||||
drop(msg);
|
||||
let read = cqe_to_result(cqe)? as usize;
|
||||
data.truncate(read);
|
||||
let storage = unsafe { storage.assume_init() };
|
||||
let peer_addr = socket_addr_from_storage(&storage, addr_len)?;
|
||||
Ok(ReceivedDatagram { data, peer_addr })
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(error) if should_fallback_to_offload(&error) => {
|
||||
offload(move || recv_from_sync(fd, len, flags)).await
|
||||
}
|
||||
result => result,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shutdown(op: NetOp) -> io::Result<()> {
|
||||
@@ -408,10 +516,24 @@ pub async fn recv_timeout(
|
||||
flags: i32,
|
||||
timeout: Duration,
|
||||
) -> io::Result<Vec<u8>> {
|
||||
offload(move || {
|
||||
wait_socket(fd, libc::POLLIN, timeout)?;
|
||||
recv_sync(fd, len, flags)
|
||||
})
|
||||
let mut buffer = vec![0u8; len];
|
||||
let buffer_ptr = buffer.as_mut_ptr();
|
||||
let buffer_len = buffer.len();
|
||||
submit_uring_with_linked_timeout::<Vec<u8>, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_RECV;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = buffer_ptr as u64;
|
||||
sqe.len = buffer_len as u32;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
timeout,
|
||||
move |cqe| {
|
||||
let read = cqe_to_timed_result(cqe)? as usize;
|
||||
buffer.truncate(read);
|
||||
Ok(buffer)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -421,10 +543,22 @@ pub async fn send_timeout(
|
||||
flags: i32,
|
||||
timeout: Duration,
|
||||
) -> io::Result<usize> {
|
||||
offload(move || {
|
||||
wait_socket(fd, libc::POLLOUT, timeout)?;
|
||||
send_sync(fd, data, flags)
|
||||
})
|
||||
let data_ptr = data.as_ptr();
|
||||
let data_len = data.len();
|
||||
submit_uring_with_linked_timeout::<usize, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_SEND;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = data_ptr as u64;
|
||||
sqe.len = data_len as u32;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
timeout,
|
||||
move |cqe| {
|
||||
let _data = data;
|
||||
cqe_to_timed_result(cqe).map(|written| written as usize)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -434,10 +568,40 @@ pub async fn recv_from_timeout(
|
||||
flags: i32,
|
||||
timeout: Duration,
|
||||
) -> io::Result<ReceivedDatagram> {
|
||||
offload(move || {
|
||||
wait_socket(fd, libc::POLLIN, timeout)?;
|
||||
recv_from_sync(fd, len, flags)
|
||||
})
|
||||
let mut data = vec![0u8; len];
|
||||
let mut storage = Box::new(MaybeUninit::<libc::sockaddr_storage>::zeroed());
|
||||
let mut iov = Box::new(libc::iovec {
|
||||
iov_base: data.as_mut_ptr() as *mut c_void,
|
||||
iov_len: data.len(),
|
||||
});
|
||||
let mut msg = Box::new(unsafe { std::mem::zeroed::<libc::msghdr>() });
|
||||
msg.msg_name = storage.as_mut_ptr() as *mut c_void;
|
||||
msg.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
|
||||
msg.msg_iov = iov.as_mut() as *mut libc::iovec;
|
||||
msg.msg_iovlen = 1;
|
||||
let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64;
|
||||
let iov = SendIovec(iov);
|
||||
let msg = SendMsghdr(msg);
|
||||
|
||||
submit_uring_with_linked_timeout::<ReceivedDatagram, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_RECVMSG;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = msg_ptr;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
timeout,
|
||||
move |cqe| {
|
||||
let _iov = iov;
|
||||
let addr_len = msg.0.msg_namelen;
|
||||
drop(msg);
|
||||
let read = cqe_to_timed_result(cqe)? as usize;
|
||||
data.truncate(read);
|
||||
let storage = unsafe { storage.assume_init() };
|
||||
let peer_addr = socket_addr_from_storage(&storage, addr_len)?;
|
||||
Ok(ReceivedDatagram { data, peer_addr })
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -448,15 +612,69 @@ pub async fn send_to_timeout(
|
||||
flags: i32,
|
||||
timeout: Duration,
|
||||
) -> io::Result<usize> {
|
||||
offload(move || {
|
||||
wait_socket(fd, libc::POLLOUT, timeout)?;
|
||||
send_to_sync(fd, data, RawSocketAddr::from_socket_addr(target), flags)
|
||||
})
|
||||
let raw_addr = Box::new(RawSocketAddr::from_socket_addr(target));
|
||||
let mut iov = Box::new(libc::iovec {
|
||||
iov_base: data.as_ptr() as *mut c_void,
|
||||
iov_len: data.len(),
|
||||
});
|
||||
let mut msg = Box::new(unsafe { std::mem::zeroed::<libc::msghdr>() });
|
||||
msg.msg_name = raw_addr.as_ptr() as *mut c_void;
|
||||
msg.msg_namelen = raw_addr.len();
|
||||
msg.msg_iov = iov.as_mut() as *mut libc::iovec;
|
||||
msg.msg_iovlen = 1;
|
||||
let msg_ptr = msg.as_mut() as *mut libc::msghdr as u64;
|
||||
let iov = SendIovec(iov);
|
||||
let msg = SendMsghdr(msg);
|
||||
|
||||
submit_uring_with_linked_timeout::<usize, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_SENDMSG;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = msg_ptr;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
timeout,
|
||||
move |cqe| {
|
||||
let _raw_addr = raw_addr;
|
||||
let _iov = iov;
|
||||
let _msg = msg;
|
||||
let _data = data;
|
||||
cqe_to_timed_result(cqe).map(|written| written as usize)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn connect_stream_timeout(addr: SocketAddr, timeout: Duration) -> io::Result<OwnedFd> {
|
||||
offload(move || connect_stream_timeout_sync(addr, timeout)).await
|
||||
let socket = socket(NetOp::Socket {
|
||||
domain: socket_domain(addr),
|
||||
socket_type: libc::SOCK_STREAM,
|
||||
protocol: 0,
|
||||
flags: libc::SOCK_CLOEXEC as u32,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let fd = socket.as_raw_fd();
|
||||
let raw_addr = RawSocketAddr::from_socket_addr(addr);
|
||||
let addr_ptr = raw_addr.as_ptr();
|
||||
let addr_len = raw_addr.len();
|
||||
|
||||
submit_uring_with_linked_timeout::<(), _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_CONNECT;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = addr_ptr as u64;
|
||||
sqe.off = addr_len as u64;
|
||||
},
|
||||
timeout,
|
||||
move |cqe| {
|
||||
let _raw_addr = raw_addr;
|
||||
cqe_to_timed_result(cqe).map(|_| ())
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(socket)
|
||||
}
|
||||
|
||||
pub fn local_addr(fd: RawFd) -> io::Result<SocketAddr> {
|
||||
@@ -569,6 +787,38 @@ where
|
||||
future.await
|
||||
}
|
||||
|
||||
/// Like [`submit_uring`] but pairs the main SQE with an `IORING_OP_LINK_TIMEOUT`.
|
||||
///
|
||||
/// If the timeout elapses before the main op completes, the completion callback
|
||||
/// receives a CQE with `res = -ECANCELED`. Callers should map `-ECANCELED` to
|
||||
/// `io::ErrorKind::TimedOut`.
|
||||
async fn submit_uring_with_linked_timeout<T: Send + 'static, M>(
|
||||
fill: impl FnOnce(&mut IoUringSqe),
|
||||
timeout: Duration,
|
||||
map: M,
|
||||
) -> io::Result<T>
|
||||
where
|
||||
M: FnOnce(IoUringCqe) -> io::Result<T> + Send + 'static,
|
||||
{
|
||||
let (future, handle) = completion_for_current_thread::<io::Result<T>>();
|
||||
let callback_handle = handle.clone();
|
||||
let token = with_current_driver(|driver| {
|
||||
driver.submit_operation_with_linked_timeout(
|
||||
fill,
|
||||
timeout,
|
||||
move |cqe| {
|
||||
callback_handle.complete(map(cqe));
|
||||
},
|
||||
)
|
||||
})?;
|
||||
|
||||
handle.set_cancel(move || {
|
||||
let _ = with_current_driver(|driver| driver.cancel_operation(token));
|
||||
});
|
||||
|
||||
future.await
|
||||
}
|
||||
|
||||
async fn offload<T: Send + 'static>(
|
||||
task: impl FnOnce() -> io::Result<T> + Send + 'static,
|
||||
) -> io::Result<T> {
|
||||
@@ -762,6 +1012,17 @@ fn cqe_to_result(cqe: IoUringCqe) -> io::Result<i32> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Like [`cqe_to_result`] but maps `-ECANCELED` (timeout fired) to `TimedOut`.
|
||||
fn cqe_to_timed_result(cqe: IoUringCqe) -> io::Result<i32> {
|
||||
if cqe.res == -libc::ECANCELED {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"socket operation timed out",
|
||||
));
|
||||
}
|
||||
cqe_to_result(cqe)
|
||||
}
|
||||
|
||||
fn cvt(value: libc::c_int) -> io::Result<libc::c_int> {
|
||||
if value == -1 {
|
||||
Err(io::Error::last_os_error())
|
||||
@@ -818,19 +1079,6 @@ fn send_sync(fd: RawFd, data: Vec<u8>, flags: i32) -> io::Result<usize> {
|
||||
cvt_long(written).map(|written| written as usize)
|
||||
}
|
||||
|
||||
fn send_to_sync(fd: RawFd, data: Vec<u8>, target: RawSocketAddr, flags: i32) -> io::Result<usize> {
|
||||
let written = unsafe {
|
||||
libc::sendto(
|
||||
fd,
|
||||
data.as_ptr().cast::<c_void>(),
|
||||
data.len(),
|
||||
flags,
|
||||
target.as_ptr(),
|
||||
target.len(),
|
||||
)
|
||||
};
|
||||
cvt_long(written).map(|written| written as usize)
|
||||
}
|
||||
|
||||
fn recv_sync(fd: RawFd, len: usize, flags: i32) -> io::Result<Vec<u8>> {
|
||||
let mut buffer = vec![0; len];
|
||||
@@ -879,91 +1127,21 @@ fn close_sync(fd: RawFd) -> io::Result<()> {
|
||||
cvt(unsafe { libc::close(fd) }).map(|_| ())
|
||||
}
|
||||
|
||||
fn connect_stream_timeout_sync(addr: SocketAddr, timeout: Duration) -> io::Result<OwnedFd> {
|
||||
let fd = cvt(unsafe {
|
||||
libc::socket(
|
||||
socket_domain(addr),
|
||||
libc::SOCK_STREAM | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
|
||||
0,
|
||||
)
|
||||
})?;
|
||||
let raw_addr = RawSocketAddr::from_socket_addr(addr);
|
||||
let connect_result = unsafe { libc::connect(fd, raw_addr.as_ptr(), raw_addr.len()) };
|
||||
if connect_result == 0 {
|
||||
set_nonblocking(fd, false)?;
|
||||
return Ok(unsafe { OwnedFd::from_raw_fd(fd) });
|
||||
}
|
||||
|
||||
let error = io::Error::last_os_error();
|
||||
if error.raw_os_error() != Some(libc::EINPROGRESS) {
|
||||
let _ = close_sync(fd);
|
||||
return Err(error);
|
||||
}
|
||||
/// Wrapper making `Box<libc::iovec>` sendable across the async CQE boundary.
|
||||
///
|
||||
/// Safety: `iov_base` points into a `Vec<u8>` that is owned by the same
|
||||
/// closure, so the pointer is valid until the CQE fires and the closure drops.
|
||||
struct SendIovec(#[allow(dead_code)] Box<libc::iovec>);
|
||||
unsafe impl Send for SendIovec {}
|
||||
|
||||
let completion = wait_socket(fd, libc::POLLOUT, timeout)
|
||||
.and_then(|_| getsockopt_int(fd, libc::SOL_SOCKET, libc::SO_ERROR));
|
||||
match completion {
|
||||
Ok(0) => {
|
||||
set_nonblocking(fd, false)?;
|
||||
Ok(unsafe { OwnedFd::from_raw_fd(fd) })
|
||||
}
|
||||
Ok(code) => {
|
||||
let _ = close_sync(fd);
|
||||
Err(io::Error::from_raw_os_error(code))
|
||||
}
|
||||
Err(error) => {
|
||||
let _ = close_sync(fd);
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn set_nonblocking(fd: RawFd, enabled: bool) -> io::Result<()> {
|
||||
let flags = cvt(unsafe { libc::fcntl(fd, libc::F_GETFL) })?;
|
||||
let new_flags = if enabled {
|
||||
flags | libc::O_NONBLOCK
|
||||
} else {
|
||||
flags & !libc::O_NONBLOCK
|
||||
};
|
||||
cvt(unsafe { libc::fcntl(fd, libc::F_SETFL, new_flags) }).map(|_| ())
|
||||
}
|
||||
|
||||
fn wait_socket(fd: RawFd, events: i16, timeout: Duration) -> io::Result<()> {
|
||||
let timeout_ms = timeout
|
||||
.as_millis()
|
||||
.min(i32::MAX as u128)
|
||||
.try_into()
|
||||
.unwrap_or(i32::MAX);
|
||||
|
||||
loop {
|
||||
let mut poll_fd = libc::pollfd {
|
||||
fd,
|
||||
events,
|
||||
revents: 0,
|
||||
};
|
||||
let result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) };
|
||||
if result == 0 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"socket operation timed out",
|
||||
));
|
||||
}
|
||||
if result < 0 {
|
||||
let error = io::Error::last_os_error();
|
||||
if error.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
if poll_fd.revents & (libc::POLLERR | libc::POLLHUP | libc::POLLNVAL) != 0 {
|
||||
let socket_error = getsockopt_int(fd, libc::SOL_SOCKET, libc::SO_ERROR).unwrap_or(0);
|
||||
if socket_error != 0 {
|
||||
return Err(io::Error::from_raw_os_error(socket_error));
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
/// Wrapper making `Box<libc::msghdr>` sendable across the async CQE boundary.
|
||||
///
|
||||
/// Safety: all raw pointers inside the `msghdr` (`msg_name`, `msg_iov`) point
|
||||
/// into heap storage owned by the same closure, so they are valid until the
|
||||
/// CQE fires and the closure drops.
|
||||
struct SendMsghdr(Box<libc::msghdr>);
|
||||
unsafe impl Send for SendMsghdr {}
|
||||
|
||||
fn cvt_long(value: libc::ssize_t) -> io::Result<libc::ssize_t> {
|
||||
if value == -1 {
|
||||
|
||||
Reference in New Issue
Block a user