Phase 3: event/effect

This commit is contained in:
2026-03-20 13:27:38 -04:00
parent 9143a79a64
commit 15813ebb6c
4 changed files with 682 additions and 4 deletions

View File

@@ -0,0 +1,180 @@
use std::cell::{Cell, RefCell};
use std::rc::{Rc, Weak};
use crate::reactor::ObserverHook;
use crate::{NodeId, Reactor, current};
/// Creates an effect in the current thread's default reactor.
pub fn effect(f: impl Fn() + 'static) -> EffectHandle {
current().effect(f)
}
/// Creates an effect associated with `reactor`.
pub fn effect_in(reactor: &Reactor, f: impl Fn() + 'static) -> EffectHandle {
reactor.effect(f)
}
/// Disposable handle for a reactive effect.
#[derive(Clone)]
pub struct EffectHandle {
inner: Rc<EffectInner>,
}
impl Reactor {
/// Creates an effect associated with this reactor.
///
/// The effect is scheduled immediately and then re-scheduled whenever one of its dependencies
/// changes.
pub fn effect(&self, f: impl Fn() + 'static) -> EffectHandle {
EffectHandle::new(self.clone(), f)
}
}
impl EffectHandle {
fn new(reactor: Reactor, effect: impl Fn() + 'static) -> Self {
let id = reactor.allocate_node();
let inner = Rc::new(EffectInner {
reactor: reactor.clone(),
id,
effect: Box::new(effect),
scheduled: Cell::new(false),
disposed: Cell::new(false),
self_ref: RefCell::new(Weak::new()),
});
*inner.self_ref.borrow_mut() = Rc::downgrade(&inner);
let observer: Rc<dyn ObserverHook> = inner.clone();
reactor.register_observer(id, observer);
inner.schedule();
Self { inner }
}
/// Disposes the effect immediately.
pub fn dispose(&self) {
self.inner.dispose();
}
/// Returns `true` if the effect has been disposed.
pub fn is_disposed(&self) -> bool {
self.inner.disposed.get()
}
}
struct EffectInner {
reactor: Reactor,
id: NodeId,
effect: Box<dyn Fn() + 'static>,
scheduled: Cell<bool>,
disposed: Cell<bool>,
self_ref: RefCell<Weak<EffectInner>>,
}
impl EffectInner {
fn schedule(&self) {
if self.disposed.get() || self.scheduled.replace(true) {
return;
}
let weak = self.self_ref.borrow().clone();
let reactor = self.reactor.clone();
reactor.schedule(move || {
let Some(inner) = weak.upgrade() else {
return;
};
if inner.disposed.get() {
inner.scheduled.set(false);
return;
}
inner.scheduled.set(false);
inner.reactor.run_in_context(inner.id, || (inner.effect)());
});
}
fn dispose(&self) {
if self.disposed.replace(true) {
return;
}
self.reactor.unregister_observer(self.id);
self.reactor.dispose(self.id);
}
}
impl ObserverHook for EffectInner {
fn notify(&self) {
self.schedule();
}
}
impl Drop for EffectInner {
fn drop(&mut self) {
self.dispose();
}
}
#[cfg(test)]
mod tests {
use std::cell::{Cell as Counter, RefCell};
use std::rc::Rc;
use ruin_runtime::{queue_task, run};
use crate::{Reactor, cell_in};
use super::EffectHandle;
#[test]
fn effects_flush_through_microtasks_and_coalesce() {
let seen = Rc::new(RefCell::new(Vec::new()));
let handle_slot = Rc::new(RefCell::new(None::<EffectHandle>));
queue_task({
let seen = Rc::clone(&seen);
let handle_slot = Rc::clone(&handle_slot);
move || {
let reactor = Reactor::new();
let source = cell_in(&reactor, 0usize);
let effect = reactor.effect({
let seen = Rc::clone(&seen);
let source = source.clone();
move || seen.borrow_mut().push(source.get())
});
source.set(1);
source.set(2);
assert!(seen.borrow().is_empty(), "effect should not run inline");
*handle_slot.borrow_mut() = Some(effect);
}
});
run();
assert_eq!(&*seen.borrow(), &[2]);
let reruns = Rc::new(Counter::new(0usize));
queue_task({
let reruns = Rc::clone(&reruns);
let seen = Rc::clone(&seen);
let handle_slot = Rc::clone(&handle_slot);
move || {
let reactor = Reactor::new();
let source = cell_in(&reactor, 2usize);
let effect = reactor.effect({
let reruns = Rc::clone(&reruns);
let seen = Rc::clone(&seen);
let source = source.clone();
move || {
reruns.set(reruns.get() + 1);
seen.borrow_mut().push(source.get());
}
});
source.set(3);
source.set(4);
*handle_slot.borrow_mut() = Some(effect);
}
});
run();
assert_eq!(reruns.get(), 1);
}
}

