Claude channels/timers review

This commit is contained in:
2026-03-22 16:13:59 -04:00
parent 6a825afcc6
commit 497af9151d
2 changed files with 234 additions and 76 deletions

View File

@@ -93,6 +93,31 @@ pub enum TryRecvError {
Disconnected, Disconnected,
} }
/// A wakeup deferred until the channel mutex has been released.
///
/// Waking a waiter while holding the channel lock can be expensive (cross-thread
/// wakeups go through the io_uring ring notification path) and risks priority
/// inversion. All `State` methods collect these instead of calling
/// `CompletionHandle::complete` directly; the caller fires them after dropping
/// the `MutexGuard`.
enum PendingCompletion<T: Send + 'static> {
RecvSome(CompletionHandle<Option<T>>, T),
RecvNone(CompletionHandle<Option<T>>),
SendOk(CompletionHandle<Result<(), SendError<T>>>),
SendErr(CompletionHandle<Result<(), SendError<T>>>, T),
}
fn fire_completions<T: Send + 'static>(completions: Vec<PendingCompletion<T>>) {
for c in completions {
match c {
PendingCompletion::RecvSome(h, v) => h.complete(Some(v)),
PendingCompletion::RecvNone(h) => h.complete(None),
PendingCompletion::SendOk(h) => h.complete(Ok(())),
PendingCompletion::SendErr(h, v) => h.complete(Err(SendError(v))),
}
}
}
impl<T: Send + 'static> State<T> { impl<T: Send + 'static> State<T> {
fn new(capacity: Option<usize>) -> Self { fn new(capacity: Option<usize>) -> Self {
Self { Self {
@@ -106,13 +131,17 @@ impl<T: Send + 'static> State<T> {
} }
} }
fn try_send_now(&mut self, value: T) -> Result<(), TrySendError<T>> { fn try_send_now(
&mut self,
value: T,
completions: &mut Vec<PendingCompletion<T>>,
) -> Result<(), TrySendError<T>> {
if self.receiver_closed { if self.receiver_closed {
return Err(TrySendError::Closed(value)); return Err(TrySendError::Closed(value));
} }
if let Some(waiter) = self.recv_waiter.take() { if let Some(waiter) = self.recv_waiter.take() {
waiter.complete(Some(value)); completions.push(PendingCompletion::RecvSome(waiter, value));
return Ok(()); return Ok(());
} }
@@ -151,10 +180,10 @@ impl<T: Send + 'static> State<T> {
true true
} }
fn pump_senders(&mut self) { fn pump_senders(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
loop { loop {
if self.receiver_closed { if self.receiver_closed {
self.fail_pending_senders(); self.fail_pending_senders(completions);
break; break;
} }
@@ -170,44 +199,47 @@ impl<T: Send + 'static> State<T> {
}; };
if let Some(receiver) = self.recv_waiter.take() { if let Some(receiver) = self.recv_waiter.take() {
receiver.complete(Some(waiter.value)); completions.push(PendingCompletion::RecvSome(receiver, waiter.value));
} else { } else {
self.queue.push_back(waiter.value); self.queue.push_back(waiter.value);
} }
waiter.handle.complete(Ok(())); completions.push(PendingCompletion::SendOk(waiter.handle));
} }
if self.queue.is_empty() if self.queue.is_empty()
&& self.sender_count == 0 && self.sender_count == 0
&& let Some(waiter) = self.recv_waiter.take() && let Some(waiter) = self.recv_waiter.take()
{ {
waiter.complete(None); completions.push(PendingCompletion::RecvNone(waiter));
} }
} }
fn fail_pending_senders(&mut self) { fn fail_pending_senders(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
while let Some(waiter) = self.send_waiters.pop_front() { while let Some(waiter) = self.send_waiters.pop_front() {
waiter.handle.complete(Err(SendError(waiter.value))); completions.push(PendingCompletion::SendErr(waiter.handle, waiter.value));
} }
} }
fn close_receiver(&mut self) { fn close_receiver(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
self.receiver_closed = true; self.receiver_closed = true;
self.fail_pending_senders(); self.fail_pending_senders(completions);
if self.queue.is_empty() if self.queue.is_empty()
&& let Some(waiter) = self.recv_waiter.take() && let Some(waiter) = self.recv_waiter.take()
{ {
waiter.complete(None); completions.push(PendingCompletion::RecvNone(waiter));
} }
} }
fn drop_sender(&mut self) { fn drop_sender(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
self.sender_count = self.sender_count.saturating_sub(1); self.sender_count = self
.sender_count
.checked_sub(1)
.expect("sender count underflow: more drops than creates");
if self.sender_count == 0 if self.sender_count == 0
&& self.queue.is_empty() && self.queue.is_empty()
&& let Some(waiter) = self.recv_waiter.take() && let Some(waiter) = self.recv_waiter.take()
{ {
waiter.complete(None); completions.push(PendingCompletion::RecvNone(waiter));
} }
} }
} }
@@ -252,10 +284,16 @@ impl<T: Send + 'static> Sender<T> {
/// Attempts to queue a message immediately. /// Attempts to queue a message immediately.
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> { pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
self.shared let mut completions = Vec::new();
let result = {
let mut state = self
.shared
.lock() .lock()
.expect("mpsc state should not be poisoned") .expect("mpsc state should not be poisoned");
.try_send_now(value) state.try_send_now(value, &mut completions)
};
fire_completions(completions);
result
} }
/// Returns `true` if the receiver has been closed or dropped. /// Returns `true` if the receiver has been closed or dropped.
@@ -281,22 +319,30 @@ impl<T: Send + 'static> Sender<T> {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
} }
} else { } else {
let mut completions = Vec::new();
let first_result = {
let mut state = self let mut state = self
.shared .shared
.lock() .lock()
.expect("mpsc state should not be poisoned"); .expect("mpsc state should not be poisoned");
match state.try_send_now(value_slot.take().expect("send value should be present")) { state.try_send_now(
value_slot.take().expect("send value should be present"),
&mut completions,
)
};
fire_completions(completions);
match first_result {
Ok(()) => Poll::Ready(Ok(())), Ok(()) => Poll::Ready(Ok(())),
Err(TrySendError::Closed(value)) => Poll::Ready(Err(SendError(value))), Err(TrySendError::Closed(value)) => Poll::Ready(Err(SendError(value))),
Err(TrySendError::Full(returned)) => { Err(TrySendError::Full(returned)) => {
drop(state);
let (future, handle) = runtime_waiter::<Result<(), SendError<T>>>(); let (future, handle) = runtime_waiter::<Result<(), SendError<T>>>();
let state_shared = Arc::clone(&self.shared); let state_shared = Arc::clone(&self.shared);
let mut completions = Vec::new();
let registration = { let registration = {
let mut state = state_shared let mut state = state_shared
.lock() .lock()
.expect("mpsc state should not be poisoned"); .expect("mpsc state should not be poisoned");
match state.try_send_now(returned) { match state.try_send_now(returned, &mut completions) {
Ok(()) => Ok(None), Ok(()) => Ok(None),
Err(TrySendError::Closed(value)) => Err(SendError(value)), Err(TrySendError::Closed(value)) => Err(SendError(value)),
Err(TrySendError::Full(value)) => { Err(TrySendError::Full(value)) => {
@@ -304,6 +350,7 @@ impl<T: Send + 'static> Sender<T> {
} }
} }
}; };
fire_completions(completions);
match registration { match registration {
Ok(None) => { Ok(None) => {
handle.complete(Ok(())); handle.complete(Ok(()));
@@ -339,11 +386,16 @@ impl<T: Send + 'static> Sender<T> {
impl<T: Send + 'static> UnboundedSender<T> { impl<T: Send + 'static> UnboundedSender<T> {
/// Queues a message immediately. /// Queues a message immediately.
pub fn send(&self, value: T) -> Result<(), SendError<T>> { pub fn send(&self, value: T) -> Result<(), SendError<T>> {
self.shared let mut completions = Vec::new();
let result = {
let mut state = self
.shared
.lock() .lock()
.expect("mpsc state should not be poisoned") .expect("mpsc state should not be poisoned");
.try_send_now(value) state.try_send_now(value, &mut completions)
.map_err(|error| match error { };
fire_completions(completions);
result.map_err(|error| match error {
TrySendError::Full(value) | TrySendError::Closed(value) => SendError(value), TrySendError::Full(value) | TrySendError::Closed(value) => SendError(value),
}) })
} }
@@ -372,18 +424,23 @@ impl<T: Send + 'static> Receiver<T> {
/// Attempts to receive a message immediately. /// Attempts to receive a message immediately.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> { pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let mut completions = Vec::new();
let result = {
let mut state = self let mut state = self
.shared .shared
.lock() .lock()
.expect("mpsc state should not be poisoned"); .expect("mpsc state should not be poisoned");
if let Some(value) = state.queue.pop_front() { if let Some(value) = state.queue.pop_front() {
state.pump_senders(); state.pump_senders(&mut completions);
Ok(value) Ok(value)
} else if state.sender_count == 0 || state.receiver_closed { } else if state.sender_count == 0 || state.receiver_closed {
Err(TryRecvError::Disconnected) Err(TryRecvError::Disconnected)
} else { } else {
Err(TryRecvError::Empty) Err(TryRecvError::Empty)
} }
};
fire_completions(completions);
result
} }
/// Closes the channel to future sends. /// Closes the channel to future sends.
@@ -391,10 +448,15 @@ impl<T: Send + 'static> Receiver<T> {
/// Already-buffered messages remain available to [`recv`](Self::recv) and /// Already-buffered messages remain available to [`recv`](Self::recv) and
/// [`try_recv`](Self::try_recv). /// [`try_recv`](Self::try_recv).
pub fn close(&mut self) { pub fn close(&mut self) {
self.shared let mut completions = Vec::new();
{
let mut state = self
.shared
.lock() .lock()
.expect("mpsc state should not be poisoned") .expect("mpsc state should not be poisoned");
.close_receiver(); state.close_receiver(&mut completions);
}
fire_completions(completions);
} }
/// Returns `true` if the channel is closed or all senders have been dropped. /// Returns `true` if the channel is closed or all senders have been dropped.
@@ -432,16 +494,17 @@ impl<T: Send + 'static> Receiver<T> {
cancel_handle.finish(None); cancel_handle.finish(None);
}); });
let mut completions = Vec::new();
{ {
let mut state = self let mut state = self
.shared .shared
.lock() .lock()
.expect("mpsc state should not be poisoned"); .expect("mpsc state should not be poisoned");
if let Some(value) = state.queue.pop_front() { if let Some(value) = state.queue.pop_front() {
state.pump_senders(); state.pump_senders(&mut completions);
handle.complete(Some(value)); completions.push(PendingCompletion::RecvSome(handle.clone(), value));
} else if state.receiver_closed || state.sender_count == 0 { } else if state.receiver_closed || state.sender_count == 0 {
handle.complete(None); completions.push(PendingCompletion::RecvNone(handle.clone()));
} else { } else {
assert!( assert!(
state.recv_waiter.is_none(), state.recv_waiter.is_none(),
@@ -450,6 +513,7 @@ impl<T: Send + 'static> Receiver<T> {
state.recv_waiter = Some(handle.clone()); state.recv_waiter = Some(handle.clone());
} }
} }
fire_completions(completions);
*wait = Some(future); *wait = Some(future);
self.poll_recv(cx, wait) self.poll_recv(cx, wait)
@@ -459,28 +523,43 @@ impl<T: Send + 'static> Receiver<T> {
impl<T: Send + 'static> Drop for Sender<T> { impl<T: Send + 'static> Drop for Sender<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.shared let mut completions = Vec::new();
{
let mut state = self
.shared
.lock() .lock()
.expect("mpsc state should not be poisoned") .expect("mpsc state should not be poisoned");
.drop_sender(); state.drop_sender(&mut completions);
}
fire_completions(completions);
} }
} }
impl<T: Send + 'static> Drop for UnboundedSender<T> { impl<T: Send + 'static> Drop for UnboundedSender<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.shared let mut completions = Vec::new();
{
let mut state = self
.shared
.lock() .lock()
.expect("mpsc state should not be poisoned") .expect("mpsc state should not be poisoned");
.drop_sender(); state.drop_sender(&mut completions);
}
fire_completions(completions);
} }
} }
impl<T: Send + 'static> Drop for Receiver<T> { impl<T: Send + 'static> Drop for Receiver<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.shared let mut completions = Vec::new();
{
let mut state = self
.shared
.lock() .lock()
.expect("mpsc state should not be poisoned") .expect("mpsc state should not be poisoned");
.close_receiver(); state.close_receiver(&mut completions);
}
fire_completions(completions);
} }
} }

