From d9e1ad6ecd29d367a9fa3f6f26dd40304e641cd7 Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 12:58:20 +0100 Subject: [PATCH 1/6] Committed reservations allocation lifecycle --- .../reservations/commitments/config.go | 17 +- .../reservations/commitments/controller.go | 251 +++++++++++++----- .../commitments/controller_test.go | 206 ++++++++++++++ 3 files changed, 404 insertions(+), 70 deletions(-) diff --git a/internal/scheduling/reservations/commitments/config.go b/internal/scheduling/reservations/commitments/config.go index 7a6c9005f..17d89c8c0 100644 --- a/internal/scheduling/reservations/commitments/config.go +++ b/internal/scheduling/reservations/commitments/config.go @@ -11,10 +11,17 @@ import ( type Config struct { - // RequeueIntervalActive is the interval for requeueing active reservations for verification. + // RequeueIntervalActive is the interval for requeueing active reservations for periodic verification. RequeueIntervalActive time.Duration `json:"committedResourceRequeueIntervalActive"` // RequeueIntervalRetry is the interval for requeueing when retrying after knowledge is not ready. RequeueIntervalRetry time.Duration `json:"committedResourceRequeueIntervalRetry"` + // AllocationGracePeriod is the time window after a VM is allocated to a reservation + // during which it's expected to appear on the target host. VMs not confirmed within + // this period are considered stale and removed from the reservation. + AllocationGracePeriod time.Duration `json:"committedResourceAllocationGracePeriod"` + // RequeueIntervalGracePeriod is the interval for requeueing when VMs are in grace period. + // Shorter than RequeueIntervalActive for faster verification of new allocations. + RequeueIntervalGracePeriod time.Duration `json:"committedResourceRequeueIntervalGracePeriod"` // PipelineDefault is the default pipeline used for scheduling committed resource reservations. PipelineDefault string `json:"committedResourcePipelineDefault"` @@ -68,6 +75,12 @@ func (c *Config) ApplyDefaults() { if c.RequeueIntervalRetry == 0 { c.RequeueIntervalRetry = defaults.RequeueIntervalRetry } + if c.RequeueIntervalGracePeriod == 0 { + c.RequeueIntervalGracePeriod = defaults.RequeueIntervalGracePeriod + } + if c.AllocationGracePeriod == 0 { + c.AllocationGracePeriod = defaults.AllocationGracePeriod + } if c.PipelineDefault == "" { c.PipelineDefault = defaults.PipelineDefault } @@ -88,6 +101,8 @@ func DefaultConfig() Config { return Config{ RequeueIntervalActive: 5 * time.Minute, RequeueIntervalRetry: 1 * time.Minute, + RequeueIntervalGracePeriod: 1 * time.Minute, + AllocationGracePeriod: 15 * time.Minute, PipelineDefault: "kvm-general-purpose-load-balancing", SchedulerURL: "http://localhost:8080/scheduler/nova/external", ChangeAPIWatchReservationsTimeout: 10 * time.Second, diff --git a/internal/scheduling/reservations/commitments/controller.go b/internal/scheduling/reservations/commitments/controller.go index 9c238aeee..c2e364721 100644 --- a/internal/scheduling/reservations/commitments/controller.go +++ b/internal/scheduling/reservations/commitments/controller.go @@ -23,9 +23,9 @@ import ( schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" - "github.com/cobaltcore-dev/cortex/internal/knowledge/datasources/plugins/openstack/nova" "github.com/cobaltcore-dev/cortex/internal/knowledge/db" "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + schedulingnova "github.com/cobaltcore-dev/cortex/internal/scheduling/nova" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" "github.com/cobaltcore-dev/cortex/pkg/multicluster" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" @@ -44,6 +44,8 @@ type CommitmentReservationController struct { DB *db.DB // SchedulerClient for making scheduler API calls. SchedulerClient *reservations.SchedulerClient + // NovaClient for direct Nova API calls (real-time VM status). + NovaClient schedulingnova.NovaClient } // Reconcile is part of the main kubernetes reconciliation loop which aims to @@ -96,13 +98,18 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr if meta.IsStatusConditionTrue(res.Status.Conditions, v1alpha1.ReservationConditionReady) { logger.V(1).Info("reservation is active, verifying allocations") - // Verify all allocations in Spec against actual VM state from database - if err := r.reconcileAllocations(ctx, &res); err != nil { + // Verify all allocations in Spec against actual VM state + result, err := r.reconcileAllocations(ctx, &res) + if err != nil { logger.Error(err, "failed to reconcile allocations") return ctrl.Result{}, err } - // Requeue periodically to keep verifying allocations + // Requeue with appropriate interval based on allocation state + // Use shorter interval if there are allocations in grace period for faster verification + if result.HasAllocationsInGracePeriod { + return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalGracePeriod}, nil + } return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalActive}, nil } @@ -322,82 +329,204 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr return ctrl.Result{}, nil } -// reconcileAllocations verifies all allocations in Spec against actual Nova VM state. +// reconcileAllocationsResult holds the outcome of allocation reconciliation. +type reconcileAllocationsResult struct { + // HasAllocationsInGracePeriod is true if any allocations are still in grace period. + HasAllocationsInGracePeriod bool +} + +// reconcileAllocations verifies all allocations in Spec against actual VM state. // It updates Status.Allocations based on the actual host location of each VM. -func (r *CommitmentReservationController) reconcileAllocations(ctx context.Context, res *v1alpha1.Reservation) error { +// For new allocations (within grace period), it uses the Nova API for real-time status. +// For older allocations, it uses the Hypervisor CRD to check if VM is on the expected host. +func (r *CommitmentReservationController) reconcileAllocations(ctx context.Context, res *v1alpha1.Reservation) (*reconcileAllocationsResult, error) { logger := LoggerFromContext(ctx).WithValues("component", "controller") + result := &reconcileAllocationsResult{} + now := time.Now() // Skip if no CommittedResourceReservation if res.Spec.CommittedResourceReservation == nil { - return nil + return result, nil } - // TODO trigger migrations of unused reservations (to PAYG VMs) - // Skip if no allocations to verify if len(res.Spec.CommittedResourceReservation.Allocations) == 0 { logger.V(1).Info("no allocations to verify", "reservation", res.Name) - return nil + return result, nil } - // Query all VMs for this project from the database - projectID := res.Spec.CommittedResourceReservation.ProjectID - serverMap, err := r.listServersByProjectID(ctx, projectID) - if err != nil { - return fmt.Errorf("failed to list servers for project %s: %w", projectID, err) + expectedHost := res.Status.Host + + // Fetch the Hypervisor CRD for the expected host (for older allocations) + var hypervisor hv1.Hypervisor + hvInstanceSet := make(map[string]bool) + if expectedHost != "" { + if err := r.Get(ctx, client.ObjectKey{Name: expectedHost}, &hypervisor); err != nil { + if client.IgnoreNotFound(err) != nil { + return nil, fmt.Errorf("failed to get hypervisor %s: %w", expectedHost, err) + } + // Hypervisor not found - all older allocations will be checked via Nova API fallback + logger.Info("hypervisor CRD not found", "host", expectedHost) + } else { + // Build set of active VM UUIDs on this hypervisor for O(1) lookup + for _, inst := range hypervisor.Status.Instances { + if inst.Active { + hvInstanceSet[inst.ID] = true + } + } + logger.V(1).Info("fetched hypervisor instances", "host", expectedHost, "instanceCount", len(hvInstanceSet)) + } } - // initialize + // Initialize status if res.Status.CommittedResourceReservation == nil { res.Status.CommittedResourceReservation = &v1alpha1.CommittedResourceReservationStatus{} } // Build new Status.Allocations map based on actual VM locations newStatusAllocations := make(map[string]string) + // Track allocations to remove from Spec (stale/leaving VMs) + var allocationsToRemove []string + + for vmUUID, allocation := range res.Spec.CommittedResourceReservation.Allocations { + allocationAge := now.Sub(allocation.CreationTimestamp.Time) + isInGracePeriod := allocationAge < r.Conf.AllocationGracePeriod + + if isInGracePeriod { + // New allocation: use Nova API for real-time status + result.HasAllocationsInGracePeriod = true + + if r.NovaClient == nil { + // No Nova client - skip verification for now, retry later + logger.V(1).Info("Nova client not available, skipping new allocation verification", + "vm", vmUUID, + "allocationAge", allocationAge) + continue + } - for vmUUID := range res.Spec.CommittedResourceReservation.Allocations { - server, exists := serverMap[vmUUID] - if exists { - // VM found - record its actual host location - actualHost := server.OSEXTSRVATTRHost - newStatusAllocations[vmUUID] = actualHost - - logger.V(1).Info("verified VM allocation", - "vm", vmUUID, - "actualHost", actualHost, - "expectedHost", res.Status.Host) - } else { - // VM not found in database - logger.Info("VM not found in database", - "vm", vmUUID, - "reservation", res.Name, - "projectID", projectID) + server, err := r.NovaClient.Get(ctx, vmUUID) + if err != nil { + // VM not yet available in Nova (still spawning) - retry on next reconcile + logger.V(1).Info("VM not yet available in Nova API", + "vm", vmUUID, + "error", err.Error(), + "allocationAge", allocationAge) + // Keep in Spec, don't add to Status - will retry on next reconcile + continue + } - // TODO handle entering and leave event + actualHost := server.ComputeHost + switch { + case actualHost == expectedHost: + // VM is on expected host - confirmed running + newStatusAllocations[vmUUID] = actualHost + logger.V(1).Info("verified new VM allocation via Nova API", + "vm", vmUUID, + "actualHost", actualHost, + "allocationAge", allocationAge) + case actualHost != "": + // VM is on different host - migration scenario (log for now) + newStatusAllocations[vmUUID] = actualHost + logger.Info("VM on different host than expected (migration?)", + "vm", vmUUID, + "actualHost", actualHost, + "expectedHost", expectedHost, + "allocationAge", allocationAge) + default: + // VM not yet on any host - still spawning + logger.V(1).Info("VM not yet on host (spawning)", + "vm", vmUUID, + "status", server.Status, + "allocationAge", allocationAge) + // Keep in Spec, don't add to Status - will retry on next reconcile + } + } else { + // Older allocation: use Hypervisor CRD for verification + if hvInstanceSet[vmUUID] { + // VM found on expected hypervisor - confirmed running + newStatusAllocations[vmUUID] = expectedHost + logger.V(1).Info("verified VM allocation via Hypervisor CRD", + "vm", vmUUID, + "host", expectedHost) + } else { + // VM not found on expected hypervisor - check Nova API as fallback + if r.NovaClient != nil { + novaServer, err := r.NovaClient.Get(ctx, vmUUID) + if err == nil && novaServer.ComputeHost != "" { + // VM exists but on different host - migration or placement change + newStatusAllocations[vmUUID] = novaServer.ComputeHost + logger.Info("VM found via Nova API fallback (not on expected host)", + "vm", vmUUID, + "actualHost", novaServer.ComputeHost, + "expectedHost", expectedHost) + continue + } + // Nova API confirms VM doesn't exist or has no host + logger.V(1).Info("Nova API confirmed VM not found", + "vm", vmUUID, + "error", err) + } + // VM not found on hypervisor and not in Nova - mark for removal (leaving VM) + allocationsToRemove = append(allocationsToRemove, vmUUID) + logger.Info("removing stale allocation (VM not found on hypervisor or Nova)", + "vm", vmUUID, + "reservation", res.Name, + "expectedHost", expectedHost, + "allocationAge", allocationAge, + "gracePeriod", r.Conf.AllocationGracePeriod) + } } } - // Patch the reservation status + // Patch the reservation old := res.DeepCopy() + specChanged := false + + // Remove stale allocations from Spec + if len(allocationsToRemove) > 0 { + for _, vmUUID := range allocationsToRemove { + delete(res.Spec.CommittedResourceReservation.Allocations, vmUUID) + } + specChanged = true + } // Update Status.Allocations res.Status.CommittedResourceReservation.Allocations = newStatusAllocations + // Patch Spec if changed (stale allocations removed) + if specChanged { + if err := r.Patch(ctx, res, client.MergeFrom(old)); err != nil { + if client.IgnoreNotFound(err) == nil { + return result, nil + } + return nil, fmt.Errorf("failed to patch reservation spec: %w", err) + } + // Re-fetch to get the updated resource version for status patch + if err := r.Get(ctx, client.ObjectKeyFromObject(res), res); err != nil { + if client.IgnoreNotFound(err) == nil { + return result, nil + } + return nil, fmt.Errorf("failed to re-fetch reservation: %w", err) + } + old = res.DeepCopy() + } + + // Patch Status patch := client.MergeFrom(old) if err := r.Status().Patch(ctx, res, patch); err != nil { - // Ignore not-found errors during background deletion if client.IgnoreNotFound(err) == nil { - // Object was deleted, no need to continue - return nil + return result, nil } - return fmt.Errorf("failed to patch reservation status: %w", err) + return nil, fmt.Errorf("failed to patch reservation status: %w", err) } logger.V(1).Info("reconciled allocations", "specAllocations", len(res.Spec.CommittedResourceReservation.Allocations), - "statusAllocations", len(newStatusAllocations)) + "statusAllocations", len(newStatusAllocations), + "removedAllocations", len(allocationsToRemove), + "hasAllocationsInGracePeriod", result.HasAllocationsInGracePeriod) - return nil + return result, nil } // getPipelineForFlavorGroup returns the pipeline name for a given flavor group. @@ -432,36 +561,20 @@ func (r *CommitmentReservationController) Init(ctx context.Context, client clien r.SchedulerClient = reservations.NewSchedulerClient(conf.SchedulerURL) logf.FromContext(ctx).Info("scheduler client initialized for commitment reservation controller", "url", conf.SchedulerURL) - return nil -} - -func (r *CommitmentReservationController) listServersByProjectID(ctx context.Context, projectID string) (map[string]*nova.Server, error) { - if r.DB == nil { - return nil, errors.New("database connection not initialized") - } - - logger := LoggerFromContext(ctx).WithValues("component", "controller") - - // Query servers from the database cache. - var servers []nova.Server - _, err := r.DB.Select(&servers, - "SELECT * FROM openstack_servers WHERE tenant_id = $1", - projectID) - if err != nil { - return nil, fmt.Errorf("failed to query servers from database: %w", err) - } - - logger.V(1).Info("queried servers from database", - "projectID", projectID, - "serverCount", len(servers)) - - // Build lookup map for O(1) access by VM UUID. - serverMap := make(map[string]*nova.Server, len(servers)) - for i := range servers { - serverMap[servers[i].ID] = &servers[i] + // Initialize Nova client for real-time VM status checks (optional). + // Skip if NovaClient is already set (e.g., injected for testing) or if keystone not configured. + if r.NovaClient == nil && conf.KeystoneSecretRef.Name != "" { + r.NovaClient = schedulingnova.NewNovaClient() + if err := r.NovaClient.Init(ctx, client, schedulingnova.NovaClientConfig{ + KeystoneSecretRef: conf.KeystoneSecretRef, + SSOSecretRef: conf.SSOSecretRef, + }); err != nil { + return fmt.Errorf("failed to initialize Nova client: %w", err) + } + logf.FromContext(ctx).Info("Nova client initialized for commitment reservation controller") } - return serverMap, nil + return nil } // commitmentReservationPredicate filters to only watch commitment reservations. diff --git a/internal/scheduling/reservations/commitments/controller_test.go b/internal/scheduling/reservations/commitments/controller_test.go index 5af5dfca9..20c8f55d2 100644 --- a/internal/scheduling/reservations/commitments/controller_test.go +++ b/internal/scheduling/reservations/commitments/controller_test.go @@ -18,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova" @@ -140,6 +141,211 @@ func TestCommitmentReservationController_Reconcile(t *testing.T) { } } +// ============================================================================ +// Test: reconcileAllocations +// ============================================================================ + +// Note: Full reconcileAllocations tests require mocking NovaClient, which uses +// unexported types (nova.server, nova.migration). Tests for the Nova API path +// would need to be placed in the nova package or the types would need to be exported. +// For now, we test only the Hypervisor CRD path (when NovaClient is nil). + +func TestReconcileAllocations_HypervisorCRDPath(t *testing.T) { + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add scheme: %v", err) + } + if err := hv1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add hypervisor scheme: %v", err) + } + + now := time.Now() + recentTime := metav1.NewTime(now.Add(-5 * time.Minute)) // 5 minutes ago (within grace period) + oldTime := metav1.NewTime(now.Add(-30 * time.Minute)) // 30 minutes ago (past grace period) + + tests := []struct { + name string + reservation *v1alpha1.Reservation + hypervisor *hv1.Hypervisor + config Config + expectedStatusAllocations map[string]string + expectedHasGracePeriodAllocs bool + }{ + { + name: "old allocation - VM found on hypervisor CRD", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-1": oldTime, + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{ + {ID: "vm-1", Name: "vm-1", Active: true}, + }), + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{"vm-1": "host-1"}, + expectedHasGracePeriodAllocs: false, + }, + { + name: "old allocation - VM not on hypervisor CRD (no NovaClient fallback)", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-1": oldTime, + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{}), // Empty + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{}, // Not confirmed + expectedHasGracePeriodAllocs: false, + }, + { + name: "new allocation within grace period - no Nova client", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-1": recentTime, + }), + hypervisor: nil, + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{}, // Can't verify without Nova + expectedHasGracePeriodAllocs: true, + }, + { + name: "mixed allocations - old verified via CRD, new in grace period", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-new": recentTime, // In grace period + "vm-old": oldTime, // Past grace period + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{ + {ID: "vm-old", Name: "vm-old", Active: true}, + }), + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{"vm-old": "host-1"}, // Only old one confirmed via CRD + expectedHasGracePeriodAllocs: true, + }, + { + name: "empty allocations - no work to do", + reservation: newTestCRReservation(map[string]metav1.Time{}), + hypervisor: nil, + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{}, + expectedHasGracePeriodAllocs: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Build fake client with objects + objects := []client.Object{tt.reservation} + if tt.hypervisor != nil { + objects = append(objects, tt.hypervisor) + } + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + WithStatusSubresource(&v1alpha1.Reservation{}). + Build() + + controller := &CommitmentReservationController{ + Client: k8sClient, + Scheme: scheme, + Conf: tt.config, + NovaClient: nil, // No NovaClient - testing Hypervisor CRD path only + } + + ctx := WithNewGlobalRequestID(context.Background()) + result, err := controller.reconcileAllocations(ctx, tt.reservation) + if err != nil { + t.Fatalf("reconcileAllocations() error = %v", err) + } + + // Check grace period result + if result.HasAllocationsInGracePeriod != tt.expectedHasGracePeriodAllocs { + t.Errorf("expected HasAllocationsInGracePeriod=%v, got %v", + tt.expectedHasGracePeriodAllocs, result.HasAllocationsInGracePeriod) + } + + // Re-fetch reservation to check updates + var updated v1alpha1.Reservation + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(tt.reservation), &updated); err != nil { + t.Fatalf("failed to get updated reservation: %v", err) + } + + // Check status allocations + actualStatusAllocs := map[string]string{} + if updated.Status.CommittedResourceReservation != nil { + actualStatusAllocs = updated.Status.CommittedResourceReservation.Allocations + } + + if len(actualStatusAllocs) != len(tt.expectedStatusAllocations) { + t.Errorf("expected %d status allocations, got %d: %v", + len(tt.expectedStatusAllocations), len(actualStatusAllocs), actualStatusAllocs) + } + + for vmUUID, expectedHost := range tt.expectedStatusAllocations { + if actualHost, ok := actualStatusAllocs[vmUUID]; !ok { + t.Errorf("expected VM %s in status allocations", vmUUID) + } else if actualHost != expectedHost { + t.Errorf("VM %s: expected host %s, got %s", vmUUID, expectedHost, actualHost) + } + } + }) + } +} + +// newTestCRReservation creates a test CR reservation with allocations on "host-1". +func newTestCRReservation(allocations map[string]metav1.Time) *v1alpha1.Reservation { + const host = "host-1" + specAllocs := make(map[string]v1alpha1.CommittedResourceAllocation) + for vmUUID, timestamp := range allocations { + specAllocs[vmUUID] = v1alpha1.CommittedResourceAllocation{ + CreationTimestamp: timestamp, + Resources: map[hv1.ResourceName]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + "cpu": resource.MustParse("2"), + }, + } + } + + return &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-reservation", + }, + Spec: v1alpha1.ReservationSpec{ + Type: v1alpha1.ReservationTypeCommittedResource, + TargetHost: host, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + ProjectID: "test-project", + ResourceName: "test-flavor", + Allocations: specAllocs, + }, + }, + Status: v1alpha1.ReservationStatus{ + Host: host, + Conditions: []metav1.Condition{ + { + Type: v1alpha1.ReservationConditionReady, + Status: metav1.ConditionTrue, + Reason: "ReservationActive", + }, + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationStatus{ + Allocations: make(map[string]string), + }, + }, + } +} + +// newTestHypervisorCRD creates a test Hypervisor CRD with instances. +func newTestHypervisorCRD(name string, instances []hv1.Instance) *hv1.Hypervisor { + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: hv1.HypervisorStatus{ + Instances: instances, + }, + } +} + +// ============================================================================ +// Test: reconcileInstanceReservation_Success (existing test) +// ============================================================================ + func TestCommitmentReservationController_reconcileInstanceReservation_Success(t *testing.T) { scheme := runtime.NewScheme() if err := v1alpha1.AddToScheme(scheme); err != nil { From 0e2d00e887ad8bf4f0335ab3f3e493f2f611fa6c Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 13:14:02 +0100 Subject: [PATCH 2/6] consider inactive VMs correctly --- .../reservations/commitments/controller.go | 7 +++---- .../reservations/commitments/controller_test.go | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/internal/scheduling/reservations/commitments/controller.go b/internal/scheduling/reservations/commitments/controller.go index c2e364721..0560313b3 100644 --- a/internal/scheduling/reservations/commitments/controller.go +++ b/internal/scheduling/reservations/commitments/controller.go @@ -368,11 +368,10 @@ func (r *CommitmentReservationController) reconcileAllocations(ctx context.Conte // Hypervisor not found - all older allocations will be checked via Nova API fallback logger.Info("hypervisor CRD not found", "host", expectedHost) } else { - // Build set of active VM UUIDs on this hypervisor for O(1) lookup + // Build set of all VM UUIDs on this hypervisor for O(1) lookup + // Include both active and inactive VMs - stopped/shelved VMs still consume the reservation slot for _, inst := range hypervisor.Status.Instances { - if inst.Active { - hvInstanceSet[inst.ID] = true - } + hvInstanceSet[inst.ID] = true } logger.V(1).Info("fetched hypervisor instances", "host", expectedHost, "instanceCount", len(hvInstanceSet)) } diff --git a/internal/scheduling/reservations/commitments/controller_test.go b/internal/scheduling/reservations/commitments/controller_test.go index 20c8f55d2..68d13faf3 100644 --- a/internal/scheduling/reservations/commitments/controller_test.go +++ b/internal/scheduling/reservations/commitments/controller_test.go @@ -183,6 +183,18 @@ func TestReconcileAllocations_HypervisorCRDPath(t *testing.T) { expectedStatusAllocations: map[string]string{"vm-1": "host-1"}, expectedHasGracePeriodAllocs: false, }, + { + name: "old allocation - inactive VM still counted (stopped/shelved)", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-stopped": oldTime, + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{ + {ID: "vm-stopped", Name: "vm-stopped", Active: false}, // Inactive VM should still be found + }), + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{"vm-stopped": "host-1"}, + expectedHasGracePeriodAllocs: false, + }, { name: "old allocation - VM not on hypervisor CRD (no NovaClient fallback)", reservation: newTestCRReservation(map[string]metav1.Time{ @@ -331,6 +343,8 @@ func newTestCRReservation(allocations map[string]metav1.Time) *v1alpha1.Reservat } // newTestHypervisorCRD creates a test Hypervisor CRD with instances. +// +//nolint:unparam // name parameter allows future test flexibility func newTestHypervisorCRD(name string, instances []hv1.Instance) *hv1.Hypervisor { return &hv1.Hypervisor{ ObjectMeta: metav1.ObjectMeta{ From 4729f803b38106197519b9ac9304401442f92502 Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 15:42:42 +0100 Subject: [PATCH 3/6] fix alerts --- .../cortex-nova/alerts/nova.alerts.yaml | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml index 784830aac..4b5bba931 100644 --- a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml +++ b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml @@ -486,7 +486,10 @@ groups: # Committed Resource Syncer Alerts - alert: CortexNovaCommittedResourceSyncerNotRunning - expr: increase(cortex_committed_resource_syncer_runs_total{service="cortex-nova-metrics"}[2h]) == 0 + expr: | + increase(cortex_committed_resource_syncer_runs_total{service="cortex-nova-metrics"}[2h]) == 0 + or + absent(cortex_committed_resource_syncer_runs_total{service="cortex-nova-metrics"}) for: 5m labels: context: committed-resource-syncer @@ -497,8 +500,10 @@ groups: annotations: summary: "Committed Resource syncer not running" description: > - The committed resource syncer has not run in the last 2 hours. This indicates - that the syncer may have stopped or is encountering errors. Check the syncer logs for errors. + The committed resource syncer has not run in the last 2 hours or the metric is missing. + This indicates that the syncer may have stopped, is encountering errors, or the feature + is not enabled. Check the syncer logs for errors or verify the commitments-sync-task is + in the enabledTasks configuration. - alert: CortexNovaCommittedResourceSyncerErrorsHigh expr: increase(cortex_committed_resource_syncer_errors_total{service="cortex-nova-metrics"}[1h]) > 3 @@ -517,8 +522,11 @@ groups: - alert: CortexNovaCommittedResourceSyncerUnitMismatchRateHigh expr: | - rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unit_mismatch"}[1h]) - / rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) > 0.05 + ( + rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unit_mismatch"}[1h]) + / rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) + ) > 0.05 + and on() rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) > 0 for: 15m labels: context: committed-resource-syncer @@ -537,8 +545,11 @@ groups: - alert: CortexNovaCommittedResourceSyncerUnknownFlavorGroupRateHigh expr: | - rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unknown_flavor_group"}[1h]) - / rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) > 0 + ( + rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unknown_flavor_group"}[1h]) + / rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) + ) > 0 + and on() rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) > 0 for: 15m labels: context: committed-resource-syncer @@ -557,10 +568,13 @@ groups: - alert: CortexNovaCommittedResourceSyncerLocalChangeRateHigh expr: | ( - rate(cortex_committed_resource_syncer_reservations_created_total{service="cortex-nova-metrics"}[1h]) + - rate(cortex_committed_resource_syncer_reservations_deleted_total{service="cortex-nova-metrics"}[1h]) + - rate(cortex_committed_resource_syncer_reservations_repaired_total{service="cortex-nova-metrics"}[1h]) - ) / rate(cortex_committed_resource_syncer_commitments_processed_total{service="cortex-nova-metrics"}[1h]) > 0.01 + ( + rate(cortex_committed_resource_syncer_reservations_created_total{service="cortex-nova-metrics"}[1h]) + + rate(cortex_committed_resource_syncer_reservations_deleted_total{service="cortex-nova-metrics"}[1h]) + + rate(cortex_committed_resource_syncer_reservations_repaired_total{service="cortex-nova-metrics"}[1h]) + ) / rate(cortex_committed_resource_syncer_commitments_processed_total{service="cortex-nova-metrics"}[1h]) + ) > 0.01 + and on() rate(cortex_committed_resource_syncer_commitments_processed_total{service="cortex-nova-metrics"}[1h]) > 0 for: 15m labels: context: committed-resource-syncer @@ -578,8 +592,11 @@ groups: - alert: CortexNovaCommittedResourceSyncerRepairRateHigh expr: | - rate(cortex_committed_resource_syncer_reservations_repaired_total{service="cortex-nova-metrics"}[1h]) - / rate(cortex_committed_resource_syncer_commitments_processed_total{service="cortex-nova-metrics"}[1h]) > 0 + ( + rate(cortex_committed_resource_syncer_reservations_repaired_total{service="cortex-nova-metrics"}[1h]) + / rate(cortex_committed_resource_syncer_commitments_processed_total{service="cortex-nova-metrics"}[1h]) + ) > 0 + and on() rate(cortex_committed_resource_syncer_commitments_processed_total{service="cortex-nova-metrics"}[1h]) > 0 for: 15m labels: context: committed-resource-syncer From b8f55a1e6a630a56762b3cb2990d766ee3b1e970 Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 16:06:11 +0100 Subject: [PATCH 4/6] allocation bookeeping --- .../cortex-nova/templates/pipelines_kvm.yaml | 28 ++ .../filter_committed_resource_bookkeeping.go | 273 ++++++++++++++++++ .../filters/filter_has_enough_capacity.go | 1 + 3 files changed, 302 insertions(+) create mode 100644 internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go diff --git a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml index 8b2519207..d9bd2004d 100644 --- a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml +++ b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml @@ -102,6 +102,20 @@ spec: from the nova scheduler request spec. It supports filtering by host and by aggregates. Aggregates use AND logic between list elements, with comma-separated UUIDs within an element using OR logic. + - name: filter_committed_resource_bookkeeping + description: | + Bookkeeping for committed resource (CR) reservations. Note that unlocking + of CR capacity happens in filter_has_enough_capacity when project ID and + resource group (hw_version) match. This filter handles additional tasks: + tracking which VMs are expected to land on which CR reservations by + updating reservation spec allocations. In the future, this filter will + also enforce that VMs use available CR reservation slots when sufficient + slots exist among candidates. + params: + # Enable updating CR reservation allocations with VM assignments + - {key: updateReservationAllocations, boolValue: true} + # Future: enforce reservation slots (not yet implemented) + - {key: enforceReservationSlots, boolValue: false} weighers: - name: kvm_prefer_smaller_hosts params: @@ -241,6 +255,20 @@ spec: from the nova scheduler request spec. It supports filtering by host and by aggregates. Aggregates use AND logic between list elements, with comma-separated UUIDs within an element using OR logic. + - name: filter_committed_resource_bookkeeping + description: | + Bookkeeping for committed resource (CR) reservations. Note that unlocking + of CR capacity happens in filter_has_enough_capacity when project ID and + resource group (hw_version) match. This filter handles additional tasks: + tracking which VMs are expected to land on which CR reservations by + updating reservation spec allocations. In the future, this filter will + also enforce that VMs use available CR reservation slots when sufficient + slots exist among candidates. + params: + # Enable updating CR reservation allocations with VM assignments + - {key: updateReservationAllocations, boolValue: true} + # Future: enforce reservation slots (not yet implemented) + - {key: enforceReservationSlots, boolValue: false} weighers: - name: kvm_prefer_smaller_hosts params: diff --git a/internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go b/internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go new file mode 100644 index 000000000..c628fd434 --- /dev/null +++ b/internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go @@ -0,0 +1,273 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package filters + +import ( + "context" + "log/slog" + "time" + + api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type FilterCommittedResourceBookkeepingOpts struct { + // UpdateReservationAllocations enables adding VMs to CR reservation spec allocations + // when a matching reservation's host is among the candidates. + // This tracks which VMs are expected to land on which reservations. + // Default: false + UpdateReservationAllocations bool `json:"updateReservationAllocations,omitempty"` + + // EnforceReservationSlots controls whether candidates should be filtered to only + // hosts with available CR reservation slots when enough slots exist. + // When true and sufficient reservation slots are available among candidates, + // non-reservation hosts are filtered out. + // Default: false (not yet implemented) + EnforceReservationSlots bool `json:"enforceReservationSlots,omitempty"` +} + +func (FilterCommittedResourceBookkeepingOpts) Validate() error { return nil } + +type FilterCommittedResourceBookkeeping struct { + lib.BaseFilter[api.ExternalSchedulerRequest, FilterCommittedResourceBookkeepingOpts] +} + +// Filter for committed resource reservation bookkeeping. +// +// Note: Unlocking of CR reservation capacity happens in filter_has_enough_capacity +// when project ID and resource group (hw_version) match. This filter handles +// additional bookkeeping tasks: +// +// 1. UpdateReservationAllocations: Adds VMs to matching reservation spec allocations +// to track which VMs are expected to use which reservations +// 2. EnforceReservationSlots: (Future) Filters candidates to reservation hosts +// when sufficient slots exist +// +// This filter should run AFTER filter_has_enough_capacity to ensure candidates +// have already been filtered based on physical capacity. +func (s *FilterCommittedResourceBookkeeping) Run(traceLog *slog.Logger, request api.ExternalSchedulerRequest) (*lib.FilterWeigherPipelineStepResult, error) { + result := s.IncludeAllHostsFromRequest(request) + + // Skip if no features enabled + if !s.Options.UpdateReservationAllocations && !s.Options.EnforceReservationSlots { + return result, nil + } + + // Get request details + projectID := request.Spec.Data.ProjectID + resourceGroup := request.Spec.Data.Flavor.Data.ExtraSpecs["hw_version"] + instanceUUID := request.Spec.Data.InstanceUUID + + if projectID == "" || resourceGroup == "" { + traceLog.Debug("skipping CR reservation handling: missing projectID or resourceGroup") + return result, nil + } + + // List all reservations + var reservations v1alpha1.ReservationList + if err := s.Client.List(context.Background(), &reservations); err != nil { + return nil, err + } + + // Find matching CR reservations with spare capacity + var matchingReservations []v1alpha1.Reservation + for _, reservation := range reservations.Items { + if !s.isMatchingCRReservation(traceLog, reservation, projectID, resourceGroup, request) { + continue + } + matchingReservations = append(matchingReservations, reservation) + } + + traceLog.Debug("found matching CR reservations", + "count", len(matchingReservations), + "projectID", projectID, + "resourceGroup", resourceGroup) + + // Update reservation allocations if enabled + if s.Options.UpdateReservationAllocations && len(matchingReservations) > 0 && instanceUUID != "" { + s.updateReservationAllocations(traceLog, request, result.Activations, matchingReservations) + } + + // TODO: Implement EnforceReservationSlots logic + // When enabled, filter candidates to only hosts with reservation slots if sufficient slots exist + + return result, nil +} + +// isMatchingCRReservation checks if a reservation is a matching CR reservation with spare capacity. +func (s *FilterCommittedResourceBookkeeping) isMatchingCRReservation( + traceLog *slog.Logger, + reservation v1alpha1.Reservation, + projectID, resourceGroup string, + request api.ExternalSchedulerRequest, +) bool { + // Must be Ready + if !meta.IsStatusConditionTrue(reservation.Status.Conditions, v1alpha1.ReservationConditionReady) { + return false + } + + // Must be a CR reservation + if reservation.Spec.Type != v1alpha1.ReservationTypeCommittedResource { + return false + } + + // Must have CR spec + if reservation.Spec.CommittedResourceReservation == nil { + return false + } + + // Must match project and resource group + if reservation.Spec.CommittedResourceReservation.ProjectID != projectID { + return false + } + if reservation.Spec.CommittedResourceReservation.ResourceGroup != resourceGroup { + return false + } + + // Must have a host + if reservation.Spec.TargetHost == "" && reservation.Status.Host == "" { + return false + } + + // Must have spare capacity + if !s.hasSpareCapacity(traceLog, reservation, request) { + return false + } + + return true +} + +// hasSpareCapacity checks if the reservation has enough spare capacity for the VM. +func (s *FilterCommittedResourceBookkeeping) hasSpareCapacity( + traceLog *slog.Logger, + reservation v1alpha1.Reservation, + request api.ExternalSchedulerRequest, +) bool { + // Calculate current usage from existing allocations + var usedCPU, usedMemory int64 + if reservation.Spec.CommittedResourceReservation != nil { + for _, allocation := range reservation.Spec.CommittedResourceReservation.Allocations { + if cpu, ok := allocation.Resources["cpu"]; ok { + usedCPU += cpu.Value() + } + if memory, ok := allocation.Resources["memory"]; ok { + usedMemory += memory.Value() + } + } + } + + // Get reservation's total capacity + var totalCPU, totalMemory int64 + if cpu, ok := reservation.Spec.Resources["cpu"]; ok { + totalCPU = cpu.Value() + } + if memory, ok := reservation.Spec.Resources["memory"]; ok { + totalMemory = memory.Value() + } + + // Calculate requested resources + //nolint:gosec // VCPUs and MemoryMB are bounded by OpenStack limits, no overflow risk + requestedCPU := int64(request.Spec.Data.Flavor.Data.VCPUs) + //nolint:gosec // VCPUs and MemoryMB are bounded by OpenStack limits, no overflow risk + requestedMemory := int64(request.Spec.Data.Flavor.Data.MemoryMB) * 1_000_000 // Convert MB to bytes + + // Check if there's enough spare capacity + spareCPU := totalCPU - usedCPU + spareMemory := totalMemory - usedMemory + + if spareCPU < requestedCPU || spareMemory < requestedMemory { + traceLog.Debug("reservation has insufficient spare capacity", + "reservation", reservation.Name, + "spareCPU", spareCPU, + "spareMemory", spareMemory, + "requestedCPU", requestedCPU, + "requestedMemory", requestedMemory) + return false + } + + return true +} + +// updateReservationAllocations adds the VM to the spec allocations of matching CR reservations +// whose host is among the candidates. +func (s *FilterCommittedResourceBookkeeping) updateReservationAllocations( + traceLog *slog.Logger, + request api.ExternalSchedulerRequest, + candidates map[string]float64, + matchingReservations []v1alpha1.Reservation, +) { + + instanceUUID := request.Spec.Data.InstanceUUID + if instanceUUID == "" { + traceLog.Warn("skipping reservation allocation update: no instance UUID in request") + return + } + + // Build resources from flavor + //nolint:gosec // VCPUs and MemoryMB are bounded by OpenStack limits, no overflow risk + vmResources := map[hv1.ResourceName]resource.Quantity{ + "cpu": *resource.NewQuantity(int64(request.Spec.Data.Flavor.Data.VCPUs), resource.DecimalSI), + "memory": *resource.NewQuantity(int64(request.Spec.Data.Flavor.Data.MemoryMB)*1_000_000, resource.DecimalSI), + } + + now := metav1.NewTime(time.Now()) + + for _, reservation := range matchingReservations { + // Get reservation host + reservationHost := reservation.Spec.TargetHost + if reservationHost == "" { + reservationHost = reservation.Status.Host + } + + // Check if reservation's host is among candidates + if _, isCandidate := candidates[reservationHost]; !isCandidate { + traceLog.Debug("skipping reservation allocation: host not among candidates", + "reservation", reservation.Name, + "host", reservationHost) + continue + } + + // Check if VM is already in allocations + if reservation.Spec.CommittedResourceReservation.Allocations != nil { + if _, exists := reservation.Spec.CommittedResourceReservation.Allocations[instanceUUID]; exists { + traceLog.Debug("VM already in reservation allocations", + "reservation", reservation.Name, + "instanceUUID", instanceUUID) + continue + } + } + + // Add VM to reservation allocations + reservationCopy := reservation.DeepCopy() + if reservationCopy.Spec.CommittedResourceReservation.Allocations == nil { + reservationCopy.Spec.CommittedResourceReservation.Allocations = make(map[string]v1alpha1.CommittedResourceAllocation) + } + reservationCopy.Spec.CommittedResourceReservation.Allocations[instanceUUID] = v1alpha1.CommittedResourceAllocation{ + CreationTimestamp: now, + Resources: vmResources, + } + + if err := s.Client.Update(context.Background(), reservationCopy); err != nil { + traceLog.Warn("failed to update reservation with VM allocation", + "reservation", reservation.Name, + "instanceUUID", instanceUUID, + "error", err) + // Continue with other reservations - this is best-effort + } else { + traceLog.Info("added VM to CR reservation spec allocations", + "reservation", reservation.Name, + "instanceUUID", instanceUUID, + "host", reservationHost) + } + } +} + +func init() { + Index["filter_committed_resource_bookkeeping"] = func() NovaFilter { return &FilterCommittedResourceBookkeeping{} } +} diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go index d26c7c940..194270b19 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go @@ -329,6 +329,7 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa ) } } + return result, nil } From bd25b2e4c21acf7fddd7b382911546e126fb66f7 Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 16:09:00 +0100 Subject: [PATCH 5/6] fix alerts --- helm/bundles/cortex-nova/alerts/nova.alerts.yaml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml index 4b5bba931..654adbe1c 100644 --- a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml +++ b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml @@ -350,7 +350,7 @@ groups: - alert: CortexNovaCommittedResourceRejectionRateTooHigh expr: | - rate(cortex_committed_resource_change_api_commitment_changes_total{service="cortex-nova-metrics", result="rejected"}[5m]) + sum(rate(cortex_committed_resource_change_api_commitment_changes_total{service="cortex-nova-metrics", result="rejected"}[5m])) / sum(rate(cortex_committed_resource_change_api_commitment_changes_total{service="cortex-nova-metrics"}[5m])) > 0.5 for: 5m labels: @@ -523,10 +523,10 @@ groups: - alert: CortexNovaCommittedResourceSyncerUnitMismatchRateHigh expr: | ( - rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unit_mismatch"}[1h]) - / rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) + sum(rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unit_mismatch"}[1h])) + / sum(rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h])) ) > 0.05 - and on() rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) > 0 + and on() sum(rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h])) > 0 for: 15m labels: context: committed-resource-syncer @@ -546,10 +546,10 @@ groups: - alert: CortexNovaCommittedResourceSyncerUnknownFlavorGroupRateHigh expr: | ( - rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unknown_flavor_group"}[1h]) - / rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) + sum(rate(cortex_committed_resource_syncer_commitments_skipped_total{service="cortex-nova-metrics", reason="unknown_flavor_group"}[1h])) + / sum(rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h])) ) > 0 - and on() rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h]) > 0 + and on() sum(rate(cortex_committed_resource_syncer_commitments_total{service="cortex-nova-metrics"}[1h])) > 0 for: 15m labels: context: committed-resource-syncer From 79f269b2719cb2c0948cca9f21fb62dc7f0f76a7 Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 16:13:02 +0100 Subject: [PATCH 6/6] Revert "allocation bookeeping" This reverts commit b8f55a1e6a630a56762b3cb2990d766ee3b1e970. --- .../cortex-nova/templates/pipelines_kvm.yaml | 28 -- .../filter_committed_resource_bookkeeping.go | 273 ------------------ .../filters/filter_has_enough_capacity.go | 1 - 3 files changed, 302 deletions(-) delete mode 100644 internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go diff --git a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml index d9bd2004d..8b2519207 100644 --- a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml +++ b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml @@ -102,20 +102,6 @@ spec: from the nova scheduler request spec. It supports filtering by host and by aggregates. Aggregates use AND logic between list elements, with comma-separated UUIDs within an element using OR logic. - - name: filter_committed_resource_bookkeeping - description: | - Bookkeeping for committed resource (CR) reservations. Note that unlocking - of CR capacity happens in filter_has_enough_capacity when project ID and - resource group (hw_version) match. This filter handles additional tasks: - tracking which VMs are expected to land on which CR reservations by - updating reservation spec allocations. In the future, this filter will - also enforce that VMs use available CR reservation slots when sufficient - slots exist among candidates. - params: - # Enable updating CR reservation allocations with VM assignments - - {key: updateReservationAllocations, boolValue: true} - # Future: enforce reservation slots (not yet implemented) - - {key: enforceReservationSlots, boolValue: false} weighers: - name: kvm_prefer_smaller_hosts params: @@ -255,20 +241,6 @@ spec: from the nova scheduler request spec. It supports filtering by host and by aggregates. Aggregates use AND logic between list elements, with comma-separated UUIDs within an element using OR logic. - - name: filter_committed_resource_bookkeeping - description: | - Bookkeeping for committed resource (CR) reservations. Note that unlocking - of CR capacity happens in filter_has_enough_capacity when project ID and - resource group (hw_version) match. This filter handles additional tasks: - tracking which VMs are expected to land on which CR reservations by - updating reservation spec allocations. In the future, this filter will - also enforce that VMs use available CR reservation slots when sufficient - slots exist among candidates. - params: - # Enable updating CR reservation allocations with VM assignments - - {key: updateReservationAllocations, boolValue: true} - # Future: enforce reservation slots (not yet implemented) - - {key: enforceReservationSlots, boolValue: false} weighers: - name: kvm_prefer_smaller_hosts params: diff --git a/internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go b/internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go deleted file mode 100644 index c628fd434..000000000 --- a/internal/scheduling/nova/plugins/filters/filter_committed_resource_bookkeeping.go +++ /dev/null @@ -1,273 +0,0 @@ -// Copyright SAP SE -// SPDX-License-Identifier: Apache-2.0 - -package filters - -import ( - "context" - "log/slog" - "time" - - api "github.com/cobaltcore-dev/cortex/api/external/nova" - "github.com/cobaltcore-dev/cortex/api/v1alpha1" - "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" - hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -type FilterCommittedResourceBookkeepingOpts struct { - // UpdateReservationAllocations enables adding VMs to CR reservation spec allocations - // when a matching reservation's host is among the candidates. - // This tracks which VMs are expected to land on which reservations. - // Default: false - UpdateReservationAllocations bool `json:"updateReservationAllocations,omitempty"` - - // EnforceReservationSlots controls whether candidates should be filtered to only - // hosts with available CR reservation slots when enough slots exist. - // When true and sufficient reservation slots are available among candidates, - // non-reservation hosts are filtered out. - // Default: false (not yet implemented) - EnforceReservationSlots bool `json:"enforceReservationSlots,omitempty"` -} - -func (FilterCommittedResourceBookkeepingOpts) Validate() error { return nil } - -type FilterCommittedResourceBookkeeping struct { - lib.BaseFilter[api.ExternalSchedulerRequest, FilterCommittedResourceBookkeepingOpts] -} - -// Filter for committed resource reservation bookkeeping. -// -// Note: Unlocking of CR reservation capacity happens in filter_has_enough_capacity -// when project ID and resource group (hw_version) match. This filter handles -// additional bookkeeping tasks: -// -// 1. UpdateReservationAllocations: Adds VMs to matching reservation spec allocations -// to track which VMs are expected to use which reservations -// 2. EnforceReservationSlots: (Future) Filters candidates to reservation hosts -// when sufficient slots exist -// -// This filter should run AFTER filter_has_enough_capacity to ensure candidates -// have already been filtered based on physical capacity. -func (s *FilterCommittedResourceBookkeeping) Run(traceLog *slog.Logger, request api.ExternalSchedulerRequest) (*lib.FilterWeigherPipelineStepResult, error) { - result := s.IncludeAllHostsFromRequest(request) - - // Skip if no features enabled - if !s.Options.UpdateReservationAllocations && !s.Options.EnforceReservationSlots { - return result, nil - } - - // Get request details - projectID := request.Spec.Data.ProjectID - resourceGroup := request.Spec.Data.Flavor.Data.ExtraSpecs["hw_version"] - instanceUUID := request.Spec.Data.InstanceUUID - - if projectID == "" || resourceGroup == "" { - traceLog.Debug("skipping CR reservation handling: missing projectID or resourceGroup") - return result, nil - } - - // List all reservations - var reservations v1alpha1.ReservationList - if err := s.Client.List(context.Background(), &reservations); err != nil { - return nil, err - } - - // Find matching CR reservations with spare capacity - var matchingReservations []v1alpha1.Reservation - for _, reservation := range reservations.Items { - if !s.isMatchingCRReservation(traceLog, reservation, projectID, resourceGroup, request) { - continue - } - matchingReservations = append(matchingReservations, reservation) - } - - traceLog.Debug("found matching CR reservations", - "count", len(matchingReservations), - "projectID", projectID, - "resourceGroup", resourceGroup) - - // Update reservation allocations if enabled - if s.Options.UpdateReservationAllocations && len(matchingReservations) > 0 && instanceUUID != "" { - s.updateReservationAllocations(traceLog, request, result.Activations, matchingReservations) - } - - // TODO: Implement EnforceReservationSlots logic - // When enabled, filter candidates to only hosts with reservation slots if sufficient slots exist - - return result, nil -} - -// isMatchingCRReservation checks if a reservation is a matching CR reservation with spare capacity. -func (s *FilterCommittedResourceBookkeeping) isMatchingCRReservation( - traceLog *slog.Logger, - reservation v1alpha1.Reservation, - projectID, resourceGroup string, - request api.ExternalSchedulerRequest, -) bool { - // Must be Ready - if !meta.IsStatusConditionTrue(reservation.Status.Conditions, v1alpha1.ReservationConditionReady) { - return false - } - - // Must be a CR reservation - if reservation.Spec.Type != v1alpha1.ReservationTypeCommittedResource { - return false - } - - // Must have CR spec - if reservation.Spec.CommittedResourceReservation == nil { - return false - } - - // Must match project and resource group - if reservation.Spec.CommittedResourceReservation.ProjectID != projectID { - return false - } - if reservation.Spec.CommittedResourceReservation.ResourceGroup != resourceGroup { - return false - } - - // Must have a host - if reservation.Spec.TargetHost == "" && reservation.Status.Host == "" { - return false - } - - // Must have spare capacity - if !s.hasSpareCapacity(traceLog, reservation, request) { - return false - } - - return true -} - -// hasSpareCapacity checks if the reservation has enough spare capacity for the VM. -func (s *FilterCommittedResourceBookkeeping) hasSpareCapacity( - traceLog *slog.Logger, - reservation v1alpha1.Reservation, - request api.ExternalSchedulerRequest, -) bool { - // Calculate current usage from existing allocations - var usedCPU, usedMemory int64 - if reservation.Spec.CommittedResourceReservation != nil { - for _, allocation := range reservation.Spec.CommittedResourceReservation.Allocations { - if cpu, ok := allocation.Resources["cpu"]; ok { - usedCPU += cpu.Value() - } - if memory, ok := allocation.Resources["memory"]; ok { - usedMemory += memory.Value() - } - } - } - - // Get reservation's total capacity - var totalCPU, totalMemory int64 - if cpu, ok := reservation.Spec.Resources["cpu"]; ok { - totalCPU = cpu.Value() - } - if memory, ok := reservation.Spec.Resources["memory"]; ok { - totalMemory = memory.Value() - } - - // Calculate requested resources - //nolint:gosec // VCPUs and MemoryMB are bounded by OpenStack limits, no overflow risk - requestedCPU := int64(request.Spec.Data.Flavor.Data.VCPUs) - //nolint:gosec // VCPUs and MemoryMB are bounded by OpenStack limits, no overflow risk - requestedMemory := int64(request.Spec.Data.Flavor.Data.MemoryMB) * 1_000_000 // Convert MB to bytes - - // Check if there's enough spare capacity - spareCPU := totalCPU - usedCPU - spareMemory := totalMemory - usedMemory - - if spareCPU < requestedCPU || spareMemory < requestedMemory { - traceLog.Debug("reservation has insufficient spare capacity", - "reservation", reservation.Name, - "spareCPU", spareCPU, - "spareMemory", spareMemory, - "requestedCPU", requestedCPU, - "requestedMemory", requestedMemory) - return false - } - - return true -} - -// updateReservationAllocations adds the VM to the spec allocations of matching CR reservations -// whose host is among the candidates. -func (s *FilterCommittedResourceBookkeeping) updateReservationAllocations( - traceLog *slog.Logger, - request api.ExternalSchedulerRequest, - candidates map[string]float64, - matchingReservations []v1alpha1.Reservation, -) { - - instanceUUID := request.Spec.Data.InstanceUUID - if instanceUUID == "" { - traceLog.Warn("skipping reservation allocation update: no instance UUID in request") - return - } - - // Build resources from flavor - //nolint:gosec // VCPUs and MemoryMB are bounded by OpenStack limits, no overflow risk - vmResources := map[hv1.ResourceName]resource.Quantity{ - "cpu": *resource.NewQuantity(int64(request.Spec.Data.Flavor.Data.VCPUs), resource.DecimalSI), - "memory": *resource.NewQuantity(int64(request.Spec.Data.Flavor.Data.MemoryMB)*1_000_000, resource.DecimalSI), - } - - now := metav1.NewTime(time.Now()) - - for _, reservation := range matchingReservations { - // Get reservation host - reservationHost := reservation.Spec.TargetHost - if reservationHost == "" { - reservationHost = reservation.Status.Host - } - - // Check if reservation's host is among candidates - if _, isCandidate := candidates[reservationHost]; !isCandidate { - traceLog.Debug("skipping reservation allocation: host not among candidates", - "reservation", reservation.Name, - "host", reservationHost) - continue - } - - // Check if VM is already in allocations - if reservation.Spec.CommittedResourceReservation.Allocations != nil { - if _, exists := reservation.Spec.CommittedResourceReservation.Allocations[instanceUUID]; exists { - traceLog.Debug("VM already in reservation allocations", - "reservation", reservation.Name, - "instanceUUID", instanceUUID) - continue - } - } - - // Add VM to reservation allocations - reservationCopy := reservation.DeepCopy() - if reservationCopy.Spec.CommittedResourceReservation.Allocations == nil { - reservationCopy.Spec.CommittedResourceReservation.Allocations = make(map[string]v1alpha1.CommittedResourceAllocation) - } - reservationCopy.Spec.CommittedResourceReservation.Allocations[instanceUUID] = v1alpha1.CommittedResourceAllocation{ - CreationTimestamp: now, - Resources: vmResources, - } - - if err := s.Client.Update(context.Background(), reservationCopy); err != nil { - traceLog.Warn("failed to update reservation with VM allocation", - "reservation", reservation.Name, - "instanceUUID", instanceUUID, - "error", err) - // Continue with other reservations - this is best-effort - } else { - traceLog.Info("added VM to CR reservation spec allocations", - "reservation", reservation.Name, - "instanceUUID", instanceUUID, - "host", reservationHost) - } - } -} - -func init() { - Index["filter_committed_resource_bookkeeping"] = func() NovaFilter { return &FilterCommittedResourceBookkeeping{} } -} diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go index 194270b19..d26c7c940 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go @@ -329,7 +329,6 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa ) } } - return result, nil }