236
lib/reactivity/src/event.rs Normal file
View File

@@ -0,0 +1,236 @@
use std::cell::{Cell, RefCell};
use std::collections::BTreeMap;
use std::rc::Rc;
use crate::{NodeId, Reactor, current};
type SubscriberFn<T> = dyn Fn(&T) + 'static;
/// Creates an event in the current thread's default reactor.
pub fn event<T: 'static>() -> Event<T> {
current().event()
}
/// Creates an event associated with `reactor`.
pub fn event_in<T: 'static>(reactor: &Reactor) -> Event<T> {
reactor.event()
}
/// Creates a reactive draining subscription for `event` in the current reactor.
pub fn on<T: Clone + 'static>(event: &Event<T>, handler: impl Fn(&T) + 'static) -> Subscription {
current().on(event, handler)
}
/// Creates a reactive draining subscription for `event` associated with `reactor`.
pub fn on_in<T: Clone + 'static>(
reactor: &Reactor,
event: &Event<T>,
handler: impl Fn(&T) + 'static,
) -> Subscription {
reactor.on(event, handler)
}
/// Push-style event source.
#[derive(Clone)]
pub struct Event<T> {
inner: Rc<EventInner<T>>,
}
/// Disposable subscription handle.
#[derive(Clone)]
pub struct Subscription {
inner: Rc<SubscriptionInner>,
}
impl Reactor {
/// Creates an event source associated with this reactor.
pub fn event<T: 'static>(&self) -> Event<T> {
Event::new(self.clone())
}
/// Creates a reactive draining subscription for `event`.
pub fn on<T: Clone + 'static>(
&self,
event: &Event<T>,
handler: impl Fn(&T) + 'static,
) -> Subscription {
let queue = Rc::new(RefCell::new(Vec::new()));
let direct = event.subscribe({
let queue = Rc::clone(&queue);
move |value| queue.borrow_mut().push(value.clone())
});
let effect = self.effect({
let event = event.clone();
let queue = Rc::clone(&queue);
move || {
event.observe();
let drained = {
let mut queued = queue.borrow_mut();
queued.drain(..).collect::<Vec<_>>()
};
for value in &drained {
handler(value);
}
}
});
Subscription::new(move || {
direct.unsubscribe();
effect.dispose();
})
}
}
impl<T: 'static> Event<T> {
fn new(reactor: Reactor) -> Self {
let id = reactor.allocate_node();
Self {
inner: Rc::new(EventInner {
reactor,
id,
next_subscriber: Cell::new(1),
subscribers: RefCell::new(BTreeMap::new()),
}),
}
}
fn observe(&self) {
self.inner.reactor.observe(self.inner.id);
}
/// Emits a value to immediate subscribers, then notifies reactive dependents.
pub fn emit(&self, value: T) {
let subscribers = self
.inner
.subscribers
.borrow()
.values()
.cloned()
.collect::<Vec<_>>();
for subscriber in subscribers {
subscriber(&value);
}
self.inner.reactor.trigger(self.inner.id);
}
/// Adds an immediate subscriber to this event.
pub fn subscribe(&self, handler: impl Fn(&T) + 'static) -> Subscription {
let id = self.inner.next_subscriber.get();
self.inner.next_subscriber.set(id.wrapping_add(1));
self.inner
.subscribers
.borrow_mut()
.insert(id, Rc::new(handler) as Rc<SubscriberFn<T>>);
let inner = Rc::clone(&self.inner);
Subscription::new(move || {
inner.subscribers.borrow_mut().remove(&id);
})
}
}
impl<T> Drop for EventInner<T> {
fn drop(&mut self) {
self.reactor.dispose(self.id);
}
}
impl Subscription {
fn new(cancel: impl Fn() + 'static) -> Self {
Self {
inner: Rc::new(SubscriptionInner {
active: Cell::new(true),
cancel: Box::new(cancel),
}),
}
}
/// Cancels the subscription immediately.
pub fn unsubscribe(&self) {
self.inner.unsubscribe();
}
/// Returns `true` if the subscription is still active.
pub fn is_active(&self) -> bool {
self.inner.active.get()
}
}
struct EventInner<T> {
reactor: Reactor,
id: NodeId,
next_subscriber: Cell<usize>,
subscribers: RefCell<BTreeMap<usize, Rc<SubscriberFn<T>>>>,
}
struct SubscriptionInner {
active: Cell<bool>,
cancel: Box<dyn Fn() + 'static>,
}
impl SubscriptionInner {
fn unsubscribe(&self) {
if !self.active.replace(false) {
return;
}
(self.cancel)();
}
}
impl Drop for SubscriptionInner {
fn drop(&mut self) {
self.unsubscribe();
}
}
#[cfg(test)]
mod tests {
use std::cell::RefCell;
use std::rc::Rc;
use ruin_runtime::{queue_task, run};
use crate::{Reactor, event_in};
use super::{Subscription, on_in};
#[test]
fn emit_delivers_immediately_and_on_drains_reactively() {
let immediate = Rc::new(RefCell::new(Vec::new()));
let reactive = Rc::new(RefCell::new(Vec::new()));
let keep_alive = Rc::new(RefCell::new(Vec::<Subscription>::new()));
queue_task({
let immediate = Rc::clone(&immediate);
let reactive = Rc::clone(&reactive);
let keep_alive = Rc::clone(&keep_alive);
move || {
let reactor = Reactor::new();
let event = event_in::<usize>(&reactor);
let direct = event.subscribe({
let immediate = Rc::clone(&immediate);
move |value| immediate.borrow_mut().push(*value)
});
let draining = on_in(&reactor, &event, {
let reactive = Rc::clone(&reactive);
move |value| reactive.borrow_mut().push(*value)
});
event.emit(1);
event.emit(2);
assert_eq!(&*immediate.borrow(), &[1, 2]);
assert!(
reactive.borrow().is_empty(),
"reactive drain should be deferred"
);
keep_alive.borrow_mut().extend([direct, draining]);
}
});
run();
assert_eq!(&*reactive.borrow(), &[1, 2]);
}
}

