From de1b284ab25a96ca6ffc9de7faab6a8481befcde Mon Sep 17 00:00:00 2001 From: gramps Date: Mon, 6 Apr 2026 11:28:27 -0700 Subject: [PATCH] milestone: AI switchover baseline (0.1.x) --- tests/broker_message_flow_test.rs | 156 ++++++++++++++++++++++++++++++ wiki/06-queue-topology.md | 11 +++ 2 files changed, 167 insertions(+) create mode 100644 tests/broker_message_flow_test.rs diff --git a/tests/broker_message_flow_test.rs b/tests/broker_message_flow_test.rs new file mode 100644 index 0000000..dc7741a --- /dev/null +++ b/tests/broker_message_flow_test.rs @@ -0,0 +1,156 @@ +//! # tests/broker_message_flow_test.rs — Broker Message Flow Smoke Tests +//! +//! Validates end-to-end AMQP message flow for the two current POC brokers: +//! rBroker (`rec.read`) and wBroker (`rec.write`). +//! +//! These tests require a live RabbitMQ instance at the address in +//! `tests/fixtures/beds_test.toml`. If the broker is unreachable, tests skip +//! gracefully. + +mod common; + +use std::sync::Arc; +use std::time::Duration; + +use futures_lite::StreamExt; +use lapin::{ + options::{ + BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions, + }, + types::FieldTable, + BasicProperties, +}; +use rustybeds::brokers; +use rustybeds::services::amqp::{AmqpConnection, EXCHANGE_NAME}; + +/// Attempts to connect to the test broker. Returns None if unreachable so +/// tests can skip rather than fail when RabbitMQ is not running locally. +async fn try_connect(cfg: &rustybeds::config::BrokerServicesConfig) -> Option> { + let uri = format!( + "amqp://{}:{}@{}:{}/{}", + cfg.app_server.user, + cfg.app_server.pass, + cfg.app_server.host, + cfg.app_server.port, + cfg.vhost, + ); + + match lapin::Connection::connect(&uri, lapin::ConnectionProperties::default()).await { + Ok(conn) => Some(Arc::new(conn)), + Err(_) => None, + } +} + +/// Publishes a ping event to the given routing key and verifies that a reply +/// arrives with expected broker metadata. +async fn ping_round_trip(channel: &lapin::Channel, routing_key: &str, expected_broker: &str) { + let reply_queue = channel + .queue_declare( + "", + QueueDeclareOptions { + exclusive: true, + auto_delete: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await + .expect("failed to declare reply queue") + .name() + .as_str() + .to_string(); + + let mut consumer = channel + .basic_consume( + &reply_queue, + "message-flow-test", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .expect("failed to start reply consumer"); + + let correlation_id = format!("ping-{}", expected_broker); + + let props = BasicProperties::default() + .with_type("ping".into()) + .with_reply_to(reply_queue.clone().into()) + .with_correlation_id(correlation_id.clone().into()); + + channel + .basic_publish( + EXCHANGE_NAME, + routing_key, + BasicPublishOptions::default(), + b"{}", + props, + ) + .await + .expect("publish failed") + .await + .expect("publish confirm failed"); + + let delivery = tokio::time::timeout(Duration::from_secs(5), consumer.next()) + .await + .expect("timed out waiting for broker reply") + .expect("reply consumer ended unexpectedly") + .expect("reply delivery error"); + + let payload: serde_json::Value = + serde_json::from_slice(&delivery.data).expect("reply payload is not valid JSON"); + + assert_eq!(payload["status"], "ok"); + assert_eq!(payload["broker"], expected_broker); + + if let Some(cid) = delivery.properties.correlation_id().as_ref() { + assert_eq!(cid.as_str(), correlation_id); + } + + delivery + .ack(BasicAckOptions::default()) + .await + .expect("failed to ack reply message"); +} + +#[tokio::test] +async fn r_and_w_brokers_process_ping_events() { + let cfg = common::load_test_config(); + + let conn = match try_connect(&cfg.broker_services).await { + Some(c) => c, + None => { + eprintln!("SKIP: RabbitMQ not available at test address"); + return; + } + }; + + // The brokers require the exchange to exist before queue binding. + let amqp = AmqpConnection::connect(&cfg.broker_services) + .await + .expect("exchange declaration connection failed"); + amqp.declare_exchange() + .await + .expect("exchange declaration failed"); + + let mut handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) + .await + .expect("rBroker pool failed to start"); + + handles.extend( + brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) + .await + .expect("wBroker pool failed to start"), + ); + + let test_channel = conn + .create_channel() + .await + .expect("failed to create test channel"); + + ping_round_trip(&test_channel, "rec.read", "rBroker").await; + ping_round_trip(&test_channel, "rec.write", "wBroker").await; + + for h in handles { + h.abort(); + } +} diff --git a/wiki/06-queue-topology.md b/wiki/06-queue-topology.md index 489ea82..6f03674 100644 --- a/wiki/06-queue-topology.md +++ b/wiki/06-queue-topology.md @@ -151,3 +151,14 @@ vhost: dev ← development traffic Even if all three environments share one RabbitMQ instance, they are fully isolated. A message published to `prod` cannot be consumed by a `dev` consumer. This was the operational pattern in the Namaste homelab — one RabbitMQ instance, three vhosts, multiple concurrent dev sessions running without interfering with each other. + +## POC Verification + +Current proof-of-concept verification for the two active appServer brokers is covered by integration tests: + +- `tests/broker_pool_test.rs` validates that configured rBroker/wBroker pool instances spawn. +- `tests/broker_message_flow_test.rs` validates end-to-end message flow by publishing `ping` events to + `rec.read` and `rec.write` and asserting broker replies. + +These tests provide a lightweight deployment confidence check while the framework is still in the +"POC before guardrails" phase.