From 9b920a6ea5936ad08919c8a099c8000eb685965f Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 28 Mar 2026 17:06:00 +0800 Subject: [PATCH 1/3] feat(cloud): add support for maintenance window handling --- cloud/data_source_pulsar_cluster.go | 20 +- cloud/pulsar_cluster_customize_diff_test.go | 56 ++++++ cloud/pulsar_cluster_state_test.go | 111 +++++++++++ cloud/pulsar_cluster_test.go | 75 +++++++- cloud/resource_pulsar_cluster.go | 202 ++++++++++++++------ docs/data-sources/pulsar_cluster.md | 2 + docs/resources/pulsar_cluster.md | 2 + 7 files changed, 405 insertions(+), 63 deletions(-) create mode 100644 cloud/pulsar_cluster_customize_diff_test.go diff --git a/cloud/data_source_pulsar_cluster.go b/cloud/data_source_pulsar_cluster.go index eb0c9ca..f828538 100644 --- a/cloud/data_source_pulsar_cluster.go +++ b/cloud/data_source_pulsar_cluster.go @@ -290,7 +290,7 @@ func dataSourcePulsarCluster() *schema.Resource { "maintenance_window": { Type: schema.TypeList, Computed: true, - Description: "Maintenance window configuration for the Pulsar cluster", + Description: "Maintenance window configuration reported by the control plane for the Pulsar cluster.", Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "window": { @@ -427,7 +427,9 @@ func dataSourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, me _ = d.Set("release_channel", releaseChannel) } - _ = d.Set("instance_name", pulsarInstance.Name) + if diagErr := setPulsarClusterDataSourceIdentityState(d, pulsarCluster, pulsarInstance); diagErr != nil { + return diagErr + } // Set lakehouse_storage_enabled if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless { @@ -500,3 +502,17 @@ func dataSourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, me d.SetId(fmt.Sprintf("%s/%s", pulsarCluster.Namespace, pulsarCluster.Name)) return nil } + +func setPulsarClusterDataSourceIdentityState( + d *schema.ResourceData, + pulsarCluster *cloudv1alpha1.PulsarCluster, + pulsarInstance *cloudv1alpha1.PulsarInstance, +) diag.Diagnostics { + if err := d.Set("instance_name", pulsarInstance.Name); err != nil { + return diag.FromErr(fmt.Errorf("ERROR_SET_INSTANCE_NAME: %w", err)) + } + if err := d.Set("location", pulsarCluster.Spec.Location); err != nil { + return diag.FromErr(fmt.Errorf("ERROR_SET_LOCATION: %w", err)) + } + return nil +} diff --git a/cloud/pulsar_cluster_customize_diff_test.go b/cloud/pulsar_cluster_customize_diff_test.go new file mode 100644 index 0000000..8b315e5 --- /dev/null +++ b/cloud/pulsar_cluster_customize_diff_test.go @@ -0,0 +1,56 @@ +// Copyright 2024 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloud + +import ( + "context" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestClearServerlessLakehouseStorageDiff(t *testing.T) { + resource := &schema.Resource{ + Schema: map[string]*schema.Schema{ + "lakehouse_storage_enabled": { + Type: schema.TypeBool, + Optional: true, + Computed: true, + }, + }, + CustomizeDiff: func(ctx context.Context, diff *schema.ResourceDiff, meta interface{}) error { + clearServerlessLakehouseStorageDiff(ctx, diff) + return nil + }, + } + + state := &terraform.InstanceState{ + ID: "org/cluster", + Attributes: map[string]string{ + "id": "org/cluster", + "lakehouse_storage_enabled": "true", + }, + } + config := terraform.NewResourceConfigRaw(map[string]interface{}{ + "lakehouse_storage_enabled": false, + }) + + instanceDiff, err := resource.SimpleDiff(context.Background(), state, config, nil) + require.NoError(t, err) + assert.Empty(t, instanceDiff.Attributes) +} diff --git a/cloud/pulsar_cluster_state_test.go b/cloud/pulsar_cluster_state_test.go index 0fab05b..35aaca1 100644 --- a/cloud/pulsar_cluster_state_test.go +++ b/cloud/pulsar_cluster_state_test.go @@ -15,7 +15,9 @@ package cloud import ( + "context" "testing" + "time" cloudv1alpha1 "github.com/streamnative/cloud-api-server/pkg/apis/cloud/v1alpha1" "github.com/stretchr/testify/assert" @@ -148,3 +150,112 @@ func TestSetPulsarClusterIdentityStateImportBYOC(t *testing.T) { assert.Equal(t, "", resourceData.Get("location")) assert.Equal(t, "pool-member-d", resourceData.Get("pool_member_name")) } + +func TestSetPulsarClusterDataSourceIdentityState(t *testing.T) { + resourceData := dataSourcePulsarCluster().TestResourceData() + + cluster := &cloudv1alpha1.PulsarCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-e", + Namespace: "org-e", + }, + Spec: cloudv1alpha1.PulsarClusterSpec{ + Location: "us-central1", + }, + } + instance := &cloudv1alpha1.PulsarInstance{ + ObjectMeta: metav1.ObjectMeta{ + Name: "instance-e", + Namespace: "org-e", + }, + } + + diagErr := setPulsarClusterDataSourceIdentityState(resourceData, cluster, instance) + assert.Nil(t, diagErr) + assert.Equal(t, "instance-e", resourceData.Get("instance_name")) + assert.Equal(t, "us-central1", resourceData.Get("location")) +} + +func TestValidateMaintenanceWindowAccepted(t *testing.T) { + expected := &cloudv1alpha1.MaintenanceWindow{ + Recurrence: "0,1", + Window: &cloudv1alpha1.Window{ + StartTime: "02:00", + Duration: &metav1.Duration{Duration: 2 * time.Hour}, + }, + } + + err := validateMaintenanceWindowAccepted(expected, expected.DeepCopy(), "UPDATE") + assert.NoError(t, err) +} + +func TestValidateMaintenanceWindowAcceptedWhenDropped(t *testing.T) { + expected := &cloudv1alpha1.MaintenanceWindow{ + Recurrence: "0,1", + } + + err := validateMaintenanceWindowAccepted(expected, nil, "CREATE") + assert.EqualError(t, err, "ERROR_CREATE_PULSAR_CLUSTER: maintenance_window is not enabled for this organization") +} + +func TestExpandMaintenanceWindow(t *testing.T) { + duration := 2 * time.Hour + + maintenanceWindow := expandMaintenanceWindow(context.Background(), []interface{}{ + map[string]interface{}{ + "recurrence": "0,1", + "window": []interface{}{ + map[string]interface{}{ + "start_time": "02:00", + "duration": "2h0m0s", + }, + }, + }, + }) + + if assert.NotNil(t, maintenanceWindow) { + assert.Equal(t, "0,1", maintenanceWindow.Recurrence) + if assert.NotNil(t, maintenanceWindow.Window) { + assert.Equal(t, "02:00", maintenanceWindow.Window.StartTime) + if assert.NotNil(t, maintenanceWindow.Window.Duration) { + assert.Equal(t, duration, maintenanceWindow.Window.Duration.Duration) + } + } + } +} + +func TestExpandMaintenanceWindowEmpty(t *testing.T) { + assert.Nil(t, expandMaintenanceWindow(context.Background(), nil)) + assert.Nil(t, expandMaintenanceWindow(context.Background(), []interface{}{})) +} + +func TestMaintenanceWindowEqual(t *testing.T) { + expected := &cloudv1alpha1.MaintenanceWindow{ + Recurrence: "0,1", + Window: &cloudv1alpha1.Window{ + StartTime: "02:00", + Duration: &metav1.Duration{Duration: 2 * time.Hour}, + }, + } + + actual := &cloudv1alpha1.MaintenanceWindow{ + Recurrence: "0,1", + Window: &cloudv1alpha1.Window{ + StartTime: "02:00", + Duration: &metav1.Duration{Duration: 2 * time.Hour}, + }, + } + + assert.True(t, maintenanceWindowEqual(expected, actual)) +} + +func TestMaintenanceWindowEqualWhenDifferent(t *testing.T) { + expected := &cloudv1alpha1.MaintenanceWindow{ + Recurrence: "0,1", + } + actual := &cloudv1alpha1.MaintenanceWindow{ + Recurrence: "2,3", + } + + assert.False(t, maintenanceWindowEqual(expected, actual)) +} diff --git a/cloud/pulsar_cluster_test.go b/cloud/pulsar_cluster_test.go index eb9e041..88d5fa0 100644 --- a/cloud/pulsar_cluster_test.go +++ b/cloud/pulsar_cluster_test.go @@ -333,6 +333,46 @@ func TestPulsarClusterNoConfigConfigDrift(t *testing.T) { }) } +func TestPulsarClusterServerlessLakehouseStorageDrift(t *testing.T) { + var clusterGeneratedName = fmt.Sprintf("t-%d-%d", rand.Intn(1000), rand.Intn(100)) + resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: testCheckPulsarClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testResourceDataSourcePulsarClusterServerlessWithoutLakehouseStorage( + "sndev", + clusterGeneratedName, + "shared-gcp-prod", + "streamnative", + "us-central1"), + Check: resource.ComposeTestCheckFunc( + testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"), + resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "organization", "sndev"), + resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "name", clusterGeneratedName), + resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "instance_name", clusterGeneratedName), + resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "location", "us-central1"), + resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "pool_member_name", ""), + resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "lakehouse_storage_enabled", "true"), + ), + }, + { + Config: testResourceDataSourcePulsarClusterServerlessWithoutLakehouseStorage( + "sndev", + clusterGeneratedName, + "shared-gcp-prod", + "streamnative", + "us-central1"), + PlanOnly: true, + ExpectNonEmptyPlan: false, + }, + }, + }) +} + func testCheckPulsarClusterDestroy(s *terraform.State) error { time.Sleep(30 * time.Second) for _, rs := range s.RootModule().Resources { @@ -517,7 +557,40 @@ data "streamnative_pulsar_cluster" "test-pulsar-cluster" { organization = streamnative_pulsar_cluster.test-pulsar-cluster.organization name = streamnative_pulsar_cluster.test-pulsar-cluster.name } -`, organization, name, poolName, poolNamespace, organization, name, name, location, releaseChannel) + `, organization, name, poolName, poolNamespace, organization, name, name, location, releaseChannel) +} + +func testResourceDataSourcePulsarClusterServerlessWithoutLakehouseStorage( + organization, + name, + poolName, + poolNamespace, + location string, +) string { + return fmt.Sprintf(` +provider "streamnative" { +} +resource "streamnative_pulsar_instance" "test-pulsar-instance" { + organization = "%s" + name = "%s" + availability_mode = "zonal" + pool_name = "%s" + pool_namespace = "%s" + type = "serverless" +} +resource "streamnative_pulsar_cluster" "test-pulsar-cluster" { + organization = "%s" + name = "%s" + instance_name = "%s" + location = "%s" + depends_on = [streamnative_pulsar_instance.test-pulsar-instance] +} +data "streamnative_pulsar_cluster" "test-pulsar-cluster" { + depends_on = [streamnative_pulsar_cluster.test-pulsar-cluster] + organization = streamnative_pulsar_cluster.test-pulsar-cluster.organization + name = streamnative_pulsar_cluster.test-pulsar-cluster.name +} +`, organization, name, poolName, poolNamespace, organization, name, name, location) } func testResourceDataSourcePulsarClusterWithMaintenanceWindowUpdated(organization, name, poolName, poolNamespace, location, releaseChannel string) string { diff --git a/cloud/resource_pulsar_cluster.go b/cloud/resource_pulsar_cluster.go index 5d88e60..8ab3a06 100644 --- a/cloud/resource_pulsar_cluster.go +++ b/cloud/resource_pulsar_cluster.go @@ -45,8 +45,8 @@ func resourcePulsarCluster() *schema.Resource { oldOrg, _ := diff.GetChange("organization") oldName, newName := diff.GetChange("name") if oldOrg.(string) == "" && oldName.(string) == "" { - // For serverless clusters, make lakehouse_storage_enabled computed - makeLakehouseStorageComputedForServerless(ctx, diff, i) + // For serverless clusters, suppress diffs for provider-managed lakehouse storage. + suppressServerlessLakehouseStorageDiff(ctx, diff, i) return nil } if oldName != "" && newName == "" { @@ -57,8 +57,8 @@ func resourcePulsarCluster() *schema.Resource { return fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " + "The pulsar cluster organization, name, instance_name, location, pool_member_name does not support updates, please recreate it") } - // For serverless clusters, make lakehouse_storage_enabled computed - makeLakehouseStorageComputedForServerless(ctx, diff, i) + // For serverless clusters, suppress diffs for provider-managed lakehouse storage. + suppressServerlessLakehouseStorageDiff(ctx, diff, i) return nil }, Importer: &schema.ResourceImporter{ @@ -385,7 +385,7 @@ func resourcePulsarCluster() *schema.Resource { Type: schema.TypeList, Optional: true, Computed: true, - Description: "Maintenance window configuration for the pulsar cluster", + Description: "Maintenance window configuration for the pulsar cluster. This field is available only when maintenance windows are enabled for the organization.", Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "window": { @@ -570,6 +570,7 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me } } } + pulsarCluster.Spec.MaintenanceWindow = expandMaintenanceWindow(ctx, d.Get("maintenance_window").([]interface{})) if pulsarInstance.Spec.Type != cloudv1alpha1.PulsarInstanceTypeServerless && !pulsarInstance.IsUsingUrsaEngine() { getPulsarClusterChanged(ctx, pulsarCluster, d) } @@ -649,6 +650,9 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me } pulsarCluster.Annotations["cloud.streamnative.io/sdt-enabled"] = "true" } + if diagErr := ensureMaintenanceWindowAcceptedOnDryRun(ctx, clientSet, namespace, pulsarCluster, "CREATE"); diagErr != nil { + return diagErr + } pc, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).Create(ctx, pulsarCluster, metav1.CreateOptions{ FieldManager: "terraform-create", @@ -1024,7 +1028,6 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: %w", err)) } - // Validate lakehouse_storage_enabled update: once enabled, cannot be disabled // For serverless clusters, skip validation as it's computed if serverless != string(cloudv1alpha1.PulsarInstanceTypeServerless) { @@ -1063,6 +1066,10 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me int64(storageUnit*8*1024*1024*1024), resource.DecimalSI) } changed := getPulsarClusterChanged(ctx, pulsarCluster, d) + if d.HasChange("maintenance_window") { + pulsarCluster.Spec.MaintenanceWindow = expandMaintenanceWindow(ctx, d.Get("maintenance_window").([]interface{})) + changed = true + } if displayNameChanged { displayName := d.Get("display_name").(string) pulsarCluster.Spec.DisplayName = displayName @@ -1162,6 +1169,11 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me d.HasChange("storage_unit") || d.HasChange("compute_unit_per_broker") || d.HasChange("storage_unit_per_bookie") || changed || displayNameChanged { + if d.HasChange("maintenance_window") && hasMaintenanceWindowConfigured(d) { + if diagErr := ensureMaintenanceWindowAcceptedOnDryRun(ctx, clientSet, namespace, pulsarCluster, "UPDATE"); diagErr != nil { + return diagErr + } + } _, err = clientSet.CloudV1alpha1().PulsarClusters(namespace).Update(ctx, pulsarCluster, metav1.UpdateOptions{ FieldManager: "terraform-update", }) @@ -1358,59 +1370,127 @@ func getPulsarClusterChanged(ctx context.Context, pulsarCluster *cloudv1alpha1.P } } - // Handle maintenance_window configuration - if d.HasChange("maintenance_window") { - maintenanceWindow := d.Get("maintenance_window").([]interface{}) - if len(maintenanceWindow) > 0 { - for _, mwItem := range maintenanceWindow { - mwItemMap := mwItem.(map[string]interface{}) + tflog.Debug(ctx, "get pulsarcluster changed: %v", map[string]interface{}{ + "pulsarcluster": *pulsarCluster.Spec.Config, + }) + return changed +} - if pulsarCluster.Spec.MaintenanceWindow == nil { - pulsarCluster.Spec.MaintenanceWindow = &cloudv1alpha1.MaintenanceWindow{} - } +func hasMaintenanceWindowConfigured(d *schema.ResourceData) bool { + return len(d.Get("maintenance_window").([]interface{})) > 0 +} - // Handle recurrence - if recurrence, ok := mwItemMap["recurrence"]; ok && recurrence != "" { - pulsarCluster.Spec.MaintenanceWindow.Recurrence = recurrence.(string) - } +func validateMaintenanceWindowAccepted( + expected *cloudv1alpha1.MaintenanceWindow, + actual *cloudv1alpha1.MaintenanceWindow, + operation string, +) error { + if maintenanceWindowEqual(expected, actual) { + return nil + } + return fmt.Errorf( + "ERROR_%s_PULSAR_CLUSTER: maintenance_window is not enabled for this organization", + operation, + ) +} - // Handle window configuration - if window, ok := mwItemMap["window"].([]interface{}); ok && len(window) > 0 { - for _, windowItem := range window { - windowItemMap := windowItem.(map[string]interface{}) +func ensureMaintenanceWindowAcceptedOnDryRun( + ctx context.Context, + clientSet *cloudclient.Clientset, + namespace string, + pulsarCluster *cloudv1alpha1.PulsarCluster, + operation string, +) diag.Diagnostics { + if pulsarCluster.Spec.MaintenanceWindow == nil { + return nil + } + var ( + preview *cloudv1alpha1.PulsarCluster + err error + ) + options := metav1.UpdateOptions{ + FieldManager: fmt.Sprintf("terraform-%s-maintenance-window-validation", strings.ToLower(operation)), + DryRun: []string{metav1.DryRunAll}, + } + if operation == "CREATE" { + preview, err = clientSet.CloudV1alpha1().PulsarClusters(namespace).Create(ctx, pulsarCluster, metav1.CreateOptions{ + FieldManager: options.FieldManager, + DryRun: options.DryRun, + }) + } else { + preview, err = clientSet.CloudV1alpha1().PulsarClusters(namespace).Update(ctx, pulsarCluster, options) + } + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_VALIDATE_MAINTENANCE_WINDOW_ON_%s_PULSAR_CLUSTER: %w", operation, err)) + } + if err := validateMaintenanceWindowAccepted(pulsarCluster.Spec.MaintenanceWindow, preview.Spec.MaintenanceWindow, operation); err != nil { + return diag.FromErr(err) + } + return nil +} - if pulsarCluster.Spec.MaintenanceWindow.Window == nil { - pulsarCluster.Spec.MaintenanceWindow.Window = &cloudv1alpha1.Window{} - } +func maintenanceWindowEqual(expected *cloudv1alpha1.MaintenanceWindow, actual *cloudv1alpha1.MaintenanceWindow) bool { + if expected == nil || actual == nil { + return expected == nil && actual == nil + } + if expected.Recurrence != actual.Recurrence { + return false + } + return windowEqual(expected.Window, actual.Window) +} - // Handle start_time - if startTime, ok := windowItemMap["start_time"]; ok && startTime != "" { - pulsarCluster.Spec.MaintenanceWindow.Window.StartTime = startTime.(string) - } +func windowEqual(expected *cloudv1alpha1.Window, actual *cloudv1alpha1.Window) bool { + if expected == nil || actual == nil { + return expected == nil && actual == nil + } + if expected.StartTime != actual.StartTime { + return false + } + if expected.Duration == nil || actual.Duration == nil { + return expected.Duration == nil && actual.Duration == nil + } + return expected.Duration.Duration == actual.Duration.Duration +} - // Handle duration - if durationStr, ok := windowItemMap["duration"]; ok && durationStr != "" { - duration, err := time.ParseDuration(durationStr.(string)) - if err != nil { - tflog.Warn(ctx, fmt.Sprintf("Failed to parse maintenance window duration: %v", err)) - } else { - pulsarCluster.Spec.MaintenanceWindow.Window.Duration = &metav1.Duration{Duration: duration} - } - } - } +func expandMaintenanceWindow(ctx context.Context, maintenanceWindow []interface{}) *cloudv1alpha1.MaintenanceWindow { + if len(maintenanceWindow) == 0 { + return nil + } + + result := &cloudv1alpha1.MaintenanceWindow{} + for _, mwItem := range maintenanceWindow { + mwItemMap := mwItem.(map[string]interface{}) + if recurrence, ok := mwItemMap["recurrence"]; ok && recurrence != "" { + result.Recurrence = recurrence.(string) + } + + windowItems, ok := mwItemMap["window"].([]interface{}) + if !ok || len(windowItems) == 0 { + continue + } + if result.Window == nil { + result.Window = &cloudv1alpha1.Window{} + } + for _, windowItem := range windowItems { + windowItemMap := windowItem.(map[string]interface{}) + if startTime, ok := windowItemMap["start_time"]; ok && startTime != "" { + result.Window.StartTime = startTime.(string) + } + if durationStr, ok := windowItemMap["duration"]; ok && durationStr != "" { + duration, err := time.ParseDuration(durationStr.(string)) + if err != nil { + tflog.Warn(ctx, fmt.Sprintf("Failed to parse maintenance window duration: %v", err)) + continue } + result.Window.Duration = &metav1.Duration{Duration: duration} } - } else { - // If maintenance_window is empty, clear the maintenance window configuration - pulsarCluster.Spec.MaintenanceWindow = nil } - changed = true } - tflog.Debug(ctx, "get pulsarcluster changed: %v", map[string]interface{}{ - "pulsarcluster": *pulsarCluster.Spec.Config, - }) - return changed + if result.Recurrence == "" && result.Window == nil { + return nil + } + return result } func getComputeUnit(d *schema.ResourceData) float64 { @@ -1447,8 +1527,8 @@ func convertCpuAndMemoryToStorageUnit(pc *cloudv1alpha1.PulsarCluster) float64 { return 0.5 // default value } -// makeLakehouseStorageComputedForServerless makes lakehouse_storage_enabled computed for serverless clusters -func makeLakehouseStorageComputedForServerless(ctx context.Context, diff *schema.ResourceDiff, meta interface{}) { +// suppressServerlessLakehouseStorageDiff keeps provider-managed serverless lakehouse storage out of plan diffs. +func suppressServerlessLakehouseStorageDiff(ctx context.Context, diff *schema.ResourceDiff, meta interface{}) { // Get instance information to check type instanceName := diff.Get("instance_name").(string) namespace := diff.Get("organization").(string) @@ -1472,16 +1552,18 @@ func makeLakehouseStorageComputedForServerless(ctx context.Context, diff *schema // Check if instance is serverless if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless { - // For serverless clusters, always set lakehouse_storage_enabled to computed - // and set its value to true - if diff.HasChange("lakehouse_storage_enabled") { - // If user tries to set it, clear the change and set as computed with value true - diff.Clear("lakehouse_storage_enabled") - } - // Always set as computed with value true for serverless - diff.SetNewComputed("lakehouse_storage_enabled") - // Set the value to true for serverless clusters - diff.SetNew("lakehouse_storage_enabled", true) + clearServerlessLakehouseStorageDiff(ctx, diff) + } +} + +func clearServerlessLakehouseStorageDiff(ctx context.Context, diff *schema.ResourceDiff) { + if !diff.HasChange("lakehouse_storage_enabled") { + return + } + if err := diff.Clear("lakehouse_storage_enabled"); err != nil { + tflog.Warn(ctx, "failed to clear serverless lakehouse_storage_enabled diff", map[string]interface{}{ + "error": err, + }) } } diff --git a/docs/data-sources/pulsar_cluster.md b/docs/data-sources/pulsar_cluster.md index adf410d..a91a49d 100644 --- a/docs/data-sources/pulsar_cluster.md +++ b/docs/data-sources/pulsar_cluster.md @@ -54,6 +54,8 @@ description: |- - `websocket_service_url` (String) If you want to connect to the pulsar cluster using the websocket protocol, use this websocket service url. - `websocket_service_urls` (List of String) If you want to connect to the pulsar cluster using the websocket protocol, use this websocket service url. There'll be multiple service urls if the cluster attached with multiple gateways +`maintenance_window` is reported only when the control plane keeps a maintenance window on the cluster. + ### Nested Schema for `config` diff --git a/docs/resources/pulsar_cluster.md b/docs/resources/pulsar_cluster.md index 8b2b82c..1f18f04 100644 --- a/docs/resources/pulsar_cluster.md +++ b/docs/resources/pulsar_cluster.md @@ -41,6 +41,8 @@ description: |- - `storage_unit_per_bookie` (Number) storage unit per bookie, 1 storage unit is 2 cpu and 8gb memory - `volume` (String) The name of the volume +`maintenance_window` is available only when maintenance windows are enabled for the organization. If the organization does not support it, apply returns `maintenance_window is not enabled for this organization`. + ### Read-Only - `bookkeeper_version` (String) The version of the bookkeeper cluster From ffdb3abb8902ca1b1e0880d5978d41605d83bbb0 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 28 Mar 2026 17:19:04 +0800 Subject: [PATCH 2/3] style(cloud): improve debug log messages in Pulsar cluster --- cloud/resource_pulsar_cluster.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cloud/resource_pulsar_cluster.go b/cloud/resource_pulsar_cluster.go index 8ab3a06..c33a33d 100644 --- a/cloud/resource_pulsar_cluster.go +++ b/cloud/resource_pulsar_cluster.go @@ -1259,7 +1259,9 @@ func getPulsarClusterChanged(ctx context.Context, pulsarCluster *cloudv1alpha1.P if len(config) > 0 { for _, configItem := range config { configItemMap := configItem.(map[string]interface{}) - tflog.Debug(ctx, "configItemMap: %v", configItemMap) + tflog.Debug(ctx, "config item map", map[string]interface{}{ + "config_item_map": configItemMap, + }) if configItemMap["websocket_enabled"] != nil { webSocketEnabled := configItemMap["websocket_enabled"].(bool) pulsarCluster.Spec.Config.WebsocketEnabled = &webSocketEnabled @@ -1370,8 +1372,8 @@ func getPulsarClusterChanged(ctx context.Context, pulsarCluster *cloudv1alpha1.P } } - tflog.Debug(ctx, "get pulsarcluster changed: %v", map[string]interface{}{ - "pulsarcluster": *pulsarCluster.Spec.Config, + tflog.Debug(ctx, "pulsar cluster config changed", map[string]interface{}{ + "pulsar_cluster_config": *pulsarCluster.Spec.Config, }) return changed } From 44dcad29d6273727492cf87526f242ae094038b2 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sat, 28 Mar 2026 17:44:28 +0800 Subject: [PATCH 3/3] fix(cloud): ensure maintenance_window schema is optional --- cloud/pulsar_cluster_state_test.go | 36 ++++++++++++++++++++++++++++++ cloud/pulsar_cluster_test.go | 10 +++++++++ cloud/resource_pulsar_cluster.go | 5 ----- docs/resources/pulsar_cluster.md | 2 ++ 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/cloud/pulsar_cluster_state_test.go b/cloud/pulsar_cluster_state_test.go index 35aaca1..f844368 100644 --- a/cloud/pulsar_cluster_state_test.go +++ b/cloud/pulsar_cluster_state_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" cloudv1alpha1 "github.com/streamnative/cloud-api-server/pkg/apis/cloud/v1alpha1" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -229,6 +230,41 @@ func TestExpandMaintenanceWindowEmpty(t *testing.T) { assert.Nil(t, expandMaintenanceWindow(context.Background(), []interface{}{})) } +func TestMaintenanceWindowSchemaStrictlyManaged(t *testing.T) { + resourceSchema := resourcePulsarCluster().Schema + maintenanceWindowSchema := resourceSchema["maintenance_window"] + if assert.NotNil(t, maintenanceWindowSchema) { + assert.True(t, maintenanceWindowSchema.Optional) + assert.False(t, maintenanceWindowSchema.Computed) + } + + maintenanceWindowResource := maintenanceWindowSchema.Elem.(*schema.Resource) + windowSchema := maintenanceWindowResource.Schema["window"] + if assert.NotNil(t, windowSchema) { + assert.True(t, windowSchema.Optional) + assert.False(t, windowSchema.Computed) + } + + windowResource := windowSchema.Elem.(*schema.Resource) + startTimeSchema := windowResource.Schema["start_time"] + if assert.NotNil(t, startTimeSchema) { + assert.True(t, startTimeSchema.Optional) + assert.False(t, startTimeSchema.Computed) + } + + durationSchema := windowResource.Schema["duration"] + if assert.NotNil(t, durationSchema) { + assert.True(t, durationSchema.Optional) + assert.False(t, durationSchema.Computed) + } + + recurrenceSchema := maintenanceWindowResource.Schema["recurrence"] + if assert.NotNil(t, recurrenceSchema) { + assert.True(t, recurrenceSchema.Optional) + assert.False(t, recurrenceSchema.Computed) + } +} + func TestMaintenanceWindowEqual(t *testing.T) { expected := &cloudv1alpha1.MaintenanceWindow{ Recurrence: "0,1", diff --git a/cloud/pulsar_cluster_test.go b/cloud/pulsar_cluster_test.go index 88d5fa0..5dc8c3e 100644 --- a/cloud/pulsar_cluster_test.go +++ b/cloud/pulsar_cluster_test.go @@ -177,6 +177,16 @@ func TestPulsarClusterRemoveMaintenanceWindow(t *testing.T) { testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"), ), }, + { + Config: testResourceDataSourcePulsarClusterWithoutConfig( + "sndev", + clusterGeneratedName, + "shared-gcp-prod", + "streamnative", + "us-central1", "rapid"), + PlanOnly: true, + ExpectNonEmptyPlan: false, + }, }, }) } diff --git a/cloud/resource_pulsar_cluster.go b/cloud/resource_pulsar_cluster.go index c33a33d..67534b3 100644 --- a/cloud/resource_pulsar_cluster.go +++ b/cloud/resource_pulsar_cluster.go @@ -384,27 +384,23 @@ func resourcePulsarCluster() *schema.Resource { "maintenance_window": { Type: schema.TypeList, Optional: true, - Computed: true, Description: "Maintenance window configuration for the pulsar cluster. This field is available only when maintenance windows are enabled for the organization.", Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "window": { Type: schema.TypeList, Optional: true, - Computed: true, Description: "Maintenance execution window", Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "start_time": { Type: schema.TypeString, Optional: true, - Computed: true, Description: "Start time of the maintenance window", }, "duration": { Type: schema.TypeString, Optional: true, - Computed: true, Description: "Duration of the maintenance window in Go duration format (e.g., \"2h0m0s\", \"30m0s\", \"1h30m0s\")", ValidateFunc: validateDuration, }, @@ -414,7 +410,6 @@ func resourcePulsarCluster() *schema.Resource { "recurrence": { Type: schema.TypeString, Optional: true, - Computed: true, Description: "Recurrence pattern for maintenance (0-6 for Monday to Sunday)", }, }, diff --git a/docs/resources/pulsar_cluster.md b/docs/resources/pulsar_cluster.md index 1f18f04..58df315 100644 --- a/docs/resources/pulsar_cluster.md +++ b/docs/resources/pulsar_cluster.md @@ -43,6 +43,8 @@ description: |- `maintenance_window` is available only when maintenance windows are enabled for the organization. If the organization does not support it, apply returns `maintenance_window is not enabled for this organization`. +Removing the `maintenance_window` block from configuration clears the upstream maintenance window on the next apply. + ### Read-Only - `bookkeeper_version` (String) The version of the bookkeeper cluster