diff --git a/crates/ordo-server/src/api.rs b/crates/ordo-server/src/api.rs index d0ead88a..3d2d8ebb 100644 --- a/crates/ordo-server/src/api.rs +++ b/crates/ordo-server/src/api.rs @@ -1644,6 +1644,71 @@ pub async fn admin_reload(State(state): State) -> ApiResult, +) -> Json { + let cfg = state.runtime_config.read().await; + Json(cfg.clone()) +} + +/// PUT /api/v1/admin/config — apply a new runtime configuration cluster-wide. +pub async fn put_runtime_config( + State(state): State, + Json(new_cfg): Json, +) -> ApiResult> { + new_cfg.validate().map_err(ApiError::bad_request)?; + + // 1. Persist to in-memory store + { + let mut guard = state.runtime_config.write().await; + *guard = new_cfg.clone(); + } + + // 2. Apply to AuditLogger + state + .audit_logger + .set_sample_rate(new_cfg.audit_sample_rate); + + // 3. Apply resource limits to RuleStore + { + let mut store = state.store.write().await; + store.set_resource_limits(new_cfg.max_rules_per_tenant, new_cfg.max_total_rules); + } + + // 4. Apply tenant defaults to TenantManager + state + .tenant_manager + .update_defaults(crate::tenant::TenantDefaults { + default_qps_limit: new_cfg.default_tenant_qps, + default_burst_limit: new_cfg.default_tenant_burst, + default_timeout_ms: new_cfg.default_tenant_timeout_ms, + }); + + // 5. Broadcast to cluster via NATS (if enabled) + { + let tx_guard = state.sync_tx.read().await; + if let Some(tx) = tx_guard.as_ref() { + if let Ok(json) = serde_json::to_string(&new_cfg) { + let _ = tx.send(crate::sync::event::SyncEvent::RuntimeConfigChanged { + config_json: json, + }); + } + } + } + + tracing::info!( + audit_sample_rate = new_cfg.audit_sample_rate, + max_rules_per_tenant = ?new_cfg.max_rules_per_tenant, + max_total_rules = ?new_cfg.max_total_rules, + "Runtime config updated" + ); + + Ok(Json(new_cfg)) +} + // ==================== Rule Testing API ==================== /// Run a test suite against a named ruleset. 
diff --git a/crates/ordo-server/src/api_integration_tests.rs b/crates/ordo-server/src/api_integration_tests.rs index d9b3ed36..3c0e9c79 100644 --- a/crates/ordo-server/src/api_integration_tests.rs +++ b/crates/ordo-server/src/api_integration_tests.rs @@ -65,12 +65,14 @@ async fn build_full_test_app() -> Router { audit_logger, metric_sink, executor, + runtime_config: crate::runtime_config::new_shared(&config), config, signature_verifier: None, debug_sessions, tenant_manager, rate_limiter, webhook_manager, + sync_tx: std::sync::Arc::new(tokio::sync::RwLock::new(None)), }; // CORS — permissive for tests (matches debug_enabled=false production default) diff --git a/crates/ordo-server/src/api_tests.rs b/crates/ordo-server/src/api_tests.rs index 0b1bf4ac..502ff303 100644 --- a/crates/ordo-server/src/api_tests.rs +++ b/crates/ordo-server/src/api_tests.rs @@ -50,12 +50,14 @@ async fn build_test_app() -> Router { audit_logger, metric_sink, executor, + runtime_config: crate::runtime_config::new_shared(&config), config, signature_verifier: None, debug_sessions, tenant_manager, rate_limiter, webhook_manager, + sync_tx: std::sync::Arc::new(tokio::sync::RwLock::new(None)), }; Router::new() @@ -395,12 +397,14 @@ async fn test_admin_reload_with_persistence() { audit_logger, metric_sink, executor, + runtime_config: crate::runtime_config::new_shared(&config), config, signature_verifier: None, debug_sessions, tenant_manager, rate_limiter, webhook_manager, + sync_tx: std::sync::Arc::new(tokio::sync::RwLock::new(None)), }; let app = Router::new() diff --git a/crates/ordo-server/src/grpc.rs b/crates/ordo-server/src/grpc.rs index 4a3c16e6..e7cedcad 100644 --- a/crates/ordo-server/src/grpc.rs +++ b/crates/ordo-server/src/grpc.rs @@ -113,7 +113,7 @@ impl OrdoGrpcService { if !self.multi_tenancy_enabled { return Ok(TenantConfig::default_for_id( tenant_id, - self.tenant_manager.defaults(), + &self.tenant_manager.defaults(), )); } diff --git a/crates/ordo-server/src/main.rs 
b/crates/ordo-server/src/main.rs index a8ca437f..825a619f 100644 --- a/crates/ordo-server/src/main.rs +++ b/crates/ordo-server/src/main.rs @@ -57,6 +57,7 @@ mod json; mod metrics; mod middleware; mod rate_limiter; +mod runtime_config; mod store; mod sync; mod telemetry; @@ -73,6 +74,7 @@ use ordo_core::prelude::{RuleExecutor, TraceConfig}; use ordo_core::signature::ed25519::decode_public_key; use ordo_core::signature::RuleVerifier; use rate_limiter::RateLimiter; +use runtime_config::SharedRuntimeConfig; use std::fs; use store::RuleStore; use sync::file_watcher::RecentWrites; @@ -86,8 +88,10 @@ pub struct AppState { pub metric_sink: Arc, /// Shared executor for rule execution (avoids holding lock during execution) pub executor: Arc, - /// Server configuration + /// Server configuration (immutable after startup) pub config: Arc, + /// Runtime-mutable configuration (hot-reloadable via admin API) + pub runtime_config: SharedRuntimeConfig, /// Signature verifier (if enabled) pub signature_verifier: Option, /// Debug session manager (only active in debug mode) @@ -98,6 +102,12 @@ pub struct AppState { pub rate_limiter: Arc, /// Webhook manager pub webhook_manager: Arc, + /// NATS sync sender — `None` when NATS is not configured. + /// Used to broadcast runtime config changes cluster-wide. + /// Wrapped in RwLock because NATS connects after servers are spawned. + pub sync_tx: Arc< + tokio::sync::RwLock>>, + >, } fn build_signature_verifier(config: &ServerConfig) -> anyhow::Result> { @@ -323,6 +333,17 @@ async fn main() -> anyhow::Result<()> { let webhook_manager = webhook::WebhookManager::new(shutdown_rx.clone()); + // Runtime-mutable config (hot-reloadable; shared across all servers). + let runtime_cfg = runtime_config::new_shared(&config); + + // Shared NATS sync sender — None until NATS connects (populated below, + // after the HTTP server task is spawned). Admin API uses this to broadcast + // runtime config changes cluster-wide.
Use a RwLock so it can be set + // after the Arc is distributed to the server tasks. + let shared_sync_tx: Arc< + tokio::sync::RwLock>>, + > = Arc::new(tokio::sync::RwLock::new(None)); + // Log server started event { let store_guard = store.read().await; @@ -339,11 +360,13 @@ async fn main() -> anyhow::Result<()> { let http_metric_sink = metric_sink.clone(); let http_executor = executor.clone(); let http_config = config.clone(); + let http_runtime_cfg = runtime_cfg.clone(); let http_signature_verifier = signature_verifier.clone(); let http_debug_sessions = debug_sessions.clone(); let http_tenant_manager = tenant_manager.clone(); let http_rate_limiter = rate_limiter.clone(); let http_webhook_manager = webhook_manager.clone(); + let http_sync_tx = shared_sync_tx.clone(); let http_addr = config.http_addr(); let http_shutdown_rx = shutdown_rx.clone(); tasks.push(tokio::spawn(async move { @@ -354,11 +377,13 @@ async fn main() -> anyhow::Result<()> { http_metric_sink, http_executor, http_config, + http_runtime_cfg, http_signature_verifier, http_debug_sessions, http_tenant_manager, http_rate_limiter, http_webhook_manager, + http_sync_tx, http_shutdown_rx, ) .await @@ -479,7 +504,9 @@ async fn main() -> anyhow::Result<()> { let mut store_guard = store.write().await; store_guard.set_sync_tx(sync_tx.clone()); } - tenant_manager.set_sync_tx(sync_tx).await; + tenant_manager.set_sync_tx(sync_tx.clone()).await; + // Share the sender with HTTP handlers for runtime config broadcast. 
+ *shared_sync_tx.write().await = Some(sync_tx); info!("NATS publisher started (writer mode)"); } @@ -492,6 +519,7 @@ async fn main() -> anyhow::Result<()> { instance_id.clone(), store.clone(), tenant_manager.clone(), + runtime_cfg.clone(), ); nats_subscriber_handle = Some(subscriber.start(shutdown_rx.clone())); @@ -607,11 +635,15 @@ async fn start_http_server( metric_sink: Arc, executor: Arc, config: Arc, + runtime_config: SharedRuntimeConfig, signature_verifier: Option, debug_sessions: Arc, tenant_manager: Arc, rate_limiter: Arc, webhook_manager: Arc, + sync_tx: Arc< + tokio::sync::RwLock>>, + >, mut shutdown_rx: watch::Receiver, ) -> anyhow::Result<()> { let debug_enabled = config.debug_enabled(); @@ -622,11 +654,13 @@ async fn start_http_server( metric_sink, executor, config, + runtime_config, signature_verifier, debug_sessions, tenant_manager, rate_limiter, webhook_manager, + sync_tx, }; // Build base router @@ -700,6 +734,10 @@ async fn start_http_server( ) // Admin API .route("/api/v1/admin/reload", post(api::admin_reload)) + .route( + "/api/v1/admin/config", + get(api::get_runtime_config).put(api::put_runtime_config), + ) // Metrics .route("/metrics", get(prometheus_metrics)) // Tenant management diff --git a/crates/ordo-server/src/middleware/tenant.rs b/crates/ordo-server/src/middleware/tenant.rs index fb9530e7..6672d1ab 100644 --- a/crates/ordo-server/src/middleware/tenant.rs +++ b/crates/ordo-server/src/middleware/tenant.rs @@ -25,7 +25,7 @@ pub async fn tenant_middleware( if is_tenant_management_path(path) { // This is a tenant management endpoint, use default tenant context let tenant_id = state.config.default_tenant.clone(); - let config = TenantConfig::default_for_id(&tenant_id, state.tenant_manager.defaults()); + let config = TenantConfig::default_for_id(&tenant_id, &state.tenant_manager.defaults()); req.extensions_mut().insert(TenantContext { id: tenant_id, config, @@ -36,7 +36,7 @@ pub async fn tenant_middleware( let tenant_id = 
extract_tenant_id(&req).unwrap_or_else(|| state.config.default_tenant.clone()); if !state.config.multi_tenancy_enabled { - let config = TenantConfig::default_for_id(&tenant_id, state.tenant_manager.defaults()); + let config = TenantConfig::default_for_id(&tenant_id, &state.tenant_manager.defaults()); req.extensions_mut().insert(TenantContext { id: tenant_id, config, diff --git a/crates/ordo-server/src/runtime_config.rs b/crates/ordo-server/src/runtime_config.rs new file mode 100644 index 00000000..675c28c4 --- /dev/null +++ b/crates/ordo-server/src/runtime_config.rs @@ -0,0 +1,83 @@ +//! Runtime-mutable server configuration. +//! +//! A subset of [`ServerConfig`] that can be changed at runtime via the admin API +//! without restarting the server. Changes are applied immediately to all dependent +//! components and, when NATS sync is active, broadcast to every node in the cluster. +//! +//! # Mutable fields +//! +//! | Field | Effect | +//! |-------|--------| +//! | `audit_sample_rate` | Percentage (0–100) of executions written to the audit log | +//! | `default_tenant_qps` | Token-bucket refill rate for tenants with no explicit QPS limit | +//! | `default_tenant_burst` | Token-bucket capacity for tenants with no explicit burst limit | +//! | `default_tenant_timeout_ms` | Execution deadline for tenants with no explicit timeout | +//! | `max_rules_per_tenant` | Cap on rulesets per tenant (`null` = unlimited) | +//! | `max_total_rules` | Cap on total rulesets across all tenants (`null` = unlimited) | + +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; + +use crate::config::ServerConfig; + +/// The runtime-mutable portion of server configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeConfig { + /// Audit-log sampling rate (0 = none, 100 = all executions). + pub audit_sample_rate: u8, + + /// Default QPS limit applied to tenants that have no explicit `qps_limit`. + /// `null` means unlimited.
+ pub default_tenant_qps: Option, + + /// Default burst limit applied to tenants that have no explicit `burst_limit`. + /// `null` means unlimited. + pub default_tenant_burst: Option, + + /// Default execution timeout (ms) applied to tenants with no explicit timeout. + pub default_tenant_timeout_ms: u64, + + /// Maximum number of rulesets a single tenant may own. + /// `null` means unlimited. + pub max_rules_per_tenant: Option, + + /// Maximum total rulesets across all tenants combined. + /// `null` means unlimited. + pub max_total_rules: Option, +} + +impl RuntimeConfig { + /// Initialise from the startup [`ServerConfig`]. + pub fn from_server_config(config: &ServerConfig) -> Self { + Self { + audit_sample_rate: config.audit_sample_rate, + default_tenant_qps: config.default_tenant_qps, + default_tenant_burst: config.default_tenant_burst, + default_tenant_timeout_ms: config.default_tenant_timeout_ms, + max_rules_per_tenant: config.max_rules_per_tenant, + max_total_rules: config.max_total_rules, + } + } + + /// Validate the update request. + pub fn validate(&self) -> Result<(), String> { + if self.audit_sample_rate > 100 { + return Err(format!( + "audit_sample_rate must be 0–100, got {}", + self.audit_sample_rate + )); + } + Ok(()) + } +} + +/// Shared handle to the live [`RuntimeConfig`]. +pub type SharedRuntimeConfig = Arc>; + +/// Create a new shared config initialised from `server_config`. +pub fn new_shared(server_config: &ServerConfig) -> SharedRuntimeConfig { + Arc::new(RwLock::new(RuntimeConfig::from_server_config( + server_config, + ))) +} diff --git a/crates/ordo-server/src/sync/event.rs b/crates/ordo-server/src/sync/event.rs index 86dd6a04..45ad2782 100644 --- a/crates/ordo-server/src/sync/event.rs +++ b/crates/ordo-server/src/sync/event.rs @@ -23,6 +23,9 @@ pub enum SyncEvent { /// Tenant configuration was changed (create/update/delete). /// Carries the full tenants map so readers can replace atomically. 
TenantConfigChanged { config_json: String }, + /// Runtime configuration was changed via the admin API. + /// Carries the full [`crate::runtime_config::RuntimeConfig`] as JSON so every node applies it atomically. + RuntimeConfigChanged { config_json: String }, } impl SyncEvent { @@ -32,6 +35,7 @@ impl SyncEvent { SyncEvent::RulePut { .. } => "RulePut", SyncEvent::RuleDeleted { .. } => "RuleDeleted", SyncEvent::TenantConfigChanged { .. } => "TenantConfigChanged", + SyncEvent::RuntimeConfigChanged { .. } => "RuntimeConfigChanged", } } } @@ -74,6 +78,9 @@ impl SyncMessage { SyncEvent::TenantConfigChanged { .. } => { format!("{}.tenants", prefix) } + SyncEvent::RuntimeConfigChanged { .. } => { + format!("{}.runtime-config", prefix) + } } } } diff --git a/crates/ordo-server/src/sync/nats_sync.rs b/crates/ordo-server/src/sync/nats_sync.rs index 5f3b1fc7..c47d9919 100644 --- a/crates/ordo-server/src/sync/nats_sync.rs +++ b/crates/ordo-server/src/sync/nats_sync.rs @@ -139,6 +139,7 @@ pub struct NatsSubscriber { instance_id: String, store: Arc>, tenant_manager: Arc, + runtime_config: crate::runtime_config::SharedRuntimeConfig, } impl NatsSubscriber { @@ -147,12 +148,14 @@ impl NatsSubscriber { instance_id: String, store: Arc>, tenant_manager: Arc, + runtime_config: crate::runtime_config::SharedRuntimeConfig, ) -> Self { Self { consumer, instance_id, store, tenant_manager, + runtime_config, } } @@ -253,6 +256,9 @@ impl NatsSubscriber { SyncEvent::TenantConfigChanged { config_json } => { self.apply_tenant_config_changed(config_json).await; } + SyncEvent::RuntimeConfigChanged { config_json } => { + self.apply_runtime_config_changed(config_json).await; + } } } @@ -318,6 +324,37 @@ impl NatsSubscriber { } } } + + async fn apply_runtime_config_changed(&self, config_json: &str) { + let new_cfg: crate::runtime_config::RuntimeConfig = match serde_json::from_str(config_json) + { + Ok(c) => c, + Err(e) => { + error!("Failed to deserialize RuntimeConfigChanged: {}", e); +
metrics::record_sync_failed("RuntimeConfigChanged", "deserialize"); + return; + } + }; + + // Apply to runtime config, tenant defaults and store limits (not the audit logger). + { + let mut guard = self.runtime_config.write().await; + *guard = new_cfg.clone(); + } + self.tenant_manager + .update_defaults(crate::tenant::TenantDefaults { + default_qps_limit: new_cfg.default_tenant_qps, + default_burst_limit: new_cfg.default_tenant_burst, + default_timeout_ms: new_cfg.default_tenant_timeout_ms, + }); + { + let mut store = self.store.write().await; + store.set_resource_limits(new_cfg.max_rules_per_tenant, new_cfg.max_total_rules); + } + + info!("Applied sync RuntimeConfigChanged"); + metrics::record_sync_applied("RuntimeConfigChanged"); + } } // ─── Setup helpers ──────────────────────────────────────────────────────────── diff --git a/crates/ordo-server/src/tenant.rs b/crates/ordo-server/src/tenant.rs index 6b814fc1..203e49f2 100644 --- a/crates/ordo-server/src/tenant.rs +++ b/crates/ordo-server/src/tenant.rs @@ -82,7 +82,8 @@ impl TenantStore { pub struct TenantManager { tenants: RwLock>, store: Option, - defaults: TenantDefaults, + /// Defaults for new tenants — hot-reloadable via the admin API. + defaults: parking_lot::RwLock, /// Sync channel for publishing tenant config changes to NATS. sync_tx: RwLock>>, } @@ -97,19 +98,27 @@ impl TenantManager { Ok(Self { tenants: RwLock::new(tenants), store, - defaults, + defaults: parking_lot::RwLock::new(defaults), sync_tx: RwLock::new(None), }) } - pub fn defaults(&self) -> &TenantDefaults { - &self.defaults + pub fn defaults(&self) -> parking_lot::RwLockReadGuard<'_, TenantDefaults> { + self.defaults.read() + } + + /// Update the tenant defaults used when creating new tenants. + /// + /// Does not affect existing tenants; only influences calls to + /// [`Self::ensure_default`] and [`TenantConfig::default_for_id`] made afterwards.
+ pub fn update_defaults(&self, new_defaults: TenantDefaults) { + *self.defaults.write() = new_defaults; } pub async fn ensure_default(&self, tenant_id: &str) -> io::Result<()> { let mut guard = self.tenants.write().await; if !guard.contains_key(tenant_id) { - let config = TenantConfig::default_for_id(tenant_id, &self.defaults); + let config = TenantConfig::default_for_id(tenant_id, &self.defaults.read()); guard.insert(tenant_id.to_string(), config); if let Some(store) = &self.store { store.save(&guard).await?; @@ -267,11 +276,11 @@ mod tests { let mut tenants = HashMap::new(); tenants.insert( "default".to_string(), - TenantConfig::default_for_id("default", manager.defaults()), + TenantConfig::default_for_id("default", &manager.defaults()), ); tenants.insert( "new-tenant".to_string(), - TenantConfig::default_for_id("new-tenant", manager.defaults()), + TenantConfig::default_for_id("new-tenant", &manager.defaults()), ); let data = serde_json::to_vec_pretty(&tenants).unwrap(); tokio::fs::write(&store_path, data).await.unwrap();