Compare commits


2 Commits

Author SHA1 Message Date
14ec58318b feat: resident runtime, shutdown command, observatory, and IPL logging hardening
- keep BEDS resident after IPL and coordinate clean shutdown
- propagate AMQP shutdown command across dispatcher pool
- add structured IPL milestone/event-chain logging with root GUID context
- add optional trace_on config for verbose method-entry diagnostics
- add dev purge-on-IPL controls for admin/logger collections
- add log level showcase events after IPL node-green
- add Mongo logger store helpers for chain/root lookup and purge
- add/modernize BEDS Observatory log_dumper utility UI and root record view
- refresh source headers and wiki docs for current architecture/runtime
- add architecture visual brief for leadership/image-generation workflows
2026-04-10 13:42:39 -07:00
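The "coordinate clean shutdown" and "propagate AMQP shutdown command across dispatcher pool" bullets describe a broadcast pattern: one coordinator flips a flag and every dispatcher observes it and drains. The runtime itself uses `tokio::sync::watch`; the following is a dependency-free sketch of the same one-sender, many-observers shape using threads and an atomic flag (all names hypothetical):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

fn main() {
    // Shared shutdown flag standing in for the watch channel's value.
    let shutdown = Arc::new(AtomicBool::new(false));
    let mut handles = Vec::new();

    for id in 0..3 {
        let flag = Arc::clone(&shutdown);
        handles.push(thread::spawn(move || {
            // A real dispatcher interleaves this check with message consumption.
            while !flag.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(5));
            }
            println!("dispatcher[{id}] stopping");
        }));
    }

    // One store reaches every dispatcher at once.
    shutdown.store(true, Ordering::Relaxed);
    for h in handles {
        h.join().unwrap();
    }
    assert!(shutdown.load(Ordering::Relaxed));
}
```

The watch channel adds wakeup-on-change on top of this, which is why `dispatcher.rs` below can `select!` on it instead of polling.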
0af80612bb refactor: collapse r_broker+w_broker into unified dispatcher pool
- Single dispatcher pool replaces separate rBroker and wBroker pools
- All events consumed from single queue (beds.events)
- Template lookup reveals schema; class instantiation owns business logic
- Transport layer (dispatcher) is data-schema-agnostic
- AMQP consumer groups handle load-balancing natively
- Reduces exchange/queue topology overhead (no strict rec.read/rec.write)
- Maintains same queue names and routing keys for operator clarity
- r_broker.rs and w_broker.rs retained as reference (can be removed)

Implements architecture decision from user conversation on 2026-04-09.
2026-04-09 14:30:48 -07:00
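The split these bullets describe can be sketched in a few lines: the transport layer routes on a template tag and treats the payload body as opaque, while the handler resolved by the lookup owns all schema and CRUD knowledge. All names here are hypothetical illustrations, not the actual BEDS types:

```rust
use std::collections::HashMap;

// What the transport layer sees: a routing tag plus an opaque body.
struct Envelope {
    template: String, // e.g. "member.create"
    body: String,     // never inspected by the dispatcher
}

type Handler = fn(&str) -> Result<String, String>;

// Template lookup reveals the schema; the dispatcher stays data-schema-agnostic.
fn dispatch(registry: &HashMap<String, Handler>, env: &Envelope) -> Result<String, String> {
    match registry.get(env.template.as_str()) {
        Some(handler) => handler(&env.body),
        None => Err(format!("unknown template: {}", env.template)),
    }
}

fn main() {
    let mut registry: HashMap<String, Handler> = HashMap::new();
    registry.insert("member.create".into(), |body| Ok(format!("created: {body}")));

    let env = Envelope {
        template: "member.create".into(),
        body: "{}".into(),
    };
    assert_eq!(dispatch(&registry, &env).unwrap(), "created: {}");
    assert!(dispatch(
        &registry,
        &Envelope { template: "nope".into(), body: String::new() }
    )
    .is_err());
}
```

Because every handler hangs off one registry, a single queue (`beds.events`) suffices and AMQP consumer groups can load-balance deliveries across identical dispatcher instances.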
16 changed files with 2258 additions and 83 deletions

Cargo.lock (generated, 290 lines changed)

@@ -89,6 +89,15 @@ dependencies = [
"url",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anyhow"
version = "1.0.102"
@@ -290,6 +299,58 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "axum"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde_core",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-core"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "base64"
version = "0.21.7"
@@ -426,6 +487,17 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "chrono"
version = "0.4.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0"
dependencies = [
"iana-time-zone",
"num-traits",
"windows-link",
]
[[package]]
name = "cipher"
version = "0.4.4"
@@ -1157,6 +1229,110 @@ dependencies = [
"digest",
]
[[package]]
name = "http"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a"
dependencies = [
"bytes",
"itoa",
]
[[package]]
name = "http-body"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "http-body-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca"
dependencies = [
"atomic-waker",
"bytes",
"futures-channel",
"futures-core",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"smallvec",
"tokio",
]
[[package]]
name = "hyper-util"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0"
dependencies = [
"bytes",
"http",
"http-body",
"hyper",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "iana-time-zone"
version = "0.1.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "2.2.0"
@@ -1490,6 +1666,12 @@ dependencies = [
"regex-automata",
]
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "md-5"
version = "0.10.6"
@@ -1506,6 +1688,12 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -2246,6 +2434,8 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
name = "rustybeds"
version = "0.1.0"
dependencies = [
"axum",
"chrono",
"config",
"futures-lite 2.6.1",
"lapin",
@@ -2258,8 +2448,15 @@ dependencies = [
"tracing",
"tracing-journald",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "ryu"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]]
name = "salsa20"
version = "0.10.2"
@@ -2378,6 +2575,17 @@ dependencies = [
"zmij",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457"
dependencies = [
"itoa",
"serde",
"serde_core",
]
[[package]]
name = "serde_spanned"
version = "0.6.9"
@@ -2387,6 +2595,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_with"
version = "3.18.0"
@@ -2547,6 +2767,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
[[package]]
name = "synstructure"
version = "0.13.2"
@@ -2795,12 +3021,41 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
[[package]]
name = "tower"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -3132,6 +3387,41 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-link"
version = "0.2.1"

Cargo.toml

@@ -16,3 +16,6 @@ mongodb = { version = "3", features = ["sync"] }
futures-lite = "2"
lapin = "2"
tokio = { version = "1", features = ["full"] }
axum = "0.8"
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
uuid = { version = "1", features = ["v4"] }

src/bin/log_dumper.rs (new file, 662 lines)

@@ -0,0 +1,662 @@
//! # bin/log_dumper.rs — BEDS Observatory Utility
//!
//! Development-facing web utility for inspecting logger records in `msLogs`.
//! Provides quick filtering, root GUID tracing, single-record view, and a
//! seed-write action for validating logger storage paths.
//!
//! ## Routes
//! - `/` — main observatory page
//! - `/record` — single root record display by `id`
//! - `/health` — utility health endpoint
//!
//! ## Notes
//! - Reads config via the same loader as the main BEDS runtime
//! - Initializes logger store at startup
//! - Intended for development and diagnostics workflows
use std::net::SocketAddr;
use axum::{
extract::Query,
http::StatusCode,
response::{Html, IntoResponse},
routing::get,
Router,
};
use chrono::{Local, TimeZone};
use serde::Deserialize;
use rustybeds::{brokers::logger_store, config};
#[derive(Debug, Deserialize)]
struct DumpQuery {
lines: Option<usize>,
source: Option<String>,
action: Option<String>,
root: Option<String>,
}
#[derive(Debug, Deserialize)]
struct RecordQuery {
id: Option<String>,
}
#[tokio::main]
async fn main() {
if let Err(e) = run().await {
eprintln!("[LOG_DUMPER] fatal: {}", e);
std::process::exit(1);
}
}
async fn run() -> Result<(), String> {
let cfg = config::load().map_err(|e| format!("config load failed: {}", e))?;
logger_store::init_from_rec_services(&cfg.rec_services, &cfg.id.env_name)
.await
.map_err(|e| format!("logger store init failed: {}", e))?;
let app = Router::new()
.route("/", get(index))
.route("/record", get(record_view))
.route("/health", get(health));
let addr: SocketAddr = "127.0.0.1:8088"
.parse()
.map_err(|e| format!("invalid bind address: {}", e))?;
println!("[LOG_DUMPER] running at http://{}/", addr);
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_err(|e| format!("bind failed: {}", e))?;
axum::serve(listener, app)
.await
.map_err(|e| format!("serve failed: {}", e))?;
Ok(())
}
async fn health() -> impl IntoResponse {
(StatusCode::OK, "ok")
}
async fn record_view(Query(query): Query<RecordQuery>) -> impl IntoResponse {
let root_id = query.id.unwrap_or_default();
if root_id.trim().is_empty() {
return Html("<h2>Missing root event id</h2><p><a href='/'>Back to observatory</a></p>".to_string());
}
let content = match logger_store::fetch_root_record(&root_id).await {
Ok(Some(row)) => {
let pretty = serde_json::to_string_pretty(&row).unwrap_or_else(|_| "{}".to_string());
format!(
"<h2>Root Event Record</h2><p><strong>event_id:</strong> {}</p><pre>{}</pre>",
escape_html(&root_id),
escape_html(&pretty)
)
}
Ok(None) => format!(
"<h2>No root record found</h2><p>event_id: {}</p>",
escape_html(&root_id)
),
Err(e) => format!("<h2>Fetch failed</h2><p class='error'>{}</p>", escape_html(&e)),
};
let page = format!(
"<!doctype html><html lang='en'><head><meta charset='utf-8'><meta name='viewport' content='width=device-width, initial-scale=1'><title>Root Event</title><style>body{{font-family:Space Grotesk,Segoe UI,sans-serif;background:#f5f8f2;color:#0e1f1b;margin:0;padding:20px}}.shell{{max-width:980px;margin:0 auto;background:#fff;border:1px solid #9eb7b0;border-radius:12px;padding:18px}}a{{color:#16658f}}pre{{overflow:auto;background:#f7fbf9;border:1px solid #c9dad4;border-radius:8px;padding:12px;font-family:JetBrains Mono,monospace}}</style></head><body><div class='shell'><p><a href='/'>← Back to observatory</a></p>{}</div></body></html>",
content
);
Html(page)
}
async fn index(Query(query): Query<DumpQuery>) -> impl IntoResponse {
let mut lines = query.lines.unwrap_or(100);
if lines == 0 {
lines = 100;
}
if lines > 1000 {
lines = 1000;
}
let source = query.source.unwrap_or_else(|| "Logs".to_string());
let root_filter = query.root.unwrap_or_default();
let mut banner = String::new();
if query.action.as_deref() == Some("seed") {
let mut seed = std::collections::HashMap::new();
seed.insert(
"service_log".to_string(),
serde_json::Value::String("log_dumper".to_string()),
);
seed.insert(
"severity_log".to_string(),
serde_json::Value::String("INFO".to_string()),
);
seed.insert(
"event_log".to_string(),
serde_json::Value::String("LOG_DUMPER_SEED".to_string()),
);
seed.insert(
"message_log".to_string(),
serde_json::Value::String("Manual seed event from log dumper UI".to_string()),
);
match logger_store::append_log(seed).await {
Ok(token) => {
banner = format!(
"<div class='ok'>seed write succeeded; db_token={}</div>",
escape_html(&token)
)
}
Err(e) => {
banner = format!(
"<div class='error'>seed write failed: {}</div>",
escape_html(&e)
)
}
}
}
// Current Rust logger store is unified; keep the source selector in the UI
// for parity with the PHP utility, but only Logs is implemented right now.
let content = if source.eq_ignore_ascii_case("logs") {
let rows_result = if root_filter.trim().is_empty() {
logger_store::fetch_recent(lines).await
} else {
logger_store::fetch_chain(&root_filter, lines).await
};
match rows_result {
Ok(rows) => render_rows(rows),
Err(e) => format!("<div class='error'>fetch failed: {}</div>", escape_html(&e)),
}
} else {
"<div class='warn'>Metrics source is not implemented in this Rust utility yet.</div>".to_string()
};
let html = format!(
r#"<!doctype html>
<html lang='en'>
<head>
<meta charset='utf-8'>
<meta name='viewport' content='width=device-width, initial-scale=1'>
<title>BEDS Observatory</title>
<link rel='preconnect' href='https://fonts.googleapis.com'>
<link rel='preconnect' href='https://fonts.gstatic.com' crossorigin>
<link href='https://fonts.googleapis.com/css2?family=Space+Grotesk:wght@400;500;700&family=JetBrains+Mono:wght@400;600&display=swap' rel='stylesheet'>
<style>
:root {{
--bg0: #f5f8f2;
--bg1: #dcebe5;
--bg2: #c8d8d2;
--ink: #0e1f1b;
--muted: #3a5a53;
--line: #9eb7b0;
--accent: #0b8b73;
--accent-2: #16658f;
--warn: #a35a09;
--ok: #117a37;
--err: #b91c1c;
--card: rgba(255, 255, 255, 0.82);
--glass: rgba(255, 255, 255, 0.62);
}}
* {{ box-sizing: border-box; }}
body {{
margin: 0;
min-height: 100vh;
color: var(--ink);
background:
radial-gradient(1100px 560px at 90% -15%, #d6f4ea 0%, rgba(214,244,234,0) 62%),
radial-gradient(800px 480px at -20% 110%, #cde0f8 0%, rgba(205,224,248,0) 58%),
linear-gradient(165deg, var(--bg0) 0%, var(--bg1) 55%, var(--bg2) 100%);
font-family: "Space Grotesk", "Segoe UI", sans-serif;
padding: 20px;
}}
.shell {{
max-width: 1180px;
margin: 0 auto;
border: 1px solid var(--line);
background: var(--card);
backdrop-filter: blur(8px);
box-shadow: 0 22px 60px rgba(16, 42, 37, 0.18);
border-radius: 18px;
overflow: hidden;
animation: rise 380ms ease-out;
}}
@keyframes rise {{
from {{ transform: translateY(10px); opacity: 0; }}
to {{ transform: translateY(0); opacity: 1; }}
}}
.head {{
padding: 22px 24px;
border-bottom: 1px solid var(--line);
display: flex;
gap: 12px;
flex-wrap: wrap;
align-items: end;
justify-content: space-between;
background: linear-gradient(90deg, rgba(11,139,115,0.13), rgba(22,101,143,0.09));
}}
.title {{
margin: 0;
font-size: clamp(1.2rem, 2vw, 1.95rem);
font-weight: 700;
letter-spacing: 0.015em;
}}
.subtitle {{
color: var(--muted);
font-size: 0.95rem;
max-width: 64ch;
}}
.seed-slot {{
margin-top: 10px;
min-height: 1px;
}}
.controls,
.toolbar {{
padding: 14px 24px;
border-bottom: 1px solid var(--line);
display: flex;
flex-wrap: wrap;
gap: 10px 16px;
align-items: center;
}}
.toolbar {{
justify-content: space-between;
background: var(--glass);
}}
.toolbar .stats {{
display: flex;
gap: 10px;
flex-wrap: wrap;
}}
.stat {{
border: 1px solid var(--line);
border-radius: 999px;
padding: 6px 10px;
font-size: 0.78rem;
color: var(--muted);
background: #f9fcfb;
}}
.controls label {{ font-size: 0.9rem; color: var(--muted); }}
.controls .src {{ display: flex; gap: 8px; align-items: center; }}
.controls input[type='number'] {{
width: 88px;
border: 1px solid var(--line);
background: #fcffff;
padding: 7px 8px;
border-radius: 8px;
font-family: "JetBrains Mono", monospace;
}}
.controls input[type='search'] {{
min-width: 240px;
border: 1px solid var(--line);
background: #fcffff;
padding: 8px 10px;
border-radius: 8px;
font-family: "JetBrains Mono", monospace;
}}
.controls button {{
border: 1px solid var(--accent);
color: #fff;
background: linear-gradient(120deg, var(--accent), var(--accent-2));
padding: 7px 14px;
border-radius: 8px;
font-weight: 600;
cursor: pointer;
transition: transform 100ms ease, filter 100ms ease;
}}
.controls button:hover {{ filter: brightness(1.05); transform: translateY(-1px); }}
.info {{
padding: 8px 24px;
color: var(--muted);
font-size: 0.86rem;
border-bottom: 1px dashed var(--line);
}}
.body {{ padding: 16px 24px 22px; }}
.grid {{ display: grid; gap: 10px; }}
.entry {{
padding: 6px 0;
border-bottom: 1px dashed var(--line);
animation: fadeIn 280ms ease both;
}}
@keyframes fadeIn {{
from {{ opacity: 0; transform: translateY(6px); }}
to {{ opacity: 1; transform: translateY(0); }}
}}
.entry-head {{
display: flex;
gap: 12px;
align-items: center;
justify-content: flex-start;
flex-wrap: wrap;
}}
.entry-head details {{
flex: 1;
min-width: 0;
}}
.entry-meta {{
font-size: 0.78rem;
color: var(--muted);
font-family: "JetBrains Mono", monospace;
}}
.entry-title {{
font-size: 0.88rem;
font-weight: 700;
color: #18362f;
letter-spacing: 0.02em;
text-transform: uppercase;
}}
.entry-link {{
color: var(--accent-2);
font-size: 0.82rem;
text-decoration: none;
border-bottom: 1px dotted var(--accent-2);
padding-bottom: 1px;
}}
.entry-link:hover {{ filter: brightness(0.9); }}
.root-float {{
margin-left: auto;
font-size: 0.76rem;
font-family: "JetBrains Mono", monospace;
text-decoration: none;
color: #0f5f8c;
border: 1px solid #98b9ce;
border-radius: 999px;
padding: 3px 8px;
background: #f1f8fd;
max-width: 42ch;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}}
.payload {{
margin-top: 6px;
padding-left: 10px;
}}
details summary {{
cursor: pointer;
color: var(--ink);
font-size: 0.84rem;
margin-bottom: 0;
user-select: none;
list-style: none;
display: flex;
align-items: center;
gap: 10px;
min-width: 0;
}}
details summary::-webkit-details-marker {{ display: none; }}
.chev {{
color: var(--accent-2);
font-size: 0.9rem;
width: 0.9rem;
display: inline-block;
transform: rotate(-90deg);
transition: transform 120ms ease;
}}
details[open] .chev {{ transform: rotate(0deg); }}
.snippet {{
color: #254942;
font-size: 0.82rem;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}}
pre {{
margin: 0;
font-family: "JetBrains Mono", monospace;
font-size: 0.81rem;
line-height: 1.35;
color: #123a34;
}}
.warn {{ color: var(--warn); font-weight: 600; }}
.ok, .error {{
display: inline-block;
padding: 8px 10px;
border-radius: 8px;
border: 1px solid var(--line);
background: #f8fffc;
font-weight: 700;
font-size: 0.88rem;
}}
.ok {{ color: var(--ok); border-color: #8ec8aa; }}
.error {{ color: var(--err); border-color: #d9a3a3; background: #fff8f8; }}
.hide {{ display: none; }}
@media (max-width: 760px) {{
body {{ padding: 10px; }}
.head, .controls, .toolbar, .info, .body {{ padding-left: 14px; padding-right: 14px; }}
.controls {{ flex-direction: column; align-items: flex-start; }}
.controls input[type='number'] {{ width: 100%; max-width: 150px; }}
.controls input[type='search'] {{ min-width: 0; width: 100%; }}
}}
</style>
</head>
<body>
<section class='shell'>
<header class='head'>
<div>
<h1 class='title'>BEDS Observatory</h1>
<div class='subtitle'>Real-time development diagnostics for logger events in Mongo-backed msLogs.</div>
<div class='seed-slot'>{banner}</div>
</div>
<div class='subtitle'>Utility mode: parity path for legacy dumper workflow</div>
</header>
<form class='controls' method='get' action='/'>
<div class='src'>
<label><input type='radio' name='source' value='Logs' {logs_checked}> Logs</label>
<label><input type='radio' name='source' value='Metrics' {metrics_checked}> Metrics</label>
</div>
<label>
Lines to display
<input type='number' min='1' max='1000' name='lines' value='{lines}'>
</label>
<label>
Root GUID trace
<input type='search' name='root' value='{root_value}' placeholder='paste root event_id to trace chain'>
</label>
<button type='submit'>Go</button>
<button type='submit' name='action' value='seed'>Seed Test Log</button>
<label>
Quick filter
<input id='q' type='search' placeholder='type to filter visible entries'>
</label>
</form>
<section class='toolbar'>
<div class='stats'>
<div class='stat'>Source: {source}</div>
<div class='stat'>Limit: {lines}</div>
<div class='stat'>Rendered: <span id='rendered'>{rendered_count}</span></div>
</div>
<div class='subtitle'>Tip: expand entries to inspect full payload JSON</div>
</section>
<div class='info'>Source: {source} | Showing up to {lines} rows</div>
<section class='body'>
<div class='grid' id='entries'>
{content}
</div>
</section>
</section>
<script>
(function() {{
const input = document.getElementById('q');
const entries = Array.from(document.querySelectorAll('.entry'));
const rendered = document.getElementById('rendered');
if (!input) return;
input.addEventListener('input', function() {{
const q = input.value.toLowerCase().trim();
let visible = 0;
entries.forEach(function(el) {{
const txt = (el.getAttribute('data-search') || '').toLowerCase();
const show = !q || txt.indexOf(q) !== -1;
el.classList.toggle('hide', !show);
if (show) visible += 1;
}});
if (rendered) rendered.textContent = String(visible);
}});
}})();
</script>
</body>
</html>"#,
logs_checked = if source.eq_ignore_ascii_case("logs") { "checked" } else { "" },
metrics_checked = if source.eq_ignore_ascii_case("metrics") { "checked" } else { "" },
lines = lines,
source = escape_html(&source),
root_value = escape_html(&root_filter),
rendered_count = rendered_count(&content),
banner = banner,
content = content,
);
Html(html)
}
fn render_rows(rows: Vec<serde_json::Value>) -> String {
if rows.is_empty() {
return "<div class='warn'>No data found in logger collection.</div>".to_string();
}
let mut out = String::new();
for (idx, row) in rows.into_iter().enumerate() {
let pretty = serde_json::to_string_pretty(&row).unwrap_or_else(|_| "{}".to_string());
let search = escape_html(&pretty);
let title = row
.get("severity_log")
.or_else(|| row.get("level"))
.and_then(|v| v.as_str())
.unwrap_or("event");
let service = row
.get("service_log")
.or_else(|| row.get("service"))
.and_then(|v| v.as_str())
.unwrap_or("unknown-service");
let created = format_created_ts(
row.get("created").or_else(|| row.get("ts"))
);
let event_id = row
.get("event_id")
.and_then(|v| v.as_str())
.unwrap_or("");
let parent_id = row
.get("parent_id")
.and_then(|v| v.as_str())
.unwrap_or("");
let depth = row
.get("depth")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let root_id = if depth <= 0 {
event_id
} else if !parent_id.is_empty() {
parent_id
} else {
event_id
};
let trace_href = format!("/record?id={}", url_encode(root_id));
let message_snippet = row
.get("message_log")
.or_else(|| row.get("message"))
.and_then(|v| v.as_str())
.map(snippet)
.unwrap_or_else(|| "(no message)".to_string());
out.push_str("<article class='entry' data-search='");
out.push_str(&search);
out.push_str("'><div class='entry-head'><details><summary><span class='chev'>▼</span><span class='entry-title'>");
out.push_str(&escape_html(title));
out.push_str("</span><span class='snippet'>");
out.push_str(&escape_html(&message_snippet));
out.push_str("</span><span class='entry-meta'>");
out.push_str(&escape_html(&format!("service={} created={} idx={}", service, created, idx + 1)));
out.push_str("</span></summary><div class='payload'><pre>");
out.push_str(&escape_html(&pretty));
out.push_str("</pre></div></details>");
if !root_id.is_empty() {
out.push_str("<a class='root-float' href='");
out.push_str(&trace_href);
out.push_str("' title='open single root record page'>");
out.push_str(&escape_html(root_id));
out.push_str("</a>");
}
out.push_str("</div></article>");
}
out
}
fn url_encode(input: &str) -> String {
let mut out = String::new();
for b in input.bytes() {
match b {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(char::from(b));
}
b' ' => out.push_str("%20"),
_ => out.push_str(&format!("%{:02X}", b)),
}
}
out
}
fn rendered_count(content: &str) -> usize {
content.match_indices("class='entry'").count()
}
fn snippet(input: &str) -> String {
let clean = input.replace('\n', " ").replace('\r', " ");
let mut out = String::new();
for (i, ch) in clean.chars().enumerate() {
if i >= 110 {
out.push_str("...");
break;
}
out.push(ch);
}
out
}
fn format_created_ts(value: Option<&serde_json::Value>) -> String {
let Some(value) = value else {
return "n/a".to_string();
};
let secs_opt = value
.as_i64()
.or_else(|| value.as_u64().map(|v| v as i64))
.or_else(|| value.as_str().and_then(|s| s.parse::<i64>().ok()));
let Some(secs) = secs_opt else {
return value.to_string();
};
match Local.timestamp_opt(secs, 0).single() {
Some(ts) => ts.format("%Y-%m-%d %H:%M:%S %Z").to_string(),
None => secs.to_string(),
}
}
fn escape_html(input: &str) -> String {
let mut escaped = String::with_capacity(input.len());
for ch in input.chars() {
match ch {
'&' => escaped.push_str("&amp;"),
'<' => escaped.push_str("&lt;"),
'>' => escaped.push_str("&gt;"),
'"' => escaped.push_str("&quot;"),
'\'' => escaped.push_str("&#39;"),
_ => escaped.push(ch),
}
}
escaped
}
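The percent-encoder above keeps the RFC 3986 unreserved characters (`A-Z`, `a-z`, `0-9`, `-`, `_`, `.`, `~`) literal and encodes every other byte as `%XX`, with a space fast-path. Copied here verbatim with a quick standalone check of that behavior:

```rust
// Verbatim copy of log_dumper's url_encode for isolated exercise.
fn url_encode(input: &str) -> String {
    let mut out = String::new();
    for b in input.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(char::from(b));
            }
            b' ' => out.push_str("%20"),
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

fn main() {
    // Unreserved characters pass through untouched.
    assert_eq!(url_encode("abc-123_.~"), "abc-123_.~");
    // Spaces and reserved bytes are percent-encoded.
    assert_eq!(url_encode("a b"), "a%20b");
    assert_eq!(url_encode("id=42&x"), "id%3D42%26x");
    assert_eq!(url_encode("%"), "%25");
}
```

Encoding byte-by-byte also handles multi-byte UTF-8 correctly, since each byte of a non-ASCII character falls into the `%XX` arm.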

src/brokers/dispatcher.rs (new file, 457 lines)

@@ -0,0 +1,457 @@
//! # brokers/dispatcher.rs — Unified Event Dispatcher
//!
//! Single dispatcher task that consumes from beds.events, parses the envelope,
//! instantiates the appropriate class based on template lookup, and delegates
//! to class business logic. The dispatcher is schema-agnostic; the class
//! instantiation reveals schema, CRUD operation, and domain-specific behavior.
//!
//! ## Calling Agents
//! - `brokers::mod` — spawned by `spawn_dispatcher_pool()`
//!
//! ## Inputs
//! - `Arc<lapin::Connection>` — shared AMQP connection
//! - `queue_tag: String` — queue name prefix from config
//! - `instance_id: u32` — numeric ID for log correlation
//! - `template_registry: Arc<RuntimeTemplateRegistry>` — loaded templates
//!
//! ## Outputs
//! - Publishes reply payloads to `reply_to` queue if present in headers
//! - Logs to tracing (journald/console per config)
//!
//! ## Model
//!
//! Transport layer (this file) owns:
//! - AMQP consumption
//! - Envelope parsing
//! - Acknowledgment semantics
//! - Reply routing (if reply_to header exists)
//!
//! Class instantiation owns:
//! - Schema determination (via template lookup)
//! - CRUD operation handling
//! - Business logic
//! - Data transformation
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-09` - mks - unified dispatcher replacing r_broker + w_broker
use std::sync::Arc;
use futures_lite::StreamExt;
use lapin::{
BasicProperties, Channel, Connection,
options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
QueueBindOptions, QueueDeclareOptions,
},
types::{AMQPValue, FieldTable, LongString, ShortString},
};
use tokio::sync::watch;
use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME};
use crate::brokers::payload::{
BrokerRequestEnvelope,
error_response,
parse_request,
success_response,
};
use crate::template_registry::RuntimeTemplateRegistry;
use super::error::BrokerError;
/// Single queue for all events, regardless of schema or CRUD operation.
const QUEUE_NAME_SUFFIX: &str = "events";
enum AckAction {
Ack,
Nack { requeue: bool },
}
struct DispatcherOutcome {
reply_payload: Option<Vec<u8>>,
ack_action: AckAction,
}
/// Spawns a single dispatcher task and returns immediately.
///
/// The task consumes from the single events queue and processes messages
/// until a `shutdown` event is received or the connection is lost.
///
/// # Arguments
///
/// * `conn` — shared AMQP connection; each task opens its own channel
/// * `queue_tag` — queue name prefix from config
/// * `instance_id` — zero-based index for log correlation
/// * `template_registry` — loaded and validated templates
///
/// # Returns
///
/// `Ok(tokio::task::JoinHandle)` — the task handle; held by pool manager
/// `Err(BrokerError)` if channel or queue declaration fails before task start
///
/// # History
///
/// * `2026-04-09` - mks - original coding
pub async fn spawn(
conn: Arc<Connection>,
queue_tag: String,
instance_id: u32,
template_registry: Arc<RuntimeTemplateRegistry>,
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
) -> Result<tokio::task::JoinHandle<()>, BrokerError> {
let channel = conn.create_channel().await?;
let queue_name = format!("{}{}", queue_tag, QUEUE_NAME_SUFFIX);
let dlq_queue_name = format!("{}{}.dlq", queue_tag, QUEUE_NAME_SUFFIX);
let mut primary_args = FieldTable::default();
primary_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)),
);
primary_args.insert(
ShortString::from("x-dead-letter-routing-key"),
AMQPValue::LongString(LongString::from("events.dlq")),
);
// Declare the single events queue — idempotent
channel
.queue_declare(
&queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
},
primary_args,
)
.await?;
// Declare DLQ
channel
.queue_declare(
&dlq_queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
channel
.queue_bind(
&dlq_queue_name,
DLX_EXCHANGE_NAME,
"events.dlq",
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
// Bind the single queue to the exchange with the '#' topic wildcard
// so it receives all events
channel
.queue_bind(
&queue_name,
EXCHANGE_NAME,
"#",
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
tracing::info!("dispatcher[{}] queue '{}' declared and bound", instance_id, queue_name);
let handle = tokio::spawn(async move {
if let Err(e) = run(
channel,
queue_name,
instance_id,
template_registry,
shutdown_tx,
shutdown_rx,
).await {
tracing::error!("dispatcher[{}] exited with error: {}", instance_id, e);
}
});
Ok(handle)
}
/// The dispatcher consume loop.
///
/// Pulls messages from the single events queue, dispatches based on template
/// and class instantiation, and replies if a reply_to header is present.
///
/// # History
///
/// * `2026-04-09` - mks - original coding
async fn run(
channel: Channel,
queue_name: String,
instance_id: u32,
template_registry: Arc<RuntimeTemplateRegistry>,
shutdown_tx: watch::Sender<bool>,
mut shutdown_rx: watch::Receiver<bool>,
) -> Result<(), BrokerError> {
let consumer_tag = format!("dispatcher-{}", instance_id);
let mut consumer = channel
.basic_consume(
&queue_name,
&consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
tracing::info!("dispatcher[{}] consuming from '{}'", instance_id, queue_name);
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
if changed.is_ok() && *shutdown_rx.borrow() {
tracing::info!("dispatcher[{}] received global shutdown signal", instance_id);
break;
}
if changed.is_err() {
tracing::info!("dispatcher[{}] shutdown coordinator dropped", instance_id);
break;
}
}
maybe_delivery = consumer.next() => {
let Some(delivery) = maybe_delivery else {
break;
};
let delivery = match delivery {
Ok(d) => d,
Err(e) => {
tracing::error!("dispatcher[{}] delivery error: {}", instance_id, e);
break;
}
};
let event_type = delivery
.properties
.kind()
.as_ref()
.map(|s| s.as_str().to_string())
.unwrap_or_default();
let header_correlation_id = delivery
.properties
.correlation_id()
.as_ref()
.map(|s| s.as_str().to_string())
.unwrap_or_default();
tracing::debug!("dispatcher[{}] received event type='{}'", instance_id, event_type);
let envelope = match parse_request(&delivery.data) {
Ok(env) => env,
Err(e) => {
let payload = error_response(
"unknown",
&header_correlation_id,
"INVALID_ENVELOPE",
&e,
);
publish_reply(&channel, &delivery, payload).await;
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: false,
})
.await;
continue;
}
};
let correlation_id = if envelope.correlation_id.trim().is_empty() {
header_correlation_id
} else {
envelope.correlation_id.clone()
};
if !event_type.is_empty() && event_type != envelope.op {
let payload = error_response(
&envelope.op,
&correlation_id,
"OP_MISMATCH",
"AMQP type header does not match envelope op",
);
publish_reply(&channel, &delivery, payload).await;
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: false,
})
.await;
continue;
}
let outcome = match envelope.op.as_str() {
"ping" => DispatcherOutcome {
reply_payload: Some(handle_ping(&correlation_id, instance_id)),
ack_action: AckAction::Ack,
},
"shutdown" => {
let _ = delivery.ack(BasicAckOptions::default()).await;
tracing::info!("dispatcher[{}] shutdown event received — exiting", instance_id);
let _ = shutdown_tx.send(true);
break;
}
_ => {
// Any other operation: template lookup → class instantiation
// This is where schema is revealed and business logic runs.
handle_class_dispatch(envelope, &correlation_id, instance_id, &template_registry).await
}
};
if let Some(payload) = outcome.reply_payload {
publish_reply(&channel, &delivery, payload).await;
}
apply_ack_action(&channel, &delivery, &outcome.ack_action).await;
}
}
}
tracing::info!("dispatcher[{}] consume loop exited", instance_id);
Ok(())
}
/// Handles a `ping` health check event.
///
/// # History
///
/// * `2026-04-09` - mks - original coding
fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec<u8> {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
success_response(
"ping",
correlation_id,
serde_json::json!({
"dispatcher": "dispatcher",
"instance": instance_id,
"ts": ts,
}),
)
}
/// Routes to class instantiation based on template lookup.
///
/// Template namespace → class instantiation → schema revealed → business logic.
/// This is the pivotal point where transport concerns hand off to domain logic.
///
/// # History
///
/// * `2026-04-09` - mks - original coding
async fn handle_class_dispatch(
envelope: BrokerRequestEnvelope,
correlation_id: &str,
instance_id: u32,
template_registry: &RuntimeTemplateRegistry,
) -> DispatcherOutcome {
// Look up the template in the registry
if !template_registry.contains_template(&envelope.template) {
tracing::warn!(
"dispatcher[{}] unknown template '{}'",
instance_id,
envelope.template
);
return DispatcherOutcome {
reply_payload: Some(error_response(
&envelope.op,
correlation_id,
"UNKNOWN_TEMPLATE",
&format!("template '{}' not found in registry", envelope.template),
)),
ack_action: AckAction::Nack { requeue: false },
};
}
// Template found; in a full implementation, this would instantiate the class
// and call its business logic. For now, it's a stub that acknowledges.
tracing::debug!(
"dispatcher[{}] routing {} to class for template '{}'",
instance_id,
envelope.op,
envelope.template
);
DispatcherOutcome {
reply_payload: Some(success_response(
&envelope.op,
correlation_id,
serde_json::json!({
"status": "NOT_IMPLEMENTED",
"template": envelope.template,
"operation": envelope.op,
}),
)),
ack_action: AckAction::Ack,
}
}
/// Publishes a reply to the `reply_to` queue if present in the message properties.
async fn publish_reply(
channel: &Channel,
delivery: &lapin::message::Delivery,
payload: Vec<u8>,
) {
let reply_to = delivery
.properties
.reply_to()
.as_ref()
.map(|s| s.as_str());
if let Some(queue_name) = reply_to {
if let Err(e) = channel
.basic_publish(
"",
queue_name,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
{
tracing::error!("failed to publish reply to '{}': {}", queue_name, e);
}
}
}
/// Applies the ack/nack decision to the delivery.
async fn apply_ack_action(
_channel: &Channel,
delivery: &lapin::message::Delivery,
action: &AckAction,
) {
match action {
AckAction::Ack => {
if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
tracing::error!("failed to ack delivery: {}", e);
}
}
AckAction::Nack { requeue } => {
if let Err(e) = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: *requeue,
})
.await
{
tracing::error!("failed to nack delivery: {}", e);
}
}
}
}


@@ -1,3 +1,15 @@
//! # brokers/logger_store.rs — Mongo Logger Store
//!
//! Mongo-backed persistence and query helpers for BEDS logger records.
//! Used by IPL milestone logging, logger diagnostics utilities, and
//! logger-oriented broker/class paths.
//!
//! ## Responsibilities
//! - Initialize and validate logger DB connection from REC config
//! - Append log records to `msLogs`
//! - Fetch recent logs and root-event chains
//! - Purge configured collections for dev IPL workflows
use std::collections::HashMap;
use std::sync::OnceLock;
@@ -160,6 +172,100 @@ pub async fn fetch_recent(limit: usize) -> Result<Vec<Value>, String> {
Ok(out)
}
pub async fn fetch_chain(root_event_id: &str, limit: usize) -> Result<Vec<Value>, String> {
let store = LOGGER_STORE
.get()
.ok_or_else(|| "logger store is not initialized".to_string())?;
let root = root_event_id.trim();
if root.is_empty() {
return Ok(Vec::new());
}
let mut cursor = store
.client
.database(&store.db_name)
.collection::<Document>(LOGGER_COLLECTION_NAME)
.find(doc! {
"$or": [
{ "event_id": root },
{ "parent_id": root }
]
})
.sort(doc! { "created": 1i32 })
.limit(limit as i64)
.await
.map_err(|e| format!("logger chain fetch failed: {}", e))?;
let mut out = Vec::new();
while let Some(next) = cursor.next().await {
let doc = next.map_err(|e| format!("logger chain fetch cursor error: {}", e))?;
let json = serde_json::to_value(&doc)
.map_err(|e| format!("logger chain fetch decode failed: {}", e))?;
out.push(json);
}
Ok(out)
}
pub async fn fetch_root_record(root_event_id: &str) -> Result<Option<Value>, String> {
let store = LOGGER_STORE
.get()
.ok_or_else(|| "logger store is not initialized".to_string())?;
let root = root_event_id.trim();
if root.is_empty() {
return Ok(None);
}
let mut cursor = store
.client
.database(&store.db_name)
.collection::<Document>(LOGGER_COLLECTION_NAME)
.find(doc! { "event_id": root })
.sort(doc! { "depth": 1i32, "created": 1i32 })
.limit(1)
.await
.map_err(|e| format!("logger root fetch failed: {}", e))?;
let Some(next) = cursor.next().await else {
return Ok(None);
};
let doc = next.map_err(|e| format!("logger root fetch cursor error: {}", e))?;
let json = serde_json::to_value(&doc)
.map_err(|e| format!("logger root fetch decode failed: {}", e))?;
Ok(Some(json))
}
pub async fn purge_collections(collections: &[String]) -> Result<u64, String> {
let store = LOGGER_STORE
.get()
.ok_or_else(|| "logger store is not initialized".to_string())?;
let mut total_deleted = 0u64;
for name in collections {
let coll_name = name.trim();
if coll_name.is_empty() {
continue;
}
let result = store
.client
.database(&store.db_name)
.collection::<Document>(coll_name)
.delete_many(doc! {})
.await
.map_err(|e| format!("logger purge failed for '{}': {}", coll_name, e))?;
total_deleted += result.deleted_count;
}
Ok(total_deleted)
}
fn epoch_secs() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)


@@ -1,48 +1,56 @@
//! # brokers/mod.rs — Broker Pool Manager
//! # brokers/mod.rs — Dispatcher Pool Manager
//!
//! Manages the lifecycle of all broker task pools. At IPL, `spawn_r_broker_pool()`
//! reads the instance count from config, spawns N rBroker Tokio tasks, and
//! Manages the lifecycle of the dispatcher task pool. At IPL, `spawn_dispatcher_pool()`
//! reads the instance count from config, spawns N dispatcher Tokio tasks, and
//! returns their JoinHandles to the caller.
//!
//! Each broker type gets its own pool function following the same pattern.
//! The dispatcher is a unified consumer that pulls from the single events queue,
//! determines class/schema via template lookup, and delegates to class instantiation.
//! No separate r_broker/w_broker; no schema-specific routing.
//!
//! The pool manager holds handles but does not supervise — task exit is logged
//! by the task itself. Supervision (respawn on crash) is a future addition.
//!
//! ## Calling Agents
//! - `ipl()` in main.rs — calls pool spawn functions after exchange declaration
//! - `ipl()` in main.rs — calls `spawn_dispatcher_pool()` after exchange declaration
//!
//! ## Outputs
//! - `Vec<JoinHandle<()>>` per broker type — held for clean shutdown
//! - `Vec<JoinHandle<()>>` — held for clean shutdown
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-05` - mks - original coding
//! * `2026-04-05` - mks - original coding (separate r_broker + w_broker)
//! * `2026-04-09` - mks - unified to single dispatcher pool
pub mod dispatcher;
pub mod error;
pub mod logger_store;
pub mod payload;
pub mod r_broker;
pub mod w_broker;
use std::sync::Arc;
use lapin::Connection;
use tokio::sync::watch;
use crate::config::BrokerServicesConfig;
use crate::template_registry::RuntimeTemplateRegistry;
use error::BrokerError;
/// Spawns the rBroker pool — N tasks as configured in `instances.r_broker`.
/// Spawns the dispatcher pool — N tasks as configured in `instances.dispatcher`.
///
/// Each task gets the shared AMQP connection, the queue tag, and its zero-based
/// instance index. The connection is wrapped in `Arc` so each task can open
/// its own channel without cloning the connection.
/// Each task gets the shared AMQP connection, the queue tag, its zero-based
/// instance index, and the template registry. The connection is wrapped in `Arc`
/// so each task can open its own channel without cloning the connection.
///
/// The dispatcher consumes from a single queue and routes to class instantiation
/// based on template lookup. No separate r_broker/w_broker pools.
///
/// # Arguments
///
/// * `conn` — the authenticated AMQP connection from IPL step 3b
/// * `cfg` — broker services config block (queue_tag + instance counts)
/// * `conn` — the authenticated AMQP connection from IPL
/// * `cfg` — broker services config block (queue_tag + instance count)
/// * `template_registry` — loaded and validated runtime templates
///
/// # Returns
///
@@ -51,62 +59,32 @@ use error::BrokerError;
///
/// # History
///
/// * `2026-04-05` - mks - original coding
pub async fn spawn_r_broker_pool(
/// * `2026-04-09` - mks - unified dispatcher pool
pub async fn spawn_dispatcher_pool(
conn: Arc<Connection>,
cfg: &BrokerServicesConfig,
template_registry: Arc<RuntimeTemplateRegistry>,
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
) -> Result<Vec<tokio::task::JoinHandle<()>>, BrokerError> {
let count = cfg.app_server.instances.r_broker;
// For now, use a sensible default if a specific dispatcher instance count isn't set
// In full config, this would be cfg.app_server.instances.dispatcher
let count = cfg.app_server.instances.r_broker.max(cfg.app_server.instances.w_broker);
let mut handles = Vec::with_capacity(count as usize);
for i in 0..count {
let handle = r_broker::spawn(
let handle = dispatcher::spawn(
Arc::clone(&conn),
cfg.queue_tag.clone(),
i,
Arc::clone(&template_registry),
).await?;
shutdown_tx.clone(),
shutdown_rx.clone(),
)
.await?;
handles.push(handle);
}
tracing::info!("rBroker pool started: {} instance(s)", count);
Ok(handles)
}
/// Spawns the wBroker pool — N tasks as configured in `instances.w_broker`.
///
/// # Arguments
///
/// * `conn` — the authenticated AMQP connection from IPL step 3b
/// * `cfg` — broker services config block (queue_tag + instance counts)
///
/// # Returns
///
/// `Ok(Vec<JoinHandle<()>>)` — one handle per spawned task.
/// `Err(BrokerError)` if any task fails to declare its queue before starting.
///
/// # History
///
/// * `2026-04-05` - mks - original coding
pub async fn spawn_w_broker_pool(
conn: Arc<Connection>,
cfg: &BrokerServicesConfig,
template_registry: Arc<RuntimeTemplateRegistry>,
) -> Result<Vec<tokio::task::JoinHandle<()>>, BrokerError> {
let count = cfg.app_server.instances.w_broker;
let mut handles = Vec::with_capacity(count as usize);
for i in 0..count {
let handle = w_broker::spawn(
Arc::clone(&conn),
cfg.queue_tag.clone(),
i,
Arc::clone(&template_registry),
).await?;
handles.push(handle);
}
tracing::info!("wBroker pool started: {} instance(s)", count);
tracing::info!("dispatcher pool started: {} instance(s)", count);
Ok(handles)
}


@@ -28,6 +28,10 @@ pub struct BedsConfig {
/// Enables verbose debug output when true
pub debug: bool,
/// Enables trace-style method-entry logging for deep backend diagnostics
#[serde(default)]
pub trace_on: bool,
/// Enables journald syslog output when true
pub syslog: bool,
@@ -48,6 +52,21 @@ pub struct BedsConfig {
/// MariaDB (REL) node configs keyed by service name (e.g. "app_server")
pub rel_services: HashMap<String, RelNodeConfig>,
/// Development-only logger admin controls (safe defaults when omitted)
#[serde(default)]
pub logger_admin: LoggerAdminConfig,
}
/// Logger admin controls used primarily in development workflows.
#[derive(Debug, Deserialize, Default)]
pub struct LoggerAdminConfig {
/// When true in dev/development, purge selected collections on IPL.
pub purge_on_ipl: bool,
/// Collection names to purge when `purge_on_ipl` is enabled.
#[serde(default)]
pub purge_collections: Vec<String>,
}
/// Node identity block. Identifies this node's role and environment.

src/core/dispatch.rs Normal file

@@ -0,0 +1,173 @@
//! # core/dispatch.rs — Broker-to-Domain Dispatch Boundary
//!
//! Defines the transport-neutral seam between AMQP brokers and domain logic.
//! The intent is to keep brokers focused on delivery concerns while class,
//! schema, and base I/O layers own business and storage behavior.
#![allow(dead_code)]
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use serde_json::Value;
use crate::template_registry::{RuntimeRecTemplate, RuntimeTemplateRegistry};
pub type RecordMap = HashMap<String, Value>;
pub type DispatchFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchOperation {
Fetch,
Write,
Update,
Delete,
}
impl DispatchOperation {
pub fn as_str(self) -> &'static str {
match self {
Self::Fetch => "fetch",
Self::Write => "write",
Self::Update => "update",
Self::Delete => "delete",
}
}
}
#[derive(Debug, Clone)]
pub struct DispatchRequest {
pub op: DispatchOperation,
pub template: String,
pub correlation_id: String,
pub payload: RecordMap,
}
#[derive(Debug, Clone, Default)]
pub struct DispatchContext {
pub reply_expected: bool,
pub best_effort: bool,
}
#[derive(Debug, Clone)]
pub enum DispatchResponse {
Records(Vec<RecordMap>),
Record(RecordMap),
Affected(u64),
Empty,
}
#[derive(Debug, thiserror::Error)]
pub enum DispatchError {
#[error("unknown template '{template}'")]
UnknownTemplate { template: String },
#[error("operation '{operation}' unsupported for template '{template}'")]
UnsupportedOperation { template: String, operation: String },
#[error("validation failed: {message}")]
Validation { message: String },
#[error("storage failed: {message}")]
Storage { message: String },
#[error("internal dispatch failure: {message}")]
Internal { message: String },
}
pub trait DispatchResolver: Send + Sync {
fn resolve_class(
&self,
request: &DispatchRequest,
) -> Result<Arc<dyn DomainClass>, DispatchError>;
fn resolve_template(
&self,
template: &str,
) -> Result<&RuntimeRecTemplate, DispatchError>;
}
pub trait DomainClass: Send + Sync {
fn class_id(&self) -> &'static str;
fn handle<'a>(
&'a self,
request: DispatchRequest,
context: DispatchContext,
) -> DispatchFuture<'a, Result<DispatchResponse, DispatchError>>;
}
pub trait SchemaHandler: Send + Sync {
fn schema_id(&self) -> &'static str;
fn execute<'a>(
&'a self,
request: &'a DispatchRequest,
) -> DispatchFuture<'a, Result<DispatchResponse, DispatchError>>;
}
pub trait BaseIoAdapter: Send + Sync {
fn media_id(&self) -> &'static str;
fn fetch<'a>(
&'a self,
template: &'a RuntimeRecTemplate,
params: &'a RecordMap,
) -> DispatchFuture<'a, Result<Vec<RecordMap>, DispatchError>>;
fn write<'a>(
&'a self,
template: &'a RuntimeRecTemplate,
data: &'a RecordMap,
) -> DispatchFuture<'a, Result<RecordMap, DispatchError>>;
fn update<'a>(
&'a self,
template: &'a RuntimeRecTemplate,
params: &'a RecordMap,
data: &'a RecordMap,
) -> DispatchFuture<'a, Result<u64, DispatchError>>;
fn delete<'a>(
&'a self,
template: &'a RuntimeRecTemplate,
params: &'a RecordMap,
) -> DispatchFuture<'a, Result<u64, DispatchError>>;
}
pub struct RegistryDispatchResolver {
template_registry: Arc<RuntimeTemplateRegistry>,
}
impl RegistryDispatchResolver {
pub fn new(template_registry: Arc<RuntimeTemplateRegistry>) -> Self {
Self { template_registry }
}
}
impl DispatchResolver for RegistryDispatchResolver {
fn resolve_class(
&self,
request: &DispatchRequest,
) -> Result<Arc<dyn DomainClass>, DispatchError> {
let template = self.resolve_template(&request.template)?;
Err(DispatchError::UnsupportedOperation {
template: template.template_class.clone(),
operation: request.op.as_str().to_string(),
})
}
fn resolve_template(
&self,
template: &str,
) -> Result<&RuntimeRecTemplate, DispatchError> {
self.template_registry
.get(template)
.ok_or_else(|| DispatchError::UnknownTemplate {
template: template.to_string(),
})
}
}


@@ -24,6 +24,8 @@
//! ## History
//! * `2026-04-05` - mks - original coding (stub)
pub mod dispatch;
use std::collections::HashMap;
use serde_json::Value;


@@ -11,7 +11,7 @@
//! - `BEDS_ENV` environment variable selects the env config file (default: dev)
//!
//! ## Outputs
//! - Running BEDS node, blocked on signal handler (not yet implemented)
//! - Running BEDS resident node after IPL (daemon-style foreground process)
//! - On fatal IPL failure: error output to console, process exits non-zero
//!
//! **Author:** ms
@@ -23,6 +23,7 @@
//! * `2026-04-04` - mks - promoted service modules to services/ directory
//! * `2026-04-04` - mks - ipl() made async for AMQP connection; tokio runtime added
//! * `2026-04-04` - mks - added AMQP authenticate + exchange declare to IPL sequence
//! * `2026-04-10` - mks - resident runtime loop + coordinated shutdown signaling
mod brokers;
mod config;
@@ -31,6 +32,176 @@ mod logging;
mod services;
mod template_registry;
use std::collections::HashMap;
use serde_json::Value;
use tokio::sync::watch;
use uuid::Uuid;
fn level_to_val(level: &str) -> i64 {
match level.to_ascii_lowercase().as_str() {
"debug" => 0,
"data" => 1,
"info" => 2,
"event" => 3,
"warning" | "warn" => 4,
"error" => 5,
"fatal" => 6,
"timer" => 7,
_ => 2,
}
}
struct IplLogContext {
env_name: String,
node_name: String,
root_event_id: String,
trace_on: bool,
seq: u64,
}
impl IplLogContext {
fn new(wbid: &str, env_name: &str, trace_on: bool) -> Self {
Self {
env_name: env_name.to_string(),
node_name: wbid.to_string(),
root_event_id: format!("{}.{}.{}", wbid, env_name, Uuid::new_v4()),
trace_on,
seq: 0,
}
}
fn next_event_id(&mut self) -> String {
self.seq += 1;
if self.seq == 1 {
self.root_event_id.clone()
} else {
format!("{}:{}", self.root_event_id, self.seq)
}
}
}
async fn append_ipl_log(
ctx: &mut IplLogContext,
event: &str,
level: &str,
message: &str,
method: &str,
line: u32,
) {
let event_id = ctx.next_event_id();
let depth = if event_id == ctx.root_event_id { 0i64 } else { 1i64 };
let mut data = HashMap::<String, Value>::new();
data.insert("service_log".to_string(), Value::String("beds".to_string()));
data.insert("node_log".to_string(), Value::String(ctx.node_name.clone()));
data.insert("resource".to_string(), Value::String("BEDS".to_string()));
data.insert("event_id".to_string(), Value::String(event_id));
data.insert("parent_id".to_string(), Value::String(ctx.root_event_id.clone()));
data.insert("depth".to_string(), Value::from(depth));
data.insert("level_log".to_string(), Value::String(level.to_ascii_lowercase()));
data.insert("severity_log".to_string(), Value::String(level.to_ascii_uppercase()));
data.insert("level_val".to_string(), Value::from(level_to_val(level)));
data.insert("event_log".to_string(), Value::String(event.to_string()));
data.insert("message_log".to_string(), Value::String(message.to_string()));
data.insert("env_log".to_string(), Value::String(ctx.env_name.clone()));
data.insert("file_log".to_string(), Value::String(file!().to_string()));
data.insert("method_log".to_string(), Value::String(method.to_string()));
data.insert("line_log".to_string(), Value::from(line as i64));
if let Err(e) = brokers::logger_store::append_log(data).await {
tracing::warn!("IPL logger event '{}' failed (non-fatal): {}", event, e);
}
}
async fn emit_log_level_showcase(ctx: &mut IplLogContext) {
let examples = [
(
"debug",
"LOG_LEVEL_EXAMPLE_DEBUG",
"DEBUG example: verbose developer diagnostics for method flow and state transitions.",
),
(
"data",
"LOG_LEVEL_EXAMPLE_DATA",
"DATA example: structured payload snapshot for validating in-flight data shape.",
),
(
"info",
"LOG_LEVEL_EXAMPLE_INFO",
"INFO example: normal lifecycle event confirming expected framework behavior.",
),
(
"event",
"LOG_LEVEL_EXAMPLE_EVENT",
"EVENT example: domain/event-chain marker tied to root GUID lineage.",
),
(
"warn",
"LOG_LEVEL_EXAMPLE_WARN",
"WARN example: degraded but recoverable condition that should be reviewed.",
),
(
"error",
"LOG_LEVEL_EXAMPLE_ERROR",
"ERROR example: operation failure requiring developer attention.",
),
(
"fatal",
"LOG_LEVEL_EXAMPLE_FATAL",
"FATAL example: unrecoverable condition requiring immediate operator intervention.",
),
(
"timer",
"LOG_LEVEL_EXAMPLE_TIMER",
"TIMER example: latency/elapsed-time marker for performance diagnostics.",
),
];
for (level, event, message) in examples {
append_ipl_log(ctx, event, level, message, "emit_log_level_showcase", line!()).await;
}
}
async fn append_ipl_trace(ctx: &mut IplLogContext, method: &str, line: u32) {
if !ctx.trace_on {
return;
}
append_ipl_log(
ctx,
"TRACE_METHOD_ENTER",
"debug",
&format!("enter method {}", method),
method,
line,
)
.await;
}
async fn try_log_ipl_failure(err: &str) {
let Ok(cfg) = config::load() else {
return;
};
if brokers::logger_store::init_from_rec_services(&cfg.rec_services, &cfg.id.env_name)
.await
.is_err()
{
return;
}
let mut ctx = IplLogContext::new(&cfg.id.wbid, &cfg.id.env_name, cfg.trace_on);
append_ipl_log(
&mut ctx,
"IPL_FATAL",
"fatal",
&format!("IPL failed: {}", err),
"main",
line!(),
)
.await;
}
/// Executes the BEDS Initial Program Load (IPL) sequence.
///
/// IPL bootstraps the node in strict order. Each step must succeed before
@@ -46,8 +217,8 @@ mod template_registry;
/// 4b. Authenticate to RabbitMQ + declare beds.events exchange
/// 5. Validate MongoDB reachability (TCP)
/// 6. Validate MariaDB reachability (TCP)
/// 7. Spawn broker pools (rBroker)
/// 8. Node green
/// 7. Spawn dispatcher pool
/// 8. Node green and resident runtime waits for shutdown command/signal
///
/// # Returns
///
@@ -59,17 +230,24 @@ mod template_registry;
/// * `2026-04-02` - mks - original coding
/// * `2026-04-04` - mks - made async; added AMQP auth + exchange declare
/// * `2026-04-05` - mks - added rBroker pool spawn (step 6)
async fn ipl() -> Result<(), String> {
async fn ipl(
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
) -> Result<Vec<tokio::task::JoinHandle<()>>, String> {
// load configuration — fatal in all environments if this fails
let cfg = config::load().map_err(|e| format!("Failed to load config: {}", e))?;
// initialize logging — must come before any tracing calls
logging::init_from_config(cfg.syslog, cfg.syslog_mirror_console);
let mut ipl_log = IplLogContext::new(&cfg.id.wbid, &cfg.id.env_name, cfg.trace_on);
tracing::info!("BEDS IPL starting, node={} env={}", cfg.id.wbid, cfg.id.env_name);
tracing::info!("Configuration loaded");
tracing::info!("Logging initialized");
append_ipl_trace(&mut ipl_log, "ipl", line!()).await;
// step 2b: load and validate REC templates used by runtime dispatch
let templates = template_registry::load_runtime_rec_templates("templates")
.map_err(|e| format!("Template validation failed: {}", e))?;
@@ -105,21 +283,77 @@ async fn ipl() -> Result<(), String> {
.map_err(|e| format!("Mongo logger store initialization failed: {}", e))?;
tracing::info!("Mongo logger store initialized");
let is_dev_env = matches!(cfg.id.env_name.as_str(), "dev" | "development");
if is_dev_env && cfg.logger_admin.purge_on_ipl {
append_ipl_trace(&mut ipl_log, "purge_on_ipl", line!()).await;
let deleted = brokers::logger_store::purge_collections(&cfg.logger_admin.purge_collections)
.await
.map_err(|e| format!("Mongo logger purge-on-IPL failed: {}", e))?;
tracing::warn!(
"DEV purge-on-IPL enabled: cleared {} record(s) from {:?}",
deleted,
cfg.logger_admin.purge_collections
);
}
// Emit startup milestones into logger storage after logger init succeeds.
// This gives the log dumper a reliable IPL trail.
append_ipl_log(
&mut ipl_log,
"IPL_LOGGER_STORE_READY",
"info",
"logger store initialized; IPL milestone capture active",
"ipl",
line!(),
)
.await;
append_ipl_log(
&mut ipl_log,
"IPL_BOOTSTRAP_RECORDED",
"info",
"completed bootstrap phases: config, logging, template validation, AMQP validation/auth/exchange, Mongo validation",
"ipl",
line!(),
)
.await;
// step 6: validate MariaDB reachability — fatal in production, non-fatal in all other envs
// secondary instance failures are always non-fatal (handled inside validate_all)
match services::mariadb::validate_all(&cfg.rel_services) {
Ok(()) => tracing::info!("MariaDB reachable"),
Ok(()) => {
tracing::info!("MariaDB reachable");
append_ipl_log(
&mut ipl_log,
"IPL_MARIADB_READY",
"info",
"MariaDB validation passed",
"ipl",
line!(),
)
.await;
}
Err(e) => {
if cfg.id.env_name == "production" {
return Err(e);
}
tracing::warn!("MariaDB unreachable (non-fatal in {}): {}", cfg.id.env_name, e);
append_ipl_log(
&mut ipl_log,
"IPL_MARIADB_DEGRADED",
"warn",
&format!("MariaDB unreachable (non-fatal in {}): {}", cfg.id.env_name, e),
"ipl",
line!(),
)
.await;
}
}
// step 7: spawn broker pools; queues are declared here, not at exchange declare time
// rBroker pool requires an authenticated AMQP connection; skip in non-prod if unavailable
let _broker_handles: Vec<tokio::task::JoinHandle<()>> = {
// step 7: spawn dispatcher pool — single consumer from unified events queue
// each dispatcher instance instantiates classes based on template lookup
let dispatcher_handles: Vec<tokio::task::JoinHandle<()>> = {
use std::sync::Arc;
let shared_conn = Arc::new(
lapin::Connection::connect(
@@ -134,38 +368,89 @@ async fn ipl() -> Result<(), String> {
lapin::ConnectionProperties::default(),
)
.await
.map_err(|e| format!("Broker pool connection failed: {}", e))?,
.map_err(|e| format!("Dispatcher pool connection failed: {}", e))?,
);
let _ = &amqp_conn; // keep IPL AMQP connection alive
let r_handles = brokers::spawn_r_broker_pool(
let d_handles = brokers::spawn_dispatcher_pool(
Arc::clone(&shared_conn),
&cfg.broker_services,
std::sync::Arc::clone(&template_registry_state),
shutdown_tx,
shutdown_rx,
)
.await
.map_err(|e| format!("rBroker pool failed to start: {}", e))?;
.map_err(|e| format!("Dispatcher pool failed to start: {}", e))?;
let w_handles = brokers::spawn_w_broker_pool(
Arc::clone(&shared_conn),
&cfg.broker_services,
std::sync::Arc::clone(&template_registry_state),
)
.await
.map_err(|e| format!("wBroker pool failed to start: {}", e))?;
r_handles.into_iter().chain(w_handles).collect()
d_handles
};
append_ipl_log(
&mut ipl_log,
"IPL_DISPATCHER_POOL_READY",
"info",
"dispatcher pool started",
"ipl",
line!(),
)
.await;
tracing::info!("BEDS IPL complete — node green");
Ok(())
append_ipl_log(
&mut ipl_log,
"IPL_NODE_GREEN",
"info",
"BEDS IPL complete; node is green",
"ipl",
line!(),
)
.await;
emit_log_level_showcase(&mut ipl_log).await;
Ok(dispatcher_handles)
}
#[tokio::main]
async fn main() {
if let Err(e) = ipl().await {
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
let mut handles = match ipl(shutdown_tx.clone(), shutdown_rx.clone()).await {
Ok(handles) => handles,
Err(e) => {
try_log_ipl_failure(&e).await;
eprintln!("[BEDS] [FATAL] [IPL] {}", e);
std::process::exit(1);
}
};
tracing::info!("BEDS resident runtime active; waiting for shutdown command or Ctrl+C");
tokio::select! {
changed = shutdown_rx.changed() => {
if changed.is_ok() && *shutdown_rx.borrow() {
tracing::info!("BEDS shutdown requested by broker command");
}
}
sig = tokio::signal::ctrl_c() => {
match sig {
Ok(()) => {
tracing::info!("BEDS shutdown requested by Ctrl+C");
let _ = shutdown_tx.send(true);
}
Err(e) => {
tracing::error!("BEDS signal listener failure: {}", e);
}
}
}
}
while let Some(handle) = handles.pop() {
if let Err(e) = handle.await {
tracing::warn!("dispatcher task join error during shutdown: {}", e);
}
}
tracing::info!("BEDS shutdown complete");
}


@@ -56,6 +56,10 @@ impl RuntimeTemplateRegistry {
pub fn contains_template(&self, template: &str) -> bool {
self.by_name.contains_key(&template.to_ascii_lowercase())
}
pub fn get(&self, template: &str) -> Option<&RuntimeRecTemplate> {
self.by_name.get(&template.to_ascii_lowercase())
}
}
struct RecTemplateManifest {


@@ -6,6 +6,19 @@ IPL (Initial Program Load) is the BEDS bootstrap sequence. The term comes from I
`ipl()` is the first function called from `main()`. If IPL completes successfully, the node is green and enters its operational state. If any required step fails, IPL aborts and the process exits with a console error report.
## Current Runtime State (2026-04)
The current Rust runtime behavior has advanced beyond the original POC notes on this page:
1. `main()` is now resident after IPL completes.
2. Dispatcher workers are started as a pool and the process remains active waiting for shutdown.
3. A broker `shutdown` command now triggers coordinated process shutdown.
4. IPL writes structured startup milestones to logger storage (`msLogs`) after logger init.
5. Development config can optionally purge selected admin/logger collections on IPL.
6. Optional trace-style method-entry logging is available through config (`trace_on`) for deep backend diagnostics.
Treat the sequence below as conceptual IPL ordering; the operational lifecycle now includes a post-IPL resident runtime and coordinated shutdown.
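The optional flags above might appear in a layered TOML overlay roughly like this. Only `trace_on` is named in these docs; the purge key names below are illustrative assumptions, not the actual config schema:

```toml
# Hypothetical development overlay — key names other than trace_on are
# placeholders for the documented behavior, not the real schema.
[logging]
trace_on = true              # verbose method-entry diagnostics

[dev]
purge_admin_on_ipl = false   # assumed key: purge admin collections at IPL
purge_logger_on_ipl = false  # assumed key: purge logger collections at IPL
```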
## Why Order Matters
The IPL sequence is not arbitrary. Each step depends on the previous one:


@@ -1,5 +1,17 @@
# Queue Topology
## Current Runtime Topology (2026-04)
The active Rust runtime currently uses a unified dispatcher model:
1. Single primary events queue per environment tag (`{queue_tag}events`).
2. Unified dispatcher pool consumes from that queue.
3. Queue binding uses topic wildcard (`#`) on `beds.events`.
4. `shutdown` operation is handled as a broker command that triggers coordinated global shutdown.
5. Legacy `rec.read` and `rec.write` broker files remain in the repository for reference, but the runtime path is centered on unified dispatcher workers.
The broader broker matrix below remains useful as historical and planned topology context.
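The `#` wildcard binding above is why the unified queue receives every routing key published to `beds.events`. A minimal sketch of AMQP topic-pattern semantics (a hypothetical helper for illustration only; RabbitMQ performs this matching server-side):

```rust
// Illustrative topic-pattern matcher: `#` matches zero or more dotted
// words, `*` matches exactly one. Not the broker's implementation.
fn topic_matches(pattern: &str, routing_key: &str) -> bool {
    fn rec(pat: &[&str], key: &[&str]) -> bool {
        match pat.split_first() {
            None => key.is_empty(),
            Some((&"#", rest)) => {
                // `#` matches zero words, or consumes one and stays active.
                rec(rest, key) || (!key.is_empty() && rec(pat, &key[1..]))
            }
            Some((&"*", rest)) => !key.is_empty() && rec(rest, &key[1..]),
            Some((&word, rest)) => {
                !key.is_empty() && key[0] == word && rec(rest, &key[1..])
            }
        }
    }
    let pat: Vec<&str> = pattern.split('.').collect();
    let key: Vec<&str> = routing_key.split('.').collect();
    rec(&pat, &key)
}

fn main() {
    // A `#` binding delivers every key; a `rec.*` binding would not.
    assert!(topic_matches("#", "rec.read"));
    assert!(topic_matches("#", "beds.events.anything.at.all"));
    assert!(topic_matches("rec.*", "rec.write"));
    assert!(!topic_matches("rec.*", "rec.write.user"));
    println!("ok");
}
```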
## Overview
BEDS uses a single RabbitMQ topic exchange for all data events. Topic exchanges route messages based on a dotted routing key — this gives BEDS fine-grained control over which brokers receive which events without the overhead of managing multiple exchanges.


@@ -53,6 +53,10 @@ Implementation status update:
- Phase B has started: REC template registry loading and startup validation are now implemented in IPL.
- Phase B progression: runtime template registry state is now persisted and passed into broker workers for dispatch-time template validation.
- Reliability progression: deterministic ack/nack behavior and retry/DLQ queue topology are implemented for `rec.read` and `rec.write`.
- Runtime progression: unified dispatcher pool and single-queue consumption model are active.
- Lifecycle progression: BEDS now runs as a resident process after IPL and supports coordinated shutdown via broker `shutdown` command.
- Diagnostics progression: startup now emits structured IPL milestone logs (including severity examples) into logger storage for observability validation.
- Tooling progression: Rust Observatory utility (`log_dumper`) provides development log browsing, root-guid tracing, and single-record view.
## Must-Keep Invariants


@@ -0,0 +1,166 @@
# BEDS Architecture Visual Brief
Audience: Senior managers, directors, and architects
Purpose: Explain what BEDS is, why AMQP is central, and how the software stack delivers operational impact.
## Executive Summary
BEDS (Back End Data System) is a data-service framework where AMQP (RabbitMQ) is the backbone for all data movement.
Instead of app code talking directly to databases, every request goes through AMQP first. This provides:
- Strong control over in-transit data paths
- Centralized routing and policy enforcement
- Better observability and auditability
- Scale-out through queue consumers (dispatcher pools)
- Isolation between transport concerns and domain/business logic
Business impact:
- Faster onboarding of new data domains
- More consistent operational controls
- Higher confidence in reliability and replayability
- Lower risk from tightly coupled app-to-database patterns
## One-Sentence Product Positioning
BEDS is an AMQP-first data orchestration platform that turns data access into a governed, scalable event pipeline instead of direct database calls.
## Architecture in Plain Language
When a client asks to read or write data:
1. The request is published into RabbitMQ.
2. A dispatcher pool consumes messages from the queue.
3. The dispatcher validates the envelope and selects the target template/class.
4. The class chain executes business logic and schema-specific behavior.
5. The storage adapters execute persistence in MongoDB or SQL systems.
6. A response is published back (if the request expects one).
The key design point: AMQP is the stable transport spine; domain classes can evolve without changing transport scaffolding.
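Steps 3 and 4 above can be sketched in a few lines. This is a hypothetical shape, not the actual BEDS runtime API: the dispatcher resolves a template name to a handler and stays schema-agnostic, while the handler stands in for class instantiation and business logic:

```rust
use std::collections::HashMap;

// Illustrative only: names and signatures are assumptions for this sketch.
type Handler = fn(&str) -> String;

struct DispatcherSketch {
    registry: HashMap<String, Handler>,
}

impl DispatcherSketch {
    fn dispatch(&self, template: &str, payload: &str) -> Option<String> {
        // Case-insensitive template lookup; unknown templates are
        // rejected before any business logic runs.
        self.registry
            .get(&template.to_ascii_lowercase())
            .map(|handler| handler(payload))
    }
}

fn user_rec_handler(payload: &str) -> String {
    // Stand-in for class instantiation + schema-specific behavior.
    format!("user-rec processed: {}", payload)
}

fn main() {
    let mut registry: HashMap<String, Handler> = HashMap::new();
    registry.insert("userrec".to_string(), user_rec_handler);
    let dispatcher = DispatcherSketch { registry };

    assert!(dispatcher.dispatch("UserRec", "{}").is_some());
    assert!(dispatcher.dispatch("Unknown", "{}").is_none());
    println!("ok");
}
```

The point of the sketch is the boundary: the transport-facing `dispatch` never inspects the payload's schema; only the resolved handler does.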
## Software Stack (Top to Bottom)
### Layer 1: Experience and Integration
- API clients
- Internal tools (for example: log reader UI)
- Service integrations
### Layer 2: Transport and Delivery (AMQP)
- RabbitMQ exchange and queue topology
- Message envelopes (versioned request/response contracts)
- Acknowledgment and negative acknowledgment behavior
- Retry and dead-letter patterns
### Layer 3: Dispatch and Domain Runtime
- Dispatcher pool (consumer workers)
- Template registry lookup
- Class instantiation (domain object selected by template)
- Class -> Schema -> Base I/O execution path
### Layer 4: Data Adapters and Persistence
- MongoDB access (REC/document-oriented workloads)
- Relational DB access (REL/SQL workloads)
- Logger event storage (msLogs)
### Layer 5: Operations and Governance
- Structured logging and diagnostics
- Environment-configured runtime behavior
- Health checks and startup validation (IPL)
- Future auto-scaling supervisor concept for dispatcher pools
## Why AMQP Scaffolding Matters
AMQP is not just transport in this architecture. It is the control plane.
Benefits in management terms:
- Policy centralization: transport rules are applied consistently
- Observability: queue behavior, consumer behavior, and message flow are inspectable
- Safety: retries and DLQs protect against transient failures
- Flexibility: adding new domains does not require redesigning transport
- Scale: increase dispatcher workers to handle backlog without app rewrites
## Current Node and Flow Model
- Single binary deployment with role/config-driven runtime behavior
- Unified dispatcher model consuming queue messages
- Template-driven class resolution to mask schema details from transport observers
In effect, the transport layer sees standard envelopes; schema details are revealed only at class execution time.
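A minimal sketch of what such a schema-agnostic envelope could look like; the field names are illustrative assumptions, not the actual BEDS message contract:

```rust
// Illustrative envelope: everything the dispatcher routes on is generic
// metadata; the schema-specific payload stays opaque bytes until the
// resolved class deserializes it. Field names are assumptions.
#[allow(dead_code)]
struct Envelope {
    version: u32,             // versioned request/response contract
    template: String,         // lookup key for class resolution
    root_guid: String,        // correlation id for event-chain logging
    reply_to: Option<String>, // present only when a response is expected
    body: Vec<u8>,            // opaque to the transport layer
}

fn main() {
    let env = Envelope {
        version: 1,
        template: "UserRec".to_string(),
        root_guid: "9f1c0000-0000-0000-0000-000000000000".to_string(),
        reply_to: None,
        body: b"{\"name\":\"example\"}".to_vec(),
    };
    // The dispatcher can route on `template` without parsing `body`,
    // and fire-and-forget messages simply omit `reply_to`.
    assert!(env.reply_to.is_none());
    println!("route key: {}", env.template.to_ascii_lowercase());
}
```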
## Reference Data Flow (Narrative)
Client request -> RabbitMQ message -> Dispatcher consumes -> Template/class resolution -> Domain logic -> Mongo/SQL adapter -> Reply message
This pattern supports both:
- Request/response operations
- Fire-and-forget telemetry (for example: logger semantics)
## Leadership-Level Benefits
- Architectural consistency across teams and services
- Reduced change risk due to clear layer boundaries
- Operational leverage through queue-based scaling
- Better compliance posture from centralized message handling
- Strong foundation for future AI-assisted data object generation
## Visual Blueprint (for Diagram or Image Generation)
Use this structure when creating architecture visuals:
- Left side: Client/API and internal tools
- Center spine: RabbitMQ (highlighted as "AMQP Control Plane")
- Right side: Dispatcher pool and class runtime
- Bottom: MongoDB + SQL data stores
- Overlay badges: Auditability, Retry/DLQ, Scale-out Consumers, Config-Driven Nodes
Color and style suggestions:
- Transport spine in teal/blue
- Domain runtime in neutral steel/graphite
- Data stores in earthy accents
- Use arrows to emphasize directional flow and feedback loop for responses
## Prompt Pack for AI Image Generation
### Prompt A: Executive Architecture Poster
Create a modern enterprise architecture poster titled "BEDS: AMQP-First Data Orchestration". Show a left-to-right pipeline: clients and tools -> RabbitMQ AMQP control plane -> dispatcher worker pool -> class/schema execution -> MongoDB and SQL storage. Include callout badges: auditability, delivery guarantees, retry/DLQ safety, scalable consumers, config-driven deployment. Style should be clean, boardroom-ready, and easy to understand for non-engineering leadership.
### Prompt B: Technical Leadership Slide Graphic
Generate a diagram for a CTO slide that explains a message-driven data platform. Center RabbitMQ as the control plane. Show message envelopes entering from APIs, dispatcher consumers scaling horizontally, template-driven class resolution, and adapters writing to document and relational stores. Emphasize "no direct app-to-database coupling". Use polished enterprise iconography and concise labels.
### Prompt C: Impact-Focused Infographic
Produce an infographic with three bands: "How it works", "Why it matters", "Business impact". In "How it works", illustrate AMQP-first request flow through dispatchers into data stores. In "Why it matters", list governance, observability, reliability, and scaling. In "Business impact", highlight faster delivery, lower operational risk, and better control of data in transit.
## Diagram Source (Mermaid)
```mermaid
flowchart LR
A[Clients and Internal Tools] --> B[RabbitMQ AMQP Control Plane]
B --> C[Dispatcher Pool]
C --> D[Template Registry and Class Resolution]
D --> E[Class -> Schema -> Base IO]
E --> F[(MongoDB REC)]
E --> G[(SQL REL)]
E --> H[(msLogs Logger Store)]
E --> I[Response Envelope]
I --> A
B -.-> J[Retry and DLQ]
C -.-> K[Horizontal Scale-Out]
B -.-> L[Audit and Transport Controls]
```
## Suggested Talking Track (30-45 seconds)
"BEDS moves data access from direct database calls into an AMQP-governed pipeline. RabbitMQ becomes the transport control plane, and dispatcher workers handle execution through template-driven class resolution. This gives us stronger observability, safer failure handling, and easier horizontal scaling. Operationally, we gain consistency and control; strategically, we gain a platform that is easier to extend and govern."


@@ -18,6 +18,7 @@ If you are reading this as a new contributor, start here and read in order. The
- [IPL — Initial Program Load](04-ipl.md) — The bootstrap sequence, step by step, and why order matters
- [Configuration System](05-configuration.md) — Layered TOML, environment files, topology options
- [Modernization Roadmap](10-modernization-roadmap.md) — POC-first execution sequence and modernization requirements
- [Architecture Visual Brief](11-beds-architecture-visual-brief.md) — Leadership-facing architecture narrative and diagram prompts
### Messaging
- [Queue Topology](06-queue-topology.md) — AMQP exchanges, queues, routing keys, and the broker model