milestone: add runtime template state and dlq/retry broker topology

This commit is contained in:
2026-04-06 18:41:17 -07:00
parent 836a968806
commit 516a740505
12 changed files with 838 additions and 273 deletions

View File

@@ -30,6 +30,7 @@ use std::sync::Arc;
use lapin::Connection;
use crate::config::BrokerServicesConfig;
use crate::template_registry::RuntimeTemplateRegistry;
use error::BrokerError;
/// Spawns the rBroker pool — N tasks as configured in `instances.r_broker`.
@@ -54,6 +55,7 @@ use error::BrokerError;
pub async fn spawn_r_broker_pool(
conn: Arc<Connection>,
cfg: &BrokerServicesConfig,
template_registry: Arc<RuntimeTemplateRegistry>,
) -> Result<Vec<tokio::task::JoinHandle<()>>, BrokerError> {
let count = cfg.app_server.instances.r_broker;
let mut handles = Vec::with_capacity(count as usize);
@@ -63,6 +65,7 @@ pub async fn spawn_r_broker_pool(
Arc::clone(&conn),
cfg.queue_tag.clone(),
i,
Arc::clone(&template_registry),
).await?;
handles.push(handle);
}
@@ -89,6 +92,7 @@ pub async fn spawn_r_broker_pool(
pub async fn spawn_w_broker_pool(
conn: Arc<Connection>,
cfg: &BrokerServicesConfig,
template_registry: Arc<RuntimeTemplateRegistry>,
) -> Result<Vec<tokio::task::JoinHandle<()>>, BrokerError> {
let count = cfg.app_server.instances.w_broker;
let mut handles = Vec::with_capacity(count as usize);
@@ -98,6 +102,7 @@ pub async fn spawn_w_broker_pool(
Arc::clone(&conn),
cfg.queue_tag.clone(),
i,
Arc::clone(&template_registry),
).await?;
handles.push(handle);
}

View File

@@ -1,69 +1,86 @@
//! # brokers/payload.rs — AMQP Message Payload
//! # brokers/payload.rs — AMQP Envelope Contracts
//!
//! Defines the JSON body structure carried in all BEDS broker messages.
//! The AMQP envelope handles routing (type header, reply_to, correlation_id);
//! this struct is what lives in the message body.
//!
//! ## Wire Format
//!
//! ```json
//! {
//! "template": "usr",
//! "data": { "first_name": "joe", "status": "active" }
//! }
//! ```
//!
//! `template` names the data object (maps to a NamasteCore implementor).
//! `data` carries key/value pairs in user-facing field names — the template
//! maps these to actual schema names. Callers never specify primary keys on
//! writes; the template generates a GUID and returns it in the reply.
//!
//! ## Calling Agents
//! - `brokers::r_broker` — parsed from message body on fetch events
//! - `brokers::w_broker` — parsed from message body on write/update/delete events
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-05` - mks - original coding
//! Strict 1.0 request/response contracts for broker message bodies.
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// The JSON body of every BEDS broker message.
///
/// # Examples
///
/// ```
/// use rustybeds::brokers::payload::BrokerPayload;
///
/// let json = r#"{"template":"usr","data":{"first_name":"joe"}}"#;
/// let payload: BrokerPayload = serde_json::from_str(json).unwrap();
/// assert_eq!(payload.template, "usr");
/// assert!(payload.data.contains_key("first_name"));
/// ```
///
/// Both read and write brokers parse this struct from the raw AMQP delivery
/// bytes. The operation type is carried in the AMQP `type` message property —
/// this struct carries the object identity and data payload.
///
/// # History
///
/// * `2026-04-05` - mks - original coding
pub const ENVELOPE_VERSION: &str = "1.0";
#[derive(Debug, Deserialize, Serialize)]
pub struct BrokerPayload {
/// Template identifier — names the NamasteCore implementor to dispatch to.
/// Matches the TLA convention from the template file (e.g. `"usr"`, `"pst"`).
pub struct BrokerRequestEnvelope {
pub version: String,
pub op: String,
pub template: String,
/// Key/value data pairs in user-facing field names.
/// For writes: the record to store (pkey excluded — generated by template).
/// For reads: query discriminants (field → value to match).
/// For deletes: discriminants identifying the record(s) to remove.
#[serde(default)]
pub data: HashMap<String, Value>,
pub correlation_id: String,
#[serde(default)]
pub payload: HashMap<String, Value>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct BrokerResponseEnvelope {
pub version: String,
pub op: String,
pub correlation_id: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
pub payload: Value,
}
pub fn parse_request(bytes: &[u8]) -> Result<BrokerRequestEnvelope, String> {
let env: BrokerRequestEnvelope =
serde_json::from_slice(bytes).map_err(|e| format!("invalid envelope JSON: {}", e))?;
if env.version != ENVELOPE_VERSION {
return Err(format!(
"unsupported envelope version '{}', expected '{}'",
env.version, ENVELOPE_VERSION
));
}
if env.op.trim().is_empty() {
return Err("missing required field 'op'".to_string());
}
if env.template.trim().is_empty() {
return Err("missing required field 'template'".to_string());
}
Ok(env)
}
pub fn success_response(op: &str, correlation_id: &str, payload: Value) -> Vec<u8> {
let response = BrokerResponseEnvelope {
version: ENVELOPE_VERSION.to_string(),
op: op.to_string(),
correlation_id: correlation_id.to_string(),
status: "ok".to_string(),
error_code: None,
message: None,
payload,
};
serde_json::to_vec(&response).unwrap_or_else(|_| b"{}".to_vec())
}
pub fn error_response(op: &str, correlation_id: &str, code: &str, message: &str) -> Vec<u8> {
let response = BrokerResponseEnvelope {
version: ENVELOPE_VERSION.to_string(),
op: op.to_string(),
correlation_id: correlation_id.to_string(),
status: "error".to_string(),
error_code: Some(code.to_string()),
message: Some(message.to_string()),
payload: Value::Object(serde_json::Map::new()),
};
serde_json::to_vec(&response).unwrap_or_else(|_| b"{}".to_vec())
}
#[cfg(test)]
@@ -71,38 +88,38 @@ mod tests {
use super::*;
#[test]
fn deserializes_full_payload() {
let json = r#"{"template":"usr","data":{"first_name":"joe","status":"active"}}"#;
let payload: BrokerPayload = serde_json::from_str(json).unwrap();
assert_eq!(payload.template, "usr");
assert_eq!(payload.data.len(), 2);
assert_eq!(payload.data["first_name"], "joe");
assert_eq!(payload.data["status"], "active");
fn parses_valid_request_envelope() {
let json = r#"{"version":"1.0","op":"fetch","template":"Logger","correlation_id":"c1","payload":{"limit":10}}"#;
let env = parse_request(json.as_bytes()).expect("parse should succeed");
assert_eq!(env.version, "1.0");
assert_eq!(env.op, "fetch");
assert_eq!(env.template, "Logger");
assert_eq!(env.correlation_id, "c1");
assert_eq!(env.payload["limit"], 10);
}
#[test]
fn deserializes_without_data_field() {
// data is optional — fetch by template name alone is valid
let json = r#"{"template":"usr"}"#;
let payload: BrokerPayload = serde_json::from_str(json).unwrap();
assert_eq!(payload.template, "usr");
assert!(payload.data.is_empty());
fn rejects_legacy_unversioned_shape() {
let json = r#"{"template":"Logger","data":{"limit":10}}"#;
let err = parse_request(json.as_bytes()).expect_err("parse should fail");
assert!(err.contains("invalid envelope JSON") || err.contains("missing required"));
}
#[test]
fn serializes_round_trip() {
let json = r#"{"template":"pst","data":{"title":"hello"}}"#;
let payload: BrokerPayload = serde_json::from_str(json).unwrap();
let serialized = serde_json::to_string(&payload).unwrap();
let round_trip: BrokerPayload = serde_json::from_str(&serialized).unwrap();
assert_eq!(round_trip.template, payload.template);
assert_eq!(round_trip.data["title"], payload.data["title"]);
fn rejects_wrong_version() {
let json = r#"{"version":"0.9","op":"fetch","template":"Logger","payload":{}}"#;
let err = parse_request(json.as_bytes()).expect_err("parse should fail");
assert!(err.contains("unsupported envelope version"));
}
#[test]
fn rejects_missing_template() {
let json = r#"{"data":{"first_name":"joe"}}"#;
let result: Result<BrokerPayload, _> = serde_json::from_str(json);
assert!(result.is_err());
fn builds_error_response_envelope() {
let bytes = error_response("write", "c123", "INVALID_PAYLOAD", "bad");
let env: BrokerResponseEnvelope = serde_json::from_slice(&bytes).expect("response should parse");
assert_eq!(env.version, ENVELOPE_VERSION);
assert_eq!(env.op, "write");
assert_eq!(env.correlation_id, "c123");
assert_eq!(env.status, "error");
assert_eq!(env.error_code.as_deref(), Some("INVALID_PAYLOAD"));
}
}

View File

@@ -37,20 +37,42 @@ use futures_lite::StreamExt;
use lapin::{
BasicProperties, Channel, Connection,
options::{
BasicAckOptions, BasicConsumeOptions, BasicPublishOptions,
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
QueueBindOptions, QueueDeclareOptions,
},
types::FieldTable,
types::{AMQPValue, FieldTable, LongString, ShortString},
};
use crate::services::amqp::EXCHANGE_NAME;
use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME};
use crate::brokers::logger_store;
use crate::brokers::payload::BrokerPayload;
use crate::brokers::payload::{
BrokerRequestEnvelope,
error_response,
parse_request,
success_response,
};
use crate::template_registry::RuntimeTemplateRegistry;
use super::error::BrokerError;
/// Routing key this broker binds to.
const ROUTING_KEY: &str = "rec.read";
enum AckAction {
Ack,
Nack { requeue: bool },
}
struct BrokerOutcome {
reply_payload: Option<Vec<u8>>,
ack_action: AckAction,
}
#[derive(Clone)]
struct BrokerQueues {
primary: String,
retry: String,
}
/// Spawns a single rBroker task and returns immediately.
///
/// The task runs until it receives a `shutdown` event or the AMQP connection
@@ -74,11 +96,24 @@ pub async fn spawn(
conn: Arc<Connection>,
queue_tag: String,
instance_id: u32,
template_registry: Arc<RuntimeTemplateRegistry>,
) -> Result<tokio::task::JoinHandle<()>, BrokerError> {
// each broker task owns its own channel — channels are cheap, connections are not
let channel = conn.create_channel().await?;
let queue_name = format!("{}rec.read", queue_tag);
let retry_queue_name = format!("{}rec.read.retry", queue_tag);
let dlq_queue_name = format!("{}rec.read.dlq", queue_tag);
let mut primary_args = FieldTable::default();
primary_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)),
);
primary_args.insert(
ShortString::from("x-dead-letter-routing-key"),
AMQPValue::LongString(LongString::from("rec.read.dlq")),
);
// declare the queue — idempotent; safe to call on restart
channel
@@ -88,6 +123,52 @@ pub async fn spawn(
durable: true,
..Default::default()
},
primary_args,
)
.await?;
let mut retry_args = FieldTable::default();
retry_args.insert(
ShortString::from("x-message-ttl"),
AMQPValue::LongUInt(5_000),
);
retry_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(EXCHANGE_NAME)),
);
retry_args.insert(
ShortString::from("x-dead-letter-routing-key"),
AMQPValue::LongString(LongString::from(ROUTING_KEY)),
);
channel
.queue_declare(
&retry_queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
},
retry_args,
)
.await?;
channel
.queue_declare(
&dlq_queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
channel
.queue_bind(
&dlq_queue_name,
DLX_EXCHANGE_NAME,
"rec.read.dlq",
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
@@ -106,7 +187,11 @@ pub async fn spawn(
tracing::info!("rBroker[{}] queue '{}' declared and bound", instance_id, queue_name);
let handle = tokio::spawn(async move {
if let Err(e) = run(channel, queue_name, instance_id).await {
let queues = BrokerQueues {
primary: queue_name,
retry: retry_queue_name,
};
if let Err(e) = run(channel, queues, instance_id, template_registry).await {
tracing::error!("rBroker[{}] exited with error: {}", instance_id, e);
}
});
@@ -126,21 +211,22 @@ pub async fn spawn(
/// * `2026-04-05` - mks - original coding
async fn run(
channel: Channel,
queue_name: String,
queues: BrokerQueues,
instance_id: u32,
template_registry: Arc<RuntimeTemplateRegistry>,
) -> Result<(), BrokerError> {
let consumer_tag = format!("rbroker-{}", instance_id);
let mut consumer = channel
.basic_consume(
&queue_name,
&queues.primary,
&consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
tracing::info!("rBroker[{}] consuming on '{}'", instance_id, queue_name);
tracing::info!("rBroker[{}] consuming on '{}'", instance_id, queues.primary);
while let Some(delivery) = consumer.next().await {
let delivery = match delivery {
@@ -151,7 +237,6 @@ async fn run(
}
};
// extract the event type from the message type header
let event_type = delivery
.properties
.kind()
@@ -159,48 +244,89 @@ async fn run(
.map(|s| s.as_str().to_string())
.unwrap_or_default();
let header_correlation_id = delivery
.properties
.correlation_id()
.as_ref()
.map(|s| s.as_str().to_string())
.unwrap_or_default();
tracing::debug!("rBroker[{}] received event='{}'", instance_id, event_type);
let reply_payload: Option<Vec<u8>> = match event_type.as_str() {
"ping" => handle_ping(instance_id),
let envelope = match parse_request(&delivery.data) {
Ok(env) => env,
Err(e) => {
let payload = error_response(
"unknown",
&header_correlation_id,
"INVALID_ENVELOPE",
&e,
);
publish_reply(&channel, &delivery, payload).await;
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: false,
})
.await;
continue;
}
};
let correlation_id = if envelope.correlation_id.trim().is_empty() {
header_correlation_id
} else {
envelope.correlation_id.clone()
};
if !event_type.is_empty() && event_type != envelope.op {
let payload = error_response(
&envelope.op,
&correlation_id,
"OP_MISMATCH",
"AMQP type header does not match envelope op",
);
publish_reply(&channel, &delivery, payload).await;
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: false,
})
.await;
continue;
}
let outcome = match envelope.op.as_str() {
"ping" => BrokerOutcome {
reply_payload: Some(handle_ping(&correlation_id, instance_id)),
ack_action: AckAction::Ack,
},
"shutdown" => {
// ack before exiting so the message is not redelivered
let _ = delivery.ack(BasicAckOptions::default()).await;
tracing::info!("rBroker[{}] shutdown event received — exiting", instance_id);
break;
}
"fetch" => handle_fetch(&delivery.data, instance_id).await,
"fetch" => handle_fetch(envelope, &correlation_id, instance_id, &template_registry).await,
unknown => {
tracing::warn!("rBroker[{}] unknown event type '{}'", instance_id, unknown);
None
BrokerOutcome {
reply_payload: Some(error_response(
unknown,
&correlation_id,
"UNSUPPORTED_OPERATION",
"unsupported operation for rec.read",
)),
ack_action: AckAction::Nack { requeue: false },
}
}
};
// publish reply if the event specified a reply_to queue
if let Some(payload) = reply_payload {
if let Some(reply_to) = delivery.properties.reply_to().as_ref() {
let reply_queue = reply_to.as_str().to_string();
let correlation_id = delivery.properties.correlation_id().clone();
let props = BasicProperties::default()
.with_correlation_id(correlation_id.unwrap_or_default());
if let Err(e) = channel
.basic_publish(
"", // default exchange — direct to queue by name
&reply_queue,
BasicPublishOptions::default(),
&payload,
props,
)
.await
{
tracing::error!("rBroker[{}] reply publish failed: {}", instance_id, e);
}
}
if let Some(payload) = outcome.reply_payload {
publish_reply(&channel, &delivery, payload).await;
}
let _ = delivery.ack(BasicAckOptions::default()).await;
apply_ack_action(&channel, &delivery, &queues, outcome.ack_action).await;
}
tracing::info!("rBroker[{}] consume loop exited", instance_id);
@@ -212,15 +338,21 @@ async fn run(
/// # History
///
/// * `2026-04-05` - mks - original coding
fn handle_ping(instance_id: u32) -> Option<Vec<u8>> {
fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec<u8> {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let response = format!(r#"{{"status":"ok","broker":"rBroker","instance":{},"ts":{}}}"#, instance_id, ts);
tracing::debug!("rBroker[{}] ping response: {}", instance_id, response);
Some(response.into_bytes())
success_response(
"ping",
correlation_id,
serde_json::json!({
"broker": "rBroker",
"instance": instance_id,
"ts": ts,
}),
)
}
/// Stub handler for `fetch` events.
@@ -231,35 +363,43 @@ fn handle_ping(instance_id: u32) -> Option<Vec<u8>> {
/// # History
///
/// * `2026-04-05` - mks - stub
async fn handle_fetch(data: &[u8], instance_id: u32) -> Option<Vec<u8>> {
let payload: BrokerPayload = match serde_json::from_slice(data) {
Ok(p) => p,
Err(e) => {
let response = serde_json::json!({
"status": "error",
"code": "INVALID_PAYLOAD",
"message": format!("invalid JSON payload: {}", e),
});
return Some(response.to_string().into_bytes());
}
};
async fn handle_fetch(
envelope: BrokerRequestEnvelope,
correlation_id: &str,
instance_id: u32,
template_registry: &RuntimeTemplateRegistry,
) -> BrokerOutcome {
if !template_registry.contains_template(&envelope.template) {
return BrokerOutcome {
reply_payload: Some(error_response(
"fetch",
correlation_id,
"TEMPLATE_NOT_FOUND",
"requested template is not registered at runtime",
)),
ack_action: AckAction::Nack { requeue: false },
};
}
if !logger_store::is_logger_template(&payload.template) {
if !logger_store::is_logger_template(&envelope.template) {
tracing::warn!(
"rBroker[{}] fetch template '{}' not implemented yet",
instance_id,
payload.template
envelope.template
);
let response = serde_json::json!({
"status": "error",
"code": "NOT_IMPLEMENTED",
"message": "only logger fetch is implemented in PoC step 1",
});
return Some(response.to_string().into_bytes());
return BrokerOutcome {
reply_payload: Some(error_response(
"fetch",
correlation_id,
"NOT_IMPLEMENTED",
"only logger fetch is implemented in PoC step 1",
)),
ack_action: AckAction::Nack { requeue: false },
};
}
let limit = payload
.data
let limit = envelope
.payload
.get("limit")
.and_then(|v| v.as_u64())
.map(|n| n as usize)
@@ -268,19 +408,93 @@ async fn handle_fetch(data: &[u8], instance_id: u32) -> Option<Vec<u8>> {
let logs = match logger_store::fetch_recent(limit).await {
Ok(items) => items,
Err(e) => {
let response = serde_json::json!({
"status": "error",
"code": "LOGGER_STORE_UNAVAILABLE",
"message": e,
});
return Some(response.to_string().into_bytes());
tracing::warn!("rBroker[{}] fetch retryable error: {}", instance_id, e);
return BrokerOutcome {
reply_payload: None,
ack_action: AckAction::Nack { requeue: true },
};
}
};
let response = serde_json::json!({
"status": "ok",
"code": "LOGGER_FETCH",
"count": logs.len(),
"logs": logs,
});
Some(response.to_string().into_bytes())
BrokerOutcome {
reply_payload: Some(success_response(
"fetch",
correlation_id,
serde_json::json!({
"code": "LOGGER_FETCH",
"count": logs.len(),
"logs": logs,
}),
)),
ack_action: AckAction::Ack,
}
}
async fn publish_reply(
channel: &Channel,
delivery: &lapin::message::Delivery,
payload: Vec<u8>,
) {
if let Some(reply_to) = delivery.properties.reply_to().as_ref() {
let reply_queue = reply_to.as_str().to_string();
let correlation_id = delivery.properties.correlation_id().clone();
let props = BasicProperties::default().with_correlation_id(correlation_id.unwrap_or_default());
if let Err(e) = channel
.basic_publish(
"",
&reply_queue,
BasicPublishOptions::default(),
&payload,
props,
)
.await
{
tracing::error!("reply publish failed: {}", e);
}
}
}
async fn apply_ack_action(
channel: &Channel,
delivery: &lapin::message::Delivery,
queues: &BrokerQueues,
action: AckAction,
) {
match action {
AckAction::Ack => {
let _ = delivery.ack(BasicAckOptions::default()).await;
}
AckAction::Nack { requeue } => {
if requeue {
let retry_publish = channel
.basic_publish(
"",
&queues.retry,
BasicPublishOptions::default(),
&delivery.data,
delivery.properties.clone(),
)
.await;
if retry_publish.is_ok() {
let _ = delivery.ack(BasicAckOptions::default()).await;
} else {
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: true,
})
.await;
}
return;
}
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue,
})
.await;
}
}
}

View File

@@ -37,20 +37,42 @@ use futures_lite::StreamExt;
use lapin::{
BasicProperties, Channel, Connection,
options::{
BasicAckOptions, BasicConsumeOptions, BasicPublishOptions,
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
QueueBindOptions, QueueDeclareOptions,
},
types::FieldTable,
types::{AMQPValue, FieldTable, LongString, ShortString},
};
use crate::services::amqp::EXCHANGE_NAME;
use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME};
use crate::brokers::logger_store;
use crate::brokers::payload::BrokerPayload;
use crate::brokers::payload::{
BrokerRequestEnvelope,
error_response,
parse_request,
success_response,
};
use crate::template_registry::RuntimeTemplateRegistry;
use super::error::BrokerError;
/// Routing key this broker binds to.
const ROUTING_KEY: &str = "rec.write";
enum AckAction {
Ack,
Nack { requeue: bool },
}
struct BrokerOutcome {
reply_payload: Option<Vec<u8>>,
ack_action: AckAction,
}
#[derive(Clone)]
struct BrokerQueues {
primary: String,
retry: String,
}
/// Spawns a single wBroker task and returns immediately.
///
/// # Arguments
@@ -71,9 +93,22 @@ pub async fn spawn(
conn: Arc<Connection>,
queue_tag: String,
instance_id: u32,
template_registry: Arc<RuntimeTemplateRegistry>,
) -> Result<tokio::task::JoinHandle<()>, BrokerError> {
let channel = conn.create_channel().await?;
let queue_name = format!("{}rec.write", queue_tag);
let retry_queue_name = format!("{}rec.write.retry", queue_tag);
let dlq_queue_name = format!("{}rec.write.dlq", queue_tag);
let mut primary_args = FieldTable::default();
primary_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)),
);
primary_args.insert(
ShortString::from("x-dead-letter-routing-key"),
AMQPValue::LongString(LongString::from("rec.write.dlq")),
);
channel
.queue_declare(
@@ -82,6 +117,52 @@ pub async fn spawn(
durable: true,
..Default::default()
},
primary_args,
)
.await?;
let mut retry_args = FieldTable::default();
retry_args.insert(
ShortString::from("x-message-ttl"),
AMQPValue::LongUInt(5_000),
);
retry_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(EXCHANGE_NAME)),
);
retry_args.insert(
ShortString::from("x-dead-letter-routing-key"),
AMQPValue::LongString(LongString::from(ROUTING_KEY)),
);
channel
.queue_declare(
&retry_queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
},
retry_args,
)
.await?;
channel
.queue_declare(
&dlq_queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
channel
.queue_bind(
&dlq_queue_name,
DLX_EXCHANGE_NAME,
"rec.write.dlq",
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
@@ -99,7 +180,11 @@ pub async fn spawn(
tracing::info!("wBroker[{}] queue '{}' declared and bound", instance_id, queue_name);
let handle = tokio::spawn(async move {
if let Err(e) = run(channel, queue_name, instance_id).await {
let queues = BrokerQueues {
primary: queue_name,
retry: retry_queue_name,
};
if let Err(e) = run(channel, queues, instance_id, template_registry).await {
tracing::error!("wBroker[{}] exited with error: {}", instance_id, e);
}
});
@@ -117,21 +202,22 @@ pub async fn spawn(
/// * `2026-04-05` - mks - original coding
async fn run(
channel: Channel,
queue_name: String,
queues: BrokerQueues,
instance_id: u32,
template_registry: Arc<RuntimeTemplateRegistry>,
) -> Result<(), BrokerError> {
let consumer_tag = format!("wbroker-{}", instance_id);
let mut consumer = channel
.basic_consume(
&queue_name,
&queues.primary,
&consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
tracing::info!("wBroker[{}] consuming on '{}'", instance_id, queue_name);
tracing::info!("wBroker[{}] consuming on '{}'", instance_id, queues.primary);
while let Some(delivery) = consumer.next().await {
let delivery = match delivery {
@@ -149,48 +235,90 @@ async fn run(
.map(|s| s.as_str().to_string())
.unwrap_or_default();
let header_correlation_id = delivery
.properties
.correlation_id()
.as_ref()
.map(|s| s.as_str().to_string())
.unwrap_or_default();
tracing::debug!("wBroker[{}] received event='{}'", instance_id, event_type);
let reply_payload: Option<Vec<u8>> = match event_type.as_str() {
"ping" => handle_ping(instance_id),
let envelope = match parse_request(&delivery.data) {
Ok(env) => env,
Err(e) => {
let payload = error_response(
"unknown",
&header_correlation_id,
"INVALID_ENVELOPE",
&e,
);
publish_reply(&channel, &delivery, payload).await;
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: false,
})
.await;
continue;
}
};
let correlation_id = if envelope.correlation_id.trim().is_empty() {
header_correlation_id
} else {
envelope.correlation_id.clone()
};
if !event_type.is_empty() && event_type != envelope.op {
let payload = error_response(
&envelope.op,
&correlation_id,
"OP_MISMATCH",
"AMQP type header does not match envelope op",
);
publish_reply(&channel, &delivery, payload).await;
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: false,
})
.await;
continue;
}
let outcome = match envelope.op.as_str() {
"ping" => BrokerOutcome {
reply_payload: Some(handle_ping(&correlation_id, instance_id)),
ack_action: AckAction::Ack,
},
"shutdown" => {
let _ = delivery.ack(BasicAckOptions::default()).await;
tracing::info!("wBroker[{}] shutdown event received — exiting", instance_id);
break;
}
"write" => handle_write(&delivery.data, instance_id).await,
"update" => handle_update(&delivery.data, instance_id),
"delete" => handle_delete(&delivery.data, instance_id),
"write" => handle_write(envelope, &correlation_id, instance_id, &template_registry).await,
"update" => handle_update(&correlation_id, instance_id),
"delete" => handle_delete(&correlation_id, instance_id),
unknown => {
tracing::warn!("wBroker[{}] unknown event type '{}'", instance_id, unknown);
None
BrokerOutcome {
reply_payload: Some(error_response(
unknown,
&correlation_id,
"UNSUPPORTED_OPERATION",
"unsupported operation for rec.write",
)),
ack_action: AckAction::Nack { requeue: false },
}
}
};
if let Some(payload) = reply_payload {
if let Some(reply_to) = delivery.properties.reply_to().as_ref() {
let reply_queue = reply_to.as_str().to_string();
let correlation_id = delivery.properties.correlation_id().clone();
let props = BasicProperties::default()
.with_correlation_id(correlation_id.unwrap_or_default());
if let Err(e) = channel
.basic_publish(
"",
&reply_queue,
BasicPublishOptions::default(),
&payload,
props,
)
.await
{
tracing::error!("wBroker[{}] reply publish failed: {}", instance_id, e);
}
}
if let Some(payload) = outcome.reply_payload {
publish_reply(&channel, &delivery, payload).await;
}
let _ = delivery.ack(BasicAckOptions::default()).await;
apply_ack_action(&channel, &delivery, &queues, outcome.ack_action).await;
}
tracing::info!("wBroker[{}] consume loop exited", instance_id);
@@ -202,17 +330,21 @@ async fn run(
/// # History
///
/// * `2026-04-05` - mks - original coding
fn handle_ping(instance_id: u32) -> Option<Vec<u8>> {
fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec<u8> {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let response = format!(
r#"{{"status":"ok","broker":"wBroker","instance":{},"ts":{}}}"#,
instance_id, ts
);
Some(response.into_bytes())
success_response(
"ping",
correlation_id,
serde_json::json!({
"broker": "wBroker",
"instance": instance_id,
"ts": ts,
}),
)
}
/// Stub handler for `write` events.
@@ -220,46 +352,63 @@ fn handle_ping(instance_id: u32) -> Option<Vec<u8>> {
/// # History
///
/// * `2026-04-05` - mks - stub
async fn handle_write(data: &[u8], instance_id: u32) -> Option<Vec<u8>> {
let payload: BrokerPayload = match serde_json::from_slice(data) {
Ok(p) => p,
Err(e) => {
let response = serde_json::json!({
"status": "error",
"code": "INVALID_PAYLOAD",
"message": format!("invalid JSON payload: {}", e),
});
return Some(response.to_string().into_bytes());
}
};
async fn handle_write(
envelope: BrokerRequestEnvelope,
correlation_id: &str,
instance_id: u32,
template_registry: &RuntimeTemplateRegistry,
) -> BrokerOutcome {
if !template_registry.contains_template(&envelope.template) {
return BrokerOutcome {
reply_payload: Some(error_response(
"write",
correlation_id,
"TEMPLATE_NOT_FOUND",
"requested template is not registered at runtime",
)),
ack_action: AckAction::Nack { requeue: false },
};
}
if !logger_store::is_logger_template(&payload.template) {
if !logger_store::is_logger_template(&envelope.template) {
tracing::warn!(
"wBroker[{}] write template '{}' not implemented yet",
instance_id,
payload.template
envelope.template
);
return not_implemented_response();
return BrokerOutcome {
reply_payload: Some(error_response(
"write",
correlation_id,
"NOT_IMPLEMENTED",
"only logger write is implemented in PoC step 1",
)),
ack_action: AckAction::Nack { requeue: false },
};
}
let token = match logger_store::append_log(payload.data).await {
let token = match logger_store::append_log(envelope.payload).await {
Ok(token) => token,
Err(e) => {
let response = serde_json::json!({
"status": "error",
"code": "LOGGER_STORE_UNAVAILABLE",
"message": e,
});
return Some(response.to_string().into_bytes());
tracing::warn!("wBroker[{}] write retryable error: {}", instance_id, e);
return BrokerOutcome {
reply_payload: None,
ack_action: AckAction::Nack { requeue: true },
};
}
};
let response = serde_json::json!({
"status": "ok",
"code": "LOGGER_WRITE",
"token": token,
});
Some(response.to_string().into_bytes())
BrokerOutcome {
reply_payload: Some(success_response(
"write",
correlation_id,
serde_json::json!({
"code": "LOGGER_WRITE",
"token": token,
}),
)),
ack_action: AckAction::Ack,
}
}
/// Stub handler for `update` events.
@@ -267,12 +416,16 @@ async fn handle_write(data: &[u8], instance_id: u32) -> Option<Vec<u8>> {
/// # History
///
/// * `2026-04-05` - mks - stub
fn handle_update(data: &[u8], instance_id: u32) -> Option<Vec<u8>> {
tracing::warn!(
"wBroker[{}] update event ({} bytes) — factory dispatch not yet implemented",
instance_id, data.len()
);
not_implemented_response()
fn handle_update(correlation_id: &str, _instance_id: u32) -> BrokerOutcome {
BrokerOutcome {
reply_payload: Some(error_response(
"update",
correlation_id,
"NOT_IMPLEMENTED",
"factory dispatch not yet implemented",
)),
ack_action: AckAction::Nack { requeue: false },
}
}
/// Stub handler for `delete` events.
@@ -280,19 +433,84 @@ fn handle_update(data: &[u8], instance_id: u32) -> Option<Vec<u8>> {
/// # History
///
/// * `2026-04-05` - mks - stub
fn handle_delete(data: &[u8], instance_id: u32) -> Option<Vec<u8>> {
tracing::warn!(
"wBroker[{}] delete event ({} bytes) — factory dispatch not yet implemented",
instance_id, data.len()
);
not_implemented_response()
fn handle_delete(correlation_id: &str, _instance_id: u32) -> BrokerOutcome {
BrokerOutcome {
reply_payload: Some(error_response(
"delete",
correlation_id,
"NOT_IMPLEMENTED",
"factory dispatch not yet implemented",
)),
ack_action: AckAction::Nack { requeue: false },
}
}
/// Standard NOT_IMPLEMENTED reply payload — used by all stub handlers.
fn not_implemented_response() -> Option<Vec<u8>> {
Some(
r#"{"status":"error","code":"NOT_IMPLEMENTED","message":"factory dispatch not yet implemented"}"#
.as_bytes()
.to_vec(),
)
async fn publish_reply(
channel: &Channel,
delivery: &lapin::message::Delivery,
payload: Vec<u8>,
) {
if let Some(reply_to) = delivery.properties.reply_to().as_ref() {
let reply_queue = reply_to.as_str().to_string();
let correlation_id = delivery.properties.correlation_id().clone();
let props = BasicProperties::default().with_correlation_id(correlation_id.unwrap_or_default());
if let Err(e) = channel
.basic_publish(
"",
&reply_queue,
BasicPublishOptions::default(),
&payload,
props,
)
.await
{
tracing::error!("reply publish failed: {}", e);
}
}
}
async fn apply_ack_action(
channel: &Channel,
delivery: &lapin::message::Delivery,
queues: &BrokerQueues,
action: AckAction,
) {
match action {
AckAction::Ack => {
let _ = delivery.ack(BasicAckOptions::default()).await;
}
AckAction::Nack { requeue } => {
if requeue {
let retry_publish = channel
.basic_publish(
"",
&queues.retry,
BasicPublishOptions::default(),
&delivery.data,
delivery.properties.clone(),
)
.await;
if retry_publish.is_ok() {
let _ = delivery.ack(BasicAckOptions::default()).await;
} else {
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue: true,
})
.await;
}
return;
}
let _ = delivery
.nack(BasicNackOptions {
multiple: false,
requeue,
})
.await;
}
}
}