From 9594c3871486e10c9d45d672bb34a605d59b2310 Mon Sep 17 00:00:00 2001 From: Will Temple Date: Thu, 19 Mar 2026 20:18:13 -0400 Subject: [PATCH] Reactivity phase 1 --- Cargo.lock | 3 + lib/reactivity/Cargo.toml | 7 + lib/reactivity/src/id.rs | 18 ++ lib/reactivity/src/lib.rs | 11 + lib/reactivity/src/reactor.rs | 407 ++++++++++++++++++++++++++++++++++ 5 files changed, 446 insertions(+) create mode 100644 lib/reactivity/Cargo.toml create mode 100644 lib/reactivity/src/id.rs create mode 100644 lib/reactivity/src/lib.rs create mode 100644 lib/reactivity/src/reactor.rs diff --git a/Cargo.lock b/Cargo.lock index 1d9cb73..86624cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -154,6 +154,9 @@ dependencies = [ [[package]] name = "ruin_reactivity" version = "0.1.0" +dependencies = [ + "ruin-runtime", +] [[package]] name = "smallvec" diff --git a/lib/reactivity/Cargo.toml b/lib/reactivity/Cargo.toml new file mode 100644 index 0000000..525518c --- /dev/null +++ b/lib/reactivity/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "ruin_reactivity" +version = "0.1.0" +edition = "2024" + +[dependencies] +ruin_runtime = { package = "ruin-runtime", path = "../runtime" } diff --git a/lib/reactivity/src/id.rs b/lib/reactivity/src/id.rs new file mode 100644 index 0000000..70d35ca --- /dev/null +++ b/lib/reactivity/src/id.rs @@ -0,0 +1,18 @@ +use core::fmt; + +/// Stable identifier for a node in a reactive graph. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct NodeId(pub(crate) u64); + +impl NodeId { + #[allow(dead_code)] + pub(crate) const fn new(raw: u64) -> Self { + Self(raw) + } +} + +impl fmt::Display for NodeId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} diff --git a/lib/reactivity/src/lib.rs b/lib/reactivity/src/lib.rs new file mode 100644 index 0000000..7688794 --- /dev/null +++ b/lib/reactivity/src/lib.rs @@ -0,0 +1,11 @@ +//! Fine-grained reactivity primitives for RUIN. +//! +//! This crate is intentionally layered on top of `ruin-runtime`. The reactive graph is +//! single-threaded and designed to live on a runtime-managed thread, while async work feeds it +//! from the edges by updating state or emitting events. + +mod id; +mod reactor; + +pub use id::NodeId; +pub use reactor::{ReactCycleError, Reactor, current}; diff --git a/lib/reactivity/src/reactor.rs b/lib/reactivity/src/reactor.rs new file mode 100644 index 0000000..92778ef --- /dev/null +++ b/lib/reactivity/src/reactor.rs @@ -0,0 +1,407 @@ +use std::cell::{Cell, RefCell}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::error::Error; +use std::fmt; +use std::panic::panic_any; +use std::rc::{Rc, Weak}; + +use ruin_runtime::queue_microtask; + +use crate::NodeId; + +type Job = Box; + +thread_local! { + static CURRENT_REACTOR: RefCell> = const { RefCell::new(Weak::new()) }; +} + +/// Returns the current thread's default reactor. +/// +/// The first call on a thread creates a new reactor for that thread and caches it in thread-local +/// storage. +pub fn current() -> Reactor { + Reactor::current() +} + +#[allow(dead_code)] +pub(crate) trait ObserverHook { + fn notify(&self); +} + +/// Panic payload emitted when the reactor detects a dependency cycle. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ReactCycleError { + cycle: Vec, +} + +impl ReactCycleError { + fn new(cycle: Vec) -> Self { + Self { cycle } + } + + /// Returns the cycle path that was detected. + pub fn cycle(&self) -> &[NodeId] { + &self.cycle + } +} + +impl fmt::Display for ReactCycleError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "reactive cycle detected: ")?; + for (index, node) in self.cycle.iter().enumerate() { + if index != 0 { + write!(f, " -> ")?; + } + write!(f, "{node}")?; + } + Ok(()) + } +} + +impl Error for ReactCycleError {} + +/// Single-threaded coordinator for a reactive graph. +/// +/// A reactor tracks dependency edges between reactive nodes, manages the currently executing +/// observer stack, and schedules deferred jobs onto the runtime microtask queue. +#[derive(Clone)] +pub struct Reactor { + inner: Rc, +} + +impl Reactor { + /// Creates a new empty reactor. + pub fn new() -> Self { + Self { + inner: Rc::new(ReactorInner::new()), + } + } + + /// Returns the current thread's default reactor. + pub fn current() -> Self { + CURRENT_REACTOR.with(|slot| { + if let Some(inner) = slot.borrow().upgrade() { + return Self { inner }; + } + + let reactor = Self::new(); + *slot.borrow_mut() = Rc::downgrade(&reactor.inner); + reactor + }) + } + + /// Runs `f` in the dependency-tracking scope of `observer`. + /// + /// Existing dependencies for `observer` are cleared before `f` runs. Any calls to + /// [`observe`](Self::observe) made while `f` executes will become the observer's new + /// dependencies. + pub fn run_in_context(&self, observer: NodeId, f: impl FnOnce() -> T) -> T { + self.clear_observer_dependencies(observer); + self.inner.stack.borrow_mut().push(observer); + let inserted = self.inner.active_computations.borrow_mut().insert(observer); + debug_assert!(inserted, "observer should not already be active"); + + struct Guard<'a> { + inner: &'a ReactorInner, + } + + impl Drop for Guard<'_> { + fn drop(&mut self) { + let popped = self.inner.stack.borrow_mut().pop(); + debug_assert!(popped.is_some(), "reactor observer stack underflow"); + if let Some(node) = popped { + let removed = self.inner.active_computations.borrow_mut().remove(&node); + debug_assert!(removed, "observer should have been active"); + } + } + } + + let _guard = Guard { inner: &self.inner }; + f() + } + + /// Records that the currently executing observer depends on `observable`. + /// + /// # Panics + /// + /// Panics with [`ReactCycleError`] if `observable` is already being computed in the current + /// dependency stack. + pub fn observe(&self, observable: NodeId) { + if self + .inner + .active_computations + .borrow() + .contains(&observable) + { + let stack = self.inner.stack.borrow(); + let start = stack + .iter() + .position(|node| *node == observable) + .expect("active computation should appear in observer stack"); + let mut cycle = stack[start..].to_vec(); + cycle.push(observable); + panic_any(ReactCycleError::new(cycle)); + } + + let current = self.inner.stack.borrow().last().copied(); + let Some(observer) = current else { + return; + }; + + self.inner + .dependencies + .borrow_mut() + .entry(observer) + .or_default() + .insert(observable); + self.inner + .dependents + .borrow_mut() + .entry(observable) + .or_default() + .insert(observer); + } + + /// Notifies dependents of `observable`. + pub fn trigger(&self, observable: NodeId) { + let dependents = self + .inner + .dependents + .borrow() + .get(&observable) + .map(|nodes| nodes.iter().copied().collect::>()) + .unwrap_or_default(); + + for dependent in dependents { + let hook = self + .inner + .observers + .borrow() + .get(&dependent) + .cloned() + .and_then(|weak| weak.upgrade()); + if let Some(hook) = hook { + hook.notify(); + } else { + self.inner.observers.borrow_mut().remove(&dependent); + } + } + } + + /// Disposes all graph bookkeeping for `node`. + pub fn dispose(&self, node: NodeId) { + self.clear_observer_dependencies(node); + + let incoming = self + .inner + .dependents + .borrow_mut() + .remove(&node) + .map(|nodes| nodes.into_iter().collect::>()) + .unwrap_or_default(); + for observer in incoming { + let mut dependencies = self.inner.dependencies.borrow_mut(); + if let Some(observed) = dependencies.get_mut(&observer) { + observed.remove(&node); + if observed.is_empty() { + dependencies.remove(&observer); + } + } + } + + self.inner.observers.borrow_mut().remove(&node); + } + + /// Schedules a job to run in the reactor's microtask-backed job queue. + pub fn schedule(&self, job: impl FnOnce() + 'static) { + self.inner + .pending_jobs + .borrow_mut() + .push_back(Box::new(job)); + self.inner.ensure_flush_scheduled(); + } + + #[allow(dead_code)] + pub(crate) fn allocate_node(&self) -> NodeId { + let raw = self.inner.next_node.get(); + self.inner.next_node.set(raw.wrapping_add(1)); + NodeId::new(raw) + } + + #[allow(dead_code)] + pub(crate) fn register_observer(&self, id: NodeId, observer: Rc) { + self.inner + .observers + .borrow_mut() + .insert(id, Rc::downgrade(&observer)); + } + + #[allow(dead_code)] + pub(crate) fn unregister_observer(&self, id: NodeId) { + self.inner.observers.borrow_mut().remove(&id); + } + + fn clear_observer_dependencies(&self, observer: NodeId) { + let observed = self + .inner + .dependencies + .borrow_mut() + .remove(&observer) + .map(|nodes| nodes.into_iter().collect::>()) + .unwrap_or_default(); + + for observable in observed { + let mut dependents = self.inner.dependents.borrow_mut(); + if let Some(observers) = dependents.get_mut(&observable) { + observers.remove(&observer); + if observers.is_empty() { + dependents.remove(&observable); + } + } + } + } +} + +impl Default for Reactor { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for Reactor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Reactor") + .field("ptr", &Rc::as_ptr(&self.inner)) + .finish() + } +} + +struct ReactorInner { + #[allow(dead_code)] + next_node: Cell, + dependencies: RefCell>>, + dependents: RefCell>>, + observers: RefCell>>, + stack: RefCell>, + active_computations: RefCell>, + pending_jobs: RefCell>, + flush_scheduled: Cell, +} + +impl ReactorInner { + fn new() -> Self { + Self { + next_node: Cell::new(1), + dependencies: RefCell::new(BTreeMap::new()), + dependents: RefCell::new(BTreeMap::new()), + observers: RefCell::new(BTreeMap::new()), + stack: RefCell::new(Vec::new()), + active_computations: RefCell::new(BTreeSet::new()), + pending_jobs: RefCell::new(VecDeque::new()), + flush_scheduled: Cell::new(false), + } + } + + fn ensure_flush_scheduled(self: &Rc) { + if self.flush_scheduled.replace(true) { + return; + } + + let reactor = Rc::clone(self); + queue_microtask(move || { + reactor.flush_jobs(); + }); + } + + fn flush_jobs(self: Rc) { + loop { + let job = self.pending_jobs.borrow_mut().pop_front(); + let Some(job) = job else { + break; + }; + job(); + } + + self.flush_scheduled.set(false); + if !self.pending_jobs.borrow().is_empty() { + self.ensure_flush_scheduled(); + } + } +} + +#[cfg(test)] +mod tests { + use std::cell::Cell; + use std::panic::{AssertUnwindSafe, catch_unwind}; + use std::rc::Rc; + + use ruin_runtime::{queue_task, run}; + + use super::{ReactCycleError, Reactor, current}; + + #[test] + fn current_reactor_is_thread_local_singleton() { + let one = current(); + let two = current(); + assert!(Rc::ptr_eq(&one.inner, &two.inner)); + } + + #[test] + fn observe_records_dependency_edges() { + let reactor = Reactor::new(); + let observer = reactor.allocate_node(); + let observable = reactor.allocate_node(); + + reactor.run_in_context(observer, || reactor.observe(observable)); + + assert_eq!( + reactor.inner.dependencies.borrow().get(&observer), + Some(&[observable].into_iter().collect()) + ); + assert_eq!( + reactor.inner.dependents.borrow().get(&observable), + Some(&[observer].into_iter().collect()) + ); + } + + #[test] + fn cycle_detection_panics_with_structured_error() { + let reactor = Reactor::new(); + let a = reactor.allocate_node(); + let b = reactor.allocate_node(); + + let panic = catch_unwind(AssertUnwindSafe(|| { + reactor.run_in_context(a, || { + reactor.observe(b); + reactor.run_in_context(b, || reactor.observe(a)); + }); + })) + .expect_err("cycle should panic"); + + let error = panic + .downcast::() + .expect("panic payload should be ReactCycleError"); + assert_eq!(error.cycle(), &[a, b, a]); + } + + #[test] + fn scheduled_jobs_flush_on_runtime_microtask_queue() { + let observed = Rc::new(Cell::new(0usize)); + + queue_task({ + let observed = Rc::clone(&observed); + move || { + let reactor = Reactor::new(); + reactor.schedule({ + let observed = Rc::clone(&observed); + move || observed.set(1) + }); + assert_eq!(observed.get(), 0); + } + }); + + run(); + + assert_eq!(observed.get(), 1); + } +}