Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions crates/ordo-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,71 @@ pub async fn admin_reload(State(state): State<AppState>) -> ApiResult<Json<serde
})))
}

// ==================== Runtime Config API ====================

/// GET /api/v1/admin/config — return the current runtime configuration.
///
/// Responds with a JSON copy of the live configuration as it was at the
/// moment the read lock was acquired.
pub async fn get_runtime_config(
    State(state): State<AppState>,
) -> Json<crate::runtime_config::RuntimeConfig> {
    // Clone under the read lock; the guard is a temporary and is released
    // at the end of this statement.
    let snapshot = state.runtime_config.read().await.clone();
    Json(snapshot)
}

/// PUT /api/v1/admin/config — apply a new runtime configuration cluster-wide.
///
/// The request body is validated first; an invalid body is rejected and
/// nothing is changed. A valid configuration is stored in the shared runtime
/// config, pushed to the audit logger, the rule store and the tenant manager,
/// and finally handed to the sync channel (when one is installed) for
/// broadcast to the rest of the cluster.
///
/// The broadcast is best-effort: the local node has already applied the
/// change by then, so a failure to serialize or enqueue the sync event is
/// logged as a warning instead of failing the request.
///
/// Returns the configuration that was applied.
///
/// # Errors
///
/// Returns the error produced by `ApiError::bad_request` when
/// `RuntimeConfig::validate` rejects the body.
pub async fn put_runtime_config(
    State(state): State<AppState>,
    Json(new_cfg): Json<crate::runtime_config::RuntimeConfig>,
) -> ApiResult<Json<crate::runtime_config::RuntimeConfig>> {
    new_cfg.validate().map_err(ApiError::bad_request)?;

    // 1. Persist to in-memory store. The write guard is scoped so it is
    //    released before the following awaits.
    {
        let mut guard = state.runtime_config.write().await;
        *guard = new_cfg.clone();
    }

    // 2. Apply to AuditLogger
    state
        .audit_logger
        .set_sample_rate(new_cfg.audit_sample_rate);

    // 3. Apply resource limits to RuleStore
    {
        let mut store = state.store.write().await;
        store.set_resource_limits(new_cfg.max_rules_per_tenant, new_cfg.max_total_rules);
    }

    // 4. Apply tenant defaults to TenantManager
    state
        .tenant_manager
        .update_defaults(crate::tenant::TenantDefaults {
            default_qps_limit: new_cfg.default_tenant_qps,
            default_burst_limit: new_cfg.default_tenant_burst,
            default_timeout_ms: new_cfg.default_tenant_timeout_ms,
        });

    // 5. Broadcast to cluster via NATS (if enabled). `sync_tx` holds `None`
    //    when no sender has been installed, in which case this is a no-op.
    {
        let tx_guard = state.sync_tx.read().await;
        if let Some(tx) = tx_guard.as_ref() {
            match serde_json::to_string(&new_cfg) {
                Ok(json) => {
                    let event = crate::sync::event::SyncEvent::RuntimeConfigChanged {
                        config_json: json,
                    };
                    // An unbounded send only fails when the receiving side is
                    // gone; surface that instead of dropping it silently.
                    if tx.send(event).is_err() {
                        tracing::warn!(
                            "Runtime config applied locally but not broadcast: sync channel is closed"
                        );
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        error = %err,
                        "Runtime config applied locally but not broadcast: serialization failed"
                    );
                }
            }
        }
    }

    tracing::info!(
        audit_sample_rate = new_cfg.audit_sample_rate,
        default_tenant_qps = ?new_cfg.default_tenant_qps,
        default_tenant_burst = ?new_cfg.default_tenant_burst,
        default_tenant_timeout_ms = new_cfg.default_tenant_timeout_ms,
        max_rules_per_tenant = ?new_cfg.max_rules_per_tenant,
        max_total_rules = ?new_cfg.max_total_rules,
        "Runtime config updated"
    );

    Ok(Json(new_cfg))
}

// ==================== Rule Testing API ====================

