From b0be936cee365778e09768d481dbe45d57d31696 Mon Sep 17 00:00:00 2001 From: Jon Currey Date: Mon, 11 May 2026 12:31:23 -0400 Subject: [PATCH] =?UTF-8?q?catalog:=20repair=20Role=20records=20left=20in?= =?UTF-8?q?=20pre-v81=20wire=20shape=20(v82=E2=86=92v83)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The v80→v81 migration (src/catalog/src/durable/upgrade/v80_to_v81.rs) was guarded by an `is_cloud` heuristic that returned false for any catalog where, at the moment the migration committed in writable mode, mz_system was not a Managed cluster with replication_factor > 0, or where enable_password_auth was "on". Affected catalogs ran the migration with the heuristic returning false; the migration emitted Vec::new() and left every existing Role record's on-disk JSON in v80 wire shape — without an `auto_provision_source` key. On v81+ the value is still readable (serde fills None for the missing Option field), so the env doesn't crash. But any in-band write that retracts a Role record — ALTER ROLE, membership changes, etc. — re-serializes the deserialized value through the v81 type's Serialize, which emits `"auto_provision_source": null`. Those retraction bytes are not byte-equal to the v80-shape bytes on disk, so consolidation cannot cancel them; the original `+1` survives as a ghost record alongside whatever else accumulates. This change: * Adds CATALOG_VERSION v83. v83 is structurally identical to v82 except that RoleAttributes::auto_provision_source has #[serde(default, skip_serializing_if = "Option::is_none")], making the JSON round-trip byte-stable in both directions (None ⇔ no-field; Some(X) ⇔ "X"). * Adds run_versioned_upgrade_raw — a byte-precise counterpart to the existing run_versioned_upgrade. Migrations using this entry point receive each record's raw StateUpdateKindJson alongside its V1 deserialized form, and return raw (StateUpdateKindJson, Diff) updates that are appended verbatim. This is what makes byte-precise retraction of stale wire shapes possible — the standard MigrationAction::Update path re-serializes `old` through V1::Serialize, which cannot reproduce the v80 shape from a deserialized v82 value. * Adds the v82→v83 migration. For every Role record, the migration inspects the raw JSON to determine whether `auto_provision_source` is present as a key, then decides the target value: - field present, Some(X) on disk → preserve X - field absent on disk (v80 shape) → apply the email-regex heuristic from the original v80→v81 migration (Frontegg if the role name matches `.+@.+\..+`, else None) — unconditionally, with no is_cloud gate - field present and explicitly null → preserve None If the canonical v83 bytes differ from the on-disk bytes, the migration emits a retraction of the exact on-disk bytes and an insertion of the canonical v83 bytes. Records already in canonical v83 shape are skipped. Non-Role records are untouched. Tests in v82_to_v83.rs cover the four code paths plus "non-Role records untouched". Co-Authored-By: Claude Opus 4.7 --- src/catalog-protos/objects_hashes.json | 6 +- src/catalog-protos/src/lib.rs | 3 +- src/catalog-protos/src/objects.rs | 8 + src/catalog-protos/src/objects_v83.rs | 2973 +++++++++++++++++ src/catalog/src/durable/upgrade.rs | 96 +- src/catalog/src/durable/upgrade/v82_to_v83.rs | 287 ++ 6 files changed, 3370 insertions(+), 3 deletions(-) create mode 100644 src/catalog-protos/src/objects_v83.rs create mode 100644 src/catalog/src/durable/upgrade/v82_to_v83.rs diff --git a/src/catalog-protos/objects_hashes.json b/src/catalog-protos/objects_hashes.json index 2f3a39c661431..44a57b7f3cd97 100644 --- a/src/catalog-protos/objects_hashes.json +++ b/src/catalog-protos/objects_hashes.json @@ -1,7 +1,7 @@ [ { "name": "objects.rs", - "md5": "dfd89e1c62b7f1663d6429846a76159f" + "md5": "01adcd318b9bd73c0d05165b311e6073" }, { "name": "objects_v74.rs", @@ -38,5 +38,9 @@ { "name": "objects_v82.rs", "md5": "dfd89e1c62b7f1663d6429846a76159f" + }, + { + "name": "objects_v83.rs", + "md5": "01adcd318b9bd73c0d05165b311e6073" } ] diff --git a/src/catalog-protos/src/lib.rs b/src/catalog-protos/src/lib.rs index e553232530d14..c5b4c3c37ac56 100644 --- a/src/catalog-protos/src/lib.rs +++ b/src/catalog-protos/src/lib.rs @@ -20,6 +20,7 @@ pub mod objects_v79; pub mod objects_v80; pub mod objects_v81; pub mod objects_v82; +pub mod objects_v83; pub mod serialization; /// The current version of the `Catalog`. @@ -27,7 +28,7 @@ pub mod serialization; /// We will initialize new `Catalog`s with this version, and migrate existing `Catalog`s to this /// version. Whenever the `Catalog` changes, e.g. the types we serialize in the `Catalog` /// change, we need to bump this version. -pub const CATALOG_VERSION: u64 = 82; +pub const CATALOG_VERSION: u64 = 83; /// The minimum `Catalog` version number that we support migrating from. /// diff --git a/src/catalog-protos/src/objects.rs b/src/catalog-protos/src/objects.rs index 549137dd2f3dd..518646bcabb70 100644 --- a/src/catalog-protos/src/objects.rs +++ b/src/catalog-protos/src/objects.rs @@ -1308,6 +1308,14 @@ pub struct RoleAttributes { pub inherit: bool, pub superuser: Option, pub login: Option, + // `default` + `skip_serializing_if` makes the JSON round-trip + // `{...}` → None → `{...}` byte-stable for roles that were left in + // pre-v81 wire shape by the v80→v81 migration's `is_cloud` heuristic + // no-op path. Without these annotations, an absent field deserializes + // to None and re-serializes to `"auto_provision_source": null`, which + // is byte-distinct from the on-disk record and breaks consolidation of + // retractions emitted by subsequent in-band writes. See v82_to_v83.rs. + #[serde(default, skip_serializing_if = "Option::is_none")] pub auto_provision_source: Option, } diff --git a/src/catalog-protos/src/objects_v83.rs b/src/catalog-protos/src/objects_v83.rs new file mode 100644 index 0000000000000..518646bcabb70 --- /dev/null +++ b/src/catalog-protos/src/objects_v83.rs @@ -0,0 +1,2973 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; +use serde_repr::{Deserialize_repr, Serialize_repr}; + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ConfigKey { + pub key: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ConfigValue { + pub value: u64, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SettingKey { + pub name: String, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SettingValue { + pub value: String, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct IdAllocKey { + pub name: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct IdAllocValue { + pub next_id: u64, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct GidMappingKey { + pub schema_name: String, + pub object_type: CatalogItemType, + pub object_name: String, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct GidMappingValue { + pub catalog_id: SystemCatalogItemId, + pub global_id: SystemGlobalId, + pub fingerprint: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterKey { + pub id: ClusterId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterValue { + pub name: String, + pub owner_id: RoleId, + pub privileges: Vec, + pub config: ClusterConfig, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterIntrospectionSourceIndexKey { + pub cluster_id: ClusterId, + pub name: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterIntrospectionSourceIndexValue { + pub catalog_id: IntrospectionSourceIndexCatalogItemId, + pub global_id: IntrospectionSourceIndexGlobalId, + pub oid: u32, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterReplicaKey { + pub id: ReplicaId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterReplicaValue { + pub cluster_id: ClusterId, + pub name: String, + pub config: ReplicaConfig, + pub owner_id: RoleId, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct DatabaseKey { + pub id: DatabaseId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct DatabaseValue { + pub name: String, + pub owner_id: RoleId, + pub privileges: Vec, + pub oid: u32, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SchemaKey { + pub id: SchemaId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SchemaValue { + pub database_id: Option, + pub name: String, + pub owner_id: RoleId, + pub privileges: Vec, + pub oid: u32, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ItemKey { + pub gid: CatalogItemId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ItemValue { + pub schema_id: SchemaId, + pub name: String, + pub definition: CatalogItem, + pub owner_id: RoleId, + pub privileges: Vec, + pub oid: u32, + pub global_id: GlobalId, + pub extra_versions: Vec, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ItemVersion { + pub global_id: GlobalId, + pub version: Version, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleKey { + pub id: RoleId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleValue { + pub name: String, + pub attributes: RoleAttributes, + pub membership: RoleMembership, + pub vars: RoleVars, + pub oid: u32, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleAuthKey { + pub id: RoleId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleAuthValue { + pub password_hash: Option, + pub updated_at: EpochMillis, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct NetworkPolicyKey { + pub id: NetworkPolicyId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct NetworkPolicyValue { + pub name: String, + pub rules: Vec, + pub owner_id: RoleId, + pub privileges: Vec, + pub oid: u32, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ServerConfigurationKey { + pub name: String, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ServerConfigurationValue { + pub value: String, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct AuditLogKey { + pub event: AuditLogEvent, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum AuditLogEvent { + V1(AuditLogEventV1), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct CommentKey { + pub object: CommentObject, + pub sub_component: Option, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum CommentObject { + Table(CatalogItemId), + View(CatalogItemId), + MaterializedView(CatalogItemId), + Source(CatalogItemId), + Sink(CatalogItemId), + Index(CatalogItemId), + Func(CatalogItemId), + Connection(CatalogItemId), + Type(CatalogItemId), + Secret(CatalogItemId), + Role(RoleId), + Database(DatabaseId), + Schema(ResolvedSchema), + Cluster(ClusterId), + ClusterReplica(ClusterReplicaId), + NetworkPolicy(NetworkPolicyId), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum CommentSubComponent { + ColumnPos(u64), +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct CommentValue { + pub comment: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SourceReferencesKey { + pub source: CatalogItemId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SourceReferencesValue { + pub references: Vec, + pub updated_at: EpochMillis, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SourceReference { + pub name: String, + pub namespace: Option, + pub columns: Vec, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct StorageCollectionMetadataKey { + pub id: GlobalId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct StorageCollectionMetadataValue { + pub shard: String, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct UnfinalizedShardKey { + pub shard: String, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct TxnWalShardValue { + pub shard: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Empty {} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct StringWrapper { + pub inner: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Duration { + pub secs: u64, + pub nanos: u32, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct EpochMillis { + pub millis: u64, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Version { + pub value: u64, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum CatalogItem { + V1(CatalogItemV1), +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct CatalogItemV1 { + pub create_sql: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum CatalogItemId { + System(u64), + User(u64), + Transient(u64), + IntrospectionSourceIndex(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SystemCatalogItemId(pub u64); + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct IntrospectionSourceIndexCatalogItemId(pub u64); + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum GlobalId { + System(u64), + User(u64), + Transient(u64), + Explain, + IntrospectionSourceIndex(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SystemGlobalId(pub u64); + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct IntrospectionSourceIndexGlobalId(pub u64); + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum ClusterId { + System(u64), + User(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum DatabaseId { + System(u64), + User(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum ResolvedDatabaseSpecifier { + Ambient, + Id(DatabaseId), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum SchemaId { + System(u64), + User(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum SchemaSpecifier { + Temporary, + Id(SchemaId), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ResolvedSchema { + pub database: ResolvedDatabaseSpecifier, + pub schema: SchemaSpecifier, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum ReplicaId { + System(u64), + User(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterReplicaId { + pub cluster_id: ClusterId, + pub replica_id: ReplicaId, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum NetworkPolicyId { + System(u64), + User(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ReplicaLogging { + pub log_logging: bool, + pub interval: Option, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct OptimizerFeatureOverride { + pub name: String, + pub value: String, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterScheduleRefreshOptions { + pub rehydration_time_estimate: Duration, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum ClusterSchedule { + Manual, + Refresh(ClusterScheduleRefreshOptions), +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterConfig { + pub workload_class: Option, + pub variant: ClusterVariant, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum ClusterVariant { + Unmanaged, + Managed(ManagedCluster), +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ManagedCluster { + pub size: String, + pub replication_factor: u32, + pub availability_zones: Vec, + pub logging: ReplicaLogging, + pub optimizer_feature_overrides: Vec, + pub schedule: ClusterSchedule, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ReplicaConfig { + pub logging: ReplicaLogging, + pub location: ReplicaLocation, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct UnmanagedLocation { + pub storagectl_addrs: Vec, + pub computectl_addrs: Vec, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ManagedLocation { + pub size: String, + pub availability_zone: Option, + pub internal: bool, + pub billed_as: Option, + pub pending: bool, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum ReplicaLocation { + Unmanaged(UnmanagedLocation), + Managed(ManagedLocation), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum RoleId { + System(u64), + User(u64), + Public, + Predefined(u64), +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum AutoProvisionSource { + Oidc = 0, + Frontegg = 1, + None = 2, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleAttributes { + pub inherit: bool, + pub superuser: Option, + pub login: Option, + // `default` + `skip_serializing_if` makes the JSON round-trip + // `{...}` → None → `{...}` byte-stable for roles that were left in + // pre-v81 wire shape by the v80→v81 migration's `is_cloud` heuristic + // no-op path. Without these annotations, an absent field deserializes + // to None and re-serializes to `"auto_provision_source": null`, which + // is byte-distinct from the on-disk record and breaks consolidation of + // retractions emitted by subsequent in-band writes. See v82_to_v83.rs. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub auto_provision_source: Option, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleMembership { + pub map: Vec, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleMembershipEntry { + pub key: RoleId, + pub value: RoleId, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleVars { + pub entries: Vec, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleVarsEntry { + pub key: String, + pub val: RoleVar, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum RoleVar { + Flat(String), + SqlSet(Vec), +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct NetworkPolicyRule { + pub name: String, + pub address: String, + pub action: NetworkPolicyRuleAction, + pub direction: NetworkPolicyRuleDirection, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum NetworkPolicyRuleAction { + Allow, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub enum NetworkPolicyRuleDirection { + Ingress, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct AclMode { + pub bitflags: u64, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct MzAclItem { + pub grantee: RoleId, + pub grantor: RoleId, + pub acl_mode: AclMode, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct DefaultPrivilegesKey { + pub role_id: RoleId, + pub database_id: Option, + pub schema_id: Option, + pub object_type: ObjectType, + pub grantee: RoleId, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct DefaultPrivilegesValue { + pub privileges: AclMode, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SystemPrivilegesKey { + pub grantee: RoleId, + pub grantor: RoleId, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SystemPrivilegesValue { + pub acl_mode: AclMode, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct AuditLogEventV1 { + pub id: u64, + pub event_type: audit_log_event_v1::EventType, + pub object_type: audit_log_event_v1::ObjectType, + pub user: Option, + pub occurred_at: EpochMillis, + pub details: audit_log_event_v1::Details, +} + +pub mod audit_log_event_v1 { + use super::*; + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct IdFullNameV1 { + pub id: String, + pub name: FullNameV1, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct FullNameV1 { + pub database: String, + pub schema: String, + pub item: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct IdNameV1 { + pub id: String, + pub name: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RenameClusterV1 { + pub id: String, + pub old_name: String, + pub new_name: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RenameClusterReplicaV1 { + pub cluster_id: String, + pub replica_id: String, + pub old_name: String, + pub new_name: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RenameItemV1 { + pub id: String, + pub old_name: FullNameV1, + pub new_name: FullNameV1, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateClusterReplicaV1 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub logical_size: String, + pub disk: bool, + pub billed_as: Option, + pub internal: bool, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateClusterReplicaV2 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub logical_size: String, + pub disk: bool, + pub billed_as: Option, + pub internal: bool, + pub reason: CreateOrDropClusterReplicaReasonV1, + pub scheduling_policies: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateClusterReplicaV3 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub logical_size: String, + pub disk: bool, + pub billed_as: Option, + pub internal: bool, + pub reason: CreateOrDropClusterReplicaReasonV1, + pub scheduling_policies: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateClusterReplicaV4 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub logical_size: String, + pub billed_as: Option, + pub internal: bool, + pub reason: CreateOrDropClusterReplicaReasonV1, + pub scheduling_policies: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct DropClusterReplicaV1 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct DropClusterReplicaV2 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub reason: CreateOrDropClusterReplicaReasonV1, + pub scheduling_policies: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct DropClusterReplicaV3 { + pub cluster_id: String, + pub cluster_name: String, + pub replica_id: Option, + pub replica_name: String, + pub reason: CreateOrDropClusterReplicaReasonV1, + pub scheduling_policies: Option, + } + + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateOrDropClusterReplicaReasonV1 { + pub reason: CreateOrDropClusterReplicaReasonV1Reason, + } + + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub enum CreateOrDropClusterReplicaReasonV1Reason { + Manual(Empty), + Schedule(Empty), + System(Empty), + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct SchedulingDecisionsWithReasonsV1 { + pub on_refresh: RefreshDecisionWithReasonV1, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct SchedulingDecisionsWithReasonsV2 { + pub on_refresh: RefreshDecisionWithReasonV2, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub enum RefreshDecision { + On(Empty), + Off(Empty), + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RefreshDecisionWithReasonV1 { + pub objects_needing_refresh: Vec, + pub rehydration_time_estimate: String, + pub decision: RefreshDecision, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RefreshDecisionWithReasonV2 { + pub objects_needing_refresh: Vec, + pub objects_needing_compaction: Vec, + pub rehydration_time_estimate: String, + pub decision: RefreshDecision, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateSourceSinkV1 { + pub id: String, + pub name: FullNameV1, + pub size: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateSourceSinkV2 { + pub id: String, + pub name: FullNameV1, + pub size: Option, + pub external_type: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateSourceSinkV3 { + pub id: String, + pub name: FullNameV1, + pub external_type: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateSourceSinkV4 { + pub id: String, + pub cluster_id: Option, + pub name: FullNameV1, + pub external_type: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateIndexV1 { + pub id: String, + pub cluster_id: String, + pub name: FullNameV1, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateMaterializedViewV1 { + pub id: String, + pub cluster_id: String, + pub name: FullNameV1, + pub replacement_target_id: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct AlterApplyReplacementV1 { + pub target: IdFullNameV1, + pub replacement: IdFullNameV1, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct AlterSourceSinkV1 { + pub id: String, + pub name: FullNameV1, + pub old_size: Option, + pub new_size: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct AlterSetClusterV1 { + pub id: String, + pub name: FullNameV1, + pub old_cluster_id: String, + pub new_cluster_id: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct GrantRoleV1 { + pub role_id: String, + pub member_id: String, + pub grantor_id: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct GrantRoleV2 { + pub role_id: String, + pub member_id: String, + pub grantor_id: String, + pub executed_by: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RevokeRoleV1 { + pub role_id: String, + pub member_id: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RevokeRoleV2 { + pub role_id: String, + pub member_id: String, + pub grantor_id: String, + pub executed_by: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct UpdatePrivilegeV1 { + pub object_id: String, + pub grantee_id: String, + pub grantor_id: String, + pub privileges: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct AlterDefaultPrivilegeV1 { + pub role_id: String, + pub database_id: Option, + pub schema_id: Option, + pub grantee_id: String, + pub privileges: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct UpdateOwnerV1 { + pub object_id: String, + pub old_owner_id: String, + pub new_owner_id: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct SchemaV1 { + pub id: String, + pub name: String, + pub database_name: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct SchemaV2 { + pub id: String, + pub name: String, + pub database_name: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RenameSchemaV1 { + pub id: String, + pub database_name: Option, + pub old_name: String, + pub new_name: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct UpdateItemV1 { + pub id: String, + pub name: FullNameV1, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct AlterRetainHistoryV1 { + pub id: String, + pub old_history: Option, + pub new_history: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct ToNewIdV1 { + pub id: String, + pub new_id: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct FromPreviousIdV1 { + pub id: String, + pub previous_id: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct SetV1 { + pub name: String, + pub value: Option, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct RotateKeysV1 { + pub id: String, + pub name: String, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub struct CreateRoleV1 { + pub id: String, + pub name: String, + pub auto_provision_source: Option, + } + + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + Serialize_repr, + Deserialize_repr, + Arbitrary + )] + #[repr(u8)] + pub enum EventType { + Unknown = 0, + Create = 1, + Drop = 2, + Alter = 3, + Grant = 4, + Revoke = 5, + Comment = 6, + } + + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + Serialize_repr, + Deserialize_repr, + Arbitrary + )] + #[repr(u8)] + pub enum ObjectType { + Unknown = 0, + Cluster = 1, + ClusterReplica = 2, + Connection = 3, + Database = 4, + Func = 5, + Index = 6, + MaterializedView = 7, + Role = 8, + Secret = 9, + Schema = 10, + Sink = 11, + Source = 12, + Table = 13, + Type = 14, + View = 15, + System = 16, + ContinualTask = 17, + NetworkPolicy = 18, + } + + #[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary + )] + pub enum Details { + CreateClusterReplicaV1(CreateClusterReplicaV1), + CreateClusterReplicaV2(CreateClusterReplicaV2), + CreateClusterReplicaV3(CreateClusterReplicaV3), + CreateClusterReplicaV4(CreateClusterReplicaV4), + DropClusterReplicaV1(DropClusterReplicaV1), + DropClusterReplicaV2(DropClusterReplicaV2), + DropClusterReplicaV3(DropClusterReplicaV3), + CreateSourceSinkV1(CreateSourceSinkV1), + CreateSourceSinkV2(CreateSourceSinkV2), + AlterSourceSinkV1(AlterSourceSinkV1), + AlterSetClusterV1(AlterSetClusterV1), + GrantRoleV1(GrantRoleV1), + GrantRoleV2(GrantRoleV2), + RevokeRoleV1(RevokeRoleV1), + RevokeRoleV2(RevokeRoleV2), + UpdatePrivilegeV1(UpdatePrivilegeV1), + AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1), + UpdateOwnerV1(UpdateOwnerV1), + IdFullNameV1(IdFullNameV1), + RenameClusterV1(RenameClusterV1), + RenameClusterReplicaV1(RenameClusterReplicaV1), + RenameItemV1(RenameItemV1), + IdNameV1(IdNameV1), + SchemaV1(SchemaV1), + SchemaV2(SchemaV2), + RenameSchemaV1(RenameSchemaV1), + UpdateItemV1(UpdateItemV1), + CreateSourceSinkV3(CreateSourceSinkV3), + AlterRetainHistoryV1(AlterRetainHistoryV1), + ToNewIdV1(ToNewIdV1), + FromPreviousIdV1(FromPreviousIdV1), + SetV1(SetV1), + ResetAllV1(Empty), + RotateKeysV1(RotateKeysV1), + CreateSourceSinkV4(CreateSourceSinkV4), + CreateIndexV1(CreateIndexV1), + CreateMaterializedViewV1(CreateMaterializedViewV1), + AlterApplyReplacementV1(AlterApplyReplacementV1), + CreateRoleV1(CreateRoleV1), + } +} + +/// The contents of a single state update. +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +// Serialize the top-level enum in the persist-backed catalog as internally tagged to set up +// persist pushdown statistics for success. +#[serde(tag = "kind")] +pub enum StateUpdateKind { + AuditLog(AuditLog), + Cluster(Cluster), + ClusterIntrospectionSourceIndex(ClusterIntrospectionSourceIndex), + ClusterReplica(ClusterReplica), + Comment(Comment), + Config(Config), + Database(Database), + DefaultPrivileges(DefaultPrivileges), + FenceToken(FenceToken), + GidMapping(GidMapping), + IdAlloc(IdAlloc), + Item(Item), + NetworkPolicy(NetworkPolicy), + Role(Role), + RoleAuth(RoleAuth), + Schema(Schema), + ServerConfiguration(ServerConfiguration), + Setting(Setting), + SourceReferences(SourceReferences), + StorageCollectionMetadata(StorageCollectionMetadata), + SystemPrivileges(SystemPrivileges), + TxnWalShard(TxnWalShard), + UnfinalizedShard(UnfinalizedShard), +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct AuditLog { + pub key: AuditLogKey, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Cluster { + pub key: ClusterKey, + pub value: ClusterValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterReplica { + pub key: ClusterReplicaKey, + pub value: ClusterReplicaValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Comment { + pub key: CommentKey, + pub value: CommentValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Config { + pub key: ConfigKey, + pub value: ConfigValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Database { + pub key: DatabaseKey, + pub value: DatabaseValue, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct DefaultPrivileges { + pub key: DefaultPrivilegesKey, + pub value: DefaultPrivilegesValue, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct FenceToken { + pub deploy_generation: u64, + pub epoch: i64, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct IdAlloc { + pub key: IdAllocKey, + pub value: IdAllocValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ClusterIntrospectionSourceIndex { + pub key: ClusterIntrospectionSourceIndexKey, + pub value: ClusterIntrospectionSourceIndexValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Item { + pub key: ItemKey, + pub value: ItemValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Role { + pub key: RoleKey, + pub value: RoleValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct RoleAuth { + pub key: RoleAuthKey, + pub value: RoleAuthValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct NetworkPolicy { + pub key: NetworkPolicyKey, + pub value: NetworkPolicyValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Schema { + pub key: SchemaKey, + pub value: SchemaValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct Setting { + pub key: SettingKey, + pub value: SettingValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct ServerConfiguration { + pub key: ServerConfigurationKey, + pub value: ServerConfigurationValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SourceReferences { + pub key: SourceReferencesKey, + pub value: SourceReferencesValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct GidMapping { + pub key: GidMappingKey, + pub value: GidMappingValue, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct SystemPrivileges { + pub key: SystemPrivilegesKey, + pub value: SystemPrivilegesValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct StorageCollectionMetadata { + pub key: StorageCollectionMetadataKey, + pub value: StorageCollectionMetadataValue, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct UnfinalizedShard { + pub key: UnfinalizedShardKey, +} + +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Arbitrary +)] +pub struct TxnWalShard { + pub value: TxnWalShardValue, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + Serialize_repr, + Deserialize_repr, + Arbitrary +)] +#[repr(u8)] +pub enum CatalogItemType { + Unknown = 0, + Table = 1, + Source = 2, + Sink = 3, + View = 4, + MaterializedView = 5, + Index = 6, + Type = 7, + Func = 8, + Secret = 9, + Connection = 10, +} + +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + Serialize_repr, + Deserialize_repr, + Arbitrary +)] +#[repr(u8)] +pub enum ObjectType { + Unknown = 0, + Table = 1, + View = 2, + MaterializedView = 3, + Source = 4, + Sink = 5, + Index = 6, + Type = 7, + Role = 8, + Cluster = 9, + ClusterReplica = 10, + Secret = 11, + Connection = 12, + Database = 13, + Schema = 14, + Func = 15, + NetworkPolicy = 17, +} diff --git a/src/catalog/src/durable/upgrade.rs b/src/catalog/src/durable/upgrade.rs index 3e3b6530e677f..b01fec558b998 100644 --- a/src/catalog/src/durable/upgrade.rs +++ b/src/catalog/src/durable/upgrade.rs @@ -240,7 +240,7 @@ macro_rules! objects { } } -objects!([v74, v75, v76, v77, v78], [v79, v80, v81, v82]); +objects!([v74, v75, v76, v77, v78], [v79, v80, v81, v82, v83]); /// The current version of the `Catalog`. pub use mz_catalog_protos::CATALOG_VERSION; @@ -260,6 +260,7 @@ mod v78_to_v79; mod v79_to_v80; mod v80_to_v81; mod v81_to_v82; +mod v82_to_v83; /// Describes a single action to take during a migration from `V1` to `V2`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -414,6 +415,18 @@ async fn run_upgrade( ) .await } + 82 => { + // Uses the raw-bytes migration entry point because the migration + // needs to retract on-disk records by their exact bytes — see + // v82_to_v83.rs for the full reasoning. + run_versioned_upgrade_raw( + unopened_catalog_state, + version, + commit_ts, + v82_to_v83::upgrade, + ) + .await + } // Up-to-date, no migration needed! CATALOG_VERSION => Ok((CATALOG_VERSION, commit_ts)), @@ -488,6 +501,87 @@ async fn run_versioned_upgrade` and returns +/// `Vec<(StateUpdateKindJson, Diff)>`. Each returned update is appended to the +/// catalog directly; the framework only adds the standard version-key +/// retraction/insertion on top. +/// +/// Returns the new version and upper. +async fn run_versioned_upgrade_raw( + unopened_catalog_state: &mut UnopenedPersistCatalogState, + current_version: u64, + mut commit_ts: Timestamp, + migration_logic: F, +) -> Result<(u64, Timestamp), CatalogError> +where + V1: IntoStateUpdateKindJson, + F: FnOnce(Vec<(StateUpdateKindJson, V1)>) -> Vec<(StateUpdateKindJson, Diff)>, +{ + tracing::info!(current_version, "running versioned Catalog upgrade (raw)"); + + // 1. Pair each on-disk JSON record with its V1-deserialized form. + let snapshot: Vec<_> = unopened_catalog_state + .snapshot + .iter() + .map(|(kind, ts, diff)| { + soft_assert_eq_or_log!( + *diff, + Diff::ONE, + "snapshot is consolidated, ({kind:?}, {ts:?}, {diff:?})" + ); + let typed = V1::try_from(kind.clone()).expect("invalid catalog data persisted"); + (kind.clone(), typed) + }) + .collect(); + + // 2. Run version-specific migration logic. + let mut updates = migration_logic(snapshot); + // Validate that we're not migrating an un-migratable collection. + for (update, _) in &updates { + if update.is_always_deserializable() { + panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}"); + } + } + + // 3. Add the version-key retraction + insertion. + let next_version = current_version + 1; + updates.push((version_update_kind(current_version), Diff::MINUS_ONE)); + updates.push((version_update_kind(next_version), Diff::ONE)); + + // 4. Apply migration to catalog. + if matches!(unopened_catalog_state.mode, Mode::Writable) { + commit_ts = unopened_catalog_state + .compare_and_append(updates, commit_ts) + .await + .map_err(|e| e.unwrap_fence_error())?; + } else { + let ts = commit_ts; + let updates = updates + .into_iter() + .map(|(kind, diff)| StateUpdate { kind, ts, diff }); + commit_ts = commit_ts.step_forward(); + unopened_catalog_state.apply_updates_and_consolidate(updates)?; + } + + // 5. Consolidate snapshot to remove old versions. + unopened_catalog_state.consolidate(); + + Ok((next_version, commit_ts)) +} + /// Generates a [`proto::StateUpdateKind`] to update the user version. fn version_update_kind(version: u64) -> StateUpdateKindJson { // We can use the current version because Configs can never be migrated and are always wire diff --git a/src/catalog/src/durable/upgrade/v82_to_v83.rs b/src/catalog/src/durable/upgrade/v82_to_v83.rs new file mode 100644 index 0000000000000..47e915875788a --- /dev/null +++ b/src/catalog/src/durable/upgrade/v82_to_v83.rs @@ -0,0 +1,287 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Repair migration for the v80→v81 `is_cloud` heuristic bug. +//! +//! The v80→v81 migration (`v80_to_v81.rs`) was guarded by an `is_cloud` +//! heuristic that returned false for any catalog where, at the moment the +//! migration committed in writable mode, `mz_system` was not a `Managed` +//! cluster with `replication_factor > 0`, or where `enable_password_auth` +//! was `"on"`. Affected catalogs ran the migration with the heuristic +//! returning false; the migration emitted `Vec::new()` and left every +//! existing `Role` record's on-disk JSON in v80 wire shape — that is, +//! without an `auto_provision_source` key. +//! +//! On v81+ the value is still readable (serde fills `None` for the missing +//! `Option` field), so the env doesn't crash. But any +//! in-band write that retracts a Role record — `ALTER ROLE`, membership +//! changes, etc. — re-serializes the deserialized value through the v81 +//! type's `Serialize`, which emits `"auto_provision_source": null`. Those +//! retraction bytes are not byte-equal to the v80-shape bytes on disk, so +//! `differential_dataflow` consolidation cannot cancel them; the original +//! `+1` survives as a ghost record alongside whatever else accumulates. +//! +//! This migration: +//! +//! 1. Iterates every Role record paired with its raw on-disk JSON. +//! 2. Inspects the raw JSON to determine whether `auto_provision_source` +//! is present as a key in `value.attributes`. +//! 3. Decides the target `auto_provision_source`: +//! - Field present and `Some(...)` on disk → preserve the value. +//! - Field absent on disk (v80 wire shape) → apply the same email +//! regex heuristic as the original v80→v81 migration: +//! `Frontegg` if the role name matches `.+@.+\..+`, else `None`. +//! The classification is **unconditional** — no `is_cloud` gate. +//! That avoids reintroducing the same trap. +//! - Field present and explicitly `null` on disk → `None`. +//! 4. Constructs the canonical v83 Role bytes (v83 uses +//! `#[serde(default, skip_serializing_if = "Option::is_none")]` on +//! `auto_provision_source`, so `None` round-trips to no-field and +//! `Some(_)` round-trips to the value — byte-stable in both cases). +//! 5. If the canonical v83 bytes differ from the on-disk bytes, emits a +//! raw retraction of the exact on-disk bytes and an insertion of the +//! canonical v83 bytes. The byte-precise retraction is why this +//! migration uses [`run_versioned_upgrade_raw`] rather than the +//! standard `MigrationAction::Update` path. +//! +//! Records other than Role are untouched. v83 is structurally identical to +//! v82 for every other type, so existing v82-shape bytes remain valid v83 +//! bytes without rewriting. + +use mz_repr::Diff; +use mz_repr::adt::regex::Regex; + +use crate::durable::objects::state_update::StateUpdateKindJson; +use crate::durable::upgrade::json_compatible::JsonCompatible; +use crate::durable::upgrade::objects_v82 as v82; +use crate::durable::upgrade::objects_v83 as v83; + +pub fn upgrade( + snapshot: Vec<(StateUpdateKindJson, v82::StateUpdateKind)>, +) -> Vec<(StateUpdateKindJson, Diff)> { + let email_regex = Regex::new(r".+@.+\..+", true).expect("valid regex"); + let mut updates = Vec::new(); + + for (raw, kind) in snapshot { + let v82::StateUpdateKind::Role(role) = kind else { + continue; + }; + + let field_present_on_disk = role_has_auto_provision_source_key(&raw); + + let target_aps: Option = match ( + &role.value.attributes.auto_provision_source, + field_present_on_disk, + ) { + // Field already set to a concrete value — preserve it. + (Some(aps), _) => Some(map_aps_v82_to_v83(aps)), + // Field missing on disk: pre-v81 wire shape. Apply email + // heuristic, unconditionally. + (None, false) => { + if email_regex.is_match(&role.value.name) { + Some(v83::AutoProvisionSource::Frontegg) + } else { + None + } + } + // Field present-and-null on disk: preserve None. + (None, true) => None, + }; + + let new_role = v83::StateUpdateKind::Role(v83::Role { + key: JsonCompatible::convert(&role.key), + value: v83::RoleValue { + name: role.value.name.clone(), + attributes: v83::RoleAttributes { + inherit: role.value.attributes.inherit, + superuser: role.value.attributes.superuser, + login: role.value.attributes.login, + auto_provision_source: target_aps, + }, + membership: JsonCompatible::convert(&role.value.membership), + vars: JsonCompatible::convert(&role.value.vars), + oid: role.value.oid, + }, + }); + let new_raw: StateUpdateKindJson = new_role.into(); + + if new_raw == raw { + // Already in canonical v83 shape — no rewrite needed. + continue; + } + + // Retract by exact on-disk bytes; insert canonical v83. + updates.push((raw, Diff::MINUS_ONE)); + updates.push((new_raw, Diff::ONE)); + } + + updates +} + +/// Returns true iff the raw JSON for a `Role` `StateUpdateKind` contains an +/// `auto_provision_source` key inside `value.attributes`. The value (`null` +/// vs concrete) is not inspected here. +fn role_has_auto_provision_source_key(raw: &StateUpdateKindJson) -> bool { + let Ok(value): Result = raw.try_to_serde() else { + return false; + }; + value + .get("Role") + .and_then(|r| r.get("value")) + .and_then(|v| v.get("attributes")) + .and_then(|a| a.as_object()) + .map(|attrs| attrs.contains_key("auto_provision_source")) + .unwrap_or(false) +} + +fn map_aps_v82_to_v83(aps: &v82::AutoProvisionSource) -> v83::AutoProvisionSource { + match aps { + v82::AutoProvisionSource::Oidc => v83::AutoProvisionSource::Oidc, + v82::AutoProvisionSource::Frontegg => v83::AutoProvisionSource::Frontegg, + v82::AutoProvisionSource::None => v83::AutoProvisionSource::None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::durable::upgrade::objects_v82 as v82; + + fn make_v82_role_raw(name: &str, include_aps: bool, aps: Option<&str>) -> StateUpdateKindJson { + // Build the JSON shape directly so we can produce both v80-shape + // (no auto_provision_source key) and v82-shape (key present) bytes. + let attrs = if include_aps { + match aps { + Some(v) => serde_json::json!({ + "inherit": true, + "superuser": null, + "login": null, + "auto_provision_source": v, + }), + None => serde_json::json!({ + "inherit": true, + "superuser": null, + "login": null, + "auto_provision_source": null, + }), + } + } else { + serde_json::json!({ + "inherit": true, + "superuser": null, + "login": null, + }) + }; + let full = serde_json::json!({ + "Role": { + "key": { "id": { "User": 1 } }, + "value": { + "name": name, + "attributes": attrs, + "membership": { "map": [] }, + "vars": { "entries": [] }, + "oid": 1u32, + }, + } + }); + StateUpdateKindJson::from_serde(&full) + } + + #[mz_ore::test] + fn test_v80_shape_email_named_gets_frontegg() { + let raw = make_v82_role_raw("user@example.com", false, None); + let kind: v82::StateUpdateKind = raw.clone().into(); + let updates = upgrade(vec![(raw.clone(), kind)]); + // 1 retraction, 1 insertion + assert_eq!(updates.len(), 2); + assert_eq!(updates[0].1, Diff::MINUS_ONE); + assert_eq!(updates[0].0, raw, "retraction must byte-match on-disk"); + assert_eq!(updates[1].1, Diff::ONE); + // The insertion's value should now decode as v83 with Frontegg. + let new_kind: v83::StateUpdateKind = updates[1].0.clone().into(); + let v83::StateUpdateKind::Role(new_role) = new_kind else { + panic!("expected Role"); + }; + assert_eq!( + new_role.value.attributes.auto_provision_source, + Some(v83::AutoProvisionSource::Frontegg) + ); + } + + #[mz_ore::test] + fn test_v80_shape_non_email_stays_none_and_normalizes() { + let raw = make_v82_role_raw("service_account", false, None); + let kind: v82::StateUpdateKind = raw.clone().into(); + let updates = upgrade(vec![(raw.clone(), kind)]); + assert_eq!(updates.len(), 2); + assert_eq!(updates[0].0, raw, "retraction must byte-match on-disk"); + // Insertion's value decodes with None. v83 serializes None as + // skip-field, so re-deserializing yields None and re-serializing + // back is byte-stable. + let new_kind: v83::StateUpdateKind = updates[1].0.clone().into(); + let v83::StateUpdateKind::Role(new_role) = new_kind else { + panic!("expected Role"); + }; + assert_eq!(new_role.value.attributes.auto_provision_source, None); + // Round-trip: re-serializing should equal the canonical insertion. + let reserialized: StateUpdateKindJson = v83::StateUpdateKind::Role(new_role).into(); + assert_eq!(reserialized, updates[1].0); + } + + #[mz_ore::test] + fn test_v82_shape_frontegg_preserves_value() { + let raw = make_v82_role_raw("user@example.com", true, Some("Frontegg")); + let kind: v82::StateUpdateKind = raw.clone().into(); + let updates = upgrade(vec![(raw.clone(), kind)]); + // v82-shape bytes (with field present) differ from v83 canonical + // (which would also have field present for Some) only if the v82 + // and v83 enum serializations differ — they don't. So no rewrite. + assert!( + updates.is_empty(), + "v82-shape role with Frontegg should be canonical v83 already" + ); + } + + #[mz_ore::test] + fn test_v82_shape_explicit_null_becomes_no_field() { + let raw = make_v82_role_raw("user@example.com", true, None); + let kind: v82::StateUpdateKind = raw.clone().into(); + let updates = upgrade(vec![(raw.clone(), kind)]); + // On-disk: `"auto_provision_source": null`. Canonical v83: no field. + // Different bytes → rewrite expected. And: because the field was + // *present* on disk, we DO NOT apply the email regex — we preserve + // None. + assert_eq!(updates.len(), 2); + assert_eq!(updates[0].0, raw, "retraction must byte-match on-disk"); + let new_kind: v83::StateUpdateKind = updates[1].0.clone().into(); + let v83::StateUpdateKind::Role(new_role) = new_kind else { + panic!("expected Role"); + }; + assert_eq!( + new_role.value.attributes.auto_provision_source, None, + "field was explicitly null on disk; must not be reclassified", + ); + } + + #[mz_ore::test] + fn test_non_role_records_untouched() { + // Build a Setting record and confirm it's not touched. + let kind = v82::StateUpdateKind::Setting(v82::Setting { + key: v82::SettingKey { + name: "test".to_string(), + }, + value: v82::SettingValue { + value: "x".to_string(), + }, + }); + let raw: StateUpdateKindJson = kind.clone().into(); + let updates = upgrade(vec![(raw, kind)]); + assert!(updates.is_empty()); + } +}