From 14ec58318b99aab747b3da2530beeaf1b41c60fb Mon Sep 17 00:00:00 2001 From: gramps Date: Fri, 10 Apr 2026 13:42:39 -0700 Subject: [PATCH] feat: resident runtime, shutdown command, observatory, and IPL logging hardening - keep BEDS resident after IPL and coordinate clean shutdown - propagate AMQP shutdown command across dispatcher pool - add structured IPL milestone/event-chain logging with root GUID context - add optional trace_on config for verbose method-entry diagnostics - add dev purge-on-IPL controls for admin/logger collections - add log level showcase events after IPL node-green - add Mongo logger store helpers for chain/root lookup and purge - add/modernize BEDS Observatory log_dumper utility UI and root record view - refresh source headers and wiki docs for current architecture/runtime - add architecture visual brief for leadership/image-generation workflows --- Cargo.lock | 290 ++++++++++ Cargo.toml | 5 +- src/bin/log_dumper.rs | 662 ++++++++++++++++++++++ src/brokers/dispatcher.rs | 35 +- src/brokers/logger_store.rs | 106 ++++ src/brokers/mod.rs | 5 + src/config/structs.rs | 19 + src/main.rs | 313 +++++++++- wiki/04-ipl.md | 13 + wiki/06-queue-topology.md | 12 + wiki/10-modernization-roadmap.md | 4 + wiki/11-beds-architecture-visual-brief.md | 166 ++++++ wiki/Home.md | 1 + 13 files changed, 1618 insertions(+), 13 deletions(-) create mode 100644 src/bin/log_dumper.rs create mode 100644 wiki/11-beds-architecture-visual-brief.md diff --git a/Cargo.lock b/Cargo.lock index 3da47a2..502e677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,15 @@ dependencies = [ "url", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -290,6 +299,58 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "base64" version = "0.21.7" @@ -426,6 +487,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "chrono" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +dependencies = [ + "iana-time-zone", + "num-traits", + "windows-link", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1157,6 +1229,110 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "bytes", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.2.0" @@ -1490,6 +1666,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "md-5" version = "0.10.6" @@ -1506,6 +1688,12 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2246,6 +2434,8 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" name = "rustybeds" version = "0.1.0" dependencies = [ + "axum", + "chrono", "config", "futures-lite 2.6.1", "lapin", @@ -2258,8 +2448,15 @@ dependencies = [ "tracing", "tracing-journald", "tracing-subscriber", + "uuid", ] +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "salsa20" version = "0.10.2" @@ -2378,6 +2575,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -2387,6 +2595,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_with" version = "3.18.0" @@ -2547,6 +2767,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.2" @@ -2795,12 +3021,41 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3132,6 +3387,41 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 8ca4e46..9fad3a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,7 @@ tracing-journald = "0.3" mongodb = { version = "3", features = ["sync"] } futures-lite = "2" lapin = "2" -tokio = { version = "1", features = ["full"] } \ No newline at end of file +tokio = { version = "1", features = ["full"] } +axum = "0.8" +chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } +uuid = { version = "1", features = ["v4"] } \ No newline at end of file diff --git a/src/bin/log_dumper.rs b/src/bin/log_dumper.rs new file mode 100644 index 0000000..31508b5 --- /dev/null +++ b/src/bin/log_dumper.rs @@ -0,0 +1,662 @@ +//! # bin/log_dumper.rs — BEDS Observatory Utility +//! +//! Development-facing web utility for inspecting logger records in `msLogs`. +//! Provides quick filtering, root GUID tracing, single-record view, and a +//! seed-write action for validating logger storage paths. +//! +//! ## Routes +//! - `/` — main observatory page +//! - `/record` — single root record display by `id` +//! - `/health` — utility health endpoint +//! +//! ## Notes +//! - Reads config via the same loader as the main BEDS runtime +//! - Initializes logger store at startup +//! - Intended for development and diagnostics workflows + +use std::net::SocketAddr; + +use axum::{ + extract::Query, + http::StatusCode, + response::{Html, IntoResponse}, + routing::get, + Router, +}; + use chrono::{Local, TimeZone}; +use serde::Deserialize; + +use rustybeds::{brokers::logger_store, config}; + +#[derive(Debug, Deserialize)] +struct DumpQuery { + lines: Option, + source: Option, + action: Option, + root: Option, +} + +#[derive(Debug, Deserialize)] +struct RecordQuery { + id: Option, +} + +#[tokio::main] +async fn main() { + if let Err(e) = run().await { + eprintln!("[LOG_DUMPER] fatal: {}", e); + std::process::exit(1); + } +} + +async fn run() -> Result<(), String> { + let cfg = config::load().map_err(|e| format!("config load failed: {}", e))?; + + logger_store::init_from_rec_services(&cfg.rec_services, &cfg.id.env_name) + .await + .map_err(|e| format!("logger store init failed: {}", e))?; + + let app = Router::new() + .route("/", get(index)) + .route("/record", get(record_view)) + .route("/health", get(health)); + + let addr: SocketAddr = "127.0.0.1:8088" + .parse() + .map_err(|e| format!("invalid bind address: {}", e))?; + + println!("[LOG_DUMPER] running at http://{}/", addr); + + let listener = tokio::net::TcpListener::bind(addr) + .await + .map_err(|e| format!("bind failed: {}", e))?; + + axum::serve(listener, app) + .await + .map_err(|e| format!("serve failed: {}", e))?; + + Ok(()) +} + +async fn health() -> impl IntoResponse { + (StatusCode::OK, "ok") +} + +async fn record_view(Query(query): Query) -> impl IntoResponse { + let root_id = query.id.unwrap_or_default(); + + if root_id.trim().is_empty() { + return Html("

Missing root event id

Back to observatory

".to_string()); + } + + let content = match logger_store::fetch_root_record(&root_id).await { + Ok(Some(row)) => { + let pretty = serde_json::to_string_pretty(&row).unwrap_or_else(|_| "{}".to_string()); + format!( + "

Root Event Record

event_id: {}

{}
", + escape_html(&root_id), + escape_html(&pretty) + ) + } + Ok(None) => format!( + "

No root record found

event_id: {}

", + escape_html(&root_id) + ), + Err(e) => format!("

Fetch failed

{}

", escape_html(&e)), + }; + + let page = format!( + "Root Event", + content + ); + + Html(page) +} + +async fn index(Query(query): Query) -> impl IntoResponse { + let mut lines = query.lines.unwrap_or(100); + if lines == 0 { + lines = 100; + } + if lines > 1000 { + lines = 1000; + } + + let source = query.source.unwrap_or_else(|| "Logs".to_string()); + let root_filter = query.root.unwrap_or_default(); + + let mut banner = String::new(); + if query.action.as_deref() == Some("seed") { + let mut seed = std::collections::HashMap::new(); + seed.insert( + "service_log".to_string(), + serde_json::Value::String("log_dumper".to_string()), + ); + seed.insert( + "severity_log".to_string(), + serde_json::Value::String("INFO".to_string()), + ); + seed.insert( + "event_log".to_string(), + serde_json::Value::String("LOG_DUMPER_SEED".to_string()), + ); + seed.insert( + "message_log".to_string(), + serde_json::Value::String("Manual seed event from log dumper UI".to_string()), + ); + + match logger_store::append_log(seed).await { + Ok(token) => { + banner = format!( + "
seed write succeeded; db_token={}
", + escape_html(&token) + ) + } + Err(e) => { + banner = format!( + "
seed write failed: {}
", + escape_html(&e) + ) + } + } + } + + // Current Rust logger store is unified; keep the source selector in the UI + // for parity with the PHP utility, but only Logs is implemented right now. + let content = if source.eq_ignore_ascii_case("logs") { + let rows_result = if root_filter.trim().is_empty() { + logger_store::fetch_recent(lines).await + } else { + logger_store::fetch_chain(&root_filter, lines).await + }; + + match rows_result { + Ok(rows) => render_rows(rows), + Err(e) => format!("
fetch failed: {}
", escape_html(&e)), + } + } else { + "
Metrics source is not implemented in this Rust utility yet.
".to_string() + }; + + let html = format!( + r#" + + + + + BEDS Observatory + + + + + + +
+
+
+

BEDS Observatory

+
Real-time development diagnostics for logger events in Mongo-backed msLogs.
+
{banner}
+
+
Utility mode: parity path for legacy dumper workflow
+
+
+
+ + +
+ + + + + +
+
+
+
Source: {source}
+
Limit: {lines}
+
Rendered: {rendered_count}
+
+
Tip: expand entries to inspect full payload JSON
+
+
Source: {source} | Showing up to {lines} rows
+
+
+ {content} +
+
+
+ + +"#, + logs_checked = if source.eq_ignore_ascii_case("logs") { "checked" } else { "" }, + metrics_checked = if source.eq_ignore_ascii_case("metrics") { "checked" } else { "" }, + lines = lines, + source = escape_html(&source), + root_value = escape_html(&root_filter), + rendered_count = rendered_count(&content), + banner = banner, + content = content, + ); + + Html(html) +} + +fn render_rows(rows: Vec) -> String { + if rows.is_empty() { + return "
No data found in logger collection.
".to_string(); + } + + let mut out = String::new(); + for (idx, row) in rows.into_iter().enumerate() { + let pretty = serde_json::to_string_pretty(&row).unwrap_or_else(|_| "{}".to_string()); + let search = escape_html(&pretty); + let title = row + .get("severity_log") + .or_else(|| row.get("level")) + .and_then(|v| v.as_str()) + .unwrap_or("event"); + + let service = row + .get("service_log") + .or_else(|| row.get("service")) + .and_then(|v| v.as_str()) + .unwrap_or("unknown-service"); + + let created = format_created_ts( + row.get("created").or_else(|| row.get("ts")) + ); + + let event_id = row + .get("event_id") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let parent_id = row + .get("parent_id") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let depth = row + .get("depth") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + + let root_id = if depth <= 0 { + event_id + } else if !parent_id.is_empty() { + parent_id + } else { + event_id + }; + + let trace_href = format!("/record?id={}", url_encode(root_id)); + + let message_snippet = row + .get("message_log") + .or_else(|| row.get("message")) + .and_then(|v| v.as_str()) + .map(snippet) + .unwrap_or_else(|| "(no message)".to_string()); + + out.push_str("
"); + out.push_str(&escape_html(title)); + out.push_str(""); + out.push_str(&escape_html(&message_snippet)); + out.push_str("
");
+        out.push_str(&escape_html(&pretty));
+    out.push_str("
"); + if !root_id.is_empty() { + out.push_str(""); + out.push_str(&escape_html(root_id)); + out.push_str(""); + } + out.push_str("
"); + } + + out +} + +fn url_encode(input: &str) -> String { + let mut out = String::new(); + for b in input.bytes() { + match b { + b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { + out.push(char::from(b)); + } + b' ' => out.push_str("%20"), + _ => out.push_str(&format!("%{:02X}", b)), + } + } + out +} + +fn rendered_count(content: &str) -> usize { + content.match_indices("class='entry'").count() +} + +fn snippet(input: &str) -> String { + let clean = input.replace('\n', " ").replace('\r', " "); + let mut out = String::new(); + for (i, ch) in clean.chars().enumerate() { + if i >= 110 { + out.push_str("..."); + break; + } + out.push(ch); + } + out +} + +fn format_created_ts(value: Option<&serde_json::Value>) -> String { + let Some(value) = value else { + return "n/a".to_string(); + }; + + let secs_opt = value + .as_i64() + .or_else(|| value.as_u64().map(|v| v as i64)) + .or_else(|| value.as_str().and_then(|s| s.parse::().ok())); + + let Some(secs) = secs_opt else { + return value.to_string(); + }; + + match Local.timestamp_opt(secs, 0).single() { + Some(ts) => ts.format("%Y-%m-%d %H:%M:%S %Z").to_string(), + None => secs.to_string(), + } +} + +fn escape_html(input: &str) -> String { + let mut escaped = String::with_capacity(input.len()); + for ch in input.chars() { + match ch { + '&' => escaped.push_str("&"), + '<' => escaped.push_str("<"), + '>' => escaped.push_str(">"), + '"' => escaped.push_str("""), + '\'' => escaped.push_str("'"), + _ => escaped.push(ch), + } + } + escaped +} + diff --git a/src/brokers/dispatcher.rs b/src/brokers/dispatcher.rs index 5aa3203..049c9ea 100644 --- a/src/brokers/dispatcher.rs +++ b/src/brokers/dispatcher.rs @@ -49,6 +49,7 @@ use lapin::{ }, types::{AMQPValue, FieldTable, LongString, ShortString}, }; +use tokio::sync::watch; use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME}; use crate::brokers::payload::{ @@ -98,6 +99,8 @@ pub async fn spawn( queue_tag: String, instance_id: u32, template_registry: Arc, + shutdown_tx: watch::Sender, + shutdown_rx: watch::Receiver, ) -> Result, BrokerError> { let channel = conn.create_channel().await?; @@ -163,7 +166,14 @@ pub async fn spawn( tracing::info!("dispatcher[{}] queue '{}' declared and bound", instance_id, queue_name); let handle = tokio::spawn(async move { - if let Err(e) = run(channel, queue_name, instance_id, template_registry).await { + if let Err(e) = run( + channel, + queue_name, + instance_id, + template_registry, + shutdown_tx, + shutdown_rx, + ).await { tracing::error!("dispatcher[{}] exited with error: {}", instance_id, e); } }); @@ -184,6 +194,8 @@ async fn run( queue_name: String, instance_id: u32, template_registry: Arc, + shutdown_tx: watch::Sender, + mut shutdown_rx: watch::Receiver, ) -> Result<(), BrokerError> { let consumer_tag = format!("dispatcher-{}", instance_id); @@ -198,7 +210,23 @@ async fn run( tracing::info!("dispatcher[{}] consuming from '{}'", instance_id, queue_name); - while let Some(delivery) = consumer.next().await { + loop { + tokio::select! { + changed = shutdown_rx.changed() => { + if changed.is_ok() && *shutdown_rx.borrow() { + tracing::info!("dispatcher[{}] received global shutdown signal", instance_id); + break; + } + if changed.is_err() { + tracing::info!("dispatcher[{}] shutdown coordinator dropped", instance_id); + break; + } + } + maybe_delivery = consumer.next() => { + let Some(delivery) = maybe_delivery else { + break; + }; + let delivery = match delivery { Ok(d) => d, Err(e) => { @@ -274,6 +302,7 @@ async fn run( "shutdown" => { let _ = delivery.ack(BasicAckOptions::default()).await; tracing::info!("dispatcher[{}] shutdown event received — exiting", instance_id); + let _ = shutdown_tx.send(true); break; } _ => { @@ -288,6 +317,8 @@ async fn run( } apply_ack_action(&channel, &delivery, &outcome.ack_action).await; + } + } } tracing::info!("dispatcher[{}] consume loop exited", instance_id); diff --git a/src/brokers/logger_store.rs b/src/brokers/logger_store.rs index 8a14247..7616310 100644 --- a/src/brokers/logger_store.rs +++ b/src/brokers/logger_store.rs @@ -1,3 +1,15 @@ +//! # brokers/logger_store.rs — Mongo Logger Store +//! +//! Mongo-backed persistence and query helpers for BEDS logger records. +//! Used by IPL milestone logging, logger diagnostics utilities, and +//! logger-oriented broker/class paths. +//! +//! ## Responsibilities +//! - Initialize and validate logger DB connection from REC config +//! - Append log records to `msLogs` +//! - Fetch recent logs and root-event chains +//! - Purge configured collections for dev IPL workflows + use std::collections::HashMap; use std::sync::OnceLock; @@ -160,6 +172,100 @@ pub async fn fetch_recent(limit: usize) -> Result, String> { Ok(out) } +pub async fn fetch_chain(root_event_id: &str, limit: usize) -> Result, String> { + let store = LOGGER_STORE + .get() + .ok_or_else(|| "logger store is not initialized".to_string())?; + + let root = root_event_id.trim(); + if root.is_empty() { + return Ok(Vec::new()); + } + + let mut cursor = store + .client + .database(&store.db_name) + .collection::(LOGGER_COLLECTION_NAME) + .find(doc! { + "$or": [ + { "event_id": root }, + { "parent_id": root } + ] + }) + .sort(doc! { "created": 1i32 }) + .limit(limit as i64) + .await + .map_err(|e| format!("logger chain fetch failed: {}", e))?; + + let mut out = Vec::new(); + while let Some(next) = cursor.next().await { + let doc = next.map_err(|e| format!("logger chain fetch cursor error: {}", e))?; + let json = serde_json::to_value(&doc) + .map_err(|e| format!("logger chain fetch decode failed: {}", e))?; + out.push(json); + } + + Ok(out) +} + +pub async fn fetch_root_record(root_event_id: &str) -> Result, String> { + let store = LOGGER_STORE + .get() + .ok_or_else(|| "logger store is not initialized".to_string())?; + + let root = root_event_id.trim(); + if root.is_empty() { + return Ok(None); + } + + let mut cursor = store + .client + .database(&store.db_name) + .collection::(LOGGER_COLLECTION_NAME) + .find(doc! { "event_id": root }) + .sort(doc! { "depth": 1i32, "created": 1i32 }) + .limit(1) + .await + .map_err(|e| format!("logger root fetch failed: {}", e))?; + + let Some(next) = cursor.next().await else { + return Ok(None); + }; + + let doc = next.map_err(|e| format!("logger root fetch cursor error: {}", e))?; + let json = serde_json::to_value(&doc) + .map_err(|e| format!("logger root fetch decode failed: {}", e))?; + + Ok(Some(json)) +} + +pub async fn purge_collections(collections: &[String]) -> Result { + let store = LOGGER_STORE + .get() + .ok_or_else(|| "logger store is not initialized".to_string())?; + + let mut total_deleted = 0u64; + + for name in collections { + let coll_name = name.trim(); + if coll_name.is_empty() { + continue; + } + + let result = store + .client + .database(&store.db_name) + .collection::(coll_name) + .delete_many(doc! {}) + .await + .map_err(|e| format!("logger purge failed for '{}': {}", coll_name, e))?; + + total_deleted += result.deleted_count; + } + + Ok(total_deleted) +} + fn epoch_secs() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/brokers/mod.rs b/src/brokers/mod.rs index e21d5ba..bc80dae 100644 --- a/src/brokers/mod.rs +++ b/src/brokers/mod.rs @@ -31,6 +31,7 @@ pub mod payload; use std::sync::Arc; use lapin::Connection; +use tokio::sync::watch; use crate::config::BrokerServicesConfig; use crate::template_registry::RuntimeTemplateRegistry; @@ -63,6 +64,8 @@ pub async fn spawn_dispatcher_pool( conn: Arc, cfg: &BrokerServicesConfig, template_registry: Arc, + shutdown_tx: watch::Sender, + shutdown_rx: watch::Receiver, ) -> Result>, BrokerError> { // For now, use a sensible default if a specific dispatcher instance count isn't set // In full config, this would be cfg.app_server.instances.dispatcher @@ -75,6 +78,8 @@ pub async fn spawn_dispatcher_pool( cfg.queue_tag.clone(), i, Arc::clone(&template_registry), + shutdown_tx.clone(), + shutdown_rx.clone(), ) .await?; handles.push(handle); diff --git a/src/config/structs.rs b/src/config/structs.rs index c4ca89f..1a6ab25 100644 --- a/src/config/structs.rs +++ b/src/config/structs.rs @@ -28,6 +28,10 @@ pub struct BedsConfig { /// Enables verbose debug output when true pub debug: bool, + /// Enables trace-style method-entry logging for deep backend diagnostics + #[serde(default)] + pub trace_on: bool, + /// Enables journald syslog output when true pub syslog: bool, @@ -48,6 +52,21 @@ pub struct BedsConfig { /// MariaDB (REL) node configs keyed by service name (e.g. "app_server") pub rel_services: HashMap, + + /// Development-only logger admin controls (safe defaults when omitted) + #[serde(default)] + pub logger_admin: LoggerAdminConfig, +} + +/// Logger admin controls used primarily in development workflows. +#[derive(Debug, Deserialize, Default)] +pub struct LoggerAdminConfig { + /// When true in dev/development, purge selected collections on IPL. + pub purge_on_ipl: bool, + + /// Collection names to purge when `purge_on_ipl` is enabled. + #[serde(default)] + pub purge_collections: Vec, } /// Node identity block. Identifies this node's role and environment. diff --git a/src/main.rs b/src/main.rs index b375676..4b54fe6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ //! - `BEDS_ENV` environment variable selects the env config file (default: dev) //! //! ## Outputs -//! - Running BEDS node, blocked on signal handler (not yet implemented) +//! - Running BEDS resident node after IPL (daemon-style foreground process) //! - On fatal IPL failure: error output to console, process exits non-zero //! //! **Author:** ms @@ -23,6 +23,7 @@ //! * `2026-04-04` - mks - promoted service modules to services/ directory //! * `2026-04-04` - mks - ipl() made async for AMQP connection; tokio runtime added //! * `2026-04-04` - mks - added AMQP authenticate + exchange declare to IPL sequence +//! * `2026-04-10` - mks - resident runtime loop + coordinated shutdown signaling mod brokers; mod config; @@ -31,6 +32,176 @@ mod logging; mod services; mod template_registry; +use std::collections::HashMap; + +use serde_json::Value; +use tokio::sync::watch; +use uuid::Uuid; + +fn level_to_val(level: &str) -> i64 { + match level.to_ascii_lowercase().as_str() { + "debug" => 0, + "data" => 1, + "info" => 2, + "event" => 3, + "warning" | "warn" => 4, + "error" => 5, + "fatal" => 6, + "timer" => 7, + _ => 2, + } +} + +struct IplLogContext { + env_name: String, + node_name: String, + root_event_id: String, + trace_on: bool, + seq: u64, +} + +impl IplLogContext { + fn new(wbid: &str, env_name: &str, trace_on: bool) -> Self { + Self { + env_name: env_name.to_string(), + node_name: wbid.to_string(), + root_event_id: format!("{}.{}.{}", wbid, env_name, Uuid::new_v4()), + trace_on, + seq: 0, + } + } + + fn next_event_id(&mut self) -> String { + self.seq += 1; + if self.seq == 1 { + self.root_event_id.clone() + } else { + format!("{}:{}", self.root_event_id, self.seq) + } + } +} + +async fn append_ipl_log( + ctx: &mut IplLogContext, + event: &str, + level: &str, + message: &str, + method: &str, + line: u32, +) { + let event_id = ctx.next_event_id(); + let depth = if event_id == ctx.root_event_id { 0i64 } else { 1i64 }; + + let mut data = HashMap::::new(); + data.insert("service_log".to_string(), Value::String("beds".to_string())); + data.insert("node_log".to_string(), Value::String(ctx.node_name.clone())); + data.insert("resource".to_string(), Value::String("BEDS".to_string())); + data.insert("event_id".to_string(), Value::String(event_id)); + data.insert("parent_id".to_string(), Value::String(ctx.root_event_id.clone())); + data.insert("depth".to_string(), Value::from(depth)); + data.insert("level_log".to_string(), Value::String(level.to_ascii_lowercase())); + data.insert("severity_log".to_string(), Value::String(level.to_ascii_uppercase())); + data.insert("level_val".to_string(), Value::from(level_to_val(level))); + data.insert("event_log".to_string(), Value::String(event.to_string())); + data.insert("message_log".to_string(), Value::String(message.to_string())); + data.insert("env_log".to_string(), Value::String(ctx.env_name.clone())); + data.insert("file_log".to_string(), Value::String(file!().to_string())); + data.insert("method_log".to_string(), Value::String(method.to_string())); + data.insert("line_log".to_string(), Value::from(line as i64)); + + if let Err(e) = brokers::logger_store::append_log(data).await { + tracing::warn!("IPL logger event '{}' failed (non-fatal): {}", event, e); + } +} + +async fn emit_log_level_showcase(ctx: &mut IplLogContext) { + let examples = [ + ( + "debug", + "LOG_LEVEL_EXAMPLE_DEBUG", + "DEBUG example: verbose developer diagnostics for method flow and state transitions.", + ), + ( + "data", + "LOG_LEVEL_EXAMPLE_DATA", + "DATA example: structured payload snapshot for validating in-flight data shape.", + ), + ( + "info", + "LOG_LEVEL_EXAMPLE_INFO", + "INFO example: normal lifecycle event confirming expected framework behavior.", + ), + ( + "event", + "LOG_LEVEL_EXAMPLE_EVENT", + "EVENT example: domain/event-chain marker tied to root GUID lineage.", + ), + ( + "warn", + "LOG_LEVEL_EXAMPLE_WARN", + "WARN example: degraded but recoverable condition that should be reviewed.", + ), + ( + "error", + "LOG_LEVEL_EXAMPLE_ERROR", + "ERROR example: operation failure requiring developer attention.", + ), + ( + "fatal", + "LOG_LEVEL_EXAMPLE_FATAL", + "FATAL example: unrecoverable condition requiring immediate operator intervention.", + ), + ( + "timer", + "LOG_LEVEL_EXAMPLE_TIMER", + "TIMER example: latency/elapsed-time marker for performance diagnostics.", + ), + ]; + + for (level, event, message) in examples { + append_ipl_log(ctx, event, level, message, "emit_log_level_showcase", line!()).await; + } +} + +async fn append_ipl_trace(ctx: &mut IplLogContext, method: &str, line: u32) { + if !ctx.trace_on { + return; + } + append_ipl_log( + ctx, + "TRACE_METHOD_ENTER", + "debug", + &format!("enter method {}", method), + method, + line, + ) + .await; +} + +async fn try_log_ipl_failure(err: &str) { + let Ok(cfg) = config::load() else { + return; + }; + + if brokers::logger_store::init_from_rec_services(&cfg.rec_services, &cfg.id.env_name) + .await + .is_err() + { + return; + } + + let mut ctx = IplLogContext::new(&cfg.id.wbid, &cfg.id.env_name, cfg.trace_on); + append_ipl_log( + &mut ctx, + "IPL_FATAL", + "fatal", + &format!("IPL failed: {}", err), + "main", + line!(), + ) + .await; +} + /// Executes the BEDS Initial Program Load (IPL) sequence. /// /// IPL bootstraps the node in strict order. Each step must succeed before @@ -46,8 +217,8 @@ mod template_registry; /// 4b. Authenticate to RabbitMQ + declare beds.events exchange /// 5. Validate MongoDB reachability (TCP) /// 6. Validate MariaDB reachability (TCP) -/// 7. Spawn broker pools (rBroker) -/// 8. Node green +/// 7. Spawn dispatcher pool +/// 8. Node green and resident runtime waits for shutdown command/signal /// /// # Returns /// @@ -59,17 +230,24 @@ mod template_registry; /// * `2026-04-02` - mks - original coding /// * `2026-04-04` - mks - made async; added AMQP auth + exchange declare /// * `2026-04-05` - mks - added rBroker pool spawn (step 6) -async fn ipl() -> Result<(), String> { +async fn ipl( + shutdown_tx: watch::Sender, + shutdown_rx: watch::Receiver, +) -> Result>, String> { // load configuration — fatal in all environments if this fails let cfg = config::load().map_err(|e| format!("Failed to load config: {}", e))?; // initialize logging — must come before any tracing calls logging::init_from_config(cfg.syslog, cfg.syslog_mirror_console); + let mut ipl_log = IplLogContext::new(&cfg.id.wbid, &cfg.id.env_name, cfg.trace_on); + tracing::info!("BEDS IPL starting, node={} env={}", cfg.id.wbid, cfg.id.env_name); tracing::info!("Configuration loaded"); tracing::info!("Logging initialized"); + append_ipl_trace(&mut ipl_log, "ipl", line!()).await; + // step 2b: load and validate REC templates used by runtime dispatch let templates = template_registry::load_runtime_rec_templates("templates") .map_err(|e| format!("Template validation failed: {}", e))?; @@ -105,21 +283,77 @@ async fn ipl() -> Result<(), String> { .map_err(|e| format!("Mongo logger store initialization failed: {}", e))?; tracing::info!("Mongo logger store initialized"); + let is_dev_env = matches!(cfg.id.env_name.as_str(), "dev" | "development"); + if is_dev_env && cfg.logger_admin.purge_on_ipl { + append_ipl_trace(&mut ipl_log, "purge_on_ipl", line!()).await; + let deleted = brokers::logger_store::purge_collections(&cfg.logger_admin.purge_collections) + .await + .map_err(|e| format!("Mongo logger purge-on-IPL failed: {}", e))?; + + tracing::warn!( + "DEV purge-on-IPL enabled: cleared {} record(s) from {:?}", + deleted, + cfg.logger_admin.purge_collections + ); + } + + // Emit startup milestones into logger storage after logger init succeeds. + // This gives the log dumper a reliable IPL trail. + append_ipl_log( + &mut ipl_log, + "IPL_LOGGER_STORE_READY", + "info", + "logger store initialized; IPL milestone capture active", + "ipl", + line!(), + ) + .await; + + append_ipl_log( + &mut ipl_log, + "IPL_BOOTSTRAP_RECORDED", + "info", + "completed bootstrap phases: config, logging, template validation, AMQP validation/auth/exchange, Mongo validation", + "ipl", + line!(), + ) + .await; + // step 6: validate MariaDB reachability — fatal in production, non-fatal in all other envs // secondary instance failures are always non-fatal (handled inside validate_all) match services::mariadb::validate_all(&cfg.rel_services) { - Ok(()) => tracing::info!("MariaDB reachable"), + Ok(()) => { + tracing::info!("MariaDB reachable"); + append_ipl_log( + &mut ipl_log, + "IPL_MARIADB_READY", + "info", + "MariaDB validation passed", + "ipl", + line!(), + ) + .await; + } Err(e) => { if cfg.id.env_name == "production" { return Err(e); } tracing::warn!("MariaDB unreachable (non-fatal in {}): {}", cfg.id.env_name, e); + append_ipl_log( + &mut ipl_log, + "IPL_MARIADB_DEGRADED", + "warn", + &format!("MariaDB unreachable (non-fatal in {}): {}", cfg.id.env_name, e), + "ipl", + line!(), + ) + .await; } } // step 7: spawn dispatcher pool — single consumer from unified events queue // each dispatcher instance instantiates classes based on template lookup - let _dispatcher_handles: Vec> = { + let dispatcher_handles: Vec> = { use std::sync::Arc; let shared_conn = Arc::new( lapin::Connection::connect( @@ -143,6 +377,8 @@ async fn ipl() -> Result<(), String> { Arc::clone(&shared_conn), &cfg.broker_services, std::sync::Arc::clone(&template_registry_state), + shutdown_tx, + shutdown_rx, ) .await .map_err(|e| format!("Dispatcher pool failed to start: {}", e))?; @@ -150,14 +386,71 @@ async fn ipl() -> Result<(), String> { d_handles }; + append_ipl_log( + &mut ipl_log, + "IPL_DISPATCHER_POOL_READY", + "info", + "dispatcher pool started", + "ipl", + line!(), + ) + .await; + tracing::info!("BEDS IPL complete — node green"); - Ok(()) + append_ipl_log( + &mut ipl_log, + "IPL_NODE_GREEN", + "info", + "BEDS IPL complete; node is green", + "ipl", + line!(), + ) + .await; + + emit_log_level_showcase(&mut ipl_log).await; + Ok(dispatcher_handles) } #[tokio::main] async fn main() { - if let Err(e) = ipl().await { - eprintln!("[BEDS] [FATAL] [IPL] {}", e); - std::process::exit(1); + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); + + let mut handles = match ipl(shutdown_tx.clone(), shutdown_rx.clone()).await { + Ok(handles) => handles, + Err(e) => { + try_log_ipl_failure(&e).await; + eprintln!("[BEDS] [FATAL] [IPL] {}", e); + std::process::exit(1); + } + }; + + tracing::info!("BEDS resident runtime active; waiting for shutdown command or Ctrl+C"); + + tokio::select! { + changed = shutdown_rx.changed() => { + if changed.is_ok() && *shutdown_rx.borrow() { + tracing::info!("BEDS shutdown requested by broker command"); + } + } + sig = tokio::signal::ctrl_c() => { + match sig { + Ok(()) => { + tracing::info!("BEDS shutdown requested by Ctrl+C"); + let _ = shutdown_tx.send(true); + } + Err(e) => { + tracing::error!("BEDS signal listener failure: {}", e); + } + } + } } + + while let Some(handle) = handles.pop() { + if let Err(e) = handle.await { + tracing::warn!("dispatcher task join error during shutdown: {}", e); + } + } + + tracing::info!("BEDS shutdown complete"); } + diff --git a/wiki/04-ipl.md b/wiki/04-ipl.md index b75cd2f..6e87298 100644 --- a/wiki/04-ipl.md +++ b/wiki/04-ipl.md @@ -6,6 +6,19 @@ IPL (Initial Program Load) is the BEDS bootstrap sequence. The term comes from I `ipl()` is the first function called from `main()`. If IPL completes successfully, the node is green and enters its operational state. If any required step fails, IPL aborts and the process exits with a console error report. +## Current Runtime State (2026-04) + +The current Rust runtime behavior has advanced beyond the original POC notes in this page: + +1. `main()` is now resident after IPL completes. +2. Dispatcher workers are started as a pool and the process remains active waiting for shutdown. +3. A broker `shutdown` command now triggers coordinated process shutdown. +4. IPL writes structured startup milestones to logger storage (`msLogs`) after logger init. +5. Development config can optionally purge selected admin/logger collections on IPL. +6. Optional trace-style method-entry logging is available through config (`trace_on`) for deep backend diagnostics. + +Treat the sequence below as conceptual IPL ordering; operational lifecycle now includes post-IPL resident runtime and coordinated shutdown. + ## Why Order Matters The IPL sequence is not arbitrary. Each step depends on the previous one: diff --git a/wiki/06-queue-topology.md b/wiki/06-queue-topology.md index f5559b9..cdf019b 100644 --- a/wiki/06-queue-topology.md +++ b/wiki/06-queue-topology.md @@ -1,5 +1,17 @@ # Queue Topology +## Current Runtime Topology (2026-04) + +The active Rust runtime currently uses a unified dispatcher model: + +1. Single primary events queue per environment tag (`{queue_tag}events`). +2. Unified dispatcher pool consumes from that queue. +3. Queue binding uses topic wildcard (`#`) on `beds.events`. +4. `shutdown` operation is handled as a broker command that triggers coordinated global shutdown. +5. Legacy `rec.read` and `rec.write` broker files remain in repository history/reference, but runtime path is centered on unified dispatcher workers. + +The broader broker matrix below remains useful as historical and planned topology context. + ## Overview BEDS uses a single RabbitMQ topic exchange for all data events. Topic exchanges route messages based on a dotted routing key — this gives BEDS fine-grained control over which brokers receive which events without the overhead of managing multiple exchanges. diff --git a/wiki/10-modernization-roadmap.md b/wiki/10-modernization-roadmap.md index cc5f160..2007456 100644 --- a/wiki/10-modernization-roadmap.md +++ b/wiki/10-modernization-roadmap.md @@ -53,6 +53,10 @@ Implementation status update: - Phase B has started: REC template registry loading and startup validation are now implemented in IPL. - Phase B progression: runtime template registry state is now persisted and passed into broker workers for dispatch-time template validation. - Reliability progression: deterministic ack/nack behavior and retry/DLQ queue topology are implemented for `rec.read` and `rec.write`. +- Runtime progression: unified dispatcher pool and single-queue consumption model are active. +- Lifecycle progression: BEDS now runs as a resident process after IPL and supports coordinated shutdown via broker `shutdown` command. +- Diagnostics progression: startup now emits structured IPL milestone logs (including severity examples) into logger storage for observability validation. +- Tooling progression: Rust Observatory utility (`log_dumper`) provides development log browsing, root-guid tracing, and single-record view. ## Must-Keep Invariants diff --git a/wiki/11-beds-architecture-visual-brief.md b/wiki/11-beds-architecture-visual-brief.md new file mode 100644 index 0000000..6b8b89a --- /dev/null +++ b/wiki/11-beds-architecture-visual-brief.md @@ -0,0 +1,166 @@ +# BEDS Architecture Visual Brief + +Audience: Senior managers, directors, and architects +Purpose: Explain what BEDS is, why AMQP is central, and how the software stack delivers operational impact. + +## Executive Summary + +BEDS (Back End Data System) is a data-service framework where AMQP (RabbitMQ) is the backbone for all data movement. + +Instead of app code talking directly to databases, every request goes through AMQP first. This provides: + +- Strong control over in-transit data paths +- Centralized routing and policy enforcement +- Better observability and auditability +- Scale-out through queue consumers (dispatcher pools) +- Isolation between transport concerns and domain/business logic + +Business impact: + +- Faster onboarding of new data domains +- More consistent operational controls +- Higher confidence in reliability and replayability +- Lower risk from tightly coupled app-to-database patterns + +## One-Sentence Product Positioning + +BEDS is an AMQP-first data orchestration platform that turns data access into a governed, scalable event pipeline instead of direct database calls. + +## Architecture in Plain Language + +When a client asks to read or write data: + +1. The request is published into RabbitMQ. +2. A dispatcher pool consumes messages from the queue. +3. The dispatcher validates the envelope and selects the target template/class. +4. The class chain executes business logic and schema-specific behavior. +5. The storage adapters execute persistence in MongoDB or SQL systems. +6. A response is published back (if the request expects one). + +The key design point: AMQP is the stable transport spine; domain classes can evolve without changing transport scaffolding. + +## Software Stack (Top to Bottom) + +### Layer 1: Experience and Integration + +- API clients +- Internal tools (for example: log reader UI) +- Service integrations + +### Layer 2: Transport and Delivery (AMQP) + +- RabbitMQ exchange and queue topology +- Message envelopes (versioned request/response contracts) +- Acknowledgment and negative acknowledgment behavior +- Retry and dead-letter patterns + +### Layer 3: Dispatch and Domain Runtime + +- Dispatcher pool (consumer workers) +- Template registry lookup +- Class instantiation (domain object selected by template) +- Class -> Schema -> Base I/O execution path + +### Layer 4: Data Adapters and Persistence + +- MongoDB access (REC/document-oriented workloads) +- Relational DB access (REL/SQL workloads) +- Logger event storage (msLogs) + +### Layer 5: Operations and Governance + +- Structured logging and diagnostics +- Environment-configured runtime behavior +- Health checks and startup validation (IPL) +- Future auto-scaling supervisor concept for dispatcher pools + +## Why AMQP Scaffolding Matters + +AMQP is not just transport in this architecture. It is the control plane. + +Benefits in management terms: + +- Policy centralization: transport rules are applied consistently +- Observability: queue behavior, consumer behavior, and message flow are inspectable +- Safety: retries and DLQs protect against transient failures +- Flexibility: adding new domains does not require redesigning transport +- Scale: increase dispatcher workers to handle backlog without app rewrites + +## Current Node and Flow Model + +- Single binary deployment with role/config-driven runtime behavior +- Unified dispatcher model consuming queue messages +- Template-driven class resolution to mask schema details from transport observers + +In effect, transport sees standard envelopes; schema details are revealed only at class execution time. + +## Reference Data Flow (Narrative) + +Client request -> RabbitMQ message -> Dispatcher consumes -> Template/class resolution -> Domain logic -> Mongo/SQL adapter -> Reply message + +This pattern supports both: + +- Request/response operations +- Fire-and-forget telemetry (for example: logger semantics) + +## Leadership-Level Benefits + +- Architectural consistency across teams and services +- Reduced change risk due to clear layer boundaries +- Operational leverage through queue-based scaling +- Better compliance posture from centralized message handling +- Strong foundation for future AI-assisted data object generation + +## Visual Blueprint (for Diagram or Image Generation) + +Use this structure when creating architecture visuals: + +- Left side: Client/API and internal tools +- Center spine: RabbitMQ (highlighted as "AMQP Control Plane") +- Right side: Dispatcher pool and class runtime +- Bottom: MongoDB + SQL data stores +- Overlay badges: Auditability, Retry/DLQ, Scale-out Consumers, Config-Driven Nodes + +Color and style suggestions: + +- Transport spine in teal/blue +- Domain runtime in neutral steel/graphite +- Data stores in earthy accents +- Use arrows to emphasize directional flow and feedback loop for responses + +## Prompt Pack for AI Image Generation + +### Prompt A: Executive Architecture Poster + +Create a modern enterprise architecture poster titled "BEDS: AMQP-First Data Orchestration". Show a left-to-right pipeline: clients and tools -> RabbitMQ AMQP control plane -> dispatcher worker pool -> class/schema execution -> MongoDB and SQL storage. Include callout badges: auditability, delivery guarantees, retry/DLQ safety, scalable consumers, config-driven deployment. Style should be clean, boardroom-ready, and easy to understand for non-engineering leadership. + +### Prompt B: Technical Leadership Slide Graphic + +Generate a diagram for a CTO slide that explains a message-driven data platform. Center RabbitMQ as the control plane. Show message envelopes entering from APIs, dispatcher consumers scaling horizontally, template-driven class resolution, and adapters writing to document and relational stores. Emphasize "no direct app-to-database coupling". Use polished enterprise iconography and concise labels. + +### Prompt C: Impact-Focused Infographic + +Produce an infographic with three bands: "How it works", "Why it matters", "Business impact". In "How it works", illustrate AMQP-first request flow through dispatchers into data stores. In "Why it matters", list governance, observability, reliability, and scaling. In "Business impact", highlight faster delivery, lower operational risk, and better control of data in transit. + +## Diagram Source (Mermaid) + +```mermaid +flowchart LR + A[Clients and Internal Tools] --> B[RabbitMQ AMQP Control Plane] + B --> C[Dispatcher Pool] + C --> D[Template Registry and Class Resolution] + D --> E[Class -> Schema -> Base IO] + E --> F[(MongoDB REC)] + E --> G[(SQL REL)] + E --> H[(msLogs Logger Store)] + E --> I[Response Envelope] + I --> A + + B -.-> J[Retry and DLQ] + C -.-> K[Horizontal Scale-Out] + B -.-> L[Audit and Transport Controls] +``` + +## Suggested Talking Track (30-45 seconds) + +"BEDS moves data access from direct database calls into an AMQP-governed pipeline. RabbitMQ becomes the transport control plane, and dispatcher workers handle execution through template-driven class resolution. This gives us stronger observability, safer failure handling, and easier horizontal scaling. Operationally, we gain consistency and control; strategically, we gain a platform that is easier to extend and govern." diff --git a/wiki/Home.md b/wiki/Home.md index 4555279..29bf77b 100644 --- a/wiki/Home.md +++ b/wiki/Home.md @@ -18,6 +18,7 @@ If you are reading this as a new contributor, start here and read in order. The - [IPL — Initial Program Load](04-ipl.md) — The bootstrap sequence, step by step, and why order matters - [Configuration System](05-configuration.md) — Layered TOML, environment files, topology options - [Modernization Roadmap](10-modernization-roadmap.md) — POC-first execution sequence and modernization requirements +- [Architecture Visual Brief](11-beds-architecture-visual-brief.md) — Leadership-facing architecture narrative and diagram prompts ### Messaging - [Queue Topology](06-queue-topology.md) — AMQP exchanges, queues, routing keys, and the broker model