This commit is contained in:
2026-03-20 15:25:46 -04:00
parent 15813ebb6c
commit 9ab1167fef
13 changed files with 711 additions and 12 deletions

View File

@@ -5,3 +5,7 @@ edition = "2024"
[dependencies]
ruin_runtime = { package = "ruin-runtime", path = "../runtime" }
tracing = { version = "0.1", default-features = false, features = ["std"] }
[dev-dependencies]
tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt", "std"] }

View File

@@ -0,0 +1,101 @@
//! Example tracing setup for RUIN runtime + reactivity.
//!
//! Try:
//!
//! - `cargo run -p ruin_reactivity --example tracing_subscriber_showcase`
//! - `RUST_LOG=info,ruin_runtime::runtime=debug,ruin_reactivity::graph=debug cargo run -p ruin_reactivity --example tracing_subscriber_showcase`
//! - `RUST_LOG=info,ruin_runtime::scheduler=trace,ruin_reactivity::event=trace,ruin_reactivity::effect=debug cargo run -p ruin_reactivity --example tracing_subscriber_showcase`
use ruin_reactivity::{cell, effect, event, on, thunk};
use ruin_runtime::time::sleep;
use std::time::Duration;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, fmt};
fn install_tracing() {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new(
"info,\
ruin_runtime::runtime=debug,\
ruin_runtime::scheduler=debug,\
ruin_reactivity::graph=debug,\
ruin_reactivity::effect=debug,\
ruin_reactivity::event=debug",
)
});
let fmt_layer = fmt::layer()
.with_target(true)
.with_thread_ids(true)
.with_thread_names(true)
.compact();
let _ = tracing_subscriber::registry()
.with(filter)
.with(fmt_layer)
.try_init();
}
#[ruin_runtime::async_main]
async fn main() {
install_tracing();
tracing::info!(
event = "showcase_start",
note = "override RUST_LOG to see more or less detail",
"starting tracing subscriber showcase"
);
let count = cell(0usize);
let doubled = thunk({
let count = count.clone();
move || count.get() * 2
});
let clicks = event::<usize>();
let _drain = on(&clicks, {
let count = count.clone();
move |delta| {
tracing::info!(
event = "apply_click_delta",
delta = *delta,
"draining queued event into cell update"
);
count.update(|value| *value += *delta);
}
});
let _view = effect({
let count = count.clone();
let doubled = doubled.clone();
move || {
tracing::info!(
target: "demo::view",
count = count.get(),
doubled = doubled.get(),
"derived view state updated"
);
}
});
tracing::info!(
event = "emit_clicks",
count = 3,
"emitting queued click deltas"
);
clicks.emit(1);
clicks.emit(2);
clicks.emit(0);
tracing::info!(event = "manual_set", value = 10, "setting count directly");
let _ = count.set(10);
tracing::info!(
event = "showcase_done",
hint = "see ruin_runtime::* and ruin_reactivity::* targets in the filter",
"example body completed; awaiting once so scheduled microtasks can flush"
);
sleep(Duration::from_millis(1)).await;
}

View File