View File

@@ -206,25 +206,47 @@ where
{ {
let owner = current_thread_ptr(); let owner = current_thread_ptr();
let id = allocate_timer_id(); let id = allocate_timer_id();
let deadline = deadline_from_now(delay);
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::trace!( tracing::trace!(
target: trace_targets::TIMER, target: trace_targets::TIMER,
event = "set_interval", event = "set_interval",
timer_id = id, timer_id = id,
delay_ns = delay.as_nanos() as u64, delay_ns = delay.as_nanos() as u64,
deadline_ns = deadline.as_nanos() as u64,
"scheduling interval" "scheduling interval"
); );
if delay.is_zero() {
// Zero-delay intervals never touch the timer heap or arm an io_uring
// timer (a past-deadline kernel timer would fire on every event loop
// iteration, spinning the runtime at 100 % CPU). Instead the callback
// is stored in `immediate_intervals` and self-schedules as a macrotask
// each turn, mirroring JS `setInterval(f, 0)` semantics.
let callback = Rc::new(RefCell::new(Box::new(callback) as Box<dyn FnMut()>));
current_thread()
.immediate_intervals
.borrow_mut()
.insert(id, Rc::clone(&callback));
schedule_immediate_interval(owner, id);
} else {
let deadline = deadline_from_now(delay);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::TIMER,
event = "set_interval_deadline",
timer_id = id,
deadline_ns = deadline.as_nanos() as u64,
"interval deadline computed"
);
let timer = TimerNode::interval( let timer = TimerNode::interval(
id, id,
deadline, deadline,
delay, delay,
Rc::new(RefCell::new(Box::new(callback))), Rc::new(RefCell::new(Box::new(callback))),
); );
current_thread().timers.borrow_mut().insert(timer); current_thread().timers.borrow_mut().insert(timer);
rearm_thread_timer(); rearm_thread_timer();
}
IntervalHandle { IntervalHandle {
id, id,
@@ -556,6 +578,9 @@ struct ThreadState {
local_microtasks: RefCell<VecDeque<LocalTask>>, local_microtasks: RefCell<VecDeque<LocalTask>>,
local_macrotasks: RefCell<VecDeque<MacroTask>>, local_macrotasks: RefCell<VecDeque<MacroTask>>,
timers: RefCell<TimerHeap>, timers: RefCell<TimerHeap>,
/// Zero-delay intervals bypasses the timer heap entirely. Each entry
/// re-enqueues itself as a macrotask on every turn.
immediate_intervals: RefCell<HashMap<usize, Rc<RefCell<Box<dyn FnMut()>>>>>,
next_timer_id: Cell<usize>, next_timer_id: Cell<usize>,
children: RefCell<Vec<ChildWorker>>, children: RefCell<Vec<ChildWorker>>,
} }
@@ -573,6 +598,7 @@ impl ThreadState {
local_microtasks: RefCell::new(VecDeque::new()), local_microtasks: RefCell::new(VecDeque::new()),
local_macrotasks: RefCell::new(VecDeque::new()), local_macrotasks: RefCell::new(VecDeque::new()),
timers: RefCell::new(TimerHeap::new()), timers: RefCell::new(TimerHeap::new()),
immediate_intervals: RefCell::new(HashMap::new()),
next_timer_id: Cell::new(1), next_timer_id: Cell::new(1),
children: RefCell::new(Vec::new()), children: RefCell::new(Vec::new()),
} }
@@ -1109,7 +1135,8 @@ fn has_ready_work() -> bool {
} }
fn has_pending_timers() -> bool { fn has_pending_timers() -> bool {
!current_thread().timers.borrow().is_empty() let state = current_thread();
!state.timers.borrow().is_empty() || !state.immediate_intervals.borrow().is_empty()
} }
fn allocate_timer_id() -> usize { fn allocate_timer_id() -> usize {
@@ -1126,11 +1153,37 @@ fn clear_timer(owner: *const ThreadState, id: usize) {
"timer handles must be cleared on their originating thread" "timer handles must be cleared on their originating thread"
); );
if current_thread().timers.borrow_mut().remove(id).is_some() { let state = current_thread();
if state.timers.borrow_mut().remove(id).is_some() {
rearm_thread_timer(); rearm_thread_timer();
} else {
// May be a zero-delay immediate interval; removal is a no-op if absent.
state.immediate_intervals.borrow_mut().remove(&id);
} }
} }
/// Enqueues one macrotask turn for a zero-delay interval.
///
/// The macrotask checks whether the interval is still live (not cleared), fires
/// the callback, and re-enqueues itself for the next turn.
fn schedule_immediate_interval(owner: *const ThreadState, id: usize) {
push_local_macrotask(Box::new(move || {
let callback = current_thread()
.immediate_intervals
.borrow()
.get(&id)
.map(Rc::clone);
let Some(callback) = callback else {
return; // interval was cleared before this turn ran
};
(callback.borrow_mut())();
// Re-enqueue for the next turn if still live.
if current_thread().immediate_intervals.borrow().contains_key(&id) {
schedule_immediate_interval(owner, id);
}
}));
}
fn dispatch_expired_timers() { fn dispatch_expired_timers() {
let now = deadline_from_now(Duration::ZERO); let now = deadline_from_now(Duration::ZERO);
let due = current_thread().timers.borrow_mut().pop_due(now); let due = current_thread().timers.borrow_mut().pop_due(now);
@@ -1333,4 +1386,30 @@ mod tests {
assert_eq!(*observed.lock().unwrap(), Some(7)); assert_eq!(*observed.lock().unwrap(), Some(7));
} }
#[test]
fn zero_interval_fires_once_per_turn_without_spinning() {
// set_interval(Duration::ZERO, ..) must not busy-spin the event loop.
// Each tick is one macrotask turn; we verify the count advances across
// turns by awaiting a sleep between checks.
let count = Rc::new(Cell::new(0usize));
let count_clone = Rc::clone(&count);
let handle_slot: Rc<RefCell<Option<super::IntervalHandle>>> =
Rc::new(RefCell::new(None));
let handle_slot_clone = Rc::clone(&handle_slot);
let handle = set_interval(Duration::ZERO, move || {
let next = count_clone.get() + 1;
count_clone.set(next);
if next == 5 {
let handle = handle_slot_clone.borrow_mut().take().unwrap();
clear_interval(&handle);
}
});
*handle_slot.borrow_mut() = Some(handle);
run();
assert_eq!(count.get(), 5);
}
} }