/// Run a test suite against a named ruleset.
Expand Down
2 changes: 2 additions & 0 deletions crates/ordo-server/src/api_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ async fn build_full_test_app() -> Router {
audit_logger,
metric_sink,
executor,
runtime_config: crate::runtime_config::new_shared(&config),
config,
signature_verifier: None,
debug_sessions,
tenant_manager,
rate_limiter,
webhook_manager,
sync_tx: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
};

// CORS — permissive for tests (matches debug_enabled=false production default)
Expand Down
4 changes: 4 additions & 0 deletions crates/ordo-server/src/api_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ async fn build_test_app() -> Router {
audit_logger,
metric_sink,
executor,
runtime_config: crate::runtime_config::new_shared(&config),
config,
signature_verifier: None,
debug_sessions,
tenant_manager,
rate_limiter,
webhook_manager,
sync_tx: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
};

Router::new()
Expand Down Expand Up @@ -395,12 +397,14 @@ async fn test_admin_reload_with_persistence() {
audit_logger,
metric_sink,
executor,
runtime_config: crate::runtime_config::new_shared(&config),
config,
signature_verifier: None,
debug_sessions,
tenant_manager,
rate_limiter,
webhook_manager,
sync_tx: std::sync::Arc::new(tokio::sync::RwLock::new(None)),
};