@@ -1,7 +1,7 @@
use std::cell::RefCell;
use std::rc::Rc;
use crate::{NodeId, Reactor, current};
use crate::{NodeId, Reactor, current, trace_targets};
/// Creates a [`Cell`] in the current thread's default reactor.
pub fn cell<T: 'static>(initial: T) -> Cell<T> {
@@ -29,6 +29,12 @@ impl Reactor {
impl<T: 'static> Cell<T> {
fn new(reactor: Reactor, initial: T) -> Self {
let id = reactor.allocate_node();
tracing::debug!(
target: trace_targets::CELL,
event = "create_cell",
node_id = id.0,
"created reactive cell"
);
Self {
inner: Rc::new(CellInner {
reactor,
@@ -40,6 +46,13 @@ impl<T: 'static> Cell<T> {
/// Runs `f` with a shared reference to the current value.
pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::CELL,
event = "read_cell",
node_id = self.inner.id.0,
"reading reactive cell"
);
self.inner.reactor.observe(self.inner.id);
let value = self.inner.value.borrow();
f(&value)
@@ -48,6 +61,12 @@ impl<T: 'static> Cell<T> {
/// Replaces the current value and notifies dependents.
pub fn replace(&self, value: T) -> T {
let previous = self.inner.value.replace(value);
tracing::debug!(
target: trace_targets::CELL,
event = "replace_cell",
node_id = self.inner.id.0,
"replaced cell value"
);
self.inner.reactor.trigger(self.inner.id);
previous
}
@@ -58,6 +77,12 @@ impl<T: 'static> Cell<T> {
let mut value = self.inner.value.borrow_mut();
f(&mut value)
};
tracing::debug!(
target: trace_targets::CELL,
event = "update_cell",
node_id = self.inner.id.0,
"updated cell value in place"
);
self.inner.reactor.trigger(self.inner.id);
output
}
@@ -78,11 +103,26 @@ impl<T: PartialEq + 'static> Cell<T> {
pub fn set(&self, value: T) -> Option<T> {
let mut current = self.inner.value.borrow_mut();
if *current == value {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::CELL,
event = "set_cell",
node_id = self.inner.id.0,
changed = false,
"suppressed unchanged cell write"
);
return None;
}
let previous = std::mem::replace(&mut *current, value);
drop(current);
tracing::debug!(
target: trace_targets::CELL,
event = "set_cell",
node_id = self.inner.id.0,
changed = true,
"set cell value"
);
self.inner.reactor.trigger(self.inner.id);
Some(previous)
}

View File

@@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell};
use std::rc::{Rc, Weak};
use crate::reactor::ObserverHook;
use crate::{NodeId, Reactor, current};
use crate::{NodeId, Reactor, current, trace_targets};
/// Creates an effect in the current thread's default reactor.
pub fn effect(f: impl Fn() + 'static) -> EffectHandle {
@@ -42,6 +42,12 @@ impl EffectHandle {
self_ref: RefCell::new(Weak::new()),
});
*inner.self_ref.borrow_mut() = Rc::downgrade(&inner);
tracing::debug!(
target: trace_targets::EFFECT,
event = "create_effect",
node_id = id.0,
"created reactive effect"
);
let observer: Rc<dyn ObserverHook> = inner.clone();
reactor.register_observer(id, observer);
@@ -72,9 +78,27 @@ struct EffectInner {
impl EffectInner {
fn schedule(&self) {
if self.disposed.get() || self.scheduled.replace(true) {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::EFFECT,
event = "schedule_effect",
node_id = self.id.0,
queued = false,
disposed = self.disposed.get(),
already_scheduled = self.scheduled.get(),
"effect scheduling skipped"
);
return;
}
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::EFFECT,
event = "schedule_effect",
node_id = self.id.0,
queued = true,
"queued effect for microtask flush"
);
let weak = self.self_ref.borrow().clone();
let reactor = self.reactor.clone();
reactor.schedule(move || {
@@ -87,6 +111,12 @@ impl EffectInner {
}
inner.scheduled.set(false);
let _span = tracing::debug_span!(
target: trace_targets::EFFECT,
"effect.run",
node_id = inner.id.0
)
.entered();
inner.reactor.run_in_context(inner.id, || (inner.effect)());
});
}
@@ -96,6 +126,12 @@ impl EffectInner {
return;
}
tracing::debug!(
target: trace_targets::EFFECT,
event = "dispose_effect",
node_id = self.id.0,
"disposed reactive effect"
);
self.reactor.unregister_observer(self.id);
self.reactor.dispose(self.id);
}

View File

