Add MongoDB reachability validation to IPL sequence
- Add rec_services config section to beds.toml and test fixture - Add RecNodeConfig struct; export from config module - Add mongo::validate() and validate_all() — TCP ping per configured REC node - Wire mongo::validate_all() into ipl() with env-aware error handling - Add mongodb crate dependency (sync feature) - Add unit tests for mongo validate error paths (closed port, bad address) - Update README status table and project structure Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
1750
Cargo.lock
generated
1750
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -9,4 +9,5 @@ config = "0.14"
|
||||
thiserror = "1"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"]}
|
||||
tracing-journald = "0.3"
|
||||
tracing-journald = "0.3"
|
||||
mongodb = { version = "3", features = ["sync"] }
|
||||
@@ -88,6 +88,7 @@ rustybeds/
|
||||
│ │ ├── mod.rs # Loader — load() and load_from() for testability
|
||||
│ │ └── structs.rs # Typed config structs (serde Deserialize)
|
||||
│ ├── amqp.rs # RabbitMQ transport — validate(), future channel/queue ops
|
||||
│ ├── mongo.rs # MongoDB transport — validate_all(), future adapter ops
|
||||
│ ├── lib.rs # Public API surface for integration test harness
|
||||
│ ├── logging.rs # tracing + journald init
|
||||
│ └── main.rs # ipl() sequence + main()
|
||||
@@ -137,7 +138,7 @@ The `config` crate deep-merges these at startup. Only keys present in the env fi
|
||||
| IPL sequence with env-aware error handling | Done |
|
||||
| RabbitMQ reachability validation | Done |
|
||||
| Unit test scaffolding + config fixture pattern | Done |
|
||||
| MongoDB reachability validation | Next |
|
||||
| MongoDB reachability validation | Done |
|
||||
| MariaDB reachability validation | Next |
|
||||
| Shared filesystem validation | Next |
|
||||
| AMQP channel / queue declaration | Planned |
|
||||
|
||||
@@ -32,6 +32,7 @@ rustybeds/
|
||||
│ │ ├── mod.rs # load() + load_from() — layered TOML config
|
||||
│ │ └── structs.rs # Typed config structs (serde Deserialize)
|
||||
│ ├── amqp.rs # RabbitMQ transport — validate(), future channel/queue ops
|
||||
│ ├── mongo.rs # MongoDB transport — validate_all(), future adapter ops
|
||||
│ ├── lib.rs # Public API surface for integration test harness
|
||||
│ ├── logging.rs # tracing + journald + console mirror init
|
||||
│ └── main.rs # ipl() sequence + main()
|
||||
|
||||
@@ -30,4 +30,12 @@ rpi = 50
|
||||
[broker_services.app_server.instances]
|
||||
r_broker = 2
|
||||
w_broker = 2
|
||||
m_broker = 0
|
||||
m_broker = 0
|
||||
|
||||
[rec_services.app_server]
|
||||
host = "localhost"
|
||||
port = 27017
|
||||
user = "beds"
|
||||
pass = "changeme"
|
||||
database = "beds_app"
|
||||
use_ssl = false
|
||||
@@ -26,7 +26,7 @@
|
||||
//! * `2026-04-02` - mks - refactored into load() + load_from() for testability
|
||||
|
||||
mod structs;
|
||||
pub use structs::{BedsConfig, BrokerServicesConfig};
|
||||
pub use structs::{BedsConfig, BrokerServicesConfig, RecNodeConfig};
|
||||
|
||||
use config::{Config, File, FileFormat};
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct BedsConfig {
|
||||
@@ -9,6 +10,7 @@ pub struct BedsConfig {
|
||||
pub audit_on: bool,
|
||||
pub journal_on: bool,
|
||||
pub broker_services: BrokerServicesConfig,
|
||||
pub rec_services: HashMap<String, RecNodeConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -48,3 +50,13 @@ pub struct BrokerInstancesConfig {
|
||||
pub w_broker: u32,
|
||||
pub m_broker: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct RecNodeConfig {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub user: String,
|
||||
pub pass: String,
|
||||
pub database: String,
|
||||
pub use_ssl: bool,
|
||||
}
|
||||
|
||||
@@ -16,3 +16,4 @@
|
||||
pub mod amqp;
|
||||
pub mod config;
|
||||
pub mod logging;
|
||||
pub mod mongo;
|
||||
|
||||
12
src/main.rs
12
src/main.rs
@@ -24,6 +24,7 @@
|
||||
mod amqp;
|
||||
mod config;
|
||||
mod logging;
|
||||
mod mongo;
|
||||
|
||||
/// Executes the BEDS Initial Program Load (IPL) sequence.
|
||||
///
|
||||
@@ -69,6 +70,17 @@ fn ipl() -> Result<(), String> {
|
||||
}
|
||||
}
|
||||
|
||||
// validate MongoDB reachability — fatal in production, non-fatal in all other envs
|
||||
match mongo::validate_all(&cfg.rec_services) {
|
||||
Ok(()) => tracing::info!("MongoDB reachable"),
|
||||
Err(e) => {
|
||||
if cfg.id.env_name == "production" {
|
||||
return Err(e);
|
||||
}
|
||||
tracing::warn!("MongoDB unreachable (non-fatal in {}): {}", cfg.id.env_name, e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
109
src/mongo.rs
Normal file
109
src/mongo.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
//! # mongo.rs — MongoDB (REC) Transport Layer
|
||||
//!
|
||||
//! Manages all MongoDB interactions for the BEDS node. At IPL, validates that
|
||||
//! each configured REC service node is reachable before the node proceeds.
|
||||
//! Future phases will add connection pooling, authentication, and collection
|
||||
//! access via the adapter layer.
|
||||
//!
|
||||
//! ## Calling Agents
|
||||
//! - `ipl()` in main.rs — calls `validate_all()` during the IPL sequence
|
||||
//!
|
||||
//! ## Inputs
|
||||
//! - `HashMap<String, RecNodeConfig>` from the loaded BEDS configuration
|
||||
//!
|
||||
//! ## Outputs
|
||||
//! - `Ok(())` if all configured REC nodes are reachable
|
||||
//! - `Err(String)` with node name, host:port, and OS error on first failure
|
||||
//!
|
||||
//! **Author:** mks
|
||||
//! **Version:** 1.0
|
||||
//!
|
||||
//! ## History
|
||||
//! * `2026-04-04` - mks - original coding
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::TcpStream;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::config::RecNodeConfig;
|
||||
|
||||
/// Validates that all configured MongoDB nodes are reachable.
|
||||
///
|
||||
/// Iterates every entry in the `rec_services` config block and opens a TCP
|
||||
/// connection to each declared host:port. Does not authenticate or issue any
|
||||
/// MongoDB wire protocol — reachability only. Fails on the first unreachable
|
||||
/// node.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `nodes` — map of service name → `RecNodeConfig` from `BedsConfig`
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// `Ok(())` if every node responds to a TCP connect within the timeout.
|
||||
/// `Err(String)` with the service name and address of the first failure.
|
||||
///
|
||||
/// # History
|
||||
///
|
||||
/// * `2026-04-04` - mks - original coding
|
||||
pub fn validate_all(nodes: &HashMap<String, RecNodeConfig>) -> Result<(), String> {
|
||||
for (name, node) in nodes {
|
||||
validate(name, node)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validates that a single MongoDB node is reachable.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `name` — service name from config (e.g. "app_server") — used in error messages
|
||||
/// * `node` — `RecNodeConfig` for this node
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// `Ok(())` if the TCP handshake succeeds within 5 seconds.
|
||||
/// `Err(String)` with a descriptive message on failure.
|
||||
///
|
||||
/// # History
|
||||
///
|
||||
/// * `2026-04-04` - mks - original coding
|
||||
pub fn validate(name: &str, node: &RecNodeConfig) -> Result<(), String> {
|
||||
let addr_str = format!("{}:{}", node.host, node.port);
|
||||
|
||||
let addr: std::net::SocketAddr = addr_str
|
||||
.parse()
|
||||
.map_err(|e| format!("Invalid MongoDB address for rec_services.{} ({}): {}", name, addr_str, e))?;
|
||||
|
||||
TcpStream::connect_timeout(&addr, Duration::from_secs(5))
|
||||
.map_err(|e| format!("MongoDB unreachable at rec_services.{} ({}): {}", name, addr_str, e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::load_from;
|
||||
|
||||
fn test_cfg() -> crate::config::BedsConfig {
|
||||
load_from("tests/fixtures/beds_test.toml", "")
|
||||
.expect("test fixture beds_test.toml failed to load")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_err_on_closed_port() {
|
||||
let mut cfg = test_cfg();
|
||||
let node = cfg.rec_services.get_mut("app_server").unwrap();
|
||||
node.port = 1;
|
||||
assert!(validate("app_server", node).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_err_on_bad_address() {
|
||||
let mut cfg = test_cfg();
|
||||
let node = cfg.rec_services.get_mut("app_server").unwrap();
|
||||
node.host = "not_a_valid_host!!!".to_string();
|
||||
assert!(validate("app_server", node).is_err());
|
||||
}
|
||||
}
|
||||
8
tests/fixtures/beds_test.toml
vendored
8
tests/fixtures/beds_test.toml
vendored
@@ -49,3 +49,11 @@ rpi = 50
|
||||
r_broker = 2
|
||||
w_broker = 2
|
||||
m_broker = 0
|
||||
|
||||
[rec_services.app_server]
|
||||
host = "127.0.0.1"
|
||||
port = 27017
|
||||
user = "beds"
|
||||
pass = "changeme"
|
||||
database = "beds_test"
|
||||
use_ssl = false
|
||||
|
||||
Reference in New Issue
Block a user