From 65560c922a6446fd1754907c5a83692b4ea047b7 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Tue, 3 Feb 2026 10:43:04 +0200 Subject: [PATCH 01/10] chore: data catalog: create fe container for dataservices (OP-241) --- .../weka-operator/resources/weka_runtime.py | 12 +- doc/api_dump/wekacluster.md | 15 ++ internal/config/env.go | 6 +- internal/controllers/allocator/templates.go | 145 ++++++++------ .../controllers/factory/container_factory.go | 12 ++ internal/controllers/resources/pod.go | 47 ++++- .../controllers/wekacluster/funcs_catalog.go | 103 ++++++++++ .../controllers/wekacluster/funcs_upgrade.go | 7 + .../wekacluster/steps_cluster_creation.go | 7 +- .../wekacluster/steps_post_cluster.go | 43 +++++ .../wekacontainer/flow_active_state.go | 21 ++ .../wekacontainer/flow_deleting_state.go | 13 ++ .../wekacontainer/flow_paused_state.go | 6 + .../funcs_active_state_catalog.go | 75 ++++++++ .../funcs_active_state_data_services.go | 182 ++++++++++++++++++ .../wekacontainer/funcs_image_upgrade.go | 7 + internal/pkg/domain/api_extension.go | 1 + internal/services/weka.go | 181 +++++++++++++++++ pkg/weka-k8s-api | 2 +- 19 files changed, 808 insertions(+), 77 deletions(-) create mode 100644 internal/controllers/wekacluster/funcs_catalog.go create mode 100644 internal/controllers/wekacontainer/funcs_active_state_catalog.go create mode 100644 internal/controllers/wekacontainer/funcs_active_state_data_services.go diff --git a/charts/weka-operator/resources/weka_runtime.py b/charts/weka-operator/resources/weka_runtime.py index 1a1b23415..03ac6819b 100644 --- a/charts/weka-operator/resources/weka_runtime.py +++ b/charts/weka-operator/resources/weka_runtime.py @@ -2350,7 +2350,7 @@ def is_managed_k8s(network_device=None): async def create_container(): - if MODE not in ["compute", "drive", "client", "s3", "nfs", "data-services"]: + if MODE not in ["compute", "drive", "client", "s3", "nfs", "data-services", "data-services-fe"]: raise NotImplementedError(f"Unsupported mode: {MODE}") full_cores = find_full_cores(NUM_CORES) @@ -2367,6 +2367,8 @@ async def create_container(): mode_part = "--only-frontend-cores" elif MODE == "data-services": mode_part = "--only-dataserv-cores" + elif MODE == "data-services-fe": + mode_part = "--only-frontend-cores" core_str = ",".join(map(str, full_cores)) logging.info(f"Creating container with cores: {core_str}") @@ -3373,7 +3375,7 @@ async def ensure_ssdproxy_container(): _, _, ec = await run_command(cmd) if ec != 0: raise Exception(f"Failed to ensure ssdproxy container") - + if not os.path.exists("/usr/bin/weka-sign-drive"): os.symlink("/opt/weka/dist/extracted/weka-sign-drive", "/usr/bin/weka-sign-drive") logging.info("Created symlink /usr/bin/weka-sign-drive -> /opt/weka/dist/extracted/weka-sign-drive") @@ -3532,7 +3534,7 @@ async def wait_for_resources(): if MODE == 'client': await ensure_client_ports() - if MODE not in ['drive', 's3', 'compute', 'nfs', 'envoy', 'client', 'telemetry', 'data-services']: + if MODE not in ['drive', 's3', 'compute', 'nfs', 'envoy', 'client', 'telemetry', 'data-services', 'data-services-fe']: return logging.info("waiting for controller to set resources") @@ -3724,7 +3726,7 @@ async def get_devices_by_selectors(selectors_str: str) -> List[str]: async def write_management_ips(): """Auto-discover management IPs and write them to a file""" - if MODE not in ['drive', 'compute', 's3', 'nfs', 'client', 'data-services']: + if MODE not in ['drive', 'compute', 's3', 'nfs', 'client', 'data-services', 'data-services-fe']: return ipAddresses = [] @@ 
-4398,7 +4400,7 @@ async def shutdown(): force_stop = True if is_wrong_generation(): force_stop = True - if MODE not in ["s3", "drive", "compute", "nfs", "data-services"]: + if MODE not in ["s3", "drive", "compute", "nfs", "data-services", "data-services-fe"]: force_stop = True stop_flag = "--force" if force_stop else "-g" diff --git a/doc/api_dump/wekacluster.md b/doc/api_dump/wekacluster.md index aa20581bd..7cac0be60 100644 --- a/doc/api_dump/wekacluster.md +++ b/doc/api_dump/wekacluster.md @@ -27,6 +27,7 @@ - [S3Config](#s3config) - [SmbwConfig](#smbwconfig) - [TelemetryConfig](#telemetryconfig) +- [CatalogConfig](#catalogconfig) - [ClusterMetrics](#clustermetrics) - [ClusterPrinterColumns](#clusterprintercolumns) - [RoleTopologySpreadConstraints](#roletopologyspreadconstraints) @@ -103,6 +104,7 @@ | s3 | *S3Config | | | smbw | *SmbwConfig | | | telemetry | *TelemetryConfig | Telemetry configuration for exporting audit logs and other telemetry data | +| catalog | *CatalogConfig | Catalog configuration for data catalog service | --- @@ -224,6 +226,7 @@ | envoy | int | | | smbw | int | | | dataServices | int | | +| dataServicesFe | int | | --- @@ -279,6 +282,9 @@ | dataServicesExtraCores | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: number of data services extra cores per container | | dataServicesHugepages | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage allocation for data services frontend | | dataServicesHugepagesOffset | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage offset for data services frontend | +| dataServicesFeCores | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: number of data services frontend cores per container | +| dataServicesFeHugepages | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage allocation for data services frontend container | +| dataServicesFeHugepagesOffset | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage offset for data services frontend container | --- @@ -405,6 +411,15 @@ --- +## CatalogConfig + +| JSON Field | Type | Description | +|------------|------|-------------| +| indexInterval | string | IndexInterval specifies how often the catalog index is updated (e.g., "1d", "1m") | +| retentionPeriod | string | RetentionPeriod specifies how long catalog data is retained (e.g., "30d", "10m") | + +--- + ## ClusterMetrics | JSON Field | Type | Description | diff --git a/internal/config/env.go b/internal/config/env.go index 6ac376dea..854385154 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -211,8 +211,9 @@ var Config struct { RecreateUnhealthyEnvoyThrottlingEnabled bool SkipClientsTolerationValidation bool TolerationsMismatchSettings TolerationsMismatchSettings - DeleteEnvoyWithoutS3NeighborTimeout time.Duration - DeleteTelemetryWithoutComputeNeighborTimeout time.Duration + DeleteEnvoyWithoutS3NeighborTimeout time.Duration + DeleteTelemetryWithoutComputeNeighborTimeout time.Duration + DeleteDataServicesFEWithoutDataServicesNeighborTimeout time.Duration DeleteUnschedulablePodsAfter time.Duration RemoveFailedDrivesFromWeka bool AllowMultipleProtocolsPerNode bool @@ -393,6 +394,7 @@ func ConfigureEnv(ctx context.Context) { Config.TolerationsMismatchSettings.IgnoredTaints = getStringSlice("TOLERATIONS_MISMATCH_SETTINGS_IGNORED_TAINTS") Config.DeleteEnvoyWithoutS3NeighborTimeout = getDurationEnvOrDefault("DELETE_ENVOY_WITHOUT_S3_NEIGHBOR_TIMEOUT", 5*time.Minute) 
Config.DeleteTelemetryWithoutComputeNeighborTimeout = getDurationEnvOrDefault("DELETE_TELEMETRY_WITHOUT_COMPUTE_NEIGHBOR_TIMEOUT", 5*time.Minute) + Config.DeleteDataServicesFEWithoutDataServicesNeighborTimeout = getDurationEnvOrDefault("DELETE_DATA_SERVICES_FE_WITHOUT_DATA_SERVICES_NEIGHBOR_TIMEOUT", 1*time.Minute) Config.DeleteUnschedulablePodsAfter = getDurationEnvOrDefault("DELETE_UNSCHEDULABLE_PODS_AFTER", 1*time.Minute) Config.RemoveFailedDrivesFromWeka = getBoolEnvOrDefault("REMOVE_FAILED_DRIVES_FROM_WEKA", false) Config.AllowMultipleProtocolsPerNode = getBoolEnvOrDefault("ALLOW_MULTIPLE_PROTOCOLS_PER_NODE", false) diff --git a/internal/controllers/allocator/templates.go b/internal/controllers/allocator/templates.go index 3b824fc1d..f2d7189c6 100644 --- a/internal/controllers/allocator/templates.go +++ b/internal/controllers/allocator/templates.go @@ -8,38 +8,41 @@ import ( ) type ClusterTemplate struct { - DriveCores int - DriveExtraCores int - ComputeCores int - ComputeExtraCores int - EnvoyCores int - S3Cores int - S3ExtraCores int - NfsCores int - NfsExtraCores int - ComputeContainers int - DriveContainers int - S3Containers int - NfsContainers int - NumDrives int - DriveCapacity int - ContainerCapacity int - DriveTypesRatio *v1alpha1.DriveTypesRatio - DriveHugepages int - DriveHugepagesOffset int - ComputeHugepages int - ComputeHugepagesOffset int - HugePageSize string - HugePagesOverride string - S3FrontendHugepages int - S3FrontendHugepagesOffset int - NfsFrontendHugepages int - NfsFrontendHugepagesOffset int - DataServicesCores int - DataServicesExtraCores int - DataServicesContainers int - DataServicesHugepages int - DataServicesHugepagesOffset int + DriveCores int + DriveExtraCores int + ComputeCores int + ComputeExtraCores int + EnvoyCores int + S3Cores int + S3ExtraCores int + NfsCores int + NfsExtraCores int + ComputeContainers int + DriveContainers int + S3Containers int + NfsContainers int + NumDrives int + DriveCapacity int + ContainerCapacity int + DriveTypesRatio *v1alpha1.DriveTypesRatio + DriveHugepages int + DriveHugepagesOffset int + ComputeHugepages int + ComputeHugepagesOffset int + HugePageSize string + HugePagesOverride string + S3FrontendHugepages int + S3FrontendHugepagesOffset int + NfsFrontendHugepages int + NfsFrontendHugepagesOffset int + DataServicesCores int + DataServicesExtraCores int + DataServicesContainers int + DataServicesHugepages int + DataServicesHugepagesOffset int + DataServicesFeCores int + DataServicesFeHugepages int + DataServicesFeHugepagesOffset int } func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate { @@ -143,6 +146,19 @@ func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate { config.DataServicesHugepagesOffset = 200 } + // Data-services-fe defaults (client-like: 1 core default, hugepages = cores * 1500) + if config.DataServicesFeCores == 0 { + config.DataServicesFeCores = 1 + } + + if config.DataServicesFeHugepages == 0 { + config.DataServicesFeHugepages = config.DataServicesFeCores * 1500 // Client-like formula + } + + if config.DataServicesFeHugepagesOffset == 0 { + config.DataServicesFeHugepagesOffset = 200 + } + if config.EnvoyCores == 0 { config.EnvoyCores = 1 } @@ -161,37 +177,40 @@ func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate { } return ClusterTemplate{ - DriveCores: config.DriveCores, - DriveExtraCores: config.DriveExtraCores, - ComputeCores: config.ComputeCores, - ComputeExtraCores: config.ComputeExtraCores, - ComputeContainers: *config.ComputeContainers, - 
DriveContainers: *config.DriveContainers, - S3Containers: config.S3Containers, - S3Cores: config.S3Cores, - S3ExtraCores: config.S3ExtraCores, - NfsContainers: config.NfsContainers, - NumDrives: config.NumDrives, - DriveCapacity: config.DriveCapacity, - ContainerCapacity: config.ContainerCapacity, - DriveTypesRatio: config.DriveTypesRatio, - DriveHugepages: config.DriveHugepages, - DriveHugepagesOffset: config.DriveHugepagesOffset, - ComputeHugepages: config.ComputeHugepages, - ComputeHugepagesOffset: config.ComputeHugepagesOffset, - S3FrontendHugepages: config.S3FrontendHugepages, - S3FrontendHugepagesOffset: config.S3FrontendHugepagesOffset, - HugePageSize: hgSize, - EnvoyCores: config.EnvoyCores, - NfsCores: config.NfsCores, - NfsExtraCores: config.NfsExtraCores, - NfsFrontendHugepages: config.NfsFrontendHugepages, - NfsFrontendHugepagesOffset: config.NfsFrontendHugepagesOffset, - DataServicesContainers: config.DataServicesContainers, - DataServicesCores: config.DataServicesCores, - DataServicesExtraCores: config.DataServicesExtraCores, - DataServicesHugepages: config.DataServicesHugepages, - DataServicesHugepagesOffset: config.DataServicesHugepagesOffset, + DriveCores: config.DriveCores, + DriveExtraCores: config.DriveExtraCores, + ComputeCores: config.ComputeCores, + ComputeExtraCores: config.ComputeExtraCores, + ComputeContainers: *config.ComputeContainers, + DriveContainers: *config.DriveContainers, + S3Containers: config.S3Containers, + S3Cores: config.S3Cores, + S3ExtraCores: config.S3ExtraCores, + NfsContainers: config.NfsContainers, + NumDrives: config.NumDrives, + DriveCapacity: config.DriveCapacity, + ContainerCapacity: config.ContainerCapacity, + DriveTypesRatio: config.DriveTypesRatio, + DriveHugepages: config.DriveHugepages, + DriveHugepagesOffset: config.DriveHugepagesOffset, + ComputeHugepages: config.ComputeHugepages, + ComputeHugepagesOffset: config.ComputeHugepagesOffset, + S3FrontendHugepages: config.S3FrontendHugepages, + S3FrontendHugepagesOffset: config.S3FrontendHugepagesOffset, + HugePageSize: hgSize, + EnvoyCores: config.EnvoyCores, + NfsCores: config.NfsCores, + NfsExtraCores: config.NfsExtraCores, + NfsFrontendHugepages: config.NfsFrontendHugepages, + NfsFrontendHugepagesOffset: config.NfsFrontendHugepagesOffset, + DataServicesContainers: config.DataServicesContainers, + DataServicesCores: config.DataServicesCores, + DataServicesExtraCores: config.DataServicesExtraCores, + DataServicesHugepages: config.DataServicesHugepages, + DataServicesHugepagesOffset: config.DataServicesHugepagesOffset, + DataServicesFeCores: config.DataServicesFeCores, + DataServicesFeHugepages: config.DataServicesFeHugepages, + DataServicesFeHugepagesOffset: config.DataServicesFeHugepagesOffset, } } diff --git a/internal/controllers/factory/container_factory.go b/internal/controllers/factory/container_factory.go index c009c800c..7fdfb58bb 100644 --- a/internal/controllers/factory/container_factory.go +++ b/internal/controllers/factory/container_factory.go @@ -54,6 +54,10 @@ func NewWekaContainerForWekaCluster(cluster *wekav1alpha1.WekaCluster, numCores = template.DataServicesCores hugePagesNum = template.DataServicesHugepages hugePagesOffset = template.DataServicesHugepagesOffset + case "data-services-fe": + numCores = template.DataServicesFeCores + hugePagesNum = template.DataServicesFeHugepages + hugePagesOffset = template.DataServicesFeHugepagesOffset default: return nil, fmt.Errorf("unsupported role %s", role) } @@ -87,6 +91,8 @@ func NewWekaContainerForWekaCluster(cluster 
*wekav1alpha1.WekaCluster, case "data-services": additionalMemory = cluster.Spec.AdditionalMemory.DataServices extraCores = template.DataServicesExtraCores + case "data-services-fe": + additionalMemory = cluster.Spec.AdditionalMemory.DataServicesFe case "envoy": additionalMemory = cluster.Spec.AdditionalMemory.Envoy } @@ -98,6 +104,9 @@ func NewWekaContainerForWekaCluster(cluster *wekav1alpha1.WekaCluster, if slices.Contains([]string{"compute", "telemetry"}, role) { containerGroup = "compute" } + if slices.Contains([]string{"data-services", "data-services-fe"}, role) { + containerGroup = "data-services" + } wekahomeConfig, err := domain.GetWekahomeConfig(cluster) if err != nil { @@ -119,6 +128,9 @@ func NewWekaContainerForWekaCluster(cluster *wekav1alpha1.WekaCluster, if role == wekav1alpha1.WekaContainerModeTelemetry { // telemetry sticks to compute, so does not need explicit node selector nodeSelector = map[string]string{} } + if role == wekav1alpha1.WekaContainerModeDataServicesFe { // data-services-fe sticks to data-services, so does not need explicit node selector + nodeSelector = map[string]string{} + } container := &wekav1alpha1.WekaContainer{ TypeMeta: metav1.TypeMeta{ diff --git a/internal/controllers/resources/pod.go b/internal/controllers/resources/pod.go index 62565df14..f9d2cb46c 100644 --- a/internal/controllers/resources/pod.go +++ b/internal/controllers/resources/pod.go @@ -1121,7 +1121,7 @@ func (f *PodFactory) getHugePagesOffset() int { func (f *PodFactory) setResources(ctx context.Context, pod *corev1.Pod) error { totalNumCores := f.container.Spec.NumCores switch f.container.Spec.Mode { - case weka.WekaContainerModeCompute, weka.WekaContainerModeDrive, weka.WekaContainerModeS3, weka.WekaContainerModeNfs, weka.WekaContainerModeDataServices: + case weka.WekaContainerModeCompute, weka.WekaContainerModeDrive, weka.WekaContainerModeS3, weka.WekaContainerModeNfs, weka.WekaContainerModeDataServices, weka.WekaContainerModeDataServicesFe: totalNumCores += f.container.Spec.ExtraCores } @@ -1170,7 +1170,7 @@ func (f *PodFactory) setResources(ctx context.Context, pod *corev1.Pod) error { totalCores = totalNumCores // inconsistency with pre-allocation, but we rather not allocate envoy too much too soon } switch f.container.Spec.Mode { - case weka.WekaContainerModeCompute, weka.WekaContainerModeDrive, weka.WekaContainerModeS3, weka.WekaContainerModeNfs, weka.WekaContainerModeDataServices: + case weka.WekaContainerModeCompute, weka.WekaContainerModeDrive, weka.WekaContainerModeS3, weka.WekaContainerModeNfs, weka.WekaContainerModeDataServices, weka.WekaContainerModeDataServicesFe: totalCores = totalCores - f.container.Spec.ExtraCores // basically reducing back what we over-allocated } cpuRequestStr = fmt.Sprintf("%d", totalCores) @@ -1294,14 +1294,22 @@ func (f *PodFactory) setResources(ctx context.Context, pod *corev1.Pod) error { } if f.container.Spec.Mode == weka.WekaContainerModeDataServices { - // Data services require at least 3.5GB reserved memory - dataServicesMemory := 3584 // 3.5GB in MiB + // Data services require 32GB reserved memory + dataServicesMemory := 32768 // 32GB in MiB managementMemory := 1965 perCoreMemory := 512 // minimal per-core overhead for data services buffer := 450 memRequest = fmt.Sprintf("%dMi", buffer+managementMemory+dataServicesMemory+perCoreMemory*f.container.Spec.NumCores+f.container.Spec.AdditionalMemory) } + if f.container.Spec.Mode == weka.WekaContainerModeDataServicesFe { + // Data-services-fe uses client-like memory profile (frontend-only) + 
managementMemory := 1965 + perFrontendMemory := 3050 + buffer := 2000 + memRequest = fmt.Sprintf("%dMi", buffer+managementMemory+perFrontendMemory*totalNumCores+f.container.Spec.AdditionalMemory) + } + if f.container.Spec.Mode == weka.WekaContainerModeEnvoy { total := 1024 + f.container.Spec.AdditionalMemory memRequest = fmt.Sprintf("%dMi", total) @@ -1474,11 +1482,18 @@ func (f *PodFactory) setAffinities(ctx context.Context, pod *corev1.Pod) error { if f.container.HasFrontend() && !config.Config.AllowMultipleProtocolsPerNode { // we don't want to allow more than one s3 or client container per node // Other types of containers we validate to be once for cluster + antiAffinityModes := domain.ContainerModesWithFrontend + // data-services-fe needs to be co-located with data-services, so exclude it from anti-affinity + if f.container.IsDataServicesFEContainer() { + antiAffinityModes = slices.DeleteFunc(slices.Clone(domain.ContainerModesWithFrontend), func(s string) bool { + return s == weka.WekaContainerModeDataServices + }) + } term.LabelSelector.MatchExpressions = []metav1.LabelSelectorRequirement{ { Key: domain.WekaLabelMode, Operator: metav1.LabelSelectorOpIn, - Values: domain.ContainerModesWithFrontend, + Values: antiAffinityModes, }, } } else { @@ -1543,6 +1558,28 @@ func (f *PodFactory) setAffinities(ctx context.Context, pod *corev1.Pod) error { pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = terms } } + + if f.container.IsDataServicesFEContainer() { + // schedule together with data-services, required during scheduling + term := corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "weka.io/mode": weka.WekaContainerModeDataServices, + "weka.io/cluster-id": clusterId, + }, + }, + TopologyKey: "kubernetes.io/hostname", + } + if pod.Spec.Affinity.PodAffinity == nil { + pod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{term}, + } + } else { + terms := pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution + terms = append(terms, term) + pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = terms + } + } } if f.container.Spec.NoAffinityConstraints { diff --git a/internal/controllers/wekacluster/funcs_catalog.go b/internal/controllers/wekacluster/funcs_catalog.go new file mode 100644 index 000000000..738f20459 --- /dev/null +++ b/internal/controllers/wekacluster/funcs_catalog.go @@ -0,0 +1,103 @@ +package wekacluster + +import ( + "context" + + "github.com/pkg/errors" + "github.com/weka/go-weka-observability/instrumentation" + weka "github.com/weka/weka-k8s-api/api/v1alpha1" + + "github.com/weka/weka-operator/internal/services" + "github.com/weka/weka-operator/internal/services/discovery" +) + +// SelectDataServicesFEContainers returns all data-services-fe containers from the given list +func (r *wekaClusterReconcilerLoop) SelectDataServicesFEContainers(containers []*weka.WekaContainer) []*weka.WekaContainer { + var feContainers []*weka.WekaContainer + for _, container := range containers { + if container.Spec.Mode == weka.WekaContainerModeDataServicesFe { + feContainers = append(feContainers, container) + } + } + return feContainers +} + +// EnsureCatalogCluster creates the catalog cluster with initial data-services and data-services-fe containers +func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) error { + ctx, logger, end := instrumentation.GetLogSpan(ctx, 
"EnsureCatalogCluster") + defer end() + + container := discovery.SelectActiveContainer(r.containers) + if container == nil { + return errors.New("No active container found") + } + + // Collect container IDs from data-services and data-services-fe containers + // that have joined the cluster (have ClusterContainerID) + var containerIds []int + + dataServicesContainers := r.SelectDataServicesContainers(r.containers) + dataServicesFEContainers := r.SelectDataServicesFEContainers(r.containers) + + for _, c := range dataServicesContainers { + if c.Status.ClusterContainerID != nil { + containerIds = append(containerIds, *c.Status.ClusterContainerID) + } + } + + for _, c := range dataServicesFEContainers { + if c.Status.ClusterContainerID != nil { + containerIds = append(containerIds, *c.Status.ClusterContainerID) + } + } + + // Need at least 2 containers (1 data-services + 1 data-services-fe) to form catalog cluster + if len(containerIds) < 2 { + logger.Info("Not enough containers to form catalog cluster", "containerIds", len(containerIds)) + return errors.New("Need at least 2 containers (data-services + data-services-fe) to form catalog cluster") + } + + wekaService := services.NewWekaService(r.ExecService, container) + err := wekaService.CreateCatalogCluster(ctx, containerIds) + if err != nil { + return err + } + + logger.Info("Catalog cluster ensured", "containerIds", containerIds) + return nil +} + +// ShouldConfigureCatalog checks if catalog configuration needs to be applied +func (r *wekaClusterReconcilerLoop) ShouldConfigureCatalog() bool { + return r.cluster.Spec.Catalog != nil && + (r.cluster.Spec.Catalog.IndexInterval != "" || r.cluster.Spec.Catalog.RetentionPeriod != "") +} + +// EnsureCatalogConfig applies the catalog configuration (index-interval, retention-period) +func (r *wekaClusterReconcilerLoop) EnsureCatalogConfig(ctx context.Context) error { + ctx, logger, end := instrumentation.GetLogSpan(ctx, "EnsureCatalogConfig") + defer end() + + container := discovery.SelectActiveContainer(r.containers) + if container == nil { + return errors.New("No active container found") + } + + if r.cluster.Spec.Catalog == nil { + return nil + } + + wekaService := services.NewWekaService(r.ExecService, container) + params := services.CatalogConfigParams{ + IndexInterval: r.cluster.Spec.Catalog.IndexInterval, + RetentionPeriod: r.cluster.Spec.Catalog.RetentionPeriod, + } + + err := wekaService.UpdateCatalogConfig(ctx, params) + if err != nil { + return err + } + + logger.Info("Catalog config updated", "indexInterval", params.IndexInterval, "retentionPeriod", params.RetentionPeriod) + return nil +} diff --git a/internal/controllers/wekacluster/funcs_upgrade.go b/internal/controllers/wekacluster/funcs_upgrade.go index e9b24803c..3411b5165 100644 --- a/internal/controllers/wekacluster/funcs_upgrade.go +++ b/internal/controllers/wekacluster/funcs_upgrade.go @@ -529,7 +529,14 @@ func (r *wekaClusterReconcilerLoop) handleUpgrade(ctx context.Context) error { if err != nil { return err } + dataServicesContainers, err := clusterService.GetOwnedContainers(ctx, weka.WekaContainerModeDataServices) + if err != nil { + return err + } + // Note: data-services-fe containers are upgraded as part of data-services container upgrade + // (ensureSiblingFEUpgraded handles updating and waiting for the FE before the DS pod is deleted) feContainers := append(s3Containers, nfsContainres...) + feContainers = append(feContainers, dataServicesContainers...) 
prepareForUpgrade = true // if any s3 container or any NFS container changed version - do not prepare for frontends diff --git a/internal/controllers/wekacluster/steps_cluster_creation.go b/internal/controllers/wekacluster/steps_cluster_creation.go index b802b6d40..debe0f724 100644 --- a/internal/controllers/wekacluster/steps_cluster_creation.go +++ b/internal/controllers/wekacluster/steps_cluster_creation.go @@ -348,7 +348,7 @@ func BuildMissingContainers(ctx context.Context, cluster *weka.WekaCluster, temp // Check if telemetry exports are configured hasTelemetryExports := cluster.Spec.Telemetry != nil && len(cluster.Spec.Telemetry.Exports) > 0 - for _, role := range []string{"drive", "compute", "s3", "envoy", "nfs", "telemetry", "data-services"} { + for _, role := range []string{"drive", "compute", "s3", "envoy", "nfs", "telemetry", "data-services", "data-services-fe"} { var numContainers int if clusterReady { @@ -372,6 +372,8 @@ func BuildMissingContainers(ctx context.Context, cluster *weka.WekaCluster, temp } case "data-services": numContainers = template.DataServicesContainers + case "data-services-fe": + numContainers = template.DataServicesContainers // 1:1 with data-services } } else { switch role { @@ -402,6 +404,9 @@ func BuildMissingContainers(ctx context.Context, cluster *weka.WekaCluster, temp if role == "telemetry" && hasTelemetryExports { numContainers = totalByrole["compute"] } + if role == "data-services-fe" { + numContainers = totalByrole["data-services"] + } for i := currentCount; i < numContainers; i++ { diff --git a/internal/controllers/wekacluster/steps_post_cluster.go b/internal/controllers/wekacluster/steps_post_cluster.go index 559425029..34eb51fb4 100644 --- a/internal/controllers/wekacluster/steps_post_cluster.go +++ b/internal/controllers/wekacluster/steps_post_cluster.go @@ -154,6 +154,26 @@ func GetPostClusterSteps(loop *wekaClusterReconcilerLoop) []lifecycle.Step { }, Run: loop.EnsureDataServicesGlobalConfig, }, + &lifecycle.SimpleStep{ + Predicates: lifecycle.Predicates{ + loop.HasDataServicesContainers, + }, + State: &lifecycle.State{ + Name: condition.CondCatalogClusterCreated, + }, + Run: loop.EnsureCatalogCluster, + ContinueOnError: true, + }, + &lifecycle.SimpleStep{ + Predicates: lifecycle.Predicates{ + loop.HasDataServicesContainers, + loop.ShouldConfigureCatalog, + }, + State: &lifecycle.State{ + Name: condition.CondCatalogConfigured, + }, + Run: loop.EnsureCatalogConfig, + }, &lifecycle.SimpleStep{ ContinueOnError: true, Run: loop.EnsureTelemetry, @@ -330,6 +350,7 @@ func (r *wekaClusterReconcilerLoop) EnsureDefaultFS(ctx context.Context) error { } isEncrypted := r.ShouldEncryptFs() + hasDataServices := r.HasDataServicesContainers() err = wekaService.CreateFilesystem(ctx, ".config_fs", "default", services.FSParams{ TotalCapacity: strconv.FormatInt(thinProvisionedLimitsConfigFS, 10), @@ -352,6 +373,7 @@ func (r *wekaClusterReconcilerLoop) EnsureDefaultFS(ctx context.Context) error { ThinProvisioningEnabled: true, IsEncrypted: isEncrypted, NoKmsEncryption: r.IsInternalEncryptionEnabled(), + IndexEnabled: hasDataServices, }) if err != nil { var fsExists *services.FilesystemExists @@ -360,6 +382,27 @@ func (r *wekaClusterReconcilerLoop) EnsureDefaultFS(ctx context.Context) error { } } + // Create .indexfs for catalog when data services are present + if hasDataServices { + var indexFsThickCapacity int64 = 10 * 1024 * 1024 * 1024 // 10GB + var indexFsThinCapacity int64 = 100 * 1024 * 1024 * 1024 // 100GB + + err = wekaService.CreateFilesystem(ctx, 
".indexfs", "default", services.FSParams{ + TotalCapacity: strconv.FormatInt(indexFsThinCapacity, 10), + ThickProvisioningCapacity: strconv.FormatInt(indexFsThickCapacity, 10), + ThinProvisioningEnabled: true, + IsEncrypted: isEncrypted, + NoKmsEncryption: r.IsInternalEncryptionEnabled(), + }) + if err != nil { + var fsExists *services.FilesystemExists + if !errors.As(err, &fsExists) { + return err + } + } + logger.Info(".indexfs filesystem ensured for data services") + } + logger.SetStatus(codes.Ok, "default filesystem ensured") return nil diff --git a/internal/controllers/wekacontainer/flow_active_state.go b/internal/controllers/wekacontainer/flow_active_state.go index fc8b297bf..46afca213 100644 --- a/internal/controllers/wekacontainer/flow_active_state.go +++ b/internal/controllers/wekacontainer/flow_active_state.go @@ -163,6 +163,12 @@ func ActiveStateFlow(r *containerReconcilerLoop) []lifecycle.Step { r.container.IsTelemetry, }, }, + &lifecycle.SimpleStep{ + Run: r.deleteDataServicesFEIfNoDataServicesNeighbor, + Predicates: lifecycle.Predicates{ + r.container.IsDataServicesFEContainer, + }, + }, &lifecycle.SimpleStep{ // let drivers being re-built if node with drivers container is not found Run: r.clearStatusOnNodeNotFound, @@ -571,6 +577,21 @@ func ActiveStateFlow(r *containerReconcilerLoop) []lifecycle.Step { r.container.HasJoinIps, }, }, + &lifecycle.SimpleStep{ + State: &lifecycle.State{ + Name: condition.CondJoinedCatalogCluster, + Message: "Joined catalog cluster", + }, + Run: r.JoinCatalogCluster, + Predicates: lifecycle.Predicates{ + func() bool { + return r.container.IsDataServicesContainer() || r.container.IsDataServicesFEContainer() + }, + func() bool { + return r.container.Status.ClusterContainerID != nil + }, + }, + }, } steps := append(steps1, metricsSteps...) 
diff --git a/internal/controllers/wekacontainer/flow_deleting_state.go b/internal/controllers/wekacontainer/flow_deleting_state.go index f91274cf3..374a0c0fb 100644 --- a/internal/controllers/wekacontainer/flow_deleting_state.go +++ b/internal/controllers/wekacontainer/flow_deleting_state.go @@ -119,6 +119,19 @@ func DeletingStateFlow(r *containerReconcilerLoop) []lifecycle.Step { r.container.IsNfsContainer, }, }, + &lifecycle.SimpleStep{ + State: &lifecycle.State{ + Name: condition.CondRemovedFromCatalogCluster, + Reason: "Deletion", + }, + Run: r.RemoveFromCatalogCluster, + Predicates: lifecycle.Predicates{ + r.ShouldDeactivate, + func() bool { + return r.container.IsDataServicesContainer() || r.container.IsDataServicesFEContainer() + }, + }, + }, //{ // Condition: condition.CondContainerDrivesDeactivated, // CondReason: "Deletion", diff --git a/internal/controllers/wekacontainer/flow_paused_state.go b/internal/controllers/wekacontainer/flow_paused_state.go index cd11bbb58..ae06ba600 100644 --- a/internal/controllers/wekacontainer/flow_paused_state.go +++ b/internal/controllers/wekacontainer/flow_paused_state.go @@ -26,6 +26,12 @@ func PausedStateFlow(r *containerReconcilerLoop) []lifecycle.Step { r.container.IsTelemetry, }, }, + &lifecycle.SimpleStep{ + Run: r.deleteDataServicesFEIfNoDataServicesNeighbor, + Predicates: lifecycle.Predicates{ + r.container.IsDataServicesFEContainer, + }, + }, &lifecycle.SimpleStep{ Run: r.handleStatePaused, }, diff --git a/internal/controllers/wekacontainer/funcs_active_state_catalog.go b/internal/controllers/wekacontainer/funcs_active_state_catalog.go new file mode 100644 index 000000000..c949ff236 --- /dev/null +++ b/internal/controllers/wekacontainer/funcs_active_state_catalog.go @@ -0,0 +1,75 @@ +package wekacontainer + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + "github.com/weka/go-steps-engine/lifecycle" + "github.com/weka/go-weka-observability/instrumentation" + "github.com/weka/weka-k8s-api/api/v1alpha1/condition" + "k8s.io/apimachinery/pkg/api/meta" + + "github.com/weka/weka-operator/internal/services" + "github.com/weka/weka-operator/internal/services/discovery" +) + +// IsCatalogClusterFormed checks if the catalog cluster has been created at the cluster level +func (r *containerReconcilerLoop) IsCatalogClusterFormed(ctx context.Context) (bool, error) { + cluster, err := r.getCluster(ctx) + if err != nil { + return false, err + } + + return meta.IsStatusConditionTrue(cluster.Status.Conditions, condition.CondCatalogClusterCreated), nil +} + +// JoinCatalogCluster adds this container to the catalog cluster +func (r *containerReconcilerLoop) JoinCatalogCluster(ctx context.Context) error { + isFormed, err := r.IsCatalogClusterFormed(ctx) + if err != nil { + return fmt.Errorf("error checking if catalog cluster is formed: %w", err) + } + if !isFormed { + return lifecycle.NewWaitError(fmt.Errorf("catalog cluster is not formed yet, waiting for it to be formed")) + } + + wekaService := services.NewWekaService(r.ExecService, r.container) + return wekaService.JoinCatalogCluster(ctx, *r.container.Status.ClusterContainerID) +} + +// RemoveFromCatalogCluster removes this container from the catalog cluster before deactivation +func (r *containerReconcilerLoop) RemoveFromCatalogCluster(ctx context.Context) error { + ctx, logger, end := instrumentation.GetLogSpan(ctx, "") + defer end() + + // Stop pod first (similar to S3 pattern) + err := r.stopAndEnsureNoPod(ctx) + if err != nil { + return err + } + + containerId := 
r.container.Status.ClusterContainerID + if containerId == nil { + return nil + } + + containers, err := r.getClusterContainers(ctx) + if err != nil { + return err + } + executeInContainer := discovery.SelectActiveContainer(containers) + + if executeInContainer == nil { + return errors.New("No active container found") + } + + wekaService := services.NewWekaService(r.ExecService, executeInContainer) + err = wekaService.RemoveFromCatalogCluster(ctx, *containerId) + if err != nil { + return err + } + + logger.Info("Removed container from catalog cluster", "container_id", *containerId) + return nil +} diff --git a/internal/controllers/wekacontainer/funcs_active_state_data_services.go b/internal/controllers/wekacontainer/funcs_active_state_data_services.go new file mode 100644 index 000000000..a8ce19ba9 --- /dev/null +++ b/internal/controllers/wekacontainer/funcs_active_state_data_services.go @@ -0,0 +1,182 @@ +package wekacontainer + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/weka/go-steps-engine/lifecycle" + "github.com/weka/go-weka-observability/instrumentation" + weka "github.com/weka/weka-k8s-api/api/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/weka/weka-operator/internal/config" + "github.com/weka/weka-operator/internal/pkg/domain" +) + +func (r *containerReconcilerLoop) deleteDataServicesFEIfNoDataServicesNeighbor(ctx context.Context) error { + if !r.container.IsDataServicesFEContainer() { + return nil // only data-services-fe containers should be checked + } + + ctx, logger, end := instrumentation.GetLogSpan(ctx, "") + defer end() + + nodeName := r.container.GetNodeAffinity() + + if nodeName != "" { + ownerRefs := r.container.GetOwnerReferences() + if len(ownerRefs) == 0 { + return errors.New("no owner references found") + } else if len(ownerRefs) > 1 { + return errors.New("more than one owner reference found") + } + + ownerUid := string(ownerRefs[0].UID) + // Check if there are any data-services wekacontainers on the same node + dataServicesContainers, err := r.KubeService.GetWekaContainersSimple(ctx, r.container.Namespace, string(nodeName), map[string]string{ + domain.WekaLabelClusterId: ownerUid, + domain.WekaLabelMode: weka.WekaContainerModeDataServices, + }) + if err != nil { + return err + } + if len(dataServicesContainers) > 0 { + logger.Debug("Found data-services neighbor, not deleting data-services-fe container") + return nil + } + } + + noDataServicesNeighborKey := "NoDataServicesNeighbor" + + if r.container.Status.Timestamps == nil { + r.container.Status.Timestamps = make(map[string]metav1.Time) + } + if since, ok := r.container.Status.Timestamps[noDataServicesNeighborKey]; !ok { + r.container.Status.Timestamps[noDataServicesNeighborKey] = metav1.Time{Time: time.Now()} + if err := r.Status().Update(ctx, r.container); err != nil { + return err + } + + return lifecycle.NewWaitErrorWithDuration( + errors.New("Data-services-fe container has no data-services neighbor, waiting before deleting it"), + time.Second*15, + ) + } else if time.Since(since.Time) < config.Config.DeleteDataServicesFEWithoutDataServicesNeighborTimeout { + logger.Info("Data-services-fe container has no data-services neighbor, but waiting before deleting it", + "waited", time.Since(since.Time).String(), + "node", nodeName, + ) + return nil + } + + _ = r.RecordEvent( + v1.EventTypeNormal, + 
"DataServicesFEContainerWithoutDataServicesNeighbor", + "Data-services-fe container has no data-services neighbor, deleting it", + ) + + if err := r.Client.Delete(ctx, r.container); err != nil { + return errors.Wrap(err, "failed to delete data-services-fe container") + } + + // Clear the timestamp to avoid re-deleting the container on next reconcile + delete(r.container.Status.Timestamps, noDataServicesNeighborKey) + if err := r.Status().Update(ctx, r.container); err != nil { + return errors.Wrap(err, "failed to update container status after deleting data-services-fe") + } + + logger.Info("Data-services-fe container deleted as it has no data-services neighbor") + + return nil +} + +// ensureSiblingFEUpgraded ensures the sibling data-services-fe container is upgraded before +// the data-services container pod is deleted for upgrade. This function: +// 1. Finds the sibling FE container on the same node +// 2. Updates its image to match the target image if needed +// 3. Waits for the FE to be running with the new version +func (r *containerReconcilerLoop) ensureSiblingFEUpgraded(ctx context.Context) error { + if !r.container.IsDataServicesContainer() { + return nil + } + + ctx, logger, end := instrumentation.GetLogSpan(ctx, "ensureSiblingFEUpgraded") + defer end() + + nodeName := r.container.GetNodeAffinity() + if nodeName == "" { + return nil + } + + ownerRefs := r.container.GetOwnerReferences() + if len(ownerRefs) == 0 { + return errors.New("no owner references found") + } + + ownerUid := string(ownerRefs[0].UID) + targetImage := r.container.Spec.Image + + // Find sibling data-services-fe container on the same node + feContainers, err := r.KubeService.GetWekaContainersSimple(ctx, r.container.Namespace, string(nodeName), map[string]string{ + domain.WekaLabelClusterId: ownerUid, + domain.WekaLabelMode: weka.WekaContainerModeDataServicesFe, + }) + if err != nil { + return err + } + + if len(feContainers) == 0 { + logger.Debug("No sibling data-services-fe container found on node", "node", nodeName) + return nil + } + + feContainer := &feContainers[0] + + // Check if FE container needs image update + if feContainer.Spec.Image != targetImage { + logger.Info("Updating sibling data-services-fe container image", + "fe_container", feContainer.Name, + "current_image", feContainer.Spec.Image, + "target_image", targetImage, + ) + + patch := map[string]any{ + "spec": map[string]any{ + "image": targetImage, + }, + } + + patchBytes, err := json.Marshal(patch) + if err != nil { + return fmt.Errorf("failed to marshal patch for FE container %s: %w", feContainer.Name, err) + } + + if err := r.Client.Patch(ctx, feContainer, client.RawPatch(types.MergePatchType, patchBytes)); err != nil { + return fmt.Errorf("failed to patch FE container %s with new image: %w", feContainer.Name, err) + } + + return lifecycle.NewWaitError(errors.New("waiting for sibling data-services-fe container to update image")) + } + + // Check if FE container is running with the new version + if feContainer.Status.LastAppliedImage != targetImage { + logger.Info("Waiting for sibling data-services-fe container to be running with new image", + "fe_container", feContainer.Name, + "target_image", targetImage, + "last_applied_image", feContainer.Status.LastAppliedImage, + ) + return lifecycle.NewWaitError(errors.New("waiting for sibling data-services-fe container to be running with new version")) + } + + logger.Info("Sibling data-services-fe container is running with target image", + "fe_container", feContainer.Name, + "target_image", targetImage, 
+ ) + return nil +} diff --git a/internal/controllers/wekacontainer/funcs_image_upgrade.go b/internal/controllers/wekacontainer/funcs_image_upgrade.go index 975b718ed..ebc54d0b0 100644 --- a/internal/controllers/wekacontainer/funcs_image_upgrade.go +++ b/internal/controllers/wekacontainer/funcs_image_upgrade.go @@ -139,6 +139,13 @@ func (r *containerReconcilerLoop) handleImageUpdate(ctx context.Context) error { return nil } + // For data-services containers, ensure sibling FE is upgraded first + if container.IsDataServicesContainer() { + if err := r.ensureSiblingFEUpgraded(ctx); err != nil { + return err + } + } + logger.Info("Deleting pod to apply new image") // delete pod err = r.deletePod(ctx, pod) diff --git a/internal/pkg/domain/api_extension.go b/internal/pkg/domain/api_extension.go index 7a290791e..878939041 100644 --- a/internal/pkg/domain/api_extension.go +++ b/internal/pkg/domain/api_extension.go @@ -6,4 +6,5 @@ var ContainerModesWithFrontend = []string{ v1alpha1.WekaContainerModeNfs, v1alpha1.WekaContainerModeS3, v1alpha1.WekaContainerModeClient, + v1alpha1.WekaContainerModeDataServices, } diff --git a/internal/services/weka.go b/internal/services/weka.go index 41f642cb2..e2c18c5be 100644 --- a/internal/services/weka.go +++ b/internal/services/weka.go @@ -177,6 +177,7 @@ type FSParams struct { ThickProvisioningCapacity string IsEncrypted bool NoKmsEncryption bool + IndexEnabled bool } type S3Params struct { @@ -196,6 +197,18 @@ type S3Cluster struct { Filesystem string `json:"filesystem_name"` } +// CatalogCluster represents the catalog cluster status +type CatalogCluster struct { + Active bool `json:"active"` + Filesystem string `json:"filesystem_name"` +} + +// CatalogConfigParams holds parameters for catalog configuration +type CatalogConfigParams struct { + IndexInterval string + RetentionPeriod string +} + type NFSParams struct { ConfigFilesystem string SupportedVersions []string @@ -351,6 +364,12 @@ type WekaService interface { GetWekaContainer(ctx context.Context, containerId int) (*WekaClusterContainer, error) GetCapacity(ctx context.Context) (WekaCapacityInfo, error) ConfigureDataServicesGlobalConfig(ctx context.Context) error + GetCatalogCluster(ctx context.Context) (*CatalogCluster, error) + CreateCatalogCluster(ctx context.Context, containerIds []int) error + JoinCatalogCluster(ctx context.Context, containerId int) error + RemoveFromCatalogCluster(ctx context.Context, containerId int) error + ListCatalogClusterContainers(ctx context.Context) ([]int, error) + UpdateCatalogConfig(ctx context.Context, params CatalogConfigParams) error //GetFilesystemByName(ctx context.Context, name string) (WekaFilesystem, error) } @@ -1131,6 +1150,10 @@ func (c *CliWekaService) CreateFilesystem(ctx context.Context, name, group strin } } + if params.IndexEnabled { + cmd = append(cmd, "--index-enabled", "true") + } + _, stderr, err := executor.ExecNamed(ctx, "CreateFilesystem", cmd) if err != nil { if strings.Contains(stderr.String(), "already exists") { @@ -1426,3 +1449,161 @@ func (c *CliWekaService) GetCapacity(ctx context.Context) (WekaCapacityInfo, err return capacity, nil } + +func (c *CliWekaService) GetCatalogCluster(ctx context.Context) (*CatalogCluster, error) { + cmd := []string{ + "wekaauthcli", "catalog", "cluster", "--json", + } + + var catalogCluster CatalogCluster + err := c.RunJsonCmd(ctx, cmd, "GetCatalogCluster", &catalogCluster) + if err != nil { + return nil, err + } + return &catalogCluster, nil +} + +func (c *CliWekaService) CreateCatalogCluster(ctx 
context.Context, containerIds []int) error { + ctx, logger, end := instrumentation.GetLogSpan(ctx, "CreateCatalogCluster") + defer end() + + executor, err := c.ExecService.GetExecutor(ctx, c.Container) + if err != nil { + return err + } + + cmd := []string{ + "wekaauthcli", "catalog", "cluster", "add", ".indexfs", + "--containers", commaSeparatedInts(containerIds), + } + + _, stderr, err := executor.ExecNamed(ctx, "CreateCatalogCluster", cmd) + if err != nil { + if strings.Contains(stderr.String(), "already exists") { + logger.Info("Catalog cluster already exists") + return nil + } + logger.SetError(err, "Failed to create catalog cluster", "stderr", stderr.String()) + return err + } + + return nil +} + +func (c *CliWekaService) JoinCatalogCluster(ctx context.Context, containerId int) error { + ctx, logger, end := instrumentation.GetLogSpan(ctx, "JoinCatalogCluster") + defer end() + + executor, err := c.ExecService.GetExecutor(ctx, c.Container) + if err != nil { + logger.SetError(err, "Failed to get executor") + return err + } + + cmd := []string{ + "wekaauthcli", "catalog", "cluster", "update", "--add-containers", strconv.Itoa(containerId), + } + + stdout, stderr, err := executor.ExecNamed(ctx, "JoinCatalogCluster", cmd) + if err != nil && strings.Contains(stderr.String(), "already part of the catalog cluster") { + return nil + } + if err != nil { + logger.SetError(err, "Failed to join catalog cluster", "stderr", stderr.String(), "stdout", stdout.String()) + return err + } + + return nil +} + +func (c *CliWekaService) RemoveFromCatalogCluster(ctx context.Context, containerId int) error { + ctx, logger, end := instrumentation.GetLogSpan(ctx, "RemoveFromCatalogCluster") + defer end() + + executor, err := c.ExecService.GetExecutor(ctx, c.Container) + if err != nil { + return err + } + + cmd := []string{ + "wekaauthcli", "catalog", "cluster", "update", "--remove-containers", strconv.Itoa(containerId), + } + + stdout, stderr, err := executor.ExecNamed(ctx, "RemoveFromCatalogCluster", cmd) + if err != nil { + if strings.Contains(stderr.String(), "is not part of the catalog cluster") { + logger.Warn("Container is not part of the catalog cluster", "containerId", containerId, "err", stderr.String(), "stdout", stdout.String()) + return nil + } + if strings.Contains(stderr.String(), fmt.Sprintf("Unrecognized host ID HostId<%d>", containerId)) { + logger.Warn("Container is not recognized by the catalog cluster", "containerId", containerId, "err", stderr.String(), "stdout", stdout.String()) + return nil + } + if strings.Contains(stderr.String(), fmt.Sprintf("Unrecognized host ID %d", containerId)) { + logger.Warn("Container is not recognized by the catalog cluster", "containerId", containerId, "err", stderr.String(), "stdout", stdout.String()) + return nil + } + if strings.Contains(stderr.String(), "Catalog cluster is not configured") { + logger.Warn("Catalog cluster is not configured", "stderr", stderr.String(), "stdout", stdout.String()) + return nil + } + logger.Error(err, "Failed to remove from catalog cluster", "stderr", stderr.String(), "stdout", stdout.String()) + return err + } + + return nil +} + +func (c *CliWekaService) ListCatalogClusterContainers(ctx context.Context) ([]int, error) { + // weka catalog cluster containers list --json + // Returns array of "HostId" strings + var hostIdStrings []string + + cmd := []string{ + "wekaauthcli", "catalog", "cluster", "containers", "list", "--json", + } + err := c.RunJsonCmd(ctx, cmd, "ListCatalogClusterContainers", &hostIdStrings) + if err != nil { 
+		err = fmt.Errorf("failed to list catalog cluster containers: %w", err)
+		return nil, err
+	}
+
+	containerIds := make([]int, 0, len(hostIdStrings))
+	for _, hostIdStr := range hostIdStrings {
+		id, err := resources.HostIdToContainerId(hostIdStr)
+		if err != nil {
+			return nil, err
+		}
+		containerIds = append(containerIds, id)
+	}
+	return containerIds, nil
+}
+
+func (c *CliWekaService) UpdateCatalogConfig(ctx context.Context, params CatalogConfigParams) error {
+	ctx, logger, end := instrumentation.GetLogSpan(ctx, "UpdateCatalogConfig")
+	defer end()
+
+	executor, err := c.ExecService.GetExecutor(ctx, c.Container)
+	if err != nil {
+		return err
+	}
+
+	cmd := []string{
+		"wekaauthcli", "catalog", "config", "update",
+	}
+
+	if params.IndexInterval != "" {
+		cmd = append(cmd, "--index-interval", params.IndexInterval)
+	}
+	if params.RetentionPeriod != "" {
+		cmd = append(cmd, "--retention-period", params.RetentionPeriod)
+	}
+
+	_, stderr, err := executor.ExecNamed(ctx, "UpdateCatalogConfig", cmd)
+	if err != nil {
+		logger.SetError(err, "Failed to update catalog config", "stderr", stderr.String())
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/weka-k8s-api b/pkg/weka-k8s-api
index d82505df1..37b238444 160000
--- a/pkg/weka-k8s-api
+++ b/pkg/weka-k8s-api
@@ -1 +1 @@
-Subproject commit d82505df1546ed9e12fc6cf87f1cdaad570e47f2
+Subproject commit 37b2384445b57595d4b3c002863f90c4eeb3e94e

From f2aecdd0c73fee4341a9337693ddddfed37f113a Mon Sep 17 00:00:00 2001
From: Anton Bykov
Date: Wed, 4 Feb 2026 11:01:07 +0200
Subject: [PATCH 02/10] fix: dataserv containers should be stopped before destroying/pausing backends

---
 internal/controllers/wekacluster/steps_deletion.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/internal/controllers/wekacluster/steps_deletion.go b/internal/controllers/wekacluster/steps_deletion.go
index a600e564a..e6d83eca7 100644
--- a/internal/controllers/wekacluster/steps_deletion.go
+++ b/internal/controllers/wekacluster/steps_deletion.go
@@ -72,6 +72,11 @@ func (r *wekaClusterReconcilerLoop) HandleGracefulDeletion(ctx context.Context)
 		return err
 	}
 
+	err = r.ensureContainersPaused(ctx, weka.WekaContainerModeDataServices)
+	if err != nil {
+		return err
+	}
+
 	err = r.ensureContainersPaused(ctx, "")
 	if err != nil {
 		return err
@@ -255,6 +260,14 @@ func (r *wekaClusterReconcilerLoop) finalizeWekaCluster(ctx context.Context) err
 		return err
 	}
 
+	err = clusterService.EnsureNoContainers(ctx, weka.WekaContainerModeDataServices)
+	if err != nil {
+		reason := fmt.Sprintf("EnsureNo%sContainersError", weka.WekaContainerModeDataServices)
+		_ = r.RecordEventThrottled(v1.EventTypeWarning, reason, err.Error(), time.Second*30)
+
+		return err
+	}
+
 	err = clusterService.EnsureNoContainers(ctx, "")
 	if err != nil {
 		reason := "EnsureNoContainersError"

From e9df0e7dbc11f13dc135dc59af93e242bdf2d6a1 Mon Sep 17 00:00:00 2001
From: Anton Bykov
Date: Wed, 4 Feb 2026 11:20:56 +0200
Subject: [PATCH 03/10] fix: do not select FEs for dataserv cluster create

---
 internal/controllers/wekacluster/funcs_catalog.go | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/internal/controllers/wekacluster/funcs_catalog.go b/internal/controllers/wekacluster/funcs_catalog.go
index 738f20459..fc4db9aba 100644
--- a/internal/controllers/wekacluster/funcs_catalog.go
+++ b/internal/controllers/wekacluster/funcs_catalog.go
@@ -37,7 +37,6 @@ func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) er
 	var containerIds []int
 
 	dataServicesContainers := r.SelectDataServicesContainers(r.containers)
-	dataServicesFEContainers := r.SelectDataServicesFEContainers(r.containers)
 
 	for _, c := range dataServicesContainers {
 		if c.Status.ClusterContainerID != nil {
@@ -45,12 +44,6 @@ func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) er
 		}
 	}
 
-	for _, c := range dataServicesFEContainers {
-		if c.Status.ClusterContainerID != nil {
-			containerIds = append(containerIds, *c.Status.ClusterContainerID)
-		}
-	}
-
 	// Need at least 2 containers (1 data-services + 1 data-services-fe) to form catalog cluster
 	if len(containerIds) < 2 {
 		logger.Info("Not enough containers to form catalog cluster", "containerIds", len(containerIds))

From ef78fbff21564cde5e4560b4c2181f275f73e264 Mon Sep 17 00:00:00 2001
From: Assaf Giladi
Date: Wed, 4 Feb 2026 11:12:04 +0200
Subject: [PATCH 04/10] fix: data catalog: creating fe by datasvc container and deleting before datasvc and allocating port 14611

---
 internal/controllers/allocator/allocator.go |  12 ++
 internal/controllers/allocator/ranges.go    |   2 +
 .../controllers/wekacluster/funcs_catalog.go |  27 ++--
 .../wekacluster/steps_cluster_creation.go   |  10 +-
 .../wekacontainer/flow_active_state.go      |  10 ++
 .../funcs_active_state_catalog.go           |   9 ++
 .../funcs_active_state_data_services.go     | 152 ++++++++++++++++++
 pkg/weka-k8s-api                            |   2 +-
 8 files changed, 199 insertions(+), 25 deletions(-)

diff --git a/internal/controllers/allocator/allocator.go b/internal/controllers/allocator/allocator.go
index a39ff6f15..43295c99b 100644
--- a/internal/controllers/allocator/allocator.go
+++ b/internal/controllers/allocator/allocator.go
@@ -291,6 +291,15 @@ func (t *ResourcesAllocator) AllocateClusterRange(ctx context.Context, cluster *
 	}
 	cluster.Status.Ports.S3Port = s3PortRange.Base
 
+	// Allocate Data Services port (fixed port outside cluster range, no boundary check needed)
+	if cluster.Status.Ports.DataServicesPort == 0 {
+		if cluster.Spec.Ports.DataServicesPort != 0 {
+			cluster.Status.Ports.DataServicesPort = cluster.Spec.Ports.DataServicesPort
+		} else {
+			cluster.Status.Ports.DataServicesPort = 14611
+		}
+	}
+
 	// Management proxy port is allocated on-demand when the management proxy is first enabled
 	// This avoids wasting a port if the feature is not used
 
@@ -310,6 +319,9 @@ func GetClusterGlobalAllocatedRanges(cluster *weka.WekaCluster) (allocatedRanges
 	if cluster.Status.Ports.ManagementProxyPort > 0 {
 		allocatedRanges = append(allocatedRanges, Range{Base: cluster.Status.Ports.ManagementProxyPort, Size: 1})
 	}
+	if cluster.Status.Ports.DataServicesPort > 0 {
+		allocatedRanges = append(allocatedRanges, Range{Base: cluster.Status.Ports.DataServicesPort, Size: 2})
+	}
 	return
 }

diff --git a/internal/controllers/allocator/ranges.go b/internal/controllers/allocator/ranges.go
index 5434fb2b8..64133c58a 100644
--- a/internal/controllers/allocator/ranges.go
+++ b/internal/controllers/allocator/ranges.go
@@ -135,6 +135,8 @@ func getClusterPortByName(cluster *weka.WekaCluster, name string) int {
 		return cluster.Status.Ports.S3Port
 	case "managementProxy":
 		return cluster.Status.Ports.ManagementProxyPort
+	case "dataServices":
+		return cluster.Status.Ports.DataServicesPort
 	default:
 		return 0
 	}

diff --git a/internal/controllers/wekacluster/funcs_catalog.go b/internal/controllers/wekacluster/funcs_catalog.go
index fc4db9aba..68b70ff29 100644
--- a/internal/controllers/wekacluster/funcs_catalog.go
+++ b/internal/controllers/wekacluster/funcs_catalog.go
@@ -5,24 +5,15 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/weka/go-weka-observability/instrumentation"
"github.com/weka/go-weka-observability/instrumentation" - weka "github.com/weka/weka-k8s-api/api/v1alpha1" "github.com/weka/weka-operator/internal/services" "github.com/weka/weka-operator/internal/services/discovery" ) -// SelectDataServicesFEContainers returns all data-services-fe containers from the given list -func (r *wekaClusterReconcilerLoop) SelectDataServicesFEContainers(containers []*weka.WekaContainer) []*weka.WekaContainer { - var feContainers []*weka.WekaContainer - for _, container := range containers { - if container.Spec.Mode == weka.WekaContainerModeDataServicesFe { - feContainers = append(feContainers, container) - } - } - return feContainers -} - -// EnsureCatalogCluster creates the catalog cluster with initial data-services and data-services-fe containers +// EnsureCatalogCluster creates the catalog cluster with data-services containers. +// Note: Only data-services container IDs are used for catalog cluster creation, +// not data-services-fe container IDs. The FE containers join the catalog cluster +// separately via JoinCatalogCluster. func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) error { ctx, logger, end := instrumentation.GetLogSpan(ctx, "EnsureCatalogCluster") defer end() @@ -32,8 +23,8 @@ func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) er return errors.New("No active container found") } - // Collect container IDs from data-services and data-services-fe containers - // that have joined the cluster (have ClusterContainerID) + // Collect container IDs from data-services containers only + // (not data-services-fe - those join separately) var containerIds []int dataServicesContainers := r.SelectDataServicesContainers(r.containers) @@ -44,10 +35,10 @@ func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) er } } - // Need at least 2 containers (1 data-services + 1 data-services-fe) to form catalog cluster + // Need at least 2 data-services containers to form catalog cluster if len(containerIds) < 2 { - logger.Info("Not enough containers to form catalog cluster", "containerIds", len(containerIds)) - return errors.New("Need at least 2 containers (data-services + data-services-fe) to form catalog cluster") + logger.Info("Not enough data-services containers to form catalog cluster", "containerIds", len(containerIds)) + return errors.New("Need at least 2 data-services containers to form catalog cluster") } wekaService := services.NewWekaService(r.ExecService, container) diff --git a/internal/controllers/wekacluster/steps_cluster_creation.go b/internal/controllers/wekacluster/steps_cluster_creation.go index debe0f724..197121966 100644 --- a/internal/controllers/wekacluster/steps_cluster_creation.go +++ b/internal/controllers/wekacluster/steps_cluster_creation.go @@ -348,7 +348,10 @@ func BuildMissingContainers(ctx context.Context, cluster *weka.WekaCluster, temp // Check if telemetry exports are configured hasTelemetryExports := cluster.Spec.Telemetry != nil && len(cluster.Spec.Telemetry.Exports) > 0 - for _, role := range []string{"drive", "compute", "s3", "envoy", "nfs", "telemetry", "data-services", "data-services-fe"} { + // Note: data-services-fe containers are NOT created here - they are created by + // the data-services container itself via ensureSiblingFECreated to ensure + // they are placed on the same node as their data-services sibling. 
+ for _, role := range []string{"drive", "compute", "s3", "envoy", "nfs", "telemetry", "data-services"} { var numContainers int if clusterReady { @@ -372,8 +375,6 @@ func BuildMissingContainers(ctx context.Context, cluster *weka.WekaCluster, temp } case "data-services": numContainers = template.DataServicesContainers - case "data-services-fe": - numContainers = template.DataServicesContainers // 1:1 with data-services } } else { switch role { @@ -404,9 +405,6 @@ func BuildMissingContainers(ctx context.Context, cluster *weka.WekaCluster, temp if role == "telemetry" && hasTelemetryExports { numContainers = totalByrole["compute"] } - if role == "data-services-fe" { - numContainers = totalByrole["data-services"] - } for i := currentCount; i < numContainers; i++ { diff --git a/internal/controllers/wekacontainer/flow_active_state.go b/internal/controllers/wekacontainer/flow_active_state.go index 46afca213..d1ea9790b 100644 --- a/internal/controllers/wekacontainer/flow_active_state.go +++ b/internal/controllers/wekacontainer/flow_active_state.go @@ -577,6 +577,16 @@ func ActiveStateFlow(r *containerReconcilerLoop) []lifecycle.Step { r.container.HasJoinIps, }, }, + // For data-services containers, ensure sibling FE container exists on the same node + &lifecycle.SimpleStep{ + Run: r.ensureSiblingFECreated, + Predicates: lifecycle.Predicates{ + r.container.IsDataServicesContainer, + func() bool { + return r.container.Status.ClusterContainerID != nil + }, + }, + }, &lifecycle.SimpleStep{ State: &lifecycle.State{ Name: condition.CondJoinedCatalogCluster, diff --git a/internal/controllers/wekacontainer/funcs_active_state_catalog.go b/internal/controllers/wekacontainer/funcs_active_state_catalog.go index c949ff236..dd882c784 100644 --- a/internal/controllers/wekacontainer/funcs_active_state_catalog.go +++ b/internal/controllers/wekacontainer/funcs_active_state_catalog.go @@ -49,6 +49,15 @@ func (r *containerReconcilerLoop) RemoveFromCatalogCluster(ctx context.Context) return err } + // For data-services containers, ensure sibling FE is deleted first. + // The FE container has mounts that block its graceful stop, so we need + // to stop data-services first (done above), then delete the FE. + if r.container.IsDataServicesContainer() { + if err := r.ensureSiblingFEDeleted(ctx); err != nil { + return err + } + } + containerId := r.container.Status.ClusterContainerID if containerId == nil { return nil diff --git a/internal/controllers/wekacontainer/funcs_active_state_data_services.go b/internal/controllers/wekacontainer/funcs_active_state_data_services.go index a8ce19ba9..dd13aee93 100644 --- a/internal/controllers/wekacontainer/funcs_active_state_data_services.go +++ b/internal/controllers/wekacontainer/funcs_active_state_data_services.go @@ -14,11 +14,100 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/weka/weka-operator/internal/config" + "github.com/weka/weka-operator/internal/controllers/allocator" + "github.com/weka/weka-operator/internal/controllers/factory" "github.com/weka/weka-operator/internal/pkg/domain" ) +// ensureSiblingFECreated ensures a data-services-fe container exists on the same node +// as this data-services container. The FE container is created by the data-services +// container to ensure they are co-located on the same node. 
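+// The flow: (1) return early until the container is scheduled and its node is +// known; (2) list this cluster's data-services-fe containers and return if one +// already sits on that node; (3) otherwise build a new FE container from the +// cluster template, pin it to the node, create it, and requeue with a wait +// error until it appears.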
+func (r *containerReconcilerLoop) ensureSiblingFECreated(ctx context.Context) error { + if !r.container.IsDataServicesContainer() { + return nil + } + + ctx, logger, end := instrumentation.GetLogSpan(ctx, "ensureSiblingFECreated") + defer end() + + nodeName := r.container.GetNodeAffinity() + if nodeName == "" { + // Container not yet scheduled to a node + return nil + } + + ownerRefs := r.container.GetOwnerReferences() + if len(ownerRefs) == 0 { + return errors.New("no owner references found") + } + + ownerUid := string(ownerRefs[0].UID) + + // Check if sibling FE container already exists on the same node + // Note: We don't filter by node in the query because newly created containers + // don't have status.nodeAffinity set yet. Instead, we filter the results manually. + feContainers, err := r.KubeService.GetWekaContainersSimple(ctx, r.container.Namespace, "", map[string]string{ + domain.WekaLabelClusterId: ownerUid, + domain.WekaLabelMode: weka.WekaContainerModeDataServicesFe, + }) + if err != nil { + return err + } + + // Filter by node affinity (check both spec and status) + for _, fe := range feContainers { + feNodeAffinity := fe.Spec.NodeAffinity + if feNodeAffinity == "" { + feNodeAffinity = fe.Status.NodeAffinity + } + if feNodeAffinity == nodeName { + logger.Debug("Sibling data-services-fe container already exists on node", "node", nodeName, "fe_container", fe.Name) + return nil + } + } + + // Get cluster to create FE container + cluster, err := r.getCluster(ctx) + if err != nil { + return fmt.Errorf("failed to get cluster: %w", err) + } + + // Get template for container configuration + template, ok := allocator.GetTemplateByName(cluster.Spec.Template, *cluster) + if !ok { + return errors.New("failed to get cluster template") + } + + // Create the FE container + feName := allocator.NewContainerName(weka.WekaContainerModeDataServicesFe) + feContainer, err := factory.NewWekaContainerForWekaCluster(cluster, template, weka.WekaContainerModeDataServicesFe, feName) + if err != nil { + return fmt.Errorf("failed to build FE container: %w", err) + } + + // Set node affinity to the same node as data-services + feContainer.Spec.NodeAffinity = nodeName + + // Set owner reference to the cluster + if err := controllerutil.SetControllerReference(cluster, feContainer, r.Scheme); err != nil { + return fmt.Errorf("failed to set owner reference: %w", err) + } + + logger.Info("Creating sibling data-services-fe container", + "fe_container", feContainer.Name, + "node", nodeName, + ) + + if err := r.Client.Create(ctx, feContainer); err != nil { + return fmt.Errorf("failed to create FE container: %w", err) + } + + return lifecycle.NewWaitError(errors.New("waiting for sibling data-services-fe container to be created")) +} + func (r *containerReconcilerLoop) deleteDataServicesFEIfNoDataServicesNeighbor(ctx context.Context) error { if !r.container.IsDataServicesFEContainer() { return nil // only data-services-fe containers should be checked @@ -96,6 +185,69 @@ func (r *containerReconcilerLoop) deleteDataServicesFEIfNoDataServicesNeighbor(c return nil } +// ensureSiblingFEDeleted ensures the sibling data-services-fe container is deleted before +// the data-services container deletion proceeds. This function: +// 1. Finds the sibling FE container on the same node +// 2. Triggers deletion of the FE container +// 3. 
Waits for the FE container to be fully deleted +func (r *containerReconcilerLoop) ensureSiblingFEDeleted(ctx context.Context) error { + if !r.container.IsDataServicesContainer() { + return nil + } + + ctx, logger, end := instrumentation.GetLogSpan(ctx, "ensureSiblingFEDeleted") + defer end() + + nodeName := r.container.GetNodeAffinity() + if nodeName == "" { + return nil + } + + ownerRefs := r.container.GetOwnerReferences() + if len(ownerRefs) == 0 { + return errors.New("no owner references found") + } + + ownerUid := string(ownerRefs[0].UID) + + // Find sibling data-services-fe container on the same node + feContainers, err := r.KubeService.GetWekaContainersSimple(ctx, r.container.Namespace, string(nodeName), map[string]string{ + domain.WekaLabelClusterId: ownerUid, + domain.WekaLabelMode: weka.WekaContainerModeDataServicesFe, + }) + if err != nil { + return err + } + + if len(feContainers) == 0 { + logger.Debug("No sibling data-services-fe container found on node, proceeding with deletion", "node", nodeName) + return nil + } + + feContainer := &feContainers[0] + + // Trigger deletion if not already marked for deletion + if !feContainer.IsMarkedForDeletion() { + logger.Info("Triggering deletion of sibling data-services-fe container", + "fe_container", feContainer.Name, + "node", nodeName, + ) + + if err := r.Client.Delete(ctx, feContainer); err != nil { + return fmt.Errorf("failed to delete sibling FE container %s: %w", feContainer.Name, err) + } + + return lifecycle.NewWaitError(errors.New("waiting for sibling data-services-fe container to be deleted")) + } + + // FE container is being deleted, wait for it to complete + logger.Info("Waiting for sibling data-services-fe container deletion to complete", + "fe_container", feContainer.Name, + "node", nodeName, + ) + return lifecycle.NewWaitError(errors.New("waiting for sibling data-services-fe container deletion to complete")) +} + // ensureSiblingFEUpgraded ensures the sibling data-services-fe container is upgraded before // the data-services container pod is deleted for upgrade. This function: // 1. 
Finds the sibling FE container on the same node diff --git a/pkg/weka-k8s-api b/pkg/weka-k8s-api index 37b238444..c04e68cf0 160000 --- a/pkg/weka-k8s-api +++ b/pkg/weka-k8s-api @@ -1 +1 @@ -Subproject commit 37b2384445b57595d4b3c002863f90c4eeb3e94e +Subproject commit c04e68cf0e42d6ffdf5719a8346b1e0b422ec4ea From 628c8e7fe13b3d7365bc146351041dc7c885f742 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Wed, 4 Feb 2026 12:35:01 +0200 Subject: [PATCH 05/10] fix: set catalog port --- .../controllers/wekacluster/funcs_catalog.go | 8 +++++- internal/services/weka.go | 26 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/internal/controllers/wekacluster/funcs_catalog.go b/internal/controllers/wekacluster/funcs_catalog.go index 68b70ff29..dd3cd87df 100644 --- a/internal/controllers/wekacluster/funcs_catalog.go +++ b/internal/controllers/wekacluster/funcs_catalog.go @@ -42,12 +42,18 @@ func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) er } wekaService := services.NewWekaService(r.ExecService, container) + + // Set the catalog cluster port before creating the cluster + if err := wekaService.SetCatalogClusterPort(ctx, r.cluster.Status.Ports.DataServicesPort); err != nil { + return err + } + err := wekaService.CreateCatalogCluster(ctx, containerIds) if err != nil { return err } - logger.Info("Catalog cluster ensured", "containerIds", containerIds) + logger.Info("Catalog cluster ensured", "containerIds", containerIds, "port", r.cluster.Status.Ports.DataServicesPort) return nil } diff --git a/internal/services/weka.go b/internal/services/weka.go index e2c18c5be..19dcbf51b 100644 --- a/internal/services/weka.go +++ b/internal/services/weka.go @@ -365,6 +365,7 @@ type WekaService interface { GetCapacity(ctx context.Context) (WekaCapacityInfo, error) ConfigureDataServicesGlobalConfig(ctx context.Context) error GetCatalogCluster(ctx context.Context) (*CatalogCluster, error) + SetCatalogClusterPort(ctx context.Context, port int) error CreateCatalogCluster(ctx context.Context, containerIds []int) error JoinCatalogCluster(ctx context.Context, containerId int) error RemoveFromCatalogCluster(ctx context.Context, containerId int) error @@ -1463,6 +1464,31 @@ func (c *CliWekaService) GetCatalogCluster(ctx context.Context) (*CatalogCluster return &catalogCluster, nil } +func (c *CliWekaService) SetCatalogClusterPort(ctx context.Context, port int) error { + ctx, logger, end := instrumentation.GetLogSpan(ctx, "SetCatalogClusterPort") + defer end() + + executor, err := c.ExecService.GetExecutor(ctx, c.Container) + if err != nil { + return err + } + + cmd := []string{ + "weka", "debug", "config", "override", + "catalogInfo.clusterInfo.port", + strconv.Itoa(port), + } + + _, stderr, err := executor.ExecNamed(ctx, "SetCatalogClusterPort", cmd) + if err != nil { + logger.SetError(err, "Failed to set catalog cluster port", "stderr", stderr.String()) + return err + } + + logger.Info("Set catalog cluster port", "port", port) + return nil +} + func (c *CliWekaService) CreateCatalogCluster(ctx context.Context, containerIds []int) error { ctx, logger, end := instrumentation.GetLogSpan(ctx, "CreateCatalogCluster") defer end() From 2ce28de51c518c762a9a2446e7e1c9c975aa2f41 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Wed, 4 Feb 2026 13:53:20 +0200 Subject: [PATCH 06/10] fix: set up catalog once 2 datasvc are ready instead of waiting for all --- .../controllers/wekacluster/funcs_catalog.go | 28 +++++++++++++++---- .../controllers/wekacluster/funcs_helpers.go | 11 ++++++++
.../wekacontainer/flow_active_state.go | 6 ++-- internal/services/weka.go | 3 +- 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/internal/controllers/wekacluster/funcs_catalog.go b/internal/controllers/wekacluster/funcs_catalog.go index dd3cd87df..fee02849d 100644 --- a/internal/controllers/wekacluster/funcs_catalog.go +++ b/internal/controllers/wekacluster/funcs_catalog.go @@ -23,22 +23,40 @@ func (r *wekaClusterReconcilerLoop) EnsureCatalogCluster(ctx context.Context) er return errors.New("No active container found") } - // Collect container IDs from data-services containers only + // Collect container IDs from data-services containers that have a ready FE on the same node // (not data-services-fe - those join separately) var containerIds []int dataServicesContainers := r.SelectDataServicesContainers(r.containers) + dataServicesFEContainers := r.selectDataServicesFEContainers(r.containers) + + // Build a set of the nodes that already have a ready FE, for quick lookup + readyFEByNode := make(map[string]bool) + for _, fe := range dataServicesFEContainers { + // An FE is ready once it has a ClusterContainerID and is assigned to a node + if fe.Status.ClusterContainerID != nil && fe.GetNodeAffinity() != "" { + readyFEByNode[string(fe.GetNodeAffinity())] = true + } + } for _, c := range dataServicesContainers { - if c.Status.ClusterContainerID != nil { + if c.Status.ClusterContainerID == nil { + continue + } + // Only include if there's a ready FE on the same node + nodeAffinity := string(c.GetNodeAffinity()) + if nodeAffinity != "" && readyFEByNode[nodeAffinity] { containerIds = append(containerIds, *c.Status.ClusterContainerID) } } - // Need at least 2 data-services containers to form catalog cluster + // Need at least 2 data-services containers with ready FE to form catalog cluster if len(containerIds) < 2 { - logger.Info("Not enough data-services containers to form catalog cluster", "containerIds", len(containerIds)) - return errors.New("Need at least 2 data-services containers to form catalog cluster") + logger.Info("Not enough data-services containers with ready FE to form catalog cluster", + "eligibleContainers", len(containerIds), + "totalDataServicesContainers", len(dataServicesContainers), + "totalFEContainers", len(dataServicesFEContainers)) + return errors.New("Need at least 2 data-services containers with ready FE to form catalog cluster") } wekaService := services.NewWekaService(r.ExecService, container) diff --git a/internal/controllers/wekacluster/funcs_helpers.go b/internal/controllers/wekacluster/funcs_helpers.go index 202f77af4..9d2fb728d 100644 --- a/internal/controllers/wekacluster/funcs_helpers.go +++ b/internal/controllers/wekacluster/funcs_helpers.go @@ -112,6 +112,17 @@ func (r *wekaClusterReconcilerLoop) SelectDataServicesContainers(containers []*w return dataServicesContainers } +func (r *wekaClusterReconcilerLoop) selectDataServicesFEContainers(containers []*weka.WekaContainer) []*weka.WekaContainer { + var feContainers []*weka.WekaContainer + for _, container := range containers { + if container.Spec.Mode == weka.WekaContainerModeDataServicesFe { + feContainers = append(feContainers, container) + } + } + + return feContainers +} + // ValidateDriveTypesRatio validates that driveTypesRatio.tlc > 0 when driveTypesRatio is specified. // This prevents QLC-only configurations which are not supported.
func (r *wekaClusterReconcilerLoop) ValidateDriveTypesRatio(ctx context.Context) error { diff --git a/internal/controllers/wekacontainer/flow_active_state.go b/internal/controllers/wekacontainer/flow_active_state.go index d1ea9790b..63bfb1f9a 100644 --- a/internal/controllers/wekacontainer/flow_active_state.go +++ b/internal/controllers/wekacontainer/flow_active_state.go @@ -594,9 +594,9 @@ func ActiveStateFlow(r *containerReconcilerLoop) []lifecycle.Step { }, Run: r.JoinCatalogCluster, Predicates: lifecycle.Predicates{ - func() bool { - return r.container.IsDataServicesContainer() || r.container.IsDataServicesFEContainer() - }, + // Only data-services containers join the catalog cluster; + // data-services-fe containers are frontend-only and should not join + r.container.IsDataServicesContainer, func() bool { return r.container.Status.ClusterContainerID != nil }, diff --git a/internal/services/weka.go b/internal/services/weka.go index 19dcbf51b..2c18ad1c9 100644 --- a/internal/services/weka.go +++ b/internal/services/weka.go @@ -1531,7 +1531,8 @@ func (c *CliWekaService) JoinCatalogCluster(ctx context.Context, containerId int } stdout, stderr, err := executor.ExecNamed(ctx, "JoinCatalogCluster", cmd) - if err != nil && strings.Contains(stderr.String(), "already part of the catalog cluster") { + if err != nil && strings.Contains(stderr.String(), "already part of") { + logger.Info("Container already part of catalog cluster", "containerId", containerId) return nil } if err != nil { From 3f95c8c597111a572f6d99d7001b541547c30d6f Mon Sep 17 00:00:00 2001 From: Anton Bykov Date: Thu, 5 Feb 2026 17:26:07 +0200 Subject: [PATCH 07/10] fix: data-services force stop --- charts/weka-operator/resources/weka_runtime.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/charts/weka-operator/resources/weka_runtime.py b/charts/weka-operator/resources/weka_runtime.py index 03ac6819b..c494ba01d 100644 --- a/charts/weka-operator/resources/weka_runtime.py +++ b/charts/weka-operator/resources/weka_runtime.py @@ -4400,7 +4400,9 @@ async def shutdown(): force_stop = True if is_wrong_generation(): force_stop = True - if MODE not in ["s3", "drive", "compute", "nfs", "data-services", "data-services-fe"]: + if MODE not in ["s3", "drive", "compute", "nfs"]: + # data services modes are not included: a graceful stop can get stuck during immediate pod termination when no agent is running + # TODO: Refine for more safety; dataserv is low risk and should survive failures regardless, given its distributed nature force_stop = True stop_flag = "--force" if force_stop else "-g" From da3d54bd085712072e5445b1f8ac2bb619ed8b01 Mon Sep 17 00:00:00 2001 From: Anton Bykov Date: Thu, 5 Feb 2026 20:04:34 +0200 Subject: [PATCH 08/10] fix: dataserv and dataserv-fe metrics reporting --- internal/controllers/wekacontainer/metrics_steps.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/controllers/wekacontainer/metrics_steps.go b/internal/controllers/wekacontainer/metrics_steps.go index dea9eb14d..a2a848513 100644 --- a/internal/controllers/wekacontainer/metrics_steps.go +++ b/internal/controllers/wekacontainer/metrics_steps.go @@ -68,6 +68,8 @@ func MetricsSteps(loop *containerReconcilerLoop) []lifecycle.Step { weka.WekaContainerModeDrive, weka.WekaContainerModeEnvoy, weka.WekaContainerModeSSDProxy, + weka.WekaContainerModeDataServices, + weka.WekaContainerModeDataServicesFe, }, container.Spec.Mode) }, From f8de9b7da390e4d4b6afa879afbea0fdad9f1a92 Mon Sep 17 00:00:00 2001 From: Anton Bykov Date: Thu, 5 Feb 2026 21:36:59 +0200
Subject: [PATCH 09/10] fix: enable CRD metrics for smbw and dataserv --- internal/controllers/wekacontainer/metrics_steps.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/controllers/wekacontainer/metrics_steps.go b/internal/controllers/wekacontainer/metrics_steps.go index a2a848513..50ee28ef4 100644 --- a/internal/controllers/wekacontainer/metrics_steps.go +++ b/internal/controllers/wekacontainer/metrics_steps.go @@ -44,6 +44,9 @@ func MetricsSteps(loop *containerReconcilerLoop) []lifecycle.Step { weka.WekaContainerModeS3, weka.WekaContainerModeNfs, weka.WekaContainerModeDrive, + weka.WekaContainerModeDataServices, + weka.WekaContainerModeSmbw, + weka.WekaContainerModeDataServicesFe, // TODO: Expand to clients; introduce a HasManagement check (API-level or not) }, container.Spec.Mode) }, From 7f8d99657cce8cb67399bcc8d500092f3e4b4c08 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Thu, 5 Feb 2026 22:24:12 +0200 Subject: [PATCH 10/10] fix: update k8s api --- pkg/weka-k8s-api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/weka-k8s-api b/pkg/weka-k8s-api index c04e68cf0..d4b15bdac 160000 --- a/pkg/weka-k8s-api +++ b/pkg/weka-k8s-api @@ -1 +1 @@ -Subproject commit c04e68cf0e42d6ffdf5719a8346b1e0b422ec4ea +Subproject commit d4b15bdac09c3b80a1163ed8ed50e08cdf164c56