//! # tests/broker_pool_test.rs — Broker Pool Integration Tests //! //! Integration tests for rBroker and wBroker pool spawn. These tests require //! a live RabbitMQ instance at the address in `tests/fixtures/beds_test.toml`. //! If the broker is unreachable, tests skip gracefully — they do not fail. //! //! Run with a live broker: //! cargo test --test broker_pool_test //! //! **Author:** mks //! **Version:** 1.0 //! //! ## History //! * `2026-04-05` - mks - original coding mod common; use std::sync::Arc; use rustybeds::brokers; use rustybeds::services::amqp::AmqpConnection; /// Attempts to connect to the test broker. Returns None if unreachable so /// tests can skip rather than fail when the broker isn't running. async fn try_connect(cfg: &rustybeds::config::BrokerServicesConfig) -> Option> { let uri = format!( "amqp://{}:{}@{}:{}/{}", cfg.app_server.user, cfg.app_server.pass, cfg.app_server.host, cfg.app_server.port, cfg.vhost, ); match lapin::Connection::connect(&uri, lapin::ConnectionProperties::default()).await { Ok(conn) => Some(Arc::new(conn)), Err(_) => None, } } #[tokio::test] async fn r_broker_pool_spawns_configured_instances() { let cfg = common::load_test_config(); let conn = match try_connect(&cfg.broker_services).await { Some(c) => c, None => { eprintln!("SKIP: RabbitMQ not available at test address"); return; } }; // declare exchange first — broker tasks expect it to exist let amqp = AmqpConnection::connect(&cfg.broker_services).await .expect("exchange declaration connection failed"); amqp.declare_exchange().await .expect("exchange declaration failed"); let handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) .await .expect("rBroker pool failed to start"); // fixture has r_broker = 2 assert_eq!(handles.len(), cfg.broker_services.app_server.instances.r_broker as usize); // abort tasks cleanly — we're just testing spawn, not the consume loop for h in handles { h.abort(); } } #[tokio::test] async fn w_broker_pool_spawns_configured_instances() { let cfg = common::load_test_config(); let conn = match try_connect(&cfg.broker_services).await { Some(c) => c, None => { eprintln!("SKIP: RabbitMQ not available at test address"); return; } }; let amqp = AmqpConnection::connect(&cfg.broker_services).await .expect("exchange declaration connection failed"); amqp.declare_exchange().await .expect("exchange declaration failed"); let handles = brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) .await .expect("wBroker pool failed to start"); // fixture has w_broker = 2 assert_eq!(handles.len(), cfg.broker_services.app_server.instances.w_broker as usize); for h in handles { h.abort(); } }