@@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell};
use std::collections::BTreeMap;
use std::rc::Rc;
use crate::{NodeId, Reactor, current};
use crate::{NodeId, Reactor, current, trace_targets};
type SubscriberFn<T> = dyn Fn(&T) + 'static;
@@ -68,6 +68,14 @@ impl Reactor {
let mut queued = queue.borrow_mut();
queued.drain(..).collect::<Vec<_>>()
};
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::EVENT,
event = "drain_event_queue",
event_id = event.inner.id.0,
drained = drained.len(),
"draining queued event values reactively"
);
for value in &drained {
handler(value);
}
@@ -84,6 +92,12 @@ impl Reactor {
impl<T: 'static> Event<T> {
fn new(reactor: Reactor) -> Self {
let id = reactor.allocate_node();
tracing::debug!(
target: trace_targets::EVENT,
event = "create_event",
node_id = id.0,
"created reactive event"
);
Self {
inner: Rc::new(EventInner {
reactor,
@@ -95,6 +109,13 @@ impl<T: 'static> Event<T> {
}
fn observe(&self) {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::EVENT,
event = "observe_event",
event_id = self.inner.id.0,
"observing event reactively"
);
self.inner.reactor.observe(self.inner.id);
}
@@ -107,6 +128,13 @@ impl<T: 'static> Event<T> {
.values()
.cloned()
.collect::<Vec<_>>();
tracing::debug!(
target: trace_targets::EVENT,
event = "emit_event",
event_id = self.inner.id.0,
subscriber_count = subscribers.len(),
"emitting event value"
);
for subscriber in subscribers {
subscriber(&value);
}
@@ -121,9 +149,25 @@ impl<T: 'static> Event<T> {
.subscribers
.borrow_mut()
.insert(id, Rc::new(handler) as Rc<SubscriberFn<T>>);
tracing::debug!(
target: trace_targets::EVENT,
event = "subscribe_event",
event_id = self.inner.id.0,
subscription_id = id,
subscriber_count = self.inner.subscribers.borrow().len(),
"added event subscriber"
);
let inner = Rc::clone(&self.inner);
Subscription::new(move || {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::EVENT,
event = "unsubscribe_event",
event_id = inner.id.0,
subscription_id = id,
"removing event subscriber"
);
inner.subscribers.borrow_mut().remove(&id);
})
}
@@ -173,6 +217,12 @@ impl SubscriptionInner {
if !self.active.replace(false) {
return;
}
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::EVENT,
event = "unsubscribe",
"cancelling subscription"
);
(self.cancel)();
}
}

View File

@@ -4,6 +4,15 @@
//! single-threaded and designed to live on a runtime-managed thread, while async work feeds it
//! from the edges by updating state or emitting events.
pub(crate) mod trace_targets {
pub const GRAPH: &str = "ruin_reactivity::graph";
pub const CELL: &str = "ruin_reactivity::cell";
pub const THUNK: &str = "ruin_reactivity::thunk";
pub const MEMO: &str = "ruin_reactivity::memo";
pub const EFFECT: &str = "ruin_reactivity::effect";
pub const EVENT: &str = "ruin_reactivity::event";
}
mod cell;
mod effect;
mod event;

View File

