Skip to content
Open
14 changes: 9 additions & 5 deletions charts/weka-operator/resources/weka_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2350,7 +2350,7 @@ def is_managed_k8s(network_device=None):


async def create_container():
if MODE not in ["compute", "drive", "client", "s3", "nfs", "data-services"]:
if MODE not in ["compute", "drive", "client", "s3", "nfs", "data-services", "data-services-fe"]:
raise NotImplementedError(f"Unsupported mode: {MODE}")

full_cores = find_full_cores(NUM_CORES)
Expand All @@ -2367,6 +2367,8 @@ async def create_container():
mode_part = "--only-frontend-cores"
elif MODE == "data-services":
mode_part = "--only-dataserv-cores"
elif MODE == "data-services-fe":
mode_part = "--only-frontend-cores"

core_str = ",".join(map(str, full_cores))
logging.info(f"Creating container with cores: {core_str}")
Expand Down Expand Up @@ -3373,7 +3375,7 @@ async def ensure_ssdproxy_container():
_, _, ec = await run_command(cmd)
if ec != 0:
raise Exception(f"Failed to ensure ssdproxy container")

if not os.path.exists("/usr/bin/weka-sign-drive"):
os.symlink("/opt/weka/dist/extracted/weka-sign-drive", "/usr/bin/weka-sign-drive")
logging.info("Created symlink /usr/bin/weka-sign-drive -> /opt/weka/dist/extracted/weka-sign-drive")
Expand Down Expand Up @@ -3532,7 +3534,7 @@ async def wait_for_resources():
if MODE == 'client':
await ensure_client_ports()

if MODE not in ['drive', 's3', 'compute', 'nfs', 'envoy', 'client', 'telemetry', 'data-services']:
if MODE not in ['drive', 's3', 'compute', 'nfs', 'envoy', 'client', 'telemetry', 'data-services', 'data-services-fe']:
return

logging.info("waiting for controller to set resources")
Expand Down Expand Up @@ -3724,7 +3726,7 @@ async def get_devices_by_selectors(selectors_str: str) -> List[str]:

async def write_management_ips():
"""Auto-discover management IPs and write them to a file"""
if MODE not in ['drive', 'compute', 's3', 'nfs', 'client', 'data-services']:
if MODE not in ['drive', 'compute', 's3', 'nfs', 'client', 'data-services', 'data-services-fe']:
return

ipAddresses = []
Expand Down Expand Up @@ -4398,7 +4400,9 @@ async def shutdown():
force_stop = True
if is_wrong_generation():
force_stop = True
if MODE not in ["s3", "drive", "compute", "nfs", "data-services"]:
if MODE not in ["s3", "drive", "compute", "nfs"]:
        # data-services modes are excluded because a force stop could get stuck on immediate pod termination when no agent is running
        # TODO: refine for more safety; dataserv is low risk and should survive failures regardless, given its distributed nature
force_stop = True
stop_flag = "--force" if force_stop else "-g"

