From 5d955bf77c6e1f780b44648d772363283454f2b8 Mon Sep 17 00:00:00 2001 From: Alex Hunt Date: Mon, 11 May 2026 16:51:55 +0100 Subject: [PATCH 1/2] Support Option in dyncfg-file --- src/dyncfg-file/src/lib.rs | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/dyncfg-file/src/lib.rs b/src/dyncfg-file/src/lib.rs index df0ade7b5e5da..c05c11b25879f 100644 --- a/src/dyncfg-file/src/lib.rs +++ b/src/dyncfg-file/src/lib.rs @@ -176,6 +176,10 @@ fn json_to_config_val(json: &JsonValue, template: &ConfigVal) -> Result Ok(ConfigVal::String(v.clone())), + (ConfigVal::OptString(_), JsonValue::Null) => Ok(ConfigVal::OptString(None)), + (ConfigVal::OptString(_), JsonValue::String(v)) => { + Ok(ConfigVal::OptString(Some(v.clone()))) + } (ConfigVal::Duration(_), JsonValue::String(v)) => { Ok(ConfigVal::Duration(humantime::parse_duration(v)?)) } @@ -240,4 +244,42 @@ mod tests { assert_eq!(BOOL_CONFIG.get(&set), false); assert_eq!(STRING_CONFIG.get(&set), "modified"); } + + #[mz_ore::test(tokio::test)] + async fn test_file_sync_opt_string() { + const OPT_STRING_CONFIG: Config> = + Config::new("test_opt_string", None, "A test optional string config"); + let set = ConfigSet::default().add(&OPT_STRING_CONFIG); + + let mut config_file = tempfile::NamedTempFile::new().unwrap(); + config_file + .write_all(b"{\"test_opt_string\": \"hello\"}") + .unwrap(); + sync_file_to_configset( + set.clone(), + &config_file, + Duration::from_secs(1), + None, + |_, _| {}, + ) + .await + .unwrap(); + assert_eq!(OPT_STRING_CONFIG.get(&set), Some("hello".to_string())); + + // null clears the value back to None + let mut config_file = tempfile::NamedTempFile::new().unwrap(); + config_file + .write_all(b"{\"test_opt_string\": null}") + .unwrap(); + sync_file_to_configset( + set.clone(), + &config_file, + Duration::from_secs(1), + None, + |_, _| {}, + ) + .await + .unwrap(); + assert_eq!(OPT_STRING_CONFIG.get(&set), None); + } } From cbeeda0cde6eea8e6f1adb64f17b61cb8895411d Mon Sep 17 00:00:00 2001 From: Alex Hunt Date: Mon, 11 May 2026 16:52:09 +0100 Subject: [PATCH 2/2] Orchestratord E2E OIDC test --- ci/nightly/pipeline.template.yml | 14 ++ test/orchestratord/mzcompose.py | 390 +++++++++++++++++++++++++++++++ 2 files changed, 404 insertions(+) diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index d10055451b451..9a8079b2fc592 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -2365,6 +2365,20 @@ steps: agents: queue: hetzner-aarch64-16cpu-32gb + - id: orchestratord-oidc-auth + label: Orchestratord OIDC auth end-to-end + artifact_paths: ["mz_debug_*.zip"] + depends_on: devel-docker-tags + timeout_in_minutes: 60 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + run: oidc-auth + args: [--recreate-cluster] + ci-builder: stable + agents: + queue: hetzner-aarch64-16cpu-32gb + - id: emulator label: Materialize Emulator depends_on: build-aarch64 diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index 6e99c53fe861f..5f19956bf3d01 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -3161,3 +3161,393 @@ def post_run_check_balancer(definition: dict[str, Any], expect_fail: bool) -> No ] ) raise ValueError("Never completed") + + +# OIDC end-to-end testing +# +# Spins up Ory Hydra as a real OIDC provider next to orchestratord/environmentd +# in kind and verifies: +# 1. mz_system password fallback login (Materialize CR backend secret). +# 2. pgwire login as an OIDC user with a JWT issued by Hydra via the +# client_credentials grant. + +OIDC_HYDRA_NAMESPACE = "hydra" +OIDC_HYDRA_IMAGE = "oryd/hydra:v2.2.0" +OIDC_HYDRA_PUBLIC_PORT = 4444 +OIDC_HYDRA_ADMIN_PORT = 4445 +OIDC_HYDRA_ISSUER = f"http://hydra-public.{OIDC_HYDRA_NAMESPACE}.svc.cluster.local:{OIDC_HYDRA_PUBLIC_PORT}" +OIDC_CLIENT_ID = "mz-test-client" +OIDC_CLIENT_SECRET = "mz-test-client-secret" +OIDC_AUDIENCE = "mz-test-audience" +OIDC_SYSTEM_PARAMS_CM = "oidc-system-params" +OIDC_MZ_SYSTEM_PASSWORD = "oidc-test-mz-system-password" + + +def _hydra_manifests() -> list[dict[str, Any]]: + # `hydra serve all --dev` relaxes TLS requirements for the public/admin + # endpoints, which is required since the kind cluster has no TLS. + # `secrets.system` must be >= 16 bytes; `dsn: memory` keeps everything + # in-memory so no separate database is needed for the test. + hydra_config = yaml.dump( + { + "dsn": "memory", + "secrets": {"system": ["mz-test-secret-please-change-me"]}, + "urls": { + "self": {"issuer": OIDC_HYDRA_ISSUER}, + "login": "http://unused/login", + "consent": "http://unused/consent", + }, + "oauth2": { + "expose_internal_errors": True, + }, + "strategies": {"access_token": "jwt"}, + "log": {"level": "info", "leak_sensitive_values": True}, + "ttl": {"access_token": "10m"}, + } + ) + return [ + { + "apiVersion": "v1", + "kind": "Namespace", + "metadata": {"name": OIDC_HYDRA_NAMESPACE}, + }, + { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "hydra-config", "namespace": OIDC_HYDRA_NAMESPACE}, + "data": {"hydra.yaml": hydra_config}, + }, + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": {"name": "hydra", "namespace": OIDC_HYDRA_NAMESPACE}, + "spec": { + "replicas": 1, + "selector": {"matchLabels": {"app": "hydra"}}, + "template": { + "metadata": {"labels": {"app": "hydra"}}, + "spec": { + "containers": [ + { + "name": "hydra", + "image": OIDC_HYDRA_IMAGE, + "imagePullPolicy": "IfNotPresent", + "args": [ + "serve", + "all", + "--dev", + "--config", + "/etc/hydra/hydra.yaml", + ], + "ports": [ + {"containerPort": OIDC_HYDRA_PUBLIC_PORT}, + {"containerPort": OIDC_HYDRA_ADMIN_PORT}, + ], + "readinessProbe": { + "httpGet": { + "path": "/health/ready", + "port": OIDC_HYDRA_ADMIN_PORT, + }, + "initialDelaySeconds": 2, + "periodSeconds": 2, + }, + "volumeMounts": [ + { + "name": "config", + "mountPath": "/etc/hydra", + "readOnly": True, + } + ], + } + ], + "volumes": [ + { + "name": "config", + "configMap": {"name": "hydra-config"}, + } + ], + }, + }, + }, + }, + { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "name": "hydra-public", + "namespace": OIDC_HYDRA_NAMESPACE, + }, + "spec": { + "selector": {"app": "hydra"}, + "ports": [ + { + "port": OIDC_HYDRA_PUBLIC_PORT, + "targetPort": OIDC_HYDRA_PUBLIC_PORT, + } + ], + }, + }, + { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "name": "hydra-admin", + "namespace": OIDC_HYDRA_NAMESPACE, + }, + "spec": { + "selector": {"app": "hydra"}, + "ports": [ + { + "port": OIDC_HYDRA_ADMIN_PORT, + "targetPort": OIDC_HYDRA_ADMIN_PORT, + } + ], + }, + }, + ] + + +def install_hydra() -> None: + """Deploy Ory Hydra into the kind cluster and create the test client.""" + spawn.runv( + ["kubectl", "apply", "-f", "-"], + stdin=yaml.dump_all(_hydra_manifests()).encode(), + ) + spawn.runv( + [ + "kubectl", + "wait", + "-n", + OIDC_HYDRA_NAMESPACE, + "deployment/hydra", + "--for=condition=Available", + "--timeout=300s", + ] + ) + _create_hydra_client() + + +def _create_hydra_client() -> None: + """Idempotently register the OAuth2 client used by the test.""" + with _port_forward( + "deployment/hydra", + OIDC_HYDRA_ADMIN_PORT, + namespace=OIDC_HYDRA_NAMESPACE, + ) as admin_port: + # Delete any pre-existing client of the same id (best effort). + try: + requests.delete( + f"http://127.0.0.1:{admin_port}/admin/clients/{OIDC_CLIENT_ID}", + timeout=10, + ) + except requests.RequestException: + pass + # `audience` on the client constrains which audiences the client may + # request; the `aud` claim is asserted in the token request below. + resp = requests.post( + f"http://127.0.0.1:{admin_port}/admin/clients", + json={ + "client_id": OIDC_CLIENT_ID, + "client_secret": OIDC_CLIENT_SECRET, + "grant_types": ["client_credentials"], + "token_endpoint_auth_method": "client_secret_post", + "audience": [OIDC_AUDIENCE], + "access_token_strategy": "jwt", + }, + timeout=10, + ) + assert resp.status_code in ( + 200, + 201, + ), f"hydra create client failed: {resp.status_code} {resp.text}" + + +@contextmanager +def _port_forward( + target: str, port: int, namespace: str, local_port: int | None = None +) -> Iterator[int]: + """Generic kubectl port-forward context manager.""" + local_port = local_port if local_port is not None else port + process = subprocess.Popen( + [ + "kubectl", + "port-forward", + target, + f"{local_port}:{port}", + "-n", + namespace, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + try: + time.sleep(2) + if process.poll() is not None: + stdout, stderr = process.communicate() + raise RuntimeError( + f"Port forward to {target} failed: " + f"stdout={stdout.decode()}, stderr={stderr.decode()}" + ) + yield local_port + finally: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + + +def _fetch_hydra_jwt() -> str: + """Run a client_credentials grant against Hydra and return the access token.""" + with _port_forward( + "deployment/hydra", + OIDC_HYDRA_PUBLIC_PORT, + namespace=OIDC_HYDRA_NAMESPACE, + ) as public_port: + resp = requests.post( + f"http://127.0.0.1:{public_port}/oauth2/token", + data={ + "grant_type": "client_credentials", + "client_id": OIDC_CLIENT_ID, + "client_secret": OIDC_CLIENT_SECRET, + "audience": OIDC_AUDIENCE, + }, + timeout=10, + ) + assert ( + resp.status_code == 200 + ), f"hydra token grant failed: {resp.status_code} {resp.text}" + body = resp.json() + token = body.get("access_token") + assert token, f"no access_token in response: {body}" + # Cheap sanity check that we got a JWT (three base64 segments), not an + # opaque reference token; a misconfigured strategy would silently + # return an opaque token that environmentd can't validate. + assert token.count(".") == 2, f"hydra returned non-JWT token: {token!r}" + return token + + +def _oidc_system_params_configmap() -> dict[str, Any]: + return { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": OIDC_SYSTEM_PARAMS_CM, + "namespace": "materialize-environment", + }, + "data": { + "system-params.json": json.dumps( + { + "oidc_issuer": OIDC_HYDRA_ISSUER, + "oidc_audience": [OIDC_AUDIENCE], + "oidc_authentication_claim": "sub", + } + ) + }, + } + + +def workflow_oidc_auth(c: Composition, parser: WorkflowArgumentParser) -> None: + """End-to-end OIDC auth test via orchestratord + Ory Hydra.""" + parser.add_argument( + "--recreate-cluster", + action=argparse.BooleanOptionalAction, + help="Recreate cluster if it exists already", + ) + parser.add_argument( + "--tag", + type=str, + help="Custom version tag to use", + ) + parser.add_argument( + "--orchestratord-override", + default=True, + action=argparse.BooleanOptionalAction, + help="Override orchestratord tag", + ) + args = parser.parse_args() + + version = get_version(args.tag) + min_version = MzVersion.parse_mz("v26.16.0-dev.0") + if version < min_version: + raise ValueError( + f"workflow_oidc_auth requires environmentd >= {min_version}, got {version}" + ) + + definition = setup(c, args) + + install_hydra() + + definition["materialize"]["spec"]["authenticatorKind"] = "Oidc" + definition["materialize"]["spec"][ + "systemParameterConfigmapName" + ] = OIDC_SYSTEM_PARAMS_CM + definition["secret"]["stringData"][ + "external_login_password_mz_system" + ] = OIDC_MZ_SYSTEM_PASSWORD + definition["system_params_configmap"] = _oidc_system_params_configmap() + + init(definition) + run(definition, expect_fail=False) + + with port_forward_environmentd() as port: + print("Verifying mz_system password fallback...") + with ( + psycopg.connect( + host="127.0.0.1", + port=port, + user="mz_system", + password=OIDC_MZ_SYSTEM_PASSWORD, + dbname="materialize", + connect_timeout=30, + ) as conn, + conn.cursor() as cur, + ): + cur.execute("SELECT current_user") + row = cur.fetchone() + assert row == ("mz_system",), f"unexpected current_user: {row}" + + print(f"Fetching JWT from Hydra for client {OIDC_CLIENT_ID}...") + token = _fetch_hydra_jwt() + + print(f"Verifying OIDC pgwire login as {OIDC_CLIENT_ID}...") + # `-c oidc_auth_enabled=true` opts this connection into the OIDC + # authenticator. The role is auto-provisioned by environmentd on first + # login if it doesn't already exist. + with ( + psycopg.connect( + host="127.0.0.1", + port=port, + user=OIDC_CLIENT_ID, + password=token, + dbname="materialize", + options="-c oidc_auth_enabled=true", + connect_timeout=30, + ) as conn, + conn.cursor() as cur, + ): + cur.execute("SELECT current_user") + row = cur.fetchone() + assert row == (OIDC_CLIENT_ID,), f"unexpected current_user: {row}" + + print("Verifying OIDC login rejects an invalid token...") + bad_token = token[:-4] + "AAAA" + try: + psycopg.connect( + host="127.0.0.1", + port=port, + user=OIDC_CLIENT_ID, + password=bad_token, + dbname="materialize", + options="-c oidc_auth_enabled=true", + connect_timeout=30, + ).close() + except psycopg.OperationalError: + pass + else: + raise AssertionError( + "OIDC login with tampered token unexpectedly succeeded" + ) + + print("OIDC end-to-end auth test passed.")