let app = Router::new()
Expand Down
2 changes: 1 addition & 1 deletion crates/ordo-server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl OrdoGrpcService {
if !self.multi_tenancy_enabled {
return Ok(TenantConfig::default_for_id(
tenant_id,
self.tenant_manager.defaults(),
&self.tenant_manager.defaults(),
));
}

Expand Down
42 changes: 40 additions & 2 deletions crates/ordo-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod json;
mod metrics;
mod middleware;
mod rate_limiter;
mod runtime_config;
mod store;
mod sync;
mod telemetry;
Expand All @@ -73,6 +74,7 @@ use ordo_core::prelude::{RuleExecutor, TraceConfig};
use ordo_core::signature::ed25519::decode_public_key;
use ordo_core::signature::RuleVerifier;
use rate_limiter::RateLimiter;
use runtime_config::SharedRuntimeConfig;
use std::fs;
use store::RuleStore;
use sync::file_watcher::RecentWrites;
Expand All @@ -86,8 +88,10 @@ pub struct AppState {
pub metric_sink: Arc<PrometheusMetricSink>,
/// Shared executor for rule execution (avoids holding lock during execution)
pub executor: Arc<RuleExecutor>,
/// Server configuration
/// Server configuration (immutable after startup)
pub config: Arc<ServerConfig>,
/// Runtime-mutable configuration (hot-reloadable via admin API)
pub runtime_config: SharedRuntimeConfig,
/// Signature verifier (if enabled)
pub signature_verifier: Option<RuleVerifier>,
/// Debug session manager (only active in debug mode)
Expand All @@ -98,6 +102,12 @@ pub struct AppState {
pub rate_limiter: Arc<RateLimiter>,
/// Webhook manager
pub webhook_manager: Arc<webhook::WebhookManager>,
/// NATS sync sender — `None` when NATS is not configured.
/// Used to broadcast runtime config changes cluster-wide.
/// Wrapped in RwLock because NATS connects after servers are spawned.
pub sync_tx: Arc<
tokio::sync::RwLock<Option<tokio::sync::mpsc::UnboundedSender<sync::event::SyncEvent>>>,
>,
}

fn build_signature_verifier(config: &ServerConfig) -> anyhow::Result<Option<RuleVerifier>> {
Expand Down Expand Up @@ -323,6 +333,17 @@ async fn main() -> anyhow::Result<()> {

let webhook_manager = webhook::WebhookManager::new(shutdown_rx.clone());

// Runtime-mutable config (hot-reloadable; shared across all servers).
let runtime_cfg = runtime_config::new_shared(&config);

// Shared NATS sync sender — stays `None` until the NATS publisher is started
// further down (and permanently when NATS is not configured). The admin API
// uses it to broadcast runtime config changes cluster-wide; an update handled
// while it is still `None` is applied locally but not broadcast. A RwLock is
// used so it can be set after the Arc is distributed to the server tasks.
let shared_sync_tx: Arc<
tokio::sync::RwLock<Option<tokio::sync::mpsc::UnboundedSender<sync::event::SyncEvent>>>,
> = Arc::new(tokio::sync::RwLock::new(None));

// Log server started event
{
let store_guard = store.read().await;
Expand All @@ -339,11 +360,13 @@ async fn main() -> anyhow::Result<()> {
let http_metric_sink = metric_sink.clone();
let http_executor = executor.clone();
let http_config = config.clone();
let http_runtime_cfg = runtime_cfg.clone();
let http_signature_verifier = signature_verifier.clone();
let http_debug_sessions = debug_sessions.clone();
let http_tenant_manager = tenant_manager.clone();
let http_rate_limiter = rate_limiter.clone();
let http_webhook_manager = webhook_manager.clone();
let http_sync_tx = shared_sync_tx.clone();
let http_addr = config.http_addr();
let http_shutdown_rx = shutdown_rx.clone();
tasks.push(tokio::spawn(async move {
Expand All @@ -354,11 +377,13 @@ async fn main() -> anyhow::Result<()> {
http_metric_sink,
http_executor,
http_config,
http_runtime_cfg,
http_signature_verifier,
http_debug_sessions,
http_tenant_manager,
http_rate_limiter,
http_webhook_manager,
http_sync_tx,
http_shutdown_rx,
)
.await
Expand Down Expand Up @@ -479,7 +504,9 @@ async fn main() -> anyhow::Result<()> {
let mut store_guard = store.write().await;
store_guard.set_sync_tx(sync_tx.clone());
}
tenant_manager.set_sync_tx(sync_tx).await;
tenant_manager.set_sync_tx(sync_tx.clone()).await;
// Share the sender with HTTP handlers for runtime config broadcast.
*shared_sync_tx.write().await = Some(sync_tx);
info!("NATS publisher started (writer mode)");
}

Expand All @@ -492,6 +519,7 @@ async fn main() -> anyhow::Result<()> {
instance_id.clone(),
store.clone(),
tenant_manager.clone(),
runtime_cfg.clone(),
);
nats_subscriber_handle =
Some(subscriber.start(shutdown_rx.clone()));
Expand Down Expand Up @@ -607,11 +635,15 @@ async fn start_http_server(
metric_sink: Arc<PrometheusMetricSink>,
executor: Arc<RuleExecutor>,
config: Arc<ServerConfig>,
runtime_config: SharedRuntimeConfig,
signature_verifier: Option<RuleVerifier>,
debug_sessions: Arc<debug::DebugSessionManager>,
tenant_manager: Arc<TenantManager>,
rate_limiter: Arc<RateLimiter>,
webhook_manager: Arc<webhook::WebhookManager>,
sync_tx: Arc<
tokio::sync::RwLock<Option<tokio::sync::mpsc::UnboundedSender<sync::event::SyncEvent>>>,
>,
mut shutdown_rx: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let debug_enabled = config.debug_enabled();
Expand All @@ -622,11 +654,13 @@ async fn start_http_server(
metric_sink,
executor,
config,
runtime_config,
signature_verifier,
debug_sessions,
tenant_manager,
rate_limiter,
webhook_manager,
sync_tx,
};

// Build base router
Expand Down Expand Up @@ -700,6 +734,10 @@ async fn start_http_server(
)
// Admin API
.route("/api/v1/admin/reload", post(api::admin_reload))
.route(
"/api/v1/admin/config",
get(api::get_runtime_config).put(api::put_runtime_config),
)
// Metrics
.route("/metrics", get(prometheus_metrics))
// Tenant management
Expand Down
4 changes: 2 additions & 2 deletions crates/ordo-server/src/middleware/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub async fn tenant_middleware(
if is_tenant_management_path(path) {
// This is a tenant management endpoint, use default tenant context
let tenant_id = state.config.default_tenant.clone();
let config = TenantConfig::default_for_id(&tenant_id, state.tenant_manager.defaults());
let config = TenantConfig::default_for_id(&tenant_id, &state.tenant_manager.defaults());
req.extensions_mut().insert(TenantContext {
id: tenant_id,
config,
Expand All @@ -36,7 +36,7 @@ pub async fn tenant_middleware(
let tenant_id = extract_tenant_id(&req).unwrap_or_else(|| state.config.default_tenant.clone());

if !state.config.multi_tenancy_enabled {
let config = TenantConfig::default_for_id(&tenant_id, state.tenant_manager.defaults());
let config = TenantConfig::default_for_id(&tenant_id, &state.tenant_manager.defaults());
req.extensions_mut().insert(TenantContext {
id: tenant_id,
config,
Expand Down
83 changes: 83 additions & 0 deletions crates/ordo-server/src/runtime_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Runtime-mutable server configuration.
//!
//! A subset of [`ServerConfig`] that can be changed at runtime via the admin API
//! without restarting the server. Changes are applied immediately to all dependent
//! components and, when NATS sync is active, broadcast to every node in the cluster.
//!
//! # Mutable fields
//!
//! | Field | Effect |
//! |-------|--------|
//! | `audit_sample_rate` | Fraction of executions written to the audit log |
//! | `default_tenant_qps` | Token-bucket refill rate for tenants with no explicit QPS limit |
//! | `default_tenant_burst` | Token-bucket capacity for tenants with no explicit burst limit |
//! | `default_tenant_timeout_ms` | Execution deadline for tenants with no explicit timeout |
//! | `max_rules_per_tenant` | Cap on rulesets per tenant (`null` = unlimited) |
//! | `max_total_rules` | Cap on total rulesets across all tenants (`null` = unlimited) |

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::config::ServerConfig;

/// The runtime-mutable portion of server configuration.
///
/// Holds the settings that can be replaced while the server is running. The
/// type is serde-serializable so it can be exchanged as JSON; optional limits
/// use `None` (JSON `null`) to mean "unlimited".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeConfig {
/// Audit-log sampling rate (0 = none, 100 = all executions).
/// Values above 100 are rejected by [`RuntimeConfig::validate`].
pub audit_sample_rate: u8,

/// Default QPS limit applied to tenants that have no explicit `qps_limit`.
/// `null` means unlimited.
pub default_tenant_qps: Option<u32>,

/// Default burst limit applied to tenants that have no explicit `burst_limit`.
/// `null` means unlimited.
pub default_tenant_burst: Option<u32>,

/// Default execution timeout, in milliseconds, applied to tenants with no
/// explicit timeout.
pub default_tenant_timeout_ms: u64,

/// Maximum number of rulesets a single tenant may own.
/// `null` means unlimited.
pub max_rules_per_tenant: Option<usize>,

/// Maximum total rulesets across all tenants combined.
/// `null` means unlimited.
pub max_total_rules: Option<usize>,
}

impl RuntimeConfig {
/// Initialise from the startup [`ServerConfig`].
pub fn from_server_config(config: &ServerConfig) -> Self {
Self {
audit_sample_rate: config.audit_sample_rate,
default_tenant_qps: config.default_tenant_qps,
default_tenant_burst: config.default_tenant_burst,
default_tenant_timeout_ms: config.default_tenant_timeout_ms,
max_rules_per_tenant: config.max_rules_per_tenant,
max_total_rules: config.max_total_rules,
}
}

/// Validate the update request.
pub fn validate(&self) -> Result<(), String> {
if self.audit_sample_rate > 100 {
return Err(format!(
"audit_sample_rate must be 0–100, got {}",
self.audit_sample_rate
));
}
Ok(())
}
}

/// Shared handle to the live [`RuntimeConfig`].
///
/// An `Arc` around a Tokio `RwLock`, so every clone of the handle refers to
/// the same configuration. Acquire it with `.read().await` or
/// `.write().await`.
pub type SharedRuntimeConfig = Arc<RwLock<RuntimeConfig>>;

/// Create a new shared config initialised from `server_config`.
///
/// The returned handle starts out holding the values produced by
/// [`RuntimeConfig::from_server_config`].
pub fn new_shared(server_config: &ServerConfig) -> SharedRuntimeConfig {
    let initial = RuntimeConfig::from_server_config(server_config);
    Arc::new(RwLock::new(initial))
}
Loading
Loading