diff --git a/src/brokers/dispatcher.rs b/src/brokers/dispatcher.rs new file mode 100644 index 0000000..5aa3203 --- /dev/null +++ b/src/brokers/dispatcher.rs @@ -0,0 +1,426 @@ +//! # brokers/dispatcher.rs — Unified Event Dispatcher +//! +//! Single dispatcher task that consumes from beds.events, parses the envelope, +//! instantiates the appropriate class based on template lookup, and delegates +//! to class business logic. The dispatcher is schema-agnostic; the class +//! instantiation reveals schema, CRUD operation, and domain-specific behavior. +//! +//! ## Calling Agents +//! - `brokers::mod` — spawned by `spawn_dispatcher_pool()` +//! +//! ## Inputs +//! - `Arc` — shared AMQP connection +//! - `queue_tag: String` — queue name prefix from config +//! - `instance_id: u32` — numeric ID for log correlation +//! - `template_registry: Arc` — loaded templates +//! +//! ## Outputs +//! - Publishes reply payloads to `reply_to` queue if present in headers +//! - Logs to tracing (journald/console per config) +//! +//! ## Model +//! +//! Transport layer (this file) owns: +//! - AMQP consumption +//! - Envelope parsing +//! - Acknowledgment semantics +//! - Reply routing (if reply_to header exists) +//! +//! Class instantiation owns: +//! - Schema determination (via template lookup) +//! - CRUD operation handling +//! - Business logic +//! - Data transformation +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-09` - mks - unified dispatcher replacing r_broker + w_broker + +use std::sync::Arc; + +use futures_lite::StreamExt; +use lapin::{ + BasicProperties, Channel, Connection, + options::{ + BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, + QueueBindOptions, QueueDeclareOptions, + }, + types::{AMQPValue, FieldTable, LongString, ShortString}, +}; + +use crate::services::amqp::{DLX_EXCHANGE_NAME, EXCHANGE_NAME}; +use crate::brokers::payload::{ + BrokerRequestEnvelope, + error_response, + parse_request, + success_response, +}; +use crate::template_registry::RuntimeTemplateRegistry; +use super::error::BrokerError; + +/// Single queue for all events, regardless of schema or CRUD operation. +const QUEUE_NAME_SUFFIX: &str = "events"; + +enum AckAction { + Ack, + Nack { requeue: bool }, +} + +struct DispatcherOutcome { + reply_payload: Option>, + ack_action: AckAction, +} + +/// Spawns a single dispatcher task and returns immediately. +/// +/// The task consumes from the single events queue and processes messages +/// until a `shutdown` event is received or the connection is lost. +/// +/// # Arguments +/// +/// * `conn` — shared AMQP connection; each task opens its own channel +/// * `queue_tag` — queue name prefix from config +/// * `instance_id` — zero-based index for log correlation +/// * `template_registry` — loaded and validated templates +/// +/// # Returns +/// +/// `Ok(tokio::task::JoinHandle)` — the task handle; held by pool manager +/// `Err(BrokerError)` if channel or queue declaration fails before task start +/// +/// # History +/// +/// * `2026-04-09` - mks - original coding +pub async fn spawn( + conn: Arc, + queue_tag: String, + instance_id: u32, + template_registry: Arc, +) -> Result, BrokerError> { + let channel = conn.create_channel().await?; + + let queue_name = format!("{}{}", queue_tag, QUEUE_NAME_SUFFIX); + let dlq_queue_name = format!("{}{}.dlq", queue_tag, QUEUE_NAME_SUFFIX); + + let mut primary_args = FieldTable::default(); + primary_args.insert( + ShortString::from("x-dead-letter-exchange"), + AMQPValue::LongString(LongString::from(DLX_EXCHANGE_NAME)), + ); + primary_args.insert( + ShortString::from("x-dead-letter-routing-key"), + AMQPValue::LongString(LongString::from("events.dlq")), + ); + + // Declare the single events queue — idempotent + channel + .queue_declare( + &queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + primary_args, + ) + .await?; + + // Declare DLQ + channel + .queue_declare( + &dlq_queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + channel + .queue_bind( + &dlq_queue_name, + DLX_EXCHANGE_NAME, + "events.dlq", + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + + // Bind the single queue to the exchange with a default routing key + // This receives all events + channel + .queue_bind( + &queue_name, + EXCHANGE_NAME, + "#", + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + + tracing::info!("dispatcher[{}] queue '{}' declared and bound", instance_id, queue_name); + + let handle = tokio::spawn(async move { + if let Err(e) = run(channel, queue_name, instance_id, template_registry).await { + tracing::error!("dispatcher[{}] exited with error: {}", instance_id, e); + } + }); + + Ok(handle) +} + +/// The dispatcher consume loop. +/// +/// Pulls messages from the single events queue, dispatches based on template +/// and class instantiation, and replies if a reply_to header is present. +/// +/// # History +/// +/// * `2026-04-09` - mks - original coding +async fn run( + channel: Channel, + queue_name: String, + instance_id: u32, + template_registry: Arc, +) -> Result<(), BrokerError> { + let consumer_tag = format!("dispatcher-{}", instance_id); + + let mut consumer = channel + .basic_consume( + &queue_name, + &consumer_tag, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + tracing::info!("dispatcher[{}] consuming from '{}'", instance_id, queue_name); + + while let Some(delivery) = consumer.next().await { + let delivery = match delivery { + Ok(d) => d, + Err(e) => { + tracing::error!("dispatcher[{}] delivery error: {}", instance_id, e); + break; + } + }; + + let event_type = delivery + .properties + .kind() + .as_ref() + .map(|s| s.as_str().to_string()) + .unwrap_or_default(); + + let header_correlation_id = delivery + .properties + .correlation_id() + .as_ref() + .map(|s| s.as_str().to_string()) + .unwrap_or_default(); + + tracing::debug!("dispatcher[{}] received event type='{}'", instance_id, event_type); + + let envelope = match parse_request(&delivery.data) { + Ok(env) => env, + Err(e) => { + let payload = error_response( + "unknown", + &header_correlation_id, + "INVALID_ENVELOPE", + &e, + ); + publish_reply(&channel, &delivery, payload).await; + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: false, + }) + .await; + continue; + } + }; + + let correlation_id = if envelope.correlation_id.trim().is_empty() { + header_correlation_id + } else { + envelope.correlation_id.clone() + }; + + if !event_type.is_empty() && event_type != envelope.op { + let payload = error_response( + &envelope.op, + &correlation_id, + "OP_MISMATCH", + "AMQP type header does not match envelope op", + ); + publish_reply(&channel, &delivery, payload).await; + let _ = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: false, + }) + .await; + continue; + } + + let outcome = match envelope.op.as_str() { + "ping" => DispatcherOutcome { + reply_payload: Some(handle_ping(&correlation_id, instance_id)), + ack_action: AckAction::Ack, + }, + "shutdown" => { + let _ = delivery.ack(BasicAckOptions::default()).await; + tracing::info!("dispatcher[{}] shutdown event received — exiting", instance_id); + break; + } + _ => { + // Any other operation: template lookup → class instantiation + // This is where schema is revealed and business logic runs. + handle_class_dispatch(envelope, &correlation_id, instance_id, &template_registry).await + } + }; + + if let Some(payload) = outcome.reply_payload { + publish_reply(&channel, &delivery, payload).await; + } + + apply_ack_action(&channel, &delivery, &outcome.ack_action).await; + } + + tracing::info!("dispatcher[{}] consume loop exited", instance_id); + Ok(()) +} + +/// Handles a `ping` health check event. +/// +/// # History +/// +/// * `2026-04-09` - mks - original coding +fn handle_ping(correlation_id: &str, instance_id: u32) -> Vec { + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + success_response( + "ping", + correlation_id, + serde_json::json!({ + "dispatcher": "dispatcher", + "instance": instance_id, + "ts": ts, + }), + ) +} + +/// Routes to class instantiation based on template lookup. +/// +/// Template namespace → class instantiation → schema revealed → business logic. +/// This is the pivotal point where transport concerns hand off to domain logic. +/// +/// # History +/// +/// * `2026-04-09` - mks - original coding +async fn handle_class_dispatch( + envelope: BrokerRequestEnvelope, + correlation_id: &str, + instance_id: u32, + template_registry: &RuntimeTemplateRegistry, +) -> DispatcherOutcome { + // Look up the template in the registry + if !template_registry.contains_template(&envelope.template) { + tracing::warn!( + "dispatcher[{}] unknown template '{}'", + instance_id, + envelope.template + ); + return DispatcherOutcome { + reply_payload: Some(error_response( + &envelope.op, + correlation_id, + "UNKNOWN_TEMPLATE", + &format!("template '{}' not found in registry", envelope.template), + )), + ack_action: AckAction::Nack { requeue: false }, + }; + } + + // Template found; in a full implementation, this would instantiate the class + // and call its business logic. For now, it's a stub that acknowledges. + tracing::debug!( + "dispatcher[{}] routing {} to class for template '{}'", + instance_id, + envelope.op, + envelope.template + ); + + DispatcherOutcome { + reply_payload: Some(success_response( + &envelope.op, + correlation_id, + serde_json::json!({ + "status": "NOT_IMPLEMENTED", + "template": envelope.template, + "operation": envelope.op, + }), + )), + ack_action: AckAction::Ack, + } +} + +/// Publishes a reply to the `reply_to` queue if present in the message properties. +async fn publish_reply( + channel: &Channel, + delivery: &lapin::message::Delivery, + payload: Vec, +) { + let reply_to = delivery + .properties + .reply_to() + .as_ref() + .map(|s| s.as_str()); + + if let Some(queue_name) = reply_to { + if let Err(e) = channel + .basic_publish( + "", + queue_name, + BasicPublishOptions::default(), + &payload, + BasicProperties::default(), + ) + .await + { + tracing::error!("failed to publish reply to '{}': {}", queue_name, e); + } + } +} + +/// Applies the ack/nack decision to the delivery. +async fn apply_ack_action( + _channel: &Channel, + delivery: &lapin::message::Delivery, + action: &AckAction, +) { + match action { + AckAction::Ack => { + if let Err(e) = delivery.ack(BasicAckOptions::default()).await { + tracing::error!("failed to ack delivery: {}", e); + } + } + AckAction::Nack { requeue } => { + if let Err(e) = delivery + .nack(BasicNackOptions { + multiple: false, + requeue: *requeue, + }) + .await + { + tracing::error!("failed to nack delivery: {}", e); + } + } + } +} diff --git a/src/brokers/mod.rs b/src/brokers/mod.rs index 52af141..e21d5ba 100644 --- a/src/brokers/mod.rs +++ b/src/brokers/mod.rs @@ -1,30 +1,33 @@ -//! # brokers/mod.rs — Broker Pool Manager +//! # brokers/mod.rs — Dispatcher Pool Manager //! -//! Manages the lifecycle of all broker task pools. At IPL, `spawn_r_broker_pool()` -//! reads the instance count from config, spawns N rBroker Tokio tasks, and +//! Manages the lifecycle of the dispatcher task pool. At IPL, `spawn_dispatcher_pool()` +//! reads the instance count from config, spawns N dispatcher Tokio tasks, and //! returns their JoinHandles to the caller. //! -//! Each broker type gets its own pool function following the same pattern. +//! The dispatcher is a unified consumer that pulls from the single events queue, +//! determines class/schema via template lookup, and delegates to class instantiation. +//! No separate r_broker/w_broker; no schema-specific routing. +//! //! The pool manager holds handles but does not supervise — task exit is logged //! by the task itself. Supervision (respawn on crash) is a future addition. //! //! ## Calling Agents -//! - `ipl()` in main.rs — calls pool spawn functions after exchange declaration +//! - `ipl()` in main.rs — calls `spawn_dispatcher_pool()` after exchange declaration //! //! ## Outputs -//! - `Vec>` per broker type — held for clean shutdown +//! - `Vec>` — held for clean shutdown //! //! **Author:** mks //! **Version:** 1.0 //! //! ## History -//! * `2026-04-05` - mks - original coding +//! * `2026-04-05` - mks - original coding (separate r_broker + w_broker) +//! * `2026-04-09` - mks - unified to single dispatcher pool +pub mod dispatcher; pub mod error; pub mod logger_store; pub mod payload; -pub mod r_broker; -pub mod w_broker; use std::sync::Arc; use lapin::Connection; @@ -33,16 +36,20 @@ use crate::config::BrokerServicesConfig; use crate::template_registry::RuntimeTemplateRegistry; use error::BrokerError; -/// Spawns the rBroker pool — N tasks as configured in `instances.r_broker`. +/// Spawns the dispatcher pool — N tasks as configured in `instances.dispatcher`. /// -/// Each task gets the shared AMQP connection, the queue tag, and its zero-based -/// instance index. The connection is wrapped in `Arc` so each task can open -/// its own channel without cloning the connection. +/// Each task gets the shared AMQP connection, the queue tag, its zero-based +/// instance index, and the template registry. The connection is wrapped in `Arc` +/// so each task can open its own channel without cloning the connection. +/// +/// The dispatcher consumes from a single queue and routes to class instantiation +/// based on template lookup. No separate r_broker/w_broker pools. /// /// # Arguments /// -/// * `conn` — the authenticated AMQP connection from IPL step 3b -/// * `cfg` — broker services config block (queue_tag + instance counts) +/// * `conn` — the authenticated AMQP connection from IPL +/// * `cfg` — broker services config block (queue_tag + instance count) +/// * `template_registry` — loaded and validated runtime templates /// /// # Returns /// @@ -51,62 +58,28 @@ use error::BrokerError; /// /// # History /// -/// * `2026-04-05` - mks - original coding -pub async fn spawn_r_broker_pool( +/// * `2026-04-09` - mks - unified dispatcher pool +pub async fn spawn_dispatcher_pool( conn: Arc, cfg: &BrokerServicesConfig, template_registry: Arc, ) -> Result>, BrokerError> { - let count = cfg.app_server.instances.r_broker; + // For now, use a sensible default if a specific dispatcher instance count isn't set + // In full config, this would be cfg.app_server.instances.dispatcher + let count = cfg.app_server.instances.r_broker.max(cfg.app_server.instances.w_broker); let mut handles = Vec::with_capacity(count as usize); for i in 0..count { - let handle = r_broker::spawn( + let handle = dispatcher::spawn( Arc::clone(&conn), cfg.queue_tag.clone(), i, Arc::clone(&template_registry), - ).await?; + ) + .await?; handles.push(handle); } - tracing::info!("rBroker pool started: {} instance(s)", count); - Ok(handles) -} - -/// Spawns the wBroker pool — N tasks as configured in `instances.w_broker`. -/// -/// # Arguments -/// -/// * `conn` — the authenticated AMQP connection from IPL step 3b -/// * `cfg` — broker services config block (queue_tag + instance counts) -/// -/// # Returns -/// -/// `Ok(Vec>)` — one handle per spawned task. -/// `Err(BrokerError)` if any task fails to declare its queue before starting. -/// -/// # History -/// -/// * `2026-04-05` - mks - original coding -pub async fn spawn_w_broker_pool( - conn: Arc, - cfg: &BrokerServicesConfig, - template_registry: Arc, -) -> Result>, BrokerError> { - let count = cfg.app_server.instances.w_broker; - let mut handles = Vec::with_capacity(count as usize); - - for i in 0..count { - let handle = w_broker::spawn( - Arc::clone(&conn), - cfg.queue_tag.clone(), - i, - Arc::clone(&template_registry), - ).await?; - handles.push(handle); - } - - tracing::info!("wBroker pool started: {} instance(s)", count); + tracing::info!("dispatcher pool started: {} instance(s)", count); Ok(handles) } diff --git a/src/core/dispatch.rs b/src/core/dispatch.rs new file mode 100644 index 0000000..784b5ed --- /dev/null +++ b/src/core/dispatch.rs @@ -0,0 +1,173 @@ +//! # core/dispatch.rs — Broker-to-Domain Dispatch Boundary +//! +//! Defines the transport-neutral seam between AMQP brokers and domain logic. +//! The intent is to keep brokers focused on delivery concerns while class, +//! schema, and base I/O layers own business and storage behavior. + +#![allow(dead_code)] + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use serde_json::Value; + +use crate::template_registry::{RuntimeRecTemplate, RuntimeTemplateRegistry}; + +pub type RecordMap = HashMap; +pub type DispatchFuture<'a, T> = Pin + Send + 'a>>; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DispatchOperation { + Fetch, + Write, + Update, + Delete, +} + +impl DispatchOperation { + pub fn as_str(self) -> &'static str { + match self { + Self::Fetch => "fetch", + Self::Write => "write", + Self::Update => "update", + Self::Delete => "delete", + } + } +} + +#[derive(Debug, Clone)] +pub struct DispatchRequest { + pub op: DispatchOperation, + pub template: String, + pub correlation_id: String, + pub payload: RecordMap, +} + +#[derive(Debug, Clone, Default)] +pub struct DispatchContext { + pub reply_expected: bool, + pub best_effort: bool, +} + +#[derive(Debug, Clone)] +pub enum DispatchResponse { + Records(Vec), + Record(RecordMap), + Affected(u64), + Empty, +} + +#[derive(Debug, thiserror::Error)] +pub enum DispatchError { + #[error("unknown template '{template}'")] + UnknownTemplate { template: String }, + + #[error("operation '{operation}' unsupported for template '{template}'")] + UnsupportedOperation { template: String, operation: String }, + + #[error("validation failed: {message}")] + Validation { message: String }, + + #[error("storage failed: {message}")] + Storage { message: String }, + + #[error("internal dispatch failure: {message}")] + Internal { message: String }, +} + +pub trait DispatchResolver: Send + Sync { + fn resolve_class( + &self, + request: &DispatchRequest, + ) -> Result, DispatchError>; + + fn resolve_template( + &self, + template: &str, + ) -> Result<&RuntimeRecTemplate, DispatchError>; +} + +pub trait DomainClass: Send + Sync { + fn class_id(&self) -> &'static str; + + fn handle<'a>( + &'a self, + request: DispatchRequest, + context: DispatchContext, + ) -> DispatchFuture<'a, Result>; +} + +pub trait SchemaHandler: Send + Sync { + fn schema_id(&self) -> &'static str; + + fn execute<'a>( + &'a self, + request: &'a DispatchRequest, + ) -> DispatchFuture<'a, Result>; +} + +pub trait BaseIoAdapter: Send + Sync { + fn media_id(&self) -> &'static str; + + fn fetch<'a>( + &'a self, + template: &'a RuntimeRecTemplate, + params: &'a RecordMap, + ) -> DispatchFuture<'a, Result, DispatchError>>; + + fn write<'a>( + &'a self, + template: &'a RuntimeRecTemplate, + data: &'a RecordMap, + ) -> DispatchFuture<'a, Result>; + + fn update<'a>( + &'a self, + template: &'a RuntimeRecTemplate, + params: &'a RecordMap, + data: &'a RecordMap, + ) -> DispatchFuture<'a, Result>; + + fn delete<'a>( + &'a self, + template: &'a RuntimeRecTemplate, + params: &'a RecordMap, + ) -> DispatchFuture<'a, Result>; +} + +pub struct RegistryDispatchResolver { + template_registry: Arc, +} + +impl RegistryDispatchResolver { + pub fn new(template_registry: Arc) -> Self { + Self { template_registry } + } +} + +impl DispatchResolver for RegistryDispatchResolver { + fn resolve_class( + &self, + request: &DispatchRequest, + ) -> Result, DispatchError> { + let template = self.resolve_template(&request.template)?; + + Err(DispatchError::UnsupportedOperation { + template: template.template_class.clone(), + operation: request.op.as_str().to_string(), + }) + } + + fn resolve_template( + &self, + template: &str, + ) -> Result<&RuntimeRecTemplate, DispatchError> { + self.template_registry + .get(template) + .ok_or_else(|| DispatchError::UnknownTemplate { + template: template.to_string(), + }) + } +} \ No newline at end of file diff --git a/src/core/mod.rs b/src/core/mod.rs index 26103af..626d80c 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -24,6 +24,8 @@ //! ## History //! * `2026-04-05` - mks - original coding (stub) +pub mod dispatch; + use std::collections::HashMap; use serde_json::Value; diff --git a/src/main.rs b/src/main.rs index 6d753ac..b375676 100644 --- a/src/main.rs +++ b/src/main.rs @@ -117,9 +117,9 @@ async fn ipl() -> Result<(), String> { } } - // step 7: spawn broker pools — queues are declared here, not at exchange declare time - // rBroker pool requires an authenticated AMQP connection; skip in non-prod if unavailable - let _broker_handles: Vec> = { + // step 7: spawn dispatcher pool — single consumer from unified events queue + // each dispatcher instance instantiates classes based on template lookup + let _dispatcher_handles: Vec> = { use std::sync::Arc; let shared_conn = Arc::new( lapin::Connection::connect( @@ -134,28 +134,20 @@ async fn ipl() -> Result<(), String> { lapin::ConnectionProperties::default(), ) .await - .map_err(|e| format!("Broker pool connection failed: {}", e))?, + .map_err(|e| format!("Dispatcher pool connection failed: {}", e))?, ); let _ = &amqp_conn; // keep IPL AMQP connection alive - let r_handles = brokers::spawn_r_broker_pool( + let d_handles = brokers::spawn_dispatcher_pool( Arc::clone(&shared_conn), &cfg.broker_services, std::sync::Arc::clone(&template_registry_state), ) .await - .map_err(|e| format!("rBroker pool failed to start: {}", e))?; + .map_err(|e| format!("Dispatcher pool failed to start: {}", e))?; - let w_handles = brokers::spawn_w_broker_pool( - Arc::clone(&shared_conn), - &cfg.broker_services, - std::sync::Arc::clone(&template_registry_state), - ) - .await - .map_err(|e| format!("wBroker pool failed to start: {}", e))?; - - r_handles.into_iter().chain(w_handles).collect() + d_handles }; tracing::info!("BEDS IPL complete — node green"); diff --git a/src/template_registry/mod.rs b/src/template_registry/mod.rs index 2511de5..06f6671 100644 --- a/src/template_registry/mod.rs +++ b/src/template_registry/mod.rs @@ -56,6 +56,10 @@ impl RuntimeTemplateRegistry { pub fn contains_template(&self, template: &str) -> bool { self.by_name.contains_key(&template.to_ascii_lowercase()) } + + pub fn get(&self, template: &str) -> Option<&RuntimeRecTemplate> { + self.by_name.get(&template.to_ascii_lowercase()) + } } struct RecTemplateManifest {