@@ -7,7 +7,7 @@ use std::rc::{Rc, Weak};
use ruin_runtime::queue_microtask;
use crate::NodeId;
use crate::{NodeId, trace_targets};
type Job = Box<dyn FnOnce() + 'static>;
@@ -72,20 +72,37 @@ pub struct Reactor {
impl Reactor {
/// Creates a new empty reactor.
pub fn new() -> Self {
Self {
let reactor = Self {
inner: Rc::new(ReactorInner::new()),
}
};
tracing::debug!(
target: trace_targets::GRAPH,
event = "reactor_new",
"created reactive reactor"
);
reactor
}
/// Returns the current thread's default reactor.
pub fn current() -> Self {
CURRENT_REACTOR.with(|slot| {
if let Some(inner) = slot.borrow().upgrade() {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::GRAPH,
event = "current_reactor_reuse",
"reusing current thread default reactor"
);
return Self { inner };
}
let reactor = Self::new();
*slot.borrow_mut() = Rc::downgrade(&reactor.inner);
tracing::debug!(
target: trace_targets::GRAPH,
event = "current_reactor_install",
"installed current thread default reactor"
);
reactor
})
}
@@ -96,6 +113,12 @@ impl Reactor {
/// [`observe`](Self::observe) made while `f` executes will become the observer's new
/// dependencies.
pub fn run_in_context<T>(&self, observer: NodeId, f: impl FnOnce() -> T) -> T {
let _span = tracing::debug_span!(
target: trace_targets::GRAPH,
"reactor.run_in_context",
observer_id = observer.0
)
.entered();
self.clear_observer_dependencies(observer);
self.inner.stack.borrow_mut().push(observer);
let inserted = self.inner.active_computations.borrow_mut().insert(observer);
@@ -140,6 +163,13 @@ impl Reactor {
.expect("active computation should appear in observer stack");
let mut cycle = stack[start..].to_vec();
cycle.push(observable);
tracing::debug!(
target: trace_targets::GRAPH,
event = "cycle_detected",
observable_id = observable.0,
cycle_len = cycle.len(),
"reactive cycle detected"
);
panic_any(ReactCycleError::new(cycle));
}
@@ -148,6 +178,15 @@ impl Reactor {
return;
};
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::GRAPH,
event = "observe",
observer_id = observer.0,
observable_id = observable.0,
"recording reactive dependency"
);
self.inner
.dependencies
.borrow_mut()
@@ -172,6 +211,15 @@ impl Reactor {
.map(|nodes| nodes.iter().copied().collect::<Vec<_>>())
.unwrap_or_default();
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::GRAPH,
event = "trigger",
observable_id = observable.0,
dependent_count = dependents.len(),
"triggering reactive dependents"
);
for dependent in dependents {
let hook = self
.inner
@@ -190,6 +238,12 @@ impl Reactor {
/// Disposes all graph bookkeeping for `node`.
pub fn dispose(&self, node: NodeId) {
tracing::debug!(
target: trace_targets::GRAPH,
event = "dispose_node",
node_id = node.0,
"disposing reactive node bookkeeping"
);
self.clear_observer_dependencies(node);
let incoming = self
@@ -218,13 +272,28 @@ impl Reactor {
.pending_jobs
.borrow_mut()
.push_back(Box::new(job));
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::GRAPH,
event = "schedule_job",
pending_jobs = self.inner.pending_jobs.borrow().len(),
"queued reactive job for microtask flush"
);
self.inner.ensure_flush_scheduled();
}
pub(crate) fn allocate_node(&self) -> NodeId {
let raw = self.inner.next_node.get();
self.inner.next_node.set(raw.wrapping_add(1));
NodeId::new(raw)
let id = NodeId::new(raw);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::GRAPH,
event = "allocate_node",
node_id = id.0,
"allocated reactive node id"
);
id
}
pub(crate) fn register_observer(&self, id: NodeId, observer: Rc<dyn ObserverHook>) {
@@ -303,6 +372,13 @@ impl ReactorInner {
return;
}
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::GRAPH,
event = "schedule_flush",
pending_jobs = self.pending_jobs.borrow().len(),
"scheduling reactive microtask flush"
);
let reactor = Rc::clone(self);
queue_microtask(move || {
reactor.flush_jobs();
@@ -310,11 +386,23 @@ impl ReactorInner {
}
fn flush_jobs(self: Rc<Self>) {
let _span = tracing::debug_span!(
target: trace_targets::GRAPH,
"reactor.flush_jobs"
)
.entered();
loop {
let job = self.pending_jobs.borrow_mut().pop_front();
let Some(job) = job else {
break;
};
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::GRAPH,
event = "run_job",
remaining_jobs = self.pending_jobs.borrow().len(),
"running reactive scheduled job"
);
job();
}

View File

@@ -2,7 +2,7 @@ use std::cell::{Cell, RefCell};
use std::rc::Rc;
use crate::reactor::ObserverHook;
use crate::{NodeId, Reactor, current};
use crate::{NodeId, Reactor, current, trace_targets};
type ComputeFn<T> = dyn Fn() -> T + 'static;
type EqualsFn<T> = dyn Fn(&T, &T) -> bool + 'static;
@@ -93,11 +93,24 @@ impl<T: 'static> Thunk<T> {
let observer: Rc<dyn ObserverHook> = inner.clone();
reactor.register_observer(id, observer);
tracing::debug!(
target: trace_targets::THUNK,
event = "create_thunk",
node_id = id.0,
"created reactive thunk"
);
Self { inner }
}
/// Runs `f` with a shared reference to the current computed value.
pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::THUNK,
event = "read_thunk",
node_id = self.inner.id.0,
"reading thunk value"
);
self.inner.reactor.observe(self.inner.id);
self.inner.ensure_value();
let value = self.inner.value.borrow();
@@ -132,11 +145,24 @@ impl<T: 'static> Memo<T> {
let observer: Rc<dyn ObserverHook> = inner.clone();
reactor.register_observer(id, observer);
tracing::debug!(
target: trace_targets::MEMO,
event = "create_memo",
node_id = id.0,
"created reactive memo"
);
Self { inner }
}
/// Runs `f` with a shared reference to the current computed value.
pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::MEMO,
event = "read_memo",
node_id = self.inner.id.0,
"reading memo value"
);
self.inner.reactor.observe(self.inner.id);
self.inner.ensure_value();
let value = self.inner.value.borrow();
@@ -167,6 +193,12 @@ impl<T> ThunkInner<T> {
return;
}
let _span = tracing::debug_span!(
target: trace_targets::THUNK,
"thunk.recompute",
node_id = self.id.0
)
.entered();
let next = self.reactor.run_in_context(self.id, || (self.compute)());
*self.value.borrow_mut() = Some(next);
self.dirty.set(false);
@@ -191,6 +223,12 @@ impl<T> MemoInner<T> {
}
fn recompute(&self) -> bool {
let _span = tracing::debug_span!(
target: trace_targets::MEMO,
"memo.recompute",
node_id = self.id.0
)
.entered();
let next = self.reactor.run_in_context(self.id, || (self.compute)());
let mut value = self.value.borrow_mut();
let changed = match value.as_ref() {
@@ -199,13 +237,29 @@ impl<T> MemoInner<T> {
};
*value = Some(next);
self.dirty.set(false);
tracing::debug!(
target: trace_targets::MEMO,
event = "memo_recompute",
node_id = self.id.0,
changed,
"recomputed memo"
);
changed
}
}
impl<T: 'static> ObserverHook for ThunkInner<T> {
fn notify(&self) {
if self.dirty.replace(true) {
let already_dirty = self.dirty.replace(true);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::THUNK,
event = "invalidate_thunk",
node_id = self.id.0,
already_dirty,
"invalidating thunk"
);
if already_dirty {
return;
}
@@ -225,11 +279,29 @@ impl<T: 'static> ObserverHook for MemoInner<T> {
fn notify(&self) {
if self.value.borrow().is_none() {
self.dirty.set(true);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::MEMO,
event = "invalidate_memo",
node_id = self.id.0,
eagerly_recomputed = false,
"marked uninitialized memo dirty"
);
return;
}
self.dirty.set(true);
if self.recompute() {
let changed = self.recompute();
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::MEMO,
event = "invalidate_memo",
node_id = self.id.0,
eagerly_recomputed = true,
changed,
"invalidated memo"
);
if changed {
self.reactor.trigger(self.id);
}
}

View File

@@ -7,6 +7,7 @@ edition = "2024"
hyper = { version = "1.8", default-features = false, features = ["client", "http1"] }
libc = "0.2"
ruin_runtime_proc_macros = { package = "ruin-runtime-proc-macros", path = "../runtime_proc_macros" }
tracing = { version = "0.1", default-features = false, features = ["std"] }
[dev-dependencies]
bytes = "1"

View File

@@ -25,6 +25,14 @@ compile_error!("ruin-runtime currently supports only Linux x86_64.");
extern crate alloc;
pub(crate) mod trace_targets {
pub const DRIVER: &str = "ruin_runtime::driver";
pub const RUNTIME: &str = "ruin_runtime::runtime";
pub const SCHEDULER: &str = "ruin_runtime::scheduler";
pub const TIMER: &str = "ruin_runtime::timer";
pub const ASYNC: &str = "ruin_runtime::async";
}
pub mod channel;
pub mod fs;
pub mod net;

View File

@@ -10,6 +10,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use super::uring::{IORING_OP_ASYNC_CANCEL, IoUring, IoUringCqe, IoUringSqe};
use crate::trace_targets;
const WAKE_TARGET_TOKEN: u64 = 1;
const TOKEN_KIND_SHIFT: u64 = 56;
@@ -34,6 +35,13 @@ struct NotifierInner {
impl NotifierInner {
fn notify(&self) -> io::Result<()> {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::DRIVER,
event = "notify",
ring_fd = self.ring_fd,
"sending cross-thread driver wake"
);
if self.closed.load(Ordering::Acquire) {
return Err(io::Error::new(
io::ErrorKind::BrokenPipe,
@@ -96,6 +104,12 @@ pub fn create() -> io::Result<(Driver, ThreadNotifier)> {
/// emphasize driver construction.
pub fn create_driver() -> io::Result<(Driver, ThreadNotifier)> {
let ring = IoUring::new(64)?;
tracing::debug!(
target: trace_targets::DRIVER,
event = "create_driver",
ring_fd = ring.ring_fd(),
"created runtime driver"
);
let notifier = Arc::new(NotifierInner {
ring_fd: ring.ring_fd(),
closed: AtomicBool::new(false),
@@ -130,11 +144,27 @@ impl Driver {
let saw_any = self
.ring
.drain_completions(|cqe| self.process_cqe(cqe, &mut ready));
#[cfg(debug_assertions)]
if saw_any {
tracing::trace!(
target: trace_targets::DRIVER,
event = "poll_ready",
timer_ready = ready.timer,
wake_ready = ready.wake,
"driver poll produced ready events"
);
}
if saw_any { Ok(Some(ready)) } else { Ok(None) }
}
/// Blocks until at least one completion is available.
pub fn wait(&self) -> io::Result<()> {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::DRIVER,
event = "wait",
"waiting for driver completion"
);
self.ring.wait_for_cqe()
}
@@ -142,6 +172,13 @@ impl Driver {
///
/// Passing `None` removes any active timer.
pub fn rearm_timer(&self, deadline: Option<Duration>) -> io::Result<()> {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::TIMER,
event = "rearm_timer",
deadline_ns = deadline.map(|value| value.as_nanos() as u64),
"rearming driver timer"
);
match (self.active_timer_token.get(), deadline) {
(Some(active), Some(deadline)) => {
self.ring.submit_timeout_update(active, deadline)?;
@@ -168,6 +205,13 @@ impl Driver {
on_complete: impl FnOnce(IoUringCqe) + Send + 'static,
) -> io::Result<u64> {
let token = self.next_token(CompletionKind::Operation);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::ASYNC,
event = "submit_operation",
token,
"submitting async driver operation"
);
self.completions
.borrow_mut()
.insert(token, Box::new(on_complete));
@@ -181,6 +225,13 @@ impl Driver {
}
pub(crate) fn cancel_operation(&self, token: u64) -> io::Result<()> {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::ASYNC,
event = "cancel_operation",
token,
"submitting async driver cancellation"
);
self.ring
.submit_with_token(self.next_token(CompletionKind::OperationCancel), |sqe| {
sqe.opcode = IORING_OP_ASYNC_CANCEL;
@@ -216,6 +267,14 @@ impl Driver {
}
fn process_cqe(&self, cqe: IoUringCqe, ready: &mut ReadyEvents) {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::DRIVER,
event = "process_cqe",
user_data = cqe.user_data,
result = cqe.res,
"processing io_uring completion"
);
if cqe.user_data == WAKE_TARGET_TOKEN {
ready.wake = true;
let wakes = cqe.res.max(1) as u64;
@@ -256,6 +315,11 @@ impl Driver {
impl Drop for Driver {
fn drop(&mut self) {
tracing::debug!(
target: trace_targets::DRIVER,
event = "drop_driver",
"dropping runtime driver"
);
self.notifier.closed.store(true, Ordering::Release);
}
}

View File

@@ -12,6 +12,7 @@ use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
use super::driver::{Driver, ThreadNotifier, create_driver, monotonic_now};
use crate::trace_targets;
type LocalTask = Box<dyn FnOnce() + 'static>;
type SendTask = Box<dyn FnOnce() + Send + 'static>;
@@ -95,6 +96,13 @@ pub fn queue_task<F>(task: F)
where
F: FnOnce() + 'static,
{
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::SCHEDULER,
event = "queue_task",
queue = "local_macro",
"queueing local macrotask"
);
push_local_macrotask(Box::new(task));
}
@@ -110,6 +118,13 @@ pub fn queue_microtask<F>(task: F)
where
F: FnOnce() + 'static,
{
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::SCHEDULER,
event = "queue_microtask",
queue = "local_micro",
"queueing local microtask"
);
current_thread()
.local_microtasks
.borrow_mut()
@@ -128,6 +143,15 @@ where
let owner = current_thread_ptr();
let id = allocate_timer_id();
let deadline = deadline_from_now(delay);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::TIMER,
event = "set_timeout",
timer_id = id,
delay_ns = delay.as_nanos() as u64,
deadline_ns = deadline.as_nanos() as u64,
"scheduling timeout"
);
let timer = TimerNode::timeout(id, deadline, Box::new(callback));
current_thread().timers.borrow_mut().insert(timer);
@@ -146,6 +170,13 @@ where
///
/// Panics if called from a different runtime thread than the one that created `handle`.
pub fn clear_timeout(handle: &TimeoutHandle) {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::TIMER,
event = "clear_timeout",
timer_id = handle.id,
"clearing timeout"
);
clear_timer(handle.owner, handle.id);
}
@@ -163,6 +194,15 @@ where
let owner = current_thread_ptr();
let id = allocate_timer_id();
let deadline = deadline_from_now(delay);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::TIMER,
event = "set_interval",
timer_id = id,
delay_ns = delay.as_nanos() as u64,
deadline_ns = deadline.as_nanos() as u64,
"scheduling interval"
);
let timer = TimerNode::interval(
id,
deadline,
@@ -186,6 +226,13 @@ where
///
/// Panics if called from a different runtime thread than the one that created `handle`.
pub fn clear_interval(handle: &IntervalHandle) {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::TIMER,
event = "clear_interval",
timer_id = handle.id,
"clearing interval"
);
clear_timer(handle.owner, handle.id);
}
@@ -212,6 +259,12 @@ where
F: Future + 'static,
F::Output: 'static,
{
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::ASYNC,
event = "queue_future",
"queueing local future"
);
let state = Rc::new(JoinState::new());
let completion = Rc::clone(&state);
let task = Rc::new(FutureTask {
@@ -240,6 +293,11 @@ where
Init: FnOnce() + Send + 'static,
Exit: FnOnce() + 'static,
{
tracing::debug!(
target: trace_targets::RUNTIME,
event = "spawn_worker",
"spawning runtime worker thread"
);
let parent = current_thread();
let (driver, notifier) = create_driver().expect("worker driver should initialize");
let shared = Arc::new(ThreadShared::new(notifier));
@@ -281,6 +339,16 @@ where
///
/// Panics if runtime initialization fails or if the underlying driver returns an unexpected error.
pub fn run() {
let _span = tracing::debug_span!(
target: trace_targets::RUNTIME,
"runtime.run"
)
.entered();
tracing::debug!(
target: trace_targets::RUNTIME,
event = "run_enter",
"entering runtime event loop"
);
let _ = current_thread();
loop {
@@ -316,6 +384,15 @@ pub fn run() {
if has_pending_timers() || state.has_live_children() || state.has_live_async_operations() {
state.shared.closing.store(false, Ordering::Release);
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::RUNTIME,
event = "run_wait",
pending_timers = has_pending_timers(),
live_children = state.has_live_children(),
live_async = state.has_live_async_operations(),
"runtime waiting for more work"
);
state.driver.wait().expect("driver wait should succeed");
continue;
}
@@ -327,6 +404,11 @@ pub fn run() {
state.shared.closed.store(true, Ordering::Release);
state.shared.notify();
tracing::debug!(
target: trace_targets::RUNTIME,
event = "run_exit",
"runtime event loop exiting"
);
teardown_thread();
return;
}
@@ -351,7 +433,16 @@ impl ThreadHandle {
where
F: FnOnce() + Send + 'static,
{
self.shared.enqueue_macro(Box::new(task))
let queued = self.shared.enqueue_macro(Box::new(task));
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::SCHEDULER,
event = "queue_remote_task",
queue = "remote_macro",
queued,
"queueing remote macrotask"
);
queued
}
/// Queues a microtask onto this runtime thread.
@@ -361,7 +452,16 @@ impl ThreadHandle {
where
F: FnOnce() + Send + 'static,
{
self.shared.enqueue_micro(Box::new(task))
let queued = self.shared.enqueue_micro(Box::new(task));
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::SCHEDULER,
event = "queue_remote_microtask",
queue = "remote_micro",
queued,
"queueing remote microtask"
);
queued
}
/// Returns `true` if the target runtime thread has shut down.
@@ -859,12 +959,24 @@ fn drain_driver_events() {
let state = current_thread();
if ready.wake {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::DRIVER,
event = "drain_wake",
"draining driver wake notifications"
);
let _ = state
.driver
.drain_wake()
.expect("wake drain should succeed");
}
if ready.timer {
#[cfg(debug_assertions)]
tracing::trace!(
target: trace_targets::TIMER,
event = "drain_timer",
"draining expired runtime timers"
);
let _ = state
.driver
.drain_timer()