Claude runtime review

2026-03-22 14:39:00 -04:00
parent bc9aeaf007
commit 01eb861879
7 changed files with 210 additions and 98 deletions

View File

@@ -48,7 +48,7 @@ fn queue_log(handle: &ThreadHandle, expected: usize, message: impl Into<String>)
 fn queue_log_microtask(handle: &ThreadHandle, expected: usize, message: impl Into<String>) {
     let message = message.into();
-    let queued = handle.queue_microtask(move || {
+    let queued = handle.queue_task(move || {
         log_event_impl(expected, format_args!("{message}"));
     });
     assert!(queued, "main thread should accept log microtask {expected}");

View File

@@ -26,8 +26,11 @@ impl<T: Send + 'static> CompletionState<T> {
             return;
         }
+        // Queue as a macrotask: I/O completion events belong to the macrotask
+        // queue, consistent with the JavaScript event-loop model where I/O
+        // callbacks are macrotasks and the microtask queue is thread-local only.
         let state = Arc::clone(self);
-        if !self.owner.queue_microtask(move || {
+        if !self.owner.queue_task(move || {
             state.wake_queued.store(false, Ordering::Release);
             if let Some(waker) = state.waker.lock().unwrap().take() {
                 waker.wake();
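For readers following the wake path: `wake_queued` is a dedup flag guaranteeing at most one wake task is in flight per completion. A minimal standalone sketch of the pattern, with hypothetical names (`WakeState`, `request_wake`) condensing the surrounding code:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::Waker;

struct WakeState {
    wake_queued: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl WakeState {
    fn request_wake(self: &Arc<Self>, queue_task: impl Fn(Box<dyn FnOnce() + Send>) -> bool) {
        // Only the first completion queues a wake; concurrent callers see
        // `true` here and return without enqueueing a duplicate.
        if self
            .wake_queued
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return;
        }
        let state = Arc::clone(self);
        let _ = queue_task(Box::new(move || {
            // Clear the flag before waking so a completion that lands during
            // the wake can queue the next one.
            state.wake_queued.store(false, Ordering::Release);
            if let Some(waker) = state.waker.lock().unwrap().take() {
                waker.wake();
            }
        }));
    }
}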
@@ -138,10 +141,25 @@ impl<T> Drop for CompletionFuture<T> {
         let _ = self.state.result.lock().unwrap().take();
         let _ = self.state.waker.lock().unwrap().take();
-        if !self.state.finished.load(Ordering::Acquire)
-            && let Some(cancel) = self.state.cancel.lock().unwrap().take()
-        {
+        if self.state.finished.load(Ordering::Acquire) {
+            return;
+        }
+        if let Some(cancel) = self.state.cancel.lock().unwrap().take() {
+            // Delegate to the cancel callback (e.g. submit an io_uring cancel).
+            // The actual I/O completion will eventually call handle.finish(),
+            // which decrements pending_ops.
             cancel();
+        } else {
+            // No cancel callback was registered: this happens when submit_operation
+            // failed before set_cancel could be called, leaving no path through
+            // which finish() would run. Decrement pending_ops directly so the
+            // runtime does not stall indefinitely waiting for an operation that
+            // will never complete. The swap is atomic: if finish() races to set
+            // finished first, the swap returns true and we skip the decrement.
+            if !self.state.finished.swap(true, Ordering::AcqRel) {
+                self.state.owner.finish_async_operation();
+            }
         }
     }
 }
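The swap-based race in the `else` branch is worth isolating. A minimal sketch, assuming a `finished` flag shared between the completion path and `Drop` (names hypothetical):

use std::sync::atomic::{AtomicBool, Ordering};

struct OpState {
    finished: AtomicBool,
}

impl OpState {
    /// Called from the completion path (e.g. a CQE handler).
    fn finish(&self, dec_pending: impl FnOnce()) {
        if !self.finished.swap(true, Ordering::AcqRel) {
            dec_pending(); // we won the race: decrement exactly once
        }
    }

    /// Called from Drop when no cancel callback was registered.
    fn abandon(&self, dec_pending: impl FnOnce()) {
        if !self.finished.swap(true, Ordering::AcqRel) {
            dec_pending(); // completion never ran: decrement here instead
        }
    }
}

Whichever side flips `finished` first performs the decrement; the loser sees `true` from the swap and does nothing, so `pending_ops` is decremented exactly once.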

View File

@@ -2,7 +2,7 @@
 use std::cell::Cell;
 use std::cell::RefCell;
-use std::collections::BTreeMap;
+use std::collections::HashMap;
 use std::io;
 use std::os::fd::RawFd;
 use std::sync::Arc;
@@ -90,7 +90,7 @@ pub struct Driver {
     active_timer_token: Cell<Option<u64>>,
     pending_wakes: Cell<u64>,
     pending_timers: Cell<u64>,
-    completions: RefCell<BTreeMap<u64, CompletionHandler>>,
+    completions: RefCell<HashMap<u64, CompletionHandler>>,
 }

 /// Creates a new driver and its paired [`ThreadNotifier`].
@@ -123,7 +123,7 @@ pub fn create_driver() -> io::Result<(Driver, ThreadNotifier)> {
         active_timer_token: Cell::new(None),
         pending_wakes: Cell::new(0),
         pending_timers: Cell::new(0),
-        completions: RefCell::new(BTreeMap::new()),
+        completions: RefCell::new(HashMap::new()),
     },
     ThreadNotifier { inner: notifier },
 ))

View File

@@ -1,8 +1,9 @@
 //! Public runtime loop and worker-thread primitives.

 use std::cell::{Cell, RefCell};
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::{HashMap, VecDeque};
 use std::future::Future;
+use std::io;
 use std::pin::Pin;
 use std::ptr;
 use std::rc::Rc;
@@ -18,6 +19,18 @@ type LocalTask = Box<dyn FnOnce() + 'static>;
 type SendTask = Box<dyn FnOnce() + Send + 'static>;
 type LocalBoxFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;

+/// If the microtask queue runs more than this many tasks in a single turn
+/// without yielding to the macrotask queue, a warning is emitted.
+const MICROTASK_STARVATION_THRESHOLD: u64 = 1000;
+
+struct MacroTask {
+    task: LocalTask,
+    /// Wall time at which this task entered the local queue. Populated only
+    /// in debug builds; used to emit a trace event reporting queue-wait time.
+    #[cfg(debug_assertions)]
+    queued_at: Duration,
+}
+
 #[thread_local]
 static mut CURRENT_THREAD: *mut ThreadState = ptr::null_mut();
@@ -354,10 +367,20 @@ pub fn run() {
     loop {
         drain_all();

+        let mut microtasks_run: u64 = 0;
         while let Some(task) = pop_microtask() {
             task();
+            microtasks_run += 1;
             drain_all();
         }
+        if microtasks_run >= MICROTASK_STARVATION_THRESHOLD {
+            tracing::warn!(
+                target: trace_targets::SCHEDULER,
+                event = "microtask_starvation",
+                count = microtasks_run,
+                "microtask queue ran {microtasks_run} tasks in a single turn; macrotask handlers may be starved",
+            );
+        }

         if let Some(task) = pop_macrotask() {
             task();
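For a feel of what trips the threshold: a microtask that fans out more microtasks can run thousands of tasks before the loop drains and the macrotask queue gets a turn. A self-contained simulation of the counter, with each "task" reduced to a fan-out depth budget:

use std::collections::VecDeque;

fn main() {
    // A task with depth d > 0 queues two more tasks with depth d - 1.
    let mut queue: VecDeque<u32> = VecDeque::from([10]);
    let mut microtasks_run: u64 = 0;
    while let Some(depth) = queue.pop_front() {
        microtasks_run += 1;
        if depth > 0 {
            queue.push_back(depth - 1);
            queue.push_back(depth - 1);
        }
    }
    // A full binary fan-out of depth 10 runs 2^11 - 1 = 2047 tasks before
    // the queue drains, comfortably past the threshold of 1000.
    assert_eq!(microtasks_run, 2047);
}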
@@ -397,12 +420,31 @@ pub fn run() {
             continue;
         }

+        // Atomically commit to exit: set `closed` while holding the remote
+        // queue lock. `enqueue_macro` also checks `closed` under this same
+        // lock, so there is no window in which a task can be accepted after
+        // we decide to exit. If a task snuck in between the `has_ready_work`
+        // check above and acquiring the lock, we abort and process it first.
+        let committed = {
+            let remote = lock_queue(&state.shared.remote_macrotasks);
+            if remote.is_empty() {
+                state.shared.closed.store(true, Ordering::Release);
+                true
+            } else {
+                false
+            }
+        };
+        if !committed {
+            state.shared.closing.store(false, Ordering::Release);
+            continue;
+        }
+
         if let Some(completion) = &state.worker_completion {
             completion.finished.store(true, Ordering::Release);
             completion.parent_event.shared.notify();
         }
-        state.shared.closed.store(true, Ordering::Release);
         state.shared.notify();

         tracing::debug!(
             target: trace_targets::RUNTIME,
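This exit path pairs with the `closed` check that `enqueue_macro` performs under the same lock (see further down). A condensed two-sided sketch of the invariant, with a hypothetical `Shared` type:

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct Shared {
    queue: Mutex<VecDeque<Box<dyn FnOnce() + Send>>>,
    closed: AtomicBool,
}

impl Shared {
    /// Producer side: accept a task only if `closed` is still false,
    /// checked under the same lock the exit path takes.
    fn enqueue(&self, task: Box<dyn FnOnce() + Send>) -> bool {
        let mut queue = self.queue.lock().unwrap();
        if self.closed.load(Ordering::Acquire) {
            return false;
        }
        queue.push_back(task);
        true
    }

    /// Exit side: commit only if the queue is empty; otherwise back off.
    fn try_commit_exit(&self) -> bool {
        let queue = self.queue.lock().unwrap();
        if queue.is_empty() {
            self.closed.store(true, Ordering::Release);
            true
        } else {
            false
        }
    }
}

Either the producer observes `closed == true` under the lock and reports failure to its caller, or the exit side observes a non-empty queue and backs off; a task can never be accepted and then stranded.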
@@ -445,25 +487,6 @@ impl ThreadHandle {
         queued
     }

-    /// Queues a microtask onto this runtime thread.
-    ///
-    /// Returns `false` if the target thread is already closed.
-    pub fn queue_microtask<F>(&self, task: F) -> bool
-    where
-        F: FnOnce() + Send + 'static,
-    {
-        let queued = self.shared.enqueue_micro(Box::new(task));
-        #[cfg(debug_assertions)]
-        tracing::trace!(
-            target: trace_targets::SCHEDULER,
-            event = "queue_remote_microtask",
-            queue = "remote_micro",
-            queued,
-            "queueing remote microtask"
-        );
-        queued
-    }
-
     /// Returns `true` if the target runtime thread has shut down.
     pub fn is_closed(&self) -> bool {
         self.shared.closed.load(Ordering::Acquire)
@@ -493,16 +516,6 @@ impl WorkerHandle {
         self.thread.queue_task(task)
     }

-    /// Queues a microtask onto the worker thread.
-    ///
-    /// Returns `false` if the worker has already shut down.
-    pub fn queue_microtask<F>(&self, task: F) -> bool
-    where
-        F: FnOnce() + Send + 'static,
-    {
-        self.thread.queue_microtask(task)
-    }
-
     /// Returns `true` once the worker thread has fully exited.
     pub fn is_finished(&self) -> bool {
         self.completion.finished.load(Ordering::Acquire)
@@ -541,7 +554,7 @@ struct ThreadState {
     shared: Arc<ThreadShared>,
     worker_completion: Option<Arc<WorkerCompletion>>,
     local_microtasks: RefCell<VecDeque<LocalTask>>,
-    local_macrotasks: RefCell<VecDeque<LocalTask>>,
+    local_macrotasks: RefCell<VecDeque<MacroTask>>,
     timers: RefCell<TimerHeap>,
     next_timer_id: Cell<usize>,
     children: RefCell<Vec<ChildWorker>>,
@@ -589,7 +602,9 @@ impl ThreadState {
 struct ThreadShared {
     notifier: ThreadNotifier,
-    remote_microtasks: Mutex<VecDeque<SendTask>>,
+    // The microtask queue is strictly thread-local; only macrotasks may be
+    // enqueued from remote threads, keeping the microtask queue free from
+    // cross-thread interference.
     remote_macrotasks: Mutex<VecDeque<SendTask>>,
     pending_ops: AtomicUsize,
     closing: AtomicBool,
@@ -600,7 +615,6 @@ impl ThreadShared {
     fn new(notifier: ThreadNotifier) -> Self {
         Self {
             notifier,
-            remote_microtasks: Mutex::new(VecDeque::new()),
             remote_macrotasks: Mutex::new(VecDeque::new()),
             pending_ops: AtomicUsize::new(0),
             closing: AtomicBool::new(false),
@@ -608,30 +622,37 @@ impl ThreadShared {
         }
     }

-    fn enqueue_micro(&self, task: SendTask) -> bool {
-        if self.closed.load(Ordering::Acquire) {
-            return false;
-        }
-        lock_queue(&self.remote_microtasks).push_back(task);
-        self.notify();
-        true
-    }
-
     fn enqueue_macro(&self, task: SendTask) -> bool {
+        // Check `closed` under the queue lock so that the exit path can
+        // atomically set `closed` while holding the same lock, eliminating
+        // the window where a task is accepted but then stranded at shutdown.
+        let mut queue = lock_queue(&self.remote_macrotasks);
         if self.closed.load(Ordering::Acquire) {
             return false;
         }
-        lock_queue(&self.remote_macrotasks).push_back(task);
+        queue.push_back(task);
+        drop(queue);
+        // Notify after releasing the lock. By this point `closed` is still
+        // false (we verified under the lock) and teardown_thread has not run,
+        // so the ring is guaranteed to be alive.
         self.notify();
         true
     }

     fn notify(&self) {
-        self.notifier
-            .notify()
-            .expect("thread notifier should succeed");
+        if let Err(error) = self.notifier.notify() {
+            // BrokenPipe is expected during shutdown (ring already closed).
+            // Any other error is unexpected; log it rather than panicking so
+            // the runtime continues to make progress.
+            if error.kind() != io::ErrorKind::BrokenPipe {
+                tracing::error!(
+                    target: trace_targets::DRIVER,
+                    event = "notify_error",
+                    ?error,
+                    "unexpected error sending thread notification"
+                );
+            }
+        }
     }
 }
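The notify change swaps an unconditional panic for tolerance of the one error shutdown can legitimately produce. A runnable sketch of the filter, where the closure stands in for the notifier write (names hypothetical):

use std::io;

fn notify(send: impl Fn() -> io::Result<()>) {
    if let Err(error) = send() {
        // BrokenPipe means the read side is already gone: a benign
        // shutdown race. Anything else deserves a log line.
        if error.kind() != io::ErrorKind::BrokenPipe {
            eprintln!("unexpected notify error: {error}");
        }
    }
}

fn main() {
    notify(|| Err(io::Error::from(io::ErrorKind::BrokenPipe))); // silent
    notify(|| Err(io::Error::from(io::ErrorKind::WouldBlock))); // logged
}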
@@ -684,14 +705,14 @@ impl TimerNode {
 struct TimerHeap {
     nodes: Vec<TimerNode>,
-    positions: BTreeMap<usize, usize>,
+    positions: HashMap<usize, usize>,
 }

 impl TimerHeap {
     fn new() -> Self {
         Self {
             nodes: Vec::new(),
-            positions: BTreeMap::new(),
+            positions: HashMap::new(),
         }
     }
@@ -987,21 +1008,20 @@ fn drain_driver_events() {
 }

 fn drain_remote_tasks() {
-    let state = current_thread();
-    {
-        let mut local = state.local_microtasks.borrow_mut();
-        let mut remote = lock_queue(&state.shared.remote_microtasks);
-        while let Some(task) = remote.pop_front() {
-            local.push_back(task);
-        }
-    }
-    {
-        let mut local = state.local_macrotasks.borrow_mut();
-        let mut remote = lock_queue(&state.shared.remote_macrotasks);
-        while let Some(task) = remote.pop_front() {
-            local.push_back(task);
-        }
-    }
+    // Swap the entire remote queue under the lock and release immediately,
+    // minimizing the time the lock is held and avoiding per-item allocation.
+    let drained = {
+        let mut remote = lock_queue(&current_thread().shared.remote_macrotasks);
+        std::mem::take(&mut *remote)
+    };
+    if !drained.is_empty() {
+        let mut local = current_thread().local_macrotasks.borrow_mut();
+        for task in drained {
+            // SendTask (Box<dyn FnOnce() + Send>) coerces to LocalTask
+            // (Box<dyn FnOnce()>) by dropping the Send bound.
+            let task: LocalTask = task;
+            local.push_back(make_macro_task(task));
+        }
+    }
 }
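The `std::mem::take` swap is the key trick here: it detaches the whole queue in O(1) while the lock is held, so the drained tasks are processed lock-free. A minimal sketch:

use std::collections::VecDeque;
use std::sync::Mutex;

fn drain<T>(shared: &Mutex<VecDeque<T>>) -> VecDeque<T> {
    // Replace the queue with an empty one under the lock, then release
    // before touching any drained item. The swap moves only the VecDeque's
    // heap buffer, not the items, so the critical section stays tiny.
    let mut guard = shared.lock().unwrap();
    std::mem::take(&mut *guard)
}

fn main() {
    let shared = Mutex::new(VecDeque::from([1, 2, 3]));
    let drained = drain(&shared);
    assert_eq!(drained.len(), 3);
    assert!(shared.lock().unwrap().is_empty());
}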
@@ -1030,7 +1050,7 @@ fn drain_completed_workers() {
     let mut local = state.local_macrotasks.borrow_mut();
     for mut child in exited {
         if let Some(task) = child.on_exit.take() {
-            local.push_back(task);
+            local.push_back(make_macro_task(task));
         }
     }
 }
@@ -1040,14 +1060,39 @@ fn pop_microtask() -> Option<LocalTask> {
 }

 fn pop_macrotask() -> Option<LocalTask> {
-    current_thread().local_macrotasks.borrow_mut().pop_front()
+    current_thread()
+        .local_macrotasks
+        .borrow_mut()
+        .pop_front()
+        .map(|entry| {
+            #[cfg(debug_assertions)]
+            {
+                let now = deadline_from_now(Duration::ZERO);
+                let wait = now.saturating_sub(entry.queued_at);
+                tracing::trace!(
+                    target: trace_targets::SCHEDULER,
+                    event = "macrotask_dequeued",
+                    wait_ns = wait.as_nanos() as u64,
+                    "macrotask dequeued after waiting in queue"
+                );
+            }
+            entry.task
+        })
 }

 fn push_local_macrotask(task: LocalTask) {
     current_thread()
         .local_macrotasks
         .borrow_mut()
-        .push_back(task);
+        .push_back(make_macro_task(task));
+}
+
+fn make_macro_task(task: LocalTask) -> MacroTask {
+    MacroTask {
+        task,
+        #[cfg(debug_assertions)]
+        queued_at: deadline_from_now(Duration::ZERO),
+    }
 }

 fn has_ready_work() -> bool {
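The `queued_at`/`pop` pair implements debug-only queue-wait measurement with zero release-build cost: `#[cfg(debug_assertions)]` removes both the field and the reporting block. A standalone sketch of the pattern using `std::time::Instant` (the runtime itself uses its own `deadline_from_now` clock):

use std::time::Instant;

struct Queued<T> {
    value: T,
    // Compiled out of release builds entirely; the struct shrinks to T.
    #[cfg(debug_assertions)]
    queued_at: Instant,
}

fn enqueue<T>(value: T) -> Queued<T> {
    Queued {
        value,
        #[cfg(debug_assertions)]
        queued_at: Instant::now(),
    }
}

fn dequeue<T>(entry: Queued<T>) -> T {
    #[cfg(debug_assertions)]
    eprintln!("waited {:?} in queue", entry.queued_at.elapsed());
    entry.value
}

fn main() {
    let entry = enqueue("task");
    assert_eq!(dequeue(entry), "task");
}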
@@ -1056,9 +1101,7 @@ fn has_ready_work() -> bool {
         return true;
     }

-    if !lock_queue(&state.shared.remote_microtasks).is_empty()
-        || !lock_queue(&state.shared.remote_macrotasks).is_empty()
-    {
+    if !lock_queue(&state.shared.remote_macrotasks).is_empty() {
         return true;
     }
@@ -1072,7 +1115,8 @@ fn has_pending_timers() -> bool {
 fn allocate_timer_id() -> usize {
     let state = current_thread();
     let id = state.next_timer_id.get();
-    state.next_timer_id.set(id.wrapping_add(1));
+    let next = id.checked_add(1).expect("timer ID space exhausted");
+    state.next_timer_id.set(next);
     id
 }
@@ -1096,27 +1140,31 @@ fn dispatch_expired_timers() {
         return;
     }

-    for mut timer in due {
+    for timer in due {
         match timer.kind {
             TimerKind::Timeout(callback) => push_local_macrotask(callback),
             TimerKind::Interval { interval, callback } => {
-                let mut next_deadline = timer.deadline;
-                loop {
-                    let queued_callback = Rc::clone(&callback);
-                    push_local_macrotask(Box::new(move || {
-                        (queued_callback.borrow_mut())();
-                    }));
-                    next_deadline = next_deadline.checked_add(interval).unwrap_or(Duration::MAX);
-                    if next_deadline > now || next_deadline == Duration::MAX {
-                        break;
-                    }
-                }
-                timer.deadline = next_deadline;
-                timer.kind = TimerKind::Interval { interval, callback };
-                current_thread().timers.borrow_mut().insert(timer);
+                // Mirror JavaScript's setInterval semantics:
+                // 1. Schedule the next tick BEFORE queuing the callback,
+                //    so a slow callback does not shift the schedule.
+                // 2. Fire at most once per dispatch regardless of how many
+                //    ticks were missed; the interval simply drifts forward
+                //    rather than back-filling a burst of callbacks.
+                let next_deadline = timer
+                    .deadline
+                    .checked_add(interval)
+                    .unwrap_or(Duration::MAX);
+                let next_timer = TimerNode::interval(
+                    timer.id,
+                    next_deadline,
+                    interval,
+                    Rc::clone(&callback),
+                );
+                current_thread().timers.borrow_mut().insert(next_timer);
+                push_local_macrotask(Box::new(move || {
+                    (callback.borrow_mut())();
+                }));
             }
         }
     }
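To make the semantic change concrete, here is a small simulation comparing the two policies when a dispatch wakes 45 ms late on a 10 ms interval (numbers arbitrary):

fn main() {
    let interval = 10u64; // ms
    let deadline = 100u64; // the tick that was due
    let now = 145u64; // dispatch woke 45 ms late

    // Old policy: back-fill every missed tick (100, 110, 120, 130, 140),
    // bursting five callbacks into the macrotask queue at once.
    let mut old_fires = 0;
    let mut next = deadline;
    while next <= now {
        old_fires += 1;
        next += interval;
    }
    assert_eq!(old_fires, 5);

    // New policy: fire exactly once and schedule deadline + interval (110).
    // 110 is still in the past, so the next dispatch fires again soon and
    // the schedule drifts forward instead of bursting.
    let new_fires = 1;
    let new_next = deadline + interval;
    assert_eq!((new_fires, new_next), (1, 110));
}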

View File

@@ -244,6 +244,12 @@ impl IoUring {
     }

     pub(crate) fn submit_timeout(&self, token: u64, deadline: Duration) -> io::Result<()> {
+        // SAFETY: `timespec` is stack-allocated and its pointer is passed to
+        // the kernel via the SQE's `addr` field. This is safe because
+        // `submit_pending` calls `io_uring_enter` synchronously within this
+        // stack frame, before `timespec` is dropped. The kernel copies the
+        // timespec value during submission of IORING_OP_TIMEOUT, so no
+        // lifetime extension beyond this call is required.
         let timespec = duration_to_kernel_timespec(deadline);
         self.push_sqe(|sqe| {
             sqe.opcode = IORING_OP_TIMEOUT;
@@ -277,6 +283,9 @@ impl IoUring {
         token_to_update: u64,
         deadline: Duration,
     ) -> io::Result<()> {
+        // SAFETY: Same stack-pointer contract as submit_timeout: the kernel
+        // copies the timespec during the synchronous io_uring_enter call made
+        // by submit_pending before this function returns.
         let timespec = duration_to_kernel_timespec(deadline);
         self.push_sqe(|sqe| {
             sqe.opcode = IORING_OP_TIMEOUT_REMOVE;
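The SAFETY contract is easiest to see stripped of the ring machinery. A hypothetical illustration where `submit` stands in for the synchronous `io_uring_enter`, consuming the pointed-to value before returning:

#[repr(C)]
struct KernelTimespec {
    tv_sec: i64,
    tv_nsec: i64,
}

fn submit(ptr: *const KernelTimespec) {
    // Safe only because the caller guarantees `ptr` is alive right now;
    // the "kernel" copies the value during this call.
    let copied = unsafe { std::ptr::read(ptr) };
    println!("submitted timeout: {}s {}ns", copied.tv_sec, copied.tv_nsec);
}

fn submit_timeout() {
    let timespec = KernelTimespec { tv_sec: 1, tv_nsec: 500_000_000 };
    // OK: `timespec` outlives the synchronous call.
    submit(&timespec);
    // If `submit` instead queued the pointer for a later syscall, this
    // frame could return first and the pointer would dangle.
}

fn main() {
    submit_timeout();
}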

View File

@@ -372,7 +372,7 @@ impl ReadDirState {
     }

     let state = Arc::clone(self);
-    if !self.owner.queue_microtask(move || {
+    if !self.owner.queue_task(move || {
         state.wake_queued.store(false, Ordering::Release);
         if let Some(waker) = state.waker.lock().unwrap().take() {
             waker.wake();

View File

@@ -160,6 +160,8 @@ impl std::error::Error for Elapsed {}
 #[cfg(test)]
 mod tests {
+    use std::cell::RefCell;
+    use std::rc::Rc;
     use std::sync::{Arc, Mutex};
     use std::time::Duration;
@@ -199,4 +201,39 @@ mod tests {
         assert_eq!(log.as_slice(), ["started", "slept", "timed out"]);
     }

+    /// Verify that `sleep(Duration::ZERO).await` yields to the macrotask queue
+    /// before the future continues. A macrotask queued before the sleep must
+    /// run before the future's continuation.
+    #[test]
+    fn sleep_zero_yields_to_macrotask_queue() {
+        let order = std::thread::spawn(|| {
+            let order = Rc::new(RefCell::new(Vec::<&'static str>::new()));
+
+            // Macrotask queued before the sleep.
+            {
+                let order = Rc::clone(&order);
+                queue_task(move || order.borrow_mut().push("macrotask"));
+            }
+
+            // Future that awaits sleep(ZERO) and then records its continuation.
+            {
+                let order = Rc::clone(&order);
+                queue_future(async move {
+                    sleep(Duration::ZERO).await;
+                    order.borrow_mut().push("after_sleep");
+                });
+            }
+
+            run();
+            Rc::try_unwrap(order).unwrap().into_inner()
+        })
+        .join()
+        .expect("test thread should join");
+
+        // The macrotask must run before the sleep future continues, because
+        // sleep(ZERO) resolves via a timer event (macrotask), so the queued
+        // macrotask runs first.
+        assert_eq!(order.as_slice(), ["macrotask", "after_sleep"]);
+    }
 }