// Source: ruin/lib/ruin_app/example/05_async_runtime_io.rs
// Listing metadata: 2026-03-22 02:30:11 -04:00, 640 lines, 24 KiB, Rust

use std::io::{Read as _, Write as _};
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
use ruin_app::prelude::*;
/// Relative path that the "Save snapshot" button writes the currently loaded file contents to.
const SNAPSHOT_PATH: &str = "target/ruin-example05-manifest-snapshot.toml";
/// Entry point: starts the local demo HTTP server, then runs the example window with
/// `RuntimeIoExample` mounted as its root view.
///
/// An error from `spawn_demo_server` is propagated before any window is created;
/// otherwise the result of `App::run` is returned as-is.
#[ruin_runtime::async_main]
async fn main() -> ruin_app::Result<()> {
    // Start listening first so the address can be handed to the root component.
    let server_addr = spawn_demo_server()?;

    let app = App::new();
    let window = Window::new()
        .title("RUIN Async Runtime I/O")
        .app_id("dev.ruin.async-runtime-io")
        .size(1280.0, 820.0);

    app.window(window)
        .mount(view! {
            RuntimeIoExample(server_addr = server_addr) {}
        })
        .run()
        .await
}
/// Root view of the example: a `row` holding a "Filesystem" pane and a "Network" pane.
///
/// The filesystem pane reads the selected file with `ruin_runtime::fs::read_to_string`
/// and can write the loaded contents to `SNAPSHOT_PATH`. The network pane requests the
/// selected `DemoEndpoint` from `server_addr` through `fetch_demo_endpoint`.
#[component]
fn RuntimeIoExample(server_addr: SocketAddr) -> impl IntoView {
// Filesystem pane state: the file to read, a counter bumped by the "Reload" button,
// and the scroll offset of the contents box.
let manifest_path = use_signal(|| "Cargo.lock".to_string());
let manifest_reload = use_signal(|| 0_u64);
let manifest_scroll = use_signal(|| 0.0_f32);
// Network pane state: the endpoint to request, its reload counter, and the scroll
// offset of the response box.
let endpoint = use_signal(|| DemoEndpoint::Release);
let endpoint_reload = use_signal(|| 0_u64);
let response_scroll = use_signal(|| 0.0_f32);
// Snapshot-save state: whether a write is currently running, and the status message
// shown under the save button.
let save_in_flight = use_signal(|| false);
let save_status = use_signal(|| Some(format!("Snapshot target: {SNAPSHOT_PATH}")));
// File-contents resource. The closure reads the path and the reload counter (the
// counter's value is discarded) and then builds the read future; I/O errors are
// converted to their string form.
// NOTE(review): presumably reading a signal inside this closure is what re-runs the
// resource when that signal changes - confirm against `use_resource`.
let manifest = use_resource({
let manifest_path = manifest_path.clone();
let manifest_reload = manifest_reload.clone();
move || {
let path = manifest_path.get();
let _ = manifest_reload.get();
async move {
ruin_runtime::fs::read_to_string(&path)
.await
.map_err(|error| error.to_string())
}
}
});
// HTTP response resource, built the same way from the endpoint and its reload counter.
let response = use_resource({
let endpoint = endpoint.clone();
let endpoint_reload = endpoint_reload.clone();
move || {
let route = endpoint.get();
let _ = endpoint_reload.get();
async move { fetch_demo_endpoint(server_addr, route).await }
}
});
// Diagnostic effect: prints the selected file path to stderr.
use_effect({
let manifest_path = manifest_path.clone();
move || {
eprintln!(
"example05: manifest path changed -> {}",
manifest_path.get()
);
}
});
// Diagnostic effect: prints the selected endpoint path to stderr.
use_effect({
let endpoint = endpoint.clone();
move || {
eprintln!(
"example05: demo endpoint changed -> {}",
endpoint.get().path()
);
}
});
// Resets the file scroll offset to 0.0. The path and reload counter are read with
// their values discarded.
// NOTE(review): presumably those reads tie the reset to path changes and reloads -
// confirm against `use_effect`.
use_effect({
let manifest_path = manifest_path.clone();
let manifest_reload = manifest_reload.clone();
let manifest_scroll = manifest_scroll.clone();
move || {
let _ = manifest_path.get();
let _ = manifest_reload.get();
let _ = manifest_scroll.set(0.0);
}
});
// Same scroll reset for the response box.
use_effect({
let endpoint = endpoint.clone();
let endpoint_reload = endpoint_reload.clone();
let response_scroll = response_scroll.clone();
move || {
let _ = endpoint.get();
let _ = endpoint_reload.get();
let _ = response_scroll.set(0.0);
}
});
// Current state of both resources, matched on below as Pending / Ready(Ok) / Ready(Err).
let manifest_state = manifest.read();
let response_state = response.read();
view! {
row(gap = 20.0, padding = 20.0) {
// ---- Filesystem pane ----
block(
flex = 1.0,
padding = 18.0,
gap = 14.0,
background = surfaces::raised(),
border_radius = 16.0,
) {
text(role = TextRole::Heading(1), size = 28.0, weight = FontWeight::Semibold) {
"Filesystem"
}
text(color = colors::muted(), wrap = TextWrap::Word) {
"Real async file reads use `ruin_runtime::fs`, while the snapshot button writes \
the current contents back through the runtime on a background future."
}
// File selection buttons: each one stores a fixed path in `manifest_path`.
row(gap = 8.0) {
button(
on_press = {
let manifest_path = manifest_path.clone();
move |_| {
let _ = manifest_path.set("Cargo.toml".to_string());
}
},
) {
"Cargo.toml"
}
button(
on_press = {
let manifest_path = manifest_path.clone();
move |_| {
let _ = manifest_path.set("Cargo.lock".to_string());
}
},
) {
"Cargo.lock"
}
// Selects a path named "missing-file.toml"; the label suggests it exists to
// exercise the read-error branch below.
button(
on_press = {
let manifest_path = manifest_path.clone();
move |_| {
let _ = manifest_path.set("missing-file.toml".to_string());
}
},
) {
"Missing file"
}
// Bumps the reload counter without changing the path.
button(
on_press = {
let manifest_reload = manifest_reload.clone();
move |_| {
manifest_reload.update(|value| *value += 1);
}
},
) {
"Reload"
}
}
text(size = 16.0, weight = FontWeight::Semibold) {
"Path: "; manifest_path.clone()
}
match manifest_state {
// Read still running.
Pending => view! {
block(
padding = 14.0,
background = surfaces::canvas(),
border_radius = 12.0,
) {
text(color = colors::muted()) { "Reading file through the runtime..." }
}
},
// Read succeeded: save button, byte count, save status, and the contents.
Ready(Ok(contents)) => view! {
column(gap = 12.0) {
row(gap = 8.0) {
button(
on_press = {
let manifest = manifest.clone();
let save_in_flight = save_in_flight.clone();
let save_status = save_status.clone();
move |_| {
// Ignore presses while a previous save is still running.
if save_in_flight.get() {
return;
}
// Re-read the resource at press time; if it is not in the loaded
// state, report why through the status message and stop.
let current_manifest = match manifest.read() {
Ready(Ok(contents)) => contents,
Pending => {
let _ = save_status.set(Some(
"Manifest is still loading; try again in a moment."
.to_string(),
));
return;
}
Ready(Err(error)) => {
let _ = save_status.set(Some(format!(
"Cannot snapshot the current manifest: {error}"
)));
return;
}
};
let _ = save_in_flight.set(true);
let _ = save_status
.set(Some("Saving snapshot through ruin_runtime::fs...".to_string()));
// The value returned by `queue_future` is dropped right away.
// NOTE(review): presumably the queued future keeps running after
// the drop - confirm against `ruin_runtime::queue_future`.
std::mem::drop(ruin_runtime::queue_future({
let save_in_flight = save_in_flight.clone();
let save_status = save_status.clone();
async move {
let result = ruin_runtime::fs::write(
SNAPSHOT_PATH,
current_manifest,
)
.await;
// Clear the in-flight flag whether the write succeeded or not.
let _ = save_in_flight.set(false);
let completed_at = timestamp_label();
let message = match result {
Ok(()) => format!(
"Saved snapshot to {SNAPSHOT_PATH} at {completed_at}"
),
Err(error) => {
format!(
"Snapshot write failed at {completed_at}: {error}"
)
}
};
// The outcome goes both to stderr and to the status message.
eprintln!("example05: {message}");
let _ = save_status.set(Some(message));
}
}));
}
},
) {
// Button label follows the in-flight flag.
if save_in_flight.get() {
"Saving snapshot..."
} else {
"Save snapshot"
}
}
}
text(color = colors::muted()) {
"Loaded "; contents.len(); " bytes."
}
// Status message, if any. Any status text containing "failed" is drawn in
// the danger color; everything else is muted.
if let Some(status) = save_status.get() {
view! {
text(
color = if status.contains("failed") {
colors::danger()
} else {
colors::muted()
},
wrap = TextWrap::Word,
) {
status
}
}
} else {
// No status: an empty column element stands in for the text.
View::from_element(Element::column())
}
scroll_box(
height = 500.0,
offset_y = manifest_scroll.clone(),
padding = 12.0,
background = surfaces::canvas(),
border_radius = 12.0,
border = (2.0, colors::muted()),
) {
text(
color = colors::muted(),
font_family = TextFontFamily::Monospace,
) {
contents
}
}
}
},
// Read failed: show the error string in a danger-bordered box.
Ready(Err(error)) => view! {
block(
padding = 14.0,
gap = 8.0,
background = surfaces::canvas(),
border_radius = 12.0,
border = (2.0, colors::danger()),
) {
text(size = 18.0, weight = FontWeight::Semibold, color = colors::danger()) {
"Failed to read file"
}
text(color = colors::muted(), wrap = TextWrap::Word) { error }
}
},
}
}
// ---- Network pane ----
block(
flex = 1.0,
padding = 18.0,
gap = 14.0,
background = surfaces::raised(),
border_radius = 16.0,
) {
text(role = TextRole::Heading(1), size = 28.0, weight = FontWeight::Semibold) {
"Network"
}
text(color = colors::muted(), wrap = TextWrap::Word) {
"This panel talks to a tiny local HTTP server over `ruin_runtime::net::TcpStream`. \
It keeps the example truthful without pretending TLS, a JSON client, or text \
input already exist."
}
// Endpoint selection buttons: each one stores a `DemoEndpoint` variant.
row(gap = 8.0) {
button(
on_press = {
let endpoint = endpoint.clone();
move |_| {
let _ = endpoint.set(DemoEndpoint::Release);
}
},
) {
"Release"
}
button(
on_press = {
let endpoint = endpoint.clone();
move |_| {
let _ = endpoint.set(DemoEndpoint::Health);
}
},
) {
"Health"
}
button(
on_press = {
let endpoint = endpoint.clone();
move |_| {
let _ = endpoint.set(DemoEndpoint::Notes);
}
},
) {
"Notes"
}
// Selects `DemoEndpoint::Missing`, whose path the demo server answers with 404.
button(
on_press = {
let endpoint = endpoint.clone();
move |_| {
let _ = endpoint.set(DemoEndpoint::Missing);
}
},
) {
"404"
}
// Bumps the reload counter without changing the endpoint.
button(
on_press = {
let endpoint_reload = endpoint_reload.clone();
move |_| {
endpoint_reload.update(|value| *value += 1);
}
},
) {
"Reload"
}
}
text(size = 16.0, weight = FontWeight::Semibold) {
"Endpoint: "; endpoint.get().path()
}
match response_state {
// Request still running.
Pending => view! {
block(
padding = 14.0,
background = surfaces::canvas(),
border_radius = 12.0,
) {
text(color = colors::muted()) {
"Fetching a local HTTP response through the runtime..."
}
}
},
// Request succeeded: status line, summary, optional content type, and body.
Ready(Ok(response)) => view! {
column(gap = 12.0) {
text(size = 18.0, weight = FontWeight::Semibold) {
response.status_line.clone()
}
text(color = colors::muted(), wrap = TextWrap::Word) {
"Received "; response.body.len(); " body bytes from ";
response.path.clone(); "."
}
if let Some(content_type) = response.content_type.clone() {
view! {
text(color = colors::muted()) {
"content-type: "; content_type
}
}
} else {
// No content-type header: an empty column element stands in for the text.
View::from_element(Element::column())
}
scroll_box(
height = 500.0,
offset_y = response_scroll.clone(),
padding = 12.0,
background = surfaces::canvas(),
border_radius = 12.0,
border = (2.0, colors::muted()),
) {
text(
color = colors::muted(),
font_family = TextFontFamily::Monospace,
wrap = TextWrap::Word,
) {
response.body
}
}
}
},
// Request failed, or `fetch_demo_endpoint` reported a non-success response.
Ready(Err(error)) => view! {
block(
padding = 14.0,
gap = 8.0,
background = surfaces::canvas(),
border_radius = 12.0,
border = (2.0, colors::danger()),
) {
text(size = 18.0, weight = FontWeight::Semibold, color = colors::danger()) {
"Request failed"
}
text(color = colors::muted(), wrap = TextWrap::Word) { error }
}
},
}
}
}
}
}
/// Routes the network pane can request from the local demo server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DemoEndpoint {
    /// Requests `/release`.
    Release,
    /// Requests `/health`.
    Health,
    /// Requests `/notes`.
    Notes,
    /// Requests `/missing`, a path the demo server has no route for.
    Missing,
}

