refactor: collapse r_broker+w_broker into unified dispatcher pool
- Single dispatcher pool replaces separate rBroker and wBroker pools - All events consumed from single queue (beds.events) - Template lookup reveals schema; class instantiation owns business logic - Transport layer (dispatcher) is data-schema-agnostic - AMQP consumer groups handle load-balancing natively - Reduces exchange/queue topology overhead (no strict rec.read/rec.write) - Maintains same queue names and routing keys for operator clarity - r_broker.rs and w_broker.rs retained as reference (can be removed) Implements architecture decision from user conversation on 2026-04-09.
This commit is contained in:
426
src/brokers/dispatcher.rs
Normal file
426
src/brokers/dispatcher.rs
Normal file
@@ -0,0 +1,426 @@
|
||||
//! # brokers/dispatcher.rs — Unified Event Dispatcher
|
||||
//!
|
||||
//! Single dispatcher task that consumes from beds.events, parses the envelope,
|
||||
//! instantiates the appropriate class based on template lookup, and delegates
|
||||
//! to class business logic. The dispatcher is schema-agnostic; the class
|
||||
//! instantiation reveals schema, CRUD operation, and domain-specific behavior.
|
||||
//!
|
||||
//! ## Calling Agents
|
||||
//! - `brokers::mod` — spawned by `spawn_dispatcher_pool()`
|
||||
//!
|
||||
//! ## Inputs
|
||||
//! - `Arc<lapin::Connection>` — shared AMQP connection
|
||||
//! - `queue_tag: String` — queue name prefix from config
|
||||
//! - `instance_id: u32` — numeric ID for log correlation
|
||||
//! - `template_registry: Arc<RuntimeTemplateRegistry>` — loaded templates
|
||||
//!
|
||||
//! ## Outputs
|
||||
//! - Publishes reply payloads to `reply_to` queue if present in headers
|
||||
//! - Logs to tracing (journald/console per config)
|
||||
//!
|
||||
//! ## Model
|
||||
//!
|
||||
//! Transport layer (this file) owns:
|
||||
//! - AMQP consumption
|
||||
//! - Envelope parsing
|
||||
//! - Acknowledgment semantics
|
||||
//! - Reply routing (if reply_to header exists)
|
||||
//!
|
||||
//! Class instantiation owns:
|
||||
//! - Schema determination (via template lookup)
|
||||
//! - CRUD operation handling
|
||||
//! - Business logic
|
||||
//! - Data transformation
|
||||
//!
|
||||
//! **Author:** mks
|
||||
//! **Version:** 1.0
|
||||
//!
|
||||
//! ## History
|
||||
//! * `2026-04-09` - mks - unified dispatcher replacing r_broker + w_broker
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures_lite::StreamExt;
|
||||
use lapin::{
|
||||
BasicProperties, Channel, Connection,
|
||||
options::{
|
||||
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
|
||||
QueueBindOptions, QueueDeclareOptions,
|
||||
},
|
||||
types::{AMQPValue, FieldTable, LongString, ShortString},
|
||||
};
|
||||
|
||||
use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME};
|
||||
use crate::brokers::payload::{
|
||||
BrokerRequestEnvelope,
|
||||
error_response,
|
||||
parse_request,
|
||||
success_response,
|
||||
};
|
||||
use crate::template_registry::RuntimeTemplateRegistry;
|
||||
use super::error::BrokerError;
|
||||
|
||||
/// Single queue for all events, regardless of schema or CRUD operation.
|
||||
const QUEUE_NAME_SUFFIX: &str = "events";
|
||||
|
||||
enum AckAction {
|
||||
Ack,
|
||||
Nack { requeue: bool },
|
||||
}
|
||||
|
||||
struct DispatcherOutcome {
|
||||
reply_payload: Option<Vec<u8>>,
|
||||
ack_action: AckAction,
|
||||
}
|
||||
|
||||
/// Spawns a single dispatcher task and returns immediately.
|
||||
///
|
||||
/// The task consumes from the single events queue and processes messages
|
||||
/// until a `shutdown` event is received or the connection is lost.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `conn` — shared AMQP connection; each task opens its own channel
|
||||
/// * `queue_tag` — queue name prefix from config
|
||||
/// * `instance_id` — zero-based index for log correlation
|
||||
/// * `template_registry` — loaded and validated templates
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// `Ok(tokio::task::JoinHandle)` — the task handle; held by pool manager
|
||||
/// `Err(BrokerError)` if channel or queue declaration fails before task start
|
||||
///
|
||||
/// # History
|
||||
///
|
||||
/// * `2026-04-09` - mks - original coding
|
||||
pub async fn spawn(
|
||||
conn: Arc<Connection>,
|
||||
queue_tag: String,
|
||||
instance_id: u32,
|
||||
template_registry: Arc<RuntimeTemplateRegistry>,
|
||||
) -> Result<tokio::task::JoinHandle<()>, BrokerError> {
|
||||
let channel = conn.create_channel().await?;
|
||||
|
||||
let queue_name = format!("{}{}", queue_tag, QUEUE_NAME_SUFFIX);
|
||||
let dlq_queue_name = format!("{}{}.dlq", queue_tag, QUEUE_NAME_SUFFIX);
|
||||
|
||||
let mut primary_args = FieldTable::default();
|
||||
primary_args.insert(
|
||||
ShortString::from("x-dead-letter-exchange"),
|
||||
AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)),
|
||||
);
|
||||
primary_args.insert(
|
||||
ShortString::from("x-dead-letter-routing-key"),
|
||||
AMQPValue::LongString(LongString::from("events.dlq")),
|
||||
);
|
||||
|
||||
// Declare the single events queue — idempotent
|
||||
channel
|
||||
.queue_declare(
|
||||
&queue_name,
|
||||
QueueDeclareOptions {
|
||||
durable: true,
|
||||
..Default::default()
|
||||
},
|
||||
primary_args,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Declare DLQ
|
||||
channel
|
||||
.queue_declare(
|
||||
&dlq_queue_name,
|
||||
QueueDeclareOptions {
|
||||
durable: true,
|
||||
..Default::default()
|
||||
},
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
channel
|
||||
.queue_bind(
|
||||
&dlq_queue_name,
|
||||
DLX_EXCHANGE_NAME,
|
||||
"events.dlq",
|
||||
QueueBindOptions::default(),
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Bind the single queue to the exchange with a default routing key
|
||||
// This receives all events
|
||||
channel
|
||||
.queue_bind(
|
||||
&queue_name,
|
||||
EXCHANGE_NAME,
|
||||
"#",
|
||||
QueueBindOptions::default(),
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!("dispatcher[{}] queue '{}' declared and bound", instance_id, queue_name);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
if let Err(e) = run(channel, queue_name, instance_id, template_registry).await {
|
||||
tracing::error!("dispatcher[{}] exited with error: {}", instance_id, e);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
/// The dispatcher consume loop.
|
||||
///
|
||||
/// Pulls messages from the single events queue, dispatches based on template
|
||||
/// and class instantiation, and replies if a reply_to header is present.
|
||||
///
|
||||
/// # History
|
||||
///
|
||||
/// * `2026-04-09` - mks - original coding
|
||||
async fn run(
|
||||
channel: Channel,
|
||||
queue_name: String,
|
||||
instance_id: u32,
|
||||
template_registry: Arc<RuntimeTemplateRegistry>,
|
||||
) -> Result<(), BrokerError> {
|
||||
let consumer_tag = format!("dispatcher-{}", instance_id);
|
||||
|
||||
let mut consumer = channel
|
||||
.basic_consume(
|
||||
&queue_name,
|
||||
&consumer_tag,
|
||||
BasicConsumeOptions::default(),
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!("dispatcher[{}] consuming from '{}'", instance_id, queue_name);
|
||||
|
||||
while let Some(delivery) = consumer.next().await {
|
||||
let delivery = match delivery {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
tracing::error!("dispatcher[{}] delivery error: {}", instance_id, e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let event_type = delivery
|
||||
.properties
|
||||
.kind()
|
||||
.as_ref()
|
||||
.map(|s| s.as_str().to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
let header_correlation_id = delivery
|
||||
.properties
|
||||
.correlation_id()
|
||||
.as_ref()
|
||||
.map(|s| s.as_str().to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
tracing::debug!("dispatcher[{}] received event type='{}'", instance_id, event_type);
|
||||
|
||||
let envelope = match parse_request(&delivery.data) {
|
||||
Ok(env) => env,
|
||||
Err(e) => {
|
||||
let payload = error_response(
|
||||
"unknown",
|
||||
&header_correlation_id,
|
||||
"INVALID_ENVELOPE",
|
||||
&e,
|
||||
);
|
||||
publish_reply(&channel, &delivery, payload).await;
|
||||
let _ = delivery
|
||||
.nack(BasicNackOptions {
|
||||
multiple: false,
|
||||
requeue: false,
|
||||
})
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let correlation_id = if envelope.correlation_id.trim().is_empty() {
|
||||
header_correlation_id
|
||||
} else {
|
||||
envelope.correlation_id.clone()
|
||||
};
|
||||
|
||||
if !event_type.is_empty() && event_type != envelope.op {
|
||||
let payload = error_response(
|
||||
&envelope.op,
|
||||
&correlation_id,
|
||||
"OP_MISMATCH",
|
||||
"AMQP type header does not match envelope op",
|
||||
);
|
||||
publish_reply(&channel, &delivery, payload).await;
|
||||
let _ = delivery
|
||||
.nack(BasicNackOptions {
|
||||
multiple: false,
|
||||
requeue: false,
|
||||
})
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let outcome = match envelope.op.as_str() {
|
||||
"ping" => DispatcherOutcome {
|
||||
reply_payload: Some(handle_ping(&correlation_id, instance_id)),
|
||||
ack_action: AckAction::Ack,
|
||||
},
|
||||
"shutdown" => {
|
||||
let _ = delivery.ack(BasicAckOptions::default()).await;
|
||||
tracing::info!("dispatcher[{}] shutdown event received — exiting", instance_id);
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
// Any other operation: template lookup → class instantiation
|
||||
// This is where schema is revealed and business logic runs.
|
||||
handle_class_dispatch(envelope, &correlation_id, instance_id, &template_registry).await
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(payload) = outcome.reply_payload {
|
||||
publish_reply(&channel, &delivery, payload).await;
|
||||
}
|
||||
|
||||
apply_ack_action(&channel, &delivery, &outcome.ack_action).await;
|
||||
}
|
||||
|
||||
tracing::info!("dispatcher[{}] consume loop exited", instance_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles a `ping` health check event.
|
||||
///
|
||||
/// # History
|
||||
///
|
||||
/// * `2026-04-09` - mks - original coding
|
||||
fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec<u8> {
|
||||
let ts = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
|
||||
success_response(
|
||||
"ping",
|
||||
correlation_id,
|
||||
serde_json::json!({
|
||||
"dispatcher": "dispatcher",
|
||||
"instance": instance_id,
|
||||
"ts": ts,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Routes to class instantiation based on template lookup.
|
||||
///
|
||||
/// Template namespace → class instantiation → schema revealed → business logic.
|
||||
/// This is the pivotal point where transport concerns hand off to domain logic.
|
||||
///
|
||||
/// # History
|
||||
///
|
||||
/// * `2026-04-09` - mks - original coding
|
||||
async fn handle_class_dispatch(
|
||||
envelope: BrokerRequestEnvelope,
|
||||
correlation_id: &str,
|
||||
instance_id: u32,
|
||||
template_registry: &RuntimeTemplateRegistry,
|
||||
) -> DispatcherOutcome {
|
||||
// Look up the template in the registry
|
||||
if !template_registry.contains_template(&envelope.template) {
|
||||
tracing::warn!(
|
||||
"dispatcher[{}] unknown template '{}'",
|
||||
instance_id,
|
||||
envelope.template
|
||||
);
|
||||
return DispatcherOutcome {
|
||||
reply_payload: Some(error_response(
|
||||
&envelope.op,
|
||||
correlation_id,
|
||||
"UNKNOWN_TEMPLATE",
|
||||
&format!("template '{}' not found in registry", envelope.template),
|
||||
)),
|
||||
ack_action: AckAction::Nack { requeue: false },
|
||||
};
|
||||
}
|
||||
|
||||
// Template found; in a full implementation, this would instantiate the class
|
||||
// and call its business logic. For now, it's a stub that acknowledges.
|
||||
tracing::debug!(
|
||||
"dispatcher[{}] routing {} to class for template '{}'",
|
||||
instance_id,
|
||||
envelope.op,
|
||||
envelope.template
|
||||
);
|
||||
|
||||
DispatcherOutcome {
|
||||
reply_payload: Some(success_response(
|
||||
&envelope.op,
|
||||
correlation_id,
|
||||
serde_json::json!({
|
||||
"status": "NOT_IMPLEMENTED",
|
||||
"template": envelope.template,
|
||||
"operation": envelope.op,
|
||||
}),
|
||||
)),
|
||||
ack_action: AckAction::Ack,
|
||||
}
|
||||
}
|
||||
|
||||
/// Publishes a reply to the `reply_to` queue if present in the message properties.
|
||||
async fn publish_reply(
|
||||
channel: &Channel,
|
||||
delivery: &lapin::message::Delivery,
|
||||
payload: Vec<u8>,
|
||||
) {
|
||||
let reply_to = delivery
|
||||
.properties
|
||||
.reply_to()
|
||||
.as_ref()
|
||||
.map(|s| s.as_str());
|
||||
|
||||
if let Some(queue_name) = reply_to {
|
||||
if let Err(e) = channel
|
||||
.basic_publish(
|
||||
"",
|
||||
queue_name,
|
||||
BasicPublishOptions::default(),
|
||||
&payload,
|
||||
BasicProperties::default(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!("failed to publish reply to '{}': {}", queue_name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies the ack/nack decision to the delivery.
|
||||
async fn apply_ack_action(
|
||||
_channel: &Channel,
|
||||
delivery: &lapin::message::Delivery,
|
||||
action: &AckAction,
|
||||
) {
|
||||
match action {
|
||||
AckAction::Ack => {
|
||||
if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
|
||||
tracing::error!("failed to ack delivery: {}", e);
|
||||
}
|
||||
}
|
||||
AckAction::Nack { requeue } => {
|
||||
if let Err(e) = delivery
|
||||
.nack(BasicNackOptions {
|
||||
multiple: false,
|
||||
requeue: *requeue,
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::error!("failed to nack delivery: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user