diff --git a/Cargo.lock b/Cargo.lock index 86624cd..5be4d2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -14,6 +23,12 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + [[package]] name = "futures-channel" version = "0.3.32" @@ -95,12 +110,39 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -131,6 +173,23 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "ruin-runtime" version = "0.1.0" @@ -140,6 +199,7 @@ dependencies = [ "hyper", "libc", "ruin-runtime-proc-macros", + "tracing", ] [[package]] @@ -156,6 +216,17 @@ name = "ruin_reactivity" version = "0.1.0" dependencies = [ "ruin-runtime", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", ] [[package]] @@ -175,6 +246,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tokio" version = "1.50.0" @@ -184,6 +264,40 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "once_cell", + "regex-automata", + "sharded-slab", + "thread_local", + "tracing", + "tracing-core", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/lib/reactivity/Cargo.toml b/lib/reactivity/Cargo.toml index 525518c..0006d50 100644 --- a/lib/reactivity/Cargo.toml +++ b/lib/reactivity/Cargo.toml @@ -5,3 +5,7 @@ edition = "2024" [dependencies] ruin_runtime = { package = "ruin-runtime", path = "../runtime" } +tracing = { version = "0.1", default-features = false, features = ["std"] } + +[dev-dependencies] +tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt", "std"] } diff --git a/lib/reactivity/examples/tracing_subscriber_showcase.rs b/lib/reactivity/examples/tracing_subscriber_showcase.rs new file mode 100644 index 0000000..f0ed5c4 --- /dev/null +++ b/lib/reactivity/examples/tracing_subscriber_showcase.rs @@ -0,0 +1,101 @@ +//! Example tracing setup for RUIN runtime + reactivity. +//! +//! Try: +//! +//! - `cargo run -p ruin_reactivity --example tracing_subscriber_showcase` +//! - `RUST_LOG=info,ruin_runtime::runtime=debug,ruin_reactivity::graph=debug cargo run -p ruin_reactivity --example tracing_subscriber_showcase` +//! - `RUST_LOG=info,ruin_runtime::scheduler=trace,ruin_reactivity::event=trace,ruin_reactivity::effect=debug cargo run -p ruin_reactivity --example tracing_subscriber_showcase` + +use ruin_reactivity::{cell, effect, event, on, thunk}; +use ruin_runtime::time::sleep; +use std::time::Duration; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, fmt}; + +fn install_tracing() { + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { + EnvFilter::new( + "info,\ + ruin_runtime::runtime=debug,\ + ruin_runtime::scheduler=debug,\ + ruin_reactivity::graph=debug,\ + ruin_reactivity::effect=debug,\ + ruin_reactivity::event=debug", + ) + }); + + let fmt_layer = fmt::layer() + .with_target(true) + .with_thread_ids(true) + .with_thread_names(true) + .compact(); + + let _ = tracing_subscriber::registry() + .with(filter) + .with(fmt_layer) + .try_init(); +} + +#[ruin_runtime::async_main] +async fn main() { + install_tracing(); + + tracing::info!( + event = "showcase_start", + note = "override RUST_LOG to see more or less detail", + "starting tracing subscriber showcase" + ); + + let count = cell(0usize); + let doubled = thunk({ + let count = count.clone(); + move || count.get() * 2 + }); + let clicks = event::(); + + let _drain = on(&clicks, { + let count = count.clone(); + move |delta| { + tracing::info!( + event = "apply_click_delta", + delta = *delta, + "draining queued event into cell update" + ); + count.update(|value| *value += *delta); + } + }); + + let _view = effect({ + let count = count.clone(); + let doubled = doubled.clone(); + move || { + tracing::info!( + target: "demo::view", + count = count.get(), + doubled = doubled.get(), + "derived view state updated" + ); + } + }); + + tracing::info!( + event = "emit_clicks", + count = 3, + "emitting queued click deltas" + ); + clicks.emit(1); + clicks.emit(2); + clicks.emit(0); + + tracing::info!(event = "manual_set", value = 10, "setting count directly"); + let _ = count.set(10); + + tracing::info!( + event = "showcase_done", + hint = "see ruin_runtime::* and ruin_reactivity::* targets in the filter", + "example body completed; awaiting once so scheduled microtasks can flush" + ); + + sleep(Duration::from_millis(1)).await; +} diff --git a/lib/reactivity/src/cell.rs b/lib/reactivity/src/cell.rs index b643aa1..412acb8 100644 --- a/lib/reactivity/src/cell.rs +++ b/lib/reactivity/src/cell.rs @@ -1,7 +1,7 @@ use std::cell::RefCell; use std::rc::Rc; -use crate::{NodeId, Reactor, current}; +use crate::{NodeId, Reactor, current, trace_targets}; /// Creates a [`Cell`] in the current thread's default reactor. pub fn cell(initial: T) -> Cell { @@ -29,6 +29,12 @@ impl Reactor { impl Cell { fn new(reactor: Reactor, initial: T) -> Self { let id = reactor.allocate_node(); + tracing::debug!( + target: trace_targets::CELL, + event = "create_cell", + node_id = id.0, + "created reactive cell" + ); Self { inner: Rc::new(CellInner { reactor, @@ -40,6 +46,13 @@ impl Cell { /// Runs `f` with a shared reference to the current value. pub fn with(&self, f: impl FnOnce(&T) -> R) -> R { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::CELL, + event = "read_cell", + node_id = self.inner.id.0, + "reading reactive cell" + ); self.inner.reactor.observe(self.inner.id); let value = self.inner.value.borrow(); f(&value) @@ -48,6 +61,12 @@ impl Cell { /// Replaces the current value and notifies dependents. pub fn replace(&self, value: T) -> T { let previous = self.inner.value.replace(value); + tracing::debug!( + target: trace_targets::CELL, + event = "replace_cell", + node_id = self.inner.id.0, + "replaced cell value" + ); self.inner.reactor.trigger(self.inner.id); previous } @@ -58,6 +77,12 @@ impl Cell { let mut value = self.inner.value.borrow_mut(); f(&mut value) }; + tracing::debug!( + target: trace_targets::CELL, + event = "update_cell", + node_id = self.inner.id.0, + "updated cell value in place" + ); self.inner.reactor.trigger(self.inner.id); output } @@ -78,11 +103,26 @@ impl Cell { pub fn set(&self, value: T) -> Option { let mut current = self.inner.value.borrow_mut(); if *current == value { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::CELL, + event = "set_cell", + node_id = self.inner.id.0, + changed = false, + "suppressed unchanged cell write" + ); return None; } let previous = std::mem::replace(&mut *current, value); drop(current); + tracing::debug!( + target: trace_targets::CELL, + event = "set_cell", + node_id = self.inner.id.0, + changed = true, + "set cell value" + ); self.inner.reactor.trigger(self.inner.id); Some(previous) } diff --git a/lib/reactivity/src/effect.rs b/lib/reactivity/src/effect.rs index 9157cc2..b2343c3 100644 --- a/lib/reactivity/src/effect.rs +++ b/lib/reactivity/src/effect.rs @@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell}; use std::rc::{Rc, Weak}; use crate::reactor::ObserverHook; -use crate::{NodeId, Reactor, current}; +use crate::{NodeId, Reactor, current, trace_targets}; /// Creates an effect in the current thread's default reactor. pub fn effect(f: impl Fn() + 'static) -> EffectHandle { @@ -42,6 +42,12 @@ impl EffectHandle { self_ref: RefCell::new(Weak::new()), }); *inner.self_ref.borrow_mut() = Rc::downgrade(&inner); + tracing::debug!( + target: trace_targets::EFFECT, + event = "create_effect", + node_id = id.0, + "created reactive effect" + ); let observer: Rc = inner.clone(); reactor.register_observer(id, observer); @@ -72,9 +78,27 @@ struct EffectInner { impl EffectInner { fn schedule(&self) { if self.disposed.get() || self.scheduled.replace(true) { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::EFFECT, + event = "schedule_effect", + node_id = self.id.0, + queued = false, + disposed = self.disposed.get(), + already_scheduled = self.scheduled.get(), + "effect scheduling skipped" + ); return; } + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::EFFECT, + event = "schedule_effect", + node_id = self.id.0, + queued = true, + "queued effect for microtask flush" + ); let weak = self.self_ref.borrow().clone(); let reactor = self.reactor.clone(); reactor.schedule(move || { @@ -87,6 +111,12 @@ impl EffectInner { } inner.scheduled.set(false); + let _span = tracing::debug_span!( + target: trace_targets::EFFECT, + "effect.run", + node_id = inner.id.0 + ) + .entered(); inner.reactor.run_in_context(inner.id, || (inner.effect)()); }); } @@ -96,6 +126,12 @@ impl EffectInner { return; } + tracing::debug!( + target: trace_targets::EFFECT, + event = "dispose_effect", + node_id = self.id.0, + "disposed reactive effect" + ); self.reactor.unregister_observer(self.id); self.reactor.dispose(self.id); } diff --git a/lib/reactivity/src/event.rs b/lib/reactivity/src/event.rs index bc807dd..7c6333b 100644 --- a/lib/reactivity/src/event.rs +++ b/lib/reactivity/src/event.rs @@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell}; use std::collections::BTreeMap; use std::rc::Rc; -use crate::{NodeId, Reactor, current}; +use crate::{NodeId, Reactor, current, trace_targets}; type SubscriberFn = dyn Fn(&T) + 'static; @@ -68,6 +68,14 @@ impl Reactor { let mut queued = queue.borrow_mut(); queued.drain(..).collect::>() }; + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::EVENT, + event = "drain_event_queue", + event_id = event.inner.id.0, + drained = drained.len(), + "draining queued event values reactively" + ); for value in &drained { handler(value); } @@ -84,6 +92,12 @@ impl Reactor { impl Event { fn new(reactor: Reactor) -> Self { let id = reactor.allocate_node(); + tracing::debug!( + target: trace_targets::EVENT, + event = "create_event", + node_id = id.0, + "created reactive event" + ); Self { inner: Rc::new(EventInner { reactor, @@ -95,6 +109,13 @@ impl Event { } fn observe(&self) { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::EVENT, + event = "observe_event", + event_id = self.inner.id.0, + "observing event reactively" + ); self.inner.reactor.observe(self.inner.id); } @@ -107,6 +128,13 @@ impl Event { .values() .cloned() .collect::>(); + tracing::debug!( + target: trace_targets::EVENT, + event = "emit_event", + event_id = self.inner.id.0, + subscriber_count = subscribers.len(), + "emitting event value" + ); for subscriber in subscribers { subscriber(&value); } @@ -121,9 +149,25 @@ impl Event { .subscribers .borrow_mut() .insert(id, Rc::new(handler) as Rc>); + tracing::debug!( + target: trace_targets::EVENT, + event = "subscribe_event", + event_id = self.inner.id.0, + subscription_id = id, + subscriber_count = self.inner.subscribers.borrow().len(), + "added event subscriber" + ); let inner = Rc::clone(&self.inner); Subscription::new(move || { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::EVENT, + event = "unsubscribe_event", + event_id = inner.id.0, + subscription_id = id, + "removing event subscriber" + ); inner.subscribers.borrow_mut().remove(&id); }) } @@ -173,6 +217,12 @@ impl SubscriptionInner { if !self.active.replace(false) { return; } + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::EVENT, + event = "unsubscribe", + "cancelling subscription" + ); (self.cancel)(); } } diff --git a/lib/reactivity/src/lib.rs b/lib/reactivity/src/lib.rs index 2fb708d..4343c60 100644 --- a/lib/reactivity/src/lib.rs +++ b/lib/reactivity/src/lib.rs @@ -4,6 +4,15 @@ //! single-threaded and designed to live on a runtime-managed thread, while async work feeds it //! from the edges by updating state or emitting events. +pub(crate) mod trace_targets { + pub const GRAPH: &str = "ruin_reactivity::graph"; + pub const CELL: &str = "ruin_reactivity::cell"; + pub const THUNK: &str = "ruin_reactivity::thunk"; + pub const MEMO: &str = "ruin_reactivity::memo"; + pub const EFFECT: &str = "ruin_reactivity::effect"; + pub const EVENT: &str = "ruin_reactivity::event"; +} + mod cell; mod effect; mod event; diff --git a/lib/reactivity/src/reactor.rs b/lib/reactivity/src/reactor.rs index 0a66cdb..3b29941 100644 --- a/lib/reactivity/src/reactor.rs +++ b/lib/reactivity/src/reactor.rs @@ -7,7 +7,7 @@ use std::rc::{Rc, Weak}; use ruin_runtime::queue_microtask; -use crate::NodeId; +use crate::{NodeId, trace_targets}; type Job = Box; @@ -72,20 +72,37 @@ pub struct Reactor { impl Reactor { /// Creates a new empty reactor. pub fn new() -> Self { - Self { + let reactor = Self { inner: Rc::new(ReactorInner::new()), - } + }; + tracing::debug!( + target: trace_targets::GRAPH, + event = "reactor_new", + "created reactive reactor" + ); + reactor } /// Returns the current thread's default reactor. pub fn current() -> Self { CURRENT_REACTOR.with(|slot| { if let Some(inner) = slot.borrow().upgrade() { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::GRAPH, + event = "current_reactor_reuse", + "reusing current thread default reactor" + ); return Self { inner }; } let reactor = Self::new(); *slot.borrow_mut() = Rc::downgrade(&reactor.inner); + tracing::debug!( + target: trace_targets::GRAPH, + event = "current_reactor_install", + "installed current thread default reactor" + ); reactor }) } @@ -96,6 +113,12 @@ impl Reactor { /// [`observe`](Self::observe) made while `f` executes will become the observer's new /// dependencies. pub fn run_in_context(&self, observer: NodeId, f: impl FnOnce() -> T) -> T { + let _span = tracing::debug_span!( + target: trace_targets::GRAPH, + "reactor.run_in_context", + observer_id = observer.0 + ) + .entered(); self.clear_observer_dependencies(observer); self.inner.stack.borrow_mut().push(observer); let inserted = self.inner.active_computations.borrow_mut().insert(observer); @@ -140,6 +163,13 @@ impl Reactor { .expect("active computation should appear in observer stack"); let mut cycle = stack[start..].to_vec(); cycle.push(observable); + tracing::debug!( + target: trace_targets::GRAPH, + event = "cycle_detected", + observable_id = observable.0, + cycle_len = cycle.len(), + "reactive cycle detected" + ); panic_any(ReactCycleError::new(cycle)); } @@ -148,6 +178,15 @@ impl Reactor { return; }; + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::GRAPH, + event = "observe", + observer_id = observer.0, + observable_id = observable.0, + "recording reactive dependency" + ); + self.inner .dependencies .borrow_mut() @@ -172,6 +211,15 @@ impl Reactor { .map(|nodes| nodes.iter().copied().collect::>()) .unwrap_or_default(); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::GRAPH, + event = "trigger", + observable_id = observable.0, + dependent_count = dependents.len(), + "triggering reactive dependents" + ); + for dependent in dependents { let hook = self .inner @@ -190,6 +238,12 @@ impl Reactor { /// Disposes all graph bookkeeping for `node`. pub fn dispose(&self, node: NodeId) { + tracing::debug!( + target: trace_targets::GRAPH, + event = "dispose_node", + node_id = node.0, + "disposing reactive node bookkeeping" + ); self.clear_observer_dependencies(node); let incoming = self @@ -218,13 +272,28 @@ impl Reactor { .pending_jobs .borrow_mut() .push_back(Box::new(job)); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::GRAPH, + event = "schedule_job", + pending_jobs = self.inner.pending_jobs.borrow().len(), + "queued reactive job for microtask flush" + ); self.inner.ensure_flush_scheduled(); } pub(crate) fn allocate_node(&self) -> NodeId { let raw = self.inner.next_node.get(); self.inner.next_node.set(raw.wrapping_add(1)); - NodeId::new(raw) + let id = NodeId::new(raw); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::GRAPH, + event = "allocate_node", + node_id = id.0, + "allocated reactive node id" + ); + id } pub(crate) fn register_observer(&self, id: NodeId, observer: Rc) { @@ -303,6 +372,13 @@ impl ReactorInner { return; } + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::GRAPH, + event = "schedule_flush", + pending_jobs = self.pending_jobs.borrow().len(), + "scheduling reactive microtask flush" + ); let reactor = Rc::clone(self); queue_microtask(move || { reactor.flush_jobs(); @@ -310,11 +386,23 @@ impl ReactorInner { } fn flush_jobs(self: Rc) { + let _span = tracing::debug_span!( + target: trace_targets::GRAPH, + "reactor.flush_jobs" + ) + .entered(); loop { let job = self.pending_jobs.borrow_mut().pop_front(); let Some(job) = job else { break; }; + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::GRAPH, + event = "run_job", + remaining_jobs = self.pending_jobs.borrow().len(), + "running reactive scheduled job" + ); job(); } diff --git a/lib/reactivity/src/thunk.rs b/lib/reactivity/src/thunk.rs index ed75951..43f9be5 100644 --- a/lib/reactivity/src/thunk.rs +++ b/lib/reactivity/src/thunk.rs @@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell}; use std::rc::Rc; use crate::reactor::ObserverHook; -use crate::{NodeId, Reactor, current}; +use crate::{NodeId, Reactor, current, trace_targets}; type ComputeFn = dyn Fn() -> T + 'static; type EqualsFn = dyn Fn(&T, &T) -> bool + 'static; @@ -93,11 +93,24 @@ impl Thunk { let observer: Rc = inner.clone(); reactor.register_observer(id, observer); + tracing::debug!( + target: trace_targets::THUNK, + event = "create_thunk", + node_id = id.0, + "created reactive thunk" + ); Self { inner } } /// Runs `f` with a shared reference to the current computed value. pub fn with(&self, f: impl FnOnce(&T) -> R) -> R { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::THUNK, + event = "read_thunk", + node_id = self.inner.id.0, + "reading thunk value" + ); self.inner.reactor.observe(self.inner.id); self.inner.ensure_value(); let value = self.inner.value.borrow(); @@ -132,11 +145,24 @@ impl Memo { let observer: Rc = inner.clone(); reactor.register_observer(id, observer); + tracing::debug!( + target: trace_targets::MEMO, + event = "create_memo", + node_id = id.0, + "created reactive memo" + ); Self { inner } } /// Runs `f` with a shared reference to the current computed value. pub fn with(&self, f: impl FnOnce(&T) -> R) -> R { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::MEMO, + event = "read_memo", + node_id = self.inner.id.0, + "reading memo value" + ); self.inner.reactor.observe(self.inner.id); self.inner.ensure_value(); let value = self.inner.value.borrow(); @@ -167,6 +193,12 @@ impl ThunkInner { return; } + let _span = tracing::debug_span!( + target: trace_targets::THUNK, + "thunk.recompute", + node_id = self.id.0 + ) + .entered(); let next = self.reactor.run_in_context(self.id, || (self.compute)()); *self.value.borrow_mut() = Some(next); self.dirty.set(false); @@ -191,6 +223,12 @@ impl MemoInner { } fn recompute(&self) -> bool { + let _span = tracing::debug_span!( + target: trace_targets::MEMO, + "memo.recompute", + node_id = self.id.0 + ) + .entered(); let next = self.reactor.run_in_context(self.id, || (self.compute)()); let mut value = self.value.borrow_mut(); let changed = match value.as_ref() { @@ -199,13 +237,29 @@ impl MemoInner { }; *value = Some(next); self.dirty.set(false); + tracing::debug!( + target: trace_targets::MEMO, + event = "memo_recompute", + node_id = self.id.0, + changed, + "recomputed memo" + ); changed } } impl ObserverHook for ThunkInner { fn notify(&self) { - if self.dirty.replace(true) { + let already_dirty = self.dirty.replace(true); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::THUNK, + event = "invalidate_thunk", + node_id = self.id.0, + already_dirty, + "invalidating thunk" + ); + if already_dirty { return; } @@ -225,11 +279,29 @@ impl ObserverHook for MemoInner { fn notify(&self) { if self.value.borrow().is_none() { self.dirty.set(true); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::MEMO, + event = "invalidate_memo", + node_id = self.id.0, + eagerly_recomputed = false, + "marked uninitialized memo dirty" + ); return; } self.dirty.set(true); - if self.recompute() { + let changed = self.recompute(); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::MEMO, + event = "invalidate_memo", + node_id = self.id.0, + eagerly_recomputed = true, + changed, + "invalidated memo" + ); + if changed { self.reactor.trigger(self.id); } } diff --git a/lib/runtime/Cargo.toml b/lib/runtime/Cargo.toml index d5624d5..a128d42 100644 --- a/lib/runtime/Cargo.toml +++ b/lib/runtime/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" hyper = { version = "1.8", default-features = false, features = ["client", "http1"] } libc = "0.2" ruin_runtime_proc_macros = { package = "ruin-runtime-proc-macros", path = "../runtime_proc_macros" } +tracing = { version = "0.1", default-features = false, features = ["std"] } [dev-dependencies] bytes = "1" diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 9559c9d..306a922 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -25,6 +25,14 @@ compile_error!("ruin-runtime currently supports only Linux x86_64."); extern crate alloc; +pub(crate) mod trace_targets { + pub const DRIVER: &str = "ruin_runtime::driver"; + pub const RUNTIME: &str = "ruin_runtime::runtime"; + pub const SCHEDULER: &str = "ruin_runtime::scheduler"; + pub const TIMER: &str = "ruin_runtime::timer"; + pub const ASYNC: &str = "ruin_runtime::async"; +} + pub mod channel; pub mod fs; pub mod net; diff --git a/lib/runtime/src/platform/linux_x86_64/driver.rs b/lib/runtime/src/platform/linux_x86_64/driver.rs index f4578bd..92cca05 100644 --- a/lib/runtime/src/platform/linux_x86_64/driver.rs +++ b/lib/runtime/src/platform/linux_x86_64/driver.rs @@ -10,6 +10,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use super::uring::{IORING_OP_ASYNC_CANCEL, IoUring, IoUringCqe, IoUringSqe}; +use crate::trace_targets; const WAKE_TARGET_TOKEN: u64 = 1; const TOKEN_KIND_SHIFT: u64 = 56; @@ -34,6 +35,13 @@ struct NotifierInner { impl NotifierInner { fn notify(&self) -> io::Result<()> { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::DRIVER, + event = "notify", + ring_fd = self.ring_fd, + "sending cross-thread driver wake" + ); if self.closed.load(Ordering::Acquire) { return Err(io::Error::new( io::ErrorKind::BrokenPipe, @@ -96,6 +104,12 @@ pub fn create() -> io::Result<(Driver, ThreadNotifier)> { /// emphasize driver construction. pub fn create_driver() -> io::Result<(Driver, ThreadNotifier)> { let ring = IoUring::new(64)?; + tracing::debug!( + target: trace_targets::DRIVER, + event = "create_driver", + ring_fd = ring.ring_fd(), + "created runtime driver" + ); let notifier = Arc::new(NotifierInner { ring_fd: ring.ring_fd(), closed: AtomicBool::new(false), @@ -130,11 +144,27 @@ impl Driver { let saw_any = self .ring .drain_completions(|cqe| self.process_cqe(cqe, &mut ready)); + #[cfg(debug_assertions)] + if saw_any { + tracing::trace!( + target: trace_targets::DRIVER, + event = "poll_ready", + timer_ready = ready.timer, + wake_ready = ready.wake, + "driver poll produced ready events" + ); + } if saw_any { Ok(Some(ready)) } else { Ok(None) } } /// Blocks until at least one completion is available. pub fn wait(&self) -> io::Result<()> { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::DRIVER, + event = "wait", + "waiting for driver completion" + ); self.ring.wait_for_cqe() } @@ -142,6 +172,13 @@ impl Driver { /// /// Passing `None` removes any active timer. pub fn rearm_timer(&self, deadline: Option) -> io::Result<()> { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::TIMER, + event = "rearm_timer", + deadline_ns = deadline.map(|value| value.as_nanos() as u64), + "rearming driver timer" + ); match (self.active_timer_token.get(), deadline) { (Some(active), Some(deadline)) => { self.ring.submit_timeout_update(active, deadline)?; @@ -168,6 +205,13 @@ impl Driver { on_complete: impl FnOnce(IoUringCqe) + Send + 'static, ) -> io::Result { let token = self.next_token(CompletionKind::Operation); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::ASYNC, + event = "submit_operation", + token, + "submitting async driver operation" + ); self.completions .borrow_mut() .insert(token, Box::new(on_complete)); @@ -181,6 +225,13 @@ impl Driver { } pub(crate) fn cancel_operation(&self, token: u64) -> io::Result<()> { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::ASYNC, + event = "cancel_operation", + token, + "submitting async driver cancellation" + ); self.ring .submit_with_token(self.next_token(CompletionKind::OperationCancel), |sqe| { sqe.opcode = IORING_OP_ASYNC_CANCEL; @@ -216,6 +267,14 @@ impl Driver { } fn process_cqe(&self, cqe: IoUringCqe, ready: &mut ReadyEvents) { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::DRIVER, + event = "process_cqe", + user_data = cqe.user_data, + result = cqe.res, + "processing io_uring completion" + ); if cqe.user_data == WAKE_TARGET_TOKEN { ready.wake = true; let wakes = cqe.res.max(1) as u64; @@ -256,6 +315,11 @@ impl Driver { impl Drop for Driver { fn drop(&mut self) { + tracing::debug!( + target: trace_targets::DRIVER, + event = "drop_driver", + "dropping runtime driver" + ); self.notifier.closed.store(true, Ordering::Release); } } diff --git a/lib/runtime/src/platform/linux_x86_64/runtime.rs b/lib/runtime/src/platform/linux_x86_64/runtime.rs index ef2ba5a..83e5f43 100644 --- a/lib/runtime/src/platform/linux_x86_64/runtime.rs +++ b/lib/runtime/src/platform/linux_x86_64/runtime.rs @@ -12,6 +12,7 @@ use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::time::Duration; use super::driver::{Driver, ThreadNotifier, create_driver, monotonic_now}; +use crate::trace_targets; type LocalTask = Box; type SendTask = Box; @@ -95,6 +96,13 @@ pub fn queue_task(task: F) where F: FnOnce() + 'static, { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::SCHEDULER, + event = "queue_task", + queue = "local_macro", + "queueing local macrotask" + ); push_local_macrotask(Box::new(task)); } @@ -110,6 +118,13 @@ pub fn queue_microtask(task: F) where F: FnOnce() + 'static, { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::SCHEDULER, + event = "queue_microtask", + queue = "local_micro", + "queueing local microtask" + ); current_thread() .local_microtasks .borrow_mut() @@ -128,6 +143,15 @@ where let owner = current_thread_ptr(); let id = allocate_timer_id(); let deadline = deadline_from_now(delay); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::TIMER, + event = "set_timeout", + timer_id = id, + delay_ns = delay.as_nanos() as u64, + deadline_ns = deadline.as_nanos() as u64, + "scheduling timeout" + ); let timer = TimerNode::timeout(id, deadline, Box::new(callback)); current_thread().timers.borrow_mut().insert(timer); @@ -146,6 +170,13 @@ where /// /// Panics if called from a different runtime thread than the one that created `handle`. pub fn clear_timeout(handle: &TimeoutHandle) { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::TIMER, + event = "clear_timeout", + timer_id = handle.id, + "clearing timeout" + ); clear_timer(handle.owner, handle.id); } @@ -163,6 +194,15 @@ where let owner = current_thread_ptr(); let id = allocate_timer_id(); let deadline = deadline_from_now(delay); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::TIMER, + event = "set_interval", + timer_id = id, + delay_ns = delay.as_nanos() as u64, + deadline_ns = deadline.as_nanos() as u64, + "scheduling interval" + ); let timer = TimerNode::interval( id, deadline, @@ -186,6 +226,13 @@ where /// /// Panics if called from a different runtime thread than the one that created `handle`. pub fn clear_interval(handle: &IntervalHandle) { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::TIMER, + event = "clear_interval", + timer_id = handle.id, + "clearing interval" + ); clear_timer(handle.owner, handle.id); } @@ -212,6 +259,12 @@ where F: Future + 'static, F::Output: 'static, { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::ASYNC, + event = "queue_future", + "queueing local future" + ); let state = Rc::new(JoinState::new()); let completion = Rc::clone(&state); let task = Rc::new(FutureTask { @@ -240,6 +293,11 @@ where Init: FnOnce() + Send + 'static, Exit: FnOnce() + 'static, { + tracing::debug!( + target: trace_targets::RUNTIME, + event = "spawn_worker", + "spawning runtime worker thread" + ); let parent = current_thread(); let (driver, notifier) = create_driver().expect("worker driver should initialize"); let shared = Arc::new(ThreadShared::new(notifier)); @@ -281,6 +339,16 @@ where /// /// Panics if runtime initialization fails or if the underlying driver returns an unexpected error. pub fn run() { + let _span = tracing::debug_span!( + target: trace_targets::RUNTIME, + "runtime.run" + ) + .entered(); + tracing::debug!( + target: trace_targets::RUNTIME, + event = "run_enter", + "entering runtime event loop" + ); let _ = current_thread(); loop { @@ -316,6 +384,15 @@ pub fn run() { if has_pending_timers() || state.has_live_children() || state.has_live_async_operations() { state.shared.closing.store(false, Ordering::Release); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::RUNTIME, + event = "run_wait", + pending_timers = has_pending_timers(), + live_children = state.has_live_children(), + live_async = state.has_live_async_operations(), + "runtime waiting for more work" + ); state.driver.wait().expect("driver wait should succeed"); continue; } @@ -327,6 +404,11 @@ pub fn run() { state.shared.closed.store(true, Ordering::Release); state.shared.notify(); + tracing::debug!( + target: trace_targets::RUNTIME, + event = "run_exit", + "runtime event loop exiting" + ); teardown_thread(); return; } @@ -351,7 +433,16 @@ impl ThreadHandle { where F: FnOnce() + Send + 'static, { - self.shared.enqueue_macro(Box::new(task)) + let queued = self.shared.enqueue_macro(Box::new(task)); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::SCHEDULER, + event = "queue_remote_task", + queue = "remote_macro", + queued, + "queueing remote macrotask" + ); + queued } /// Queues a microtask onto this runtime thread. @@ -361,7 +452,16 @@ impl ThreadHandle { where F: FnOnce() + Send + 'static, { - self.shared.enqueue_micro(Box::new(task)) + let queued = self.shared.enqueue_micro(Box::new(task)); + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::SCHEDULER, + event = "queue_remote_microtask", + queue = "remote_micro", + queued, + "queueing remote microtask" + ); + queued } /// Returns `true` if the target runtime thread has shut down. @@ -859,12 +959,24 @@ fn drain_driver_events() { let state = current_thread(); if ready.wake { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::DRIVER, + event = "drain_wake", + "draining driver wake notifications" + ); let _ = state .driver .drain_wake() .expect("wake drain should succeed"); } if ready.timer { + #[cfg(debug_assertions)] + tracing::trace!( + target: trace_targets::TIMER, + event = "drain_timer", + "draining expired runtime timers" + ); let _ = state .driver .drain_timer()