From 15813ebb6c302a0a287c080eac341a660e7be3eb Mon Sep 17 00:00:00 2001 From: Will Temple Date: Fri, 20 Mar 2026 13:27:38 -0400 Subject: [PATCH] Phase 3: event/effect --- lib/reactivity/src/effect.rs | 180 ++++++++++++++++++++++++ lib/reactivity/src/event.rs | 236 +++++++++++++++++++++++++++++++ lib/reactivity/src/lib.rs | 6 +- lib/reactivity/src/thunk.rs | 264 ++++++++++++++++++++++++++++++++++- 4 files changed, 682 insertions(+), 4 deletions(-) create mode 100644 lib/reactivity/src/effect.rs create mode 100644 lib/reactivity/src/event.rs diff --git a/lib/reactivity/src/effect.rs b/lib/reactivity/src/effect.rs new file mode 100644 index 0000000..9157cc2 --- /dev/null +++ b/lib/reactivity/src/effect.rs @@ -0,0 +1,180 @@ +use std::cell::{Cell, RefCell}; +use std::rc::{Rc, Weak}; + +use crate::reactor::ObserverHook; +use crate::{NodeId, Reactor, current}; + +/// Creates an effect in the current thread's default reactor. +pub fn effect(f: impl Fn() + 'static) -> EffectHandle { + current().effect(f) +} + +/// Creates an effect associated with `reactor`. +pub fn effect_in(reactor: &Reactor, f: impl Fn() + 'static) -> EffectHandle { + reactor.effect(f) +} + +/// Disposable handle for a reactive effect. +#[derive(Clone)] +pub struct EffectHandle { + inner: Rc, +} + +impl Reactor { + /// Creates an effect associated with this reactor. + /// + /// The effect is scheduled immediately and then re-scheduled whenever one of its dependencies + /// changes. + pub fn effect(&self, f: impl Fn() + 'static) -> EffectHandle { + EffectHandle::new(self.clone(), f) + } +} + +impl EffectHandle { + fn new(reactor: Reactor, effect: impl Fn() + 'static) -> Self { + let id = reactor.allocate_node(); + let inner = Rc::new(EffectInner { + reactor: reactor.clone(), + id, + effect: Box::new(effect), + scheduled: Cell::new(false), + disposed: Cell::new(false), + self_ref: RefCell::new(Weak::new()), + }); + *inner.self_ref.borrow_mut() = Rc::downgrade(&inner); + + let observer: Rc = inner.clone(); + reactor.register_observer(id, observer); + inner.schedule(); + Self { inner } + } + + /// Disposes the effect immediately. + pub fn dispose(&self) { + self.inner.dispose(); + } + + /// Returns `true` if the effect has been disposed. + pub fn is_disposed(&self) -> bool { + self.inner.disposed.get() + } +} + +struct EffectInner { + reactor: Reactor, + id: NodeId, + effect: Box, + scheduled: Cell, + disposed: Cell, + self_ref: RefCell>, +} + +impl EffectInner { + fn schedule(&self) { + if self.disposed.get() || self.scheduled.replace(true) { + return; + } + + let weak = self.self_ref.borrow().clone(); + let reactor = self.reactor.clone(); + reactor.schedule(move || { + let Some(inner) = weak.upgrade() else { + return; + }; + if inner.disposed.get() { + inner.scheduled.set(false); + return; + } + + inner.scheduled.set(false); + inner.reactor.run_in_context(inner.id, || (inner.effect)()); + }); + } + + fn dispose(&self) { + if self.disposed.replace(true) { + return; + } + + self.reactor.unregister_observer(self.id); + self.reactor.dispose(self.id); + } +} + +impl ObserverHook for EffectInner { + fn notify(&self) { + self.schedule(); + } +} + +impl Drop for EffectInner { + fn drop(&mut self) { + self.dispose(); + } +} + +#[cfg(test)] +mod tests { + use std::cell::{Cell as Counter, RefCell}; + use std::rc::Rc; + + use ruin_runtime::{queue_task, run}; + + use crate::{Reactor, cell_in}; + + use super::EffectHandle; + + #[test] + fn effects_flush_through_microtasks_and_coalesce() { + let seen = Rc::new(RefCell::new(Vec::new())); + let handle_slot = Rc::new(RefCell::new(None::)); + + queue_task({ + let seen = Rc::clone(&seen); + let handle_slot = Rc::clone(&handle_slot); + move || { + let reactor = Reactor::new(); + let source = cell_in(&reactor, 0usize); + let effect = reactor.effect({ + let seen = Rc::clone(&seen); + let source = source.clone(); + move || seen.borrow_mut().push(source.get()) + }); + + source.set(1); + source.set(2); + assert!(seen.borrow().is_empty(), "effect should not run inline"); + + *handle_slot.borrow_mut() = Some(effect); + } + }); + + run(); + assert_eq!(&*seen.borrow(), &[2]); + + let reruns = Rc::new(Counter::new(0usize)); + queue_task({ + let reruns = Rc::clone(&reruns); + let seen = Rc::clone(&seen); + let handle_slot = Rc::clone(&handle_slot); + move || { + let reactor = Reactor::new(); + let source = cell_in(&reactor, 2usize); + let effect = reactor.effect({ + let reruns = Rc::clone(&reruns); + let seen = Rc::clone(&seen); + let source = source.clone(); + move || { + reruns.set(reruns.get() + 1); + seen.borrow_mut().push(source.get()); + } + }); + source.set(3); + source.set(4); + *handle_slot.borrow_mut() = Some(effect); + } + }); + run(); + assert_eq!(reruns.get(), 1); + } +} diff --git a/lib/reactivity/src/event.rs b/lib/reactivity/src/event.rs new file mode 100644 index 0000000..bc807dd --- /dev/null +++ b/lib/reactivity/src/event.rs @@ -0,0 +1,236 @@ +use std::cell::{Cell, RefCell}; +use std::collections::BTreeMap; +use std::rc::Rc; + +use crate::{NodeId, Reactor, current}; + +type SubscriberFn = dyn Fn(&T) + 'static; + +/// Creates an event in the current thread's default reactor. +pub fn event() -> Event { + current().event() +} + +/// Creates an event associated with `reactor`. +pub fn event_in(reactor: &Reactor) -> Event { + reactor.event() +} + +/// Creates a reactive draining subscription for `event` in the current reactor. +pub fn on(event: &Event, handler: impl Fn(&T) + 'static) -> Subscription { + current().on(event, handler) +} + +/// Creates a reactive draining subscription for `event` associated with `reactor`. +pub fn on_in( + reactor: &Reactor, + event: &Event, + handler: impl Fn(&T) + 'static, +) -> Subscription { + reactor.on(event, handler) +} + +/// Push-style event source. +#[derive(Clone)] +pub struct Event { + inner: Rc>, +} + +/// Disposable subscription handle. +#[derive(Clone)] +pub struct Subscription { + inner: Rc, +} + +impl Reactor { + /// Creates an event source associated with this reactor. + pub fn event(&self) -> Event { + Event::new(self.clone()) + } + + /// Creates a reactive draining subscription for `event`. + pub fn on( + &self, + event: &Event, + handler: impl Fn(&T) + 'static, + ) -> Subscription { + let queue = Rc::new(RefCell::new(Vec::new())); + let direct = event.subscribe({ + let queue = Rc::clone(&queue); + move |value| queue.borrow_mut().push(value.clone()) + }); + let effect = self.effect({ + let event = event.clone(); + let queue = Rc::clone(&queue); + move || { + event.observe(); + let drained = { + let mut queued = queue.borrow_mut(); + queued.drain(..).collect::>() + }; + for value in &drained { + handler(value); + } + } + }); + + Subscription::new(move || { + direct.unsubscribe(); + effect.dispose(); + }) + } +} + +impl Event { + fn new(reactor: Reactor) -> Self { + let id = reactor.allocate_node(); + Self { + inner: Rc::new(EventInner { + reactor, + id, + next_subscriber: Cell::new(1), + subscribers: RefCell::new(BTreeMap::new()), + }), + } + } + + fn observe(&self) { + self.inner.reactor.observe(self.inner.id); + } + + /// Emits a value to immediate subscribers, then notifies reactive dependents. + pub fn emit(&self, value: T) { + let subscribers = self + .inner + .subscribers + .borrow() + .values() + .cloned() + .collect::>(); + for subscriber in subscribers { + subscriber(&value); + } + self.inner.reactor.trigger(self.inner.id); + } + + /// Adds an immediate subscriber to this event. + pub fn subscribe(&self, handler: impl Fn(&T) + 'static) -> Subscription { + let id = self.inner.next_subscriber.get(); + self.inner.next_subscriber.set(id.wrapping_add(1)); + self.inner + .subscribers + .borrow_mut() + .insert(id, Rc::new(handler) as Rc>); + + let inner = Rc::clone(&self.inner); + Subscription::new(move || { + inner.subscribers.borrow_mut().remove(&id); + }) + } +} + +impl Drop for EventInner { + fn drop(&mut self) { + self.reactor.dispose(self.id); + } +} + +impl Subscription { + fn new(cancel: impl Fn() + 'static) -> Self { + Self { + inner: Rc::new(SubscriptionInner { + active: Cell::new(true), + cancel: Box::new(cancel), + }), + } + } + + /// Cancels the subscription immediately. + pub fn unsubscribe(&self) { + self.inner.unsubscribe(); + } + + /// Returns `true` if the subscription is still active. + pub fn is_active(&self) -> bool { + self.inner.active.get() + } +} + +struct EventInner { + reactor: Reactor, + id: NodeId, + next_subscriber: Cell, + subscribers: RefCell>>>, +} + +struct SubscriptionInner { + active: Cell, + cancel: Box, +} + +impl SubscriptionInner { + fn unsubscribe(&self) { + if !self.active.replace(false) { + return; + } + (self.cancel)(); + } +} + +impl Drop for SubscriptionInner { + fn drop(&mut self) { + self.unsubscribe(); + } +} + +#[cfg(test)] +mod tests { + use std::cell::RefCell; + use std::rc::Rc; + + use ruin_runtime::{queue_task, run}; + + use crate::{Reactor, event_in}; + + use super::{Subscription, on_in}; + + #[test] + fn emit_delivers_immediately_and_on_drains_reactively() { + let immediate = Rc::new(RefCell::new(Vec::new())); + let reactive = Rc::new(RefCell::new(Vec::new())); + let keep_alive = Rc::new(RefCell::new(Vec::::new())); + + queue_task({ + let immediate = Rc::clone(&immediate); + let reactive = Rc::clone(&reactive); + let keep_alive = Rc::clone(&keep_alive); + move || { + let reactor = Reactor::new(); + let event = event_in::(&reactor); + + let direct = event.subscribe({ + let immediate = Rc::clone(&immediate); + move |value| immediate.borrow_mut().push(*value) + }); + let draining = on_in(&reactor, &event, { + let reactive = Rc::clone(&reactive); + move |value| reactive.borrow_mut().push(*value) + }); + + event.emit(1); + event.emit(2); + + assert_eq!(&*immediate.borrow(), &[1, 2]); + assert!( + reactive.borrow().is_empty(), + "reactive drain should be deferred" + ); + + keep_alive.borrow_mut().extend([direct, draining]); + } + }); + + run(); + assert_eq!(&*reactive.borrow(), &[1, 2]); + } +} diff --git a/lib/reactivity/src/lib.rs b/lib/reactivity/src/lib.rs index df5872e..2fb708d 100644 --- a/lib/reactivity/src/lib.rs +++ b/lib/reactivity/src/lib.rs @@ -5,11 +5,15 @@ //! from the edges by updating state or emitting events. mod cell; +mod effect; +mod event; mod id; mod reactor; mod thunk; pub use cell::{Cell, cell, cell_in}; +pub use effect::{EffectHandle, effect, effect_in}; +pub use event::{Event, Subscription, event, event_in, on, on_in}; pub use id::NodeId; pub use reactor::{ReactCycleError, Reactor, current}; -pub use thunk::{Thunk, thunk, thunk_in}; +pub use thunk::{Memo, Thunk, memo, memo_by, memo_by_in, memo_in, thunk, thunk_in}; diff --git a/lib/reactivity/src/thunk.rs b/lib/reactivity/src/thunk.rs index 132ee33..ed75951 100644 --- a/lib/reactivity/src/thunk.rs +++ b/lib/reactivity/src/thunk.rs @@ -4,6 +4,9 @@ use std::rc::Rc; use crate::reactor::ObserverHook; use crate::{NodeId, Reactor, current}; +type ComputeFn = dyn Fn() -> T + 'static; +type EqualsFn = dyn Fn(&T, &T) -> bool + 'static; + /// Creates a [`Thunk`] in the current thread's default reactor. pub fn thunk(compute: impl Fn() -> T + 'static) -> Thunk { current().thunk(compute) @@ -14,17 +17,67 @@ pub fn thunk_in(reactor: &Reactor, compute: impl Fn() -> T + 'static reactor.thunk(compute) } +/// Creates an equality-aware memo in the current thread's default reactor. +pub fn memo(compute: impl Fn() -> T + 'static) -> Memo { + current().memo(compute) +} + +/// Creates an equality-aware memo associated with `reactor`. +pub fn memo_in( + reactor: &Reactor, + compute: impl Fn() -> T + 'static, +) -> Memo { + reactor.memo(compute) +} + +/// Creates a comparator-aware memo in the current thread's default reactor. +pub fn memo_by( + equals: impl Fn(&T, &T) -> bool + 'static, + compute: impl Fn() -> T + 'static, +) -> Memo { + current().memo_by(equals, compute) +} + +/// Creates a comparator-aware memo associated with `reactor`. +pub fn memo_by_in( + reactor: &Reactor, + equals: impl Fn(&T, &T) -> bool + 'static, + compute: impl Fn() -> T + 'static, +) -> Memo { + reactor.memo_by(equals, compute) +} + /// Lazy computed node in the reactive graph. #[derive(Clone)] pub struct Thunk { inner: Rc>, } +/// Equality/comparator-aware computed node. +#[derive(Clone)] +pub struct Memo { + inner: Rc>, +} + impl Reactor { /// Creates a lazy computed thunk associated with this reactor. pub fn thunk(&self, compute: impl Fn() -> T + 'static) -> Thunk { Thunk::new(self.clone(), compute) } + + /// Creates an equality-aware memo associated with this reactor. + pub fn memo(&self, compute: impl Fn() -> T + 'static) -> Memo { + self.memo_by(|left, right| left == right, compute) + } + + /// Creates a comparator-aware memo associated with this reactor. + pub fn memo_by( + &self, + equals: impl Fn(&T, &T) -> bool + 'static, + compute: impl Fn() -> T + 'static, + ) -> Memo { + Memo::new(self.clone(), equals, compute) + } } impl Thunk { @@ -61,10 +114,49 @@ impl Thunk { } } +impl Memo { + fn new( + reactor: Reactor, + equals: impl Fn(&T, &T) -> bool + 'static, + compute: impl Fn() -> T + 'static, + ) -> Self { + let id = reactor.allocate_node(); + let inner = Rc::new(MemoInner { + reactor: reactor.clone(), + id, + compute: Box::new(compute), + equals: Box::new(equals), + value: RefCell::new(None), + dirty: Cell::new(true), + }); + + let observer: Rc = inner.clone(); + reactor.register_observer(id, observer); + Self { inner } + } + + /// Runs `f` with a shared reference to the current computed value. + pub fn with(&self, f: impl FnOnce(&T) -> R) -> R { + self.inner.reactor.observe(self.inner.id); + self.inner.ensure_value(); + let value = self.inner.value.borrow(); + f(value + .as_ref() + .expect("memo should have a cached value after recomputing")) + } +} + +impl Memo { + /// Clones and returns the current computed value. + pub fn get(&self) -> T { + self.with(T::clone) + } +} + struct ThunkInner { reactor: Reactor, id: NodeId, - compute: Box T + 'static>, + compute: Box>, value: RefCell>, dirty: Cell, } @@ -81,6 +173,36 @@ impl ThunkInner { } } +struct MemoInner { + reactor: Reactor, + id: NodeId, + compute: Box>, + equals: Box>, + value: RefCell>, + dirty: Cell, +} + +impl MemoInner { + fn ensure_value(&self) { + if !self.dirty.get() { + return; + } + let _ = self.recompute(); + } + + fn recompute(&self) -> bool { + let next = self.reactor.run_in_context(self.id, || (self.compute)()); + let mut value = self.value.borrow_mut(); + let changed = match value.as_ref() { + Some(current) => !(self.equals)(current, &next), + None => true, + }; + *value = Some(next); + self.dirty.set(false); + changed + } +} + impl ObserverHook for ThunkInner { fn notify(&self) { if self.dirty.replace(true) { @@ -99,12 +221,37 @@ impl Drop for ThunkInner { } } +impl ObserverHook for MemoInner { + fn notify(&self) { + if self.value.borrow().is_none() { + self.dirty.set(true); + return; + } + + self.dirty.set(true); + if self.recompute() { + self.reactor.trigger(self.id); + } + } +} + +impl Drop for MemoInner { + fn drop(&mut self) { + self.reactor.unregister_observer(self.id); + self.reactor.dispose(self.id); + } +} + #[cfg(test)] mod tests { - use std::cell::Cell as Counter; + use std::cell::{Cell as Counter, RefCell}; + use std::panic::{AssertUnwindSafe, catch_unwind}; use std::rc::Rc; - use crate::{Reactor, cell_in, thunk_in}; + use ruin_runtime::{queue_task, run}; + + use super::Thunk; + use crate::{Cell, EffectHandle, Memo, Reactor, cell_in, memo_by_in, memo_in, thunk_in}; #[test] fn thunk_caches_until_invalidated() { @@ -170,4 +317,115 @@ mod tests { assert_eq!(double_count.get(), 2); assert_eq!(label_count.get(), 3); } + + #[test] + fn cycle_detection_reports_two_thunks_reading_each_other() { + let reactor = Reactor::new(); + let left_slot = Rc::new(RefCell::new(None::>)); + let right_slot = Rc::new(RefCell::new(None::>)); + + let left = thunk_in(&reactor, { + let right_slot = Rc::clone(&right_slot); + move || { + right_slot + .borrow() + .as_ref() + .expect("right thunk should exist") + .get() + + 1 + } + }); + let right = thunk_in(&reactor, { + let left_slot = Rc::clone(&left_slot); + move || { + left_slot + .borrow() + .as_ref() + .expect("left thunk should exist") + .get() + + 1 + } + }); + + *left_slot.borrow_mut() = Some(left.clone()); + *right_slot.borrow_mut() = Some(right.clone()); + + let panic = catch_unwind(AssertUnwindSafe(|| { + let _ = left.get(); + })) + .expect_err("mutually recursive thunks should panic"); + + let error = panic + .downcast::() + .expect("panic payload should be ReactCycleError"); + assert_eq!(error.cycle().len(), 3); + } + + #[test] + fn memo_suppresses_unchanged_results_and_memo_by_uses_custom_comparator() { + let reactive_seen = Rc::new(RefCell::new(Vec::new())); + let effect_slot = Rc::new(RefCell::new(None::)); + let source_slot = Rc::new(RefCell::new(None::>)); + let parity_slot = Rc::new(RefCell::new(None::>)); + let bucket_slot = Rc::new(RefCell::new(None::>)); + + queue_task({ + let reactive_seen = Rc::clone(&reactive_seen); + let effect_slot = Rc::clone(&effect_slot); + let source_slot = Rc::clone(&source_slot); + let parity_slot = Rc::clone(&parity_slot); + let bucket_slot = Rc::clone(&bucket_slot); + move || { + let reactor = Reactor::new(); + let source = cell_in(&reactor, 1usize); + let parity = memo_in(&reactor, { + let source = source.clone(); + move || source.get() % 2 + }); + let bucket = memo_by_in( + &reactor, + |left: &usize, right: &usize| left / 10 == right / 10, + { + let source = source.clone(); + move || source.get() + }, + ); + + let effect = reactor.effect({ + let reactive_seen = Rc::clone(&reactive_seen); + let parity = parity.clone(); + let bucket = bucket.clone(); + move || { + reactive_seen + .borrow_mut() + .push((parity.get(), bucket.get())) + } + }); + + *source_slot.borrow_mut() = Some(source); + *parity_slot.borrow_mut() = Some(parity); + *bucket_slot.borrow_mut() = Some(bucket); + *effect_slot.borrow_mut() = Some(effect); + } + }); + + run(); + assert_eq!(&*reactive_seen.borrow(), &[(1, 1)]); + + queue_task({ + let source_slot = Rc::clone(&source_slot); + move || { + let source = source_slot + .borrow() + .as_ref() + .expect("source cell should still be alive") + .clone(); + source.set(3); + source.set(9); + source.set(10); + } + }); + run(); + assert_eq!(&*reactive_seen.borrow(), &[(1, 1), (0, 10)]); + } }