Expand Down
15 changes: 15 additions & 0 deletions doc/api_dump/wekacluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- [S3Config](#s3config)
- [SmbwConfig](#smbwconfig)
- [TelemetryConfig](#telemetryconfig)
- [CatalogConfig](#catalogconfig)
- [ClusterMetrics](#clustermetrics)
- [ClusterPrinterColumns](#clusterprintercolumns)
- [RoleTopologySpreadConstraints](#roletopologyspreadconstraints)
Expand Down Expand Up @@ -103,6 +104,7 @@
| s3 | *S3Config | |
| smbw | *SmbwConfig | |
| telemetry | *TelemetryConfig | Telemetry configuration for exporting audit logs and other telemetry data |
| catalog | *CatalogConfig | Catalog configuration for data catalog service |

---

Expand Down Expand Up @@ -224,6 +226,7 @@
| envoy | int | |
| smbw | int | |
| dataServices | int | |
| dataServicesFe | int | |

---

Expand Down Expand Up @@ -279,6 +282,9 @@
| dataServicesExtraCores | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: number of data services extra cores per container |
| dataServicesHugepages | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage allocation for data services containers |
| dataServicesHugepagesOffset | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage offset for data services containers |
| dataServicesFeCores | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: number of data services frontend cores per container |
| dataServicesFeHugepages | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage allocation for data services frontend container |
| dataServicesFeHugepagesOffset | int | EXPERIMENTAL, ALPHA STATE, should not be used in production: hugepage offset for data services frontend container |

---

Expand Down Expand Up @@ -405,6 +411,15 @@

---

## CatalogConfig

| JSON Field | Type | Description |
|------------|------|-------------|
| indexInterval | string | IndexInterval specifies how often the catalog index is updated (e.g., "1d", "1m") |
| retentionPeriod | string | RetentionPeriod specifies how long catalog data is retained (e.g., "30d", "10m") |

---

## ClusterMetrics

| JSON Field | Type | Description |
Expand Down
6 changes: 4 additions & 2 deletions internal/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ var Config struct {
RecreateUnhealthyEnvoyThrottlingEnabled bool
SkipClientsTolerationValidation bool
TolerationsMismatchSettings TolerationsMismatchSettings
DeleteEnvoyWithoutS3NeighborTimeout time.Duration
DeleteTelemetryWithoutComputeNeighborTimeout time.Duration
DeleteEnvoyWithoutS3NeighborTimeout time.Duration
DeleteTelemetryWithoutComputeNeighborTimeout time.Duration
DeleteDataServicesFEWithoutDataServicesNeighborTimeout time.Duration
DeleteUnschedulablePodsAfter time.Duration
RemoveFailedDrivesFromWeka bool
AllowMultipleProtocolsPerNode bool
Expand Down Expand Up @@ -393,6 +394,7 @@ func ConfigureEnv(ctx context.Context) {
Config.TolerationsMismatchSettings.IgnoredTaints = getStringSlice("TOLERATIONS_MISMATCH_SETTINGS_IGNORED_TAINTS")
Config.DeleteEnvoyWithoutS3NeighborTimeout = getDurationEnvOrDefault("DELETE_ENVOY_WITHOUT_S3_NEIGHBOR_TIMEOUT", 5*time.Minute)
Config.DeleteTelemetryWithoutComputeNeighborTimeout = getDurationEnvOrDefault("DELETE_TELEMETRY_WITHOUT_COMPUTE_NEIGHBOR_TIMEOUT", 5*time.Minute)
Config.DeleteDataServicesFEWithoutDataServicesNeighborTimeout = getDurationEnvOrDefault("DELETE_DATA_SERVICES_FE_WITHOUT_DATA_SERVICES_NEIGHBOR_TIMEOUT", 1*time.Minute)
Config.DeleteUnschedulablePodsAfter = getDurationEnvOrDefault("DELETE_UNSCHEDULABLE_PODS_AFTER", 1*time.Minute)
Config.RemoveFailedDrivesFromWeka = getBoolEnvOrDefault("REMOVE_FAILED_DRIVES_FROM_WEKA", false)
Config.AllowMultipleProtocolsPerNode = getBoolEnvOrDefault("ALLOW_MULTIPLE_PROTOCOLS_PER_NODE", false)
Expand Down
12 changes: 12 additions & 0 deletions internal/controllers/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,15 @@ func (t *ResourcesAllocator) AllocateClusterRange(ctx context.Context, cluster *
}
cluster.Status.Ports.S3Port = s3PortRange.Base

// Allocate Data Services port (fixed port outside cluster range, no boundary check needed)
if cluster.Status.Ports.DataServicesPort == 0 {
if cluster.Spec.Ports.DataServicesPort != 0 {
cluster.Status.Ports.DataServicesPort = cluster.Spec.Ports.DataServicesPort
} else {
cluster.Status.Ports.DataServicesPort = 14611
}
}

// Management proxy port is allocated on-demand when the management proxy is first enabled
// This avoids wasting a port if the feature is not used

Expand All @@ -310,6 +319,9 @@ func GetClusterGlobalAllocatedRanges(cluster *weka.WekaCluster) (allocatedRanges
if cluster.Status.Ports.ManagementProxyPort > 0 {
allocatedRanges = append(allocatedRanges, Range{Base: cluster.Status.Ports.ManagementProxyPort, Size: 1})
}
if cluster.Status.Ports.DataServicesPort > 0 {
allocatedRanges = append(allocatedRanges, Range{Base: cluster.Status.Ports.DataServicesPort, Size: 2})
}
return
}

Expand Down
2 changes: 2 additions & 0 deletions internal/controllers/allocator/ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func getClusterPortByName(cluster *weka.WekaCluster, name string) int {
return cluster.Status.Ports.S3Port
case "managementProxy":
return cluster.Status.Ports.ManagementProxyPort
case "dataServices":
return cluster.Status.Ports.DataServicesPort
default:
return 0
}
Expand Down
145 changes: 82 additions & 63 deletions internal/controllers/allocator/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,41 @@ import (
)

type ClusterTemplate struct {
DriveCores int
DriveExtraCores int
ComputeCores int
ComputeExtraCores int
EnvoyCores int
S3Cores int
S3ExtraCores int
NfsCores int
NfsExtraCores int
ComputeContainers int
DriveContainers int
S3Containers int
NfsContainers int
NumDrives int
DriveCapacity int
ContainerCapacity int
DriveTypesRatio *v1alpha1.DriveTypesRatio
DriveHugepages int
DriveHugepagesOffset int
ComputeHugepages int
ComputeHugepagesOffset int
HugePageSize string
HugePagesOverride string
S3FrontendHugepages int
S3FrontendHugepagesOffset int
NfsFrontendHugepages int
NfsFrontendHugepagesOffset int
DataServicesCores int
DataServicesExtraCores int
DataServicesContainers int
DataServicesHugepages int
DataServicesHugepagesOffset int
DriveCores int
DriveExtraCores int
ComputeCores int
ComputeExtraCores int
EnvoyCores int
S3Cores int
S3ExtraCores int
NfsCores int
NfsExtraCores int
ComputeContainers int
DriveContainers int
S3Containers int
NfsContainers int
NumDrives int
DriveCapacity int
ContainerCapacity int
DriveTypesRatio *v1alpha1.DriveTypesRatio
DriveHugepages int
DriveHugepagesOffset int
ComputeHugepages int
ComputeHugepagesOffset int
HugePageSize string
HugePagesOverride string
S3FrontendHugepages int
S3FrontendHugepagesOffset int
NfsFrontendHugepages int
NfsFrontendHugepagesOffset int
DataServicesCores int
DataServicesExtraCores int
DataServicesContainers int
DataServicesHugepages int
DataServicesHugepagesOffset int
DataServicesFeCores int
DataServicesFeHugepages int
DataServicesFeHugepagesOffset int
}

func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate {
Expand Down Expand Up @@ -143,6 +146,19 @@ func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate {
config.DataServicesHugepagesOffset = 200
}

// Data-services-fe defaults (client-like: 1 core default, hugepages = cores * 1500)
if config.DataServicesFeCores == 0 {
config.DataServicesFeCores = 1
}

if config.DataServicesFeHugepages == 0 {
config.DataServicesFeHugepages = config.DataServicesFeCores * 1500 // Client-like formula
}

if config.DataServicesFeHugepagesOffset == 0 {
config.DataServicesFeHugepagesOffset = 200
}

if config.EnvoyCores == 0 {
config.EnvoyCores = 1
}
Expand All @@ -161,37 +177,40 @@ func BuildDynamicTemplate(config *v1alpha1.WekaConfig) ClusterTemplate {
}

return ClusterTemplate{
DriveCores: config.DriveCores,
DriveExtraCores: config.DriveExtraCores,
ComputeCores: config.ComputeCores,
ComputeExtraCores: config.ComputeExtraCores,
ComputeContainers: *config.ComputeContainers,
DriveContainers: *config.DriveContainers,
S3Containers: config.S3Containers,
S3Cores: config.S3Cores,
S3ExtraCores: config.S3ExtraCores,
NfsContainers: config.NfsContainers,
NumDrives: config.NumDrives,
DriveCapacity: config.DriveCapacity,
ContainerCapacity: config.ContainerCapacity,
DriveTypesRatio: config.DriveTypesRatio,
DriveHugepages: config.DriveHugepages,
DriveHugepagesOffset: config.DriveHugepagesOffset,
ComputeHugepages: config.ComputeHugepages,
ComputeHugepagesOffset: config.ComputeHugepagesOffset,
S3FrontendHugepages: config.S3FrontendHugepages,
S3FrontendHugepagesOffset: config.S3FrontendHugepagesOffset,
HugePageSize: hgSize,
EnvoyCores: config.EnvoyCores,
NfsCores: config.NfsCores,
NfsExtraCores: config.NfsExtraCores,
NfsFrontendHugepages: config.NfsFrontendHugepages,
NfsFrontendHugepagesOffset: config.NfsFrontendHugepagesOffset,
DataServicesContainers: config.DataServicesContainers,
DataServicesCores: config.DataServicesCores,
DataServicesExtraCores: config.DataServicesExtraCores,
DataServicesHugepages: config.DataServicesHugepages,
DataServicesHugepagesOffset: config.DataServicesHugepagesOffset,
DriveCores: config.DriveCores,
DriveExtraCores: config.DriveExtraCores,
ComputeCores: config.ComputeCores,
ComputeExtraCores: config.ComputeExtraCores,
ComputeContainers: *config.ComputeContainers,
DriveContainers: *config.DriveContainers,
S3Containers: config.S3Containers,
S3Cores: config.S3Cores,
S3ExtraCores: config.S3ExtraCores,
NfsContainers: config.NfsContainers,
NumDrives: config.NumDrives,
DriveCapacity: config.DriveCapacity,
ContainerCapacity: config.ContainerCapacity,
DriveTypesRatio: config.DriveTypesRatio,
DriveHugepages: config.DriveHugepages,
DriveHugepagesOffset: config.DriveHugepagesOffset,
ComputeHugepages: config.ComputeHugepages,
ComputeHugepagesOffset: config.ComputeHugepagesOffset,
S3FrontendHugepages: config.S3FrontendHugepages,
S3FrontendHugepagesOffset: config.S3FrontendHugepagesOffset,
HugePageSize: hgSize,
EnvoyCores: config.EnvoyCores,
NfsCores: config.NfsCores,
NfsExtraCores: config.NfsExtraCores,
NfsFrontendHugepages: config.NfsFrontendHugepages,
NfsFrontendHugepagesOffset: config.NfsFrontendHugepagesOffset,
DataServicesContainers: config.DataServicesContainers,
DataServicesCores: config.DataServicesCores,
DataServicesExtraCores: config.DataServicesExtraCores,
DataServicesHugepages: config.DataServicesHugepages,
DataServicesHugepagesOffset: config.DataServicesHugepagesOffset,
DataServicesFeCores: config.DataServicesFeCores,
DataServicesFeHugepages: config.DataServicesFeHugepages,
DataServicesFeHugepagesOffset: config.DataServicesFeHugepagesOffset,
}

}
Expand Down
12 changes: 12 additions & 0 deletions internal/controllers/factory/container_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func NewWekaContainerForWekaCluster(cluster *wekav1alpha1.WekaCluster,
numCores = template.DataServicesCores
hugePagesNum = template.DataServicesHugepages
hugePagesOffset = template.DataServicesHugepagesOffset
case "data-services-fe":
numCores = template.DataServicesFeCores
hugePagesNum = template.DataServicesFeHugepages
hugePagesOffset = template.DataServicesFeHugepagesOffset
default:
return nil, fmt.Errorf("unsupported role %s", role)
}
Expand Down Expand Up @@ -87,6 +91,8 @@ func NewWekaContainerForWekaCluster(cluster *wekav1alpha1.WekaCluster,
case "data-services":
additionalMemory = cluster.Spec.AdditionalMemory.DataServices
extraCores = template.DataServicesExtraCores
case "data-services-fe":
additionalMemory = cluster.Spec.AdditionalMemory.DataServicesFe
case "envoy":
additionalMemory = cluster.Spec.AdditionalMemory.Envoy
}
Expand All @@ -98,6 +104,9 @@ func NewWekaContainerForWekaCluster(cluster *wekav1alpha1.WekaCluster,
if slices.Contains([]string{"compute", "telemetry"}, role) {
containerGroup = "compute"
}
if slices.Contains([]string{"data-services", "data-services-fe"}, role) {
containerGroup = "data-services"
}

wekahomeConfig, err := domain.GetWekahomeConfig(cluster)
if err != nil {
Expand All @@ -119,6 +128,9 @@ func NewWekaContainerForWekaCluster(cluster *wekav1alpha1.WekaCluster,
if role == wekav1alpha1.WekaContainerModeTelemetry { // telemetry sticks to compute, so does not need explicit node selector
nodeSelector = map[string]string{}
}
if role == wekav1alpha1.WekaContainerModeDataServicesFe { // data-services-fe sticks to data-services, so does not need explicit node selector
nodeSelector = map[string]string{}
}

container := &wekav1alpha1.WekaContainer{
TypeMeta: metav1.TypeMeta{
Expand Down
Loading
Loading