Rename reactor -> driver, prep for lib/reactivity

This commit is contained in:
2026-03-19 19:47:06 -04:00
parent 3fd8209420
commit 7b3c2fcbef
13 changed files with 490 additions and 62 deletions

4
Cargo.lock generated
View File

@@ -151,6 +151,10 @@ dependencies = [
"syn",
]
[[package]]
name = "ruin_reactivity"
version = "0.1.0"
[[package]]
name = "smallvec"
version = "1.15.1"

View File

@@ -1,4 +1,14 @@
//! Async channels for inter-thread communication.
//!
//! The channel types in this module are userspace synchronization primitives. They do not carry
//! messages through the kernel; instead, channel state lives in shared Rust data structures and
//! the runtime uses `io_uring` `MSG_RING` notifications only to wake the owning runtime thread
//! when an async waiter becomes ready.
//!
//! The initial surface includes:
//!
//! - [`oneshot`] for single-value handoff
//! - [`mpsc`] for bounded and unbounded multi-producer/single-consumer queues
pub mod mpsc;
pub mod oneshot;

View File

@@ -1,3 +1,5 @@
//! Multi-producer, single-consumer channels.
use std::collections::VecDeque;
use std::future::poll_fn;
use std::pin::Pin;
@@ -7,6 +9,13 @@ use std::task::{Context, Poll};
use crate::op::completion::{CompletionFuture, CompletionHandle};
use crate::sys::linux::channel::runtime_waiter;
/// Creates a bounded channel with room for at most `capacity` queued messages.
///
/// Bounded senders provide both [`Sender::try_send`] and async [`Sender::send`] backpressure.
///
/// # Panics
///
/// Panics if `capacity == 0`.
pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
assert!(capacity > 0, "bounded channels require capacity > 0");
let shared = Arc::new(Mutex::new(State::new(Some(capacity))));
@@ -18,6 +27,9 @@ pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
)
}
/// Creates an unbounded channel.
///
/// Unbounded senders never wait for capacity, but the single receiver is still asynchronous.
pub fn unbounded_channel<T: Send + 'static>() -> (UnboundedSender<T>, Receiver<T>) {
let shared = Arc::new(Mutex::new(State::new(None)));
(
@@ -28,14 +40,17 @@ pub fn unbounded_channel<T: Send + 'static>() -> (UnboundedSender<T>, Receiver<T
)
}
/// Bounded multi-producer sender.
///
/// All senders and the single receiver share one mutex-guarded `State` behind an `Arc`.
pub struct Sender<T: Send + 'static> {
// Channel state shared with the receiver and any cloned senders.
shared: Arc<Mutex<State<T>>>,
}
/// Unbounded multi-producer sender.
///
/// Shares the same mutex-guarded `State` as the receiver; sends never wait for capacity.
pub struct UnboundedSender<T: Send + 'static> {
// Channel state shared with the receiver and any cloned senders.
shared: Arc<Mutex<State<T>>>,
}
/// Single consumer for both bounded and unbounded MPSC channels.
pub struct Receiver<T: Send + 'static> {
// Channel state shared with the producing side.
shared: Arc<Mutex<State<T>>>,
}
@@ -57,17 +72,24 @@ struct SendWaiter<T: Send + 'static> {
}
#[derive(Debug, Eq, PartialEq)]
/// Error returned when sending fails because the receiver has been closed or dropped.
///
/// The rejected message is carried in the payload so the caller can recover it.
pub struct SendError<T>(pub T);
#[derive(Debug, Eq, PartialEq)]
/// Error returned by [`Sender::try_send`] when a message cannot be queued immediately.
///
/// Both variants hand the message back to the caller.
pub enum TrySendError<T> {
/// The bounded queue is currently full.
Full(T),
/// The receiver has been closed or dropped.
Closed(T),
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
/// Error returned by [`Receiver::try_recv`] when no message is available immediately.
pub enum TryRecvError {
/// The channel is still open, but currently empty.
Empty,
/// The channel is closed and no more messages can arrive.
Disconnected,
}
@@ -215,12 +237,20 @@ impl<T: Send + 'static> Clone for UnboundedSender<T> {
}
impl<T: Send + 'static> Sender<T> {
/// Queues `value`, waiting for capacity when the bounded channel is full.
///
/// Resolves with `Ok(())` once the message has been accepted, or with [`SendError`]
/// returning the message if the receiver has been closed or dropped.
///
/// # Panics
///
/// Panics if this future is first polled outside a runtime-managed thread.
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
    // Holds the message until poll_send manages to enqueue it.
    let mut pending = Some(value);
    // Waiter registration, created lazily by the first poll that must block.
    let mut registration = None;
    poll_fn(|cx| self.poll_send(cx, &mut pending, &mut registration)).await
}
/// Attempts to queue a message immediately.
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
self.shared
.lock()
@@ -228,6 +258,7 @@ impl<T: Send + 'static> Sender<T> {
.try_send_now(value)
}
/// Returns `true` if the receiver has been closed or dropped.
pub fn is_closed(&self) -> bool {
self.shared
.lock()
@@ -306,6 +337,7 @@ impl<T: Send + 'static> Sender<T> {
}
impl<T: Send + 'static> UnboundedSender<T> {
/// Queues a message immediately.
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
self.shared
.lock()
@@ -316,6 +348,7 @@ impl<T: Send + 'static> UnboundedSender<T> {
})
}
/// Returns `true` if the receiver has been closed or dropped.
pub fn is_closed(&self) -> bool {
self.shared
.lock()
@@ -325,11 +358,19 @@ impl<T: Send + 'static> UnboundedSender<T> {
}
impl<T: Send + 'static> Receiver<T> {
/// Receives the next queued message, waiting while the channel is empty.
///
/// Yields `None` once the channel is closed and every buffered message has been
/// consumed.
///
/// # Panics
///
/// Panics if this future is first polled outside a runtime-managed thread.
pub async fn recv(&mut self) -> Option<T> {
    // Waiter registration, created lazily and reused across polls of this future.
    let mut registration = None;
    poll_fn(|cx| self.poll_recv(cx, &mut registration)).await
}
/// Attempts to receive a message immediately.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let mut state = self
.shared
@@ -345,6 +386,10 @@ impl<T: Send + 'static> Receiver<T> {
}
}
/// Closes the channel to future sends.
///
/// Already-buffered messages remain available to [`recv`](Self::recv) and
/// [`try_recv`](Self::try_recv).
pub fn close(&mut self) {
self.shared
.lock()
@@ -352,6 +397,7 @@ impl<T: Send + 'static> Receiver<T> {
.close_receiver();
}
/// Returns `true` if the channel is closed or all senders have been dropped.
pub fn is_closed(&self) -> bool {
let state = self
.shared

View File

@@ -1,3 +1,5 @@
//! Single-use channels for handing one value from a sender to a receiver.
use std::future::poll_fn;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
@@ -6,6 +8,15 @@ use std::task::{Context, Poll};
use crate::op::completion::{CompletionFuture, CompletionHandle};
use crate::sys::linux::channel::runtime_waiter;
/// Creates a single-use channel for transferring one value from a [`Sender`] to a [`Receiver`].
///
/// # Examples
///
/// ```
/// let (sender, mut receiver) = ruin_runtime::channel::oneshot::channel::<usize>();
/// sender.send(7).unwrap();
/// assert_eq!(receiver.try_recv(), Ok(7));
/// ```
pub fn channel<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Mutex::new(State {
value: None,
@@ -24,10 +35,12 @@ pub fn channel<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
)
}
/// Sending half of a oneshot channel.
///
/// The shared state is taken out (left as `None`) when the value is sent, so each
/// sender can deliver at most one value.
pub struct Sender<T: Send + 'static> {
// `None` once the value has been sent; otherwise the state shared with the receiver.
shared: Option<Arc<Mutex<State<T>>>>,
}
/// Receiving half of a oneshot channel.
pub struct Receiver<T: Send + 'static> {
shared: Arc<Mutex<State<T>>>,
consumed: bool,
@@ -41,18 +54,27 @@ struct State<T: Send + 'static> {
}
#[derive(Debug, Eq, PartialEq)]
/// Error returned when a oneshot send fails because the receiver is gone or closed.
///
/// The rejected value is carried in the payload so the caller can recover it.
pub struct SendError<T>(pub T);
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
/// Error returned when a oneshot receive observes a closed channel with no value.
pub struct RecvError;
#[derive(Debug, Eq, PartialEq)]
/// Non-blocking receive errors for [`Receiver::try_recv`].
pub enum TryRecvError {
/// No value has been sent yet, and the sender is still alive.
Empty,
/// The channel can never yield a value.
Closed,
}
impl<T: Send + 'static> Sender<T> {
/// Sends `value` into the channel.
///
/// This consumes the sender. If the receiver is already waiting on a runtime thread, the
/// receive future is woken through the runtime's cross-thread notifier path.
pub fn send(mut self, value: T) -> Result<(), SendError<T>> {
let Some(shared) = self.shared.take() else {
return Err(SendError(value));
@@ -80,6 +102,7 @@ impl<T: Send + 'static> Sender<T> {
Ok(())
}
/// Returns `true` if the receiver has been closed or dropped.
pub fn is_closed(&self) -> bool {
self.shared.as_ref().is_none_or(|shared| {
shared
@@ -91,11 +114,19 @@ impl<T: Send + 'static> Sender<T> {
}
impl<T: Send + 'static> Receiver<T> {
/// Awaits the value sent through this channel.
///
/// # Panics
///
/// Panics if this future is first polled outside a runtime-managed thread. Async channel
/// waiting registers with the current runtime thread so it can be woken through the driver's
/// notification path.
pub async fn recv(&mut self) -> Result<T, RecvError> {
    // Waiter registration, created lazily by the first poll that must block.
    let mut registration = None;
    poll_fn(|cx| self.poll_recv(cx, &mut registration)).await
}
/// Attempts to receive the value without waiting.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
if self.consumed {
return Err(TryRecvError::Closed);
@@ -118,6 +149,10 @@ impl<T: Send + 'static> Receiver<T> {
}
}
/// Closes the receiver.
///
/// Closing prevents future sends from succeeding. If a value has already been sent, it can
/// still be retrieved.
pub fn close(&mut self) {
let mut state = self
.shared
@@ -126,6 +161,7 @@ impl<T: Send + 'static> Receiver<T> {
state.receiver_closed = true;
}
/// Returns `true` if the channel is closed to future sends.
pub fn is_closed(&self) -> bool {
let state = self
.shared

View File

@@ -4,6 +4,9 @@
//! - Dropping an I/O future cancels interest in the result.
//! - The runtime issues best-effort kernel cancellation where supported.
//! - The underlying OS operation may still complete after the future is dropped.
//!
//! The public surface intentionally mirrors `std::fs` where that shape makes sense, while using
//! async methods for operations that may block the caller.
use std::ffi::OsStr;
use std::io;
@@ -21,33 +24,43 @@ struct FileInner {
fd: OwnedFd,
}
/// Async file handle.
///
/// `File` is cheap to clone internally and supports both cursor-based sequential I/O and
/// offset-based positioned I/O.
pub struct File {
// Shared handle state (owned fd); Arc lets clones share one file description.
inner: Arc<FileInner>,
}
/// Builder used to configure how a [`File`] is opened.
pub struct OpenOptions {
// Platform-level open flags accumulated by the builder methods.
inner: OpOpenOptions,
}
#[derive(Clone, Debug, Eq, PartialEq)]
/// File metadata returned by [`metadata`] or [`File::metadata`].
pub struct Metadata {
// Raw metadata as reported by the platform backend.
inner: RawMetadata,
}
/// Async directory-entry stream returned by [`read_dir`].
pub struct ReadDir {
// Backend stream that produces entries on demand.
inner: sys_fs::ReadDirStream,
}
#[derive(Clone, Debug, Eq, PartialEq)]
/// Directory entry yielded by [`ReadDir::next_entry`].
pub struct DirEntry {
// Backend entry carrying the full path and file name.
inner: OpDirEntry,
}
impl File {
/// Opens the file at `path` in read-only mode.
pub async fn open(path: impl AsRef<Path>) -> io::Result<Self> {
    let mut options = OpenOptions::new();
    options.read(true);
    options.open(path).await
}
/// Opens a file for writing, creating or truncating it first.
pub async fn create(path: impl AsRef<Path>) -> io::Result<Self> {
OpenOptions::new()
.write(true)
@@ -57,10 +70,12 @@ impl File {
.await
}
/// Reads bytes from the file's current cursor position.
///
/// Delegates to `read_impl` with `None` for the offset, which selects cursor-based I/O.
/// Returns the number of bytes read into `buf`.
pub async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.read_impl(None, buf).await
}
/// Reads exactly `buf.len()` bytes from the current cursor position.
pub async fn read_exact(&mut self, mut buf: &mut [u8]) -> io::Result<()> {
while !buf.is_empty() {
let read = self.read(buf).await?;
@@ -75,10 +90,12 @@ impl File {
Ok(())
}
/// Writes bytes at the file's current cursor position.
///
/// Delegates to `write_impl` with `None` for the offset, which selects cursor-based I/O.
/// Returns the number of bytes accepted from `buf`.
pub async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_impl(None, buf).await
}
/// Writes the entire buffer at the file's current cursor position.
pub async fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
while !buf.is_empty() {
let written = self.write(buf).await?;
@@ -93,22 +110,30 @@ impl File {
Ok(())
}
/// Flushes any userspace buffering associated with this handle.
///
/// The current implementation does not add additional buffering beyond the kernel file
/// description, so this is effectively a no-op.
pub async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
/// Synchronizes file contents and metadata to stable storage.
pub async fn sync_all(&self) -> io::Result<()> {
sys_fs::sync_all(FsOp::SyncAll { fd: self.raw_fd() }).await
}
/// Synchronizes file contents to stable storage.
///
/// Unlike [`sync_all`](Self::sync_all), this issues the data-only sync variant.
pub async fn sync_data(&self) -> io::Result<()> {
sys_fs::sync_data(FsOp::SyncData { fd: self.raw_fd() }).await
}
/// Reads bytes starting at `offset` without using the shared file cursor.
///
/// Positioned reads leave the handle's cursor untouched, so they are safe from `&self`.
pub async fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
self.read_impl(Some(offset), buf).await
}
/// Reads exactly `buf.len()` bytes starting at `offset`.
pub async fn read_exact_at(&self, mut offset: u64, mut buf: &mut [u8]) -> io::Result<()> {
while !buf.is_empty() {
let read = self.read_at(offset, buf).await?;
@@ -124,10 +149,12 @@ impl File {
Ok(())
}
/// Writes bytes starting at `offset` without using the shared file cursor.
///
/// Positioned writes leave the handle's cursor untouched, so they are safe from `&self`.
pub async fn write_at(&self, offset: u64, buf: &[u8]) -> io::Result<usize> {
self.write_impl(Some(offset), buf).await
}
/// Writes the entire buffer starting at `offset`.
pub async fn write_all_at(&self, mut offset: u64, mut buf: &[u8]) -> io::Result<()> {
while !buf.is_empty() {
let written = self.write_at(offset, buf).await?;
@@ -143,6 +170,7 @@ impl File {
Ok(())
}
/// Returns metadata for this file handle.
pub async fn metadata(&self) -> io::Result<Metadata> {
sys_fs::metadata(FsOp::Metadata {
target: MetadataTarget::File(self.raw_fd()),
@@ -152,6 +180,7 @@ impl File {
.map(Metadata::from_raw)
}
/// Truncates or extends the underlying file to `len` bytes.
pub async fn set_len(&self, len: u64) -> io::Result<()> {
sys_fs::set_len(FsOp::SetLen {
fd: self.raw_fd(),
@@ -160,6 +189,9 @@ impl File {
.await
}
/// Duplicates the underlying file description.
///
/// As with `std::fs::File::try_clone`, the cloned handle shares kernel-managed cursor state.
pub async fn try_clone(&self) -> io::Result<Self> {
sys_fs::try_clone(FsOp::Duplicate { fd: self.raw_fd() })
.await
@@ -200,42 +232,63 @@ impl File {
}
impl OpenOptions {
/// Creates a blank set of open options.
///
/// All flags start from the backend's defaults; enable access modes explicitly.
pub fn new() -> Self {
Self {
inner: OpOpenOptions::default(),
}
}
/// Controls read access. Returns `&mut self` so calls can be chained.
pub fn read(&mut self, value: bool) -> &mut Self {
self.inner.read = value;
self
}
/// Controls write access. Returns `&mut self` so calls can be chained.
pub fn write(&mut self, value: bool) -> &mut Self {
self.inner.write = value;
self
}
/// Controls append mode. Returns `&mut self` so calls can be chained.
pub fn append(&mut self, value: bool) -> &mut Self {
self.inner.append = value;
self
}
/// Controls whether the file is truncated after opening.
pub fn truncate(&mut self, value: bool) -> &mut Self {
self.inner.truncate = value;
self
}
/// Controls whether the file is created if it does not already exist.
pub fn create(&mut self, value: bool) -> &mut Self {
self.inner.create = value;
self
}
/// Controls whether opening must create a brand-new file.
pub fn create_new(&mut self, value: bool) -> &mut Self {
self.inner.create_new = value;
self
}
/// Opens a file with the configured options.
///
/// # Examples
///
/// ```
/// # let _ = || async {
/// let file = ruin_runtime::fs::OpenOptions::new()
/// .read(true)
/// .write(true)
/// .open("example.txt")
/// .await;
/// # let _ = file;
/// # };
/// ```
pub async fn open(&self, path: impl AsRef<Path>) -> io::Result<File> {
sys_fs::open(FsOp::Open {
path: path.as_ref().to_path_buf(),
@@ -257,32 +310,39 @@ impl Metadata {
Self { inner }
}
/// Returns the file length in bytes.
pub fn len(&self) -> u64 {
self.inner.len
}
/// Returns `true` if the file length is zero.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Returns `true` if this metadata describes a regular file.
pub fn is_file(&self) -> bool {
self.inner.file_type == RawFileType::File
}
/// Returns `true` if this metadata describes a directory.
pub fn is_dir(&self) -> bool {
self.inner.file_type == RawFileType::Directory
}
/// Returns `true` if this metadata describes a symbolic link.
///
/// The `File`/`Directory`/`Symlink` classifications are mutually exclusive.
pub fn is_symlink(&self) -> bool {
self.inner.file_type == RawFileType::Symlink
}
/// Returns the raw POSIX mode bits reported by the platform backend.
///
/// NOTE(review): the mode appears to be passed through untranslated — confirm whether
/// it includes the file-type bits or only the permission bits.
pub fn mode(&self) -> u16 {
self.inner.mode
}
}
impl ReadDir {
/// Returns the next directory entry, or `None` once the stream is exhausted.
pub async fn next_entry(&mut self) -> io::Result<Option<DirEntry>> {
self.inner
.next_entry()
@@ -292,19 +352,23 @@ impl ReadDir {
}
impl DirEntry {
/// Returns the full path to this directory entry.
///
/// Allocates a fresh `PathBuf` on every call; the entry retains its own copy.
pub fn path(&self) -> PathBuf {
self.inner.path.clone()
}
/// Returns the file name portion of this directory entry.
pub fn file_name(&self) -> &OsStr {
self.inner.file_name.as_os_str()
}
/// Resolves metadata for this entry.
///
/// Performs a fresh path-based metadata lookup rather than reusing cached entry data.
pub async fn metadata(&self) -> io::Result<Metadata> {
metadata(self.path()).await
}
}
/// Reads the entire contents of a file into memory.
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
let mut file = File::open(path.as_ref()).await?;
let mut output = Vec::new();
@@ -319,11 +383,13 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
}
}
/// Reads the entire file at `path` and decodes its contents as UTF-8.
///
/// Invalid UTF-8 is reported as an `io::ErrorKind::InvalidData` error.
pub async fn read_to_string(path: impl AsRef<Path>) -> io::Result<String> {
    let bytes = read(path).await?;
    match String::from_utf8(bytes) {
        Ok(text) => Ok(text),
        Err(error) => Err(io::Error::new(io::ErrorKind::InvalidData, error)),
    }
}
/// Replaces the contents of a file with `data`, creating it if needed.
pub async fn write(path: impl AsRef<Path>, data: impl AsRef<[u8]>) -> io::Result<()> {
let mut file = OpenOptions::new()
.write(true)
@@ -334,6 +400,7 @@ pub async fn write(path: impl AsRef<Path>, data: impl AsRef<[u8]>) -> io::Result
file.write_all(data.as_ref()).await
}
/// Returns metadata for a filesystem path.
pub async fn metadata(path: impl AsRef<Path>) -> io::Result<Metadata> {
sys_fs::metadata(FsOp::Metadata {
target: MetadataTarget::Path(path.as_ref().to_path_buf()),
@@ -343,6 +410,7 @@ pub async fn metadata(path: impl AsRef<Path>) -> io::Result<Metadata> {
.map(Metadata::from_raw)
}
/// Creates a single directory.
pub async fn create_dir(path: impl AsRef<Path>) -> io::Result<()> {
sys_fs::create_dir(FsOp::CreateDir {
path: path.as_ref().to_path_buf(),
@@ -352,6 +420,7 @@ pub async fn create_dir(path: impl AsRef<Path>) -> io::Result<()> {
.await
}
/// Creates a directory and any missing parent directories.
pub async fn create_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
let path = path.as_ref();
let mut current = PathBuf::new();
@@ -372,6 +441,7 @@ pub async fn create_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
Ok(())
}
/// Removes a file.
pub async fn remove_file(path: impl AsRef<Path>) -> io::Result<()> {
sys_fs::remove_file(FsOp::RemoveFile {
path: path.as_ref().to_path_buf(),
@@ -379,6 +449,7 @@ pub async fn remove_file(path: impl AsRef<Path>) -> io::Result<()> {
.await
}
/// Removes an empty directory.
pub async fn remove_dir(path: impl AsRef<Path>) -> io::Result<()> {
sys_fs::remove_dir(FsOp::RemoveDir {
path: path.as_ref().to_path_buf(),
@@ -386,6 +457,7 @@ pub async fn remove_dir(path: impl AsRef<Path>) -> io::Result<()> {
.await
}
/// Renames or moves a filesystem entry.
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
sys_fs::rename(FsOp::Rename {
from: from.as_ref().to_path_buf(),
@@ -394,6 +466,16 @@ pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<
.await
}
/// Opens an async directory-entry stream.
///
/// # Examples
///
/// ```
/// # let _ = || async {
/// let mut entries = ruin_runtime::fs::read_dir(".").await.unwrap();
/// let _ = entries.next_entry().await;
/// # };
/// ```
pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
sys_fs::read_dir(FsOp::ReadDir {
path: path.as_ref().to_path_buf(),

View File

@@ -1,6 +1,21 @@
//! Runtime, driver, async I/O, and channel primitives for RUIN.
//!
//! The crate is centered around a single-threaded event loop with explicit worker threads,
//! JavaScript-style microtask/macrotask scheduling, and Linux `io_uring`-backed I/O.
//!
//! Most users will start with:
//!
//! - [`main`] or [`async_main`] for executable entry points
//! - [`run`], [`queue_task`], [`queue_microtask`], and [`queue_future`] for event-loop work
//! - [`fs`], [`net`], [`time`], and [`channel`] for async runtime services
//!
//! # Platform support
//!
//! `ruin-runtime` currently targets Linux on `x86_64`.
//!
//! RUIN runtime foundations.
//!
//! This crate provides a Linux x86_64 runtime substrate: the mesh allocator, the reactor, and a
//! This crate provides a Linux x86_64 runtime substrate: the mesh allocator, the driver, and a
//! single-threaded runtime loop with worker-thread task forwarding.
#![feature(thread_local)]
@@ -13,13 +28,31 @@ extern crate alloc;
pub mod channel;
pub mod fs;
pub mod net;
#[doc(hidden)]
pub mod op;
#[doc(hidden)]
pub mod platform;
#[doc(hidden)]
pub mod sys;
pub mod time;
pub use ruin_runtime_proc_macros::{async_main, main};
/// Marks an `async fn main()` as the runtime entry point.
///
/// The macro generates a real Rust `main` that queues the returned future onto the main runtime
/// thread before calling [`run`].
pub use ruin_runtime_proc_macros::async_main;
/// Marks a synchronous `fn main()` as the runtime entry point.
///
/// The macro generates a real Rust `main` that queues the function body onto the main runtime
/// thread before calling [`run`].
pub use ruin_runtime_proc_macros::main;
/// Driver primitives re-exported from the Linux x86_64 backend.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub use platform::linux_x86_64::driver::{
Driver, ReadyEvents, ThreadNotifier, create, create_driver, monotonic_now,
};
/// Public mesh-allocator surface re-exported from the Linux x86_64 backend.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub use platform::linux_x86_64::mesh_alloc::{
ActiveMeshGuard, Arena, AtomicBitmap, BitIter, CLASS_TO_SIZE, CompactionAdvice,
@@ -32,12 +65,10 @@ pub use platform::linux_x86_64::mesh_alloc::{
page_shift, page_size, retry_on_efault, retry_on_efault_ptrs, round_up_to_page,
runtime_slots_per_span, size_class_for,
};
/// Additional allocator helpers re-exported from the Linux x86_64 backend.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub use platform::linux_x86_64::mesh_alloc::{FreelistId, bitmaps_meshable};
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub use platform::linux_x86_64::reactor::{
Reactor, ReadyEvents, ThreadNotifier, create, create_reactor, monotonic_now,
};
/// Runtime/event-loop primitives re-exported from the Linux x86_64 backend.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub use platform::linux_x86_64::runtime::{
IntervalHandle, JoinHandle, ThreadHandle, TimeoutHandle, WorkerHandle, clear_interval,
@@ -45,6 +76,9 @@ pub use platform::linux_x86_64::runtime::{
set_interval, set_timeout, spawn_worker, yield_now,
};
/// Returns the default global mesh allocator configuration for this crate.
///
/// This is useful when embedding the allocator in a `#[global_allocator]` static.
/// The function is `const`, so it can initialize a `static` directly.
pub const fn default_global_allocator() -> GlobalMeshAllocator {
GlobalMeshAllocator::with_default_config()
}

View File

@@ -1,4 +1,7 @@
//! Portable async networking API.
//!
//! The public surface follows the general shape of `std::net`, but uses async methods for socket
//! operations that would otherwise block the caller.
use std::future::Future;
use std::io;
@@ -40,6 +43,10 @@ type PendingRead = Pin<Box<dyn Future<Output = io::Result<Vec<u8>>> + 'static>>;
type PendingWrite = Pin<Box<dyn Future<Output = io::Result<usize>> + 'static>>;
type PendingShutdown = Pin<Box<dyn Future<Output = io::Result<()>> + 'static>>;
/// Async TCP stream.
///
/// This type also implements Hyper's runtime I/O traits, allowing it to be used directly as an
/// HTTP transport.
pub struct TcpStream {
inner: Arc<TcpStreamInner>,
pending_read: Option<PendingRead>,
@@ -48,16 +55,19 @@ pub struct TcpStream {
}
#[derive(Clone, Debug)]
/// Async TCP listening socket.
///
/// Clones share the same underlying listener state via `Arc`.
pub struct TcpListener {
// Shared listener state (owned socket fd).
inner: Arc<TcpListenerInner>,
}
#[derive(Debug)]
/// Async UDP socket.
pub struct UdpSocket {
// Shared socket state (owned socket fd plus per-handle timeouts).
inner: Arc<UdpSocketInner>,
}
impl TcpStream {
/// Connects to the first resolved address that succeeds.
pub async fn connect<A>(addr: A) -> io::Result<Self>
where
A: ToSocketAddrs + Send + 'static,
@@ -79,6 +89,7 @@ impl TcpStream {
}))
}
/// Connects to `addr`, failing if the deadline elapses first.
pub async fn connect_timeout(addr: &SocketAddr, timeout: Duration) -> io::Result<Self> {
validate_timeout(timeout)?;
crate::sys::linux::net::connect_stream_timeout(*addr, timeout)
@@ -86,6 +97,7 @@ impl TcpStream {
.map(Self::from_owned_fd)
}
/// Reads bytes from the stream.
pub async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let data = match self.read_timeout_value() {
Some(timeout) => {
@@ -105,6 +117,7 @@ impl TcpStream {
Ok(read)
}
/// Reads exactly `buf.len()` bytes from the stream.
pub async fn read_exact(&mut self, mut buf: &mut [u8]) -> io::Result<()> {
while !buf.is_empty() {
let read = self.read(buf).await?;
@@ -119,6 +132,7 @@ impl TcpStream {
Ok(())
}
/// Writes bytes to the stream.
pub async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.write_timeout_value() {
Some(timeout) => {
@@ -135,6 +149,7 @@ impl TcpStream {
}
}
/// Writes the entire buffer to the stream.
pub async fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
while !buf.is_empty() {
let written = self.write(buf).await?;
@@ -149,6 +164,7 @@ impl TcpStream {
Ok(())
}
/// Shuts down the read, write, or both halves of the connection.
pub async fn shutdown(&self, how: Shutdown) -> io::Result<()> {
crate::sys::linux::net::shutdown(NetOp::Shutdown {
fd: self.raw_fd(),
@@ -157,50 +173,65 @@ impl TcpStream {
.await
}
/// Duplicates the underlying stream socket into a new independent handle.
pub async fn try_clone(&self) -> io::Result<Self> {
    let fd = crate::sys::linux::net::duplicate(self.raw_fd()).await?;
    Ok(Self::from_owned_fd(fd))
}
/// Returns the local socket address of this stream.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
crate::sys::linux::net::local_addr(self.raw_fd())
}
/// Returns the remote peer address of this stream.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
crate::sys::linux::net::peer_addr(self.raw_fd())
}
/// Reads the current `TCP_NODELAY` setting.
pub fn nodelay(&self) -> io::Result<bool> {
crate::sys::linux::net::nodelay(self.raw_fd())
}
/// Enables or disables `TCP_NODELAY`.
pub fn set_nodelay(&self, enabled: bool) -> io::Result<()> {
crate::sys::linux::net::set_nodelay(self.raw_fd(), enabled)
}
/// Reads the socket's IP time-to-live value.
pub fn ttl(&self) -> io::Result<u32> {
crate::sys::linux::net::ttl(self.raw_fd())
}
/// Sets the socket's IP time-to-live value.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
crate::sys::linux::net::set_ttl(self.raw_fd(), ttl)
}
/// Returns the read timeout used by async read operations on this handle.
///
/// The timeout is per-handle userspace state, not a kernel socket option, so this
/// accessor is infallible in practice.
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
Ok(self.read_timeout_value())
}
/// Sets the read timeout applied to async read operations on this handle.
///
/// Passing `Some(Duration::ZERO)` is rejected.
pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
    validate_optional_timeout(timeout)?;
    let mut timeouts = self.inner.timeouts.lock().unwrap();
    timeouts.read = timeout;
    Ok(())
}
/// Returns the write timeout used by async write operations on this handle.
///
/// The timeout is per-handle userspace state, not a kernel socket option, so this
/// accessor is infallible in practice.
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
Ok(self.write_timeout_value())
}
/// Sets the write timeout used by async write operations on this handle.
///
/// Passing `Some(Duration::ZERO)` is rejected.
pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
validate_optional_timeout(timeout)?;
self.inner.timeouts.lock().unwrap().write = timeout;
@@ -233,6 +264,7 @@ impl TcpStream {
}
impl TcpListener {
/// Binds a TCP listener to the first resolved address that succeeds.
pub async fn bind<A>(addr: A) -> io::Result<Self>
where
A: ToSocketAddrs + Send + 'static,
@@ -254,6 +286,7 @@ impl TcpListener {
}))
}
/// Accepts an incoming connection.
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let accepted = crate::sys::linux::net::accept(NetOp::Accept { fd: self.raw_fd() }).await?;
@@ -261,14 +294,17 @@ impl TcpListener {
Ok((stream, accepted.peer_addr))
}
/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
crate::sys::linux::net::local_addr(self.raw_fd())
}
/// Reads the listener socket's IP time-to-live value.
///
/// The TTL applies to packets sent from sockets accepted on this listener.
pub fn ttl(&self) -> io::Result<u32> {
crate::sys::linux::net::ttl(self.raw_fd())
}
/// Sets the listener socket's IP time-to-live value.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
crate::sys::linux::net::set_ttl(self.raw_fd(), ttl)
}
@@ -285,6 +321,7 @@ impl TcpListener {
}
impl UdpSocket {
/// Binds a UDP socket to the first resolved address that succeeds.
pub async fn bind<A>(addr: A) -> io::Result<Self>
where
A: ToSocketAddrs + Send + 'static,
@@ -306,6 +343,10 @@ impl UdpSocket {
}))
}
/// Connects the socket to a default peer.
///
/// Once connected, [`send`](Self::send), [`recv`](Self::recv), and [`peer_addr`](Self::peer_addr)
/// operate relative to that peer.
pub async fn connect<A>(&self, addr: A) -> io::Result<()>
where
A: ToSocketAddrs + Send + 'static,
@@ -332,6 +373,7 @@ impl UdpSocket {
}))
}
/// Sends a datagram to the connected peer.
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
match self.write_timeout_value() {
Some(timeout) => {
@@ -348,6 +390,7 @@ impl UdpSocket {
}
}
/// Receives a datagram from the connected peer.
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
let data = match self.read_timeout_value() {
Some(timeout) => {
@@ -367,6 +410,7 @@ impl UdpSocket {
Ok(read)
}
/// Peeks at the next datagram from the connected peer without consuming it.
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
let data = match self.read_timeout_value() {
Some(timeout) => {
@@ -392,6 +436,7 @@ impl UdpSocket {
Ok(read)
}
/// Sends a datagram to `addr`.
pub async fn send_to<A>(&self, buf: &[u8], addr: A) -> io::Result<usize>
where
A: ToSocketAddrs + Send + 'static,
@@ -435,6 +480,7 @@ impl UdpSocket {
}))
}
/// Receives a datagram and returns the sender address.
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let datagram = match self.read_timeout_value() {
Some(timeout) => {
@@ -455,6 +501,7 @@ impl UdpSocket {
Ok((read, datagram.peer_addr))
}
/// Peeks at the next datagram and returns the sender address without consuming it.
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let datagram = match self.read_timeout_value() {
Some(timeout) => {
@@ -480,50 +527,65 @@ impl UdpSocket {
Ok((read, datagram.peer_addr))
}
/// Duplicates the underlying UDP socket into a new independent handle.
pub async fn try_clone(&self) -> io::Result<Self> {
    let fd = crate::sys::linux::net::duplicate(self.raw_fd()).await?;
    Ok(Self::from_owned_fd(fd))
}
/// Returns the local socket address of this socket.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
crate::sys::linux::net::local_addr(self.raw_fd())
}
/// Returns the connected peer address, if the socket has been connected.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
crate::sys::linux::net::peer_addr(self.raw_fd())
}
/// Reads the `SO_BROADCAST` setting.
pub fn broadcast(&self) -> io::Result<bool> {
crate::sys::linux::net::broadcast(self.raw_fd())
}
/// Enables or disables `SO_BROADCAST`.
pub fn set_broadcast(&self, enabled: bool) -> io::Result<()> {
crate::sys::linux::net::set_broadcast(self.raw_fd(), enabled)
}
/// Reads the socket's IP time-to-live value.
pub fn ttl(&self) -> io::Result<u32> {
crate::sys::linux::net::ttl(self.raw_fd())
}
/// Sets the socket's IP time-to-live value.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
crate::sys::linux::net::set_ttl(self.raw_fd(), ttl)
}
/// Returns the read timeout used by async receive operations on this handle.
///
/// The timeout is per-handle userspace state, not a kernel socket option, so this
/// accessor is infallible in practice.
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
Ok(self.read_timeout_value())
}
/// Sets the read timeout applied to async receive operations on this handle.
///
/// Passing `Some(Duration::ZERO)` is rejected.
pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
    validate_optional_timeout(timeout)?;
    let mut timeouts = self.inner.timeouts.lock().unwrap();
    timeouts.read = timeout;
    Ok(())
}
/// Returns the write timeout used by async send operations on this handle.
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
    let timeout = self.write_timeout_value();
    Ok(timeout)
}
/// Sets the write timeout used by async send operations on this handle.
///
/// Passing `Some(Duration::ZERO)` is rejected.
pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
validate_optional_timeout(timeout)?;
self.inner.timeouts.lock().unwrap().write = timeout;

View File

@@ -1,3 +1,5 @@
//! Public runtime driver primitives.
use std::cell::Cell;
use std::cell::RefCell;
use std::collections::BTreeMap;
@@ -51,23 +53,29 @@ impl NotifierInner {
}
#[derive(Clone)]
/// Cross-thread notifier for a runtime thread's driver.
///
/// Cheap to clone: all clones share the same inner state through an `Arc`,
/// which is also shared with the paired `Driver` created by `create_driver`.
pub struct ThreadNotifier {
    // Shared notification state; the paired Driver holds another Arc to it.
    inner: Arc<NotifierInner>,
}
impl ThreadNotifier {
    /// Sends a wake notification to the target runtime thread.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the underlying notification mechanism.
    pub fn notify(&self) -> io::Result<()> {
        self.inner.notify()
    }
}
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
/// Readiness information returned by [`Driver::poll`].
///
/// When a flag is set, the corresponding pending count can be consumed with
/// [`Driver::drain_timer`] or [`Driver::drain_wake`].
pub struct ReadyEvents {
    /// One or more timer expirations are pending.
    pub timer: bool,
    /// One or more cross-thread wake notifications are pending.
    pub wake: bool,
}
pub struct Reactor {
/// Low-level Linux runtime driver backed by `io_uring`.
pub struct Driver {
ring: IoUring,
notifier: Arc<NotifierInner>,
next_token: Cell<u64>,
@@ -77,11 +85,16 @@ pub struct Reactor {
completions: RefCell<BTreeMap<u64, CompletionHandler>>,
}
pub fn create() -> io::Result<(Reactor, ThreadNotifier)> {
create_reactor()
/// Creates a new driver and its paired [`ThreadNotifier`].
pub fn create() -> io::Result<(Driver, ThreadNotifier)> {
create_driver()
}
pub fn create_reactor() -> io::Result<(Reactor, ThreadNotifier)> {
/// Creates a new driver and its paired [`ThreadNotifier`].
///
/// This is identical to [`create`] and exists as a more explicit name for callers that want to
/// emphasize driver construction.
pub fn create_driver() -> io::Result<(Driver, ThreadNotifier)> {
let ring = IoUring::new(64)?;
let notifier = Arc::new(NotifierInner {
ring_fd: ring.ring_fd(),
@@ -89,7 +102,7 @@ pub fn create_reactor() -> io::Result<(Reactor, ThreadNotifier)> {
});
Ok((
Reactor {
Driver {
ring,
notifier: Arc::clone(&notifier),
next_token: Cell::new(1),
@@ -102,7 +115,7 @@ pub fn create_reactor() -> io::Result<(Reactor, ThreadNotifier)> {
))
}
impl Reactor {
impl Driver {
pub(crate) fn bind_current_thread(&self) {
self.ring.bind_current_thread();
}
@@ -111,6 +124,7 @@ impl Reactor {
self.ring.unbind_current_thread();
}
/// Polls the driver without blocking.
pub fn poll(&self) -> io::Result<Option<ReadyEvents>> {
let mut ready = ReadyEvents::default();
let saw_any = self
@@ -119,10 +133,14 @@ impl Reactor {
if saw_any { Ok(Some(ready)) } else { Ok(None) }
}
/// Blocks until at least one completion is available.
///
/// # Errors
///
/// Propagates any error returned while waiting on the underlying ring.
pub fn wait(&self) -> io::Result<()> {
    self.ring.wait_for_cqe()
}
/// Updates the currently armed timer deadline.
///
/// Passing `None` removes any active timer.
pub fn rearm_timer(&self, deadline: Option<Duration>) -> io::Result<()> {
match (self.active_timer_token.get(), deadline) {
(Some(active), Some(deadline)) => {
@@ -171,6 +189,7 @@ impl Reactor {
})
}
/// Drains the accumulated wake notification count.
pub fn drain_wake(&self) -> io::Result<u64> {
let wakes = self.pending_wakes.replace(0);
if wakes == 0 {
@@ -183,6 +202,7 @@ impl Reactor {
}
}
/// Drains the accumulated timer-expiration count.
pub fn drain_timer(&self) -> io::Result<u64> {
let timers = self.pending_timers.replace(0);
if timers == 0 {
@@ -234,12 +254,13 @@ impl Reactor {
}
}
impl Drop for Reactor {
impl Drop for Driver {
fn drop(&mut self) {
self.notifier.closed.store(true, Ordering::Release);
}
}
/// Returns the current monotonic time used by the runtime timer system.
pub fn monotonic_now() -> io::Result<Duration> {
let mut now = std::mem::MaybeUninit::<libc::timespec>::uninit();
let result = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, now.as_mut_ptr()) };
@@ -268,16 +289,16 @@ fn decode_token_kind(token: u64) -> Option<CompletionKind> {
#[cfg(test)]
mod tests {
use super::{create_reactor, monotonic_now};
use super::{create_driver, monotonic_now};
use std::thread;
use std::time::Duration;
#[test]
fn notifier_wakes_target_ring() {
let (sender, _) = create_reactor().expect("sender reactor should initialize");
let (sender, _) = create_driver().expect("sender driver should initialize");
sender.bind_current_thread();
let (target, notifier) = create_reactor().expect("target reactor should initialize");
let (target, notifier) = create_driver().expect("target driver should initialize");
notifier.notify().expect("notify should succeed");
let ready = loop {
@@ -295,7 +316,7 @@ mod tests {
#[test]
fn notifier_wakes_target_ring_from_plain_thread() {
let (target, notifier) = create_reactor().expect("target reactor should initialize");
let (target, notifier) = create_driver().expect("target driver should initialize");
thread::spawn(move || {
notifier.notify().expect("notify should succeed");
@@ -317,14 +338,14 @@ mod tests {
#[test]
fn timeout_reports_deadlines() {
let (reactor, _notifier) = create_reactor().expect("reactor should initialize");
let (driver, _notifier) = create_driver().expect("driver should initialize");
let deadline = monotonic_now().expect("clock should work") + Duration::from_millis(20);
reactor
driver
.rearm_timer(Some(deadline))
.expect("timer should arm");
let ready = loop {
if let Some(ready) = reactor.poll().expect("poll should succeed") {
if let Some(ready) = driver.poll().expect("poll should succeed") {
break ready;
}
thread::sleep(Duration::from_millis(5));
@@ -332,9 +353,6 @@ mod tests {
assert!(ready.timer);
assert!(!ready.wake);
assert_eq!(
reactor.drain_timer().expect("timer drain should succeed"),
1
);
assert_eq!(driver.drain_timer().expect("timer drain should succeed"), 1);
}
}

View File

@@ -1,4 +1,4 @@
pub mod driver;
pub mod mesh_alloc;
pub mod reactor;
pub mod runtime;
pub(crate) mod uring;

View File

@@ -1,3 +1,5 @@
//! Public runtime loop and worker-thread primitives.
use std::cell::{Cell, RefCell};
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
@@ -9,7 +11,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
use super::reactor::{Reactor, ThreadNotifier, create as create_reactor, monotonic_now};
use super::driver::{Driver, ThreadNotifier, create_driver, monotonic_now};
type LocalTask = Box<dyn FnOnce() + 'static>;
type SendTask = Box<dyn FnOnce() + Send + 'static>;
@@ -19,16 +21,19 @@ type LocalBoxFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
static mut CURRENT_THREAD: *mut ThreadState = ptr::null_mut();
#[derive(Clone)]
/// Handle for queueing work onto a specific runtime thread.
///
/// Cheap to clone: all clones share the target thread's state through an `Arc`.
pub struct ThreadHandle {
    // State shared with the target runtime thread (task queues, notifier, flags).
    shared: Arc<ThreadShared>,
}
/// Handle for a worker runtime thread spawned with [`spawn_worker`].
pub struct WorkerHandle {
    // Generic handle for queueing tasks onto the worker thread.
    thread: ThreadHandle,
    // Completion flag set once the worker thread fully exits; read by `is_finished`.
    completion: Arc<WorkerCompletion>,
}
#[derive(Clone)]
/// Handle returned by [`set_timeout`].
pub struct TimeoutHandle {
id: usize,
owner: *const ThreadState,
@@ -36,20 +41,37 @@ pub struct TimeoutHandle {
}
#[derive(Clone)]
/// Handle returned by [`set_interval`].
pub struct IntervalHandle {
    // Timer id within the owning thread's timer table.
    id: usize,
    // Identifies the runtime thread that created this handle; `clear_interval`
    // panics when called from a different runtime thread.
    owner: *const ThreadState,
    // `Rc` is `!Send`, so this field keeps the handle pinned to its creating
    // thread at compile time.
    _local: Rc<()>,
}
/// Handle returned by [`queue_future`].
///
/// Awaiting a join handle yields the output of the queued future.
pub struct JoinHandle<T> {
    // Completion state shared between the queued future and this handle.
    state: Rc<JoinState<T>>,
}
/// Future returned by [`yield_now`].
///
/// Awaiting this future will immediately yield control back to the runtime
/// scheduler, allowing other queued microtasks to run before the current task
/// continues executing. Note that continuation of futures runs as a microtask,
/// so this can only yield to other microtasks and not to macrotasks (driver
/// events such as file or network I/O, timers, or channel messages).
pub struct YieldNow {
    // `false` until the first poll; the future yields exactly once, then completes.
    yielded: bool,
}
/// Returns a handle for the current runtime thread.
///
/// If the current thread has not yet entered the runtime, the runtime state is initialized lazily.
///
/// # Panics
///
/// Panics if the runtime cannot initialize its driver for the current thread.
pub fn current_thread_handle() -> ThreadHandle {
    let state = current_thread();
    state.handle()
}
@@ -58,10 +80,17 @@ pub(crate) fn try_current_thread_handle() -> Option<ThreadHandle> {
unsafe { (!CURRENT_THREAD.is_null()).then(|| (*CURRENT_THREAD).handle()) }
}
pub(crate) fn with_current_reactor<T>(f: impl FnOnce(&Reactor) -> T) -> T {
f(&current_thread().reactor)
pub(crate) fn with_current_driver<T>(f: impl FnOnce(&Driver) -> T) -> T {
f(&current_thread().driver)
}
/// Queues a macrotask on the current runtime thread.
///
/// The task runs after all currently-queued macrotasks, and after all microtasks.
///
/// # Panics
///
/// Panics if the runtime cannot initialize its state for the current thread.
pub fn queue_task<F>(task: F)
where
F: FnOnce() + 'static,
@@ -69,6 +98,14 @@ where
push_local_macrotask(Box::new(task));
}
/// Queues a microtask on the current runtime thread.
///
/// Microtasks run before the next macrotask turn, mirroring JavaScript-style event loop
/// semantics.
///
/// # Panics
///
/// Panics if the runtime cannot initialize its state for the current thread.
pub fn queue_microtask<F>(task: F)
where
F: FnOnce() + 'static,
@@ -79,6 +116,11 @@ where
.push_back(Box::new(task));
}
/// Schedules a one-shot timer on the current runtime thread.
///
/// # Panics
///
/// Panics if the runtime cannot initialize its state for the current thread.
pub fn set_timeout<F>(delay: Duration, callback: F) -> TimeoutHandle
where
F: FnOnce() + 'static,
@@ -98,10 +140,22 @@ where
}
}
/// Cancels a timeout previously created by [`set_timeout`].
///
/// # Panics
///
/// Panics if called from a different runtime thread than the one that created `handle`.
pub fn clear_timeout(handle: &TimeoutHandle) {
clear_timer(handle.owner, handle.id);
}
/// Schedules a repeating timer on the current runtime thread.
///
/// The callback is invoked once per interval until the handle is cleared.
///
/// # Panics
///
/// Panics if the runtime cannot initialize its state for the current thread.
pub fn set_interval<F>(delay: Duration, callback: F) -> IntervalHandle
where
F: FnMut() + 'static,
@@ -126,10 +180,33 @@ where
}
}
/// Cancels an interval previously created by [`set_interval`].
///
/// # Panics
///
/// Panics if called from a different runtime thread than the one that created `handle`.
pub fn clear_interval(handle: &IntervalHandle) {
clear_timer(handle.owner, handle.id);
}
/// Queues a future on the current runtime thread.
///
/// The future is scheduled immediately and can be awaited through the returned [`JoinHandle`].
///
/// The future will be driven to completion regardless of whether the join handle is polled or dropped, so this function
/// can be used as a convenient way to spawn detached async tasks on the current thread.
///
/// # Examples
///
/// ```
/// # let _ = || {
/// let handle = ruin_runtime::queue_future(async { 42usize });
/// # };
/// ```
///
/// # Panics
///
/// Panics if the runtime cannot initialize its state for the current thread.
pub fn queue_future<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
@@ -150,13 +227,21 @@ where
JoinHandle { state }
}
/// Spawns a worker runtime thread.
///
/// `initial_task` is queued onto the worker as its first macrotask. `on_exit` runs on the parent
/// runtime thread after the worker shuts down.
///
/// # Panics
///
/// Panics if the worker thread or its driver cannot be created.
pub fn spawn_worker<Init, Exit>(initial_task: Init, on_exit: Exit) -> WorkerHandle
where
Init: FnOnce() + Send + 'static,
Exit: FnOnce() + 'static,
{
let parent = current_thread();
let (reactor, notifier) = create_reactor().expect("worker reactor should initialize");
let (driver, notifier) = create_driver().expect("worker driver should initialize");
let shared = Arc::new(ThreadShared::new(notifier));
let handle = ThreadHandle {
shared: Arc::clone(&shared),
@@ -175,7 +260,7 @@ where
std::thread::Builder::new()
.name("ruin-runtime-worker".into())
.spawn(move || {
install_thread(shared, reactor, Some(worker_completion));
install_thread(shared, driver, Some(worker_completion));
queue_task(initial_task);
run();
})
@@ -187,6 +272,14 @@ where
}
}
/// Runs the current runtime thread until no work, timers, child workers, or async operations
/// remain.
///
/// This is the main event loop entry point used by the proc-macro entry attributes.
///
/// # Panics
///
/// Panics if runtime initialization fails or if the underlying driver returns an unexpected error.
pub fn run() {
let _ = current_thread();
@@ -223,7 +316,7 @@ pub fn run() {
if has_pending_timers() || state.has_live_children() || state.has_live_async_operations() {
state.shared.closing.store(false, Ordering::Release);
state.reactor.wait().expect("reactor wait should succeed");
state.driver.wait().expect("driver wait should succeed");
continue;
}
@@ -240,16 +333,20 @@ pub fn run() {
}
fn drain_all() {
drain_reactor_events();
drain_driver_events();
drain_remote_tasks();
drain_completed_workers();
}
/// Returns a future that yields back to the runtime scheduler once.
pub fn yield_now() -> YieldNow {
    // Fresh future: it has not yielded yet.
    let yielded = false;
    YieldNow { yielded }
}
impl ThreadHandle {
/// Queues a macrotask onto this runtime thread.
///
/// Returns `false` if the target thread is already closed.
pub fn queue_task<F>(&self, task: F) -> bool
where
F: FnOnce() + Send + 'static,
@@ -257,6 +354,9 @@ impl ThreadHandle {
self.shared.enqueue_macro(Box::new(task))
}
/// Queues a microtask onto this runtime thread.
///
/// Returns `false` if the target thread is already closed.
pub fn queue_microtask<F>(&self, task: F) -> bool
where
F: FnOnce() + Send + 'static,
@@ -264,6 +364,7 @@ impl ThreadHandle {
self.shared.enqueue_micro(Box::new(task))
}
/// Returns `true` if the target runtime thread has shut down.
pub fn is_closed(&self) -> bool {
    // NOTE(review): `Acquire` load — presumably pairs with a release store made
    // during thread shutdown; confirm against the shutdown path.
    self.shared.closed.load(Ordering::Acquire)
}
@@ -282,6 +383,9 @@ impl ThreadHandle {
}
impl WorkerHandle {
/// Queues a macrotask onto the worker thread.
///
/// Returns `false` if the worker has already shut down.
pub fn queue_task<F>(&self, task: F) -> bool
where
F: FnOnce() + Send + 'static,
@@ -289,6 +393,9 @@ impl WorkerHandle {
self.thread.queue_task(task)
}
/// Queues a microtask onto the worker thread.
///
/// Returns `false` if the worker has already shut down.
pub fn queue_microtask<F>(&self, task: F) -> bool
where
F: FnOnce() + Send + 'static,
@@ -296,10 +403,12 @@ impl WorkerHandle {
self.thread.queue_microtask(task)
}
/// Returns `true` once the worker thread has fully exited.
pub fn is_finished(&self) -> bool {
    // Non-blocking check of the worker's completion flag.
    self.completion.finished.load(Ordering::Acquire)
}
/// Returns a generic [`ThreadHandle`] for the worker thread.
pub fn thread(&self) -> ThreadHandle {
self.thread.clone()
}
@@ -328,7 +437,7 @@ impl Future for YieldNow {
}
struct ThreadState {
reactor: Reactor,
driver: Driver,
shared: Arc<ThreadShared>,
worker_completion: Option<Arc<WorkerCompletion>>,
local_microtasks: RefCell<VecDeque<LocalTask>>,
@@ -341,11 +450,11 @@ struct ThreadState {
impl ThreadState {
fn new(
shared: Arc<ThreadShared>,
reactor: Reactor,
driver: Driver,
worker_completion: Option<Arc<WorkerCompletion>>,
) -> Self {
Self {
reactor,
driver,
shared,
worker_completion,
local_microtasks: RefCell::new(VecDeque::new()),
@@ -695,11 +804,11 @@ unsafe fn future_task_drop(data: *const ()) {
fn current_thread() -> &'static ThreadState {
unsafe {
if CURRENT_THREAD.is_null() {
let (reactor, notifier) = create_reactor().expect("runtime reactor should initialize");
let (driver, notifier) = create_driver().expect("runtime driver should initialize");
let shared = Arc::new(ThreadShared::new(notifier));
let state = Box::new(ThreadState::new(shared, reactor, None));
let state = Box::new(ThreadState::new(shared, driver, None));
let state = Box::into_raw(state);
(*state).reactor.bind_current_thread();
(*state).driver.bind_current_thread();
CURRENT_THREAD = state;
}
@@ -713,14 +822,14 @@ fn current_thread_ptr() -> *const ThreadState {
fn install_thread(
shared: Arc<ThreadShared>,
reactor: Reactor,
driver: Driver,
worker_completion: Option<Arc<WorkerCompletion>>,
) {
unsafe {
debug_assert!(CURRENT_THREAD.is_null(), "thread runtime already installed");
let state = Box::new(ThreadState::new(shared, reactor, worker_completion));
let state = Box::new(ThreadState::new(shared, driver, worker_completion));
let state = Box::into_raw(state);
(*state).reactor.bind_current_thread();
(*state).driver.bind_current_thread();
CURRENT_THREAD = state;
}
}
@@ -731,18 +840,18 @@ fn teardown_thread() {
CURRENT_THREAD = ptr::null_mut();
if !state.is_null() {
(*state).reactor.unbind_current_thread();
(*state).driver.unbind_current_thread();
drop(Box::from_raw(state));
}
}
}
fn drain_reactor_events() {
fn drain_driver_events() {
loop {
let ready = current_thread()
.reactor
.driver
.poll()
.expect("reactor poll should succeed");
.expect("driver poll should succeed");
let Some(ready) = ready else {
break;
@@ -751,13 +860,13 @@ fn drain_reactor_events() {
let state = current_thread();
if ready.wake {
let _ = state
.reactor
.driver
.drain_wake()
.expect("wake drain should succeed");
}
if ready.timer {
let _ = state
.reactor
.driver
.drain_timer()
.expect("timer drain should succeed");
dispatch_expired_timers();
@@ -906,9 +1015,9 @@ fn dispatch_expired_timers() {
fn rearm_thread_timer() {
let deadline = current_thread().timers.borrow().peek_deadline();
current_thread()
.reactor
.driver
.rearm_timer(deadline)
.expect("timerfd rearm should succeed");
.expect("driver timer rearm should succeed");
}
fn deadline_from_now(delay: Duration) -> Duration {

View File

@@ -16,7 +16,7 @@ use std::thread;
use crate::op::completion::completion_for_current_thread;
use crate::op::fs::{FileType, FsOp, MetadataTarget, OpenOptions, RawDirEntry, RawMetadata};
use crate::platform::linux_x86_64::runtime::{
ThreadHandle, current_thread_handle, with_current_reactor,
ThreadHandle, current_thread_handle, with_current_driver,
};
use crate::platform::linux_x86_64::uring::{
IORING_FSYNC_DATASYNC, IORING_OP_CLOSE, IORING_OP_FSYNC, IORING_OP_FTRUNCATE,
@@ -474,14 +474,14 @@ where
{
let (future, handle) = completion_for_current_thread::<io::Result<T>>();
let callback_handle = handle.clone();
let token = with_current_reactor(|reactor| {
reactor.submit_operation(fill, move |cqe| {
let token = with_current_driver(|driver| {
driver.submit_operation(fill, move |cqe| {
callback_handle.complete(map(cqe));
})
})?;
handle.set_cancel(move || {
let _ = with_current_reactor(|reactor| reactor.cancel_operation(token));
let _ = with_current_driver(|driver| driver.cancel_operation(token));
});
future.await

View File

@@ -14,7 +14,7 @@ use std::time::Duration;
use crate::op::completion::completion_for_current_thread;
use crate::op::net::{AcceptedSocket, NetOp, ReceivedDatagram};
use crate::platform::linux_x86_64::runtime::with_current_reactor;
use crate::platform::linux_x86_64::runtime::with_current_driver;
use crate::platform::linux_x86_64::uring::{
IORING_OP_ACCEPT, IORING_OP_BIND, IORING_OP_CLOSE, IORING_OP_CONNECT, IORING_OP_LISTEN,
IORING_OP_RECV, IORING_OP_SEND, IORING_OP_SHUTDOWN, IORING_OP_SOCKET, IoUringCqe, IoUringSqe,
@@ -556,14 +556,14 @@ where
{
let (future, handle) = completion_for_current_thread::<io::Result<T>>();
let callback_handle = handle.clone();
let token = with_current_reactor(|reactor| {
reactor.submit_operation(fill, move |cqe| {
let token = with_current_driver(|driver| {
driver.submit_operation(fill, move |cqe| {
callback_handle.complete(map(cqe));
})
})?;
handle.set_cancel(move || {
let _ = with_current_reactor(|reactor| reactor.cancel_operation(token));
let _ = with_current_driver(|driver| driver.cancel_operation(token));
});
future.await

View File

@@ -1,9 +1,11 @@
//! Runtime time primitives.
//!
//! These helpers integrate with the runtime's timer queue and are designed to be used from
//! futures scheduled with [`crate::queue_future`] or one of the runtime entry macros.
use std::cell::{Cell, RefCell};
use std::fmt;
use std::future::{Future, poll_fn};
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Waker;
@@ -12,6 +14,7 @@ use std::time::Duration;
use crate::{clear_timeout, set_timeout};
/// Future returned by [`sleep`].
pub struct Sleep {
delay: Option<Duration>,
state: Option<Rc<SleepState>>,
@@ -20,8 +23,18 @@ pub struct Sleep {
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
/// Error returned by [`timeout`] when the deadline expires first.
///
/// A zero-sized marker type: it carries no data beyond the fact that the
/// deadline elapsed before the wrapped future completed.
pub struct Elapsed;
/// Returns a future that completes after `duration` has elapsed on the current runtime thread.
///
/// # Examples
///
/// ```
/// # let _ = || async {
/// ruin_runtime::time::sleep(std::time::Duration::from_millis(10)).await;
/// # };
/// ```
pub fn sleep(duration: Duration) -> Sleep {
Sleep {
delay: Some(duration),
@@ -31,6 +44,24 @@ pub fn sleep(duration: Duration) -> Sleep {
}
}
/// Runs `future` until it completes or `duration` elapses, whichever happens first.
///
/// The wrapped future is dropped when the timeout fires. As with other runtime operations, dropping
/// a future cancels interest in the result but does not guarantee cancellation of any underlying
/// OS work that future may have started.
///
/// # Examples
///
/// ```
/// # let _ = || async {
/// let result = ruin_runtime::time::timeout(
/// std::time::Duration::from_millis(5),
/// async { 42usize },
/// )
/// .await;
/// assert_eq!(result, Ok(42));
/// # };
/// ```
pub async fn timeout<F>(duration: Duration, future: F) -> Result<F::Output, Elapsed>
where
F: Future,
@@ -52,10 +83,6 @@ where
.await
}
/// Builds an [`io::Error`] of kind [`io::ErrorKind::TimedOut`] whose message is
/// `"{action} timed out"`.
pub fn timeout_error(action: &'static str) -> io::Error {
    let message = format!("{action} timed out");
    io::Error::new(io::ErrorKind::TimedOut, message)
}
impl Future for Sleep {
type Output = ();