diff --git a/internal/scheduling/reservations/commitments/config.go b/internal/scheduling/reservations/commitments/config.go index 7a6c9005f..17d89c8c0 100644 --- a/internal/scheduling/reservations/commitments/config.go +++ b/internal/scheduling/reservations/commitments/config.go @@ -11,10 +11,17 @@ import ( type Config struct { - // RequeueIntervalActive is the interval for requeueing active reservations for verification. + // RequeueIntervalActive is the interval for requeueing active reservations for periodic verification. RequeueIntervalActive time.Duration `json:"committedResourceRequeueIntervalActive"` // RequeueIntervalRetry is the interval for requeueing when retrying after knowledge is not ready. RequeueIntervalRetry time.Duration `json:"committedResourceRequeueIntervalRetry"` + // AllocationGracePeriod is the time window after a VM is allocated to a reservation + // during which it's expected to appear on the target host. VMs not confirmed within + // this period are considered stale and removed from the reservation. + AllocationGracePeriod time.Duration `json:"committedResourceAllocationGracePeriod"` + // RequeueIntervalGracePeriod is the interval for requeueing when VMs are in grace period. + // Shorter than RequeueIntervalActive for faster verification of new allocations. + RequeueIntervalGracePeriod time.Duration `json:"committedResourceRequeueIntervalGracePeriod"` // PipelineDefault is the default pipeline used for scheduling committed resource reservations. PipelineDefault string `json:"committedResourcePipelineDefault"` @@ -68,6 +75,12 @@ func (c *Config) ApplyDefaults() { if c.RequeueIntervalRetry == 0 { c.RequeueIntervalRetry = defaults.RequeueIntervalRetry } + if c.RequeueIntervalGracePeriod == 0 { + c.RequeueIntervalGracePeriod = defaults.RequeueIntervalGracePeriod + } + if c.AllocationGracePeriod == 0 { + c.AllocationGracePeriod = defaults.AllocationGracePeriod + } if c.PipelineDefault == "" { c.PipelineDefault = defaults.PipelineDefault } @@ -88,6 +101,8 @@ func DefaultConfig() Config { return Config{ RequeueIntervalActive: 5 * time.Minute, RequeueIntervalRetry: 1 * time.Minute, + RequeueIntervalGracePeriod: 1 * time.Minute, + AllocationGracePeriod: 15 * time.Minute, PipelineDefault: "kvm-general-purpose-load-balancing", SchedulerURL: "http://localhost:8080/scheduler/nova/external", ChangeAPIWatchReservationsTimeout: 10 * time.Second, diff --git a/internal/scheduling/reservations/commitments/controller.go b/internal/scheduling/reservations/commitments/controller.go index 9c238aeee..0560313b3 100644 --- a/internal/scheduling/reservations/commitments/controller.go +++ b/internal/scheduling/reservations/commitments/controller.go @@ -23,9 +23,9 @@ import ( schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" - "github.com/cobaltcore-dev/cortex/internal/knowledge/datasources/plugins/openstack/nova" "github.com/cobaltcore-dev/cortex/internal/knowledge/db" "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + schedulingnova "github.com/cobaltcore-dev/cortex/internal/scheduling/nova" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" "github.com/cobaltcore-dev/cortex/pkg/multicluster" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" @@ -44,6 +44,8 @@ type CommitmentReservationController struct { DB *db.DB // SchedulerClient for making scheduler API calls. SchedulerClient *reservations.SchedulerClient + // NovaClient for direct Nova API calls (real-time VM status). + NovaClient schedulingnova.NovaClient } // Reconcile is part of the main kubernetes reconciliation loop which aims to @@ -96,13 +98,18 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr if meta.IsStatusConditionTrue(res.Status.Conditions, v1alpha1.ReservationConditionReady) { logger.V(1).Info("reservation is active, verifying allocations") - // Verify all allocations in Spec against actual VM state from database - if err := r.reconcileAllocations(ctx, &res); err != nil { + // Verify all allocations in Spec against actual VM state + result, err := r.reconcileAllocations(ctx, &res) + if err != nil { logger.Error(err, "failed to reconcile allocations") return ctrl.Result{}, err } - // Requeue periodically to keep verifying allocations + // Requeue with appropriate interval based on allocation state + // Use shorter interval if there are allocations in grace period for faster verification + if result.HasAllocationsInGracePeriod { + return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalGracePeriod}, nil + } return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalActive}, nil } @@ -322,82 +329,203 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr return ctrl.Result{}, nil } -// reconcileAllocations verifies all allocations in Spec against actual Nova VM state. +// reconcileAllocationsResult holds the outcome of allocation reconciliation. +type reconcileAllocationsResult struct { + // HasAllocationsInGracePeriod is true if any allocations are still in grace period. + HasAllocationsInGracePeriod bool +} + +// reconcileAllocations verifies all allocations in Spec against actual VM state. // It updates Status.Allocations based on the actual host location of each VM. -func (r *CommitmentReservationController) reconcileAllocations(ctx context.Context, res *v1alpha1.Reservation) error { +// For new allocations (within grace period), it uses the Nova API for real-time status. +// For older allocations, it uses the Hypervisor CRD to check if VM is on the expected host. +func (r *CommitmentReservationController) reconcileAllocations(ctx context.Context, res *v1alpha1.Reservation) (*reconcileAllocationsResult, error) { logger := LoggerFromContext(ctx).WithValues("component", "controller") + result := &reconcileAllocationsResult{} + now := time.Now() // Skip if no CommittedResourceReservation if res.Spec.CommittedResourceReservation == nil { - return nil + return result, nil } - // TODO trigger migrations of unused reservations (to PAYG VMs) - // Skip if no allocations to verify if len(res.Spec.CommittedResourceReservation.Allocations) == 0 { logger.V(1).Info("no allocations to verify", "reservation", res.Name) - return nil + return result, nil } - // Query all VMs for this project from the database - projectID := res.Spec.CommittedResourceReservation.ProjectID - serverMap, err := r.listServersByProjectID(ctx, projectID) - if err != nil { - return fmt.Errorf("failed to list servers for project %s: %w", projectID, err) + expectedHost := res.Status.Host + + // Fetch the Hypervisor CRD for the expected host (for older allocations) + var hypervisor hv1.Hypervisor + hvInstanceSet := make(map[string]bool) + if expectedHost != "" { + if err := r.Get(ctx, client.ObjectKey{Name: expectedHost}, &hypervisor); err != nil { + if client.IgnoreNotFound(err) != nil { + return nil, fmt.Errorf("failed to get hypervisor %s: %w", expectedHost, err) + } + // Hypervisor not found - all older allocations will be checked via Nova API fallback + logger.Info("hypervisor CRD not found", "host", expectedHost) + } else { + // Build set of all VM UUIDs on this hypervisor for O(1) lookup + // Include both active and inactive VMs - stopped/shelved VMs still consume the reservation slot + for _, inst := range hypervisor.Status.Instances { + hvInstanceSet[inst.ID] = true + } + logger.V(1).Info("fetched hypervisor instances", "host", expectedHost, "instanceCount", len(hvInstanceSet)) + } } - // initialize + // Initialize status if res.Status.CommittedResourceReservation == nil { res.Status.CommittedResourceReservation = &v1alpha1.CommittedResourceReservationStatus{} } // Build new Status.Allocations map based on actual VM locations newStatusAllocations := make(map[string]string) + // Track allocations to remove from Spec (stale/leaving VMs) + var allocationsToRemove []string + + for vmUUID, allocation := range res.Spec.CommittedResourceReservation.Allocations { + allocationAge := now.Sub(allocation.CreationTimestamp.Time) + isInGracePeriod := allocationAge < r.Conf.AllocationGracePeriod + + if isInGracePeriod { + // New allocation: use Nova API for real-time status + result.HasAllocationsInGracePeriod = true + + if r.NovaClient == nil { + // No Nova client - skip verification for now, retry later + logger.V(1).Info("Nova client not available, skipping new allocation verification", + "vm", vmUUID, + "allocationAge", allocationAge) + continue + } - for vmUUID := range res.Spec.CommittedResourceReservation.Allocations { - server, exists := serverMap[vmUUID] - if exists { - // VM found - record its actual host location - actualHost := server.OSEXTSRVATTRHost - newStatusAllocations[vmUUID] = actualHost - - logger.V(1).Info("verified VM allocation", - "vm", vmUUID, - "actualHost", actualHost, - "expectedHost", res.Status.Host) - } else { - // VM not found in database - logger.Info("VM not found in database", - "vm", vmUUID, - "reservation", res.Name, - "projectID", projectID) + server, err := r.NovaClient.Get(ctx, vmUUID) + if err != nil { + // VM not yet available in Nova (still spawning) - retry on next reconcile + logger.V(1).Info("VM not yet available in Nova API", + "vm", vmUUID, + "error", err.Error(), + "allocationAge", allocationAge) + // Keep in Spec, don't add to Status - will retry on next reconcile + continue + } - // TODO handle entering and leave event + actualHost := server.ComputeHost + switch { + case actualHost == expectedHost: + // VM is on expected host - confirmed running + newStatusAllocations[vmUUID] = actualHost + logger.V(1).Info("verified new VM allocation via Nova API", + "vm", vmUUID, + "actualHost", actualHost, + "allocationAge", allocationAge) + case actualHost != "": + // VM is on different host - migration scenario (log for now) + newStatusAllocations[vmUUID] = actualHost + logger.Info("VM on different host than expected (migration?)", + "vm", vmUUID, + "actualHost", actualHost, + "expectedHost", expectedHost, + "allocationAge", allocationAge) + default: + // VM not yet on any host - still spawning + logger.V(1).Info("VM not yet on host (spawning)", + "vm", vmUUID, + "status", server.Status, + "allocationAge", allocationAge) + // Keep in Spec, don't add to Status - will retry on next reconcile + } + } else { + // Older allocation: use Hypervisor CRD for verification + if hvInstanceSet[vmUUID] { + // VM found on expected hypervisor - confirmed running + newStatusAllocations[vmUUID] = expectedHost + logger.V(1).Info("verified VM allocation via Hypervisor CRD", + "vm", vmUUID, + "host", expectedHost) + } else { + // VM not found on expected hypervisor - check Nova API as fallback + if r.NovaClient != nil { + novaServer, err := r.NovaClient.Get(ctx, vmUUID) + if err == nil && novaServer.ComputeHost != "" { + // VM exists but on different host - migration or placement change + newStatusAllocations[vmUUID] = novaServer.ComputeHost + logger.Info("VM found via Nova API fallback (not on expected host)", + "vm", vmUUID, + "actualHost", novaServer.ComputeHost, + "expectedHost", expectedHost) + continue + } + // Nova API confirms VM doesn't exist or has no host + logger.V(1).Info("Nova API confirmed VM not found", + "vm", vmUUID, + "error", err) + } + // VM not found on hypervisor and not in Nova - mark for removal (leaving VM) + allocationsToRemove = append(allocationsToRemove, vmUUID) + logger.Info("removing stale allocation (VM not found on hypervisor or Nova)", + "vm", vmUUID, + "reservation", res.Name, + "expectedHost", expectedHost, + "allocationAge", allocationAge, + "gracePeriod", r.Conf.AllocationGracePeriod) + } } } - // Patch the reservation status + // Patch the reservation old := res.DeepCopy() + specChanged := false + + // Remove stale allocations from Spec + if len(allocationsToRemove) > 0 { + for _, vmUUID := range allocationsToRemove { + delete(res.Spec.CommittedResourceReservation.Allocations, vmUUID) + } + specChanged = true + } // Update Status.Allocations res.Status.CommittedResourceReservation.Allocations = newStatusAllocations + // Patch Spec if changed (stale allocations removed) + if specChanged { + if err := r.Patch(ctx, res, client.MergeFrom(old)); err != nil { + if client.IgnoreNotFound(err) == nil { + return result, nil + } + return nil, fmt.Errorf("failed to patch reservation spec: %w", err) + } + // Re-fetch to get the updated resource version for status patch + if err := r.Get(ctx, client.ObjectKeyFromObject(res), res); err != nil { + if client.IgnoreNotFound(err) == nil { + return result, nil + } + return nil, fmt.Errorf("failed to re-fetch reservation: %w", err) + } + old = res.DeepCopy() + } + + // Patch Status patch := client.MergeFrom(old) if err := r.Status().Patch(ctx, res, patch); err != nil { - // Ignore not-found errors during background deletion if client.IgnoreNotFound(err) == nil { - // Object was deleted, no need to continue - return nil + return result, nil } - return fmt.Errorf("failed to patch reservation status: %w", err) + return nil, fmt.Errorf("failed to patch reservation status: %w", err) } logger.V(1).Info("reconciled allocations", "specAllocations", len(res.Spec.CommittedResourceReservation.Allocations), - "statusAllocations", len(newStatusAllocations)) + "statusAllocations", len(newStatusAllocations), + "removedAllocations", len(allocationsToRemove), + "hasAllocationsInGracePeriod", result.HasAllocationsInGracePeriod) - return nil + return result, nil } // getPipelineForFlavorGroup returns the pipeline name for a given flavor group. @@ -432,36 +560,20 @@ func (r *CommitmentReservationController) Init(ctx context.Context, client clien r.SchedulerClient = reservations.NewSchedulerClient(conf.SchedulerURL) logf.FromContext(ctx).Info("scheduler client initialized for commitment reservation controller", "url", conf.SchedulerURL) - return nil -} - -func (r *CommitmentReservationController) listServersByProjectID(ctx context.Context, projectID string) (map[string]*nova.Server, error) { - if r.DB == nil { - return nil, errors.New("database connection not initialized") - } - - logger := LoggerFromContext(ctx).WithValues("component", "controller") - - // Query servers from the database cache. - var servers []nova.Server - _, err := r.DB.Select(&servers, - "SELECT * FROM openstack_servers WHERE tenant_id = $1", - projectID) - if err != nil { - return nil, fmt.Errorf("failed to query servers from database: %w", err) - } - - logger.V(1).Info("queried servers from database", - "projectID", projectID, - "serverCount", len(servers)) - - // Build lookup map for O(1) access by VM UUID. - serverMap := make(map[string]*nova.Server, len(servers)) - for i := range servers { - serverMap[servers[i].ID] = &servers[i] + // Initialize Nova client for real-time VM status checks (optional). + // Skip if NovaClient is already set (e.g., injected for testing) or if keystone not configured. + if r.NovaClient == nil && conf.KeystoneSecretRef.Name != "" { + r.NovaClient = schedulingnova.NewNovaClient() + if err := r.NovaClient.Init(ctx, client, schedulingnova.NovaClientConfig{ + KeystoneSecretRef: conf.KeystoneSecretRef, + SSOSecretRef: conf.SSOSecretRef, + }); err != nil { + return fmt.Errorf("failed to initialize Nova client: %w", err) + } + logf.FromContext(ctx).Info("Nova client initialized for commitment reservation controller") } - return serverMap, nil + return nil } // commitmentReservationPredicate filters to only watch commitment reservations. diff --git a/internal/scheduling/reservations/commitments/controller_test.go b/internal/scheduling/reservations/commitments/controller_test.go index 5af5dfca9..68d13faf3 100644 --- a/internal/scheduling/reservations/commitments/controller_test.go +++ b/internal/scheduling/reservations/commitments/controller_test.go @@ -18,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova" @@ -140,6 +141,225 @@ func TestCommitmentReservationController_Reconcile(t *testing.T) { } } +// ============================================================================ +// Test: reconcileAllocations +// ============================================================================ + +// Note: Full reconcileAllocations tests require mocking NovaClient, which uses +// unexported types (nova.server, nova.migration). Tests for the Nova API path +// would need to be placed in the nova package or the types would need to be exported. +// For now, we test only the Hypervisor CRD path (when NovaClient is nil). + +func TestReconcileAllocations_HypervisorCRDPath(t *testing.T) { + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add scheme: %v", err) + } + if err := hv1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add hypervisor scheme: %v", err) + } + + now := time.Now() + recentTime := metav1.NewTime(now.Add(-5 * time.Minute)) // 5 minutes ago (within grace period) + oldTime := metav1.NewTime(now.Add(-30 * time.Minute)) // 30 minutes ago (past grace period) + + tests := []struct { + name string + reservation *v1alpha1.Reservation + hypervisor *hv1.Hypervisor + config Config + expectedStatusAllocations map[string]string + expectedHasGracePeriodAllocs bool + }{ + { + name: "old allocation - VM found on hypervisor CRD", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-1": oldTime, + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{ + {ID: "vm-1", Name: "vm-1", Active: true}, + }), + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{"vm-1": "host-1"}, + expectedHasGracePeriodAllocs: false, + }, + { + name: "old allocation - inactive VM still counted (stopped/shelved)", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-stopped": oldTime, + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{ + {ID: "vm-stopped", Name: "vm-stopped", Active: false}, // Inactive VM should still be found + }), + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{"vm-stopped": "host-1"}, + expectedHasGracePeriodAllocs: false, + }, + { + name: "old allocation - VM not on hypervisor CRD (no NovaClient fallback)", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-1": oldTime, + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{}), // Empty + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{}, // Not confirmed + expectedHasGracePeriodAllocs: false, + }, + { + name: "new allocation within grace period - no Nova client", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-1": recentTime, + }), + hypervisor: nil, + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{}, // Can't verify without Nova + expectedHasGracePeriodAllocs: true, + }, + { + name: "mixed allocations - old verified via CRD, new in grace period", + reservation: newTestCRReservation(map[string]metav1.Time{ + "vm-new": recentTime, // In grace period + "vm-old": oldTime, // Past grace period + }), + hypervisor: newTestHypervisorCRD("host-1", []hv1.Instance{ + {ID: "vm-old", Name: "vm-old", Active: true}, + }), + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{"vm-old": "host-1"}, // Only old one confirmed via CRD + expectedHasGracePeriodAllocs: true, + }, + { + name: "empty allocations - no work to do", + reservation: newTestCRReservation(map[string]metav1.Time{}), + hypervisor: nil, + config: Config{AllocationGracePeriod: 15 * time.Minute}, + expectedStatusAllocations: map[string]string{}, + expectedHasGracePeriodAllocs: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Build fake client with objects + objects := []client.Object{tt.reservation} + if tt.hypervisor != nil { + objects = append(objects, tt.hypervisor) + } + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + WithStatusSubresource(&v1alpha1.Reservation{}). + Build() + + controller := &CommitmentReservationController{ + Client: k8sClient, + Scheme: scheme, + Conf: tt.config, + NovaClient: nil, // No NovaClient - testing Hypervisor CRD path only + } + + ctx := WithNewGlobalRequestID(context.Background()) + result, err := controller.reconcileAllocations(ctx, tt.reservation) + if err != nil { + t.Fatalf("reconcileAllocations() error = %v", err) + } + + // Check grace period result + if result.HasAllocationsInGracePeriod != tt.expectedHasGracePeriodAllocs { + t.Errorf("expected HasAllocationsInGracePeriod=%v, got %v", + tt.expectedHasGracePeriodAllocs, result.HasAllocationsInGracePeriod) + } + + // Re-fetch reservation to check updates + var updated v1alpha1.Reservation + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(tt.reservation), &updated); err != nil { + t.Fatalf("failed to get updated reservation: %v", err) + } + + // Check status allocations + actualStatusAllocs := map[string]string{} + if updated.Status.CommittedResourceReservation != nil { + actualStatusAllocs = updated.Status.CommittedResourceReservation.Allocations + } + + if len(actualStatusAllocs) != len(tt.expectedStatusAllocations) { + t.Errorf("expected %d status allocations, got %d: %v", + len(tt.expectedStatusAllocations), len(actualStatusAllocs), actualStatusAllocs) + } + + for vmUUID, expectedHost := range tt.expectedStatusAllocations { + if actualHost, ok := actualStatusAllocs[vmUUID]; !ok { + t.Errorf("expected VM %s in status allocations", vmUUID) + } else if actualHost != expectedHost { + t.Errorf("VM %s: expected host %s, got %s", vmUUID, expectedHost, actualHost) + } + } + }) + } +} + +// newTestCRReservation creates a test CR reservation with allocations on "host-1". +func newTestCRReservation(allocations map[string]metav1.Time) *v1alpha1.Reservation { + const host = "host-1" + specAllocs := make(map[string]v1alpha1.CommittedResourceAllocation) + for vmUUID, timestamp := range allocations { + specAllocs[vmUUID] = v1alpha1.CommittedResourceAllocation{ + CreationTimestamp: timestamp, + Resources: map[hv1.ResourceName]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + "cpu": resource.MustParse("2"), + }, + } + } + + return &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-reservation", + }, + Spec: v1alpha1.ReservationSpec{ + Type: v1alpha1.ReservationTypeCommittedResource, + TargetHost: host, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + ProjectID: "test-project", + ResourceName: "test-flavor", + Allocations: specAllocs, + }, + }, + Status: v1alpha1.ReservationStatus{ + Host: host, + Conditions: []metav1.Condition{ + { + Type: v1alpha1.ReservationConditionReady, + Status: metav1.ConditionTrue, + Reason: "ReservationActive", + }, + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationStatus{ + Allocations: make(map[string]string), + }, + }, + } +} + +// newTestHypervisorCRD creates a test Hypervisor CRD with instances. +// +//nolint:unparam // name parameter allows future test flexibility +func newTestHypervisorCRD(name string, instances []hv1.Instance) *hv1.Hypervisor { + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: hv1.HypervisorStatus{ + Instances: instances, + }, + } +} + +// ============================================================================ +// Test: reconcileInstanceReservation_Success (existing test) +// ============================================================================ + func TestCommitmentReservationController_reconcileInstanceReservation_Success(t *testing.T) { scheme := runtime.NewScheme() if err := v1alpha1.AddToScheme(scheme); err != nil {