//! # tests/broker_message_flow_test.rs — Broker Message Flow Smoke Tests //! //! Validates end-to-end AMQP message flow for the two current POC brokers: //! rBroker (`rec.read`) and wBroker (`rec.write`). //! //! These tests require a live RabbitMQ instance at the address in //! `tests/fixtures/beds_test.toml`. If the broker is unreachable, tests skip //! gracefully. mod common; use std::sync::Arc; use std::time::Duration; use futures_lite::StreamExt; use lapin::{ options::{ BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions, }, types::FieldTable, BasicProperties, }; use rustybeds::brokers; use rustybeds::services::amqp::{AmqpConnection, EXCHANGE_NAME}; /// Attempts to connect to the test broker. Returns None if unreachable so /// tests can skip rather than fail when RabbitMQ is not running locally. async fn try_connect(cfg: &rustybeds::config::BrokerServicesConfig) -> Option> { let uri = format!( "amqp://{}:{}@{}:{}/{}", cfg.app_server.user, cfg.app_server.pass, cfg.app_server.host, cfg.app_server.port, cfg.vhost, ); match lapin::Connection::connect(&uri, lapin::ConnectionProperties::default()).await { Ok(conn) => Some(Arc::new(conn)), Err(_) => None, } } /// Publishes a ping event to the given routing key and verifies that a reply /// arrives with expected broker metadata. 
async fn ping_round_trip(channel: &lapin::Channel, routing_key: &str, expected_broker: &str) {
    // The declare/consume/publish/await/ack plumbing is identical to the
    // generic RPC helper below, so delegate to it (an empty JSON object
    // serializes to the same `{}` bytes the ping previously published) and
    // only assert the ping-specific reply fields here.
    let payload = request_reply_json(channel, routing_key, "ping", serde_json::json!({})).await;
    assert_eq!(payload["status"], "ok");
    assert_eq!(payload["broker"], expected_broker);
}

/// Generic request/reply round trip over the shared exchange.
///
/// Declares an exclusive, auto-delete, server-named reply queue, publishes
/// `body` (serialized as JSON) to `routing_key` with the message type set to
/// `event_type` and `reply_to`/`correlation_id` pointing at the reply queue,
/// then waits up to 5 seconds for a single reply, verifies the echoed
/// correlation id when present, acks the delivery, and returns the parsed
/// JSON payload. Panics (failing the test) on any AMQP error, serialization
/// failure, or timeout.
async fn request_reply_json(
    channel: &lapin::Channel,
    routing_key: &str,
    event_type: &str,
    body: serde_json::Value,
) -> serde_json::Value {
    // "" asks the server for a generated queue name: a private reply mailbox
    // that exists only for this request.
    let reply_queue = channel
        .queue_declare(
            "",
            QueueDeclareOptions {
                exclusive: true,
                auto_delete: true,
                ..Default::default()
            },
            FieldTable::default(),
        )
        .await
        .expect("failed to declare reply queue")
        .name()
        .as_str()
        .to_string();

    let mut consumer = channel
        .basic_consume(
            &reply_queue,
            "message-flow-test-rpc",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .expect("failed to start reply consumer");

    let correlation_id = format!("{}-{}", event_type, routing_key);
    let props = BasicProperties::default()
        .with_type(event_type.into())
        .with_reply_to(reply_queue.clone().into())
        .with_correlation_id(correlation_id.clone().into());
    let body_bytes = serde_json::to_vec(&body).expect("failed to serialize request body");

    // First await sends the publish; second await resolves the publisher
    // confirm returned by lapin.
    channel
        .basic_publish(
            EXCHANGE_NAME,
            routing_key,
            BasicPublishOptions::default(),
            &body_bytes,
            props,
        )
        .await
        .expect("publish failed")
        .await
        .expect("publish confirm failed");

    let delivery = tokio::time::timeout(Duration::from_secs(5), consumer.next())
        .await
        .expect("timed out waiting for broker reply")
        .expect("reply consumer ended unexpectedly")
        .expect("reply delivery error");

    let payload: serde_json::Value =
        serde_json::from_slice(&delivery.data).expect("reply payload is not valid JSON");

    // Best-effort check: verify the correlation id only when the broker
    // echoed one back.
    if let Some(cid) = delivery.properties.correlation_id().as_ref() {
        assert_eq!(cid.as_str(), correlation_id);
    }

    delivery
        .ack(BasicAckOptions::default())
        .await
        .expect("failed to ack reply message");

    payload
}

#[tokio::test]
async fn r_and_w_brokers_process_ping_events() {
    let cfg = common::load_test_config();
    let conn = match try_connect(&cfg.broker_services).await {
        Some(c) => c,
        None => {
            eprintln!("SKIP: RabbitMQ not available at test address");
            return;
        }
    };
    // The brokers require the exchange to exist before queue binding.
let amqp = AmqpConnection::connect(&cfg.broker_services) .await .expect("exchange declaration connection failed"); amqp.declare_exchange() .await .expect("exchange declaration failed"); let mut handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) .await .expect("rBroker pool failed to start"); handles.extend( brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) .await .expect("wBroker pool failed to start"), ); let test_channel = conn .create_channel() .await .expect("failed to create test channel"); ping_round_trip(&test_channel, "rec.read", "rBroker").await; ping_round_trip(&test_channel, "rec.write", "wBroker").await; for h in handles { h.abort(); } } #[tokio::test] async fn logger_write_then_fetch_round_trip() { let cfg = common::load_test_config(); if let Err(e) = rustybeds::brokers::logger_store::init_from_rec_services(&cfg.rec_services, &cfg.id.env_name).await { eprintln!("SKIP: Mongo logger store unavailable: {}", e); return; } let conn = match try_connect(&cfg.broker_services).await { Some(c) => c, None => { eprintln!("SKIP: RabbitMQ not available at test address"); return; } }; let amqp = AmqpConnection::connect(&cfg.broker_services) .await .expect("exchange declaration connection failed"); amqp.declare_exchange() .await .expect("exchange declaration failed"); let mut handles = brokers::spawn_r_broker_pool(Arc::clone(&conn), &cfg.broker_services) .await .expect("rBroker pool failed to start"); handles.extend( brokers::spawn_w_broker_pool(Arc::clone(&conn), &cfg.broker_services) .await .expect("wBroker pool failed to start"), ); let test_channel = conn .create_channel() .await .expect("failed to create test channel"); let write_request = serde_json::json!({ "template": "Logger", "data": { "message_log": "poc-log-message", "level_log": "info", "service_log": "app_server" } }); let write_reply = request_reply_json(&test_channel, "rec.write", "write", write_request).await; assert_eq!(write_reply["status"], "ok"); 
assert_eq!(write_reply["code"], "LOGGER_WRITE"); let fetch_request = serde_json::json!({ "template": "Logger", "data": { "limit": 10 } }); let fetch_reply = request_reply_json(&test_channel, "rec.read", "fetch", fetch_request).await; assert_eq!(fetch_reply["status"], "ok"); assert_eq!(fetch_reply["code"], "LOGGER_FETCH"); let logs = fetch_reply["logs"].as_array().expect("logs must be an array"); assert!( logs.iter().any(|v| v["message_log"] == "poc-log-message"), "fetched logs should include the message just written" ); for h in handles { h.abort(); } }