//! # brokers/dispatcher.rs — Unified Event Dispatcher //! //! Single dispatcher task that consumes from beds.events, parses the envelope, //! instantiates the appropriate class based on template lookup, and delegates //! to class business logic. The dispatcher is schema-agnostic; the class //! instantiation reveals schema, CRUD operation, and domain-specific behavior. //! //! ## Calling Agents //! - `brokers::mod` — spawned by `spawn_dispatcher_pool()` //! //! ## Inputs //! - `Arc` — shared AMQP connection //! - `queue_tag: String` — queue name prefix from config //! - `instance_id: u32` — numeric ID for log correlation //! - `template_registry: Arc` — loaded templates //! //! ## Outputs //! - Publishes reply payloads to `reply_to` queue if present in headers //! - Logs to tracing (journald/console per config) //! //! ## Model //! //! Transport layer (this file) owns: //! - AMQP consumption //! - Envelope parsing //! - Acknowledgment semantics //! - Reply routing (if reply_to header exists) //! //! Class instantiation owns: //! - Schema determination (via template lookup) //! - CRUD operation handling //! - Business logic //! - Data transformation //! //! **Author:** mks //! **Version:** 1.0 //! //! ## History //! * `2026-04-09` - mks - unified dispatcher replacing r_broker + w_broker use std::sync::Arc; use futures_lite::StreamExt; use lapin::{ BasicProperties, Channel, Connection, options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, QueueBindOptions, QueueDeclareOptions, }, types::{AMQPValue, FieldTable, LongString, ShortString}, }; use tokio::sync::watch; use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME}; use crate::brokers::payload::{ BrokerRequestEnvelope, error_response, parse_request, success_response, }; use crate::template_registry::RuntimeTemplateRegistry; use super::error::BrokerError; /// Single queue for all events, regardless of schema or CRUD operation. const QUEUE_NAME_SUFFIX: &str = "events"; enum AckAction { Ack, Nack { requeue: bool }, } struct DispatcherOutcome { reply_payload: Option>, ack_action: AckAction, } /// Spawns a single dispatcher task and returns immediately. /// /// The task consumes from the single events queue and processes messages /// until a `shutdown` event is received or the connection is lost. /// /// # Arguments /// /// * `conn` — shared AMQP connection; each task opens its own channel /// * `queue_tag` — queue name prefix from config /// * `instance_id` — zero-based index for log correlation /// * `template_registry` — loaded and validated templates /// /// # Returns /// /// `Ok(tokio::task::JoinHandle)` — the task handle; held by pool manager /// `Err(BrokerError)` if channel or queue declaration fails before task start /// /// # History /// /// * `2026-04-09` - mks - original coding pub async fn spawn( conn: Arc, queue_tag: String, instance_id: u32, template_registry: Arc, shutdown_tx: watch::Sender, shutdown_rx: watch::Receiver, ) -> Result, BrokerError> { let channel = conn.create_channel().await?; let queue_name = format!("{}{}", queue_tag, QUEUE_NAME_SUFFIX); let dlq_queue_name = format!("{}{}.dlq", queue_tag, QUEUE_NAME_SUFFIX); let mut primary_args = FieldTable::default(); primary_args.insert( ShortString::from("x-dead-letter-exchange"), AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)), ); primary_args.insert( ShortString::from("x-dead-letter-routing-key"), AMQPValue::LongString(LongString::from("events.dlq")), ); // Declare the single events queue — idempotent channel .queue_declare( &queue_name, QueueDeclareOptions { durable: true, ..Default::default() }, primary_args, ) .await?; // Declare DLQ channel .queue_declare( &dlq_queue_name, QueueDeclareOptions { durable: true, ..Default::default() }, FieldTable::default(), ) .await?; channel .queue_bind( &dlq_queue_name, DLX_EXCHANGE_NAME, "events.dlq", QueueBindOptions::default(), FieldTable::default(), ) .await?; // Bind the single queue to the exchange with a default routing key // This receives all events channel .queue_bind( &queue_name, EXCHANGE_NAME, "#", QueueBindOptions::default(), FieldTable::default(), ) .await?; tracing::info!("dispatcher[{}] queue '{}' declared and bound", instance_id, queue_name); let handle = tokio::spawn(async move { if let Err(e) = run( channel, queue_name, instance_id, template_registry, shutdown_tx, shutdown_rx, ).await { tracing::error!("dispatcher[{}] exited with error: {}", instance_id, e); } }); Ok(handle) } /// The dispatcher consume loop. /// /// Pulls messages from the single events queue, dispatches based on template /// and class instantiation, and replies if a reply_to header is present. /// /// # History /// /// * `2026-04-09` - mks - original coding async fn run( channel: Channel, queue_name: String, instance_id: u32, template_registry: Arc, shutdown_tx: watch::Sender, mut shutdown_rx: watch::Receiver, ) -> Result<(), BrokerError> { let consumer_tag = format!("dispatcher-{}", instance_id); let mut consumer = channel .basic_consume( &queue_name, &consumer_tag, BasicConsumeOptions::default(), FieldTable::default(), ) .await?; tracing::info!("dispatcher[{}] consuming from '{}'", instance_id, queue_name); loop { tokio::select! { changed = shutdown_rx.changed() => { if changed.is_ok() && *shutdown_rx.borrow() { tracing::info!("dispatcher[{}] received global shutdown signal", instance_id); break; } if changed.is_err() { tracing::info!("dispatcher[{}] shutdown coordinator dropped", instance_id); break; } } maybe_delivery = consumer.next() => { let Some(delivery) = maybe_delivery else { break; }; let delivery = match delivery { Ok(d) => d, Err(e) => { tracing::error!("dispatcher[{}] delivery error: {}", instance_id, e); break; } }; let event_type = delivery .properties .kind() .as_ref() .map(|s| s.as_str().to_string()) .unwrap_or_default(); let header_correlation_id = delivery .properties .correlation_id() .as_ref() .map(|s| s.as_str().to_string()) .unwrap_or_default(); tracing::debug!("dispatcher[{}] received event type='{}'", instance_id, event_type); let envelope = match parse_request(&delivery.data) { Ok(env) => env, Err(e) => { let payload = error_response( "unknown", &header_correlation_id, "INVALID_ENVELOPE", &e, ); publish_reply(&channel, &delivery, payload).await; let _ = delivery .nack(BasicNackOptions { multiple: false, requeue: false, }) .await; continue; } }; let correlation_id = if envelope.correlation_id.trim().is_empty() { header_correlation_id } else { envelope.correlation_id.clone() }; if !event_type.is_empty() && event_type != envelope.op { let payload = error_response( &envelope.op, &correlation_id, "OP_MISMATCH", "AMQP type header does not match envelope op", ); publish_reply(&channel, &delivery, payload).await; let _ = delivery .nack(BasicNackOptions { multiple: false, requeue: false, }) .await; continue; } let outcome = match envelope.op.as_str() { "ping" => DispatcherOutcome { reply_payload: Some(handle_ping(&correlation_id, instance_id)), ack_action: AckAction::Ack, }, "shutdown" => { let _ = delivery.ack(BasicAckOptions::default()).await; tracing::info!("dispatcher[{}] shutdown event received — exiting", instance_id); let _ = shutdown_tx.send(true); break; } _ => { // Any other operation: template lookup → class instantiation // This is where schema is revealed and business logic runs. handle_class_dispatch(envelope, &correlation_id, instance_id, &template_registry).await } }; if let Some(payload) = outcome.reply_payload { publish_reply(&channel, &delivery, payload).await; } apply_ack_action(&channel, &delivery, &outcome.ack_action).await; } } } tracing::info!("dispatcher[{}] consume loop exited", instance_id); Ok(()) } /// Handles a `ping` health check event. /// /// # History /// /// * `2026-04-09` - mks - original coding fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(); success_response( "ping", correlation_id, serde_json::json!({ "dispatcher": "dispatcher", "instance": instance_id, "ts": ts, }), ) } /// Routes to class instantiation based on template lookup. /// /// Template namespace → class instantiation → schema revealed → business logic. /// This is the pivotal point where transport concerns hand off to domain logic. /// /// # History /// /// * `2026-04-09` - mks - original coding async fn handle_class_dispatch( envelope: BrokerRequestEnvelope, correlation_id: &str, instance_id: u32, template_registry: &RuntimeTemplateRegistry, ) -> DispatcherOutcome { // Look up the template in the registry if !template_registry.contains_template(&envelope.template) { tracing::warn!( "dispatcher[{}] unknown template '{}'", instance_id, envelope.template ); return DispatcherOutcome { reply_payload: Some(error_response( &envelope.op, correlation_id, "UNKNOWN_TEMPLATE", &format!("template '{}' not found in registry", envelope.template), )), ack_action: AckAction::Nack { requeue: false }, }; } // Template found; in a full implementation, this would instantiate the class // and call its business logic. For now, it's a stub that acknowledges. tracing::debug!( "dispatcher[{}] routing {} to class for template '{}'", instance_id, envelope.op, envelope.template ); DispatcherOutcome { reply_payload: Some(success_response( &envelope.op, correlation_id, serde_json::json!({ "status": "NOT_IMPLEMENTED", "template": envelope.template, "operation": envelope.op, }), )), ack_action: AckAction::Ack, } } /// Publishes a reply to the `reply_to` queue if present in the message properties. async fn publish_reply( channel: &Channel, delivery: &lapin::message::Delivery, payload: Vec, ) { let reply_to = delivery .properties .reply_to() .as_ref() .map(|s| s.as_str()); if let Some(queue_name) = reply_to { if let Err(e) = channel .basic_publish( "", queue_name, BasicPublishOptions::default(), &payload, BasicProperties::default(), ) .await { tracing::error!("failed to publish reply to '{}': {}", queue_name, e); } } } /// Applies the ack/nack decision to the delivery. async fn apply_ack_action( _channel: &Channel, delivery: &lapin::message::Delivery, action: &AckAction, ) { match action { AckAction::Ack => { if let Err(e) = delivery.ack(BasicAckOptions::default()).await { tracing::error!("failed to ack delivery: {}", e); } } AckAction::Nack { requeue } => { if let Err(e) = delivery .nack(BasicNackOptions { multiple: false, requeue: *requeue, }) .await { tracing::error!("failed to nack delivery: {}", e); } } } }