Promote service modules to services/ directory; add AmqpConnection + async IPL

- Flat src/amqp.rs, src/mongo.rs, src/mariadb.rs promoted to src/services/{amqp,mongo,mariadb}/
- services/amqp/connection.rs: AmqpConnection struct with connect() and declare_exchange()
- services/amqp/error.rs: AmqpError type (thiserror, wraps lapin::Error)
- ipl() made async; #[tokio::main] added to main()
- IPL step 3b: authenticate to RabbitMQ + declare beds.events topic exchange (durable)
- Added lapin = "2" and tokio = { version = "1", features = ["full"] } to Cargo.toml
- 12 unit tests pass
- Docs: README, CLAUDE.md, wiki/04-ipl.md, wiki/06-queue-topology.md updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-04 16:52:18 -07:00
parent 2a9afe7d77
commit e8fdb39ea2
14 changed files with 1419 additions and 126 deletions

View File

@@ -12,9 +12,8 @@
//!
//! ## History
//! * `2026-04-02` - mks - original coding
//! * `2026-04-04` - mks - promoted service modules to pub mod services
pub mod amqp;
pub mod config;
pub mod logging;
pub mod mariadb;
pub mod mongo;
pub mod services;

View File

@@ -20,12 +20,13 @@
//! ## History
//! * `2026-04-02` - mks - original coding
//! * `2026-04-02` - mks - refactored startup sequence into ipl()
//! * `2026-04-04` - mks - promoted service modules to services/ directory
//! * `2026-04-04` - mks - ipl() made async for AMQP connection; tokio runtime added
//! * `2026-04-04` - mks - added AMQP authenticate + exchange declare to IPL sequence
mod amqp;
mod config;
mod logging;
mod mariadb;
mod mongo;
mod services;
/// Executes the BEDS Initial Program Load (IPL) sequence.
///
@@ -37,9 +38,11 @@ mod mongo;
/// ## IPL Sequence
/// 1. Load configuration (beds.toml + env override)
/// 2. Initialize logging
/// 3. Connect to required services (AMQP, store adapters) — not yet implemented
/// 4. Declare queues based on node role — not yet implemented
/// 5. Node green
/// 3. Validate RabbitMQ reachability (TCP)
/// 3b. Authenticate to RabbitMQ + declare beds.events exchange
/// 4. Validate MongoDB reachability (TCP)
/// 5. Validate MariaDB reachability (TCP)
/// 6. Node green
///
/// # Returns
///
@@ -49,7 +52,8 @@ mod mongo;
/// # History
///
/// * `2026-04-02` - mks - original coding
fn ipl() -> Result<(), String> {
/// * `2026-04-04` - mks - made async; added AMQP auth + exchange declare
async fn ipl() -> Result<(), String> {
// load configuration — fatal in all environments if this fails
let cfg = config::load().map_err(|e| format!("Failed to load config: {}", e))?;
@@ -60,8 +64,8 @@ fn ipl() -> Result<(), String> {
tracing::info!("Configuration loaded");
tracing::info!("Logging initialized");
// validate broker reachability — fatal in production, non-fatal in all other envs
match amqp::validate(&cfg.broker_services) {
// step 3: validate broker reachability (TCP) — fast pre-flight before auth
match services::amqp::validate(&cfg.broker_services) {
Ok(()) => tracing::info!("RabbitMQ reachable"),
Err(e) => {
if cfg.id.env_name == "production" {
@@ -71,8 +75,35 @@ fn ipl() -> Result<(), String> {
}
}
// validate MongoDB reachability — fatal in production, non-fatal in all other envs
match mongo::validate_all(&cfg.rec_services) {
// step 3b: authenticate to RabbitMQ and declare the beds.events exchange
let amqp_conn = match services::amqp::AmqpConnection::connect(&cfg.broker_services).await {
Ok(conn) => {
tracing::info!("RabbitMQ authenticated");
Some(conn)
}
Err(e) => {
if cfg.id.env_name == "production" {
return Err(format!("RabbitMQ authentication failed: {}", e));
}
tracing::warn!("RabbitMQ authentication failed (non-fatal in {}): {}", cfg.id.env_name, e);
None
}
};
if let Some(ref conn) = amqp_conn {
match conn.declare_exchange().await {
Ok(()) => tracing::info!("AMQP exchange '{}' declared", services::amqp::EXCHANGE_NAME),
Err(e) => {
if cfg.id.env_name == "production" {
return Err(format!("Exchange declaration failed: {}", e));
}
tracing::warn!("Exchange declaration failed (non-fatal in {}): {}", cfg.id.env_name, e);
}
}
}
// step 4: validate MongoDB reachability — fatal in production, non-fatal in all other envs
match services::mongo::validate_all(&cfg.rec_services) {
Ok(()) => tracing::info!("MongoDB reachable"),
Err(e) => {
if cfg.id.env_name == "production" {
@@ -82,9 +113,9 @@ fn ipl() -> Result<(), String> {
}
}
// validate MariaDB reachability — fatal in production, non-fatal in all other envs
// step 5: validate MariaDB reachability — fatal in production, non-fatal in all other envs
// secondary instance failures are always non-fatal (handled inside validate_all)
match mariadb::validate_all(&cfg.rel_services) {
match services::mariadb::validate_all(&cfg.rel_services) {
Ok(()) => tracing::info!("MariaDB reachable"),
Err(e) => {
if cfg.id.env_name == "production" {
@@ -94,11 +125,13 @@ fn ipl() -> Result<(), String> {
}
}
tracing::info!("BEDS IPL complete — node green");
Ok(())
}
fn main() {
if let Err(e) = ipl() {
#[tokio::main]
async fn main() {
if let Err(e) = ipl().await {
eprintln!("[BEDS] [FATAL] [IPL] {}", e);
std::process::exit(1);
}

View File

@@ -0,0 +1,135 @@
//! # services/amqp/connection.rs — AMQP Connection
//!
//! Defines the `AmqpConnection` struct — the authenticated, channel-holding
//! connection to a RabbitMQ broker. Responsible for establishing the AMQP
//! session and declaring the `beds.events` topic exchange at IPL.
//!
//! ## Calling Agents
//! - `ipl()` in main.rs — calls `AmqpConnection::connect()` then `declare_exchange()`
//! - Broker tasks (future) — hold an `AmqpConnection` for publish/consume operations
//!
//! ## Inputs
//! - `BrokerServicesConfig` — host, port, vhost, credentials, queue_tag from config
//!
//! ## Outputs
//! - `AmqpConnection` — authenticated connection + open channel, ready for use
//! - `AmqpError` on authentication failure or broker unavailability
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-04` - mks - original coding
use lapin::{Channel, Connection, ConnectionProperties, ExchangeKind};
use lapin::options::ExchangeDeclareOptions;
use lapin::types::FieldTable;
use crate::config::BrokerServicesConfig;
use super::error::AmqpError;
/// The name of the BEDS topic exchange. All events in the cluster are
/// published to this exchange and routed by key to their consumers.
pub const EXCHANGE_NAME: &str = "beds.events";
/// An authenticated AMQP connection with an open channel.
///
/// Created during IPL by `AmqpConnection::connect()`. Holds both the
/// underlying `lapin::Connection` (to prevent premature drop) and the
/// `lapin::Channel` used for all subsequent operations.
///
/// The `queue_tag` field carries the configured queue name prefix
/// (e.g. `"prod_"`) so that any queue declarations made through this
/// connection are automatically namespaced to the correct environment.
///
/// # History
///
/// * `2026-04-04` - mks - original coding
pub struct AmqpConnection {
/// Held to keep the underlying TCP connection alive.
/// lapin::Channel holds an internal Arc reference to the connection,
/// but we store it explicitly for clarity and to prevent accidental drop.
_connection: Connection,
/// The open channel used for all AMQP operations on this connection.
pub channel: Channel,
/// Queue name prefix from config — prepended to every queue name to
/// isolate environments on a shared broker. Example: "prod_", "dev_"
pub queue_tag: String,
}
impl AmqpConnection {
/// Connects to the configured RabbitMQ broker and opens a channel.
///
/// Authenticates using the credentials in `cfg.app_server` and connects
/// to the configured vhost. Returns an `AmqpConnection` with an open
/// channel ready for exchange declaration and queue binding.
///
/// # Arguments
///
/// * `cfg` — broker services configuration block from `BedsConfig`
///
/// # Returns
///
/// `Ok(AmqpConnection)` on successful authentication and channel open.
/// `Err(AmqpError)` if the broker rejects the connection or credentials.
///
/// # History
///
/// * `2026-04-04` - mks - original coding
pub async fn connect(cfg: &BrokerServicesConfig) -> Result<Self, AmqpError> {
let uri = format!(
"amqp://{}:{}@{}:{}/{}",
cfg.app_server.user,
cfg.app_server.pass,
cfg.app_server.host,
cfg.app_server.port,
cfg.vhost,
);
let connection = Connection::connect(&uri, ConnectionProperties::default()).await?;
let channel = connection.create_channel().await?;
Ok(Self {
_connection: connection,
channel,
queue_tag: cfg.queue_tag.clone(),
})
}
/// Declares the `beds.events` topic exchange on the broker.
///
/// The exchange is declared durable — it survives broker restarts.
/// If the exchange already exists with identical parameters, this is
/// a no-op. If it exists with different parameters, the broker returns
/// an error.
///
/// This must be called once at IPL before any broker tasks attempt to
/// bind queues or publish events.
///
/// # Returns
///
/// `Ok(())` if the exchange is declared or already exists.
/// `Err(AmqpError)` if the broker rejects the declaration.
///
/// # History
///
/// * `2026-04-04` - mks - original coding
pub async fn declare_exchange(&self) -> Result<(), AmqpError> {
self.channel
.exchange_declare(
EXCHANGE_NAME,
ExchangeKind::Topic,
ExchangeDeclareOptions {
durable: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
Ok(())
}
}

View File

@@ -0,0 +1,30 @@
//! # services/amqp/error.rs — AMQP Error Types
//!
//! Defines the error type for all AMQP operations in the BEDS transport layer.
//!
//! ## Calling Agents
//! - `services::amqp::connection` — returned from connect() and declare_exchange()
//! - `ipl()` in main.rs — maps AmqpError to String for the IPL error chain
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-04` - mks - original coding
/// Errors that can occur in the AMQP transport layer.
///
/// All lapin protocol errors are wrapped transparently via the `From` impl.
/// Additional variants cover BEDS-specific failure modes.
///
/// # History
///
/// * `2026-04-04` - mks - original coding
#[derive(Debug, thiserror::Error)]
pub enum AmqpError {
#[error("AMQP protocol error: {0}")]
Protocol(#[from] lapin::Error),
#[error("Invalid broker address '{0}': {1}")]
InvalidAddress(String, String),
}

View File

@@ -1,45 +1,48 @@
//! # amqp.rs — RabbitMQ Transport Layer
//! # services/amqp/mod.rs — AMQP Transport Module
//!
//! Manages all AMQP interactions for the BEDS node. At IPL, validates that
//! the RabbitMQ broker is reachable before the node proceeds. Future phases
//! will add channel acquisition, queue declaration, and message dispatch.
//! RabbitMQ transport layer. Provides IPL reachability validation and the
//! `AmqpConnection` struct for authenticated broker sessions.
//!
//! ## Calling Agents
//! - `ipl()` in main.rs — calls `validate()` during the IPL sequence
//!
//! ## Inputs
//! - `BrokerServicesConfig` from the loaded BEDS configuration
//!
//! ## Outputs
//! - `Ok(())` if the broker is reachable
//! - `Err(String)` with host:port and OS error if the broker cannot be reached
//! ## Public Surface
//! - `validate()` — TCP reachability check (IPL step 3)
//! - `AmqpConnection` — authenticated connection + channel (IPL step 3b)
//! - `EXCHANGE_NAME` — the canonical `beds.events` exchange name constant
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-02` - mks - original coding
//! * `2026-04-02` - mks - original coding (flat amqp.rs)
//! * `2026-04-04` - mks - promoted to services/amqp/, added AmqpConnection struct
pub mod connection;
pub mod error;
pub use connection::{AmqpConnection, EXCHANGE_NAME};
pub use error::AmqpError; // will be used by broker pool error handling
use std::net::TcpStream;
use std::time::Duration;
use crate::config::BrokerServicesConfig;
/// Validates that the RabbitMQ broker is reachable.
/// Validates that the RabbitMQ broker is reachable via TCP.
///
/// Opens a TCP connection to the configured broker host and port. Does not
/// authenticate or open an AMQP channel — reachability only. The connection
/// is closed immediately after a successful connect.
///
/// Called at IPL step 3 as a fast pre-flight check before the more expensive
/// `AmqpConnection::connect()` authentication step.
///
/// # Arguments
///
/// * `cfg` — broker services configuration block from `BedsConfig`
///
/// # Returns
///
/// `Ok(())` if the TCP handshake succeeds.
/// `Err(String)` with a descriptive message if the broker cannot be reached
/// or if the configured address is malformed.
/// `Ok(())` if the TCP handshake succeeds within 5 seconds.
/// `Err(String)` with a descriptive message if the broker cannot be reached.
///
/// # History
///
@@ -62,7 +65,6 @@ mod tests {
use super::*;
use crate::config::load_from;
/// Loads the test fixture config. Panics if the fixture is missing or malformed.
fn test_cfg() -> crate::config::BedsConfig {
load_from("tests/fixtures/beds_test.toml", "")
.expect("test fixture beds_test.toml failed to load")
@@ -70,8 +72,6 @@ mod tests {
#[test]
fn validate_err_on_closed_port() {
// port 1 is reserved and always closed — guarantees a connection refusal
// without requiring any live service
let mut cfg = test_cfg();
cfg.broker_services.app_server.port = 1;
assert!(validate(&cfg.broker_services).is_err());

View File

@@ -1,12 +1,9 @@
//! # mariadb.rs — MariaDB (REL) Transport Layer
//! # services/mariadb/mod.rs — MariaDB (REL) Transport Module
//!
//! Manages all MariaDB interactions for the BEDS node. At IPL, validates that
//! the master instance of each configured REL service node is reachable before
//! the node proceeds. The secondary instance is optional — its absence or
//! unreachability is logged but never fatal.
//!
//! Future phases will add connection pooling, authentication, and query
//! dispatch via the adapter layer.
//! MariaDB transport layer. Provides IPL reachability validation for all
//! configured REL service nodes. Master failure is fatal; secondary failure
//! is always non-fatal. Future phases will add the MariaDbConnection struct
//! for authenticated sessions and query dispatch.
//!
//! ## Calling Agents
//! - `ipl()` in main.rs — calls `validate_all()` during the IPL sequence
@@ -22,7 +19,8 @@
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-04` - mks - original coding
//! * `2026-04-04` - mks - original coding (flat mariadb.rs)
//! * `2026-04-04` - mks - promoted to services/mariadb/
use std::collections::HashMap;
use std::net::TcpStream;
@@ -51,10 +49,8 @@ use crate::config::{RelInstanceConfig, RelNodeConfig};
/// * `2026-04-04` - mks - original coding
pub fn validate_all(nodes: &HashMap<String, RelNodeConfig>) -> Result<(), String> {
for (name, node) in nodes {
// master is required — failure is propagated to the caller
validate(&format!("{}.master", name), &node.master)?;
// secondary is optional — log absence but do not fail
if let Some(secondary) = &node.secondary {
if let Err(e) = validate(&format!("{}.secondary", name), secondary) {
tracing::warn!("MariaDB secondary unreachable (non-fatal): {}", e);

20
src/services/mod.rs Normal file
View File

@@ -0,0 +1,20 @@
//! # services/mod.rs — External Service Transports
//!
//! Groups all external service transport modules. Each submodule owns the
//! TCP/protocol-level connection lifecycle for one service type — from IPL
//! reachability validation through to authenticated connection pooling.
//!
//! ## Submodules
//! - `amqp` — RabbitMQ transport (AMQP 0-9-1 via lapin)
//! - `mongo` — MongoDB transport (REC document store)
//! - `mariadb` — MariaDB transport (REL relational store)
//!
//! **Author:** mks
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-04` - mks - original coding — promoted from flat src/ files
pub mod amqp;
pub mod mariadb;
pub mod mongo;

View File

@@ -1,9 +1,8 @@
//! # mongo.rs — MongoDB (REC) Transport Layer
//! # services/mongo/mod.rs — MongoDB (REC) Transport Module
//!
//! Manages all MongoDB interactions for the BEDS node. At IPL, validates that
//! each configured REC service node is reachable before the node proceeds.
//! Future phases will add connection pooling, authentication, and collection
//! access via the adapter layer.
//! MongoDB transport layer. Provides IPL reachability validation for all
//! configured REC service nodes. Future phases will add the MongoConnection
//! struct for authenticated sessions and collection access.
//!
//! ## Calling Agents
//! - `ipl()` in main.rs — calls `validate_all()` during the IPL sequence
@@ -19,7 +18,8 @@
//! **Version:** 1.0
//!
//! ## History
//! * `2026-04-04` - mks - original coding
//! * `2026-04-04` - mks - original coding (flat mongo.rs)
//! * `2026-04-04` - mks - promoted to services/mongo/
use std::collections::HashMap;
use std::net::TcpStream;