Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion internal/scheduling/reservations/commitments/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@ import (

type Config struct {

// RequeueIntervalActive is the interval for requeueing active reservations for verification.
// RequeueIntervalActive is the interval for requeueing active reservations for periodic verification.
RequeueIntervalActive time.Duration `json:"committedResourceRequeueIntervalActive"`
// RequeueIntervalRetry is the interval for requeueing when retrying after knowledge is not ready.
RequeueIntervalRetry time.Duration `json:"committedResourceRequeueIntervalRetry"`
// AllocationGracePeriod is the time window after a VM is allocated to a reservation
// during which it's expected to appear on the target host. VMs not confirmed within
// this period are considered stale and removed from the reservation.
AllocationGracePeriod time.Duration `json:"committedResourceAllocationGracePeriod"`
// RequeueIntervalGracePeriod is the interval for requeueing when VMs are in grace period.
// Shorter than RequeueIntervalActive for faster verification of new allocations.
RequeueIntervalGracePeriod time.Duration `json:"committedResourceRequeueIntervalGracePeriod"`
// PipelineDefault is the default pipeline used for scheduling committed resource reservations.
PipelineDefault string `json:"committedResourcePipelineDefault"`

Expand Down Expand Up @@ -68,6 +75,12 @@ func (c *Config) ApplyDefaults() {
if c.RequeueIntervalRetry == 0 {
c.RequeueIntervalRetry = defaults.RequeueIntervalRetry
}
if c.RequeueIntervalGracePeriod == 0 {
c.RequeueIntervalGracePeriod = defaults.RequeueIntervalGracePeriod
}
if c.AllocationGracePeriod == 0 {
c.AllocationGracePeriod = defaults.AllocationGracePeriod
}
if c.PipelineDefault == "" {
c.PipelineDefault = defaults.PipelineDefault
}
Expand All @@ -88,6 +101,8 @@ func DefaultConfig() Config {
return Config{
RequeueIntervalActive: 5 * time.Minute,
RequeueIntervalRetry: 1 * time.Minute,
RequeueIntervalGracePeriod: 1 * time.Minute,
AllocationGracePeriod: 15 * time.Minute,
PipelineDefault: "kvm-general-purpose-load-balancing",
SchedulerURL: "http://localhost:8080/scheduler/nova/external",
ChangeAPIWatchReservationsTimeout: 10 * time.Second,
Expand Down
250 changes: 181 additions & 69 deletions internal/scheduling/reservations/commitments/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (

schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/knowledge/datasources/plugins/openstack/nova"
"github.com/cobaltcore-dev/cortex/internal/knowledge/db"
"github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute"
schedulingnova "github.com/cobaltcore-dev/cortex/internal/scheduling/nova"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
"github.com/cobaltcore-dev/cortex/pkg/multicluster"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
Expand All @@ -44,6 +44,8 @@ type CommitmentReservationController struct {
DB *db.DB
// SchedulerClient for making scheduler API calls.
SchedulerClient *reservations.SchedulerClient
// NovaClient for direct Nova API calls (real-time VM status).
NovaClient schedulingnova.NovaClient
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down Expand Up @@ -96,13 +98,18 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr
if meta.IsStatusConditionTrue(res.Status.Conditions, v1alpha1.ReservationConditionReady) {
logger.V(1).Info("reservation is active, verifying allocations")

// Verify all allocations in Spec against actual VM state from database
if err := r.reconcileAllocations(ctx, &res); err != nil {
// Verify all allocations in Spec against actual VM state
result, err := r.reconcileAllocations(ctx, &res)
if err != nil {
logger.Error(err, "failed to reconcile allocations")
return ctrl.Result{}, err
}

// Requeue periodically to keep verifying allocations
// Requeue with appropriate interval based on allocation state
// Use shorter interval if there are allocations in grace period for faster verification
if result.HasAllocationsInGracePeriod {
return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalGracePeriod}, nil
}
return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalActive}, nil
}

Expand Down Expand Up @@ -322,82 +329,203 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr
return ctrl.Result{}, nil
}

// reconcileAllocations verifies all allocations in Spec against actual Nova VM state.
// reconcileAllocationsResult holds the outcome of allocation reconciliation.
// Reconcile reads it to choose the requeue interval for an active reservation.
type reconcileAllocationsResult struct {
// HasAllocationsInGracePeriod is true if at least one Spec allocation is
// younger than Conf.AllocationGracePeriod. Reconcile then requeues after
// Conf.RequeueIntervalGracePeriod instead of Conf.RequeueIntervalActive.
HasAllocationsInGracePeriod bool
}

// reconcileAllocations verifies all allocations in Spec against actual VM state.
// It updates Status.Allocations based on the actual host location of each VM.
func (r *CommitmentReservationController) reconcileAllocations(ctx context.Context, res *v1alpha1.Reservation) error {
// For new allocations (within grace period), it uses the Nova API for real-time status.
// For older allocations, it uses the Hypervisor CRD to check if VM is on the expected host.
func (r *CommitmentReservationController) reconcileAllocations(ctx context.Context, res *v1alpha1.Reservation) (*reconcileAllocationsResult, error) {
logger := LoggerFromContext(ctx).WithValues("component", "controller")
result := &reconcileAllocationsResult{}
now := time.Now()

// Skip if no CommittedResourceReservation
if res.Spec.CommittedResourceReservation == nil {
return nil
return result, nil
}

// TODO trigger migrations of unused reservations (to PAYG VMs)

// Skip if no allocations to verify
if len(res.Spec.CommittedResourceReservation.Allocations) == 0 {
logger.V(1).Info("no allocations to verify", "reservation", res.Name)
return nil
return result, nil
}

// Query all VMs for this project from the database
projectID := res.Spec.CommittedResourceReservation.ProjectID
serverMap, err := r.listServersByProjectID(ctx, projectID)
if err != nil {
return fmt.Errorf("failed to list servers for project %s: %w", projectID, err)
expectedHost := res.Status.Host

// Fetch the Hypervisor CRD for the expected host (for older allocations)
var hypervisor hv1.Hypervisor
hvInstanceSet := make(map[string]bool)
if expectedHost != "" {
if err := r.Get(ctx, client.ObjectKey{Name: expectedHost}, &hypervisor); err != nil {
if client.IgnoreNotFound(err) != nil {
return nil, fmt.Errorf("failed to get hypervisor %s: %w", expectedHost, err)
}
// Hypervisor not found - all older allocations will be checked via Nova API fallback
logger.Info("hypervisor CRD not found", "host", expectedHost)
} else {
// Build set of all VM UUIDs on this hypervisor for O(1) lookup
// Include both active and inactive VMs - stopped/shelved VMs still consume the reservation slot
for _, inst := range hypervisor.Status.Instances {
hvInstanceSet[inst.ID] = true
}
logger.V(1).Info("fetched hypervisor instances", "host", expectedHost, "instanceCount", len(hvInstanceSet))
}
}

// initialize
// Initialize status
if res.Status.CommittedResourceReservation == nil {
res.Status.CommittedResourceReservation = &v1alpha1.CommittedResourceReservationStatus{}
}

// Build new Status.Allocations map based on actual VM locations
newStatusAllocations := make(map[string]string)
// Track allocations to remove from Spec (stale/leaving VMs)
var allocationsToRemove []string

for vmUUID, allocation := range res.Spec.CommittedResourceReservation.Allocations {
allocationAge := now.Sub(allocation.CreationTimestamp.Time)
isInGracePeriod := allocationAge < r.Conf.AllocationGracePeriod

if isInGracePeriod {
// New allocation: use Nova API for real-time status
result.HasAllocationsInGracePeriod = true

if r.NovaClient == nil {
// No Nova client - skip verification for now, retry later
logger.V(1).Info("Nova client not available, skipping new allocation verification",
"vm", vmUUID,
"allocationAge", allocationAge)
continue
}

for vmUUID := range res.Spec.CommittedResourceReservation.Allocations {
server, exists := serverMap[vmUUID]
if exists {
// VM found - record its actual host location
actualHost := server.OSEXTSRVATTRHost
newStatusAllocations[vmUUID] = actualHost

logger.V(1).Info("verified VM allocation",
"vm", vmUUID,
"actualHost", actualHost,
"expectedHost", res.Status.Host)
} else {
// VM not found in database
logger.Info("VM not found in database",
"vm", vmUUID,
"reservation", res.Name,
"projectID", projectID)
server, err := r.NovaClient.Get(ctx, vmUUID)
if err != nil {
// VM not yet available in Nova (still spawning) - retry on next reconcile
logger.V(1).Info("VM not yet available in Nova API",
"vm", vmUUID,
"error", err.Error(),
"allocationAge", allocationAge)
// Keep in Spec, don't add to Status - will retry on next reconcile
continue
}

// TODO handle entering and leave event
actualHost := server.ComputeHost
switch {
case actualHost == expectedHost:
// VM is on expected host - confirmed running
newStatusAllocations[vmUUID] = actualHost
logger.V(1).Info("verified new VM allocation via Nova API",
"vm", vmUUID,
"actualHost", actualHost,
"allocationAge", allocationAge)
case actualHost != "":
// VM is on different host - migration scenario (log for now)
newStatusAllocations[vmUUID] = actualHost
logger.Info("VM on different host than expected (migration?)",
"vm", vmUUID,
"actualHost", actualHost,
"expectedHost", expectedHost,
"allocationAge", allocationAge)
default:
// VM not yet on any host - still spawning
logger.V(1).Info("VM not yet on host (spawning)",
"vm", vmUUID,
"status", server.Status,
"allocationAge", allocationAge)
// Keep in Spec, don't add to Status - will retry on next reconcile
}
} else {
// Older allocation: use Hypervisor CRD for verification
if hvInstanceSet[vmUUID] {
// VM found on expected hypervisor - confirmed running
newStatusAllocations[vmUUID] = expectedHost
logger.V(1).Info("verified VM allocation via Hypervisor CRD",
"vm", vmUUID,
"host", expectedHost)
} else {
// VM not found on expected hypervisor - check Nova API as fallback
if r.NovaClient != nil {
novaServer, err := r.NovaClient.Get(ctx, vmUUID)
if err == nil && novaServer.ComputeHost != "" {
// VM exists but on different host - migration or placement change
newStatusAllocations[vmUUID] = novaServer.ComputeHost
logger.Info("VM found via Nova API fallback (not on expected host)",
"vm", vmUUID,
"actualHost", novaServer.ComputeHost,
"expectedHost", expectedHost)
continue
}
// Nova API confirms VM doesn't exist or has no host
logger.V(1).Info("Nova API confirmed VM not found",
"vm", vmUUID,
"error", err)
}
// VM not found on hypervisor and not in Nova - mark for removal (leaving VM)
allocationsToRemove = append(allocationsToRemove, vmUUID)
logger.Info("removing stale allocation (VM not found on hypervisor or Nova)",
"vm", vmUUID,
"reservation", res.Name,
"expectedHost", expectedHost,
"allocationAge", allocationAge,
"gracePeriod", r.Conf.AllocationGracePeriod)
}
}
}

// Patch the reservation status
// Patch the reservation
old := res.DeepCopy()
specChanged := false

// Remove stale allocations from Spec
if len(allocationsToRemove) > 0 {
for _, vmUUID := range allocationsToRemove {
delete(res.Spec.CommittedResourceReservation.Allocations, vmUUID)
}
specChanged = true
}

// Update Status.Allocations
res.Status.CommittedResourceReservation.Allocations = newStatusAllocations

// Patch Spec if changed (stale allocations removed)
if specChanged {
if err := r.Patch(ctx, res, client.MergeFrom(old)); err != nil {
if client.IgnoreNotFound(err) == nil {
return result, nil
}
return nil, fmt.Errorf("failed to patch reservation spec: %w", err)
}
// Re-fetch to get the updated resource version for status patch
if err := r.Get(ctx, client.ObjectKeyFromObject(res), res); err != nil {
if client.IgnoreNotFound(err) == nil {
return result, nil
}
return nil, fmt.Errorf("failed to re-fetch reservation: %w", err)
}
old = res.DeepCopy()
}

// Patch Status
patch := client.MergeFrom(old)
if err := r.Status().Patch(ctx, res, patch); err != nil {
// Ignore not-found errors during background deletion
if client.IgnoreNotFound(err) == nil {
// Object was deleted, no need to continue
return nil
return result, nil
}
return fmt.Errorf("failed to patch reservation status: %w", err)
return nil, fmt.Errorf("failed to patch reservation status: %w", err)
}

logger.V(1).Info("reconciled allocations",
"specAllocations", len(res.Spec.CommittedResourceReservation.Allocations),
"statusAllocations", len(newStatusAllocations))
"statusAllocations", len(newStatusAllocations),
"removedAllocations", len(allocationsToRemove),
"hasAllocationsInGracePeriod", result.HasAllocationsInGracePeriod)

return nil
return result, nil
}

// getPipelineForFlavorGroup returns the pipeline name for a given flavor group.
Expand Down Expand Up @@ -432,36 +560,20 @@ func (r *CommitmentReservationController) Init(ctx context.Context, client clien
r.SchedulerClient = reservations.NewSchedulerClient(conf.SchedulerURL)
logf.FromContext(ctx).Info("scheduler client initialized for commitment reservation controller", "url", conf.SchedulerURL)

return nil
}

func (r *CommitmentReservationController) listServersByProjectID(ctx context.Context, projectID string) (map[string]*nova.Server, error) {
if r.DB == nil {
return nil, errors.New("database connection not initialized")
}

logger := LoggerFromContext(ctx).WithValues("component", "controller")

// Query servers from the database cache.
var servers []nova.Server
_, err := r.DB.Select(&servers,
"SELECT * FROM openstack_servers WHERE tenant_id = $1",
projectID)
if err != nil {
return nil, fmt.Errorf("failed to query servers from database: %w", err)
}

logger.V(1).Info("queried servers from database",
"projectID", projectID,
"serverCount", len(servers))

// Build lookup map for O(1) access by VM UUID.
serverMap := make(map[string]*nova.Server, len(servers))
for i := range servers {
serverMap[servers[i].ID] = &servers[i]
// Initialize Nova client for real-time VM status checks (optional).
// Skip if NovaClient is already set (e.g., injected for testing) or if keystone not configured.
if r.NovaClient == nil && conf.KeystoneSecretRef.Name != "" {
r.NovaClient = schedulingnova.NewNovaClient()
if err := r.NovaClient.Init(ctx, client, schedulingnova.NovaClientConfig{
KeystoneSecretRef: conf.KeystoneSecretRef,
SSOSecretRef: conf.SSOSecretRef,
}); err != nil {
return fmt.Errorf("failed to initialize Nova client: %w", err)
}
logf.FromContext(ctx).Info("Nova client initialized for commitment reservation controller")
}

return serverMap, nil
return nil
}

// commitmentReservationPredicate filters to only watch commitment reservations.
Expand Down
Loading
Loading