diff --git a/lib/runtime/src/channel/mpsc.rs b/lib/runtime/src/channel/mpsc.rs
index f47273d..d6a2e08 100644
--- a/lib/runtime/src/channel/mpsc.rs
+++ b/lib/runtime/src/channel/mpsc.rs
@@ -93,6 +93,31 @@ pub enum TryRecvError {
     Disconnected,
 }
 
+/// A wakeup deferred until the channel mutex has been released.
+///
+/// Waking a waiter while holding the channel lock can be expensive (cross-thread
+/// wakeups go through the io_uring ring notification path) and risks priority
+/// inversion. All `State` methods collect these instead of calling
+/// `CompletionHandle::complete` directly; the caller fires them after dropping
+/// the `MutexGuard`.
+enum PendingCompletion<T> {
+    RecvSome(CompletionHandle<Option<T>>, T),
+    RecvNone(CompletionHandle<Option<T>>),
+    SendOk(CompletionHandle<Result<(), SendError<T>>>),
+    SendErr(CompletionHandle<Result<(), SendError<T>>>, T),
+}
+
+fn fire_completions<T>(completions: Vec<PendingCompletion<T>>) {
+    for c in completions {
+        match c {
+            PendingCompletion::RecvSome(h, v) => h.complete(Some(v)),
+            PendingCompletion::RecvNone(h) => h.complete(None),
+            PendingCompletion::SendOk(h) => h.complete(Ok(())),
+            PendingCompletion::SendErr(h, v) => h.complete(Err(SendError(v))),
+        }
+    }
+}
+
 impl<T> State<T> {
     fn new(capacity: Option<usize>) -> Self {
         Self {
@@ -106,13 +131,17 @@ impl<T> State<T> {
         }
     }
 
-    fn try_send_now(&mut self, value: T) -> Result<(), TrySendError<T>> {
+    fn try_send_now(
+        &mut self,
+        value: T,
+        completions: &mut Vec<PendingCompletion<T>>,
+    ) -> Result<(), TrySendError<T>> {
         if self.receiver_closed {
             return Err(TrySendError::Closed(value));
         }
 
         if let Some(waiter) = self.recv_waiter.take() {
-            waiter.complete(Some(value));
+            completions.push(PendingCompletion::RecvSome(waiter, value));
             return Ok(());
         }
 
@@ -151,10 +180,10 @@ impl<T> State<T> {
         true
     }
 
-    fn pump_senders(&mut self) {
+    fn pump_senders(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
         loop {
             if self.receiver_closed {
-                self.fail_pending_senders();
+                self.fail_pending_senders(completions);
                 break;
             }
@@ -170,44 +199,47 @@ impl<T> State<T> {
             };
 
             if let Some(receiver) = self.recv_waiter.take() {
-                receiver.complete(Some(waiter.value));
+                completions.push(PendingCompletion::RecvSome(receiver, waiter.value));
             } else {
                 self.queue.push_back(waiter.value);
             }
-            waiter.handle.complete(Ok(()));
+            completions.push(PendingCompletion::SendOk(waiter.handle));
         }
 
         if self.queue.is_empty()
             && self.sender_count == 0
             && let Some(waiter) = self.recv_waiter.take()
         {
-            waiter.complete(None);
+            completions.push(PendingCompletion::RecvNone(waiter));
         }
     }
 
-    fn fail_pending_senders(&mut self) {
+    fn fail_pending_senders(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
         while let Some(waiter) = self.send_waiters.pop_front() {
-            waiter.handle.complete(Err(SendError(waiter.value)));
+            completions.push(PendingCompletion::SendErr(waiter.handle, waiter.value));
         }
     }
 
-    fn close_receiver(&mut self) {
+    fn close_receiver(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
         self.receiver_closed = true;
-        self.fail_pending_senders();
+        self.fail_pending_senders(completions);
         if self.queue.is_empty()
             && let Some(waiter) = self.recv_waiter.take()
         {
-            waiter.complete(None);
+            completions.push(PendingCompletion::RecvNone(waiter));
        }
    }
 
-    fn drop_sender(&mut self) {
-        self.sender_count = self.sender_count.saturating_sub(1);
+    fn drop_sender(&mut self, completions: &mut Vec<PendingCompletion<T>>) {
+        self.sender_count = self
+            .sender_count
+            .checked_sub(1)
+            .expect("sender count underflow: more drops than creates");
         if self.sender_count == 0
             && self.queue.is_empty()
             && let Some(waiter) = self.recv_waiter.take()
         {
-            waiter.complete(None);
+            completions.push(PendingCompletion::RecvNone(waiter));
        }
    }
 }
@@ -252,10 +284,16 @@ impl<T> Sender<T> {
     /// Attempts to queue a message immediately.
     pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
-        self.shared
-            .lock()
-            .expect("mpsc state should not be poisoned")
-            .try_send_now(value)
+        let mut completions = Vec::new();
+        let result = {
+            let mut state = self
+                .shared
+                .lock()
+                .expect("mpsc state should not be poisoned");
+            state.try_send_now(value, &mut completions)
+        };
+        fire_completions(completions);
+        result
     }
 
     /// Returns `true` if the receiver has been closed or dropped.
@@ -281,22 +319,30 @@ impl<T> Sender<T> {
                 Poll::Pending => Poll::Pending,
             }
         } else {
-            let mut state = self
-                .shared
-                .lock()
-                .expect("mpsc state should not be poisoned");
-            match state.try_send_now(value_slot.take().expect("send value should be present")) {
+            let mut completions = Vec::new();
+            let first_result = {
+                let mut state = self
+                    .shared
+                    .lock()
+                    .expect("mpsc state should not be poisoned");
+                state.try_send_now(
+                    value_slot.take().expect("send value should be present"),
+                    &mut completions,
+                )
+            };
+            fire_completions(completions);
+            match first_result {
                 Ok(()) => Poll::Ready(Ok(())),
                 Err(TrySendError::Closed(value)) => Poll::Ready(Err(SendError(value))),
                 Err(TrySendError::Full(returned)) => {
-                    drop(state);
                     let (future, handle) = runtime_waiter::<Result<(), SendError<T>>>();
                     let state_shared = Arc::clone(&self.shared);
+                    let mut completions = Vec::new();
                     let registration = {
                         let mut state = state_shared
                             .lock()
                             .expect("mpsc state should not be poisoned");
-                        match state.try_send_now(returned) {
+                        match state.try_send_now(returned, &mut completions) {
                             Ok(()) => Ok(None),
                             Err(TrySendError::Closed(value)) => Err(SendError(value)),
                             Err(TrySendError::Full(value)) => {
@@ -304,6 +350,7 @@
                             }
                         }
                     };
+                    fire_completions(completions);
                     match registration {
                         Ok(None) => {
                             handle.complete(Ok(()));
@@ -339,13 +386,18 @@
 impl<T> UnboundedSender<T> {
     /// Queues a message immediately.
     pub fn send(&self, value: T) -> Result<(), SendError<T>> {
-        self.shared
-            .lock()
-            .expect("mpsc state should not be poisoned")
-            .try_send_now(value)
-            .map_err(|error| match error {
-                TrySendError::Full(value) | TrySendError::Closed(value) => SendError(value),
-            })
+        let mut completions = Vec::new();
+        let result = {
+            let mut state = self
+                .shared
+                .lock()
+                .expect("mpsc state should not be poisoned");
+            state.try_send_now(value, &mut completions)
+        };
+        fire_completions(completions);
+        result.map_err(|error| match error {
+            TrySendError::Full(value) | TrySendError::Closed(value) => SendError(value),
+        })
     }
 
     /// Returns `true` if the receiver has been closed or dropped.
@@ -372,18 +424,23 @@ impl<T> Receiver<T> {
 
     /// Attempts to receive a message immediately.
     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
-        let mut state = self
-            .shared
-            .lock()
-            .expect("mpsc state should not be poisoned");
-        if let Some(value) = state.queue.pop_front() {
-            state.pump_senders();
-            Ok(value)
-        } else if state.sender_count == 0 || state.receiver_closed {
-            Err(TryRecvError::Disconnected)
-        } else {
-            Err(TryRecvError::Empty)
-        }
+        let mut completions = Vec::new();
+        let result = {
+            let mut state = self
+                .shared
+                .lock()
+                .expect("mpsc state should not be poisoned");
+            if let Some(value) = state.queue.pop_front() {
+                state.pump_senders(&mut completions);
+                Ok(value)
+            } else if state.sender_count == 0 || state.receiver_closed {
+                Err(TryRecvError::Disconnected)
+            } else {
+                Err(TryRecvError::Empty)
+            }
+        };
+        fire_completions(completions);
+        result
     }
 
@@ -391,10 +448,15 @@ impl<T> Receiver<T> {
     /// Closes the channel to future sends.
     ///
    /// Already-buffered messages remain available to [`recv`](Self::recv) and
     /// [`try_recv`](Self::try_recv).
     pub fn close(&mut self) {
-        self.shared
-            .lock()
-            .expect("mpsc state should not be poisoned")
-            .close_receiver();
+        let mut completions = Vec::new();
+        {
+            let mut state = self
+                .shared
+                .lock()
+                .expect("mpsc state should not be poisoned");
+            state.close_receiver(&mut completions);
+        }
+        fire_completions(completions);
     }
 
     /// Returns `true` if the channel is closed or all senders have been dropped.
@@ -432,16 +494,17 @@
             cancel_handle.finish(None);
         });
 
+        let mut completions = Vec::new();
         {
             let mut state = self
                 .shared
                 .lock()
                 .expect("mpsc state should not be poisoned");
             if let Some(value) = state.queue.pop_front() {
-                state.pump_senders();
-                handle.complete(Some(value));
+                state.pump_senders(&mut completions);
+                completions.push(PendingCompletion::RecvSome(handle.clone(), value));
             } else if state.receiver_closed || state.sender_count == 0 {
-                handle.complete(None);
+                completions.push(PendingCompletion::RecvNone(handle.clone()));
             } else {
                 assert!(
                     state.recv_waiter.is_none(),
@@ -450,6 +513,7 @@
                 state.recv_waiter = Some(handle.clone());
             }
         }
+        fire_completions(completions);
 
         *wait = Some(future);
         self.poll_recv(cx, wait)
@@ -459,28 +523,43 @@
 impl<T> Drop for Sender<T> {
     fn drop(&mut self) {
-        self.shared
-            .lock()
-            .expect("mpsc state should not be poisoned")
-            .drop_sender();
+        let mut completions = Vec::new();
+        {
+            let mut state = self
+                .shared
+                .lock()
+                .expect("mpsc state should not be poisoned");
+            state.drop_sender(&mut completions);
+        }
+        fire_completions(completions);
     }
 }
 
 impl<T> Drop for UnboundedSender<T> {
     fn drop(&mut self) {
-        self.shared
-            .lock()
-            .expect("mpsc state should not be poisoned")
-            .drop_sender();
+        let mut completions = Vec::new();
+        {
+            let mut state = self
+                .shared
+                .lock()
+                .expect("mpsc state should not be poisoned");
+            state.drop_sender(&mut completions);
+        }
+        fire_completions(completions);
     }
 }
 
 impl<T> Drop for Receiver<T> {
     fn drop(&mut self) {
-        self.shared
-            .lock()
-            .expect("mpsc state should not be poisoned")
-            .close_receiver();
+        let mut completions = Vec::new();
+        {
+            let mut state = self
+                .shared
+                .lock()
+                .expect("mpsc state should not be poisoned");
+            state.close_receiver(&mut completions);
+        }
+        fire_completions(completions);
     }
 }
diff --git a/lib/runtime/src/platform/linux_x86_64/runtime.rs b/lib/runtime/src/platform/linux_x86_64/runtime.rs
index c055656..039a4b5 100644
--- a/lib/runtime/src/platform/linux_x86_64/runtime.rs
+++ b/lib/runtime/src/platform/linux_x86_64/runtime.rs
@@ -206,25 +206,47 @@ where
 {
     let owner = current_thread_ptr();
     let id = allocate_timer_id();
-    let deadline = deadline_from_now(delay);
+    #[cfg(debug_assertions)]
     tracing::trace!(
         target: trace_targets::TIMER,
         event = "set_interval",
         timer_id = id,
         delay_ns = delay.as_nanos() as u64,
-        deadline_ns = deadline.as_nanos() as u64,
         "scheduling interval"
     );
-    let timer = TimerNode::interval(
-        id,
-        deadline,
-        delay,
-        Rc::new(RefCell::new(Box::new(callback))),
-    );
-    current_thread().timers.borrow_mut().insert(timer);
-    rearm_thread_timer();
+    if delay.is_zero() {
+        // Zero-delay intervals never touch the timer heap or arm an io_uring
+        // timer (a past-deadline kernel timer would fire on every event loop
+        // iteration, spinning the runtime at 100 % CPU). Instead the callback
+        // is stored in `immediate_intervals` and self-schedules as a macrotask
+        // each turn, mirroring JS `setInterval(f, 0)` semantics.
+        let callback = Rc::new(RefCell::new(Box::new(callback) as Box<dyn FnMut()>));
+        current_thread()
+            .immediate_intervals
+            .borrow_mut()
+            .insert(id, Rc::clone(&callback));
+        schedule_immediate_interval(owner, id);
+    } else {
+        let deadline = deadline_from_now(delay);
+        #[cfg(debug_assertions)]
+        tracing::trace!(
+            target: trace_targets::TIMER,
+            event = "set_interval_deadline",
+            timer_id = id,
+            deadline_ns = deadline.as_nanos() as u64,
+            "interval deadline computed"
+        );
+        let timer = TimerNode::interval(
+            id,
+            deadline,
+            delay,
+            Rc::new(RefCell::new(Box::new(callback))),
+        );
+        current_thread().timers.borrow_mut().insert(timer);
+        rearm_thread_timer();
+    }
 
     IntervalHandle {
         id,
@@ -556,6 +578,9 @@ struct ThreadState {
     local_microtasks: RefCell>,
     local_macrotasks: RefCell>,
     timers: RefCell<TimerHeap>,
+    /// Zero-delay intervals bypass the timer heap entirely. Each entry
+    /// re-enqueues itself as a macrotask on every turn.
+    immediate_intervals: RefCell<HashMap<usize, Rc<RefCell<Box<dyn FnMut()>>>>>,
     next_timer_id: Cell<usize>,
     children: RefCell>,
 }
@@ -573,6 +598,7 @@ impl ThreadState {
             local_microtasks: RefCell::new(VecDeque::new()),
             local_macrotasks: RefCell::new(VecDeque::new()),
             timers: RefCell::new(TimerHeap::new()),
+            immediate_intervals: RefCell::new(HashMap::new()),
             next_timer_id: Cell::new(1),
             children: RefCell::new(Vec::new()),
         }
@@ -1109,7 +1135,8 @@ fn has_ready_work() -> bool {
 }
 
 fn has_pending_timers() -> bool {
-    !current_thread().timers.borrow().is_empty()
+    let state = current_thread();
+    !state.timers.borrow().is_empty() || !state.immediate_intervals.borrow().is_empty()
 }
 
 fn allocate_timer_id() -> usize {
@@ -1126,11 +1153,37 @@ fn clear_timer(owner: *const ThreadState, id: usize) {
         "timer handles must be cleared on their originating thread"
     );
 
-    if current_thread().timers.borrow_mut().remove(id).is_some() {
+    let state = current_thread();
+    if state.timers.borrow_mut().remove(id).is_some() {
         rearm_thread_timer();
+    } else {
+        // May be a zero-delay immediate interval; removal is a no-op if absent.
+        state.immediate_intervals.borrow_mut().remove(&id);
     }
 }
 
+/// Enqueues one macrotask turn for a zero-delay interval.
+///
+/// The macrotask checks whether the interval is still live (not cleared), fires
+/// the callback, and re-enqueues itself for the next turn.
+fn schedule_immediate_interval(owner: *const ThreadState, id: usize) {
+    push_local_macrotask(Box::new(move || {
+        let callback = current_thread()
+            .immediate_intervals
+            .borrow()
+            .get(&id)
+            .map(Rc::clone);
+        let Some(callback) = callback else {
+            return; // interval was cleared before this turn ran
+        };
+        (callback.borrow_mut())();
+        // Re-enqueue for the next turn if still live.
+        if current_thread().immediate_intervals.borrow().contains_key(&id) {
+            schedule_immediate_interval(owner, id);
+        }
+    }));
+}
+
 fn dispatch_expired_timers() {
     let now = deadline_from_now(Duration::ZERO);
     let due = current_thread().timers.borrow_mut().pop_due(now);
@@ -1333,4 +1386,30 @@ mod tests {
 
         assert_eq!(*observed.lock().unwrap(), Some(7));
     }
+
+    #[test]
+    fn zero_interval_fires_once_per_turn_without_spinning() {
+        // set_interval(Duration::ZERO, ..) must not busy-spin the event loop.
+        // Each tick is one macrotask turn; the callback clears its own handle
+        // on the fifth tick, so `run()` must terminate with count == 5.
+        let count = Rc::new(Cell::new(0usize));
+        let count_clone = Rc::clone(&count);
+        let handle_slot: Rc<RefCell<Option<IntervalHandle>>> =
+            Rc::new(RefCell::new(None));
+        let handle_slot_clone = Rc::clone(&handle_slot);
+
+        let handle = set_interval(Duration::ZERO, move || {
+            let next = count_clone.get() + 1;
+            count_clone.set(next);
+            if next == 5 {
+                let handle = handle_slot_clone.borrow_mut().take().unwrap();
+                clear_interval(&handle);
+            }
+        });
+        *handle_slot.borrow_mut() = Some(handle);
+
+        run();
+
+        assert_eq!(count.get(), 5);
+    }
 }