114 lines
3.6 KiB
Rust
114 lines
3.6 KiB
Rust
//! # tests/broker_pool_test.rs — Broker Pool Integration Tests
|
|
//!
|
|
//! Integration tests for rBroker and wBroker pool spawn. These tests require
|
|
//! a live RabbitMQ instance at the address in `tests/fixtures/beds_test.toml`.
|
|
//! If the broker is unreachable, tests skip gracefully — they do not fail.
|
|
//!
|
|
//! Run with a live broker:
|
|
//! cargo test --test broker_pool_test
|
|
//!
|
|
//! **Author:** mks
|
|
//! **Version:** 1.0
|
|
//!
|
|
//! ## History
|
|
//! * `2026-04-05` - mks - original coding
|
|
|
|
mod common;
|
|
|
|
use std::sync::Arc;
|
|
use rustybeds::brokers;
|
|
use rustybeds::services::amqp::AmqpConnection;
|
|
use rustybeds::template_registry::{RuntimeTemplateRegistry, load_runtime_rec_templates};
|
|
|
|
/// Attempts to connect to the test broker. Returns None if unreachable so
|
|
/// tests can skip rather than fail when the broker isn't running.
|
|
async fn try_connect(cfg: &rustybeds::config::BrokerServicesConfig) -> Option<Arc<lapin::Connection>> {
|
|
let uri = format!(
|
|
"amqp://{}:{}@{}:{}/{}",
|
|
cfg.app_server.user,
|
|
cfg.app_server.pass,
|
|
cfg.app_server.host,
|
|
cfg.app_server.port,
|
|
cfg.vhost,
|
|
);
|
|
|
|
match lapin::Connection::connect(&uri, lapin::ConnectionProperties::default()).await {
|
|
Ok(conn) => Some(Arc::new(conn)),
|
|
Err(_) => None,
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn r_broker_pool_spawns_configured_instances() {
|
|
let cfg = common::load_test_config();
|
|
let template_registry = std::sync::Arc::new(RuntimeTemplateRegistry::from_templates(
|
|
load_runtime_rec_templates("templates").expect("template load failed"),
|
|
));
|
|
|
|
let conn = match try_connect(&cfg.broker_services).await {
|
|
Some(c) => c,
|
|
None => {
|
|
eprintln!("SKIP: RabbitMQ not available at test address");
|
|
return;
|
|
}
|
|
};
|
|
|
|
// declare exchange first — broker tasks expect it to exist
|
|
let amqp = AmqpConnection::connect(&cfg.broker_services).await
|
|
.expect("exchange declaration connection failed");
|
|
amqp.declare_exchange().await
|
|
.expect("exchange declaration failed");
|
|
|
|
let handles = brokers::spawn_r_broker_pool(
|
|
Arc::clone(&conn),
|
|
&cfg.broker_services,
|
|
std::sync::Arc::clone(&template_registry),
|
|
)
|
|
.await
|
|
.expect("rBroker pool failed to start");
|
|
|
|
// fixture has r_broker = 2
|
|
assert_eq!(handles.len(), cfg.broker_services.app_server.instances.r_broker as usize);
|
|
|
|
// abort tasks cleanly — we're just testing spawn, not the consume loop
|
|
for h in handles {
|
|
h.abort();
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn w_broker_pool_spawns_configured_instances() {
|
|
let cfg = common::load_test_config();
|
|
let template_registry = std::sync::Arc::new(RuntimeTemplateRegistry::from_templates(
|
|
load_runtime_rec_templates("templates").expect("template load failed"),
|
|
));
|
|
|
|
let conn = match try_connect(&cfg.broker_services).await {
|
|
Some(c) => c,
|
|
None => {
|
|
eprintln!("SKIP: RabbitMQ not available at test address");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let amqp = AmqpConnection::connect(&cfg.broker_services).await
|
|
.expect("exchange declaration connection failed");
|
|
amqp.declare_exchange().await
|
|
.expect("exchange declaration failed");
|
|
|
|
let handles = brokers::spawn_w_broker_pool(
|
|
Arc::clone(&conn),
|
|
&cfg.broker_services,
|
|
std::sync::Arc::clone(&template_registry),
|
|
)
|
|
.await
|
|
.expect("wBroker pool failed to start");
|
|
|
|
// fixture has w_broker = 2
|
|
assert_eq!(handles.len(), cfg.broker_services.app_server.instances.w_broker as usize);
|
|
|
|
for h in handles {
|
|
h.abort();
|
|
}
|
|
}
|