diff --git a/lib/runtime/examples/runtime_loop_showcase.rs b/lib/runtime/examples/runtime_loop_showcase.rs
index 98c7da9..5df9f11 100644
--- a/lib/runtime/examples/runtime_loop_showcase.rs
+++ b/lib/runtime/examples/runtime_loop_showcase.rs
@@ -48,7 +48,7 @@ fn queue_log(handle: &ThreadHandle, expected: usize, message: impl Into<String>)
 fn queue_log_microtask(handle: &ThreadHandle, expected: usize, message: impl Into<String>) {
     let message = message.into();
-    let queued = handle.queue_microtask(move || {
+    let queued = handle.queue_task(move || {
         log_event_impl(expected, format_args!("{message}"));
     });
     assert!(queued, "main thread should accept log microtask {expected}");
diff --git a/lib/runtime/src/op/completion.rs b/lib/runtime/src/op/completion.rs
index b52e2cc..be3f663 100644
--- a/lib/runtime/src/op/completion.rs
+++ b/lib/runtime/src/op/completion.rs
@@ -26,8 +26,11 @@ impl CompletionState {
             return;
         }
 
+        // Queue as a macrotask: I/O completion events belong to the macrotask
+        // queue, consistent with the JavaScript event-loop model where I/O
+        // callbacks are macrotasks and the microtask queue is thread-local only.
         let state = Arc::clone(self);
-        if !self.owner.queue_microtask(move || {
+        if !self.owner.queue_task(move || {
             state.wake_queued.store(false, Ordering::Release);
             if let Some(waker) = state.waker.lock().unwrap().take() {
                 waker.wake();
@@ -138,10 +141,25 @@ impl Drop for CompletionFuture {
         let _ = self.state.result.lock().unwrap().take();
         let _ = self.state.waker.lock().unwrap().take();
 
-        if !self.state.finished.load(Ordering::Acquire)
-            && let Some(cancel) = self.state.cancel.lock().unwrap().take()
-        {
+        if self.state.finished.load(Ordering::Acquire) {
+            return;
+        }
+
+        if let Some(cancel) = self.state.cancel.lock().unwrap().take() {
+            // Delegate to the cancel callback (e.g. submit an io_uring cancel).
+            // The actual I/O completion will eventually call handle.finish(),
+            // which decrements pending_ops.
             cancel();
+        } else {
+            // No cancel callback was registered — this happens when submit_operation
+            // failed before set_cancel could be called, leaving no path through
+            // which finish() would run. Decrement pending_ops directly so the
+            // runtime does not stall indefinitely waiting for an operation that
+            // will never complete. The swap is atomic: if finish() races to set
+            // finished first, the swap returns true and we skip the decrement.
+            if !self.state.finished.swap(true, Ordering::AcqRel) {
+                self.state.owner.finish_async_operation();
+            }
+        }
     }
 }
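// A minimal standalone sketch of the exactly-once pattern the Drop impl above
// relies on, reduced to a runnable example. `Shared`, `finish_once`, and the
// counter are illustrative names, not the crate's real API. Two racing paths
// each attempt one-time cleanup; `AtomicBool::swap` guarantees exactly one
// of them performs the decrement.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

struct Shared {
    finished: AtomicBool,
    pending_ops: AtomicUsize,
}

impl Shared {
    fn finish_once(&self) {
        // swap returns the previous value: only the first caller sees `false`.
        if !self.finished.swap(true, Ordering::AcqRel) {
            self.pending_ops.fetch_sub(1, Ordering::AcqRel);
        }
    }
}

fn main() {
    let shared = Arc::new(Shared {
        finished: AtomicBool::new(false),
        pending_ops: AtomicUsize::new(1),
    });
    let (a, b) = (Arc::clone(&shared), Arc::clone(&shared));
    let t1 = std::thread::spawn(move || a.finish_once());
    let t2 = std::thread::spawn(move || b.finish_once());
    t1.join().unwrap();
    t2.join().unwrap();
    // Exactly one decrement happened, regardless of interleaving.
    assert_eq!(shared.pending_ops.load(Ordering::Acquire), 0);
}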
diff --git a/lib/runtime/src/platform/linux_x86_64/driver.rs b/lib/runtime/src/platform/linux_x86_64/driver.rs
index 92cca05..e320238 100644
--- a/lib/runtime/src/platform/linux_x86_64/driver.rs
+++ b/lib/runtime/src/platform/linux_x86_64/driver.rs
@@ -2,7 +2,7 @@
 use std::cell::Cell;
 use std::cell::RefCell;
-use std::collections::BTreeMap;
+use std::collections::HashMap;
 use std::io;
 use std::os::fd::RawFd;
 use std::sync::Arc;
@@ -90,7 +90,7 @@ pub struct Driver {
    active_timer_token: Cell<Option<u64>>,
    pending_wakes: Cell<usize>,
    pending_timers: Cell<usize>,
-    completions: RefCell<BTreeMap<u64, …>>,
+    completions: RefCell<HashMap<u64, …>>,
 }
 
 /// Creates a new driver and its paired [`ThreadNotifier`].
@@ -123,7 +123,7 @@ pub fn create_driver() -> io::Result<(Driver, ThreadNotifier)> {
            active_timer_token: Cell::new(None),
            pending_wakes: Cell::new(0),
            pending_timers: Cell::new(0),
-            completions: RefCell::new(BTreeMap::new()),
+            completions: RefCell::new(HashMap::new()),
        },
        ThreadNotifier { inner: notifier },
    ))
diff --git a/lib/runtime/src/platform/linux_x86_64/runtime.rs b/lib/runtime/src/platform/linux_x86_64/runtime.rs
index 83e5f43..c055656 100644
--- a/lib/runtime/src/platform/linux_x86_64/runtime.rs
+++ b/lib/runtime/src/platform/linux_x86_64/runtime.rs
@@ -1,8 +1,9 @@
 //! Public runtime loop and worker-thread primitives.
 
 use std::cell::{Cell, RefCell};
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::{HashMap, VecDeque};
 use std::future::Future;
+use std::io;
 use std::pin::Pin;
 use std::ptr;
 use std::rc::Rc;
@@ -18,6 +19,18 @@
 type LocalTask = Box<dyn FnOnce() + 'static>;
 type SendTask = Box<dyn FnOnce() + Send + 'static>;
 type LocalBoxFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
 
+/// If the microtask queue runs more than this many tasks in a single turn
+/// without yielding to the macrotask queue, a warning is emitted.
+const MICROTASK_STARVATION_THRESHOLD: u64 = 1000;
+
+struct MacroTask {
+    task: LocalTask,
+    /// Monotonic time at which this task entered the local queue. Populated
+    /// only in debug builds; used to emit a trace event reporting queue-wait
+    /// time.
+    #[cfg(debug_assertions)]
+    queued_at: Duration,
+}
+
 #[thread_local]
 static mut CURRENT_THREAD: *mut ThreadState = ptr::null_mut();
@@ -354,10 +367,20 @@ pub fn run() {
     loop {
         drain_all();
 
+        let mut microtasks_run: u64 = 0;
         while let Some(task) = pop_microtask() {
             task();
+            microtasks_run += 1;
+            // Warn the moment the threshold is crossed rather than after the
+            // queue drains: a microtask loop that never yields would
+            // otherwise never trigger the warning at all.
+            if microtasks_run == MICROTASK_STARVATION_THRESHOLD {
+                tracing::warn!(
+                    target: trace_targets::SCHEDULER,
+                    event = "microtask_starvation",
+                    count = microtasks_run,
+                    "microtask queue ran {microtasks_run} tasks in a single turn; macrotask handlers may be starved",
+                );
+            }
             drain_all();
         }
 
         if let Some(task) = pop_macrotask() {
             task();
@@ -397,12 +420,31 @@
             continue;
         }
 
+        // Atomically commit to exit: set `closed` while holding the remote
+        // queue lock. `enqueue_macro` also checks `closed` under this same
+        // lock, so there is no window in which a task can be accepted after
+        // we decide to exit. If a task snuck in between the `has_ready_work`
+        // check above and acquiring the lock, we abort and process it first.
+        let committed = {
+            let remote = lock_queue(&state.shared.remote_macrotasks);
+            if remote.is_empty() {
+                state.shared.closed.store(true, Ordering::Release);
+                true
+            } else {
+                false
+            }
+        };
+
+        if !committed {
+            state.shared.closing.store(false, Ordering::Release);
+            continue;
+        }
+
         if let Some(completion) = &state.worker_completion {
             completion.finished.store(true, Ordering::Release);
             completion.parent_event.shared.notify();
         }
 
-        state.shared.closed.store(true, Ordering::Release);
         state.shared.notify();
         tracing::debug!(
             target: trace_targets::RUNTIME,
@@ -445,25 +487,6 @@ impl ThreadHandle {
         queued
     }
 
-    /// Queues a microtask onto this runtime thread.
-    ///
-    /// Returns `false` if the target thread is already closed.
-    pub fn queue_microtask<F>(&self, task: F) -> bool
-    where
-        F: FnOnce() + Send + 'static,
-    {
-        let queued = self.shared.enqueue_micro(Box::new(task));
-        #[cfg(debug_assertions)]
-        tracing::trace!(
-            target: trace_targets::SCHEDULER,
-            event = "queue_remote_microtask",
-            queue = "remote_micro",
-            queued,
-            "queueing remote microtask"
-        );
-        queued
-    }
-
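// A standalone sketch of the turn structure the `run` loop above implements,
// with a starvation counter in the spirit of MICROTASK_STARVATION_THRESHOLD.
// Queue types and the tiny threshold are simplified assumptions, not the
// crate's real types: each turn drains microtasks to exhaustion, then runs
// at most one macrotask.

use std::collections::VecDeque;

type Task = Box<dyn FnOnce()>;

const THRESHOLD: u64 = 4; // tiny value so the example trips the warning

fn turn(micro: &mut VecDeque<Task>, macro_q: &mut VecDeque<Task>) {
    let mut ran: u64 = 0;
    // Microtasks run to exhaustion before any macrotask is considered.
    while let Some(task) = micro.pop_front() {
        task();
        ran += 1;
        if ran == THRESHOLD {
            eprintln!("warning: {ran} microtasks without yielding");
        }
    }
    // Then exactly one macrotask; later ones wait for the next turn.
    if let Some(task) = macro_q.pop_front() {
        task();
    }
}

fn main() {
    let mut micro: VecDeque<Task> = VecDeque::new();
    let mut macro_q: VecDeque<Task> = VecDeque::new();
    for i in 0..5 {
        micro.push_back(Box::new(move || println!("microtask {i}")));
    }
    macro_q.push_back(Box::new(|| println!("macrotask 1")));
    macro_q.push_back(Box::new(|| println!("macrotask 2")));
    turn(&mut micro, &mut macro_q); // microtasks 0..4, warning, macrotask 1
    turn(&mut micro, &mut macro_q); // macrotask 2
}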
    /// Returns `true` if the target runtime thread has shut down.
    pub fn is_closed(&self) -> bool {
        self.shared.closed.load(Ordering::Acquire)
    }
@@ -493,16 +516,6 @@ impl WorkerHandle {
         self.thread.queue_task(task)
     }
 
-    /// Queues a microtask onto the worker thread.
-    ///
-    /// Returns `false` if the worker has already shut down.
-    pub fn queue_microtask<F>(&self, task: F) -> bool
-    where
-        F: FnOnce() + Send + 'static,
-    {
-        self.thread.queue_microtask(task)
-    }
-
     /// Returns `true` once the worker thread has fully exited.
     pub fn is_finished(&self) -> bool {
         self.completion.finished.load(Ordering::Acquire)
     }
@@ -541,7 +554,7 @@ struct ThreadState {
     shared: Arc<ThreadShared>,
     worker_completion: Option<Arc<…>>,
     local_microtasks: RefCell<VecDeque<LocalTask>>,
-    local_macrotasks: RefCell<VecDeque<LocalTask>>,
+    local_macrotasks: RefCell<VecDeque<MacroTask>>,
     timers: RefCell<TimerHeap>,
     next_timer_id: Cell<usize>,
     children: RefCell<Vec<…>>,
@@ -589,7 +602,9 @@ impl ThreadState {
 
 struct ThreadShared {
     notifier: ThreadNotifier,
-    remote_microtasks: Mutex<VecDeque<SendTask>>,
+    // The microtask queue is strictly thread-local; only macrotasks may be
+    // enqueued from remote threads, keeping the microtask queue free from
+    // cross-thread interference.
     remote_macrotasks: Mutex<VecDeque<SendTask>>,
     pending_ops: AtomicUsize,
     closing: AtomicBool,
@@ -600,7 +615,6 @@ impl ThreadShared {
     fn new(notifier: ThreadNotifier) -> Self {
         Self {
             notifier,
-            remote_microtasks: Mutex::new(VecDeque::new()),
             remote_macrotasks: Mutex::new(VecDeque::new()),
             pending_ops: AtomicUsize::new(0),
             closing: AtomicBool::new(false),
@@ -608,30 +622,37 @@
         }
     }
 
-    fn enqueue_micro(&self, task: SendTask) -> bool {
-        if self.closed.load(Ordering::Acquire) {
-            return false;
-        }
-
-        lock_queue(&self.remote_microtasks).push_back(task);
-        self.notify();
-        true
-    }
-
     fn enqueue_macro(&self, task: SendTask) -> bool {
+        // Check `closed` under the queue lock so that the exit path can
+        // atomically set `closed` while holding the same lock, eliminating
+        // the window where a task is accepted but then stranded at shutdown.
+        let mut queue = lock_queue(&self.remote_macrotasks);
        if self.closed.load(Ordering::Acquire) {
            return false;
        }
-
-        lock_queue(&self.remote_macrotasks).push_back(task);
+        queue.push_back(task);
+        drop(queue);
+        // Notify after releasing the lock. The task was accepted while the
+        // thread was still open, so the exit path cannot commit before
+        // draining it; if the thread drains the task and tears down before
+        // this notification lands, the notifier reports BrokenPipe, which
+        // `notify` tolerates.
        self.notify();
        true
    }
 
    fn notify(&self) {
-        self.notifier
-            .notify()
-            .expect("thread notifier should succeed");
+        if let Err(error) = self.notifier.notify() {
+            // BrokenPipe is expected during shutdown (ring already closed).
+            // Any other error is unexpected; log it rather than panicking so
+            // the runtime continues to make progress.
+            if error.kind() != io::ErrorKind::BrokenPipe {
+                tracing::error!(
+                    target: trace_targets::DRIVER,
+                    event = "notify_error",
+                    ?error,
+                    "unexpected error sending thread notification"
+                );
+            }
+        }
    }
 }
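// A standalone sketch of the shutdown handshake used by `enqueue_macro` and
// the exit path above, with illustrative names (`Queue`, `try_close`) rather
// than the crate's API. Both sides take the same mutex: producers check
// `closed` under the lock before pushing, and the consumer sets `closed`
// under the lock only after seeing an empty queue, so no task can be
// accepted after the exit decision.

use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

struct Queue {
    tasks: Mutex<VecDeque<u32>>,
    closed: AtomicBool,
}

impl Queue {
    /// Producer side: refuse the task if the queue is already closed.
    fn enqueue(&self, task: u32) -> bool {
        let mut tasks = self.tasks.lock().unwrap();
        if self.closed.load(Ordering::Acquire) {
            return false; // closed before we got the lock
        }
        tasks.push_back(task);
        true
    }

    /// Consumer side: commit to shutdown only if nothing is pending.
    fn try_close(&self) -> bool {
        let tasks = self.tasks.lock().unwrap();
        if tasks.is_empty() {
            self.closed.store(true, Ordering::Release);
            true
        } else {
            false // a task snuck in; drain it and retry later
        }
    }
}

fn main() {
    let q = Queue {
        tasks: Mutex::new(VecDeque::new()),
        closed: AtomicBool::new(false),
    };
    assert!(q.enqueue(1)); // accepted: still open
    assert!(!q.try_close()); // refused: a task is pending
    q.tasks.lock().unwrap().clear(); // consumer drains the backlog
    assert!(q.try_close()); // now the exit commits
    assert!(!q.enqueue(2)); // rejected: closed
}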
@@ -684,14 +705,14 @@ impl TimerNode {
 
 struct TimerHeap {
     nodes: Vec<TimerNode>,
-    positions: BTreeMap<usize, usize>,
+    positions: HashMap<usize, usize>,
 }
 
 impl TimerHeap {
     fn new() -> Self {
         Self {
             nodes: Vec::new(),
-            positions: BTreeMap::new(),
+            positions: HashMap::new(),
         }
     }
 
@@ -987,21 +1008,20 @@ fn drain_driver_events() {
 }
 
 fn drain_remote_tasks() {
-    let state = current_thread();
+    // Swap the entire remote queue out under the lock and release it
+    // immediately: the lock is held for a single O(1) swap rather than for
+    // the whole drain, and never while the local queue's RefCell is borrowed.
+    let drained = {
+        let mut remote = lock_queue(&current_thread().shared.remote_macrotasks);
+        std::mem::take(&mut *remote)
+    };
 
-    {
-        let mut local = state.local_microtasks.borrow_mut();
-        let mut remote = lock_queue(&state.shared.remote_microtasks);
-        while let Some(task) = remote.pop_front() {
-            local.push_back(task);
-        }
-    }
-
-    {
-        let mut local = state.local_macrotasks.borrow_mut();
-        let mut remote = lock_queue(&state.shared.remote_macrotasks);
-        while let Some(task) = remote.pop_front() {
-            local.push_back(task);
+    if !drained.is_empty() {
+        let mut local = current_thread().local_macrotasks.borrow_mut();
+        for task in drained {
+            // SendTask (Box<dyn FnOnce() + Send + 'static>) coerces to
+            // LocalTask (Box<dyn FnOnce() + 'static>) by dropping the Send
+            // bound.
+            let task: LocalTask = task;
+            local.push_back(make_macro_task(task));
         }
     }
 }
@@ -1030,7 +1050,7 @@ fn drain_completed_workers() {
     let mut local = state.local_macrotasks.borrow_mut();
     for mut child in exited {
         if let Some(task) = child.on_exit.take() {
-            local.push_back(task);
+            local.push_back(make_macro_task(task));
         }
     }
 }
@@ -1040,14 +1060,39 @@
 fn pop_microtask() -> Option<LocalTask> {
     current_thread().local_microtasks.borrow_mut().pop_front()
 }
 
 fn pop_macrotask() -> Option<LocalTask> {
-    current_thread().local_macrotasks.borrow_mut().pop_front()
+    current_thread()
+        .local_macrotasks
+        .borrow_mut()
+        .pop_front()
+        .map(|entry| {
+            #[cfg(debug_assertions)]
+            {
+                let now = deadline_from_now(Duration::ZERO);
+                let wait = now.saturating_sub(entry.queued_at);
+                tracing::trace!(
+                    target: trace_targets::SCHEDULER,
+                    event = "macrotask_dequeued",
+                    wait_ns = wait.as_nanos() as u64,
+                    "macrotask dequeued after waiting in queue"
+                );
+            }
+            entry.task
+        })
 }
 
 fn push_local_macrotask(task: LocalTask) {
     current_thread()
         .local_macrotasks
         .borrow_mut()
-        .push_back(task);
+        .push_back(make_macro_task(task));
+}
+
+fn make_macro_task(task: LocalTask) -> MacroTask {
+    MacroTask {
+        task,
+        #[cfg(debug_assertions)]
+        queued_at: deadline_from_now(Duration::ZERO),
+    }
 }
@@ -1056,9 +1101,7 @@ fn has_ready_work() -> bool {
         return true;
     }
 
-    if !lock_queue(&state.shared.remote_microtasks).is_empty()
-        || !lock_queue(&state.shared.remote_macrotasks).is_empty()
-    {
+    if !lock_queue(&state.shared.remote_macrotasks).is_empty() {
        return true;
    }
@@ -1072,7 +1115,8 @@ fn has_pending_timers() -> bool {
 
 fn allocate_timer_id() -> usize {
     let state = current_thread();
     let id = state.next_timer_id.get();
-    state.next_timer_id.set(id.wrapping_add(1));
+    let next = id.checked_add(1).expect("timer ID space exhausted");
+    state.next_timer_id.set(next);
     id
 }
@@ -1096,27 +1140,31 @@ fn dispatch_expired_timers() {
         return;
     }
 
-    for mut timer in due {
+    for timer in due {
         match timer.kind {
             TimerKind::Timeout(callback) => push_local_macrotask(callback),
             TimerKind::Interval { interval, callback } => {
-                let mut next_deadline = timer.deadline;
+                // Mirror JavaScript's setInterval semantics:
+                // 1. Schedule the next tick BEFORE queuing the callback,
+                //    so a slow callback does not shift the schedule.
+                // 2. Fire at most once per dispatch regardless of how many
+                //    ticks were missed; any backlog is worked off one tick
+                //    per dispatch instead of being queued as a burst of
+                //    callbacks.
+                let next_deadline = timer
+                    .deadline
+                    .checked_add(interval)
+                    .unwrap_or(Duration::MAX);
+                let next_timer = TimerNode::interval(
+                    timer.id,
+                    next_deadline,
+                    interval,
+                    Rc::clone(&callback),
+                );
+                current_thread().timers.borrow_mut().insert(next_timer);
 
-                loop {
-                    let queued_callback = Rc::clone(&callback);
-                    push_local_macrotask(Box::new(move || {
-                        (queued_callback.borrow_mut())();
-                    }));
-
-                    next_deadline = next_deadline.checked_add(interval).unwrap_or(Duration::MAX);
-                    if next_deadline > now || next_deadline == Duration::MAX {
-                        break;
-                    }
-                }
-
-                timer.deadline = next_deadline;
-                timer.kind = TimerKind::Interval { interval, callback };
-                current_thread().timers.borrow_mut().insert(timer);
+                push_local_macrotask(Box::new(move || {
+                    (callback.borrow_mut())();
+                }));
             }
         }
     }
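// A standalone sketch of the interval policy implemented above, under
// simplified assumptions (u64 millisecond ticks instead of the crate's
// TimerNode/Duration types). Each dispatch fires at most one callback and
// schedules the next tick from the previous deadline, so a backlog drains
// one tick per dispatch instead of arriving as a burst.

fn dispatch(now: u64, deadline: &mut u64, interval: u64, fired: &mut Vec<u64>) {
    if *deadline <= now {
        // Reschedule first, from the old deadline, then fire once.
        *deadline += interval;
        fired.push(now);
    }
}

fn main() {
    let mut deadline = 10;
    let interval = 10;
    let mut fired = Vec::new();
    // The loop was blocked until t=35; the ticks at 10, 20, and 30 were missed.
    for now in [35, 36, 37, 38, 50] {
        dispatch(now, &mut deadline, interval, &mut fired);
    }
    // The backlog drains one tick per dispatch (35, 36, 37); at t=38 the
    // timer is ahead of schedule again, and the t=40 tick fires at t=50.
    assert_eq!(fired, vec![35, 36, 37, 50]);
}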
diff --git a/lib/runtime/src/platform/linux_x86_64/uring.rs b/lib/runtime/src/platform/linux_x86_64/uring.rs
index 8afd726..f00902d 100644
--- a/lib/runtime/src/platform/linux_x86_64/uring.rs
+++ b/lib/runtime/src/platform/linux_x86_64/uring.rs
@@ -244,6 +244,12 @@ impl IoUring {
     }
 
     pub(crate) fn submit_timeout(&self, token: u64, deadline: Duration) -> io::Result<()> {
+        // SAFETY: `timespec` is stack-allocated and its pointer is passed to
+        // the kernel via the SQE's `addr` field. This is safe because
+        // `submit_pending` calls `io_uring_enter` synchronously within this
+        // stack frame, before `timespec` is dropped. The kernel copies the
+        // timespec value during submission of IORING_OP_TIMEOUT, so no
+        // lifetime extension beyond this call is required.
         let timespec = duration_to_kernel_timespec(deadline);
         self.push_sqe(|sqe| {
             sqe.opcode = IORING_OP_TIMEOUT;
@@ -277,6 +283,9 @@ impl IoUring {
         token_to_update: u64,
         deadline: Duration,
     ) -> io::Result<()> {
+        // SAFETY: Same stack-pointer contract as submit_timeout — the kernel
+        // copies the timespec during the synchronous io_uring_enter call made
+        // by submit_pending before this function returns.
         let timespec = duration_to_kernel_timespec(deadline);
         self.push_sqe(|sqe| {
             sqe.opcode = IORING_OP_TIMEOUT_REMOVE;
diff --git a/lib/runtime/src/sys/linux/fs.rs b/lib/runtime/src/sys/linux/fs.rs
index 88aef7c..3c91d46 100644
--- a/lib/runtime/src/sys/linux/fs.rs
+++ b/lib/runtime/src/sys/linux/fs.rs
@@ -372,7 +372,7 @@ impl ReadDirState {
         }
 
         let state = Arc::clone(self);
-        if !self.owner.queue_microtask(move || {
+        if !self.owner.queue_task(move || {
             state.wake_queued.store(false, Ordering::Release);
             if let Some(waker) = state.waker.lock().unwrap().take() {
                 waker.wake();
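// A standalone sketch of the wake-by-queued-task pattern that completion.rs
// and fs.rs use above, under simplified assumptions: a plain mpsc channel of
// boxed tasks stands in for the runtime's remote macrotask queue, and
// `EventState` is an illustrative name. The event source queues a task onto
// the owning thread; that task clears the wake_queued flag and fires the
// stored waker, and the flag coalesces repeated events into a single wake.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::task::Waker;

type Task = Box<dyn FnOnce() + Send>;

struct EventState {
    waker: Mutex<Option<Waker>>,
    /// Set while a wake task is already queued, so repeated completion
    /// events collapse into one wake per poll.
    wake_queued: AtomicBool,
}

impl EventState {
    fn notify(state: &Arc<Self>, queue: &Sender<Task>) {
        // Only queue one wake task at a time; later events coalesce.
        if state.wake_queued.swap(true, Ordering::AcqRel) {
            return;
        }
        let state = Arc::clone(state);
        let _ = queue.send(Box::new(move || {
            state.wake_queued.store(false, Ordering::Release);
            if let Some(waker) = state.waker.lock().unwrap().take() {
                waker.wake();
            }
        }));
    }
}

struct Flag(AtomicBool);

impl std::task::Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::Release);
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<Task>();
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let state = Arc::new(EventState {
        waker: Mutex::new(Some(Waker::from(Arc::clone(&flag)))),
        wake_queued: AtomicBool::new(false),
    });
    EventState::notify(&state, &tx); // queues one wake task
    EventState::notify(&state, &tx); // coalesced: wake_queued already set
    let mut ran = 0;
    for task in rx.try_iter() {
        task();
        ran += 1;
    }
    assert_eq!(ran, 1); // exactly one wake task was queued
    assert!(flag.0.load(Ordering::Acquire)); // and the waker fired
}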
diff --git a/lib/runtime/src/time.rs b/lib/runtime/src/time.rs
index 930611c..9e8d519 100644
--- a/lib/runtime/src/time.rs
+++ b/lib/runtime/src/time.rs
@@ -160,6 +160,8 @@ impl std::error::Error for Elapsed {}
 
 #[cfg(test)]
 mod tests {
+    use std::cell::RefCell;
+    use std::rc::Rc;
     use std::sync::{Arc, Mutex};
     use std::time::Duration;
 
@@ -199,4 +201,39 @@ mod tests {
 
         assert_eq!(log.as_slice(), ["started", "slept", "timed out"]);
     }
+
+    /// Verify that `sleep(Duration::ZERO).await` yields to the macrotask
+    /// queue before the future continues: a macrotask queued before the
+    /// sleep must run before the future's continuation.
+    #[test]
+    fn sleep_zero_yields_to_macrotask_queue() {
+        let order = std::thread::spawn(|| {
+            let order = Rc::new(RefCell::new(Vec::<&'static str>::new()));
+
+            // Macrotask queued before the sleep.
+            {
+                let order = Rc::clone(&order);
+                queue_task(move || order.borrow_mut().push("macrotask"));
+            }
+
+            // Future that awaits sleep(ZERO) and then records its
+            // continuation.
+            {
+                let order = Rc::clone(&order);
+                queue_future(async move {
+                    sleep(Duration::ZERO).await;
+                    order.borrow_mut().push("after_sleep");
+                });
+            }
+
+            run();
+            Rc::try_unwrap(order).unwrap().into_inner()
+        })
+        .join()
+        .expect("test thread should join");
+
+        // sleep(ZERO) resolves via a timer event, which is dispatched as a
+        // macrotask, so the macrotask queued earlier runs first.
+        assert_eq!(order.as_slice(), ["macrotask", "after_sleep"]);
+    }
 }
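// A minimal sketch of how a timer-backed sleep future can be built, to make
// the ordering in the test above concrete. This is illustrative, not the
// crate's actual `sleep`: the deadline check stands in for the runtime's
// timer heap, and a real implementation would register the waker with a
// timer and wake it from the timer dispatch path (a macrotask) instead of
// re-waking immediately.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

struct Sleep {
    deadline: Instant,
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            // Ask to be polled again. Waking immediately degenerates to
            // busy-polling; the runtime above instead stores the waker and
            // fires it when the timer's deadline expires.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    use std::sync::Arc;
    use std::task::{Wake, Waker};

    struct NoopWake;
    impl Wake for NoopWake {
        fn wake(self: Arc<Self>) {}
    }

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Sleep {
        deadline: Instant::now() + std::time::Duration::from_millis(5),
    };
    // Trivial busy-poll executor, just enough to drive the sketch to completion.
    let mut fut = Pin::new(&mut fut);
    while fut.as_mut().poll(&mut cx).is_pending() {}
    println!("slept");
}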