Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions cmd/api/api/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ func (s *ApiService) CreateInstance(ctx context.Context, request oapi.CreateInst
Code: "name_conflict",
Message: err.Error(),
}, nil
case errors.Is(err, instances.ErrInsufficientResources):
return oapi.CreateInstance409JSONResponse{
Code: "insufficient_resources",
Message: err.Error(),
}, nil
default:
log.ErrorContext(ctx, "failed to create instance", "error", err, "image", request.Body.Image)
return oapi.CreateInstance500JSONResponse{
Expand Down Expand Up @@ -309,15 +314,15 @@ func (s *ApiService) GetInstanceStats(ctx context.Context, request oapi.GetInsta
// vmStatsToOAPI converts vm_metrics.VMStats to oapi.InstanceStats
func vmStatsToOAPI(s *vm_metrics.VMStats) oapi.InstanceStats {
stats := oapi.InstanceStats{
InstanceId: s.InstanceID,
InstanceName: s.InstanceName,
CpuSeconds: s.CPUSeconds(),
MemoryRssBytes: int64(s.MemoryRSSBytes),
MemoryVmsBytes: int64(s.MemoryVMSBytes),
NetworkRxBytes: int64(s.NetRxBytes),
NetworkTxBytes: int64(s.NetTxBytes),
AllocatedVcpus: s.AllocatedVcpus,
AllocatedMemoryBytes: s.AllocatedMemoryBytes,
InstanceId: s.InstanceID,
InstanceName: s.InstanceName,
CpuSeconds: s.CPUSeconds(),
MemoryRssBytes: int64(s.MemoryRSSBytes),
MemoryVmsBytes: int64(s.MemoryVMSBytes),
NetworkRxBytes: int64(s.NetRxBytes),
NetworkTxBytes: int64(s.NetTxBytes),
AllocatedVcpus: s.AllocatedVcpus,
AllocatedMemoryBytes: s.AllocatedMemoryBytes,
MemoryUtilizationRatio: s.MemoryUtilizationRatio(),
}
return stats
Expand Down Expand Up @@ -464,6 +469,11 @@ func (s *ApiService) StartInstance(ctx context.Context, request oapi.StartInstan
Code: "invalid_state",
Message: err.Error(),
}, nil
case errors.Is(err, instances.ErrInsufficientResources):
return oapi.StartInstance409JSONResponse{
Code: "insufficient_resources",
Message: err.Error(),
}, nil
default:
log.ErrorContext(ctx, "failed to start instance", "error", err)
return oapi.StartInstance500JSONResponse{
Expand Down
3 changes: 3 additions & 0 deletions cmd/api/api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ func (s *ApiService) GetResources(ctx context.Context, _ oapi.GetResourcesReques
}

// Convert to API response
diskIO := convertResourceStatus(status.DiskIO)
resp := oapi.Resources{
Cpu: convertResourceStatus(status.CPU),
Memory: convertResourceStatus(status.Memory),
Disk: convertResourceStatus(status.Disk),
Network: convertResourceStatus(status.Network),
DiskIo: &diskIO,
Allocations: make([]oapi.ResourceAllocation, 0, len(status.Allocations)),
}

Expand All @@ -53,6 +55,7 @@ func (s *ApiService) GetResources(ctx context.Context, _ oapi.GetResourcesReques
DiskBytes: &alloc.DiskBytes,
NetworkDownloadBps: &alloc.NetworkDownloadBps,
NetworkUploadBps: &alloc.NetworkUploadBps,
DiskIoBps: &alloc.DiskIOBps,
})
}

Expand Down
8 changes: 3 additions & 5 deletions cmd/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ type Config struct {
MaxMemoryPerInstance string // Max memory for a single VM (0 = unlimited)

// Resource limits - aggregate
MaxTotalVcpus int // Aggregate vCPU limit across all instances (0 = unlimited)
MaxTotalMemory string // Aggregate memory limit across all instances (0 = unlimited)
// Note: CPU/memory aggregate limits are now handled via oversubscription ratios (OVERSUB_CPU, OVERSUB_MEMORY)
MaxTotalVolumeStorage string // Total volume storage limit (0 = unlimited)

// OpenTelemetry configuration
Expand Down Expand Up @@ -166,9 +165,8 @@ func Load() *Config {
MaxVcpusPerInstance: getEnvInt("MAX_VCPUS_PER_INSTANCE", 16),
MaxMemoryPerInstance: getEnv("MAX_MEMORY_PER_INSTANCE", "32GB"),

// Resource limits - aggregate (0 or empty = unlimited)
MaxTotalVcpus: getEnvInt("MAX_TOTAL_VCPUS", 0),
MaxTotalMemory: getEnv("MAX_TOTAL_MEMORY", ""),
// Resource limits - aggregate
// Note: CPU/memory aggregate limits are now handled via oversubscription ratios
MaxTotalVolumeStorage: getEnv("MAX_TOTAL_VOLUME_STORAGE", ""),

// OpenTelemetry configuration
Expand Down
6 changes: 6 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ func run() error {
logger.Warn("failed to reconcile mdev devices", "error", err)
}

// Wire up resource validator for aggregate limit checking.
// This enables the instance manager to validate CPU, memory, network, disk I/O,
// and GPU availability before creating or starting instances.
app.InstanceManager.SetResourceValidator(app.ResourceManager)
logger.Info("Resource validator configured")

// Initialize ingress manager (starts Caddy daemon and DNS server for dynamic upstreams)
logger.Info("Initializing ingress manager...")
if err := app.IngressManager.Initialize(app.Ctx); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions integration/systemd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ func TestSystemdMode(t *testing.T) {
MaxOverlaySize: 100 * 1024 * 1024 * 1024,
MaxVcpusPerInstance: 0,
MaxMemoryPerInstance: 0,
MaxTotalVcpus: 0,
MaxTotalMemory: 0,
}

instanceManager := instances.NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", nil, nil)
Expand Down
3 changes: 0 additions & 3 deletions integration/vgpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func TestVGPU(t *testing.T) {
MaxOverlaySize: 100 * 1024 * 1024 * 1024,
MaxVcpusPerInstance: 0,
MaxMemoryPerInstance: 0,
MaxTotalVcpus: 0,
MaxTotalMemory: 0,
}

instanceManager := instances.NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", nil, nil)
Expand Down Expand Up @@ -272,4 +270,3 @@ func checkVGPUTestPrerequisites() (string, string) {

return "vGPU test requires at least one available VF (all VFs are in use)", ""
}

6 changes: 5 additions & 1 deletion lib/builds/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (m *mockInstanceManager) ListRunningInstancesInfo(ctx context.Context) ([]r
return nil, nil
}

// SetResourceValidator is intentionally a no-op: the mock ignores the
// validator it is given.
func (m *mockInstanceManager) SetResourceValidator(v instances.ResourceValidator) {}

// mockVolumeManager implements volumes.Manager for testing
type mockVolumeManager struct {
volumes map[string]*volumes.Volume
Expand Down Expand Up @@ -611,7 +615,7 @@ func TestBuildQueue_ConcurrencyLimit(t *testing.T) {
queue := NewBuildQueue(2) // Max 2 concurrent

started := make(chan string, 5)

// Enqueue 5 builds with blocking start functions
for i := 0; i < 5; i++ {
id := string(rune('A' + i))
Expand Down
45 changes: 7 additions & 38 deletions lib/instances/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"strings"
"time"

"github.com/nrednav/cuid2"
"github.com/kernel/hypeman/lib/devices"
"github.com/kernel/hypeman/lib/hypervisor"
"github.com/kernel/hypeman/lib/images"
"github.com/kernel/hypeman/lib/logger"
"github.com/kernel/hypeman/lib/network"
"github.com/kernel/hypeman/lib/system"
"github.com/kernel/hypeman/lib/volumes"
"github.com/nrednav/cuid2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"gvisor.dev/gvisor/pkg/cleanup"
Expand Down Expand Up @@ -48,31 +48,6 @@ var systemDirectories = []string{
"/var",
}

// AggregateUsage is the total resource usage summed across all instances.
type AggregateUsage struct {
	// TotalVcpus is the combined vCPU count.
	TotalVcpus int
	// TotalMemory is the combined memory, in bytes.
	TotalMemory int64
}

// calculateAggregateUsage totals the vCPUs and memory (Size + HotplugSize) of
// every instance whose state is Running, Paused, or Created; instances in any
// other state are not counted. If listing instances fails, that error is
// returned as-is together with a zero AggregateUsage.
func (m *manager) calculateAggregateUsage(ctx context.Context) (AggregateUsage, error) {
	all, err := m.listInstances(ctx)
	if err != nil {
		return AggregateUsage{}, err
	}

	total := AggregateUsage{}
	for _, inst := range all {
		switch inst.State {
		case StateRunning, StatePaused, StateCreated:
			total.TotalVcpus += inst.Vcpus
			total.TotalMemory += inst.Size + inst.HotplugSize
		}
	}
	return total, nil
}

// generateVsockCID converts first 8 chars of instance ID to a unique CID
// CIDs 0-2 are reserved (hypervisor, loopback, host)
// Returns value in range 3 to 4294967295
Expand Down Expand Up @@ -174,18 +149,12 @@ func (m *manager) createInstance(
return nil, fmt.Errorf("total memory %d (size + hotplug_size) exceeds maximum allowed %d per instance", totalMemory, m.limits.MaxMemoryPerInstance)
}

// Validate aggregate resource limits
if m.limits.MaxTotalVcpus > 0 || m.limits.MaxTotalMemory > 0 {
usage, err := m.calculateAggregateUsage(ctx)
if err != nil {
log.WarnContext(ctx, "failed to calculate aggregate usage, skipping limit check", "error", err)
} else {
if m.limits.MaxTotalVcpus > 0 && usage.TotalVcpus+vcpus > m.limits.MaxTotalVcpus {
return nil, fmt.Errorf("total vcpus would be %d, exceeds aggregate limit of %d", usage.TotalVcpus+vcpus, m.limits.MaxTotalVcpus)
}
if m.limits.MaxTotalMemory > 0 && usage.TotalMemory+totalMemory > m.limits.MaxTotalMemory {
return nil, fmt.Errorf("total memory would be %d, exceeds aggregate limit of %d", usage.TotalMemory+totalMemory, m.limits.MaxTotalMemory)
}
// Validate aggregate resource limits via ResourceValidator (if configured)
if m.resourceValidator != nil {
needsGPU := req.GPU != nil && req.GPU.Profile != ""
if err := m.resourceValidator.ValidateAllocation(ctx, vcpus, totalMemory, req.NetworkBandwidthDownload, req.NetworkBandwidthUpload, req.DiskIOBps, needsGPU); err != nil {
log.ErrorContext(ctx, "resource validation failed", "error", err)
return nil, fmt.Errorf("%w: %v", ErrInsufficientResources, err)
}
}

Expand Down
3 changes: 3 additions & 0 deletions lib/instances/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ var (

// ErrAmbiguousName is returned when multiple instances have the same name
ErrAmbiguousName = errors.New("multiple instances with the same name")

// ErrInsufficientResources is returned when the resources requested for an instance (CPU, memory, network, disk I/O, GPU) are not available
ErrInsufficientResources = errors.New("insufficient resources")
)
42 changes: 29 additions & 13 deletions lib/instances/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,38 @@ type Manager interface {
// ListRunningInstancesInfo returns info needed for utilization metrics collection.
// Used by the resource manager for VM utilization tracking.
ListRunningInstancesInfo(ctx context.Context) ([]resources.InstanceUtilizationInfo, error)
// SetResourceValidator sets the validator for aggregate resource limit checking.
// Called after initialization to avoid circular dependencies.
SetResourceValidator(v ResourceValidator)
}

// ResourceLimits contains configurable resource limits for instances.
// For the fields marked "0 = unlimited", a zero value disables that limit.
// NOTE(review): createInstance in this change set validates aggregate limits
// through ResourceValidator; MaxTotalVcpus and MaxTotalMemory look superseded
// by that path — confirm whether they are still read before relying on them.
type ResourceLimits struct {
MaxOverlaySize int64 // Maximum overlay disk size in bytes per instance
MaxVcpusPerInstance int // Maximum vCPUs per instance (0 = unlimited)
MaxMemoryPerInstance int64 // Maximum memory in bytes per instance (0 = unlimited)
MaxTotalVcpus int // Maximum total vCPUs across all instances (0 = unlimited)
MaxTotalMemory int64 // Maximum total memory in bytes across all instances (0 = unlimited)
}

// ResourceValidator decides whether a requested set of resources can be allocated.
type ResourceValidator interface {
	// ValidateAllocation reports whether the requested resources are available.
	// A nil result means the allocation is allowed; a non-nil error describes
	// which resource is insufficient along with the current capacity/usage.
	ValidateAllocation(
		ctx context.Context,
		vcpus int,
		memoryBytes, networkDownloadBps, networkUploadBps, diskIOBps int64,
		needsGPU bool,
	) error
}

type manager struct {
paths *paths.Paths
imageManager images.Manager
systemManager system.Manager
networkManager network.Manager
deviceManager devices.Manager
volumeManager volumes.Manager
limits ResourceLimits
instanceLocks sync.Map // map[string]*sync.RWMutex - per-instance locks
hostTopology *HostTopology // Cached host CPU topology
metrics *Metrics
paths *paths.Paths
imageManager images.Manager
systemManager system.Manager
networkManager network.Manager
deviceManager devices.Manager
volumeManager volumes.Manager
limits ResourceLimits
resourceValidator ResourceValidator // Optional validator for aggregate resource limits
instanceLocks sync.Map // map[string]*sync.RWMutex - per-instance locks
hostTopology *HostTopology // Cached host CPU topology
metrics *Metrics

// Hypervisor support
vmStarters map[hypervisor.Type]hypervisor.VMStarter
Expand Down Expand Up @@ -106,6 +116,12 @@ func NewManager(p *paths.Paths, imageManager images.Manager, systemManager syste
return m
}

// SetResourceValidator installs v as the validator consulted for aggregate
// resource limit checks. It is a separate step from construction so the
// validator can be supplied after initialization, avoiding circular
// dependencies.
func (m *manager) SetResourceValidator(v ResourceValidator) {
	m.resourceValidator = v
}

// getHypervisor creates a hypervisor client for the given socket and type.
// Used for connecting to already-running VMs (e.g., for state queries).
func (m *manager) getHypervisor(socketPath string, hvType hypervisor.Type) (hypervisor.Hypervisor, error) {
Expand Down Expand Up @@ -324,6 +340,7 @@ func (m *manager) ListInstanceAllocations(ctx context.Context) ([]resources.Inst
VolumeOverlayBytes: volumeOverlayBytes,
NetworkDownloadBps: inst.NetworkBandwidthDownload,
NetworkUploadBps: inst.NetworkBandwidthUpload,
DiskIOBps: inst.DiskIOBps,
State: string(inst.State),
VolumeBytes: volumeBytes,
})
Expand Down Expand Up @@ -366,4 +383,3 @@ func (m *manager) ListRunningInstancesInfo(ctx context.Context) ([]resources.Ins

return infos, nil
}

26 changes: 18 additions & 8 deletions lib/instances/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/kernel/hypeman/lib/ingress"
"github.com/kernel/hypeman/lib/network"
"github.com/kernel/hypeman/lib/paths"
"github.com/kernel/hypeman/lib/resources"
"github.com/kernel/hypeman/lib/system"
"github.com/kernel/hypeman/lib/vmm"
"github.com/kernel/hypeman/lib/volumes"
Expand Down Expand Up @@ -54,11 +55,18 @@ func setupTestManager(t *testing.T) (*manager, string) {
MaxOverlaySize: 100 * 1024 * 1024 * 1024, // 100GB
MaxVcpusPerInstance: 0, // unlimited
MaxMemoryPerInstance: 0, // unlimited
MaxTotalVcpus: 0, // unlimited
MaxTotalMemory: 0, // unlimited
}
mgr := NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", nil, nil).(*manager)

// Set up resource validation using the real ResourceManager
resourceMgr := resources.NewManager(cfg, p)
resourceMgr.SetInstanceLister(mgr)
resourceMgr.SetImageLister(imageManager)
resourceMgr.SetVolumeLister(volumeManager)
err = resourceMgr.Initialize(context.Background())
require.NoError(t, err)
mgr.SetResourceValidator(resourceMgr)

// Register cleanup to kill any orphaned Cloud Hypervisor processes
t.Cleanup(func() {
cleanupOrphanedProcesses(t, mgr)
Expand Down Expand Up @@ -922,10 +930,14 @@ func TestStorageOperations(t *testing.T) {
tmpDir := t.TempDir()

cfg := &config.Config{
DataDir: tmpDir,
BridgeName: "vmbr0",
SubnetCIDR: "10.100.0.0/16",
DNSServer: "1.1.1.1",
DataDir: tmpDir,
BridgeName: "vmbr0",
SubnetCIDR: "10.100.0.0/16",
DNSServer: "1.1.1.1",
OversubCPU: 1.0,
OversubMemory: 1.0,
OversubDisk: 1.0,
OversubNetwork: 1.0,
}

p := paths.New(tmpDir)
Expand All @@ -938,8 +950,6 @@ func TestStorageOperations(t *testing.T) {
MaxOverlaySize: 100 * 1024 * 1024 * 1024, // 100GB
MaxVcpusPerInstance: 0, // unlimited
MaxMemoryPerInstance: 0, // unlimited
MaxTotalVcpus: 0, // unlimited
MaxTotalMemory: 0, // unlimited
}
manager := NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, "", nil, nil).(*manager)

Expand Down
12 changes: 10 additions & 2 deletions lib/instances/qemu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/kernel/hypeman/lib/ingress"
"github.com/kernel/hypeman/lib/network"
"github.com/kernel/hypeman/lib/paths"
"github.com/kernel/hypeman/lib/resources"
"github.com/kernel/hypeman/lib/system"
"github.com/kernel/hypeman/lib/volumes"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -52,11 +53,18 @@ func setupTestManagerForQEMU(t *testing.T) (*manager, string) {
MaxOverlaySize: 100 * 1024 * 1024 * 1024, // 100GB
MaxVcpusPerInstance: 0, // unlimited
MaxMemoryPerInstance: 0, // unlimited
MaxTotalVcpus: 0, // unlimited
MaxTotalMemory: 0, // unlimited
}
mgr := NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, hypervisor.TypeQEMU, nil, nil).(*manager)

// Set up resource validation using the real ResourceManager
resourceMgr := resources.NewManager(cfg, p)
resourceMgr.SetInstanceLister(mgr)
resourceMgr.SetImageLister(imageManager)
resourceMgr.SetVolumeLister(volumeManager)
err = resourceMgr.Initialize(context.Background())
require.NoError(t, err)
mgr.SetResourceValidator(resourceMgr)

// Register cleanup to kill any orphaned QEMU processes
t.Cleanup(func() {
cleanupOrphanedQEMUProcesses(t, mgr)
Expand Down
Loading