From ebefb15a877626d30e9723f80262cc13d26d137e Mon Sep 17 00:00:00 2001 From: gramps Date: Sun, 5 Apr 2026 20:18:31 -0700 Subject: [PATCH] Add rBroker + wBroker pool, BrokerPayload, NamasteCore trait stub MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - src/brokers/: pool manager, r_broker (rec.read), w_broker (rec.write), BrokerPayload struct, BrokerError type - src/core/: NamasteCore trait — fetch/write/update/delete interface, stubs - IPL step 6: spawns rBroker + wBroker pools after exchange declaration - tests/broker_pool_test.rs: integration tests for pool spawn (skip if broker down) - BrokerPayload unit tests + doctest in payload.rs - Added futures-lite, serde_json to Cargo.toml - README.md, CLAUDE.md, wiki updated to reflect new structure and status Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 2 + Cargo.toml | 2 + README.md | 20 ++- claude.md | 44 +++++-- src/brokers/error.rs | 33 +++++ src/brokers/mod.rs | 106 +++++++++++++++ src/brokers/payload.rs | 108 ++++++++++++++++ src/brokers/r_broker.rs | 240 ++++++++++++++++++++++++++++++++++ src/brokers/w_broker.rs | 262 ++++++++++++++++++++++++++++++++++++++ src/core/mod.rs | 88 +++++++++++++ src/lib.rs | 3 + src/main.rs | 55 +++++++- tests/broker_pool_test.rs | 98 ++++++++++++++ 13 files changed, 1042 insertions(+), 19 deletions(-) create mode 100644 src/brokers/error.rs create mode 100644 src/brokers/mod.rs create mode 100644 src/brokers/payload.rs create mode 100644 src/brokers/r_broker.rs create mode 100644 src/brokers/w_broker.rs create mode 100644 src/core/mod.rs create mode 100644 tests/broker_pool_test.rs diff --git a/Cargo.lock b/Cargo.lock index da3863c..98f9ffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2247,9 +2247,11 @@ name = "rustybeds" version = "0.1.0" dependencies = [ "config", + "futures-lite 2.6.1", "lapin", "mongodb", "serde", + "serde_json", "thiserror 1.0.69", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index d0cdca0..f7bfdfe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,11 +5,13 @@ edition = "2024" [dependencies] serde = { version = "1", features = ["derive"]} +serde_json = "1" config = "0.14" thiserror = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"]} tracing-journald = "0.3" mongodb = { version = "3", features = ["sync"] } +futures-lite = "2" lapin = "2" tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/README.md b/README.md index d6e4432..385dc65 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,14 @@ rustybeds/ │ ├── config/ │ │ ├── mod.rs # Loader — load() and load_from() for testability │ │ └── structs.rs # Typed config structs (serde Deserialize) +│ ├── brokers/ +│ │ ├── mod.rs # Pool manager — spawn_r/w_broker_pool() +│ │ ├── error.rs # BrokerError type +│ │ ├── payload.rs # BrokerPayload — AMQP message body struct +│ │ ├── r_broker.rs # rBroker task — rec.read consume loop +│ │ └── w_broker.rs # wBroker task — rec.write consume loop +│ ├── core/ +│ │ └── mod.rs # NamasteCore trait — unified CRUD interface (stub) │ ├── services/ │ │ ├── mod.rs # Groups external service transport modules │ │ ├── amqp/ @@ -109,6 +117,7 @@ rustybeds/ │ ├── example_rec.toml # Canonical self-documenting REC template │ └── mst_logger_rec.toml # Logger collection template (msLogs) ├── tests/ +│ ├── broker_pool_test.rs # rBroker + wBroker pool integration tests │ ├── common/mod.rs # Shared test helpers — load_test_config() │ └── fixtures/ │ └── beds_test.toml # Canonical test config fixture @@ -149,12 +158,13 @@ The `config` crate deep-merges these at startup. Only keys present in the env fi | Unit test scaffolding + config fixture pattern | Done | | MongoDB reachability validation | Done | | MariaDB reachability validation | Done | -| Broker pool (Tokio tasks) + queue declaration | Next | -| AMQP publish / consume | Planned | -| Broker pool (Tokio tasks) | Planned | -| NamasteCore trait | Planned | +| rBroker pool (Tokio tasks, queue declare, consume loop) | Done | +| wBroker pool (Tokio tasks, queue declare, consume loop) | Done | +| BrokerPayload — AMQP message body struct | Done | +| NamasteCore trait (stub) | Done | +| Factory dispatch | Next | | Database adapters (MariaDB, MongoDB) | Planned | -| Factory dispatch | Planned | +| AMQP publish / consume (full round-trip) | Planned | | AI database object generation | Phase 2 | --- diff --git a/claude.md b/claude.md index 374b9c3..4a874cf 100644 --- a/claude.md +++ b/claude.md @@ -28,6 +28,14 @@ This is not a greenfield project. The architecture is proven. The Rust rewrite e ``` rustybeds/ ├── src/ +│ ├── brokers/ +│ │ ├── mod.rs # Pool manager — spawn_r/w_broker_pool() +│ │ ├── error.rs # BrokerError type +│ │ ├── payload.rs # BrokerPayload — AMQP message body struct +│ │ ├── r_broker.rs # rBroker task — rec.read consume loop +│ │ └── w_broker.rs # wBroker task — rec.write consume loop +│ ├── core/ +│ │ └── mod.rs # NamasteCore trait — unified CRUD interface (stub) │ ├── config/ │ │ ├── mod.rs # load() + load_from() — layered TOML config │ │ └── structs.rs # Typed config structs (serde Deserialize) @@ -53,6 +61,7 @@ rustybeds/ │ ├── example_rec.toml # Canonical self-documenting REC template │ └── mst_logger_rec.toml # Logger collection template (msLogs) ├── tests/ +│ ├── broker_pool_test.rs # rBroker + wBroker pool integration tests │ ├── common/mod.rs # Shared test helpers — load_test_config() │ └── fixtures/ │ └── beds_test.toml # Canonical test config fixture @@ -65,15 +74,11 @@ rustybeds/ ``` src/ ├── core/ -│ ├── trait.rs # NamasteCore trait definition -│ ├── factory.rs # Template name → adapter dispatch +│ ├── factory.rs # Template name → NamasteCore dispatch │ └── meta.rs # Request metadata parsing -├── adapters/ -│ ├── mysql.rs # gacPDO equivalent -│ └── mongodb.rs # gacMongoDB equivalent -└── brokers/ - ├── pool.rs # Broker pool management - └── broker.rs # Individual broker task +└── adapters/ + ├── mysql.rs # gacPDO equivalent + └── mongodb.rs # gacMongoDB equivalent ``` ## Key Rust Mappings from PHP @@ -91,17 +96,30 @@ src/ ## NamasteCore Trait Interface -The core CRUD interface. Every database adapter and every template must implement this: +The core CRUD interface in `src/core/mod.rs`. Every template struct implements this: ```rust pub trait NamasteCore { - async fn create_record(&self, payload: &Payload) -> Result; - async fn fetch_records(&self, query: &Query) -> Result, BEDSError>; - async fn update_record(&self, payload: &Payload) -> Result; - async fn delete_record(&self, id: &str) -> Result; + fn template_id(&self) -> &'static str; + fn fetch(&self, params: HashMap) -> impl Future>, String>> + Send; + fn write(&self, data: HashMap) -> impl Future, String>> + Send; + fn update(&self, params: HashMap, data: HashMap) -> impl Future> + Send; + fn delete(&self, params: HashMap) -> impl Future> + Send; } ``` +Data in/out is always `HashMap` in user-facing field names. Templates own schema translation. Callers never provide primary keys on writes — the template generates a GUID. + +## BrokerPayload — AMQP Message Body + +The JSON body carried by all broker messages (`src/brokers/payload.rs`): + +```json +{ "template": "usr", "data": { "first_name": "joe", "status": "active" } } +``` + +The AMQP `type` message property carries the operation (`fetch`, `write`, `update`, `delete`, `ping`, `shutdown`). The body carries the template identifier and data payload. + ## Dependencies (Cargo.toml) ```toml diff --git a/src/brokers/error.rs b/src/brokers/error.rs new file mode 100644 index 0000000..654cd1b --- /dev/null +++ b/src/brokers/error.rs @@ -0,0 +1,33 @@ +//! # brokers/error.rs — Broker Error Types +//! +//! Defines the error type for all broker task operations in BEDS. +//! +//! ## Calling Agents +//! - `brokers::r_broker` — returned from spawn and consume operations +//! - `brokers::mod` — surfaced from pool management +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-05` - mks - original coding + +/// Errors that can occur in any BEDS broker task. +/// +/// AMQP protocol errors are wrapped transparently via the `From` impl. +/// Additional variants cover broker-specific failure modes. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +#[derive(Debug, thiserror::Error)] +pub enum BrokerError { + #[error("AMQP protocol error: {0}")] + Protocol(#[from] lapin::Error), + + #[error("Broker task '{0}' failed to start: {1}")] + StartupFailed(String, String), + + #[error("Message decode error in broker '{0}': {1}")] + DecodeFailed(String, String), +} diff --git a/src/brokers/mod.rs b/src/brokers/mod.rs new file mode 100644 index 0000000..00edd13 --- /dev/null +++ b/src/brokers/mod.rs @@ -0,0 +1,106 @@ +//! # brokers/mod.rs — Broker Pool Manager +//! +//! Manages the lifecycle of all broker task pools. At IPL, `spawn_r_broker_pool()` +//! reads the instance count from config, spawns N rBroker Tokio tasks, and +//! returns their JoinHandles to the caller. +//! +//! Each broker type gets its own pool function following the same pattern. +//! The pool manager holds handles but does not supervise — task exit is logged +//! by the task itself. Supervision (respawn on crash) is a future addition. +//! +//! ## Calling Agents +//! - `ipl()` in main.rs — calls pool spawn functions after exchange declaration +//! +//! ## Outputs +//! - `Vec>` per broker type — held for clean shutdown +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-05` - mks - original coding + +pub mod error; +pub mod payload; +pub mod r_broker; +pub mod w_broker; + +use std::sync::Arc; +use lapin::Connection; + +use crate::config::BrokerServicesConfig; +use error::BrokerError; + +/// Spawns the rBroker pool — N tasks as configured in `instances.r_broker`. +/// +/// Each task gets the shared AMQP connection, the queue tag, and its zero-based +/// instance index. The connection is wrapped in `Arc` so each task can open +/// its own channel without cloning the connection. +/// +/// # Arguments +/// +/// * `conn` — the authenticated AMQP connection from IPL step 3b +/// * `cfg` — broker services config block (queue_tag + instance counts) +/// +/// # Returns +/// +/// `Ok(Vec>)` — one handle per spawned task. +/// `Err(BrokerError)` if any task fails to declare its queue before starting. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +pub async fn spawn_r_broker_pool( + conn: Arc, + cfg: &BrokerServicesConfig, +) -> Result>, BrokerError> { + let count = cfg.app_server.instances.r_broker; + let mut handles = Vec::with_capacity(count as usize); + + for i in 0..count { + let handle = r_broker::spawn( + Arc::clone(&conn), + cfg.queue_tag.clone(), + i, + ).await?; + handles.push(handle); + } + + tracing::info!("rBroker pool started: {} instance(s)", count); + Ok(handles) +} + +/// Spawns the wBroker pool — N tasks as configured in `instances.w_broker`. +/// +/// # Arguments +/// +/// * `conn` — the authenticated AMQP connection from IPL step 3b +/// * `cfg` — broker services config block (queue_tag + instance counts) +/// +/// # Returns +/// +/// `Ok(Vec>)` — one handle per spawned task. +/// `Err(BrokerError)` if any task fails to declare its queue before starting. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +pub async fn spawn_w_broker_pool( + conn: Arc, + cfg: &BrokerServicesConfig, +) -> Result>, BrokerError> { + let count = cfg.app_server.instances.w_broker; + let mut handles = Vec::with_capacity(count as usize); + + for i in 0..count { + let handle = w_broker::spawn( + Arc::clone(&conn), + cfg.queue_tag.clone(), + i, + ).await?; + handles.push(handle); + } + + tracing::info!("wBroker pool started: {} instance(s)", count); + Ok(handles) +} diff --git a/src/brokers/payload.rs b/src/brokers/payload.rs new file mode 100644 index 0000000..ef850f1 --- /dev/null +++ b/src/brokers/payload.rs @@ -0,0 +1,108 @@ +//! # brokers/payload.rs — AMQP Message Payload +//! +//! Defines the JSON body structure carried in all BEDS broker messages. +//! The AMQP envelope handles routing (type header, reply_to, correlation_id); +//! this struct is what lives in the message body. +//! +//! ## Wire Format +//! +//! ```json +//! { +//! "template": "usr", +//! "data": { "first_name": "joe", "status": "active" } +//! } +//! ``` +//! +//! `template` names the data object (maps to a NamasteCore implementor). +//! `data` carries key/value pairs in user-facing field names — the template +//! maps these to actual schema names. Callers never specify primary keys on +//! writes; the template generates a GUID and returns it in the reply. +//! +//! ## Calling Agents +//! - `brokers::r_broker` — parsed from message body on fetch events +//! - `brokers::w_broker` — parsed from message body on write/update/delete events +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-05` - mks - original coding + +use std::collections::HashMap; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// The JSON body of every BEDS broker message. +/// +/// # Examples +/// +/// ``` +/// use rustybeds::brokers::payload::BrokerPayload; +/// +/// let json = r#"{"template":"usr","data":{"first_name":"joe"}}"#; +/// let payload: BrokerPayload = serde_json::from_str(json).unwrap(); +/// assert_eq!(payload.template, "usr"); +/// assert!(payload.data.contains_key("first_name")); +/// ``` +/// +/// Both read and write brokers parse this struct from the raw AMQP delivery +/// bytes. The operation type is carried in the AMQP `type` message property — +/// this struct carries the object identity and data payload. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +#[derive(Debug, Deserialize, Serialize)] +pub struct BrokerPayload { + /// Template identifier — names the NamasteCore implementor to dispatch to. + /// Matches the TLA convention from the template file (e.g. `"usr"`, `"pst"`). + pub template: String, + + /// Key/value data pairs in user-facing field names. + /// For writes: the record to store (pkey excluded — generated by template). + /// For reads: query discriminants (field → value to match). + /// For deletes: discriminants identifying the record(s) to remove. + #[serde(default)] + pub data: HashMap, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserializes_full_payload() { + let json = r#"{"template":"usr","data":{"first_name":"joe","status":"active"}}"#; + let payload: BrokerPayload = serde_json::from_str(json).unwrap(); + assert_eq!(payload.template, "usr"); + assert_eq!(payload.data.len(), 2); + assert_eq!(payload.data["first_name"], "joe"); + assert_eq!(payload.data["status"], "active"); + } + + #[test] + fn deserializes_without_data_field() { + // data is optional — fetch by template name alone is valid + let json = r#"{"template":"usr"}"#; + let payload: BrokerPayload = serde_json::from_str(json).unwrap(); + assert_eq!(payload.template, "usr"); + assert!(payload.data.is_empty()); + } + + #[test] + fn serializes_round_trip() { + let json = r#"{"template":"pst","data":{"title":"hello"}}"#; + let payload: BrokerPayload = serde_json::from_str(json).unwrap(); + let serialized = serde_json::to_string(&payload).unwrap(); + let round_trip: BrokerPayload = serde_json::from_str(&serialized).unwrap(); + assert_eq!(round_trip.template, payload.template); + assert_eq!(round_trip.data["title"], payload.data["title"]); + } + + #[test] + fn rejects_missing_template() { + let json = r#"{"data":{"first_name":"joe"}}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } +} diff --git a/src/brokers/r_broker.rs b/src/brokers/r_broker.rs new file mode 100644 index 0000000..eb03581 --- /dev/null +++ b/src/brokers/r_broker.rs @@ -0,0 +1,240 @@ +//! # brokers/r_broker.rs — Read Broker Task +//! +//! The rBroker is a Tokio task that handles all non-destructive read events +//! from the AMQP exchange. Each instance declares its queue, binds to the +//! `rec.read` routing key, enters a consume loop, and dispatches incoming +//! events to the appropriate handler. +//! +//! ## Calling Agents +//! - `brokers::mod` — spawns N instances at IPL via `spawn_pool()` +//! +//! ## Inputs +//! - `Arc` — shared AMQP connection from the broker pool +//! - `queue_tag: String` — queue name prefix from config (e.g. "dev_", "prod_") +//! - `instance_id: u32` — numeric ID for log correlation (0-based) +//! +//! ## Outputs +//! - Publishes reply payloads to the `reply_to` queue specified in each message header +//! - Log events to tracing (journald / console per config) +//! +//! ## Event Types (routing key: rec.read) +//! +//! | Event | Description | Status | +//! |------------|--------------------------------------|-------------| +//! | `ping` | Health check — reply ACK + timestamp | Implemented | +//! | `shutdown` | Ordered shutdown — cancel consumer | Implemented | +//! | `fetch` | REC store read operation | Stub | +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-05` - mks - original coding + +use std::sync::Arc; + +use futures_lite::StreamExt; +use lapin::{ + BasicProperties, Channel, Connection, + options::{ + BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, + QueueBindOptions, QueueDeclareOptions, + }, + types::FieldTable, +}; + +use crate::services::amqp::EXCHANGE_NAME; +use super::error::BrokerError; + +/// Routing key this broker binds to. +const ROUTING_KEY: &str = "rec.read"; + +/// Spawns a single rBroker task and returns immediately. +/// +/// The task runs until it receives a `shutdown` event or the AMQP connection +/// is lost. All log output is tagged with the instance ID for correlation. +/// +/// # Arguments +/// +/// * `conn` — shared AMQP connection; each task opens its own channel +/// * `queue_tag` — queue name prefix from config (e.g. `"dev_"`) +/// * `instance_id` — zero-based index for log correlation +/// +/// # Returns +/// +/// `Ok(tokio::task::JoinHandle)` — the task handle; held by the pool manager. +/// `Err(BrokerError)` if the channel or queue declaration fails before the task starts. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +pub async fn spawn( + conn: Arc, + queue_tag: String, + instance_id: u32, +) -> Result, BrokerError> { + // each broker task owns its own channel — channels are cheap, connections are not + let channel = conn.create_channel().await?; + + let queue_name = format!("{}rec.read", queue_tag); + + // declare the queue — idempotent; safe to call on restart + channel + .queue_declare( + &queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + // bind the queue to the exchange on the rec.read routing key + channel + .queue_bind( + &queue_name, + EXCHANGE_NAME, + ROUTING_KEY, + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + + tracing::info!("rBroker[{}] queue '{}' declared and bound", instance_id, queue_name); + + let handle = tokio::spawn(async move { + if let Err(e) = run(channel, queue_name, instance_id).await { + tracing::error!("rBroker[{}] exited with error: {}", instance_id, e); + } + }); + + Ok(handle) +} + +/// The rBroker consume loop. +/// +/// Enters `basic_consume` on the declared queue and processes messages until +/// a `shutdown` event is received or the channel closes. Each message is +/// acked after processing regardless of outcome — a failed dispatch is +/// logged, not requeued. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +async fn run( + channel: Channel, + queue_name: String, + instance_id: u32, +) -> Result<(), BrokerError> { + let consumer_tag = format!("rbroker-{}", instance_id); + + let mut consumer = channel + .basic_consume( + &queue_name, + &consumer_tag, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + tracing::info!("rBroker[{}] consuming on '{}'", instance_id, queue_name); + + while let Some(delivery) = consumer.next().await { + let delivery = match delivery { + Ok(d) => d, + Err(e) => { + tracing::error!("rBroker[{}] delivery error: {}", instance_id, e); + break; + } + }; + + // extract the event type from the message type header + let event_type = delivery + .properties + .kind() + .as_ref() + .map(|s| s.as_str().to_string()) + .unwrap_or_default(); + + tracing::debug!("rBroker[{}] received event='{}'", instance_id, event_type); + + let reply_payload: Option> = match event_type.as_str() { + "ping" => handle_ping(instance_id), + "shutdown" => { + // ack before exiting so the message is not redelivered + let _ = delivery.ack(BasicAckOptions::default()).await; + tracing::info!("rBroker[{}] shutdown event received — exiting", instance_id); + break; + } + "fetch" => handle_fetch(&delivery.data, instance_id), + unknown => { + tracing::warn!("rBroker[{}] unknown event type '{}'", instance_id, unknown); + None + } + }; + + // publish reply if the event specified a reply_to queue + if let Some(payload) = reply_payload { + if let Some(reply_to) = delivery.properties.reply_to().as_ref() { + let reply_queue = reply_to.as_str().to_string(); + let correlation_id = delivery.properties.correlation_id().clone(); + + let props = BasicProperties::default() + .with_correlation_id(correlation_id.unwrap_or_default()); + + if let Err(e) = channel + .basic_publish( + "", // default exchange — direct to queue by name + &reply_queue, + BasicPublishOptions::default(), + &payload, + props, + ) + .await + { + tracing::error!("rBroker[{}] reply publish failed: {}", instance_id, e); + } + } + } + + let _ = delivery.ack(BasicAckOptions::default()).await; + } + + tracing::info!("rBroker[{}] consume loop exited", instance_id); + Ok(()) +} + +/// Handles a `ping` event — returns a simple ACK payload with a timestamp. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +fn handle_ping(instance_id: u32) -> Option> { + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let response = format!(r#"{{"status":"ok","broker":"rBroker","instance":{},"ts":{}}}"#, instance_id, ts); + tracing::debug!("rBroker[{}] ping response: {}", instance_id, response); + Some(response.into_bytes()) +} + +/// Stub handler for `fetch` events. +/// +/// Factory/adapter dispatch is not yet implemented. Returns a not-implemented +/// error payload so callers receive a defined response rather than silence. +/// +/// # History +/// +/// * `2026-04-05` - mks - stub +fn handle_fetch(data: &[u8], instance_id: u32) -> Option> { + tracing::warn!( + "rBroker[{}] fetch event received ({} bytes) — factory dispatch not yet implemented", + instance_id, + data.len() + ); + let response = r#"{"status":"error","code":"NOT_IMPLEMENTED","message":"factory dispatch not yet implemented"}"#; + Some(response.as_bytes().to_vec()) +} diff --git a/src/brokers/w_broker.rs b/src/brokers/w_broker.rs new file mode 100644 index 0000000..994ed6c --- /dev/null +++ b/src/brokers/w_broker.rs @@ -0,0 +1,262 @@ +//! # brokers/w_broker.rs — Write Broker Task +//! +//! The wBroker handles all mutating events on the REC store — writes, updates, +//! and deletes. Each instance declares its queue, binds to the `rec.write` +//! routing key, and dispatches incoming events to the factory layer. +//! +//! ## Calling Agents +//! - `brokers::mod` — spawns N instances at IPL via `spawn_w_broker_pool()` +//! +//! ## Inputs +//! - `Arc` — shared AMQP connection from the broker pool +//! - `queue_tag: String` — queue name prefix from config +//! - `instance_id: u32` — numeric ID for log correlation (0-based) +//! +//! ## Outputs +//! - Publishes reply payloads to the `reply_to` queue in each message header +//! +//! ## Event Types (routing key: rec.write) +//! +//! | Event | Description | Status | +//! |------------|------------------------------------------|-------------| +//! | `ping` | Health check — reply ACK + timestamp | Implemented | +//! | `shutdown` | Ordered shutdown — cancel consumer | Implemented | +//! | `write` | Insert a new REC record (GUID pkey) | Stub | +//! | `update` | Update fields on an existing REC record | Stub | +//! | `delete` | Remove REC record(s) by discriminants | Stub | +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-05` - mks - original coding + +use std::sync::Arc; + +use futures_lite::StreamExt; +use lapin::{ + BasicProperties, Channel, Connection, + options::{ + BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, + QueueBindOptions, QueueDeclareOptions, + }, + types::FieldTable, +}; + +use crate::services::amqp::EXCHANGE_NAME; +use super::error::BrokerError; + +/// Routing key this broker binds to. +const ROUTING_KEY: &str = "rec.write"; + +/// Spawns a single wBroker task and returns immediately. +/// +/// # Arguments +/// +/// * `conn` — shared AMQP connection; each task opens its own channel +/// * `queue_tag` — queue name prefix from config (e.g. `"dev_"`) +/// * `instance_id` — zero-based index for log correlation +/// +/// # Returns +/// +/// `Ok(tokio::task::JoinHandle)` — the task handle held by the pool manager. +/// `Err(BrokerError)` if channel or queue declaration fails before task start. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +pub async fn spawn( + conn: Arc, + queue_tag: String, + instance_id: u32, +) -> Result, BrokerError> { + let channel = conn.create_channel().await?; + let queue_name = format!("{}rec.write", queue_tag); + + channel + .queue_declare( + &queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + channel + .queue_bind( + &queue_name, + EXCHANGE_NAME, + ROUTING_KEY, + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + + tracing::info!("wBroker[{}] queue '{}' declared and bound", instance_id, queue_name); + + let handle = tokio::spawn(async move { + if let Err(e) = run(channel, queue_name, instance_id).await { + tracing::error!("wBroker[{}] exited with error: {}", instance_id, e); + } + }); + + Ok(handle) +} + +/// The wBroker consume loop. +/// +/// Mirrors the rBroker pattern. Processes messages until a `shutdown` event +/// is received or the channel closes. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +async fn run( + channel: Channel, + queue_name: String, + instance_id: u32, +) -> Result<(), BrokerError> { + let consumer_tag = format!("wbroker-{}", instance_id); + + let mut consumer = channel + .basic_consume( + &queue_name, + &consumer_tag, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + tracing::info!("wBroker[{}] consuming on '{}'", instance_id, queue_name); + + while let Some(delivery) = consumer.next().await { + let delivery = match delivery { + Ok(d) => d, + Err(e) => { + tracing::error!("wBroker[{}] delivery error: {}", instance_id, e); + break; + } + }; + + let event_type = delivery + .properties + .kind() + .as_ref() + .map(|s| s.as_str().to_string()) + .unwrap_or_default(); + + tracing::debug!("wBroker[{}] received event='{}'", instance_id, event_type); + + let reply_payload: Option> = match event_type.as_str() { + "ping" => handle_ping(instance_id), + "shutdown" => { + let _ = delivery.ack(BasicAckOptions::default()).await; + tracing::info!("wBroker[{}] shutdown event received — exiting", instance_id); + break; + } + "write" => handle_write(&delivery.data, instance_id), + "update" => handle_update(&delivery.data, instance_id), + "delete" => handle_delete(&delivery.data, instance_id), + unknown => { + tracing::warn!("wBroker[{}] unknown event type '{}'", instance_id, unknown); + None + } + }; + + if let Some(payload) = reply_payload { + if let Some(reply_to) = delivery.properties.reply_to().as_ref() { + let reply_queue = reply_to.as_str().to_string(); + let correlation_id = delivery.properties.correlation_id().clone(); + + let props = BasicProperties::default() + .with_correlation_id(correlation_id.unwrap_or_default()); + + if let Err(e) = channel + .basic_publish( + "", + &reply_queue, + BasicPublishOptions::default(), + &payload, + props, + ) + .await + { + tracing::error!("wBroker[{}] reply publish failed: {}", instance_id, e); + } + } + } + + let _ = delivery.ack(BasicAckOptions::default()).await; + } + + tracing::info!("wBroker[{}] consume loop exited", instance_id); + Ok(()) +} + +/// Handles a `ping` health check event. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding +fn handle_ping(instance_id: u32) -> Option> { + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let response = format!( + r#"{{"status":"ok","broker":"wBroker","instance":{},"ts":{}}}"#, + instance_id, ts + ); + Some(response.into_bytes()) +} + +/// Stub handler for `write` events. +/// +/// # History +/// +/// * `2026-04-05` - mks - stub +fn handle_write(data: &[u8], instance_id: u32) -> Option> { + tracing::warn!( + "wBroker[{}] write event ({} bytes) — factory dispatch not yet implemented", + instance_id, data.len() + ); + not_implemented_response() +} + +/// Stub handler for `update` events. +/// +/// # History +/// +/// * `2026-04-05` - mks - stub +fn handle_update(data: &[u8], instance_id: u32) -> Option> { + tracing::warn!( + "wBroker[{}] update event ({} bytes) — factory dispatch not yet implemented", + instance_id, data.len() + ); + not_implemented_response() +} + +/// Stub handler for `delete` events. +/// +/// # History +/// +/// * `2026-04-05` - mks - stub +fn handle_delete(data: &[u8], instance_id: u32) -> Option> { + tracing::warn!( + "wBroker[{}] delete event ({} bytes) — factory dispatch not yet implemented", + instance_id, data.len() + ); + not_implemented_response() +} + +/// Standard NOT_IMPLEMENTED reply payload — used by all stub handlers. +fn not_implemented_response() -> Option> { + Some( + r#"{"status":"error","code":"NOT_IMPLEMENTED","message":"factory dispatch not yet implemented"}"# + .as_bytes() + .to_vec(), + ) +} diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 0000000..26103af --- /dev/null +++ b/src/core/mod.rs @@ -0,0 +1,88 @@ +//! # core/mod.rs — NamasteCore Trait +//! +//! Defines the unified CRUD interface that every BEDS data template must +//! implement. A template is a struct that knows one data domain — its field +//! mappings, indexes, protected fields, and schema-specific constraints. +//! +//! The broker layer is completely schema-agnostic. It receives a template +//! identifier in the message payload, resolves the correct implementor via +//! the factory, and calls the appropriate trait method. The trait method +//! translates user-facing key/value pairs to and from the underlying schema. +//! +//! ## Implementors (future) +//! - One struct per data domain (e.g. `UserTemplate`, `PostTemplate`) +//! - Each struct lives in its own file under `templates/` +//! - Each struct registers itself with the factory by template TLA +//! +//! ## Calling Agents +//! - Factory dispatch (not yet implemented) — resolves template by TLA, +//! calls the appropriate method +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-05` - mks - original coding (stub) + +use std::collections::HashMap; +use serde_json::Value; + +/// The unified CRUD interface for all BEDS data templates. +/// +/// Every data domain in BEDS is a struct implementing this trait. The trait +/// methods accept and return data as `HashMap` — user-facing +/// field names on the way in, the same names on the way out. Schema +/// translation is the trait implementor's responsibility. +/// +/// Primary keys are always GUIDs. Callers never provide a pkey on writes — +/// `write()` generates one and includes it in the returned record map. +/// +/// All methods are async; implementations will await database adapter calls. +/// +/// # History +/// +/// * `2026-04-05` - mks - original coding (stub) +pub trait NamasteCore { + /// Returns the template's TLA identifier (e.g. `"usr"`, `"pst"`). + /// + /// Used by the factory to match incoming template names to implementors. + fn template_id(&self) -> &'static str; + + /// Fetches records matching the given query discriminants. + /// + /// `params` contains user-facing field names mapped to match values. + /// Returns a Vec of record maps, each using user-facing field names. + /// Returns an empty Vec when no records match — not an error. + fn fetch( + &self, + params: HashMap, + ) -> impl std::future::Future>, String>> + Send; + + /// Writes a new record to the data store. + /// + /// `data` contains user-facing field names mapped to values. Protected + /// fields in `data` are silently ignored. The implementation generates + /// a GUID pkey and returns the full written record, including the new pkey. + fn write( + &self, + data: HashMap, + ) -> impl std::future::Future, String>> + Send; + + /// Updates an existing record identified by the discriminants in `params`. + /// + /// `params` identifies the target record(s). `data` contains the fields + /// to update and their new values. Protected fields in `data` are ignored. + fn update( + &self, + params: HashMap, + data: HashMap, + ) -> impl std::future::Future> + Send; + + /// Deletes record(s) matching the given discriminants. + /// + /// Returns the count of records deleted. + fn delete( + &self, + params: HashMap, + ) -> impl std::future::Future> + Send; +} diff --git a/src/lib.rs b/src/lib.rs index 01dc529..780d447 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,10 @@ //! ## History //! * `2026-04-02` - mks - original coding //! * `2026-04-04` - mks - promoted service modules to pub mod services +//! * `2026-04-05` - mks - added brokers and core modules +pub mod brokers; pub mod config; +pub mod core; pub mod logging; pub mod services; diff --git a/src/main.rs b/src/main.rs index 505b152..6aae128 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,9 @@ //! * `2026-04-04` - mks - ipl() made async for AMQP connection; tokio runtime added //! * `2026-04-04` - mks - added AMQP authenticate + exchange declare to IPL sequence +mod brokers; mod config; +mod core; mod logging; mod services; @@ -42,7 +44,8 @@ mod services; /// 3b. Authenticate to RabbitMQ + declare beds.events exchange /// 4. Validate MongoDB reachability (TCP) /// 5. Validate MariaDB reachability (TCP) -/// 6. Node green +/// 6. Spawn broker pools (rBroker) +/// 7. Node green /// /// # Returns /// @@ -53,6 +56,7 @@ mod services; /// /// * `2026-04-02` - mks - original coding /// * `2026-04-04` - mks - made async; added AMQP auth + exchange declare +/// * `2026-04-05` - mks - added rBroker pool spawn (step 6) async fn ipl() -> Result<(), String> { // load configuration — fatal in all environments if this fails let cfg = config::load().map_err(|e| format!("Failed to load config: {}", e))?; @@ -125,6 +129,55 @@ async fn ipl() -> Result<(), String> { } } + // step 6: spawn broker pools — queues are declared here, not at exchange declare time + // rBroker pool requires an authenticated AMQP connection; skip in non-prod if unavailable + let _broker_handles = if let Some(ref conn) = amqp_conn { + use std::sync::Arc; + let shared_conn = Arc::new( + lapin::Connection::connect( + &format!( + "amqp://{}:{}@{}:{}/{}", + cfg.broker_services.app_server.user, + cfg.broker_services.app_server.pass, + cfg.broker_services.app_server.host, + cfg.broker_services.app_server.port, + cfg.broker_services.vhost, + ), + lapin::ConnectionProperties::default(), + ) + .await + .map_err(|e| format!("Broker pool connection failed: {}", e))?, + ); + let _ = conn; // IPL connection stays alive but broker pool owns its own connection + + let r_handles = match brokers::spawn_r_broker_pool(Arc::clone(&shared_conn), &cfg.broker_services).await { + Ok(handles) => handles, + Err(e) => { + if cfg.id.env_name == "production" { + return Err(format!("rBroker pool failed to start: {}", e)); + } + tracing::warn!("rBroker pool failed (non-fatal in {}): {}", cfg.id.env_name, e); + vec![] + } + }; + + let w_handles = match brokers::spawn_w_broker_pool(Arc::clone(&shared_conn), &cfg.broker_services).await { + Ok(handles) => handles, + Err(e) => { + if cfg.id.env_name == "production" { + return Err(format!("wBroker pool failed to start: {}", e)); + } + tracing::warn!("wBroker pool failed (non-fatal in {}): {}", cfg.id.env_name, e); + vec![] + } + }; + + r_handles.into_iter().chain(w_handles).collect() + } else { + tracing::warn!("rBroker pool skipped — no AMQP connection"); + vec![] + }; + tracing::info!("BEDS IPL complete — node green"); Ok(()) } diff --git a/tests/broker_pool_test.rs b/tests/broker_pool_test.rs new file mode 100644 index 0000000..2b135b3 --- /dev/null +++ b/tests/broker_pool_test.rs @@ -0,0 +1,98 @@ +//! # tests/broker_pool_test.rs — Broker Pool Integration Tests +//! +//! Integration tests for rBroker and wBroker pool spawn. These tests require +//! a live RabbitMQ instance at the address in `tests/fixtures/beds_test.toml`. +//! If the broker is unreachable, tests skip gracefully — they do not fail. +//! +//! Run with a live broker: +//! cargo test --test broker_pool_test +//! +//! **Author:** mks +//! **Version:** 1.0 +//! +//! ## History +//! * `2026-04-05` - mks - original coding + +mod common; + +use std::sync::Arc; +use rustybeds::brokers; +use rustybeds::services::amqp::AmqpConnection; + +/// Attempts to connect to the test broker. Returns None if unreachable so +/// tests can skip rather than fail when the broker isn't running. +async fn try_connect(cfg: &rustybeds::config::BrokerServicesConfig) -> Option> { + let uri = format!( + "amqp://{}:{}@{}:{}/{}", + cfg.app_server.user, + cfg.app_server.pass, + cfg.app_server.host, + cfg.app_server.port, + cfg.vhost, + ); + + match lapin::Connection::connect(&uri, lapin::ConnectionProperties::default()).await { + Ok(conn) => Some(Arc::new(conn)), + Err(_) => None, + } +} + +#[tokio::test] +async fn r_broker_pool_spawns_configured_instances() { + let cfg = common::load_test_config(); + + let conn = match try_connect(&cfg.broker_services).await { + Some(c) => c, + None => { + eprintln!("SKIP: RabbitMQ not available at test address"); + return; + } + }; + + // declare exchange first — broker tasks expect it to exist + let amqp = AmqpConnection::connect(&cfg.broker_services).await + .expect("exchange declaration connection failed"); + amqp.declare_exchange().await + .expect("exchange declaration failed"); + + let handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) + .await + .expect("rBroker pool failed to start"); + + // fixture has r_broker = 2 + assert_eq!(handles.len(), cfg.broker_services.app_server.instances.r_broker as usize); + + // abort tasks cleanly — we're just testing spawn, not the consume loop + for h in handles { + h.abort(); + } +} + +#[tokio::test] +async fn w_broker_pool_spawns_configured_instances() { + let cfg = common::load_test_config(); + + let conn = match try_connect(&cfg.broker_services).await { + Some(c) => c, + None => { + eprintln!("SKIP: RabbitMQ not available at test address"); + return; + } + }; + + let amqp = AmqpConnection::connect(&cfg.broker_services).await + .expect("exchange declaration connection failed"); + amqp.declare_exchange().await + .expect("exchange declaration failed"); + + let handles = brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) + .await + .expect("wBroker pool failed to start"); + + // fixture has w_broker = 2 + assert_eq!(handles.len(), cfg.broker_services.app_server.instances.w_broker as usize); + + for h in handles { + h.abort(); + } +}