diff --git a/Cargo.lock b/Cargo.lock index a991cc3..1d9cb73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,6 +151,10 @@ dependencies = [ "syn", ] +[[package]] +name = "ruin_reactivity" +version = "0.1.0" + [[package]] name = "smallvec" version = "1.15.1" diff --git a/lib/runtime/src/channel/mod.rs b/lib/runtime/src/channel/mod.rs index b58519e..99a62c8 100644 --- a/lib/runtime/src/channel/mod.rs +++ b/lib/runtime/src/channel/mod.rs @@ -1,4 +1,14 @@ //! Async channels for inter-thread communication. +//! +//! The channel types in this module are userspace synchronization primitives. They do not carry +//! messages through the kernel; instead, channel state lives in shared Rust data structures and +//! the runtime uses `io_uring` `MSG_RING` notifications only to wake the owning runtime thread +//! when an async waiter becomes ready. +//! +//! The initial surface includes: +//! +//! - [`oneshot`] for single-value handoff +//! - [`mpsc`] for bounded and unbounded multi-producer/single-consumer queues pub mod mpsc; pub mod oneshot; diff --git a/lib/runtime/src/channel/mpsc.rs b/lib/runtime/src/channel/mpsc.rs index db4634c..f47273d 100644 --- a/lib/runtime/src/channel/mpsc.rs +++ b/lib/runtime/src/channel/mpsc.rs @@ -1,3 +1,5 @@ +//! Multi-producer, single-consumer channels. + use std::collections::VecDeque; use std::future::poll_fn; use std::pin::Pin; @@ -7,6 +9,13 @@ use std::task::{Context, Poll}; use crate::op::completion::{CompletionFuture, CompletionHandle}; use crate::sys::linux::channel::runtime_waiter; +/// Creates a bounded channel with room for at most `capacity` queued messages. +/// +/// Bounded senders provide both [`Sender::try_send`] and async [`Sender::send`] backpressure. +/// +/// # Panics +/// +/// Panics if `capacity == 0`. pub fn channel(capacity: usize) -> (Sender, Receiver) { assert!(capacity > 0, "bounded channels require capacity > 0"); let shared = Arc::new(Mutex::new(State::new(Some(capacity)))); @@ -18,6 +27,9 @@ pub fn channel(capacity: usize) -> (Sender, Receiver) { ) } +/// Creates an unbounded channel. +/// +/// Unbounded senders never wait for capacity, but the single receiver is still asynchronous. pub fn unbounded_channel() -> (UnboundedSender, Receiver) { let shared = Arc::new(Mutex::new(State::new(None))); ( @@ -28,14 +40,17 @@ pub fn unbounded_channel() -> (UnboundedSender, Receiver { shared: Arc>>, } +/// Unbounded multi-producer sender. pub struct UnboundedSender { shared: Arc>>, } +/// Single consumer for both bounded and unbounded MPSC channels. pub struct Receiver { shared: Arc>>, } @@ -57,17 +72,24 @@ struct SendWaiter { } #[derive(Debug, Eq, PartialEq)] +/// Error returned when sending fails because the receiver has been closed or dropped. pub struct SendError(pub T); #[derive(Debug, Eq, PartialEq)] +/// Error returned by [`Sender::try_send`] when a message cannot be queued immediately. pub enum TrySendError { + /// The bounded queue is currently full. Full(T), + /// The receiver has been closed or dropped. Closed(T), } #[derive(Clone, Copy, Debug, Eq, PartialEq)] +/// Error returned by [`Receiver::try_recv`] when no message is available immediately. pub enum TryRecvError { + /// The channel is still open, but currently empty. Empty, + /// The channel is closed and no more messages can arrive. Disconnected, } @@ -215,12 +237,20 @@ impl Clone for UnboundedSender { } impl Sender { + /// Waits until the message can be queued. + /// + /// When the bounded channel is full, this future waits until the receiver frees capacity. + /// + /// # Panics + /// + /// Panics if this future is first polled outside a runtime-managed thread. pub async fn send(&self, value: T) -> Result<(), SendError> { let mut value = Some(value); let mut wait = None; poll_fn(|cx| self.poll_send(cx, &mut value, &mut wait)).await } + /// Attempts to queue a message immediately. pub fn try_send(&self, value: T) -> Result<(), TrySendError> { self.shared .lock() @@ -228,6 +258,7 @@ impl Sender { .try_send_now(value) } + /// Returns `true` if the receiver has been closed or dropped. pub fn is_closed(&self) -> bool { self.shared .lock() @@ -306,6 +337,7 @@ impl Sender { } impl UnboundedSender { + /// Queues a message immediately. pub fn send(&self, value: T) -> Result<(), SendError> { self.shared .lock() @@ -316,6 +348,7 @@ impl UnboundedSender { }) } + /// Returns `true` if the receiver has been closed or dropped. pub fn is_closed(&self) -> bool { self.shared .lock() @@ -325,11 +358,19 @@ impl UnboundedSender { } impl Receiver { + /// Waits for the next message. + /// + /// Returns `None` when the channel is closed and all buffered messages have been drained. + /// + /// # Panics + /// + /// Panics if this future is first polled outside a runtime-managed thread. pub async fn recv(&mut self) -> Option { let mut wait = None; poll_fn(|cx| self.poll_recv(cx, &mut wait)).await } + /// Attempts to receive a message immediately. pub fn try_recv(&mut self) -> Result { let mut state = self .shared @@ -345,6 +386,10 @@ impl Receiver { } } + /// Closes the channel to future sends. + /// + /// Already-buffered messages remain available to [`recv`](Self::recv) and + /// [`try_recv`](Self::try_recv). pub fn close(&mut self) { self.shared .lock() @@ -352,6 +397,7 @@ impl Receiver { .close_receiver(); } + /// Returns `true` if the channel is closed or all senders have been dropped. pub fn is_closed(&self) -> bool { let state = self .shared diff --git a/lib/runtime/src/channel/oneshot.rs b/lib/runtime/src/channel/oneshot.rs index 79ea4c4..631d7d9 100644 --- a/lib/runtime/src/channel/oneshot.rs +++ b/lib/runtime/src/channel/oneshot.rs @@ -1,3 +1,5 @@ +//! Single-use channels for handing one value from a sender to a receiver. + use std::future::poll_fn; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -6,6 +8,15 @@ use std::task::{Context, Poll}; use crate::op::completion::{CompletionFuture, CompletionHandle}; use crate::sys::linux::channel::runtime_waiter; +/// Creates a single-use channel for transferring one value from a [`Sender`] to a [`Receiver`]. +/// +/// # Examples +/// +/// ``` +/// let (sender, mut receiver) = ruin_runtime::channel::oneshot::channel::(); +/// sender.send(7).unwrap(); +/// assert_eq!(receiver.try_recv(), Ok(7)); +/// ``` pub fn channel() -> (Sender, Receiver) { let shared = Arc::new(Mutex::new(State { value: None, @@ -24,10 +35,12 @@ pub fn channel() -> (Sender, Receiver) { ) } +/// Sending half of a oneshot channel. pub struct Sender { shared: Option>>>, } +/// Receiving half of a oneshot channel. pub struct Receiver { shared: Arc>>, consumed: bool, @@ -41,18 +54,27 @@ struct State { } #[derive(Debug, Eq, PartialEq)] +/// Error returned when a oneshot send fails because the receiver is gone or closed. pub struct SendError(pub T); #[derive(Clone, Copy, Debug, Eq, PartialEq)] +/// Error returned when a oneshot receive observes a closed channel with no value. pub struct RecvError; #[derive(Debug, Eq, PartialEq)] +/// Non-blocking receive errors for [`Receiver::try_recv`]. pub enum TryRecvError { + /// No value has been sent yet, and the sender is still alive. Empty, + /// The channel can never yield a value. Closed, } impl Sender { + /// Sends `value` into the channel. + /// + /// This consumes the sender. If the receiver is already waiting on a runtime thread, the + /// receive future is woken through the runtime's cross-thread notifier path. pub fn send(mut self, value: T) -> Result<(), SendError> { let Some(shared) = self.shared.take() else { return Err(SendError(value)); @@ -80,6 +102,7 @@ impl Sender { Ok(()) } + /// Returns `true` if the receiver has been closed or dropped. pub fn is_closed(&self) -> bool { self.shared.as_ref().is_none_or(|shared| { shared @@ -91,11 +114,19 @@ impl Sender { } impl Receiver { + /// Waits for the channel's value. + /// + /// # Panics + /// + /// Panics if this future is first polled outside a runtime-managed thread. Async channel + /// waiting registers with the current runtime thread so it can be woken through the driver's + /// notification path. pub async fn recv(&mut self) -> Result { let mut wait = None; poll_fn(|cx| self.poll_recv(cx, &mut wait)).await } + /// Attempts to receive the value without waiting. pub fn try_recv(&mut self) -> Result { if self.consumed { return Err(TryRecvError::Closed); @@ -118,6 +149,10 @@ impl Receiver { } } + /// Closes the receiver. + /// + /// Closing prevents future sends from succeeding. If a value has already been sent, it can + /// still be retrieved. pub fn close(&mut self) { let mut state = self .shared @@ -126,6 +161,7 @@ impl Receiver { state.receiver_closed = true; } + /// Returns `true` if the channel is closed to future sends. pub fn is_closed(&self) -> bool { let state = self .shared diff --git a/lib/runtime/src/fs.rs b/lib/runtime/src/fs.rs index aa33f2d..be7edcf 100644 --- a/lib/runtime/src/fs.rs +++ b/lib/runtime/src/fs.rs @@ -4,6 +4,9 @@ //! - Dropping an I/O future cancels interest in the result. //! - The runtime issues best-effort kernel cancellation where supported. //! - The underlying OS operation may still complete after the future is dropped. +//! +//! The public surface intentionally mirrors `std::fs` where that shape makes sense, while using +//! async methods for operations that may block the caller. use std::ffi::OsStr; use std::io; @@ -21,33 +24,43 @@ struct FileInner { fd: OwnedFd, } +/// Async file handle. +/// +/// `File` is cheap to clone internally and supports both cursor-based sequential I/O and +/// offset-based positioned I/O. pub struct File { inner: Arc, } +/// Builder used to configure how a [`File`] is opened. pub struct OpenOptions { inner: OpOpenOptions, } #[derive(Clone, Debug, Eq, PartialEq)] +/// File metadata returned by [`metadata`] or [`File::metadata`]. pub struct Metadata { inner: RawMetadata, } +/// Async directory-entry stream returned by [`read_dir`]. pub struct ReadDir { inner: sys_fs::ReadDirStream, } #[derive(Clone, Debug, Eq, PartialEq)] +/// Directory entry yielded by [`ReadDir::next_entry`]. pub struct DirEntry { inner: OpDirEntry, } impl File { + /// Opens an existing file for reading. pub async fn open(path: impl AsRef) -> io::Result { OpenOptions::new().read(true).open(path).await } + /// Opens a file for writing, creating or truncating it first. pub async fn create(path: impl AsRef) -> io::Result { OpenOptions::new() .write(true) @@ -57,10 +70,12 @@ impl File { .await } + /// Reads bytes from the file's current cursor position. pub async fn read(&mut self, buf: &mut [u8]) -> io::Result { self.read_impl(None, buf).await } + /// Reads exactly `buf.len()` bytes from the current cursor position. pub async fn read_exact(&mut self, mut buf: &mut [u8]) -> io::Result<()> { while !buf.is_empty() { let read = self.read(buf).await?; @@ -75,10 +90,12 @@ impl File { Ok(()) } + /// Writes bytes at the file's current cursor position. pub async fn write(&mut self, buf: &[u8]) -> io::Result { self.write_impl(None, buf).await } + /// Writes the entire buffer at the file's current cursor position. pub async fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> { while !buf.is_empty() { let written = self.write(buf).await?; @@ -93,22 +110,30 @@ impl File { Ok(()) } + /// Flushes any userspace buffering associated with this handle. + /// + /// The current implementation does not add additional buffering beyond the kernel file + /// description, so this is effectively a no-op. pub async fn flush(&mut self) -> io::Result<()> { Ok(()) } + /// Synchronizes file contents and metadata to stable storage. pub async fn sync_all(&self) -> io::Result<()> { sys_fs::sync_all(FsOp::SyncAll { fd: self.raw_fd() }).await } + /// Synchronizes file contents to stable storage. pub async fn sync_data(&self) -> io::Result<()> { sys_fs::sync_data(FsOp::SyncData { fd: self.raw_fd() }).await } + /// Reads bytes starting at `offset` without using the shared file cursor. pub async fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result { self.read_impl(Some(offset), buf).await } + /// Reads exactly `buf.len()` bytes starting at `offset`. pub async fn read_exact_at(&self, mut offset: u64, mut buf: &mut [u8]) -> io::Result<()> { while !buf.is_empty() { let read = self.read_at(offset, buf).await?; @@ -124,10 +149,12 @@ impl File { Ok(()) } + /// Writes bytes starting at `offset` without using the shared file cursor. pub async fn write_at(&self, offset: u64, buf: &[u8]) -> io::Result { self.write_impl(Some(offset), buf).await } + /// Writes the entire buffer starting at `offset`. pub async fn write_all_at(&self, mut offset: u64, mut buf: &[u8]) -> io::Result<()> { while !buf.is_empty() { let written = self.write_at(offset, buf).await?; @@ -143,6 +170,7 @@ impl File { Ok(()) } + /// Returns metadata for this file handle. pub async fn metadata(&self) -> io::Result { sys_fs::metadata(FsOp::Metadata { target: MetadataTarget::File(self.raw_fd()), @@ -152,6 +180,7 @@ impl File { .map(Metadata::from_raw) } + /// Truncates or extends the underlying file to `len` bytes. pub async fn set_len(&self, len: u64) -> io::Result<()> { sys_fs::set_len(FsOp::SetLen { fd: self.raw_fd(), @@ -160,6 +189,9 @@ impl File { .await } + /// Duplicates the underlying file description. + /// + /// As with `std::fs::File::try_clone`, the cloned handle shares kernel-managed cursor state. pub async fn try_clone(&self) -> io::Result { sys_fs::try_clone(FsOp::Duplicate { fd: self.raw_fd() }) .await @@ -200,42 +232,63 @@ impl File { } impl OpenOptions { + /// Creates a blank set of open options. pub fn new() -> Self { Self { inner: OpOpenOptions::default(), } } + /// Controls read access. pub fn read(&mut self, value: bool) -> &mut Self { self.inner.read = value; self } + /// Controls write access. pub fn write(&mut self, value: bool) -> &mut Self { self.inner.write = value; self } + /// Controls append mode. pub fn append(&mut self, value: bool) -> &mut Self { self.inner.append = value; self } + /// Controls whether the file is truncated after opening. pub fn truncate(&mut self, value: bool) -> &mut Self { self.inner.truncate = value; self } + /// Controls whether the file is created if it does not already exist. pub fn create(&mut self, value: bool) -> &mut Self { self.inner.create = value; self } + /// Controls whether opening must create a brand-new file. pub fn create_new(&mut self, value: bool) -> &mut Self { self.inner.create_new = value; self } + /// Opens a file with the configured options. + /// + /// # Examples + /// + /// ``` + /// # let _ = || async { + /// let file = ruin_runtime::fs::OpenOptions::new() + /// .read(true) + /// .write(true) + /// .open("example.txt") + /// .await; + /// # let _ = file; + /// # }; + /// ``` pub async fn open(&self, path: impl AsRef) -> io::Result { sys_fs::open(FsOp::Open { path: path.as_ref().to_path_buf(), @@ -257,32 +310,39 @@ impl Metadata { Self { inner } } + /// Returns the file length in bytes. pub fn len(&self) -> u64 { self.inner.len } + /// Returns `true` if the file length is zero. pub fn is_empty(&self) -> bool { self.len() == 0 } + /// Returns `true` if this metadata describes a regular file. pub fn is_file(&self) -> bool { self.inner.file_type == RawFileType::File } + /// Returns `true` if this metadata describes a directory. pub fn is_dir(&self) -> bool { self.inner.file_type == RawFileType::Directory } + /// Returns `true` if this metadata describes a symbolic link. pub fn is_symlink(&self) -> bool { self.inner.file_type == RawFileType::Symlink } + /// Returns the raw POSIX mode bits reported by the platform backend. pub fn mode(&self) -> u16 { self.inner.mode } } impl ReadDir { + /// Returns the next directory entry, or `None` once the stream is exhausted. pub async fn next_entry(&mut self) -> io::Result> { self.inner .next_entry() @@ -292,19 +352,23 @@ impl ReadDir { } impl DirEntry { + /// Returns the full path to this directory entry. pub fn path(&self) -> PathBuf { self.inner.path.clone() } + /// Returns the file name portion of this directory entry. pub fn file_name(&self) -> &OsStr { self.inner.file_name.as_os_str() } + /// Resolves metadata for this entry. pub async fn metadata(&self) -> io::Result { metadata(self.path()).await } } +/// Reads the entire contents of a file into memory. pub async fn read(path: impl AsRef) -> io::Result> { let mut file = File::open(path.as_ref()).await?; let mut output = Vec::new(); @@ -319,11 +383,13 @@ pub async fn read(path: impl AsRef) -> io::Result> { } } +/// Reads the entire contents of a UTF-8 file into a [`String`]. pub async fn read_to_string(path: impl AsRef) -> io::Result { let bytes = read(path).await?; String::from_utf8(bytes).map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error)) } +/// Replaces the contents of a file with `data`, creating it if needed. pub async fn write(path: impl AsRef, data: impl AsRef<[u8]>) -> io::Result<()> { let mut file = OpenOptions::new() .write(true) @@ -334,6 +400,7 @@ pub async fn write(path: impl AsRef, data: impl AsRef<[u8]>) -> io::Result file.write_all(data.as_ref()).await } +/// Returns metadata for a filesystem path. pub async fn metadata(path: impl AsRef) -> io::Result { sys_fs::metadata(FsOp::Metadata { target: MetadataTarget::Path(path.as_ref().to_path_buf()), @@ -343,6 +410,7 @@ pub async fn metadata(path: impl AsRef) -> io::Result { .map(Metadata::from_raw) } +/// Creates a single directory. pub async fn create_dir(path: impl AsRef) -> io::Result<()> { sys_fs::create_dir(FsOp::CreateDir { path: path.as_ref().to_path_buf(), @@ -352,6 +420,7 @@ pub async fn create_dir(path: impl AsRef) -> io::Result<()> { .await } +/// Creates a directory and any missing parent directories. pub async fn create_dir_all(path: impl AsRef) -> io::Result<()> { let path = path.as_ref(); let mut current = PathBuf::new(); @@ -372,6 +441,7 @@ pub async fn create_dir_all(path: impl AsRef) -> io::Result<()> { Ok(()) } +/// Removes a file. pub async fn remove_file(path: impl AsRef) -> io::Result<()> { sys_fs::remove_file(FsOp::RemoveFile { path: path.as_ref().to_path_buf(), @@ -379,6 +449,7 @@ pub async fn remove_file(path: impl AsRef) -> io::Result<()> { .await } +/// Removes an empty directory. pub async fn remove_dir(path: impl AsRef) -> io::Result<()> { sys_fs::remove_dir(FsOp::RemoveDir { path: path.as_ref().to_path_buf(), @@ -386,6 +457,7 @@ pub async fn remove_dir(path: impl AsRef) -> io::Result<()> { .await } +/// Renames or moves a filesystem entry. pub async fn rename(from: impl AsRef, to: impl AsRef) -> io::Result<()> { sys_fs::rename(FsOp::Rename { from: from.as_ref().to_path_buf(), @@ -394,6 +466,16 @@ pub async fn rename(from: impl AsRef, to: impl AsRef) -> io::Result< .await } +/// Opens an async directory-entry stream. +/// +/// # Examples +/// +/// ``` +/// # let _ = || async { +/// let mut entries = ruin_runtime::fs::read_dir(".").await.unwrap(); +/// let _ = entries.next_entry().await; +/// # }; +/// ``` pub async fn read_dir(path: impl AsRef) -> io::Result { sys_fs::read_dir(FsOp::ReadDir { path: path.as_ref().to_path_buf(), diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 90390ff..9559c9d 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -1,6 +1,21 @@ +//! Runtime, driver, async I/O, and channel primitives for RUIN. +//! +//! The crate is centered around a single-threaded event loop with explicit worker threads, +//! JavaScript-style microtask/macrotask scheduling, and Linux `io_uring`-backed I/O. +//! +//! Most users will start with: +//! +//! - [`main`] or [`async_main`] for executable entry points +//! - [`run`], [`queue_task`], [`queue_microtask`], and [`queue_future`] for event-loop work +//! - [`fs`], [`net`], [`time`], and [`channel`] for async runtime services +//! +//! # Platform support +//! +//! `ruin-runtime` currently targets Linux on `x86_64`. +//! //! RUIN runtime foundations. //! -//! This crate provides a Linux x86_64 runtime substrate: the mesh allocator, the reactor, and a +//! This crate provides a Linux x86_64 runtime substrate: the mesh allocator, the driver, and a //! single-threaded runtime loop with worker-thread task forwarding. #![feature(thread_local)] @@ -13,13 +28,31 @@ extern crate alloc; pub mod channel; pub mod fs; pub mod net; +#[doc(hidden)] pub mod op; +#[doc(hidden)] pub mod platform; +#[doc(hidden)] pub mod sys; pub mod time; -pub use ruin_runtime_proc_macros::{async_main, main}; +/// Marks an `async fn main()` as the runtime entry point. +/// +/// The macro generates a real Rust `main` that queues the returned future onto the main runtime +/// thread before calling [`run`]. +pub use ruin_runtime_proc_macros::async_main; +/// Marks a synchronous `fn main()` as the runtime entry point. +/// +/// The macro generates a real Rust `main` that queues the function body onto the main runtime +/// thread before calling [`run`]. +pub use ruin_runtime_proc_macros::main; +/// Driver primitives re-exported from the Linux x86_64 backend. +#[cfg(all(target_os = "linux", target_arch = "x86_64"))] +pub use platform::linux_x86_64::driver::{ + Driver, ReadyEvents, ThreadNotifier, create, create_driver, monotonic_now, +}; +/// Public mesh-allocator surface re-exported from the Linux x86_64 backend. #[cfg(all(target_os = "linux", target_arch = "x86_64"))] pub use platform::linux_x86_64::mesh_alloc::{ ActiveMeshGuard, Arena, AtomicBitmap, BitIter, CLASS_TO_SIZE, CompactionAdvice, @@ -32,12 +65,10 @@ pub use platform::linux_x86_64::mesh_alloc::{ page_shift, page_size, retry_on_efault, retry_on_efault_ptrs, round_up_to_page, runtime_slots_per_span, size_class_for, }; +/// Additional allocator helpers re-exported from the Linux x86_64 backend. #[cfg(all(target_os = "linux", target_arch = "x86_64"))] pub use platform::linux_x86_64::mesh_alloc::{FreelistId, bitmaps_meshable}; -#[cfg(all(target_os = "linux", target_arch = "x86_64"))] -pub use platform::linux_x86_64::reactor::{ - Reactor, ReadyEvents, ThreadNotifier, create, create_reactor, monotonic_now, -}; +/// Runtime/event-loop primitives re-exported from the Linux x86_64 backend. #[cfg(all(target_os = "linux", target_arch = "x86_64"))] pub use platform::linux_x86_64::runtime::{ IntervalHandle, JoinHandle, ThreadHandle, TimeoutHandle, WorkerHandle, clear_interval, @@ -45,6 +76,9 @@ pub use platform::linux_x86_64::runtime::{ set_interval, set_timeout, spawn_worker, yield_now, }; +/// Returns the default global mesh allocator configuration for this crate. +/// +/// This is useful when embedding the allocator in a `#[global_allocator]` static. pub const fn default_global_allocator() -> GlobalMeshAllocator { GlobalMeshAllocator::with_default_config() } diff --git a/lib/runtime/src/net.rs b/lib/runtime/src/net.rs index aa9a961..7902f53 100644 --- a/lib/runtime/src/net.rs +++ b/lib/runtime/src/net.rs @@ -1,4 +1,7 @@ //! Portable async networking API. +//! +//! The public surface follows the general shape of `std::net`, but uses async methods for socket +//! operations that would otherwise block the caller. use std::future::Future; use std::io; @@ -40,6 +43,10 @@ type PendingRead = Pin>> + 'static>>; type PendingWrite = Pin> + 'static>>; type PendingShutdown = Pin> + 'static>>; +/// Async TCP stream. +/// +/// This type also implements Hyper's runtime I/O traits, allowing it to be used directly as an +/// HTTP transport. pub struct TcpStream { inner: Arc, pending_read: Option, @@ -48,16 +55,19 @@ pub struct TcpStream { } #[derive(Clone, Debug)] +/// Async TCP listening socket. pub struct TcpListener { inner: Arc, } #[derive(Debug)] +/// Async UDP socket. pub struct UdpSocket { inner: Arc, } impl TcpStream { + /// Connects to the first resolved address that succeeds. pub async fn connect(addr: A) -> io::Result where A: ToSocketAddrs + Send + 'static, @@ -79,6 +89,7 @@ impl TcpStream { })) } + /// Connects to `addr`, failing if the deadline elapses first. pub async fn connect_timeout(addr: &SocketAddr, timeout: Duration) -> io::Result { validate_timeout(timeout)?; crate::sys::linux::net::connect_stream_timeout(*addr, timeout) @@ -86,6 +97,7 @@ impl TcpStream { .map(Self::from_owned_fd) } + /// Reads bytes from the stream. pub async fn read(&mut self, buf: &mut [u8]) -> io::Result { let data = match self.read_timeout_value() { Some(timeout) => { @@ -105,6 +117,7 @@ impl TcpStream { Ok(read) } + /// Reads exactly `buf.len()` bytes from the stream. pub async fn read_exact(&mut self, mut buf: &mut [u8]) -> io::Result<()> { while !buf.is_empty() { let read = self.read(buf).await?; @@ -119,6 +132,7 @@ impl TcpStream { Ok(()) } + /// Writes bytes to the stream. pub async fn write(&mut self, buf: &[u8]) -> io::Result { match self.write_timeout_value() { Some(timeout) => { @@ -135,6 +149,7 @@ impl TcpStream { } } + /// Writes the entire buffer to the stream. pub async fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> { while !buf.is_empty() { let written = self.write(buf).await?; @@ -149,6 +164,7 @@ impl TcpStream { Ok(()) } + /// Shuts down the read, write, or both halves of the connection. pub async fn shutdown(&self, how: Shutdown) -> io::Result<()> { crate::sys::linux::net::shutdown(NetOp::Shutdown { fd: self.raw_fd(), @@ -157,50 +173,65 @@ impl TcpStream { .await } + /// Duplicates the underlying stream socket. pub async fn try_clone(&self) -> io::Result { crate::sys::linux::net::duplicate(self.raw_fd()) .await .map(Self::from_owned_fd) } + /// Returns the local socket address of this stream. pub fn local_addr(&self) -> io::Result { crate::sys::linux::net::local_addr(self.raw_fd()) } + /// Returns the remote peer address of this stream. pub fn peer_addr(&self) -> io::Result { crate::sys::linux::net::peer_addr(self.raw_fd()) } + /// Reads the current `TCP_NODELAY` setting. pub fn nodelay(&self) -> io::Result { crate::sys::linux::net::nodelay(self.raw_fd()) } + /// Enables or disables `TCP_NODELAY`. pub fn set_nodelay(&self, enabled: bool) -> io::Result<()> { crate::sys::linux::net::set_nodelay(self.raw_fd(), enabled) } + /// Reads the socket's IP time-to-live value. pub fn ttl(&self) -> io::Result { crate::sys::linux::net::ttl(self.raw_fd()) } + /// Sets the socket's IP time-to-live value. pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { crate::sys::linux::net::set_ttl(self.raw_fd(), ttl) } + /// Returns the read timeout used by async read operations on this handle. pub fn read_timeout(&self) -> io::Result> { Ok(self.read_timeout_value()) } + /// Sets the read timeout used by async read operations on this handle. + /// + /// Passing `Some(Duration::ZERO)` is rejected. pub fn set_read_timeout(&self, timeout: Option) -> io::Result<()> { validate_optional_timeout(timeout)?; self.inner.timeouts.lock().unwrap().read = timeout; Ok(()) } + /// Returns the write timeout used by async write operations on this handle. pub fn write_timeout(&self) -> io::Result> { Ok(self.write_timeout_value()) } + /// Sets the write timeout used by async write operations on this handle. + /// + /// Passing `Some(Duration::ZERO)` is rejected. pub fn set_write_timeout(&self, timeout: Option) -> io::Result<()> { validate_optional_timeout(timeout)?; self.inner.timeouts.lock().unwrap().write = timeout; @@ -233,6 +264,7 @@ impl TcpStream { } impl TcpListener { + /// Binds a TCP listener to the first resolved address that succeeds. pub async fn bind(addr: A) -> io::Result where A: ToSocketAddrs + Send + 'static, @@ -254,6 +286,7 @@ impl TcpListener { })) } + /// Accepts an incoming connection. pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { let accepted = crate::sys::linux::net::accept(NetOp::Accept { fd: self.raw_fd() }).await?; @@ -261,14 +294,17 @@ impl TcpListener { Ok((stream, accepted.peer_addr)) } + /// Returns the local socket address of this listener. pub fn local_addr(&self) -> io::Result { crate::sys::linux::net::local_addr(self.raw_fd()) } + /// Reads the listener socket's IP time-to-live value. pub fn ttl(&self) -> io::Result { crate::sys::linux::net::ttl(self.raw_fd()) } + /// Sets the listener socket's IP time-to-live value. pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { crate::sys::linux::net::set_ttl(self.raw_fd(), ttl) } @@ -285,6 +321,7 @@ impl TcpListener { } impl UdpSocket { + /// Binds a UDP socket to the first resolved address that succeeds. pub async fn bind(addr: A) -> io::Result where A: ToSocketAddrs + Send + 'static, @@ -306,6 +343,10 @@ impl UdpSocket { })) } + /// Connects the socket to a default peer. + /// + /// Once connected, [`send`](Self::send), [`recv`](Self::recv), and [`peer_addr`](Self::peer_addr) + /// operate relative to that peer. pub async fn connect(&self, addr: A) -> io::Result<()> where A: ToSocketAddrs + Send + 'static, @@ -332,6 +373,7 @@ impl UdpSocket { })) } + /// Sends a datagram to the connected peer. pub async fn send(&self, buf: &[u8]) -> io::Result { match self.write_timeout_value() { Some(timeout) => { @@ -348,6 +390,7 @@ impl UdpSocket { } } + /// Receives a datagram from the connected peer. pub async fn recv(&self, buf: &mut [u8]) -> io::Result { let data = match self.read_timeout_value() { Some(timeout) => { @@ -367,6 +410,7 @@ impl UdpSocket { Ok(read) } + /// Peeks at the next datagram from the connected peer without consuming it. pub async fn peek(&self, buf: &mut [u8]) -> io::Result { let data = match self.read_timeout_value() { Some(timeout) => { @@ -392,6 +436,7 @@ impl UdpSocket { Ok(read) } + /// Sends a datagram to `addr`. pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result where A: ToSocketAddrs + Send + 'static, @@ -435,6 +480,7 @@ impl UdpSocket { })) } + /// Receives a datagram and returns the sender address. pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { let datagram = match self.read_timeout_value() { Some(timeout) => { @@ -455,6 +501,7 @@ impl UdpSocket { Ok((read, datagram.peer_addr)) } + /// Peeks at the next datagram and returns the sender address without consuming it. pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { let datagram = match self.read_timeout_value() { Some(timeout) => { @@ -480,50 +527,65 @@ impl UdpSocket { Ok((read, datagram.peer_addr)) } + /// Duplicates the underlying UDP socket. pub async fn try_clone(&self) -> io::Result { crate::sys::linux::net::duplicate(self.raw_fd()) .await .map(Self::from_owned_fd) } + /// Returns the local socket address of this socket. pub fn local_addr(&self) -> io::Result { crate::sys::linux::net::local_addr(self.raw_fd()) } + /// Returns the connected peer address, if the socket has been connected. pub fn peer_addr(&self) -> io::Result { crate::sys::linux::net::peer_addr(self.raw_fd()) } + /// Reads the `SO_BROADCAST` setting. pub fn broadcast(&self) -> io::Result { crate::sys::linux::net::broadcast(self.raw_fd()) } + /// Enables or disables `SO_BROADCAST`. pub fn set_broadcast(&self, enabled: bool) -> io::Result<()> { crate::sys::linux::net::set_broadcast(self.raw_fd(), enabled) } + /// Reads the socket's IP time-to-live value. pub fn ttl(&self) -> io::Result { crate::sys::linux::net::ttl(self.raw_fd()) } + /// Sets the socket's IP time-to-live value. pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { crate::sys::linux::net::set_ttl(self.raw_fd(), ttl) } + /// Returns the read timeout used by async receive operations on this handle. pub fn read_timeout(&self) -> io::Result> { Ok(self.read_timeout_value()) } + /// Sets the read timeout used by async receive operations on this handle. + /// + /// Passing `Some(Duration::ZERO)` is rejected. pub fn set_read_timeout(&self, timeout: Option) -> io::Result<()> { validate_optional_timeout(timeout)?; self.inner.timeouts.lock().unwrap().read = timeout; Ok(()) } + /// Returns the write timeout used by async send operations on this handle. pub fn write_timeout(&self) -> io::Result> { Ok(self.write_timeout_value()) } + /// Sets the write timeout used by async send operations on this handle. + /// + /// Passing `Some(Duration::ZERO)` is rejected. pub fn set_write_timeout(&self, timeout: Option) -> io::Result<()> { validate_optional_timeout(timeout)?; self.inner.timeouts.lock().unwrap().write = timeout; diff --git a/lib/runtime/src/platform/linux_x86_64/reactor.rs b/lib/runtime/src/platform/linux_x86_64/driver.rs similarity index 83% rename from lib/runtime/src/platform/linux_x86_64/reactor.rs rename to lib/runtime/src/platform/linux_x86_64/driver.rs index 3a833cb..f4578bd 100644 --- a/lib/runtime/src/platform/linux_x86_64/reactor.rs +++ b/lib/runtime/src/platform/linux_x86_64/driver.rs @@ -1,3 +1,5 @@ +//! Public runtime driver primitives. + use std::cell::Cell; use std::cell::RefCell; use std::collections::BTreeMap; @@ -51,23 +53,29 @@ impl NotifierInner { } #[derive(Clone)] +/// Cross-thread notifier for a runtime thread's driver. pub struct ThreadNotifier { inner: Arc, } impl ThreadNotifier { + /// Sends a wake notification to the target runtime thread. pub fn notify(&self) -> io::Result<()> { self.inner.notify() } } #[derive(Debug, Default, Clone, Copy, Eq, PartialEq)] +/// Readiness information returned by [`Driver::poll`]. pub struct ReadyEvents { + /// One or more timer expirations are pending. pub timer: bool, + /// One or more cross-thread wake notifications are pending. pub wake: bool, } -pub struct Reactor { +/// Low-level Linux runtime driver backed by `io_uring`. +pub struct Driver { ring: IoUring, notifier: Arc, next_token: Cell, @@ -77,11 +85,16 @@ pub struct Reactor { completions: RefCell>, } -pub fn create() -> io::Result<(Reactor, ThreadNotifier)> { - create_reactor() +/// Creates a new driver and its paired [`ThreadNotifier`]. +pub fn create() -> io::Result<(Driver, ThreadNotifier)> { + create_driver() } -pub fn create_reactor() -> io::Result<(Reactor, ThreadNotifier)> { +/// Creates a new driver and its paired [`ThreadNotifier`]. +/// +/// This is identical to [`create`] and exists as a more explicit name for callers that want to +/// emphasize driver construction. +pub fn create_driver() -> io::Result<(Driver, ThreadNotifier)> { let ring = IoUring::new(64)?; let notifier = Arc::new(NotifierInner { ring_fd: ring.ring_fd(), @@ -89,7 +102,7 @@ pub fn create_reactor() -> io::Result<(Reactor, ThreadNotifier)> { }); Ok(( - Reactor { + Driver { ring, notifier: Arc::clone(¬ifier), next_token: Cell::new(1), @@ -102,7 +115,7 @@ pub fn create_reactor() -> io::Result<(Reactor, ThreadNotifier)> { )) } -impl Reactor { +impl Driver { pub(crate) fn bind_current_thread(&self) { self.ring.bind_current_thread(); } @@ -111,6 +124,7 @@ impl Reactor { self.ring.unbind_current_thread(); } + /// Polls the driver without blocking. pub fn poll(&self) -> io::Result> { let mut ready = ReadyEvents::default(); let saw_any = self @@ -119,10 +133,14 @@ impl Reactor { if saw_any { Ok(Some(ready)) } else { Ok(None) } } + /// Blocks until at least one completion is available. pub fn wait(&self) -> io::Result<()> { self.ring.wait_for_cqe() } + /// Updates the currently armed timer deadline. + /// + /// Passing `None` removes any active timer. pub fn rearm_timer(&self, deadline: Option) -> io::Result<()> { match (self.active_timer_token.get(), deadline) { (Some(active), Some(deadline)) => { @@ -171,6 +189,7 @@ impl Reactor { }) } + /// Drains the accumulated wake notification count. pub fn drain_wake(&self) -> io::Result { let wakes = self.pending_wakes.replace(0); if wakes == 0 { @@ -183,6 +202,7 @@ impl Reactor { } } + /// Drains the accumulated timer-expiration count. pub fn drain_timer(&self) -> io::Result { let timers = self.pending_timers.replace(0); if timers == 0 { @@ -234,12 +254,13 @@ impl Reactor { } } -impl Drop for Reactor { +impl Drop for Driver { fn drop(&mut self) { self.notifier.closed.store(true, Ordering::Release); } } +/// Returns the current monotonic time used by the runtime timer system. pub fn monotonic_now() -> io::Result { let mut now = std::mem::MaybeUninit::::uninit(); let result = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, now.as_mut_ptr()) }; @@ -268,16 +289,16 @@ fn decode_token_kind(token: u64) -> Option { #[cfg(test)] mod tests { - use super::{create_reactor, monotonic_now}; + use super::{create_driver, monotonic_now}; use std::thread; use std::time::Duration; #[test] fn notifier_wakes_target_ring() { - let (sender, _) = create_reactor().expect("sender reactor should initialize"); + let (sender, _) = create_driver().expect("sender driver should initialize"); sender.bind_current_thread(); - let (target, notifier) = create_reactor().expect("target reactor should initialize"); + let (target, notifier) = create_driver().expect("target driver should initialize"); notifier.notify().expect("notify should succeed"); let ready = loop { @@ -295,7 +316,7 @@ mod tests { #[test] fn notifier_wakes_target_ring_from_plain_thread() { - let (target, notifier) = create_reactor().expect("target reactor should initialize"); + let (target, notifier) = create_driver().expect("target driver should initialize"); thread::spawn(move || { notifier.notify().expect("notify should succeed"); @@ -317,14 +338,14 @@ mod tests { #[test] fn timeout_reports_deadlines() { - let (reactor, _notifier) = create_reactor().expect("reactor should initialize"); + let (driver, _notifier) = create_driver().expect("driver should initialize"); let deadline = monotonic_now().expect("clock should work") + Duration::from_millis(20); - reactor + driver .rearm_timer(Some(deadline)) .expect("timer should arm"); let ready = loop { - if let Some(ready) = reactor.poll().expect("poll should succeed") { + if let Some(ready) = driver.poll().expect("poll should succeed") { break ready; } thread::sleep(Duration::from_millis(5)); @@ -332,9 +353,6 @@ mod tests { assert!(ready.timer); assert!(!ready.wake); - assert_eq!( - reactor.drain_timer().expect("timer drain should succeed"), - 1 - ); + assert_eq!(driver.drain_timer().expect("timer drain should succeed"), 1); } } diff --git a/lib/runtime/src/platform/linux_x86_64/mod.rs b/lib/runtime/src/platform/linux_x86_64/mod.rs index a6a1110..706a729 100644 --- a/lib/runtime/src/platform/linux_x86_64/mod.rs +++ b/lib/runtime/src/platform/linux_x86_64/mod.rs @@ -1,4 +1,4 @@ +pub mod driver; pub mod mesh_alloc; -pub mod reactor; pub mod runtime; pub(crate) mod uring; diff --git a/lib/runtime/src/platform/linux_x86_64/runtime.rs b/lib/runtime/src/platform/linux_x86_64/runtime.rs index 5deffd2..ef2ba5a 100644 --- a/lib/runtime/src/platform/linux_x86_64/runtime.rs +++ b/lib/runtime/src/platform/linux_x86_64/runtime.rs @@ -1,3 +1,5 @@ +//! Public runtime loop and worker-thread primitives. + use std::cell::{Cell, RefCell}; use std::collections::{BTreeMap, VecDeque}; use std::future::Future; @@ -9,7 +11,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::time::Duration; -use super::reactor::{Reactor, ThreadNotifier, create as create_reactor, monotonic_now}; +use super::driver::{Driver, ThreadNotifier, create_driver, monotonic_now}; type LocalTask = Box; type SendTask = Box; @@ -19,16 +21,19 @@ type LocalBoxFuture = Pin + 'static>>; static mut CURRENT_THREAD: *mut ThreadState = ptr::null_mut(); #[derive(Clone)] +/// Handle for queueing work onto a specific runtime thread. pub struct ThreadHandle { shared: Arc, } +/// Handle for a worker runtime thread spawned with [`spawn_worker`]. pub struct WorkerHandle { thread: ThreadHandle, completion: Arc, } #[derive(Clone)] +/// Handle returned by [`set_timeout`]. pub struct TimeoutHandle { id: usize, owner: *const ThreadState, @@ -36,20 +41,37 @@ pub struct TimeoutHandle { } #[derive(Clone)] +/// Handle returned by [`set_interval`]. pub struct IntervalHandle { id: usize, owner: *const ThreadState, _local: Rc<()>, } +/// Handle returned by [`queue_future`]. +/// +/// Awaiting a join handle yields the output of the queued future. pub struct JoinHandle { state: Rc>, } +/// Future returned by [`yield_now`]. +/// +/// Awaiting this future will immediately yield control back to the runtime scheduler, allowing other queued microtasks +/// to run before the current task continues executing. Note that continuation of futures runs as a microtask, so this +/// can only yield to other microtasks and not to macrotasks (driver events such as file or network I/O, timers, or +/// channel messages). pub struct YieldNow { yielded: bool, } +/// Returns a handle for the current runtime thread. +/// +/// If the current thread has not yet entered the runtime, the runtime state is initialized lazily. +/// +/// # Panics +/// +/// Panics if the runtime cannot initialize its driver for the current thread. pub fn current_thread_handle() -> ThreadHandle { current_thread().handle() } @@ -58,10 +80,17 @@ pub(crate) fn try_current_thread_handle() -> Option { unsafe { (!CURRENT_THREAD.is_null()).then(|| (*CURRENT_THREAD).handle()) } } -pub(crate) fn with_current_reactor(f: impl FnOnce(&Reactor) -> T) -> T { - f(¤t_thread().reactor) +pub(crate) fn with_current_driver(f: impl FnOnce(&Driver) -> T) -> T { + f(¤t_thread().driver) } +/// Queues a macrotask on the current runtime thread. +/// +/// The task runs after all currently-queued macrotasks, and after all microtasks. +/// +/// # Panics +/// +/// Panics if the runtime cannot initialize its state for the current thread. pub fn queue_task(task: F) where F: FnOnce() + 'static, @@ -69,6 +98,14 @@ where push_local_macrotask(Box::new(task)); } +/// Queues a microtask on the current runtime thread. +/// +/// Microtasks run before the next macrotask turn, mirroring JavaScript-style event loop +/// semantics. +/// +/// # Panics +/// +/// Panics if the runtime cannot initialize its state for the current thread. pub fn queue_microtask(task: F) where F: FnOnce() + 'static, @@ -79,6 +116,11 @@ where .push_back(Box::new(task)); } +/// Schedules a one-shot timer on the current runtime thread. +/// +/// # Panics +/// +/// Panics if the runtime cannot initialize its state for the current thread. pub fn set_timeout(delay: Duration, callback: F) -> TimeoutHandle where F: FnOnce() + 'static, @@ -98,10 +140,22 @@ where } } +/// Cancels a timeout previously created by [`set_timeout`]. +/// +/// # Panics +/// +/// Panics if called from a different runtime thread than the one that created `handle`. pub fn clear_timeout(handle: &TimeoutHandle) { clear_timer(handle.owner, handle.id); } +/// Schedules a repeating timer on the current runtime thread. +/// +/// The callback is invoked once per interval until the handle is cleared. +/// +/// # Panics +/// +/// Panics if the runtime cannot initialize its state for the current thread. pub fn set_interval(delay: Duration, callback: F) -> IntervalHandle where F: FnMut() + 'static, @@ -126,10 +180,33 @@ where } } +/// Cancels an interval previously created by [`set_interval`]. +/// +/// # Panics +/// +/// Panics if called from a different runtime thread than the one that created `handle`. pub fn clear_interval(handle: &IntervalHandle) { clear_timer(handle.owner, handle.id); } +/// Queues a future on the current runtime thread. +/// +/// The future is scheduled immediately and can be awaited through the returned [`JoinHandle`]. +/// +/// The future will be driven to completion regardless or whether the join handle is polled or dropped, so this function +/// can be used as a convenient way to spawn detached async tasks on the current thread. +/// +/// # Examples +/// +/// ``` +/// # let _ = || { +/// let handle = ruin_runtime::queue_future(async { 42usize }); +/// # }; +/// ``` +/// +/// # Panics +/// +/// Panics if the runtime cannot initialize its state for the current thread. pub fn queue_future(future: F) -> JoinHandle where F: Future + 'static, @@ -150,13 +227,21 @@ where JoinHandle { state } } +/// Spawns a worker runtime thread. +/// +/// `initial_task` is queued onto the worker as its first macrotask. `on_exit` runs on the parent +/// runtime thread after the worker shuts down. +/// +/// # Panics +/// +/// Panics if the worker thread or its driver cannot be created. pub fn spawn_worker(initial_task: Init, on_exit: Exit) -> WorkerHandle where Init: FnOnce() + Send + 'static, Exit: FnOnce() + 'static, { let parent = current_thread(); - let (reactor, notifier) = create_reactor().expect("worker reactor should initialize"); + let (driver, notifier) = create_driver().expect("worker driver should initialize"); let shared = Arc::new(ThreadShared::new(notifier)); let handle = ThreadHandle { shared: Arc::clone(&shared), @@ -175,7 +260,7 @@ where std::thread::Builder::new() .name("ruin-runtime-worker".into()) .spawn(move || { - install_thread(shared, reactor, Some(worker_completion)); + install_thread(shared, driver, Some(worker_completion)); queue_task(initial_task); run(); }) @@ -187,6 +272,14 @@ where } } +/// Runs the current runtime thread until no work, timers, child workers, or async operations +/// remain. +/// +/// This is the main event loop entry point used by the proc-macro entry attributes. +/// +/// # Panics +/// +/// Panics if runtime initialization fails or if the underlying driver returns an unexpected error. pub fn run() { let _ = current_thread(); @@ -223,7 +316,7 @@ pub fn run() { if has_pending_timers() || state.has_live_children() || state.has_live_async_operations() { state.shared.closing.store(false, Ordering::Release); - state.reactor.wait().expect("reactor wait should succeed"); + state.driver.wait().expect("driver wait should succeed"); continue; } @@ -240,16 +333,20 @@ pub fn run() { } fn drain_all() { - drain_reactor_events(); + drain_driver_events(); drain_remote_tasks(); drain_completed_workers(); } +/// Returns a future that yields back to the runtime scheduler once. pub fn yield_now() -> YieldNow { YieldNow { yielded: false } } impl ThreadHandle { + /// Queues a macrotask onto this runtime thread. + /// + /// Returns `false` if the target thread is already closed. pub fn queue_task(&self, task: F) -> bool where F: FnOnce() + Send + 'static, @@ -257,6 +354,9 @@ impl ThreadHandle { self.shared.enqueue_macro(Box::new(task)) } + /// Queues a microtask onto this runtime thread. + /// + /// Returns `false` if the target thread is already closed. pub fn queue_microtask(&self, task: F) -> bool where F: FnOnce() + Send + 'static, @@ -264,6 +364,7 @@ impl ThreadHandle { self.shared.enqueue_micro(Box::new(task)) } + /// Returns `true` if the target runtime thread has shut down. pub fn is_closed(&self) -> bool { self.shared.closed.load(Ordering::Acquire) } @@ -282,6 +383,9 @@ impl ThreadHandle { } impl WorkerHandle { + /// Queues a macrotask onto the worker thread. + /// + /// Returns `false` if the worker has already shut down. pub fn queue_task(&self, task: F) -> bool where F: FnOnce() + Send + 'static, @@ -289,6 +393,9 @@ impl WorkerHandle { self.thread.queue_task(task) } + /// Queues a microtask onto the worker thread. + /// + /// Returns `false` if the worker has already shut down. pub fn queue_microtask(&self, task: F) -> bool where F: FnOnce() + Send + 'static, @@ -296,10 +403,12 @@ impl WorkerHandle { self.thread.queue_microtask(task) } + /// Returns `true` once the worker thread has fully exited. pub fn is_finished(&self) -> bool { self.completion.finished.load(Ordering::Acquire) } + /// Returns a generic [`ThreadHandle`] for the worker thread. pub fn thread(&self) -> ThreadHandle { self.thread.clone() } @@ -328,7 +437,7 @@ impl Future for YieldNow { } struct ThreadState { - reactor: Reactor, + driver: Driver, shared: Arc, worker_completion: Option>, local_microtasks: RefCell>, @@ -341,11 +450,11 @@ struct ThreadState { impl ThreadState { fn new( shared: Arc, - reactor: Reactor, + driver: Driver, worker_completion: Option>, ) -> Self { Self { - reactor, + driver, shared, worker_completion, local_microtasks: RefCell::new(VecDeque::new()), @@ -695,11 +804,11 @@ unsafe fn future_task_drop(data: *const ()) { fn current_thread() -> &'static ThreadState { unsafe { if CURRENT_THREAD.is_null() { - let (reactor, notifier) = create_reactor().expect("runtime reactor should initialize"); + let (driver, notifier) = create_driver().expect("runtime driver should initialize"); let shared = Arc::new(ThreadShared::new(notifier)); - let state = Box::new(ThreadState::new(shared, reactor, None)); + let state = Box::new(ThreadState::new(shared, driver, None)); let state = Box::into_raw(state); - (*state).reactor.bind_current_thread(); + (*state).driver.bind_current_thread(); CURRENT_THREAD = state; } @@ -713,14 +822,14 @@ fn current_thread_ptr() -> *const ThreadState { fn install_thread( shared: Arc, - reactor: Reactor, + driver: Driver, worker_completion: Option>, ) { unsafe { debug_assert!(CURRENT_THREAD.is_null(), "thread runtime already installed"); - let state = Box::new(ThreadState::new(shared, reactor, worker_completion)); + let state = Box::new(ThreadState::new(shared, driver, worker_completion)); let state = Box::into_raw(state); - (*state).reactor.bind_current_thread(); + (*state).driver.bind_current_thread(); CURRENT_THREAD = state; } } @@ -731,18 +840,18 @@ fn teardown_thread() { CURRENT_THREAD = ptr::null_mut(); if !state.is_null() { - (*state).reactor.unbind_current_thread(); + (*state).driver.unbind_current_thread(); drop(Box::from_raw(state)); } } } -fn drain_reactor_events() { +fn drain_driver_events() { loop { let ready = current_thread() - .reactor + .driver .poll() - .expect("reactor poll should succeed"); + .expect("driver poll should succeed"); let Some(ready) = ready else { break; @@ -751,13 +860,13 @@ fn drain_reactor_events() { let state = current_thread(); if ready.wake { let _ = state - .reactor + .driver .drain_wake() .expect("wake drain should succeed"); } if ready.timer { let _ = state - .reactor + .driver .drain_timer() .expect("timer drain should succeed"); dispatch_expired_timers(); @@ -906,9 +1015,9 @@ fn dispatch_expired_timers() { fn rearm_thread_timer() { let deadline = current_thread().timers.borrow().peek_deadline(); current_thread() - .reactor + .driver .rearm_timer(deadline) - .expect("timerfd rearm should succeed"); + .expect("driver timer rearm should succeed"); } fn deadline_from_now(delay: Duration) -> Duration { diff --git a/lib/runtime/src/sys/linux/fs.rs b/lib/runtime/src/sys/linux/fs.rs index 84e39da..88aef7c 100644 --- a/lib/runtime/src/sys/linux/fs.rs +++ b/lib/runtime/src/sys/linux/fs.rs @@ -16,7 +16,7 @@ use std::thread; use crate::op::completion::completion_for_current_thread; use crate::op::fs::{FileType, FsOp, MetadataTarget, OpenOptions, RawDirEntry, RawMetadata}; use crate::platform::linux_x86_64::runtime::{ - ThreadHandle, current_thread_handle, with_current_reactor, + ThreadHandle, current_thread_handle, with_current_driver, }; use crate::platform::linux_x86_64::uring::{ IORING_FSYNC_DATASYNC, IORING_OP_CLOSE, IORING_OP_FSYNC, IORING_OP_FTRUNCATE, @@ -474,14 +474,14 @@ where { let (future, handle) = completion_for_current_thread::>(); let callback_handle = handle.clone(); - let token = with_current_reactor(|reactor| { - reactor.submit_operation(fill, move |cqe| { + let token = with_current_driver(|driver| { + driver.submit_operation(fill, move |cqe| { callback_handle.complete(map(cqe)); }) })?; handle.set_cancel(move || { - let _ = with_current_reactor(|reactor| reactor.cancel_operation(token)); + let _ = with_current_driver(|driver| driver.cancel_operation(token)); }); future.await diff --git a/lib/runtime/src/sys/linux/net.rs b/lib/runtime/src/sys/linux/net.rs index 85111dc..df81b87 100644 --- a/lib/runtime/src/sys/linux/net.rs +++ b/lib/runtime/src/sys/linux/net.rs @@ -14,7 +14,7 @@ use std::time::Duration; use crate::op::completion::completion_for_current_thread; use crate::op::net::{AcceptedSocket, NetOp, ReceivedDatagram}; -use crate::platform::linux_x86_64::runtime::with_current_reactor; +use crate::platform::linux_x86_64::runtime::with_current_driver; use crate::platform::linux_x86_64::uring::{ IORING_OP_ACCEPT, IORING_OP_BIND, IORING_OP_CLOSE, IORING_OP_CONNECT, IORING_OP_LISTEN, IORING_OP_RECV, IORING_OP_SEND, IORING_OP_SHUTDOWN, IORING_OP_SOCKET, IoUringCqe, IoUringSqe, @@ -556,14 +556,14 @@ where { let (future, handle) = completion_for_current_thread::>(); let callback_handle = handle.clone(); - let token = with_current_reactor(|reactor| { - reactor.submit_operation(fill, move |cqe| { + let token = with_current_driver(|driver| { + driver.submit_operation(fill, move |cqe| { callback_handle.complete(map(cqe)); }) })?; handle.set_cancel(move || { - let _ = with_current_reactor(|reactor| reactor.cancel_operation(token)); + let _ = with_current_driver(|driver| driver.cancel_operation(token)); }); future.await diff --git a/lib/runtime/src/time.rs b/lib/runtime/src/time.rs index 35cda8c..930611c 100644 --- a/lib/runtime/src/time.rs +++ b/lib/runtime/src/time.rs @@ -1,9 +1,11 @@ //! Runtime time primitives. +//! +//! These helpers integrate with the runtime's timer queue and are designed to be used from +//! futures scheduled with [`crate::queue_future`] or one of the runtime entry macros. use std::cell::{Cell, RefCell}; use std::fmt; use std::future::{Future, poll_fn}; -use std::io; use std::pin::Pin; use std::rc::Rc; use std::task::Waker; @@ -12,6 +14,7 @@ use std::time::Duration; use crate::{clear_timeout, set_timeout}; +/// Future returned by [`sleep`]. pub struct Sleep { delay: Option, state: Option>, @@ -20,8 +23,18 @@ pub struct Sleep { } #[derive(Clone, Copy, Debug, Eq, PartialEq)] +/// Error returned by [`timeout`] when the deadline expires first. pub struct Elapsed; +/// Returns a future that completes after `duration` has elapsed on the current runtime thread. +/// +/// # Examples +/// +/// ``` +/// # let _ = || async { +/// ruin_runtime::time::sleep(std::time::Duration::from_millis(10)).await; +/// # }; +/// ``` pub fn sleep(duration: Duration) -> Sleep { Sleep { delay: Some(duration), @@ -31,6 +44,24 @@ pub fn sleep(duration: Duration) -> Sleep { } } +/// Runs `future` until it completes or `duration` elapses, whichever happens first. +/// +/// The wrapped future is dropped when the timeout fires. As with other runtime operations, dropping +/// a future cancels interest in the result but does not guarantee cancellation of any underlying +/// OS work that future may have started. +/// +/// # Examples +/// +/// ``` +/// # let _ = || async { +/// let result = ruin_runtime::time::timeout( +/// std::time::Duration::from_millis(5), +/// async { 42usize }, +/// ) +/// .await; +/// assert_eq!(result, Ok(42)); +/// # }; +/// ``` pub async fn timeout(duration: Duration, future: F) -> Result where F: Future, @@ -52,10 +83,6 @@ where .await } -pub fn timeout_error(action: &'static str) -> io::Error { - io::Error::new(io::ErrorKind::TimedOut, format!("{action} timed out")) -} - impl Future for Sleep { type Output = ();