milestone: AI switchover baseline (0.1.x)
This commit is contained in:
156
tests/broker_message_flow_test.rs
Normal file
156
tests/broker_message_flow_test.rs
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
//! # tests/broker_message_flow_test.rs — Broker Message Flow Smoke Tests
|
||||||
|
//!
|
||||||
|
//! Validates end-to-end AMQP message flow for the two current POC brokers:
|
||||||
|
//! rBroker (`rec.read`) and wBroker (`rec.write`).
|
||||||
|
//!
|
||||||
|
//! These tests require a live RabbitMQ instance at the address in
|
||||||
|
//! `tests/fixtures/beds_test.toml`. If the broker is unreachable, tests skip
|
||||||
|
//! gracefully.
|
||||||
|
|
||||||
|
mod common;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use futures_lite::StreamExt;
|
||||||
|
use lapin::{
|
||||||
|
options::{
|
||||||
|
BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions,
|
||||||
|
},
|
||||||
|
types::FieldTable,
|
||||||
|
BasicProperties,
|
||||||
|
};
|
||||||
|
use rustybeds::brokers;
|
||||||
|
use rustybeds::services::amqp::{AmqpConnection, EXCHANGE_NAME};
|
||||||
|
|
||||||
|
/// Attempts to connect to the test broker. Returns None if unreachable so
|
||||||
|
/// tests can skip rather than fail when RabbitMQ is not running locally.
|
||||||
|
async fn try_connect(cfg: &rustybeds::config::BrokerServicesConfig) -> Option<Arc<lapin::Connection>> {
|
||||||
|
let uri = format!(
|
||||||
|
"amqp://{}:{}@{}:{}/{}",
|
||||||
|
cfg.app_server.user,
|
||||||
|
cfg.app_server.pass,
|
||||||
|
cfg.app_server.host,
|
||||||
|
cfg.app_server.port,
|
||||||
|
cfg.vhost,
|
||||||
|
);
|
||||||
|
|
||||||
|
match lapin::Connection::connect(&uri, lapin::ConnectionProperties::default()).await {
|
||||||
|
Ok(conn) => Some(Arc::new(conn)),
|
||||||
|
Err(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publishes a ping event to the given routing key and verifies that a reply
|
||||||
|
/// arrives with expected broker metadata.
|
||||||
|
async fn ping_round_trip(channel: &lapin::Channel, routing_key: &str, expected_broker: &str) {
|
||||||
|
let reply_queue = channel
|
||||||
|
.queue_declare(
|
||||||
|
"",
|
||||||
|
QueueDeclareOptions {
|
||||||
|
exclusive: true,
|
||||||
|
auto_delete: true,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
FieldTable::default(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("failed to declare reply queue")
|
||||||
|
.name()
|
||||||
|
.as_str()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
let mut consumer = channel
|
||||||
|
.basic_consume(
|
||||||
|
&reply_queue,
|
||||||
|
"message-flow-test",
|
||||||
|
BasicConsumeOptions::default(),
|
||||||
|
FieldTable::default(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("failed to start reply consumer");
|
||||||
|
|
||||||
|
let correlation_id = format!("ping-{}", expected_broker);
|
||||||
|
|
||||||
|
let props = BasicProperties::default()
|
||||||
|
.with_type("ping".into())
|
||||||
|
.with_reply_to(reply_queue.clone().into())
|
||||||
|
.with_correlation_id(correlation_id.clone().into());
|
||||||
|
|
||||||
|
channel
|
||||||
|
.basic_publish(
|
||||||
|
EXCHANGE_NAME,
|
||||||
|
routing_key,
|
||||||
|
BasicPublishOptions::default(),
|
||||||
|
b"{}",
|
||||||
|
props,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("publish failed")
|
||||||
|
.await
|
||||||
|
.expect("publish confirm failed");
|
||||||
|
|
||||||
|
let delivery = tokio::time::timeout(Duration::from_secs(5), consumer.next())
|
||||||
|
.await
|
||||||
|
.expect("timed out waiting for broker reply")
|
||||||
|
.expect("reply consumer ended unexpectedly")
|
||||||
|
.expect("reply delivery error");
|
||||||
|
|
||||||
|
let payload: serde_json::Value =
|
||||||
|
serde_json::from_slice(&delivery.data).expect("reply payload is not valid JSON");
|
||||||
|
|
||||||
|
assert_eq!(payload["status"], "ok");
|
||||||
|
assert_eq!(payload["broker"], expected_broker);
|
||||||
|
|
||||||
|
if let Some(cid) = delivery.properties.correlation_id().as_ref() {
|
||||||
|
assert_eq!(cid.as_str(), correlation_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
delivery
|
||||||
|
.ack(BasicAckOptions::default())
|
||||||
|
.await
|
||||||
|
.expect("failed to ack reply message");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn r_and_w_brokers_process_ping_events() {
|
||||||
|
let cfg = common::load_test_config();
|
||||||
|
|
||||||
|
let conn = match try_connect(&cfg.broker_services).await {
|
||||||
|
Some(c) => c,
|
||||||
|
None => {
|
||||||
|
eprintln!("SKIP: RabbitMQ not available at test address");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// The brokers require the exchange to exist before queue binding.
|
||||||
|
let amqp = AmqpConnection::connect(&cfg.broker_services)
|
||||||
|
.await
|
||||||
|
.expect("exchange declaration connection failed");
|
||||||
|
amqp.declare_exchange()
|
||||||
|
.await
|
||||||
|
.expect("exchange declaration failed");
|
||||||
|
|
||||||
|
let mut handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services)
|
||||||
|
.await
|
||||||
|
.expect("rBroker pool failed to start");
|
||||||
|
|
||||||
|
handles.extend(
|
||||||
|
brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services)
|
||||||
|
.await
|
||||||
|
.expect("wBroker pool failed to start"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let test_channel = conn
|
||||||
|
.create_channel()
|
||||||
|
.await
|
||||||
|
.expect("failed to create test channel");
|
||||||
|
|
||||||
|
ping_round_trip(&test_channel, "rec.read", "rBroker").await;
|
||||||
|
ping_round_trip(&test_channel, "rec.write", "wBroker").await;
|
||||||
|
|
||||||
|
for h in handles {
|
||||||
|
h.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -151,3 +151,14 @@ vhost: dev ← development traffic
|
|||||||
Even if all three environments share one RabbitMQ instance, they are fully isolated. A message published to `prod` cannot be consumed by a `dev` consumer.
|
Even if all three environments share one RabbitMQ instance, they are fully isolated. A message published to `prod` cannot be consumed by a `dev` consumer.
|
||||||
|
|
||||||
This was the operational pattern in the Namaste homelab — one RabbitMQ instance, three vhosts, multiple concurrent dev sessions running without interfering with each other.
|
This was the operational pattern in the Namaste homelab — one RabbitMQ instance, three vhosts, multiple concurrent dev sessions running without interfering with each other.
|
||||||
|
|
||||||
|
## POC Verification
|
||||||
|
|
||||||
|
Current proof-of-concept verification for the two active appServer brokers is covered by integration tests:
|
||||||
|
|
||||||
|
- `tests/broker_pool_test.rs` validates that configured rBroker/wBroker pool instances spawn.
|
||||||
|
- `tests/broker_message_flow_test.rs` validates end-to-end message flow by publishing `ping` events to
|
||||||
|
`rec.read` and `rec.write` and asserting broker replies.
|
||||||
|
|
||||||
|
These tests provide a lightweight deployment confidence check while the framework is still in the
|
||||||
|
"POC before guardrails" phase.
|
||||||
|
|||||||
Reference in New Issue
Block a user