View File

@@ -5,11 +5,15 @@
//! from the edges by updating state or emitting events. //! from the edges by updating state or emitting events.
mod cell; mod cell;
mod effect;
mod event;
mod id; mod id;
mod reactor; mod reactor;
mod thunk; mod thunk;
pub use cell::{Cell, cell, cell_in}; pub use cell::{Cell, cell, cell_in};
pub use effect::{EffectHandle, effect, effect_in};
pub use event::{Event, Subscription, event, event_in, on, on_in};
pub use id::NodeId; pub use id::NodeId;
pub use reactor::{ReactCycleError, Reactor, current}; pub use reactor::{ReactCycleError, Reactor, current};
pub use thunk::{Thunk, thunk, thunk_in}; pub use thunk::{Memo, Thunk, memo, memo_by, memo_by_in, memo_in, thunk, thunk_in};

View File

@@ -4,6 +4,9 @@ use std::rc::Rc;
use crate::reactor::ObserverHook; use crate::reactor::ObserverHook;
use crate::{NodeId, Reactor, current}; use crate::{NodeId, Reactor, current};
type ComputeFn<T> = dyn Fn() -> T + 'static;
type EqualsFn<T> = dyn Fn(&T, &T) -> bool + 'static;
/// Creates a [`Thunk`] in the current thread's default reactor. /// Creates a [`Thunk`] in the current thread's default reactor.
pub fn thunk<T: 'static>(compute: impl Fn() -> T + 'static) -> Thunk<T> { pub fn thunk<T: 'static>(compute: impl Fn() -> T + 'static) -> Thunk<T> {
current().thunk(compute) current().thunk(compute)
@@ -14,17 +17,67 @@ pub fn thunk_in<T: 'static>(reactor: &Reactor, compute: impl Fn() -> T + 'static
reactor.thunk(compute) reactor.thunk(compute)
} }
/// Creates an equality-aware memo in the current thread's default reactor.
pub fn memo<T: PartialEq + 'static>(compute: impl Fn() -> T + 'static) -> Memo<T> {
current().memo(compute)
}
/// Creates an equality-aware memo associated with `reactor`.
pub fn memo_in<T: PartialEq + 'static>(
reactor: &Reactor,
compute: impl Fn() -> T + 'static,
) -> Memo<T> {
reactor.memo(compute)
}
/// Creates a comparator-aware memo in the current thread's default reactor.
pub fn memo_by<T: 'static>(
equals: impl Fn(&T, &T) -> bool + 'static,
compute: impl Fn() -> T + 'static,
) -> Memo<T> {
current().memo_by(equals, compute)
}
/// Creates a comparator-aware memo associated with `reactor`.
pub fn memo_by_in<T: 'static>(
reactor: &Reactor,
equals: impl Fn(&T, &T) -> bool + 'static,
compute: impl Fn() -> T + 'static,
) -> Memo<T> {
reactor.memo_by(equals, compute)
}
/// Lazy computed node in the reactive graph. /// Lazy computed node in the reactive graph.
#[derive(Clone)] #[derive(Clone)]
pub struct Thunk<T> { pub struct Thunk<T> {
inner: Rc<ThunkInner<T>>, inner: Rc<ThunkInner<T>>,
} }
/// Equality/comparator-aware computed node.
#[derive(Clone)]
pub struct Memo<T> {
inner: Rc<MemoInner<T>>,
}
impl Reactor { impl Reactor {
/// Creates a lazy computed thunk associated with this reactor. /// Creates a lazy computed thunk associated with this reactor.
pub fn thunk<T: 'static>(&self, compute: impl Fn() -> T + 'static) -> Thunk<T> { pub fn thunk<T: 'static>(&self, compute: impl Fn() -> T + 'static) -> Thunk<T> {
Thunk::new(self.clone(), compute) Thunk::new(self.clone(), compute)
} }
/// Creates an equality-aware memo associated with this reactor.
pub fn memo<T: PartialEq + 'static>(&self, compute: impl Fn() -> T + 'static) -> Memo<T> {
self.memo_by(|left, right| left == right, compute)
}
/// Creates a comparator-aware memo associated with this reactor.
pub fn memo_by<T: 'static>(
&self,
equals: impl Fn(&T, &T) -> bool + 'static,
compute: impl Fn() -> T + 'static,
) -> Memo<T> {
Memo::new(self.clone(), equals, compute)
}
} }
impl<T: 'static> Thunk<T> { impl<T: 'static> Thunk<T> {
@@ -61,10 +114,49 @@ impl<T: Clone + 'static> Thunk<T> {
} }
} }
impl<T: 'static> Memo<T> {
fn new(
reactor: Reactor,
equals: impl Fn(&T, &T) -> bool + 'static,
compute: impl Fn() -> T + 'static,
) -> Self {
let id = reactor.allocate_node();
let inner = Rc::new(MemoInner {
reactor: reactor.clone(),
id,
compute: Box::new(compute),
equals: Box::new(equals),
value: RefCell::new(None),
dirty: Cell::new(true),
});
let observer: Rc<dyn ObserverHook> = inner.clone();
reactor.register_observer(id, observer);
Self { inner }
}
/// Runs `f` with a shared reference to the current computed value.
pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
self.inner.reactor.observe(self.inner.id);
self.inner.ensure_value();
let value = self.inner.value.borrow();
f(value
.as_ref()
.expect("memo should have a cached value after recomputing"))
}
}
impl<T: Clone + 'static> Memo<T> {
/// Clones and returns the current computed value.
pub fn get(&self) -> T {
self.with(T::clone)
}
}
struct ThunkInner<T> { struct ThunkInner<T> {
reactor: Reactor, reactor: Reactor,
id: NodeId, id: NodeId,
compute: Box<dyn Fn() -> T + 'static>, compute: Box<ComputeFn<T>>,
value: RefCell<Option<T>>, value: RefCell<Option<T>>,
dirty: Cell<bool>, dirty: Cell<bool>,
} }
@@ -81,6 +173,36 @@ impl<T> ThunkInner<T> {
} }
} }
struct MemoInner<T> {
reactor: Reactor,
id: NodeId,
compute: Box<ComputeFn<T>>,
equals: Box<EqualsFn<T>>,
value: RefCell<Option<T>>,
dirty: Cell<bool>,
}
impl<T> MemoInner<T> {
fn ensure_value(&self) {
if !self.dirty.get() {
return;
}
let _ = self.recompute();
}
fn recompute(&self) -> bool {
let next = self.reactor.run_in_context(self.id, || (self.compute)());
let mut value = self.value.borrow_mut();
let changed = match value.as_ref() {
Some(current) => !(self.equals)(current, &next),
None => true,
};
*value = Some(next);
self.dirty.set(false);
changed
}
}
impl<T: 'static> ObserverHook for ThunkInner<T> { impl<T: 'static> ObserverHook for ThunkInner<T> {
fn notify(&self) { fn notify(&self) {
if self.dirty.replace(true) { if self.dirty.replace(true) {
@@ -99,12 +221,37 @@ impl<T> Drop for ThunkInner<T> {
} }
} }
impl<T: 'static> ObserverHook for MemoInner<T> {
fn notify(&self) {
if self.value.borrow().is_none() {
self.dirty.set(true);
return;
}
self.dirty.set(true);
if self.recompute() {
self.reactor.trigger(self.id);
}
}
}
impl<T> Drop for MemoInner<T> {
fn drop(&mut self) {
self.reactor.unregister_observer(self.id);
self.reactor.dispose(self.id);
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::cell::Cell as Counter; use std::cell::{Cell as Counter, RefCell};
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::rc::Rc; use std::rc::Rc;
use crate::{Reactor, cell_in, thunk_in}; use ruin_runtime::{queue_task, run};
use super::Thunk;
use crate::{Cell, EffectHandle, Memo, Reactor, cell_in, memo_by_in, memo_in, thunk_in};
#[test] #[test]
fn thunk_caches_until_invalidated() { fn thunk_caches_until_invalidated() {
@@ -170,4 +317,115 @@ mod tests {
assert_eq!(double_count.get(), 2); assert_eq!(double_count.get(), 2);
assert_eq!(label_count.get(), 3); assert_eq!(label_count.get(), 3);
} }
#[test]
fn cycle_detection_reports_two_thunks_reading_each_other() {
let reactor = Reactor::new();
let left_slot = Rc::new(RefCell::new(None::<Thunk<i32>>));
let right_slot = Rc::new(RefCell::new(None::<Thunk<i32>>));
let left = thunk_in(&reactor, {
let right_slot = Rc::clone(&right_slot);
move || {
right_slot
.borrow()
.as_ref()
.expect("right thunk should exist")
.get()
+ 1
}
});
let right = thunk_in(&reactor, {
let left_slot = Rc::clone(&left_slot);
move || {
left_slot
.borrow()
.as_ref()
.expect("left thunk should exist")
.get()
+ 1
}
});
*left_slot.borrow_mut() = Some(left.clone());
*right_slot.borrow_mut() = Some(right.clone());
let panic = catch_unwind(AssertUnwindSafe(|| {
let _ = left.get();
}))
.expect_err("mutually recursive thunks should panic");
let error = panic
.downcast::<crate::ReactCycleError>()
.expect("panic payload should be ReactCycleError");
assert_eq!(error.cycle().len(), 3);
}
#[test]
fn memo_suppresses_unchanged_results_and_memo_by_uses_custom_comparator() {
let reactive_seen = Rc::new(RefCell::new(Vec::new()));
let effect_slot = Rc::new(RefCell::new(None::<EffectHandle>));
let source_slot = Rc::new(RefCell::new(None::<Cell<usize>>));
let parity_slot = Rc::new(RefCell::new(None::<Memo<usize>>));
let bucket_slot = Rc::new(RefCell::new(None::<Memo<usize>>));
queue_task({
let reactive_seen = Rc::clone(&reactive_seen);
let effect_slot = Rc::clone(&effect_slot);
let source_slot = Rc::clone(&source_slot);
let parity_slot = Rc::clone(&parity_slot);
let bucket_slot = Rc::clone(&bucket_slot);
move || {
let reactor = Reactor::new();
let source = cell_in(&reactor, 1usize);
let parity = memo_in(&reactor, {
let source = source.clone();
move || source.get() % 2
});
let bucket = memo_by_in(
&reactor,
|left: &usize, right: &usize| left / 10 == right / 10,
{
let source = source.clone();
move || source.get()
},
);
let effect = reactor.effect({
let reactive_seen = Rc::clone(&reactive_seen);
let parity = parity.clone();
let bucket = bucket.clone();
move || {
reactive_seen
.borrow_mut()
.push((parity.get(), bucket.get()))
}
});
*source_slot.borrow_mut() = Some(source);
*parity_slot.borrow_mut() = Some(parity);
*bucket_slot.borrow_mut() = Some(bucket);
*effect_slot.borrow_mut() = Some(effect);
}
});
run();
assert_eq!(&*reactive_seen.borrow(), &[(1, 1)]);
queue_task({
let source_slot = Rc::clone(&source_slot);
move || {
let source = source_slot
.borrow()
.as_ref()
.expect("source cell should still be alive")
.clone();
source.set(3);
source.set(9);
source.set(10);
}
});
run();
assert_eq!(&*reactive_seen.borrow(), &[(1, 1), (0, 10)]);
}
} }