reactivity phase 2: cell/thunk
This commit is contained in:
113
lib/reactivity/examples/cell_thunk_showcase.rs
Normal file
113
lib/reactivity/examples/cell_thunk_showcase.rs
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
use ruin_reactivity::{cell, thunk};
|
||||||
|
use std::cell::Cell as Counter;
|
||||||
|
use std::fmt;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::sync::OnceLock;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::time::{Instant, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
static START: OnceLock<Instant> = OnceLock::new();
|
||||||
|
static ACTUAL_ORDER: AtomicUsize = AtomicUsize::new(1);
|
||||||
|
|
||||||
|
macro_rules! log_event {
|
||||||
|
($expected:expr, $($arg:tt)*) => {{
|
||||||
|
log_event_impl($expected, format_args!($($arg)*));
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn log_event_impl(expected: usize, message: fmt::Arguments<'_>) {
|
||||||
|
let actual = ACTUAL_ORDER.fetch_add(1, Ordering::SeqCst);
|
||||||
|
let elapsed = START
|
||||||
|
.get()
|
||||||
|
.expect("showcase start time should be initialized")
|
||||||
|
.elapsed()
|
||||||
|
.as_millis();
|
||||||
|
println!(
|
||||||
|
"[actual {actual:02} | expected {expected:02} | +{elapsed:04}ms | ts {}] {message}",
|
||||||
|
unix_timestamp_millis(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unix_timestamp_millis() -> String {
|
||||||
|
let now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.expect("system clock should be after the Unix epoch");
|
||||||
|
format!("{}.{:03}", now.as_secs(), now.subsec_millis())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[ruin_runtime::main]
|
||||||
|
fn main() {
|
||||||
|
START.get_or_init(Instant::now);
|
||||||
|
|
||||||
|
let first = cell(String::from("Ruin"));
|
||||||
|
let visits = cell(1usize);
|
||||||
|
let greeting_compute = Rc::new(Counter::new(0usize));
|
||||||
|
let summary_compute = Rc::new(Counter::new(0usize));
|
||||||
|
|
||||||
|
let greeting = thunk({
|
||||||
|
let first = first.clone();
|
||||||
|
let greeting_compute = Rc::clone(&greeting_compute);
|
||||||
|
move || {
|
||||||
|
let expected = match greeting_compute.get() {
|
||||||
|
0 => 3,
|
||||||
|
_ => 18,
|
||||||
|
};
|
||||||
|
greeting_compute.set(greeting_compute.get() + 1);
|
||||||
|
log_event!(expected, "[compute] greeting thunk recomputes");
|
||||||
|
format!("Hello, {}!", first.get())
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let summary = thunk({
|
||||||
|
let greeting = greeting.clone();
|
||||||
|
let visits = visits.clone();
|
||||||
|
let summary_compute = Rc::clone(&summary_compute);
|
||||||
|
move || {
|
||||||
|
let expected = match summary_compute.get() {
|
||||||
|
0 => 2,
|
||||||
|
1 => 9,
|
||||||
|
2 => 17,
|
||||||
|
_ => 22,
|
||||||
|
};
|
||||||
|
summary_compute.set(summary_compute.get() + 1);
|
||||||
|
log_event!(expected, "[compute] summary thunk recomputes");
|
||||||
|
format!("{} Visits: {}", greeting.get(), visits.get())
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
log_event!(1, "[main] read summary for the first time");
|
||||||
|
log_event!(4, "[main] summary = {}", summary.get());
|
||||||
|
|
||||||
|
log_event!(5, "[main] read summary again (should hit caches)");
|
||||||
|
log_event!(6, "[main] summary = {}", summary.get());
|
||||||
|
|
||||||
|
log_event!(7, "[main] set visits to 2");
|
||||||
|
visits
|
||||||
|
.set(2)
|
||||||
|
.expect("visit count should report the previous value");
|
||||||
|
|
||||||
|
log_event!(8, "[main] read summary after visits change");
|
||||||
|
log_event!(10, "[main] summary = {}", summary.get());
|
||||||
|
|
||||||
|
log_event!(11, "[main] set first to the same value");
|
||||||
|
assert!(
|
||||||
|
first.set(String::from("Ruin")).is_none(),
|
||||||
|
"same-value set should be suppressed",
|
||||||
|
);
|
||||||
|
|
||||||
|
log_event!(12, "[main] read summary after unchanged write");
|
||||||
|
log_event!(13, "[main] summary = {}", summary.get());
|
||||||
|
|
||||||
|
log_event!(14, "[main] replace first with `Reactive`");
|
||||||
|
let old = first.replace(String::from("Reactive"));
|
||||||
|
log_event!(15, "[main] replace returned old value `{old}`");
|
||||||
|
|
||||||
|
log_event!(16, "[main] read summary after name change");
|
||||||
|
log_event!(19, "[main] summary = {}", summary.get());
|
||||||
|
|
||||||
|
log_event!(20, "[main] update visits in place");
|
||||||
|
visits.update(|count| *count += 1);
|
||||||
|
|
||||||
|
log_event!(21, "[main] read summary after update()");
|
||||||
|
log_event!(23, "[main] summary = {}", summary.get());
|
||||||
|
}
|
||||||
131
lib/reactivity/src/cell.rs
Normal file
131
lib/reactivity/src/cell.rs
Normal file
@@ -0,0 +1,131 @@
|
|||||||
|
use std::cell::RefCell;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use crate::{NodeId, Reactor, current};
|
||||||
|
|
||||||
|
/// Creates a [`Cell`] in the current thread's default reactor.
|
||||||
|
pub fn cell<T: 'static>(initial: T) -> Cell<T> {
|
||||||
|
current().cell(initial)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a [`Cell`] associated with `reactor`.
|
||||||
|
pub fn cell_in<T: 'static>(reactor: &Reactor, initial: T) -> Cell<T> {
|
||||||
|
reactor.cell(initial)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mutable source node in the reactive graph.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Cell<T> {
|
||||||
|
inner: Rc<CellInner<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Reactor {
|
||||||
|
/// Creates a mutable source cell associated with this reactor.
|
||||||
|
pub fn cell<T: 'static>(&self, initial: T) -> Cell<T> {
|
||||||
|
Cell::new(self.clone(), initial)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: 'static> Cell<T> {
|
||||||
|
fn new(reactor: Reactor, initial: T) -> Self {
|
||||||
|
let id = reactor.allocate_node();
|
||||||
|
Self {
|
||||||
|
inner: Rc::new(CellInner {
|
||||||
|
reactor,
|
||||||
|
id,
|
||||||
|
value: RefCell::new(initial),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs `f` with a shared reference to the current value.
|
||||||
|
pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
|
||||||
|
self.inner.reactor.observe(self.inner.id);
|
||||||
|
let value = self.inner.value.borrow();
|
||||||
|
f(&value)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Replaces the current value and notifies dependents.
|
||||||
|
pub fn replace(&self, value: T) -> T {
|
||||||
|
let previous = self.inner.value.replace(value);
|
||||||
|
self.inner.reactor.trigger(self.inner.id);
|
||||||
|
previous
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mutates the current value in place and notifies dependents.
|
||||||
|
pub fn update<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
|
||||||
|
let output = {
|
||||||
|
let mut value = self.inner.value.borrow_mut();
|
||||||
|
f(&mut value)
|
||||||
|
};
|
||||||
|
self.inner.reactor.trigger(self.inner.id);
|
||||||
|
output
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Clone + 'static> Cell<T> {
|
||||||
|
/// Clones and returns the current value.
|
||||||
|
pub fn get(&self) -> T {
|
||||||
|
self.with(T::clone)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: PartialEq + 'static> Cell<T> {
|
||||||
|
/// Sets the cell to `value`, suppressing unchanged writes.
|
||||||
|
///
|
||||||
|
/// Returns the previous value if the cell changed, or `None` when the new value was equal to
|
||||||
|
/// the old one.
|
||||||
|
pub fn set(&self, value: T) -> Option<T> {
|
||||||
|
let mut current = self.inner.value.borrow_mut();
|
||||||
|
if *current == value {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let previous = std::mem::replace(&mut *current, value);
|
||||||
|
drop(current);
|
||||||
|
self.inner.reactor.trigger(self.inner.id);
|
||||||
|
Some(previous)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CellInner<T> {
|
||||||
|
reactor: Reactor,
|
||||||
|
id: NodeId,
|
||||||
|
value: RefCell<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Drop for CellInner<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.reactor.dispose(self.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::Cell;
|
||||||
|
use crate::Reactor;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn set_suppresses_unchanged_writes() {
|
||||||
|
let reactor = Reactor::new();
|
||||||
|
let value = reactor.cell(10usize);
|
||||||
|
|
||||||
|
assert_eq!(value.set(10), None);
|
||||||
|
assert_eq!(value.get(), 10);
|
||||||
|
assert_eq!(value.set(11), Some(10));
|
||||||
|
assert_eq!(value.get(), 11);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn replace_and_update_write_values() {
|
||||||
|
let reactor = Reactor::new();
|
||||||
|
let value: Cell<Vec<usize>> = reactor.cell(vec![1, 2]);
|
||||||
|
|
||||||
|
let old = value.replace(vec![3]);
|
||||||
|
assert_eq!(old, vec![1, 2]);
|
||||||
|
assert_eq!(value.with(|items| items.clone()), vec![3]);
|
||||||
|
|
||||||
|
value.update(|items| items.push(4));
|
||||||
|
assert_eq!(value.with(|items| items.clone()), vec![3, 4]);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,7 +5,6 @@ use core::fmt;
|
|||||||
pub struct NodeId(pub(crate) u64);
|
pub struct NodeId(pub(crate) u64);
|
||||||
|
|
||||||
impl NodeId {
|
impl NodeId {
|
||||||
#[allow(dead_code)]
|
|
||||||
pub(crate) const fn new(raw: u64) -> Self {
|
pub(crate) const fn new(raw: u64) -> Self {
|
||||||
Self(raw)
|
Self(raw)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,8 +4,12 @@
|
|||||||
//! single-threaded and designed to live on a runtime-managed thread, while async work feeds it
|
//! single-threaded and designed to live on a runtime-managed thread, while async work feeds it
|
||||||
//! from the edges by updating state or emitting events.
|
//! from the edges by updating state or emitting events.
|
||||||
|
|
||||||
|
mod cell;
|
||||||
mod id;
|
mod id;
|
||||||
mod reactor;
|
mod reactor;
|
||||||
|
mod thunk;
|
||||||
|
|
||||||
|
pub use cell::{Cell, cell, cell_in};
|
||||||
pub use id::NodeId;
|
pub use id::NodeId;
|
||||||
pub use reactor::{ReactCycleError, Reactor, current};
|
pub use reactor::{ReactCycleError, Reactor, current};
|
||||||
|
pub use thunk::{Thunk, thunk, thunk_in};
|
||||||
|
|||||||
@@ -221,14 +221,12 @@ impl Reactor {
|
|||||||
self.inner.ensure_flush_scheduled();
|
self.inner.ensure_flush_scheduled();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub(crate) fn allocate_node(&self) -> NodeId {
|
pub(crate) fn allocate_node(&self) -> NodeId {
|
||||||
let raw = self.inner.next_node.get();
|
let raw = self.inner.next_node.get();
|
||||||
self.inner.next_node.set(raw.wrapping_add(1));
|
self.inner.next_node.set(raw.wrapping_add(1));
|
||||||
NodeId::new(raw)
|
NodeId::new(raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub(crate) fn register_observer(&self, id: NodeId, observer: Rc<dyn ObserverHook>) {
|
pub(crate) fn register_observer(&self, id: NodeId, observer: Rc<dyn ObserverHook>) {
|
||||||
self.inner
|
self.inner
|
||||||
.observers
|
.observers
|
||||||
@@ -236,7 +234,6 @@ impl Reactor {
|
|||||||
.insert(id, Rc::downgrade(&observer));
|
.insert(id, Rc::downgrade(&observer));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub(crate) fn unregister_observer(&self, id: NodeId) {
|
pub(crate) fn unregister_observer(&self, id: NodeId) {
|
||||||
self.inner.observers.borrow_mut().remove(&id);
|
self.inner.observers.borrow_mut().remove(&id);
|
||||||
}
|
}
|
||||||
@@ -277,7 +274,6 @@ impl fmt::Debug for Reactor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ReactorInner {
|
struct ReactorInner {
|
||||||
#[allow(dead_code)]
|
|
||||||
next_node: Cell<u64>,
|
next_node: Cell<u64>,
|
||||||
dependencies: RefCell<BTreeMap<NodeId, BTreeSet<NodeId>>>,
|
dependencies: RefCell<BTreeMap<NodeId, BTreeSet<NodeId>>>,
|
||||||
dependents: RefCell<BTreeMap<NodeId, BTreeSet<NodeId>>>,
|
dependents: RefCell<BTreeMap<NodeId, BTreeSet<NodeId>>>,
|
||||||
|
|||||||
173
lib/reactivity/src/thunk.rs
Normal file
173
lib/reactivity/src/thunk.rs
Normal file
@@ -0,0 +1,173 @@
|
|||||||
|
use std::cell::{Cell, RefCell};
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use crate::reactor::ObserverHook;
|
||||||
|
use crate::{NodeId, Reactor, current};
|
||||||
|
|
||||||
|
/// Creates a [`Thunk`] in the current thread's default reactor.
|
||||||
|
pub fn thunk<T: 'static>(compute: impl Fn() -> T + 'static) -> Thunk<T> {
|
||||||
|
current().thunk(compute)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a [`Thunk`] associated with `reactor`.
|
||||||
|
pub fn thunk_in<T: 'static>(reactor: &Reactor, compute: impl Fn() -> T + 'static) -> Thunk<T> {
|
||||||
|
reactor.thunk(compute)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lazy computed node in the reactive graph.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Thunk<T> {
|
||||||
|
inner: Rc<ThunkInner<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Reactor {
|
||||||
|
/// Creates a lazy computed thunk associated with this reactor.
|
||||||
|
pub fn thunk<T: 'static>(&self, compute: impl Fn() -> T + 'static) -> Thunk<T> {
|
||||||
|
Thunk::new(self.clone(), compute)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: 'static> Thunk<T> {
|
||||||
|
fn new(reactor: Reactor, compute: impl Fn() -> T + 'static) -> Self {
|
||||||
|
let id = reactor.allocate_node();
|
||||||
|
let inner = Rc::new(ThunkInner {
|
||||||
|
reactor: reactor.clone(),
|
||||||
|
id,
|
||||||
|
compute: Box::new(compute),
|
||||||
|
value: RefCell::new(None),
|
||||||
|
dirty: Cell::new(true),
|
||||||
|
});
|
||||||
|
|
||||||
|
let observer: Rc<dyn ObserverHook> = inner.clone();
|
||||||
|
reactor.register_observer(id, observer);
|
||||||
|
Self { inner }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs `f` with a shared reference to the current computed value.
|
||||||
|
pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
|
||||||
|
self.inner.reactor.observe(self.inner.id);
|
||||||
|
self.inner.ensure_value();
|
||||||
|
let value = self.inner.value.borrow();
|
||||||
|
f(value
|
||||||
|
.as_ref()
|
||||||
|
.expect("thunk should have a cached value after recomputing"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Clone + 'static> Thunk<T> {
|
||||||
|
/// Clones and returns the current computed value.
|
||||||
|
pub fn get(&self) -> T {
|
||||||
|
self.with(T::clone)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ThunkInner<T> {
|
||||||
|
reactor: Reactor,
|
||||||
|
id: NodeId,
|
||||||
|
compute: Box<dyn Fn() -> T + 'static>,
|
||||||
|
value: RefCell<Option<T>>,
|
||||||
|
dirty: Cell<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ThunkInner<T> {
|
||||||
|
fn ensure_value(&self) {
|
||||||
|
if !self.dirty.get() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let next = self.reactor.run_in_context(self.id, || (self.compute)());
|
||||||
|
*self.value.borrow_mut() = Some(next);
|
||||||
|
self.dirty.set(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: 'static> ObserverHook for ThunkInner<T> {
|
||||||
|
fn notify(&self) {
|
||||||
|
if self.dirty.replace(true) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = self.value.borrow_mut().take();
|
||||||
|
self.reactor.trigger(self.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Drop for ThunkInner<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.reactor.unregister_observer(self.id);
|
||||||
|
self.reactor.dispose(self.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::cell::Cell as Counter;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use crate::{Reactor, cell_in, thunk_in};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn thunk_caches_until_invalidated() {
|
||||||
|
let reactor = Reactor::new();
|
||||||
|
let source = cell_in(&reactor, 2usize);
|
||||||
|
let compute_count = Rc::new(Counter::new(0usize));
|
||||||
|
let thunk = thunk_in(&reactor, {
|
||||||
|
let source = source.clone();
|
||||||
|
let compute_count = Rc::clone(&compute_count);
|
||||||
|
move || {
|
||||||
|
compute_count.set(compute_count.get() + 1);
|
||||||
|
source.get() * 2
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(thunk.get(), 4);
|
||||||
|
assert_eq!(thunk.get(), 4);
|
||||||
|
assert_eq!(compute_count.get(), 1);
|
||||||
|
|
||||||
|
source.set(3);
|
||||||
|
assert_eq!(thunk.get(), 6);
|
||||||
|
assert_eq!(compute_count.get(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn nested_thunks_recompute_only_affected_layers() {
|
||||||
|
let reactor = Reactor::new();
|
||||||
|
let base = cell_in(&reactor, 5usize);
|
||||||
|
let extra = cell_in(&reactor, 1usize);
|
||||||
|
|
||||||
|
let double_count = Rc::new(Counter::new(0usize));
|
||||||
|
let label_count = Rc::new(Counter::new(0usize));
|
||||||
|
|
||||||
|
let doubled = thunk_in(&reactor, {
|
||||||
|
let base = base.clone();
|
||||||
|
let double_count = Rc::clone(&double_count);
|
||||||
|
move || {
|
||||||
|
double_count.set(double_count.get() + 1);
|
||||||
|
base.get() * 2
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let label = thunk_in(&reactor, {
|
||||||
|
let doubled = doubled.clone();
|
||||||
|
let extra = extra.clone();
|
||||||
|
let label_count = Rc::clone(&label_count);
|
||||||
|
move || {
|
||||||
|
label_count.set(label_count.get() + 1);
|
||||||
|
format!("{} + {}", doubled.get(), extra.get())
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(label.get(), "10 + 1");
|
||||||
|
assert_eq!(double_count.get(), 1);
|
||||||
|
assert_eq!(label_count.get(), 1);
|
||||||
|
|
||||||
|
extra.set(2);
|
||||||
|
assert_eq!(label.get(), "10 + 2");
|
||||||
|
assert_eq!(double_count.get(), 1);
|
||||||
|
assert_eq!(label_count.get(), 2);
|
||||||
|
|
||||||
|
base.set(6);
|
||||||
|
assert_eq!(label.get(), "12 + 2");
|
||||||
|
assert_eq!(double_count.get(), 2);
|
||||||
|
assert_eq!(label_count.get(), 3);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user