//! # brokers/w_broker.rs — Write Broker Task
//!
//! The wBroker handles all mutating events on the REC store — writes, updates,
//! and deletes. Each instance declares its queue, binds to the `rec.write`
//! routing key, and dispatches incoming events to the factory layer.
//!
//! ## Calling Agents
//! - `brokers::mod` — spawns N instances at IPL via `spawn_w_broker_pool()`
//!
//! ## Inputs
//! - `Arc<Connection>` — shared AMQP connection from the broker pool
//! - `queue_tag: String` — queue name prefix from config
//! - `instance_id: u32` — numeric ID for log correlation (0-based)
//!
//! ## Outputs
//! - Publishes reply payloads to the `reply_to` queue in each message header
//!
//! ## Event Types (routing key: rec.write)
//!
//! | Event      | Description                              | Status      |
//! |------------|------------------------------------------|-------------|
//! | `ping`     | Health check — reply ACK + timestamp     | Implemented |
//! | `shutdown` | Ordered shutdown — cancel consumer       | Implemented |
//! | `write`    | Insert a new REC record (GUID pkey)      | Stub        |
//! | `update`   | Update fields on an existing REC record  | Stub        |
//! | `delete`   | Remove REC record(s) by discriminants    | Stub        |
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-05` - mks - original coding

use std::sync::Arc;

use futures_lite::StreamExt;
use lapin::{
    BasicProperties, Channel, Connection,
    options::{
        BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicPublishOptions,
        QueueBindOptions, QueueDeclareOptions,
    },
    types::FieldTable,
};

use crate::services::amqp::EXCHANGE_NAME;
use super::error::BrokerError;

/// Routing key this broker binds to.
const ROUTING_KEY: &str = "rec.write";
`"dev_"`) /// * `instance_id` — zero-based index for log correlation /// /// # Returns /// /// `Ok(tokio::task::JoinHandle)` — the task handle held by the pool manager. /// `Err(BrokerError)` if channel or queue declaration fails before task start. /// /// # History /// /// * `2026-04-05` - mks - original coding pub async fn spawn( conn: Arc, queue_tag: String, instance_id: u32, ) -> Result, BrokerError> { let channel = conn.create_channel().await?; let queue_name = format!("{}rec.write", queue_tag); channel .queue_declare( &queue_name, QueueDeclareOptions { durable: true, ..Default::default() }, FieldTable::default(), ) .await?; channel .queue_bind( &queue_name, EXCHANGE_NAME, ROUTING_KEY, QueueBindOptions::default(), FieldTable::default(), ) .await?; tracing::info!("wBroker[{}] queue '{}' declared and bound", instance_id, queue_name); let handle = tokio::spawn(async move { if let Err(e) = run(channel, queue_name, instance_id).await { tracing::error!("wBroker[{}] exited with error: {}", instance_id, e); } }); Ok(handle) } /// The wBroker consume loop. /// /// Mirrors the rBroker pattern. Processes messages until a `shutdown` event /// is received or the channel closes. 
/// /// # History /// /// * `2026-04-05` - mks - original coding async fn run( channel: Channel, queue_name: String, instance_id: u32, ) -> Result<(), BrokerError> { let consumer_tag = format!("wbroker-{}", instance_id); let mut consumer = channel .basic_consume( &queue_name, &consumer_tag, BasicConsumeOptions::default(), FieldTable::default(), ) .await?; tracing::info!("wBroker[{}] consuming on '{}'", instance_id, queue_name); while let Some(delivery) = consumer.next().await { let delivery = match delivery { Ok(d) => d, Err(e) => { tracing::error!("wBroker[{}] delivery error: {}", instance_id, e); break; } }; let event_type = delivery .properties .kind() .as_ref() .map(|s| s.as_str().to_string()) .unwrap_or_default(); tracing::debug!("wBroker[{}] received event='{}'", instance_id, event_type); let reply_payload: Option> = match event_type.as_str() { "ping" => handle_ping(instance_id), "shutdown" => { let _ = delivery.ack(BasicAckOptions::default()).await; tracing::info!("wBroker[{}] shutdown event received — exiting", instance_id); break; } "write" => handle_write(&delivery.data, instance_id), "update" => handle_update(&delivery.data, instance_id), "delete" => handle_delete(&delivery.data, instance_id), unknown => { tracing::warn!("wBroker[{}] unknown event type '{}'", instance_id, unknown); None } }; if let Some(payload) = reply_payload { if let Some(reply_to) = delivery.properties.reply_to().as_ref() { let reply_queue = reply_to.as_str().to_string(); let correlation_id = delivery.properties.correlation_id().clone(); let props = BasicProperties::default() .with_correlation_id(correlation_id.unwrap_or_default()); if let Err(e) = channel .basic_publish( "", &reply_queue, BasicPublishOptions::default(), &payload, props, ) .await { tracing::error!("wBroker[{}] reply publish failed: {}", instance_id, e); } } } let _ = delivery.ack(BasicAckOptions::default()).await; } tracing::info!("wBroker[{}] consume loop exited", instance_id); Ok(()) } /// Handles a `ping` health 
check event. /// /// # History /// /// * `2026-04-05` - mks - original coding fn handle_ping(instance_id: u32) -> Option> { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(); let response = format!( r#"{{"status":"ok","broker":"wBroker","instance":{},"ts":{}}}"#, instance_id, ts ); Some(response.into_bytes()) } /// Stub handler for `write` events. /// /// # History /// /// * `2026-04-05` - mks - stub fn handle_write(data: &[u8], instance_id: u32) -> Option> { tracing::warn!( "wBroker[{}] write event ({} bytes) — factory dispatch not yet implemented", instance_id, data.len() ); not_implemented_response() } /// Stub handler for `update` events. /// /// # History /// /// * `2026-04-05` - mks - stub fn handle_update(data: &[u8], instance_id: u32) -> Option> { tracing::warn!( "wBroker[{}] update event ({} bytes) — factory dispatch not yet implemented", instance_id, data.len() ); not_implemented_response() } /// Stub handler for `delete` events. /// /// # History /// /// * `2026-04-05` - mks - stub fn handle_delete(data: &[u8], instance_id: u32) -> Option> { tracing::warn!( "wBroker[{}] delete event ({} bytes) — factory dispatch not yet implemented", instance_id, data.len() ); not_implemented_response() } /// Standard NOT_IMPLEMENTED reply payload — used by all stub handlers. fn not_implemented_response() -> Option> { Some( r#"{"status":"error","code":"NOT_IMPLEMENTED","message":"factory dispatch not yet implemented"}"# .as_bytes() .to_vec(), ) }