Text, many performance improvements
This commit is contained in:
105
lib/runtime/src/fd.rs
Normal file
105
lib/runtime/src/fd.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
//! File-descriptor readiness helpers backed by the runtime driver.
|
||||
|
||||
use std::io;
|
||||
use std::os::fd::RawFd;
|
||||
|
||||
use crate::op::completion::completion_for_current_thread;
|
||||
use crate::platform::linux_x86_64::runtime::with_current_driver;
|
||||
use crate::platform::linux_x86_64::uring::{IORING_OP_POLL_ADD, IoUringCqe};
|
||||
|
||||
/// Waits until `fd` becomes readable or reports an error/hangup condition.
|
||||
pub async fn wait_readable(fd: RawFd) -> io::Result<()> {
|
||||
submit_poll(fd, libc::POLLIN | libc::POLLERR | libc::POLLHUP).await
|
||||
}
|
||||
|
||||
async fn submit_poll(fd: RawFd, mask: i16) -> io::Result<()> {
|
||||
let (future, handle) = completion_for_current_thread::<io::Result<()>>();
|
||||
let callback_handle = handle.clone();
|
||||
let token = with_current_driver(|driver| {
|
||||
driver.submit_operation(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_POLL_ADD;
|
||||
sqe.fd = fd;
|
||||
sqe.len = 0;
|
||||
sqe.op_flags = mask as u32;
|
||||
},
|
||||
move |cqe| {
|
||||
callback_handle.complete(cqe_to_result(cqe));
|
||||
},
|
||||
)
|
||||
})?;
|
||||
|
||||
handle.set_cancel(move || {
|
||||
let _ = with_current_driver(|driver| driver.cancel_operation(token));
|
||||
});
|
||||
|
||||
future.await
|
||||
}
|
||||
|
||||
fn cqe_to_result(cqe: IoUringCqe) -> io::Result<()> {
|
||||
if cqe.res < 0 {
|
||||
return Err(io::Error::from_raw_os_error(-cqe.res));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::wait_readable;
|
||||
use crate::{queue_future, queue_task, run};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
#[test]
|
||||
fn wait_readable_resolves_for_pipe() {
|
||||
let mut fds = [0; 2];
|
||||
let result = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
|
||||
assert_eq!(result, 0, "pipe2 should succeed");
|
||||
let read_fd = fds[0];
|
||||
let write_fd = fds[1];
|
||||
|
||||
let observed = Arc::new(AtomicBool::new(false));
|
||||
queue_task({
|
||||
let observed = Arc::clone(&observed);
|
||||
move || {
|
||||
queue_future(async move {
|
||||
wait_readable(read_fd)
|
||||
.await
|
||||
.expect("pipe read end should become readable");
|
||||
observed.store(true, Ordering::SeqCst);
|
||||
|
||||
let mut byte = 0u8;
|
||||
let read = unsafe {
|
||||
libc::read(
|
||||
read_fd,
|
||||
&mut byte as *mut u8 as *mut libc::c_void,
|
||||
std::mem::size_of::<u8>(),
|
||||
)
|
||||
};
|
||||
assert_eq!(read, 1);
|
||||
unsafe {
|
||||
libc::close(read_fd);
|
||||
}
|
||||
});
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let byte = 1u8;
|
||||
let written = unsafe {
|
||||
libc::write(
|
||||
write_fd,
|
||||
&byte as *const u8 as *const libc::c_void,
|
||||
std::mem::size_of::<u8>(),
|
||||
)
|
||||
};
|
||||
assert_eq!(written, 1);
|
||||
unsafe {
|
||||
libc::close(write_fd);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
run();
|
||||
assert!(observed.load(Ordering::SeqCst));
|
||||
}
|
||||
}
|
||||
@@ -34,6 +34,7 @@ pub(crate) mod trace_targets {
|
||||
}
|
||||
|
||||
pub mod channel;
|
||||
pub mod fd;
|
||||
pub mod fs;
|
||||
pub mod net;
|
||||
#[doc(hidden)]
|
||||
|
||||
@@ -16,6 +16,7 @@ const IORING_SETUP_CLAMP: u32 = 1 << 4;
|
||||
const IORING_FEAT_SINGLE_MMAP: u32 = 1 << 0;
|
||||
|
||||
pub(crate) const IORING_OP_FSYNC: u8 = 3;
|
||||
pub(crate) const IORING_OP_POLL_ADD: u8 = 6;
|
||||
pub(crate) const IORING_OP_TIMEOUT: u8 = 11;
|
||||
pub(crate) const IORING_OP_TIMEOUT_REMOVE: u8 = 12;
|
||||
pub(crate) const IORING_OP_ACCEPT: u8 = 13;
|
||||
|
||||
Reference in New Issue
Block a user