impl DemoEndpoint {
    /// Returns the request path for this endpoint, leading slash included.
    fn path(self) -> &'static str {
        let route = match self {
            DemoEndpoint::Release => "/release",
            DemoEndpoint::Health => "/health",
            DemoEndpoint::Notes => "/notes",
            DemoEndpoint::Missing => "/missing",
        };
        route
    }
}
/// A successfully parsed response from the demo server, as produced by
/// `parse_http_response`.
#[derive(Clone, Debug)]
struct DemoResponse {
/// Request path the response belongs to.
path: String,
/// First line of the response head, e.g. `HTTP/1.1 200 OK`.
status_line: String,
/// Value of the `content-type` header, if the response carried one.
content_type: Option<String>,
/// Everything after the blank line that ends the response head.
body: String,
}
/// Starts the plaintext demo HTTP server on a background thread and returns the
/// loopback address it is listening on.
///
/// The listener binds to `127.0.0.1` with port `0`, so the operating system picks the
/// port. Connections are served one at a time: a single read of up to 4096 bytes, the
/// request path taken from the first line, and a reply built from
/// `demo_response_for_path`.
///
/// # Errors
///
/// Returns the I/O error from binding or from querying the local address, or the
/// thread-spawn failure wrapped via `std::io::Error::other`.
fn spawn_demo_server() -> std::io::Result<SocketAddr> {
    let listener = StdTcpListener::bind(("127.0.0.1", 0))?;
    let bound_to = listener.local_addr()?;

    let serve = move || {
        // `map_while` ends the loop (and with it the thread) at the first accept error.
        for mut connection in listener.incoming().map_while(Result::ok) {
            let mut buffer = [0_u8; 4096];
            // A failed read just skips this connection.
            let Ok(filled) = connection.read(&mut buffer) else {
                continue;
            };
            let raw = String::from_utf8_lossy(&buffer[..filled]);
            // Request line is `METHOD PATH VERSION`; fall back to "/" when it is absent.
            let requested = raw
                .lines()
                .next()
                .and_then(|line| line.split_whitespace().nth(1))
                .unwrap_or("/");
            let (status, body) = demo_response_for_path(requested);
            let length = body.len();
            let reply = format!(
                "HTTP/1.1 {status}\r\ncontent-type: text/plain; charset=utf-8\r\ncontent-length: {length}\r\nconnection: close\r\n\r\n{body}"
            );
            // Best effort: a write failure is ignored.
            let _ = connection.write_all(reply.as_bytes());
        }
    };

    thread::Builder::new()
        .name("ruin-example05-demo-server".into())
        .spawn(serve)
        .map_err(std::io::Error::other)?;
    Ok(bound_to)
}
/// Maps a request path to the demo server's `(status, body)` pair.
///
/// `/release`, `/health` and `/notes` answer `200 OK` with a fixed plain-text body.
/// Any other path answers `404 Not Found` with a message that echoes the path.
fn demo_response_for_path(path: &str) -> (&'static str, String) {
    let body = match path {
        "/release" => {
            "release = v0.1.0\npublished_at = 2026-03-22T00:00:00Z\nnotes = honest runtime IO demo backed by ruin_runtime"
        }
        "/health" => "status = ok\nreactor = draining microtasks\nio = idle",
        "/notes" => {
            "The network pane is intentionally local and plaintext.\nIt still exercises ruin_runtime::net::TcpStream end-to-end,\nbut it does not pretend that TLS, a JSON client, or text inputs are finished yet."
        }
        unknown => {
            // Unknown route: the only case with a different status and a dynamic body.
            return (
                "404 Not Found",
                format!("No demo route is registered for {unknown}. Try /release, /health, or /notes."),
            );
        }
    };
    ("200 OK", body.to_string())
}
/// Formats the current wall-clock time as `<seconds>.<millis>s since unix epoch`, with
/// the milliseconds zero-padded to three digits.
///
/// Returns the fixed text `time went backwards` when the system clock reports a time
/// before the Unix epoch.
fn timestamp_label() -> String {
    let Ok(elapsed) = SystemTime::now().duration_since(UNIX_EPOCH) else {
        return "time went backwards".to_string();
    };
    format!(
        "{}.{:03}s since unix epoch",
        elapsed.as_secs(),
        elapsed.subsec_millis()
    )
}
/// Requests `endpoint` from the demo server at `address` and parses the reply.
///
/// Sends a single `GET` with `Connection: close`, reads until the stream reports zero
/// bytes, and hands the collected bytes to `parse_http_response`.
///
/// # Errors
///
/// Connect, write and read failures are returned as their string form; anything
/// `parse_http_response` rejects is returned unchanged.
async fn fetch_demo_endpoint(
    address: SocketAddr,
    endpoint: DemoEndpoint,
) -> std::result::Result<DemoResponse, String> {
    let route = endpoint.path().to_string();

    let mut connection = ruin_runtime::net::TcpStream::connect(address)
        .await
        .map_err(|failure| failure.to_string())?;

    let request = format!("GET {route} HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n");
    connection
        .write_all(request.as_bytes())
        .await
        .map_err(|failure| failure.to_string())?;

    // Collect the whole reply; a zero-length read marks the end of the stream.
    let mut raw = Vec::new();
    let mut buffer = [0_u8; 4096];
    loop {
        let outcome = connection.read(&mut buffer).await;
        match outcome.map_err(|failure| failure.to_string())? {
            0 => break,
            filled => raw.extend_from_slice(&buffer[..filled]),
        }
    }

    parse_http_response(&route, &raw)
}
/// Parses a complete raw HTTP/1.x response into a `DemoResponse`.
///
/// `path` is the request path the response belongs to and is copied into the result.
/// `bytes` is the full response as read from the socket: head, blank line, then body.
///
/// # Errors
///
/// Returns a `String` error when:
/// - `bytes` is not valid UTF-8,
/// - there is no `\r\n\r\n` separator between head and body,
/// - the head has no status line,
/// - the status code is anything other than `200`; the error then holds the status
///   line and the body so the caller can display both.
fn parse_http_response(path: &str, bytes: &[u8]) -> std::result::Result<DemoResponse, String> {
    // Validate in place: there is no need to copy the buffer just to check UTF-8.
    let text = std::str::from_utf8(bytes).map_err(|error| error.to_string())?;
    let Some((head, body)) = text.split_once("\r\n\r\n") else {
        return Err("demo server returned a malformed HTTP response".to_string());
    };
    let mut lines = head.lines();
    let status_line = lines
        .next()
        .ok_or_else(|| "demo server returned an empty status line".to_string())?
        .to_string();
    // Header names are case-insensitive; the first `content-type` header wins.
    let content_type = lines.find_map(|line| {
        let (name, value) = line.split_once(':')?;
        name.eq_ignore_ascii_case("content-type")
            .then(|| value.trim().to_string())
    });
    // The status line is `VERSION CODE REASON`. Compare the code token exactly: a
    // substring search for "200" would also match a reason phrase such as
    // `500 Upstream returned 200`.
    let status_code = status_line.split_whitespace().nth(1);
    if status_code != Some("200") {
        return Err(format!("{status_line}\n\n{body}"));
    }
    Ok(DemoResponse {
        path: path.to_string(),
        status_line,
        content_type,
        body: body.to_string(),
    })
}
#[cfg(test)]
mod tests {
    use super::{DemoEndpoint, parse_http_response};

    /// A well-formed 200 response yields its status line, content type and body.
    #[test]
    fn parse_http_response_extracts_status_body_and_content_type() {
        let raw: &[u8] =
            b"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\nconnection: close\r\n\r\nhello";
        let route = DemoEndpoint::Release.path();

        let response = parse_http_response(route, raw).expect("response should parse");

        assert_eq!(response.status_line, "HTTP/1.1 200 OK");
        assert_eq!(response.content_type.as_deref(), Some("text/plain"));
        assert_eq!(response.body, "hello");
    }

    /// A 404 response comes back as an error carrying the status text and the body.
    #[test]
    fn parse_http_response_surfaces_non_success_statuses_as_errors() {
        let raw: &[u8] = b"HTTP/1.1 404 Not Found\r\ncontent-type: text/plain\r\n\r\nmissing";
        let route = DemoEndpoint::Missing.path();

        let message = parse_http_response(route, raw)
            .expect_err("non-200 responses should surface as errors");

        assert!(message.contains("404 Not Found"));
        assert!(message.contains("missing"));
    }
}