use std::cell::{Cell, RefCell}; use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::error::Error; use std::fmt; use std::panic::panic_any; use std::rc::{Rc, Weak}; use ruin_runtime::queue_microtask; use crate::NodeId; type Job = Box; thread_local! { static CURRENT_REACTOR: RefCell> = const { RefCell::new(Weak::new()) }; } /// Returns the current thread's default reactor. /// /// The first call on a thread creates a new reactor for that thread and caches it in thread-local /// storage. pub fn current() -> Reactor { Reactor::current() } #[allow(dead_code)] pub(crate) trait ObserverHook { fn notify(&self); } /// Panic payload emitted when the reactor detects a dependency cycle. #[derive(Clone, Debug, Eq, PartialEq)] pub struct ReactCycleError { cycle: Vec, } impl ReactCycleError { fn new(cycle: Vec) -> Self { Self { cycle } } /// Returns the cycle path that was detected. pub fn cycle(&self) -> &[NodeId] { &self.cycle } } impl fmt::Display for ReactCycleError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "reactive cycle detected: ")?; for (index, node) in self.cycle.iter().enumerate() { if index != 0 { write!(f, " -> ")?; } write!(f, "{node}")?; } Ok(()) } } impl Error for ReactCycleError {} /// Single-threaded coordinator for a reactive graph. /// /// A reactor tracks dependency edges between reactive nodes, manages the currently executing /// observer stack, and schedules deferred jobs onto the runtime microtask queue. #[derive(Clone)] pub struct Reactor { inner: Rc, } impl Reactor { /// Creates a new empty reactor. pub fn new() -> Self { Self { inner: Rc::new(ReactorInner::new()), } } /// Returns the current thread's default reactor. pub fn current() -> Self { CURRENT_REACTOR.with(|slot| { if let Some(inner) = slot.borrow().upgrade() { return Self { inner }; } let reactor = Self::new(); *slot.borrow_mut() = Rc::downgrade(&reactor.inner); reactor }) } /// Runs `f` in the dependency-tracking scope of `observer`. /// /// Existing dependencies for `observer` are cleared before `f` runs. Any calls to /// [`observe`](Self::observe) made while `f` executes will become the observer's new /// dependencies. pub fn run_in_context(&self, observer: NodeId, f: impl FnOnce() -> T) -> T { self.clear_observer_dependencies(observer); self.inner.stack.borrow_mut().push(observer); let inserted = self.inner.active_computations.borrow_mut().insert(observer); debug_assert!(inserted, "observer should not already be active"); struct Guard<'a> { inner: &'a ReactorInner, } impl Drop for Guard<'_> { fn drop(&mut self) { let popped = self.inner.stack.borrow_mut().pop(); debug_assert!(popped.is_some(), "reactor observer stack underflow"); if let Some(node) = popped { let removed = self.inner.active_computations.borrow_mut().remove(&node); debug_assert!(removed, "observer should have been active"); } } } let _guard = Guard { inner: &self.inner }; f() } /// Records that the currently executing observer depends on `observable`. /// /// # Panics /// /// Panics with [`ReactCycleError`] if `observable` is already being computed in the current /// dependency stack. pub fn observe(&self, observable: NodeId) { if self .inner .active_computations .borrow() .contains(&observable) { let stack = self.inner.stack.borrow(); let start = stack .iter() .position(|node| *node == observable) .expect("active computation should appear in observer stack"); let mut cycle = stack[start..].to_vec(); cycle.push(observable); panic_any(ReactCycleError::new(cycle)); } let current = self.inner.stack.borrow().last().copied(); let Some(observer) = current else { return; }; self.inner .dependencies .borrow_mut() .entry(observer) .or_default() .insert(observable); self.inner .dependents .borrow_mut() .entry(observable) .or_default() .insert(observer); } /// Notifies dependents of `observable`. pub fn trigger(&self, observable: NodeId) { let dependents = self .inner .dependents .borrow() .get(&observable) .map(|nodes| nodes.iter().copied().collect::>()) .unwrap_or_default(); for dependent in dependents { let hook = self .inner .observers .borrow() .get(&dependent) .cloned() .and_then(|weak| weak.upgrade()); if let Some(hook) = hook { hook.notify(); } else { self.inner.observers.borrow_mut().remove(&dependent); } } } /// Disposes all graph bookkeeping for `node`. pub fn dispose(&self, node: NodeId) { self.clear_observer_dependencies(node); let incoming = self .inner .dependents .borrow_mut() .remove(&node) .map(|nodes| nodes.into_iter().collect::>()) .unwrap_or_default(); for observer in incoming { let mut dependencies = self.inner.dependencies.borrow_mut(); if let Some(observed) = dependencies.get_mut(&observer) { observed.remove(&node); if observed.is_empty() { dependencies.remove(&observer); } } } self.inner.observers.borrow_mut().remove(&node); } /// Schedules a job to run in the reactor's microtask-backed job queue. pub fn schedule(&self, job: impl FnOnce() + 'static) { self.inner .pending_jobs .borrow_mut() .push_back(Box::new(job)); self.inner.ensure_flush_scheduled(); } #[allow(dead_code)] pub(crate) fn allocate_node(&self) -> NodeId { let raw = self.inner.next_node.get(); self.inner.next_node.set(raw.wrapping_add(1)); NodeId::new(raw) } #[allow(dead_code)] pub(crate) fn register_observer(&self, id: NodeId, observer: Rc) { self.inner .observers .borrow_mut() .insert(id, Rc::downgrade(&observer)); } #[allow(dead_code)] pub(crate) fn unregister_observer(&self, id: NodeId) { self.inner.observers.borrow_mut().remove(&id); } fn clear_observer_dependencies(&self, observer: NodeId) { let observed = self .inner .dependencies .borrow_mut() .remove(&observer) .map(|nodes| nodes.into_iter().collect::>()) .unwrap_or_default(); for observable in observed { let mut dependents = self.inner.dependents.borrow_mut(); if let Some(observers) = dependents.get_mut(&observable) { observers.remove(&observer); if observers.is_empty() { dependents.remove(&observable); } } } } } impl Default for Reactor { fn default() -> Self { Self::new() } } impl fmt::Debug for Reactor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Reactor") .field("ptr", &Rc::as_ptr(&self.inner)) .finish() } } struct ReactorInner { #[allow(dead_code)] next_node: Cell, dependencies: RefCell>>, dependents: RefCell>>, observers: RefCell>>, stack: RefCell>, active_computations: RefCell>, pending_jobs: RefCell>, flush_scheduled: Cell, } impl ReactorInner { fn new() -> Self { Self { next_node: Cell::new(1), dependencies: RefCell::new(BTreeMap::new()), dependents: RefCell::new(BTreeMap::new()), observers: RefCell::new(BTreeMap::new()), stack: RefCell::new(Vec::new()), active_computations: RefCell::new(BTreeSet::new()), pending_jobs: RefCell::new(VecDeque::new()), flush_scheduled: Cell::new(false), } } fn ensure_flush_scheduled(self: &Rc) { if self.flush_scheduled.replace(true) { return; } let reactor = Rc::clone(self); queue_microtask(move || { reactor.flush_jobs(); }); } fn flush_jobs(self: Rc) { loop { let job = self.pending_jobs.borrow_mut().pop_front(); let Some(job) = job else { break; }; job(); } self.flush_scheduled.set(false); if !self.pending_jobs.borrow().is_empty() { self.ensure_flush_scheduled(); } } } #[cfg(test)] mod tests { use std::cell::Cell; use std::panic::{AssertUnwindSafe, catch_unwind}; use std::rc::Rc; use ruin_runtime::{queue_task, run}; use super::{ReactCycleError, Reactor, current}; #[test] fn current_reactor_is_thread_local_singleton() { let one = current(); let two = current(); assert!(Rc::ptr_eq(&one.inner, &two.inner)); } #[test] fn observe_records_dependency_edges() { let reactor = Reactor::new(); let observer = reactor.allocate_node(); let observable = reactor.allocate_node(); reactor.run_in_context(observer, || reactor.observe(observable)); assert_eq!( reactor.inner.dependencies.borrow().get(&observer), Some(&[observable].into_iter().collect()) ); assert_eq!( reactor.inner.dependents.borrow().get(&observable), Some(&[observer].into_iter().collect()) ); } #[test] fn cycle_detection_panics_with_structured_error() { let reactor = Reactor::new(); let a = reactor.allocate_node(); let b = reactor.allocate_node(); let panic = catch_unwind(AssertUnwindSafe(|| { reactor.run_in_context(a, || { reactor.observe(b); reactor.run_in_context(b, || reactor.observe(a)); }); })) .expect_err("cycle should panic"); let error = panic .downcast::() .expect("panic payload should be ReactCycleError"); assert_eq!(error.cycle(), &[a, b, a]); } #[test] fn scheduled_jobs_flush_on_runtime_microtask_queue() { let observed = Rc::new(Cell::new(0usize)); queue_task({ let observed = Rc::clone(&observed); move || { let reactor = Reactor::new(); reactor.schedule({ let observed = Rc::clone(&observed); move || observed.set(1) }); assert_eq!(observed.get(), 0); } }); run(); assert_eq!(observed.get(), 1); } }