Restaged repo, allocator and runtime implemented, ioring-backed async fs/net/channel/timer primitives
This commit is contained in:
586
lib/runtime/src/sys/linux/fs.rs
Normal file
586
lib/runtime/src/sys/linux/fs.rs
Normal file
@@ -0,0 +1,586 @@
|
||||
//! Linux filesystem backend.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::ffi::CString;
|
||||
use std::future::poll_fn;
|
||||
use std::io;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::os::fd::{FromRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use std::thread;
|
||||
|
||||
use crate::op::completion::completion_for_current_thread;
|
||||
use crate::op::fs::{FileType, FsOp, MetadataTarget, OpenOptions, RawDirEntry, RawMetadata};
|
||||
use crate::platform::linux_x86_64::runtime::{
|
||||
ThreadHandle, current_thread_handle, with_current_reactor,
|
||||
};
|
||||
use crate::platform::linux_x86_64::uring::{
|
||||
IORING_FSYNC_DATASYNC, IORING_OP_CLOSE, IORING_OP_FSYNC, IORING_OP_FTRUNCATE,
|
||||
IORING_OP_MKDIRAT, IORING_OP_OPENAT, IORING_OP_READ, IORING_OP_RENAMEAT, IORING_OP_STATX,
|
||||
IORING_OP_UNLINKAT, IORING_OP_WRITE, IoUringCqe,
|
||||
};
|
||||
|
||||
/// `statx` field mask requested by [`metadata`]: file type, mode bits, size,
/// and hard-link count.
const STATX_BASIC_MASK: u32 =
    (libc::STATX_TYPE | libc::STATX_MODE) | (libc::STATX_SIZE | libc::STATX_NLINK);
|
||||
/// Offset written into a read/write SQE when the caller supplies no explicit
/// offset. io_uring interprets an offset of `-1` as "use the file's current
/// position".
const FILE_CURSOR: u64 = -1_i64 as u64;
|
||||
|
||||
/// Mechanism the Linux backend uses to carry out a filesystem operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionPath {
    /// The operation is submitted to the io_uring reactor.
    IoUring,
    /// The operation runs as blocking code on a helper thread.
    Offload,
}
|
||||
|
||||
pub fn execution_path(op: &FsOp) -> ExecutionPath {
|
||||
match op {
|
||||
FsOp::ReadDir { .. } | FsOp::Duplicate { .. } => ExecutionPath::Offload,
|
||||
FsOp::Open { .. }
|
||||
| FsOp::Read { .. }
|
||||
| FsOp::Write { .. }
|
||||
| FsOp::Metadata { .. }
|
||||
| FsOp::SetLen { .. }
|
||||
| FsOp::SyncAll { .. }
|
||||
| FsOp::SyncData { .. }
|
||||
| FsOp::CreateDir { .. }
|
||||
| FsOp::RemoveFile { .. }
|
||||
| FsOp::RemoveDir { .. }
|
||||
| FsOp::Rename { .. }
|
||||
| FsOp::Close { .. } => ExecutionPath::IoUring,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn open(op: FsOp) -> io::Result<OwnedFd> {
|
||||
let FsOp::Open { path, options } = op else {
|
||||
unreachable!("open backend called with non-open op");
|
||||
};
|
||||
|
||||
let path = path_to_c_string(&path)?;
|
||||
let path_ptr = path.as_ptr();
|
||||
let (flags, mode) = open_flags(&options)?;
|
||||
submit_uring::<OwnedFd, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_OPENAT;
|
||||
sqe.fd = libc::AT_FDCWD;
|
||||
sqe.addr = path_ptr as u64;
|
||||
sqe.len = mode;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
move |cqe| {
|
||||
let _path = path;
|
||||
cqe_to_result(cqe).map(|fd| unsafe { OwnedFd::from_raw_fd(fd as RawFd) })
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn read(op: FsOp) -> io::Result<Vec<u8>> {
|
||||
let FsOp::Read { fd, offset, len } = op else {
|
||||
unreachable!("read backend called with non-read op");
|
||||
};
|
||||
|
||||
let mut buffer = vec![0; len];
|
||||
let buffer_ptr = buffer.as_mut_ptr();
|
||||
let buffer_len = buffer.len();
|
||||
submit_uring::<Vec<u8>, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_READ;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = buffer_ptr as u64;
|
||||
sqe.len = buffer_len as u32;
|
||||
sqe.off = offset.unwrap_or(FILE_CURSOR);
|
||||
},
|
||||
move |cqe| {
|
||||
let read = cqe_to_result(cqe)? as usize;
|
||||
buffer.truncate(read);
|
||||
Ok(buffer)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn write(op: FsOp) -> io::Result<usize> {
|
||||
let FsOp::Write { fd, offset, data } = op else {
|
||||
unreachable!("write backend called with non-write op");
|
||||
};
|
||||
let data_ptr = data.as_ptr();
|
||||
let data_len = data.len();
|
||||
|
||||
submit_uring::<usize, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_WRITE;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = data_ptr as u64;
|
||||
sqe.len = data_len as u32;
|
||||
sqe.off = offset.unwrap_or(FILE_CURSOR);
|
||||
},
|
||||
move |cqe| {
|
||||
let _data = data;
|
||||
cqe_to_result(cqe).map(|written| written as usize)
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn metadata(op: FsOp) -> io::Result<RawMetadata> {
|
||||
let FsOp::Metadata {
|
||||
target,
|
||||
follow_symlinks,
|
||||
} = op
|
||||
else {
|
||||
unreachable!("metadata backend called with non-metadata op");
|
||||
};
|
||||
|
||||
let mut statx = Box::new(MaybeUninit::<libc::statx>::zeroed());
|
||||
let statx_ptr = statx.as_mut_ptr();
|
||||
let (fd, path, flags) = match target {
|
||||
MetadataTarget::Path(path) => (
|
||||
libc::AT_FDCWD,
|
||||
path_to_c_string(&path)?,
|
||||
metadata_flags(follow_symlinks),
|
||||
),
|
||||
MetadataTarget::File(fd) => (
|
||||
fd,
|
||||
CString::new(Vec::<u8>::new()).expect("empty statx path should be valid"),
|
||||
libc::AT_EMPTY_PATH,
|
||||
),
|
||||
};
|
||||
let path_ptr = path.as_ptr();
|
||||
|
||||
submit_uring::<RawMetadata, _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_STATX;
|
||||
sqe.fd = fd;
|
||||
sqe.addr = path_ptr as u64;
|
||||
sqe.len = STATX_BASIC_MASK;
|
||||
sqe.off = statx_ptr as u64;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
move |cqe| {
|
||||
let _path = path;
|
||||
cqe_to_result(cqe)?;
|
||||
let statx = unsafe { statx.assume_init() };
|
||||
Ok(raw_metadata_from_statx(&statx))
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn sync_all(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::SyncAll { fd } = op else {
|
||||
unreachable!("sync_all backend called with non-sync_all op");
|
||||
};
|
||||
|
||||
submit_sync(fd, 0).await
|
||||
}
|
||||
|
||||
pub async fn sync_data(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::SyncData { fd } = op else {
|
||||
unreachable!("sync_data backend called with non-sync_data op");
|
||||
};
|
||||
|
||||
submit_sync(fd, IORING_FSYNC_DATASYNC).await
|
||||
}
|
||||
|
||||
pub async fn set_len(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::SetLen { fd, len } = op else {
|
||||
unreachable!("set_len backend called with non-set_len op");
|
||||
};
|
||||
|
||||
submit_uring::<(), _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_FTRUNCATE;
|
||||
sqe.fd = fd;
|
||||
sqe.off = len;
|
||||
},
|
||||
move |cqe| cqe_to_result(cqe).map(|_| ()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn try_clone(op: FsOp) -> io::Result<OwnedFd> {
|
||||
let FsOp::Duplicate { fd } = op else {
|
||||
unreachable!("try_clone backend called with non-duplicate op");
|
||||
};
|
||||
|
||||
offload(move || {
|
||||
let duplicated = cvt(unsafe { libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0) })?;
|
||||
Ok(unsafe { OwnedFd::from_raw_fd(duplicated) })
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn create_dir(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::CreateDir {
|
||||
path,
|
||||
recursive: _,
|
||||
mode,
|
||||
} = op
|
||||
else {
|
||||
unreachable!("create_dir backend called with non-create_dir op");
|
||||
};
|
||||
|
||||
let path = path_to_c_string(&path)?;
|
||||
let path_ptr = path.as_ptr();
|
||||
submit_uring::<(), _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_MKDIRAT;
|
||||
sqe.fd = libc::AT_FDCWD;
|
||||
sqe.addr = path_ptr as u64;
|
||||
sqe.len = mode;
|
||||
},
|
||||
move |cqe| {
|
||||
let _path = path;
|
||||
cqe_to_result(cqe).map(|_| ())
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn remove_file(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::RemoveFile { path } = op else {
|
||||
unreachable!("remove_file backend called with non-remove_file op");
|
||||
};
|
||||
|
||||
submit_unlink(path, 0).await
|
||||
}
|
||||
|
||||
pub async fn remove_dir(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::RemoveDir { path } = op else {
|
||||
unreachable!("remove_dir backend called with non-remove_dir op");
|
||||
};
|
||||
|
||||
submit_unlink(path, libc::AT_REMOVEDIR).await
|
||||
}
|
||||
|
||||
pub async fn rename(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::Rename { from, to } = op else {
|
||||
unreachable!("rename backend called with non-rename op");
|
||||
};
|
||||
|
||||
let from = path_to_c_string(&from)?;
|
||||
let to = path_to_c_string(&to)?;
|
||||
let from_ptr = from.as_ptr();
|
||||
let to_ptr = to.as_ptr();
|
||||
submit_uring::<(), _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_RENAMEAT;
|
||||
sqe.fd = libc::AT_FDCWD;
|
||||
sqe.addr = from_ptr as u64;
|
||||
sqe.len = libc::AT_FDCWD as u32;
|
||||
sqe.off = to_ptr as u64;
|
||||
sqe.op_flags = 0;
|
||||
},
|
||||
move |cqe| {
|
||||
let _from = from;
|
||||
let _to = to;
|
||||
cqe_to_result(cqe).map(|_| ())
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn close(op: FsOp) -> io::Result<()> {
|
||||
let FsOp::Close { fd } = op else {
|
||||
unreachable!("close backend called with non-close op");
|
||||
};
|
||||
|
||||
submit_uring::<(), _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_CLOSE;
|
||||
sqe.fd = fd;
|
||||
},
|
||||
move |cqe| cqe_to_result(cqe).map(|_| ()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn read_dir(op: FsOp) -> io::Result<ReadDirStream> {
|
||||
let FsOp::ReadDir { path } = op else {
|
||||
unreachable!("read_dir backend called with non-read_dir op");
|
||||
};
|
||||
|
||||
ReadDirStream::new(path)
|
||||
}
|
||||
|
||||
pub struct ReadDirStream {
|
||||
state: Arc<ReadDirState>,
|
||||
}
|
||||
|
||||
impl ReadDirStream {
|
||||
fn new(path: PathBuf) -> io::Result<Self> {
|
||||
let state = Arc::new(ReadDirState::new(current_thread_handle()));
|
||||
let producer = Arc::clone(&state);
|
||||
|
||||
thread::Builder::new()
|
||||
.name("ruin-runtime-read-dir".into())
|
||||
.spawn(move || produce_dir_entries(path, producer))
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
Ok(Self { state })
|
||||
}
|
||||
|
||||
pub async fn next_entry(&mut self) -> io::Result<Option<RawDirEntry>> {
|
||||
poll_fn(|cx| self.state.poll_next(cx)).await
|
||||
}
|
||||
}
|
||||
|
||||
struct ReadDirState {
|
||||
owner: ThreadHandle,
|
||||
queue: Mutex<VecDeque<io::Result<RawDirEntry>>>,
|
||||
done: AtomicBool,
|
||||
pending: AtomicBool,
|
||||
wake_queued: AtomicBool,
|
||||
waker: Mutex<Option<Waker>>,
|
||||
}
|
||||
|
||||
impl ReadDirState {
|
||||
fn new(owner: ThreadHandle) -> Self {
|
||||
owner.begin_async_operation();
|
||||
Self {
|
||||
owner,
|
||||
queue: Mutex::new(VecDeque::new()),
|
||||
done: AtomicBool::new(false),
|
||||
pending: AtomicBool::new(true),
|
||||
wake_queued: AtomicBool::new(false),
|
||||
waker: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn push(self: &Arc<Self>, entry: io::Result<RawDirEntry>) {
|
||||
self.queue.lock().unwrap().push_back(entry);
|
||||
self.notify();
|
||||
}
|
||||
|
||||
fn finish(self: &Arc<Self>) {
|
||||
self.done.store(true, Ordering::Release);
|
||||
self.release_pending();
|
||||
self.notify();
|
||||
}
|
||||
|
||||
fn release_pending(&self) {
|
||||
if self.pending.swap(false, Ordering::AcqRel) {
|
||||
self.owner.finish_async_operation();
|
||||
}
|
||||
}
|
||||
|
||||
fn notify(self: &Arc<Self>) {
|
||||
if self.wake_queued.swap(true, Ordering::AcqRel) {
|
||||
return;
|
||||
}
|
||||
|
||||
let state = Arc::clone(self);
|
||||
if !self.owner.queue_microtask(move || {
|
||||
state.wake_queued.store(false, Ordering::Release);
|
||||
if let Some(waker) = state.waker.lock().unwrap().take() {
|
||||
waker.wake();
|
||||
}
|
||||
}) {
|
||||
self.wake_queued.store(false, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_next(&self, cx: &mut Context<'_>) -> Poll<io::Result<Option<RawDirEntry>>> {
|
||||
if let Some(entry) = self.queue.lock().unwrap().pop_front() {
|
||||
return Poll::Ready(entry.map(Some));
|
||||
}
|
||||
|
||||
if self.done.load(Ordering::Acquire) {
|
||||
return Poll::Ready(Ok(None));
|
||||
}
|
||||
|
||||
*self.waker.lock().unwrap() = Some(cx.waker().clone());
|
||||
|
||||
if let Some(entry) = self.queue.lock().unwrap().pop_front() {
|
||||
let _ = self.waker.lock().unwrap().take();
|
||||
return Poll::Ready(entry.map(Some));
|
||||
}
|
||||
|
||||
if self.done.load(Ordering::Acquire) {
|
||||
let _ = self.waker.lock().unwrap().take();
|
||||
return Poll::Ready(Ok(None));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ReadDirStream {
|
||||
fn drop(&mut self) {
|
||||
self.state.release_pending();
|
||||
}
|
||||
}
|
||||
|
||||
fn produce_dir_entries(path: PathBuf, state: Arc<ReadDirState>) {
|
||||
match std::fs::read_dir(path) {
|
||||
Ok(entries) => {
|
||||
for entry in entries {
|
||||
match entry {
|
||||
Ok(entry) => {
|
||||
let file_name = entry.file_name();
|
||||
state.push(Ok(RawDirEntry {
|
||||
path: entry.path(),
|
||||
file_name,
|
||||
}));
|
||||
}
|
||||
Err(error) => state.push(Err(error)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => state.push(Err(error)),
|
||||
}
|
||||
|
||||
state.finish();
|
||||
}
|
||||
|
||||
async fn submit_sync(fd: RawFd, flags: u32) -> io::Result<()> {
|
||||
submit_uring::<(), _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_FSYNC;
|
||||
sqe.fd = fd;
|
||||
sqe.op_flags = flags;
|
||||
},
|
||||
move |cqe| cqe_to_result(cqe).map(|_| ()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn submit_unlink(path: PathBuf, flags: i32) -> io::Result<()> {
|
||||
let path = path_to_c_string(&path)?;
|
||||
let path_ptr = path.as_ptr();
|
||||
submit_uring::<(), _>(
|
||||
move |sqe| {
|
||||
sqe.opcode = IORING_OP_UNLINKAT;
|
||||
sqe.fd = libc::AT_FDCWD;
|
||||
sqe.addr = path_ptr as u64;
|
||||
sqe.op_flags = flags as u32;
|
||||
},
|
||||
move |cqe| {
|
||||
let _path = path;
|
||||
cqe_to_result(cqe).map(|_| ())
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn submit_uring<T: Send + 'static, M>(
|
||||
fill: impl FnOnce(&mut crate::platform::linux_x86_64::uring::IoUringSqe),
|
||||
map: M,
|
||||
) -> io::Result<T>
|
||||
where
|
||||
M: FnOnce(IoUringCqe) -> io::Result<T> + Send + 'static,
|
||||
{
|
||||
let (future, handle) = completion_for_current_thread::<io::Result<T>>();
|
||||
let callback_handle = handle.clone();
|
||||
let token = with_current_reactor(|reactor| {
|
||||
reactor.submit_operation(fill, move |cqe| {
|
||||
callback_handle.complete(map(cqe));
|
||||
})
|
||||
})?;
|
||||
|
||||
handle.set_cancel(move || {
|
||||
let _ = with_current_reactor(|reactor| reactor.cancel_operation(token));
|
||||
});
|
||||
|
||||
future.await
|
||||
}
|
||||
|
||||
async fn offload<T: Send + 'static>(
|
||||
task: impl FnOnce() -> io::Result<T> + Send + 'static,
|
||||
) -> io::Result<T> {
|
||||
let (future, handle) = completion_for_current_thread::<io::Result<T>>();
|
||||
thread::Builder::new()
|
||||
.name("ruin-runtime-fs-offload".into())
|
||||
.spawn(move || handle.complete(task()))
|
||||
.map_err(io::Error::other)?;
|
||||
future.await
|
||||
}
|
||||
|
||||
/// Converts `path` into a NUL-terminated C string for use in a syscall.
///
/// # Errors
/// Returns `InvalidInput` if the path contains an interior NUL byte.
fn path_to_c_string(path: &Path) -> io::Result<CString> {
    let raw_bytes = path.as_os_str().as_bytes();
    match CString::new(raw_bytes) {
        Ok(c_path) => Ok(c_path),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "paths containing NUL bytes are not supported",
        )),
    }
}
|
||||
|
||||
fn open_flags(options: &OpenOptions) -> io::Result<(i32, u32)> {
|
||||
if !options.read && !options.write && !options.append {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"OpenOptions requires read, write, or append access",
|
||||
));
|
||||
}
|
||||
|
||||
let mut flags = if options.read {
|
||||
if options.write || options.append {
|
||||
libc::O_RDWR
|
||||
} else {
|
||||
libc::O_RDONLY
|
||||
}
|
||||
} else {
|
||||
libc::O_WRONLY
|
||||
};
|
||||
|
||||
if options.append {
|
||||
flags |= libc::O_APPEND;
|
||||
}
|
||||
if options.truncate {
|
||||
flags |= libc::O_TRUNC;
|
||||
}
|
||||
if options.create_new {
|
||||
flags |= libc::O_CREAT | libc::O_EXCL;
|
||||
} else if options.create {
|
||||
flags |= libc::O_CREAT;
|
||||
}
|
||||
|
||||
Ok((flags | libc::O_CLOEXEC, 0o666))
|
||||
}
|
||||
|
||||
fn metadata_flags(follow_symlinks: bool) -> i32 {
|
||||
let mut flags = libc::AT_NO_AUTOMOUNT;
|
||||
if !follow_symlinks {
|
||||
flags |= libc::AT_SYMLINK_NOFOLLOW;
|
||||
}
|
||||
flags
|
||||
}
|
||||
|
||||
fn raw_metadata_from_statx(statx: &libc::statx) -> RawMetadata {
|
||||
RawMetadata {
|
||||
file_type: file_type_from_mode(statx.stx_mode),
|
||||
mode: statx.stx_mode,
|
||||
len: statx.stx_size,
|
||||
}
|
||||
}
|
||||
|
||||
fn file_type_from_mode(mode: u16) -> FileType {
|
||||
match mode & libc::S_IFMT as u16 {
|
||||
value if value == libc::S_IFREG as u16 => FileType::File,
|
||||
value if value == libc::S_IFDIR as u16 => FileType::Directory,
|
||||
value if value == libc::S_IFLNK as u16 => FileType::Symlink,
|
||||
value if value == libc::S_IFBLK as u16 => FileType::BlockDevice,
|
||||
value if value == libc::S_IFCHR as u16 => FileType::CharacterDevice,
|
||||
value if value == libc::S_IFIFO as u16 => FileType::Fifo,
|
||||
value if value == libc::S_IFSOCK as u16 => FileType::Socket,
|
||||
_ => FileType::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
fn cqe_to_result(cqe: IoUringCqe) -> io::Result<i32> {
|
||||
if cqe.res < 0 {
|
||||
Err(io::Error::from_raw_os_error(-cqe.res))
|
||||
} else {
|
||||
Ok(cqe.res)
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a libc-style return value into an `io::Result`: `-1` becomes the
/// thread's last OS error, any other value is passed through.
fn cvt(value: libc::c_int) -> io::Result<libc::c_int> {
    match value {
        -1 => Err(io::Error::last_os_error()),
        ok => Ok(ok),
    }
}
|
||||
Reference in New Issue
Block a user