diff --git a/src/brokers/mod.rs b/src/brokers/mod.rs index 3b2293a..52af141 100644 --- a/src/brokers/mod.rs +++ b/src/brokers/mod.rs @@ -30,6 +30,7 @@ use std::sync::Arc; use lapin::Connection; use crate::config::BrokerServicesConfig; +use crate::template_registry::RuntimeTemplateRegistry; use error::BrokerError; /// Spawns the rBroker pool — N tasks as configured in `instances.r_broker`. @@ -54,6 +55,7 @@ use error::BrokerError; pub async fn spawn_r_broker_pool( conn: Arc, cfg: &BrokerServicesConfig, + template_registry: Arc, ) -> Result>, BrokerError> { let count = cfg.app_server.instances.r_broker; let mut handles = Vec::with_capacity(count as usize); @@ -63,6 +65,7 @@ pub async fn spawn_r_broker_pool( Arc::clone(&conn), cfg.queue_tag.clone(), i, + Arc::clone(&template_registry), ).await?; handles.push(handle); } @@ -89,6 +92,7 @@ pub async fn spawn_r_broker_pool( pub async fn spawn_w_broker_pool( conn: Arc, cfg: &BrokerServicesConfig, + template_registry: Arc, ) -> Result>, BrokerError> { let count = cfg.app_server.instances.w_broker; let mut handles = Vec::with_capacity(count as usize); @@ -98,6 +102,7 @@ pub async fn spawn_w_broker_pool( Arc::clone(&conn), cfg.queue_tag.clone(), i, + Arc::clone(&template_registry), ).await?; handles.push(handle); } diff --git a/src/brokers/payload.rs b/src/brokers/payload.rs index ef850f1..fe14683 100644 --- a/src/brokers/payload.rs +++ b/src/brokers/payload.rs @@ -1,69 +1,86 @@ -//! # brokers/payload.rs — AMQP Message Payload +//! # brokers/payload.rs — AMQP Envelope Contracts //! -//! Defines the JSON body structure carried in all BEDS broker messages. -//! The AMQP envelope handles routing (type header, reply_to, correlation_id); -//! this struct is what lives in the message body. -//! -//! ## Wire Format -//! -//! ```json -//! { -//! "template": "usr", -//! "data": { "first_name": "joe", "status": "active" } -//! } -//! ``` -//! -//! `template` names the data object (maps to a NamasteCore implementor). -//! `data` carries key/value pairs in user-facing field names — the template -//! maps these to actual schema names. Callers never specify primary keys on -//! writes; the template generates a GUID and returns it in the reply. -//! -//! ## Calling Agents -//! - `brokers::r_broker` — parsed from message body on fetch events -//! - `brokers::w_broker` — parsed from message body on write/update/delete events -//! -//! **Author:** mks -//! **Version:** 1.0 -//! -//! ## History -//! * `2026-04-05` - mks - original coding +//! Strict 1.0 request/response contracts for broker message bodies. use std::collections::HashMap; use serde::{Deserialize, Serialize}; use serde_json::Value; -/// The JSON body of every BEDS broker message. -/// -/// # Examples -/// -/// ``` -/// use rustybeds::brokers::payload::BrokerPayload; -/// -/// let json = r#"{"template":"usr","data":{"first_name":"joe"}}"#; -/// let payload: BrokerPayload = serde_json::from_str(json).unwrap(); -/// assert_eq!(payload.template, "usr"); -/// assert!(payload.data.contains_key("first_name")); -/// ``` -/// -/// Both read and write brokers parse this struct from the raw AMQP delivery -/// bytes. The operation type is carried in the AMQP `type` message property — -/// this struct carries the object identity and data payload. -/// -/// # History -/// -/// * `2026-04-05` - mks - original coding +pub const ENVELOPE_VERSION: &str = "1.0"; + #[derive(Debug, Deserialize, Serialize)] -pub struct BrokerPayload { - /// Template identifier — names the NamasteCore implementor to dispatch to. - /// Matches the TLA convention from the template file (e.g. `"usr"`, `"pst"`). +pub struct BrokerRequestEnvelope { + pub version: String, + pub op: String, pub template: String, - /// Key/value data pairs in user-facing field names. - /// For writes: the record to store (pkey excluded — generated by template). - /// For reads: query discriminants (field → value to match). - /// For deletes: discriminants identifying the record(s) to remove. #[serde(default)] - pub data: HashMap, + pub correlation_id: String, + + #[serde(default)] + pub payload: HashMap, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct BrokerResponseEnvelope { + pub version: String, + pub op: String, + pub correlation_id: String, + pub status: String, + + #[serde(skip_serializing_if = "Option::is_none")] + pub error_code: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, + + pub payload: Value, +} + +pub fn parse_request(bytes: &[u8]) -> Result { + let env: BrokerRequestEnvelope = + serde_json::from_slice(bytes).map_err(|e| format!("invalid envelope JSON: {}", e))?; + + if env.version != ENVELOPE_VERSION { + return Err(format!( + "unsupported envelope version '{}', expected '{}'", + env.version, ENVELOPE_VERSION + )); + } + if env.op.trim().is_empty() { + return Err("missing required field 'op'".to_string()); + } + if env.template.trim().is_empty() { + return Err("missing required field 'template'".to_string()); + } + + Ok(env) +} + +pub fn success_response(op: &str, correlation_id: &str, payload: Value) -> Vec { + let response = BrokerResponseEnvelope { + version: ENVELOPE_VERSION.to_string(), + op: op.to_string(), + correlation_id: correlation_id.to_string(), + status: "ok".to_string(), + error_code: None, + message: None, + payload, + }; + serde_json::to_vec(&response).unwrap_or_else(|_| b"{}".to_vec()) +} + +pub fn error_response(op: &str, correlation_id: &str, code: &str, message: &str) -> Vec { + let response = BrokerResponseEnvelope { + version: ENVELOPE_VERSION.to_string(), + op: op.to_string(), + correlation_id: correlation_id.to_string(), + status: "error".to_string(), + error_code: Some(code.to_string()), + message: Some(message.to_string()), + payload: Value::Object(serde_json::Map::new()), + }; + serde_json::to_vec(&response).unwrap_or_else(|_| b"{}".to_vec()) } #[cfg(test)] @@ -71,38 +88,38 @@ mod tests { use super::*; #[test] - fn deserializes_full_payload() { - let json = r#"{"template":"usr","data":{"first_name":"joe","status":"active"}}"#; - let payload: BrokerPayload = serde_json::from_str(json).unwrap(); - assert_eq!(payload.template, "usr"); - assert_eq!(payload.data.len(), 2); - assert_eq!(payload.data["first_name"], "joe"); - assert_eq!(payload.data["status"], "active"); + fn parses_valid_request_envelope() { + let json = r#"{"version":"1.0","op":"fetch","template":"Logger","correlation_id":"c1","payload":{"limit":10}}"#; + let env = parse_request(json.as_bytes()).expect("parse should succeed"); + assert_eq!(env.version, "1.0"); + assert_eq!(env.op, "fetch"); + assert_eq!(env.template, "Logger"); + assert_eq!(env.correlation_id, "c1"); + assert_eq!(env.payload["limit"], 10); } #[test] - fn deserializes_without_data_field() { - // data is optional — fetch by template name alone is valid - let json = r#"{"template":"usr"}"#; - let payload: BrokerPayload = serde_json::from_str(json).unwrap(); - assert_eq!(payload.template, "usr"); - assert!(payload.data.is_empty()); + fn rejects_legacy_unversioned_shape() { + let json = r#"{"template":"Logger","data":{"limit":10}}"#; + let err = parse_request(json.as_bytes()).expect_err("parse should fail"); + assert!(err.contains("invalid envelope JSON") || err.contains("missing required")); } #[test] - fn serializes_round_trip() { - let json = r#"{"template":"pst","data":{"title":"hello"}}"#; - let payload: BrokerPayload = serde_json::from_str(json).unwrap(); - let serialized = serde_json::to_string(&payload).unwrap(); - let round_trip: BrokerPayload = serde_json::from_str(&serialized).unwrap(); - assert_eq!(round_trip.template, payload.template); - assert_eq!(round_trip.data["title"], payload.data["title"]); + fn rejects_wrong_version() { + let json = r#"{"version":"0.9","op":"fetch","template":"Logger","payload":{}}"#; + let err = parse_request(json.as_bytes()).expect_err("parse should fail"); + assert!(err.contains("unsupported envelope version")); } #[test] - fn rejects_missing_template() { - let json = r#"{"data":{"first_name":"joe"}}"#; - let result: Result = serde_json::from_str(json); - assert!(result.is_err()); + fn builds_error_response_envelope() { + let bytes = error_response("write", "c123", "INVALID_PAYLOAD", "bad"); + let env: BrokerResponseEnvelope = serde_json::from_slice(&bytes).expect("response should parse"); + assert_eq!(env.version, ENVELOPE_VERSION); + assert_eq!(env.op, "write"); + assert_eq!(env.correlation_id, "c123"); + assert_eq!(env.status, "error"); + assert_eq!(env.error_code.as_deref(), Some("INVALID_PAYLOAD")); } } diff --git a/src/brokers/r_broker.rs b/src/brokers/r_broker.rs index 501be46..3c50ae5 100644 --- a/src/brokers/r_broker.rs +++ b/src/brokers/r_broker.rs @@ -37,20 +37,42 @@ use futures_lite::StreamExt; use lapin::{ BasicProperties, Channel, Connection, options::{ - BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, + BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, QueueBindOptions, QueueDeclareOptions, }, - types::FieldTable, + types::{AMQPValue, FieldTable, LongString, ShortString}, }; -use crate::services::amqp::EXCHANGE_NAME; +use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME}; use crate::brokers::logger_store; -use crate::brokers::payload::BrokerPayload; +use crate::brokers::payload::{ + BrokerRequestEnvelope, + error_response, + parse_request, + success_response, +}; +use crate::template_registry::RuntimeTemplateRegistry; use super::error::BrokerError; /// Routing key this broker binds to. const ROUTING_KEY: &str = "rec.read"; +enum AckAction { + Ack, + Nack { requeue: bool }, +} + +struct BrokerOutcome { + reply_payload: Option>, + ack_action: AckAction, +} + +#[derive(Clone)] +struct BrokerQueues { + primary: String, + retry: String, +} + /// Spawns a single rBroker task and returns immediately. /// /// The task runs until it receives a `shutdown` event or the AMQP connection @@ -74,11 +96,24 @@ pub async fn spawn( conn: Arc, queue_tag: String, instance_id: u32, + template_registry: Arc, ) -> Result, BrokerError> { // each broker task owns its own channel — channels are cheap, connections are not let channel = conn.create_channel().await?; let queue_name = format!("{}rec.read", queue_tag); + let retry_queue_name = format!("{}rec.read.retry", queue_tag); + let dlq_queue_name = format!("{}rec.read.dlq", queue_tag); + + let mut primary_args = FieldTable::default(); + primary_args.insert( + ShortString::from("x-dead-letter-exchange"), + AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)), + ); + primary_args.insert( + ShortString::from("x-dead-letter-routing-key"), + AMQPValue::LongString(LongString::from("rec.read.dlq")), + ); // declare the queue — idempotent; safe to call on restart channel @@ -88,6 +123,52 @@ pub async fn spawn( durable: true, ..Default::default() }, + primary_args, + ) + .await?; + + let mut retry_args = FieldTable::default(); + retry_args.insert( + ShortString::from("x-message-ttl"), + AMQPValue::LongUInt(5_000), + ); + retry_args.insert( + ShortString::from("x-dead-letter-exchange"), + AMQPValue::LongString(LongString::from(EXCHANGE_NAME)), + ); + retry_args.insert( + ShortString::from("x-dead-letter-routing-key"), + AMQPValue::LongString(LongString::from(ROUTING_KEY)), + ); + + channel + .queue_declare( + &retry_queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + retry_args, + ) + .await?; + + channel + .queue_declare( + &dlq_queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + channel + .queue_bind( + &dlq_queue_name, + DLX_EXCHANGE_NAME, + "rec.read.dlq", + QueueBindOptions::default(), FieldTable::default(), ) .await?; @@ -106,7 +187,11 @@ pub async fn spawn( tracing::info!("rBroker[{}] queue '{}' declared and bound", instance_id, queue_name); let handle = tokio::spawn(async move { - if let Err(e) = run(channel, queue_name, instance_id).await { + let queues = BrokerQueues { + primary: queue_name, + retry: retry_queue_name, + }; + if let Err(e) = run(channel, queues, instance_id, template_registry).await { tracing::error!("rBroker[{}] exited with error: {}", instance_id, e); } }); @@ -126,21 +211,22 @@ pub async fn spawn( /// * `2026-04-05` - mks - original coding async fn run( channel: Channel, - queue_name: String, + queues: BrokerQueues, instance_id: u32, + template_registry: Arc, ) -> Result<(), BrokerError> { let consumer_tag = format!("rbroker-{}", instance_id); let mut consumer = channel .basic_consume( - &queue_name, + &queues.primary, &consumer_tag, BasicConsumeOptions::default(), FieldTable::default(), ) .await?; - tracing::info!("rBroker[{}] consuming on '{}'", instance_id, queue_name); + tracing::info!("rBroker[{}] consuming on '{}'", instance_id, queues.primary); while let Some(delivery) = consumer.next().await { let delivery = match delivery { @@ -151,7 +237,6 @@ async fn run( } }; - // extract the event type from the message type header let event_type = delivery .properties .kind() @@ -159,48 +244,89 @@ async fn run( .map(|s| s.as_str().to_string()) .unwrap_or_default(); + let header_correlation_id = delivery + .properties + .correlation_id() + .as_ref() + .map(|s| s.as_str().to_string()) + .unwrap_or_default(); + tracing::debug!("rBroker[{}] received event='{}'", instance_id, event_type); - let reply_payload: Option> = match event_type.as_str() { - "ping" => handle_ping(instance_id), + let envelope = match parse_request(&delivery.data) { + Ok(env) => env, + Err(e) => { + let payload = error_response( + "unknown", + &header_correlation_id, + "INVALID_ENVELOPE", + &e, + ); + publish_reply(&channel, &delivery, payload).await; + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: false, + }) + .await; + continue; + } + }; + + let correlation_id = if envelope.correlation_id.trim().is_empty() { + header_correlation_id + } else { + envelope.correlation_id.clone() + }; + + if !event_type.is_empty() && event_type != envelope.op { + let payload = error_response( + &envelope.op, + &correlation_id, + "OP_MISMATCH", + "AMQP type header does not match envelope op", + ); + publish_reply(&channel, &delivery, payload).await; + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: false, + }) + .await; + continue; + } + + let outcome = match envelope.op.as_str() { + "ping" => BrokerOutcome { + reply_payload: Some(handle_ping(&correlation_id, instance_id)), + ack_action: AckAction::Ack, + }, "shutdown" => { // ack before exiting so the message is not redelivered let _ = delivery.ack(BasicAckOptions::default()).await; tracing::info!("rBroker[{}] shutdown event received — exiting", instance_id); break; } - "fetch" => handle_fetch(&delivery.data, instance_id).await, + "fetch" => handle_fetch(envelope, &correlation_id, instance_id, &template_registry).await, unknown => { tracing::warn!("rBroker[{}] unknown event type '{}'", instance_id, unknown); - None + BrokerOutcome { + reply_payload: Some(error_response( + unknown, + &correlation_id, + "UNSUPPORTED_OPERATION", + "unsupported operation for rec.read", + )), + ack_action: AckAction::Nack { requeue: false }, + } } }; - // publish reply if the event specified a reply_to queue - if let Some(payload) = reply_payload { - if let Some(reply_to) = delivery.properties.reply_to().as_ref() { - let reply_queue = reply_to.as_str().to_string(); - let correlation_id = delivery.properties.correlation_id().clone(); - - let props = BasicProperties::default() - .with_correlation_id(correlation_id.unwrap_or_default()); - - if let Err(e) = channel - .basic_publish( - "", // default exchange — direct to queue by name - &reply_queue, - BasicPublishOptions::default(), - &payload, - props, - ) - .await - { - tracing::error!("rBroker[{}] reply publish failed: {}", instance_id, e); - } - } + if let Some(payload) = outcome.reply_payload { + publish_reply(&channel, &delivery, payload).await; } - let _ = delivery.ack(BasicAckOptions::default()).await; + apply_ack_action(&channel, &delivery, &queues, outcome.ack_action).await; } tracing::info!("rBroker[{}] consume loop exited", instance_id); @@ -212,15 +338,21 @@ async fn run( /// # History /// /// * `2026-04-05` - mks - original coding -fn handle_ping(instance_id: u32) -> Option> { +fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(); - let response = format!(r#"{{"status":"ok","broker":"rBroker","instance":{},"ts":{}}}"#, instance_id, ts); - tracing::debug!("rBroker[{}] ping response: {}", instance_id, response); - Some(response.into_bytes()) + success_response( + "ping", + correlation_id, + serde_json::json!({ + "broker": "rBroker", + "instance": instance_id, + "ts": ts, + }), + ) } /// Stub handler for `fetch` events. @@ -231,35 +363,43 @@ fn handle_ping(instance_id: u32) -> Option> { /// # History /// /// * `2026-04-05` - mks - stub -async fn handle_fetch(data: &[u8], instance_id: u32) -> Option> { - let payload: BrokerPayload = match serde_json::from_slice(data) { - Ok(p) => p, - Err(e) => { - let response = serde_json::json!({ - "status": "error", - "code": "INVALID_PAYLOAD", - "message": format!("invalid JSON payload: {}", e), - }); - return Some(response.to_string().into_bytes()); - } - }; +async fn handle_fetch( + envelope: BrokerRequestEnvelope, + correlation_id: &str, + instance_id: u32, + template_registry: &RuntimeTemplateRegistry, +) -> BrokerOutcome { + if !template_registry.contains_template(&envelope.template) { + return BrokerOutcome { + reply_payload: Some(error_response( + "fetch", + correlation_id, + "TEMPLATE_NOT_FOUND", + "requested template is not registered at runtime", + )), + ack_action: AckAction::Nack { requeue: false }, + }; + } - if !logger_store::is_logger_template(&payload.template) { + if !logger_store::is_logger_template(&envelope.template) { tracing::warn!( "rBroker[{}] fetch template '{}' not implemented yet", instance_id, - payload.template + envelope.template ); - let response = serde_json::json!({ - "status": "error", - "code": "NOT_IMPLEMENTED", - "message": "only logger fetch is implemented in PoC step 1", - }); - return Some(response.to_string().into_bytes()); + return BrokerOutcome { + reply_payload: Some(error_response( + "fetch", + correlation_id, + "NOT_IMPLEMENTED", + "only logger fetch is implemented in PoC step 1", + )), + ack_action: AckAction::Nack { requeue: false }, + }; } - let limit = payload - .data + let limit = envelope + .payload .get("limit") .and_then(|v| v.as_u64()) .map(|n| n as usize) @@ -268,19 +408,93 @@ async fn handle_fetch(data: &[u8], instance_id: u32) -> Option> { let logs = match logger_store::fetch_recent(limit).await { Ok(items) => items, Err(e) => { - let response = serde_json::json!({ - "status": "error", - "code": "LOGGER_STORE_UNAVAILABLE", - "message": e, - }); - return Some(response.to_string().into_bytes()); + tracing::warn!("rBroker[{}] fetch retryable error: {}", instance_id, e); + return BrokerOutcome { + reply_payload: None, + ack_action: AckAction::Nack { requeue: true }, + }; } }; - let response = serde_json::json!({ - "status": "ok", - "code": "LOGGER_FETCH", - "count": logs.len(), - "logs": logs, - }); - Some(response.to_string().into_bytes()) + + BrokerOutcome { + reply_payload: Some(success_response( + "fetch", + correlation_id, + serde_json::json!({ + "code": "LOGGER_FETCH", + "count": logs.len(), + "logs": logs, + }), + )), + ack_action: AckAction::Ack, + } +} + +async fn publish_reply( + channel: &Channel, + delivery: &lapin::message::Delivery, + payload: Vec, +) { + if let Some(reply_to) = delivery.properties.reply_to().as_ref() { + let reply_queue = reply_to.as_str().to_string(); + let correlation_id = delivery.properties.correlation_id().clone(); + + let props = BasicProperties::default().with_correlation_id(correlation_id.unwrap_or_default()); + + if let Err(e) = channel + .basic_publish( + "", + &reply_queue, + BasicPublishOptions::default(), + &payload, + props, + ) + .await + { + tracing::error!("reply publish failed: {}", e); + } + } +} + +async fn apply_ack_action( + channel: &Channel, + delivery: &lapin::message::Delivery, + queues: &BrokerQueues, + action: AckAction, +) { + match action { + AckAction::Ack => { + let _ = delivery.ack(BasicAckOptions::default()).await; + } + AckAction::Nack { requeue } => { + if requeue { + let retry_publish = channel + .basic_publish( + "", + &queues.retry, + BasicPublishOptions::default(), + &delivery.data, + delivery.properties.clone(), + ) + .await; + if retry_publish.is_ok() { + let _ = delivery.ack(BasicAckOptions::default()).await; + } else { + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: true, + }) + .await; + } + return; + } + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue, + }) + .await; + } + } } diff --git a/src/brokers/w_broker.rs b/src/brokers/w_broker.rs index e3d11d8..b1a31e7 100644 --- a/src/brokers/w_broker.rs +++ b/src/brokers/w_broker.rs @@ -37,20 +37,42 @@ use futures_lite::StreamExt; use lapin::{ BasicProperties, Channel, Connection, options::{ - BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, + BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, QueueBindOptions, QueueDeclareOptions, }, - types::FieldTable, + types::{AMQPValue, FieldTable, LongString, ShortString}, }; -use crate::services::amqp::EXCHANGE_NAME; +use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME}; use crate::brokers::logger_store; -use crate::brokers::payload::BrokerPayload; +use crate::brokers::payload::{ + BrokerRequestEnvelope, + error_response, + parse_request, + success_response, +}; +use crate::template_registry::RuntimeTemplateRegistry; use super::error::BrokerError; /// Routing key this broker binds to. const ROUTING_KEY: &str = "rec.write"; +enum AckAction { + Ack, + Nack { requeue: bool }, +} + +struct BrokerOutcome { + reply_payload: Option>, + ack_action: AckAction, +} + +#[derive(Clone)] +struct BrokerQueues { + primary: String, + retry: String, +} + /// Spawns a single wBroker task and returns immediately. /// /// # Arguments @@ -71,9 +93,22 @@ pub async fn spawn( conn: Arc, queue_tag: String, instance_id: u32, + template_registry: Arc, ) -> Result, BrokerError> { let channel = conn.create_channel().await?; let queue_name = format!("{}rec.write", queue_tag); + let retry_queue_name = format!("{}rec.write.retry", queue_tag); + let dlq_queue_name = format!("{}rec.write.dlq", queue_tag); + + let mut primary_args = FieldTable::default(); + primary_args.insert( + ShortString::from("x-dead-letter-exchange"), + AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)), + ); + primary_args.insert( + ShortString::from("x-dead-letter-routing-key"), + AMQPValue::LongString(LongString::from("rec.write.dlq")), + ); channel .queue_declare( @@ -82,6 +117,52 @@ pub async fn spawn( durable: true, ..Default::default() }, + primary_args, + ) + .await?; + + let mut retry_args = FieldTable::default(); + retry_args.insert( + ShortString::from("x-message-ttl"), + AMQPValue::LongUInt(5_000), + ); + retry_args.insert( + ShortString::from("x-dead-letter-exchange"), + AMQPValue::LongString(LongString::from(EXCHANGE_NAME)), + ); + retry_args.insert( + ShortString::from("x-dead-letter-routing-key"), + AMQPValue::LongString(LongString::from(ROUTING_KEY)), + ); + + channel + .queue_declare( + &retry_queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + retry_args, + ) + .await?; + + channel + .queue_declare( + &dlq_queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + channel + .queue_bind( + &dlq_queue_name, + DLX_EXCHANGE_NAME, + "rec.write.dlq", + QueueBindOptions::default(), FieldTable::default(), ) .await?; @@ -99,7 +180,11 @@ pub async fn spawn( tracing::info!("wBroker[{}] queue '{}' declared and bound", instance_id, queue_name); let handle = tokio::spawn(async move { - if let Err(e) = run(channel, queue_name, instance_id).await { + let queues = BrokerQueues { + primary: queue_name, + retry: retry_queue_name, + }; + if let Err(e) = run(channel, queues, instance_id, template_registry).await { tracing::error!("wBroker[{}] exited with error: {}", instance_id, e); } }); @@ -117,21 +202,22 @@ pub async fn spawn( /// * `2026-04-05` - mks - original coding async fn run( channel: Channel, - queue_name: String, + queues: BrokerQueues, instance_id: u32, + template_registry: Arc, ) -> Result<(), BrokerError> { let consumer_tag = format!("wbroker-{}", instance_id); let mut consumer = channel .basic_consume( - &queue_name, + &queues.primary, &consumer_tag, BasicConsumeOptions::default(), FieldTable::default(), ) .await?; - tracing::info!("wBroker[{}] consuming on '{}'", instance_id, queue_name); + tracing::info!("wBroker[{}] consuming on '{}'", instance_id, queues.primary); while let Some(delivery) = consumer.next().await { let delivery = match delivery { @@ -149,48 +235,90 @@ async fn run( .map(|s| s.as_str().to_string()) .unwrap_or_default(); + let header_correlation_id = delivery + .properties + .correlation_id() + .as_ref() + .map(|s| s.as_str().to_string()) + .unwrap_or_default(); + tracing::debug!("wBroker[{}] received event='{}'", instance_id, event_type); - let reply_payload: Option> = match event_type.as_str() { - "ping" => handle_ping(instance_id), + let envelope = match parse_request(&delivery.data) { + Ok(env) => env, + Err(e) => { + let payload = error_response( + "unknown", + &header_correlation_id, + "INVALID_ENVELOPE", + &e, + ); + publish_reply(&channel, &delivery, payload).await; + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: false, + }) + .await; + continue; + } + }; + + let correlation_id = if envelope.correlation_id.trim().is_empty() { + header_correlation_id + } else { + envelope.correlation_id.clone() + }; + + if !event_type.is_empty() && event_type != envelope.op { + let payload = error_response( + &envelope.op, + &correlation_id, + "OP_MISMATCH", + "AMQP type header does not match envelope op", + ); + publish_reply(&channel, &delivery, payload).await; + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: false, + }) + .await; + continue; + } + + let outcome = match envelope.op.as_str() { + "ping" => BrokerOutcome { + reply_payload: Some(handle_ping(&correlation_id, instance_id)), + ack_action: AckAction::Ack, + }, "shutdown" => { let _ = delivery.ack(BasicAckOptions::default()).await; tracing::info!("wBroker[{}] shutdown event received — exiting", instance_id); break; } - "write" => handle_write(&delivery.data, instance_id).await, - "update" => handle_update(&delivery.data, instance_id), - "delete" => handle_delete(&delivery.data, instance_id), + "write" => handle_write(envelope, &correlation_id, instance_id, &template_registry).await, + "update" => handle_update(&correlation_id, instance_id), + "delete" => handle_delete(&correlation_id, instance_id), unknown => { tracing::warn!("wBroker[{}] unknown event type '{}'", instance_id, unknown); - None + BrokerOutcome { + reply_payload: Some(error_response( + unknown, + &correlation_id, + "UNSUPPORTED_OPERATION", + "unsupported operation for rec.write", + )), + ack_action: AckAction::Nack { requeue: false }, + } } }; - if let Some(payload) = reply_payload { - if let Some(reply_to) = delivery.properties.reply_to().as_ref() { - let reply_queue = reply_to.as_str().to_string(); - let correlation_id = delivery.properties.correlation_id().clone(); - - let props = BasicProperties::default() - .with_correlation_id(correlation_id.unwrap_or_default()); - - if let Err(e) = channel - .basic_publish( - "", - &reply_queue, - BasicPublishOptions::default(), - &payload, - props, - ) - .await - { - tracing::error!("wBroker[{}] reply publish failed: {}", instance_id, e); - } - } + if let Some(payload) = outcome.reply_payload { + publish_reply(&channel, &delivery, payload).await; } - let _ = delivery.ack(BasicAckOptions::default()).await; + apply_ack_action(&channel, &delivery, &queues, outcome.ack_action).await; } tracing::info!("wBroker[{}] consume loop exited", instance_id); @@ -202,17 +330,21 @@ async fn run( /// # History /// /// * `2026-04-05` - mks - original coding -fn handle_ping(instance_id: u32) -> Option> { +fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(); - let response = format!( - r#"{{"status":"ok","broker":"wBroker","instance":{},"ts":{}}}"#, - instance_id, ts - ); - Some(response.into_bytes()) + success_response( + "ping", + correlation_id, + serde_json::json!({ + "broker": "wBroker", + "instance": instance_id, + "ts": ts, + }), + ) } /// Stub handler for `write` events. @@ -220,46 +352,63 @@ fn handle_ping(instance_id: u32) -> Option> { /// # History /// /// * `2026-04-05` - mks - stub -async fn handle_write(data: &[u8], instance_id: u32) -> Option> { - let payload: BrokerPayload = match serde_json::from_slice(data) { - Ok(p) => p, - Err(e) => { - let response = serde_json::json!({ - "status": "error", - "code": "INVALID_PAYLOAD", - "message": format!("invalid JSON payload: {}", e), - }); - return Some(response.to_string().into_bytes()); - } - }; +async fn handle_write( + envelope: BrokerRequestEnvelope, + correlation_id: &str, + instance_id: u32, + template_registry: &RuntimeTemplateRegistry, +) -> BrokerOutcome { + if !template_registry.contains_template(&envelope.template) { + return BrokerOutcome { + reply_payload: Some(error_response( + "write", + correlation_id, + "TEMPLATE_NOT_FOUND", + "requested template is not registered at runtime", + )), + ack_action: AckAction::Nack { requeue: false }, + }; + } - if !logger_store::is_logger_template(&payload.template) { + if !logger_store::is_logger_template(&envelope.template) { tracing::warn!( "wBroker[{}] write template '{}' not implemented yet", instance_id, - payload.template + envelope.template ); - return not_implemented_response(); + return BrokerOutcome { + reply_payload: Some(error_response( + "write", + correlation_id, + "NOT_IMPLEMENTED", + "only logger write is implemented in PoC step 1", + )), + ack_action: AckAction::Nack { requeue: false }, + }; } - let token = match logger_store::append_log(payload.data).await { + let token = match logger_store::append_log(envelope.payload).await { Ok(token) => token, Err(e) => { - let response = serde_json::json!({ - "status": "error", - "code": "LOGGER_STORE_UNAVAILABLE", - "message": e, - }); - return Some(response.to_string().into_bytes()); + tracing::warn!("wBroker[{}] write retryable error: {}", instance_id, e); + return BrokerOutcome { + reply_payload: None, + ack_action: AckAction::Nack { requeue: true }, + }; } }; - let response = serde_json::json!({ - "status": "ok", - "code": "LOGGER_WRITE", - "token": token, - }); - Some(response.to_string().into_bytes()) + BrokerOutcome { + reply_payload: Some(success_response( + "write", + correlation_id, + serde_json::json!({ + "code": "LOGGER_WRITE", + "token": token, + }), + )), + ack_action: AckAction::Ack, + } } /// Stub handler for `update` events. @@ -267,12 +416,16 @@ async fn handle_write(data: &[u8], instance_id: u32) -> Option> { /// # History /// /// * `2026-04-05` - mks - stub -fn handle_update(data: &[u8], instance_id: u32) -> Option> { - tracing::warn!( - "wBroker[{}] update event ({} bytes) — factory dispatch not yet implemented", - instance_id, data.len() - ); - not_implemented_response() +fn handle_update(correlation_id: &str, _instance_id: u32) -> BrokerOutcome { + BrokerOutcome { + reply_payload: Some(error_response( + "update", + correlation_id, + "NOT_IMPLEMENTED", + "factory dispatch not yet implemented", + )), + ack_action: AckAction::Nack { requeue: false }, + } } /// Stub handler for `delete` events. @@ -280,19 +433,84 @@ fn handle_update(data: &[u8], instance_id: u32) -> Option> { /// # History /// /// * `2026-04-05` - mks - stub -fn handle_delete(data: &[u8], instance_id: u32) -> Option> { - tracing::warn!( - "wBroker[{}] delete event ({} bytes) — factory dispatch not yet implemented", - instance_id, data.len() - ); - not_implemented_response() +fn handle_delete(correlation_id: &str, _instance_id: u32) -> BrokerOutcome { + BrokerOutcome { + reply_payload: Some(error_response( + "delete", + correlation_id, + "NOT_IMPLEMENTED", + "factory dispatch not yet implemented", + )), + ack_action: AckAction::Nack { requeue: false }, + } } -/// Standard NOT_IMPLEMENTED reply payload — used by all stub handlers. -fn not_implemented_response() -> Option> { - Some( - r#"{"status":"error","code":"NOT_IMPLEMENTED","message":"factory dispatch not yet implemented"}"# - .as_bytes() - .to_vec(), - ) +async fn publish_reply( + channel: &Channel, + delivery: &lapin::message::Delivery, + payload: Vec, +) { + if let Some(reply_to) = delivery.properties.reply_to().as_ref() { + let reply_queue = reply_to.as_str().to_string(); + let correlation_id = delivery.properties.correlation_id().clone(); + + let props = BasicProperties::default().with_correlation_id(correlation_id.unwrap_or_default()); + + if let Err(e) = channel + .basic_publish( + "", + &reply_queue, + BasicPublishOptions::default(), + &payload, + props, + ) + .await + { + tracing::error!("reply publish failed: {}", e); + } + } +} + +async fn apply_ack_action( + channel: &Channel, + delivery: &lapin::message::Delivery, + queues: &BrokerQueues, + action: AckAction, +) { + match action { + AckAction::Ack => { + let _ = delivery.ack(BasicAckOptions::default()).await; + } + AckAction::Nack { requeue } => { + if requeue { + let retry_publish = channel + .basic_publish( + "", + &queues.retry, + BasicPublishOptions::default(), + &delivery.data, + delivery.properties.clone(), + ) + .await; + + if retry_publish.is_ok() { + let _ = delivery.ack(BasicAckOptions::default()).await; + } else { + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: true, + }) + .await; + } + return; + } + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue, + }) + .await; + } + } } diff --git a/src/main.rs b/src/main.rs index 66eff91..6d753ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,19 +71,13 @@ async fn ipl() -> Result<(), String> { tracing::info!("Logging initialized"); // step 2b: load and validate REC templates used by runtime dispatch - match template_registry::load_runtime_rec_templates("templates") { - Ok(templates) => tracing::info!("REC templates validated: {}", templates.len()), - Err(e) => { - if cfg.id.env_name == "production" { - return Err(format!("Template validation failed: {}", e)); - } - tracing::warn!( - "Template validation failed (non-fatal in {}): {}", - cfg.id.env_name, - e - ); - } - } + let templates = template_registry::load_runtime_rec_templates("templates") + .map_err(|e| format!("Template validation failed: {}", e))?; + tracing::info!("REC templates validated: {}", templates.len()); + + let template_registry_state = std::sync::Arc::new( + template_registry::RuntimeTemplateRegistry::from_templates(templates), + ); // step 4: validate broker reachability (TCP) — required in all environments services::amqp::validate(&cfg.broker_services)?; @@ -145,11 +139,19 @@ async fn ipl() -> Result<(), String> { let _ = &amqp_conn; // keep IPL AMQP connection alive - let r_handles = brokers::spawn_r_broker_pool(Arc::clone(&shared_conn), &cfg.broker_services) + let r_handles = brokers::spawn_r_broker_pool( + Arc::clone(&shared_conn), + &cfg.broker_services, + std::sync::Arc::clone(&template_registry_state), + ) .await .map_err(|e| format!("rBroker pool failed to start: {}", e))?; - let w_handles = brokers::spawn_w_broker_pool(Arc::clone(&shared_conn), &cfg.broker_services) + let w_handles = brokers::spawn_w_broker_pool( + Arc::clone(&shared_conn), + &cfg.broker_services, + std::sync::Arc::clone(&template_registry_state), + ) .await .map_err(|e| format!("wBroker pool failed to start: {}", e))?; diff --git a/src/services/amqp/connection.rs b/src/services/amqp/connection.rs index 053f9b9..a2391cd 100644 --- a/src/services/amqp/connection.rs +++ b/src/services/amqp/connection.rs @@ -31,6 +31,7 @@ use super::error::AmqpError; /// The name of the BEDS topic exchange. All events in the cluster are /// published to this exchange and routed by key to their consumers. pub const EXCHANGE_NAME: &str = "beds.events"; +pub const DLX_EXCHANGE_NAME: &str = "beds.dlx"; /// An authenticated AMQP connection with an open channel. /// @@ -130,6 +131,18 @@ impl AmqpConnection { ) .await?; + self.channel + .exchange_declare( + DLX_EXCHANGE_NAME, + ExchangeKind::Topic, + ExchangeDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + Ok(()) } } diff --git a/src/services/amqp/mod.rs b/src/services/amqp/mod.rs index b65cf5e..f08f4f2 100644 --- a/src/services/amqp/mod.rs +++ b/src/services/amqp/mod.rs @@ -18,7 +18,7 @@ pub mod connection; pub mod error; -pub use connection::{AmqpConnection, EXCHANGE_NAME}; +pub use connection::{AmqpConnection, DLX_EXCHANGE_NAME, EXCHANGE_NAME}; pub use error::AmqpError; // will be used by broker pool error handling use std::net::{TcpStream, ToSocketAddrs}; diff --git a/src/template_registry/mod.rs b/src/template_registry/mod.rs index f49b16f..2511de5 100644 --- a/src/template_registry/mod.rs +++ b/src/template_registry/mod.rs @@ -39,6 +39,25 @@ pub struct RuntimeRecTemplate { pub cache_map: HashMap, } +#[derive(Debug, Clone, Default)] +pub struct RuntimeTemplateRegistry { + by_name: HashMap, +} + +impl RuntimeTemplateRegistry { + pub fn from_templates(templates: Vec) -> Self { + let mut by_name = HashMap::new(); + for t in templates { + by_name.insert(t.template_class.to_ascii_lowercase(), t); + } + Self { by_name } + } + + pub fn contains_template(&self, template: &str) -> bool { + self.by_name.contains_key(&template.to_ascii_lowercase()) + } +} + struct RecTemplateManifest { service: String, schema: String, diff --git a/tests/broker_message_flow_test.rs b/tests/broker_message_flow_test.rs index 9057113..e8b1b16 100644 --- a/tests/broker_message_flow_test.rs +++ b/tests/broker_message_flow_test.rs @@ -21,7 +21,9 @@ use lapin::{ BasicProperties, }; use rustybeds::brokers; +use rustybeds::brokers::payload::ENVELOPE_VERSION; use rustybeds::services::amqp::{AmqpConnection, EXCHANGE_NAME}; +use rustybeds::template_registry::{RuntimeTemplateRegistry, load_runtime_rec_templates}; /// Attempts to connect to the test broker. Returns None if unreachable so /// tests can skip rather than fail when RabbitMQ is not running locally. @@ -77,12 +79,20 @@ async fn ping_round_trip(channel: &lapin::Channel, routing_key: &str, expected_b .with_reply_to(reply_queue.clone().into()) .with_correlation_id(correlation_id.clone().into()); + let body = serde_json::json!({ + "version": ENVELOPE_VERSION, + "op": "ping", + "template": "Logger", + "correlation_id": correlation_id, + "payload": {}, + }); + channel .basic_publish( EXCHANGE_NAME, routing_key, BasicPublishOptions::default(), - b"{}", + &serde_json::to_vec(&body).expect("failed to encode ping envelope"), props, ) .await @@ -99,8 +109,10 @@ async fn ping_round_trip(channel: &lapin::Channel, routing_key: &str, expected_b let payload: serde_json::Value = serde_json::from_slice(&delivery.data).expect("reply payload is not valid JSON"); + assert_eq!(payload["version"], ENVELOPE_VERSION); assert_eq!(payload["status"], "ok"); - assert_eq!(payload["broker"], expected_broker); + assert_eq!(payload["op"], "ping"); + assert_eq!(payload["payload"]["broker"], expected_broker); if let Some(cid) = delivery.properties.correlation_id().as_ref() { assert_eq!(cid.as_str(), correlation_id); @@ -207,12 +219,20 @@ async fn r_and_w_brokers_process_ping_events() { .await .expect("exchange declaration failed"); - let mut handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) + let template_registry = std::sync::Arc::new(RuntimeTemplateRegistry::from_templates( + load_runtime_rec_templates("templates").expect("template load failed"), + )); + + let mut handles = brokers::spawn_r_broker_pool( + Arc::clone(&conn), + &cfg.broker_services, + std::sync::Arc::clone(&template_registry), + ) .await .expect("rBroker pool failed to start"); handles.extend( - brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) + brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services, std::sync::Arc::clone(&template_registry)) .await .expect("wBroker pool failed to start"), ); @@ -254,12 +274,20 @@ async fn logger_write_then_fetch_round_trip() { .await .expect("exchange declaration failed"); - let mut handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) + let template_registry = std::sync::Arc::new(RuntimeTemplateRegistry::from_templates( + load_runtime_rec_templates("templates").expect("template load failed"), + )); + + let mut handles = brokers::spawn_r_broker_pool( + Arc::clone(&conn), + &cfg.broker_services, + std::sync::Arc::clone(&template_registry), + ) .await .expect("rBroker pool failed to start"); handles.extend( - brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) + brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services, std::sync::Arc::clone(&template_registry)) .await .expect("wBroker pool failed to start"), ); @@ -270,8 +298,11 @@ async fn logger_write_then_fetch_round_trip() { .expect("failed to create test channel"); let write_request = serde_json::json!({ + "version": ENVELOPE_VERSION, + "op": "write", "template": "Logger", - "data": { + "correlation_id": "logger-write-1", + "payload": { "message_log": "poc-log-message", "level_log": "info", "service_log": "app_server" @@ -280,20 +311,27 @@ async fn logger_write_then_fetch_round_trip() { let write_reply = request_reply_json(&test_channel, "rec.write", "write", write_request).await; assert_eq!(write_reply["status"], "ok"); - assert_eq!(write_reply["code"], "LOGGER_WRITE"); + assert_eq!(write_reply["version"], ENVELOPE_VERSION); + assert_eq!(write_reply["op"], "write"); + assert_eq!(write_reply["payload"]["code"], "LOGGER_WRITE"); let fetch_request = serde_json::json!({ + "version": ENVELOPE_VERSION, + "op": "fetch", "template": "Logger", - "data": { + "correlation_id": "logger-fetch-1", + "payload": { "limit": 10 } }); let fetch_reply = request_reply_json(&test_channel, "rec.read", "fetch", fetch_request).await; assert_eq!(fetch_reply["status"], "ok"); - assert_eq!(fetch_reply["code"], "LOGGER_FETCH"); + assert_eq!(fetch_reply["version"], ENVELOPE_VERSION); + assert_eq!(fetch_reply["op"], "fetch"); + assert_eq!(fetch_reply["payload"]["code"], "LOGGER_FETCH"); - let logs = fetch_reply["logs"].as_array().expect("logs must be an array"); + let logs = fetch_reply["payload"]["logs"].as_array().expect("logs must be an array"); assert!( logs.iter().any(|v| v["message_log"] == "poc-log-message"), "fetched logs should include the message just written" diff --git a/tests/broker_pool_test.rs b/tests/broker_pool_test.rs index 2b135b3..98a833f 100644 --- a/tests/broker_pool_test.rs +++ b/tests/broker_pool_test.rs @@ -18,6 +18,7 @@ mod common; use std::sync::Arc; use rustybeds::brokers; use rustybeds::services::amqp::AmqpConnection; +use rustybeds::template_registry::{RuntimeTemplateRegistry, load_runtime_rec_templates}; /// Attempts to connect to the test broker. Returns None if unreachable so /// tests can skip rather than fail when the broker isn't running. @@ -40,6 +41,9 @@ async fn try_connect(cfg: &rustybeds::config::BrokerServicesConfig) -> Option c, @@ -55,7 +59,11 @@ async fn r_broker_pool_spawns_configured_instances() { amqp.declare_exchange().await .expect("exchange declaration failed"); - let handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) + let handles = brokers::spawn_r_broker_pool( + Arc::clone(&conn), + &cfg.broker_services, + std::sync::Arc::clone(&template_registry), + ) .await .expect("rBroker pool failed to start"); @@ -71,6 +79,9 @@ async fn r_broker_pool_spawns_configured_instances() { #[tokio::test] async fn w_broker_pool_spawns_configured_instances() { let cfg = common::load_test_config(); + let template_registry = std::sync::Arc::new(RuntimeTemplateRegistry::from_templates( + load_runtime_rec_templates("templates").expect("template load failed"), + )); let conn = match try_connect(&cfg.broker_services).await { Some(c) => c, @@ -85,7 +96,11 @@ async fn w_broker_pool_spawns_configured_instances() { amqp.declare_exchange().await .expect("exchange declaration failed"); - let handles = brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) + let handles = brokers::spawn_w_broker_pool( + Arc::clone(&conn), + &cfg.broker_services, + std::sync::Arc::clone(&template_registry), + ) .await .expect("wBroker pool failed to start"); diff --git a/wiki/06-queue-topology.md b/wiki/06-queue-topology.md index c3efe38..f5559b9 100644 --- a/wiki/06-queue-topology.md +++ b/wiki/06-queue-topology.md @@ -138,6 +138,28 @@ All BEDS queues are: This is non-negotiable for a production framework. The performance cost of persistence (disk write per message) is acceptable given the correctness guarantee. +## DLQ and Retry Topology (Implemented) + +For the active POC queues (`rec.read`, `rec.write`), BEDS now provisions: + +- Primary queue: `{tag}rec.read` / `{tag}rec.write` +- Retry queue: `{tag}rec.read.retry` / `{tag}rec.write.retry` +- Dead-letter queue: `{tag}rec.read.dlq` / `{tag}rec.write.dlq` + +Dead-letter flow: + +- Primary queues are configured with dead-letter exchange `beds.dlx`. +- Non-retryable failures (`nack requeue=false`) route to `*.dlq` via routing keys + `rec.read.dlq` and `rec.write.dlq`. + +Retry flow: + +- Retryable failures are republished to `*.retry` queues. +- Retry queues apply TTL backoff and dead-letter back to `beds.events` using the + original routing keys (`rec.read` / `rec.write`). + +This avoids tight immediate requeue loops and creates deterministic failure lanes. + ## The `vhost` Isolation Model Each environment gets its own RabbitMQ virtual host. A vhost is a completely isolated namespace — queues, exchanges, and bindings in one vhost are invisible to another. A RabbitMQ user is granted access to specific vhosts. diff --git a/wiki/10-modernization-roadmap.md b/wiki/10-modernization-roadmap.md index bd50b13..cc5f160 100644 --- a/wiki/10-modernization-roadmap.md +++ b/wiki/10-modernization-roadmap.md @@ -51,6 +51,8 @@ Guardrails are intentionally deferred until POC behavior is stable. Implementation status update: - Phase A transport stability evidence exists: live RabbitMQ round-trip tests for `rec.read` and `rec.write` ping paths. - Phase B has started: REC template registry loading and startup validation are now implemented in IPL. +- Phase B progression: runtime template registry state is now persisted and passed into broker workers for dispatch-time template validation. +- Reliability progression: deterministic ack/nack behavior and retry/DLQ queue topology are implemented for `rec.read` and `rec.write`. ## Must-Keep Invariants