Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions internal/facts/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,11 +995,75 @@ func discoverOllamaLocal(ctx context.Context) (models.OllamaInfo, []models.Resid
// parse the JSON blob
var parsed ollamaDiscoveryPayload
if json.Unmarshal(out, &parsed) == nil {
ApplyOllamaWarmth(&parsed.OllamaInfo, parsed.ResidentModels)
return parsed.OllamaInfo, parsed.ResidentModels
}
return info, nil
}

// ApplyOllamaWarmth sets WarmthScore, in place, on every element of rms.
//
// For each resident model the score is the fraction of the keep-alive
// window that is still left: (ExpiresAt - now) / total, capped at 1. total
// is resolved once per call by DefaultOllamaKeepAlive(info), which falls
// back to 5m when info is nil or its DefaultKeepAlive is empty or
// unparseable. A model whose ExpiresAt is zero (no expiry was reported) or
// is not after the current time is scored 0 (cold); any WarmthScore the
// entry carried beforehand is overwritten.
//
// ExpiresAt is only read here; it must already have been decoded from the
// probe payload. The score is advisory metadata: modelWarmthRank in
// internal/placement/empirical.go buckets it into a bounded tiebreaker that
// RankCandidates (internal/placement/ranker.go) consults.
//
// Exported so that tests in other packages can exercise it.
func ApplyOllamaWarmth(info *models.OllamaInfo, rms []models.ResidentModel) {
	if len(rms) == 0 {
		return
	}
	now := time.Now()
	total := DefaultOllamaKeepAlive(info)
	for i := range rms {
		rm := &rms[i]
		// The zero time.Time is never after now, so this single check covers
		// both "no expiry reported" and "already expired". Resetting the
		// score in both cases keeps a stale value from surviving.
		if !rm.ExpiresAt.After(now) {
			rm.WarmthScore = 0
			continue
		}
		// The remaining time is > 0 here and total is always positive, so
		// the ratio is positive and only the upper bound needs clamping.
		score := float64(rm.ExpiresAt.Sub(now)) / float64(total)
		if score > 1 {
			score = 1
		}
		rm.WarmthScore = score
	}
}

// DefaultOllamaKeepAlive returns the keep-alive window to use as the
// denominator of the warmth score. It reads info.DefaultKeepAlive, which
// may be a Go duration string ("5m", "1h") or a bare integer number of
// seconds ("300"). The 5-minute fallback is returned when info is nil, the
// field is blank, the value cannot be parsed, or the parsed duration is not
// strictly positive. The result is therefore always positive. Exported for
// testability.
func DefaultOllamaKeepAlive(info *models.OllamaInfo) time.Duration {
	const stockKeepAlive = 5 * time.Minute

	var raw string
	if info != nil {
		raw = strings.TrimSpace(info.DefaultKeepAlive)
	}
	if raw == "" {
		return stockKeepAlive
	}

	// A plain integer is interpreted as seconds; give it a unit so that
	// time.ParseDuration accepts it.
	if _, atoiErr := strconv.Atoi(raw); atoiErr == nil {
		raw += "s"
	}

	if parsed, parseErr := time.ParseDuration(raw); parseErr == nil && parsed > 0 {
		return parsed
	}
	return stockKeepAlive
}
Comment thread
toasterbook88 marked this conversation as resolved.

// discoverLlamaServerLocal probes for a running llama-server process and
// returns its resident models. Returns nil if llama-server is not installed or
// not running.
Expand Down
56 changes: 54 additions & 2 deletions internal/facts/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,65 @@ const OllamaDiscoveryScript = `set -o pipefail;
LISTENING=true
fi
GPU=$($OLLAMA_BIN ps 2>/dev/null | grep -o 'gpu:[^ ]*' | head -1)
RESIDENT=$($OLLAMA_BIN ps 2>/dev/null | awk 'NR>1 && NF { proc=""; size_mb=0; for(i=1;i<=NF;i++){if($i~/[0-9]+%/){proc=$i" "$(i+1)} if(($i=="GB"||$i=="GiB")&&i>1&&($(i-1)+0)>0){size_mb=int($(i-1)*1024+0.5)} if(($i=="MB"||$i=="MiB")&&i>1&&($(i-1)+0)>0){size_mb=int($(i-1)+0.5)}} gsub(/"/, "\\\"", proc); printf "%s{\"name\":\"%s\",\"runtime\":\"ollama\",\"processor\":\"%s\",\"size_vram_mb\":%d,\"source\":\"ollama-ps\"}", (n++ ? "," : ""), $1, proc, size_mb }')
# 'ollama ps -qq' (added in Ollama 0.3.10) emits JSON: each entry
# includes name, expires_at (RFC3339) and size_vram. Parse it
# with python3 (always present on nodes with ollama) and emit
# one JSON object per model. Falls back to the existing awk
# parser when the JSON is unavailable (older Ollama).
PS_JSON=$($OLLAMA_BIN ps -qq 2>/dev/null || echo "")
if [ -n "$PS_JSON" ]; then
RESIDENT=$(printf '%s' "$PS_JSON" | python3 - 2>/dev/null <<'PYEOF' || echo ""
import json, sys
try:
data = json.loads(sys.stdin.read() or "[]")
entries = data.get("models", data) if isinstance(data, dict) else data
if not isinstance(entries, list):
entries = []
out = []
for e in entries:
if not isinstance(e, dict):
continue
name = e.get("name", "")
if not name:
continue
vram = e.get("size_vram")
try:
vram_val = int(vram) if vram is not None else 0
except (ValueError, TypeError):
vram_val = 0
out.append(json.dumps({
"name": name,
"runtime": "ollama",
"processor": e.get("processor", "gpu"),
"size_vram_mb": vram_val // (1024*1024),
"source": "ollama-ps",
"expires_at": e.get("expires_at", ""),
}))
print(",".join(out))
except Exception:
sys.exit(0)
PYEOF
)
else
RESIDENT=""
fi
if [ -z "$RESIDENT" ]; then
# Fallback: original awk parser (older Ollama, no 'ps -qq').
# No expires_at field is emitted by this path; the local
# parser will leave ExpiresAt zero and WarmthScore at 0.
RESIDENT=$($OLLAMA_BIN ps 2>/dev/null | awk 'NR>1 && NF { proc=""; size_mb=0; for(i=1;i<=NF;i++){if($i~/[0-9]+%/){proc=$i" "$(i+1)} if(($i=="GB"||$i=="GiB")&&i>1&&($(i-1)+0)>0){size_mb=int($(i-1)*1024+0.5)} if(($i=="MB"||$i=="MiB")&&i>1&&($(i-1)+0)>0){size_mb=int($(i-1)+0.5)}} gsub(/"/, "\\\"", proc); printf "%s{\"name\":\"%s\",\"runtime\":\"ollama\",\"processor\":\"%s\",\"size_vram_mb\":%d,\"source\":\"ollama-ps\"}", (n++ ? "," : ""), $1, proc, size_mb }')
fi
if [ -n "$RESIDENT" ]; then
RESIDENT="[$RESIDENT]"
else
RESIDENT="[]"
fi
echo "{\"installed\":true,\"path\":\"$OLLAMA_BIN\",\"version\":\"${VERSION:-unknown}\",\"running\":$( [ -n \"$PGREP\" ] && echo true || echo false ),\"listening\":$LISTENING,\"port\":11434,\"models\":$MODELS,\"resident_models\":$RESIDENT,\"gpu_offload\":\"${GPU:-none}\"}"
# Process-level default_keep_alive (added in Ollama 0.3.10). Read
# from /api/ps; tolerate older Ollama (or versions that omit
# the field) by emitting an empty string. Treat null and any
# failure to parse as empty.
KEEPALIVE=$(curl -s --max-time 2 http://127.0.0.1:11434/api/ps 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); v=d.get('default_keep_alive'); print('' if v is None else v)" 2>/dev/null || echo "")
echo "{\"installed\":true,\"path\":\"$OLLAMA_BIN\",\"version\":\"${VERSION:-unknown}\",\"running\":$( [ -n \"$PGREP\" ] && echo true || echo false ),\"listening\":$LISTENING,\"port\":11434,\"models\":$MODELS,\"resident_models\":$RESIDENT,\"gpu_offload\":\"${GPU:-none}\",\"default_keep_alive\":\"${KEEPALIVE}\"}"
`

// LlamaServerDiscoveryScript is the bash script used to detect a running
Expand Down
26 changes: 25 additions & 1 deletion internal/models/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,41 @@ type OllamaInfo struct {
Port int `json:"port,omitempty" yaml:"port,omitempty"`
Models []string `json:"models,omitempty" yaml:"models,omitempty"`
GPUOffload string `json:"gpu_offload,omitempty" yaml:"gpu_offload,omitempty"`
Error string `json:"error,omitempty" yaml:"error,omitempty"`
// DefaultKeepAlive is the process-level Ollama default keep-alive
// duration string (e.g. "5m", "1h"). Populated from /api/ps on
// Ollama 0.3.10+; empty when unknown or on older Ollama. The warmth
// computation in internal/facts/local.go (ApplyOllamaWarmth) parses
// this and falls back to 5m when empty.
DefaultKeepAlive string `json:"default_keep_alive,omitempty" yaml:"default_keep_alive,omitempty"`
Error string `json:"error,omitempty" yaml:"error,omitempty"`
}

// ResidentModel is additive truth-plane metadata describing a model that is
// currently resident in a node runtime according to a live probe.
//
// ExpiresAt is decoded from the `expires_at` value the Ollama probe emits
// per resident model; WarmthScore is then derived from it, together with the
// process-level `default_keep_alive`, by ApplyOllamaWarmth in
// internal/facts/local.go. Both are optional: when the probe reports no
// expiry (older Ollama, or other runtimes such as llama-server /
// mlx_lm.server), both fields remain zero and the model is treated as cold.
// The fields are advisory metadata only; placement uses them as a bounded
// tiebreaker, never as a primary signal (see modelWarmthRank in
// internal/placement/empirical.go, consumed by RankCandidates in
// internal/placement/ranker.go).
type ResidentModel struct {
Name string `json:"name" yaml:"name"`
Runtime string `json:"runtime,omitempty" yaml:"runtime,omitempty"`
Processor string `json:"processor,omitempty" yaml:"processor,omitempty"`
Source string `json:"source,omitempty" yaml:"source,omitempty"`
SizeVRAMMB int64 `json:"size_vram_mb,omitempty" yaml:"size_vram_mb,omitempty"` // 0 = unknown/not reported by the runtime; currently populated only by the Ollama probe

// ExpiresAt is the wall-clock time at which the model is expected to
// be unloaded by the runtime. Zero when unknown.
//
// NOTE(review): encoding/json's `omitempty` does not treat a struct value
// as empty, so a zero time.Time is still marshaled rather than omitted —
// confirm whether that is acceptable or whether `omitzero` (Go 1.24+) or
// a *time.Time is wanted.
// NOTE(review): the python branch of the probe script emits
// "expires_at": "" when the field is missing; time.Time's JSON decoder
// rejects an empty string, which would make the whole payload fail to
// unmarshal — verify against the probe output.
ExpiresAt time.Time `json:"expires_at,omitempty" yaml:"expires_at,omitempty"`

// WarmthScore is a continuous 0.0–1.0 measure of how much of the
// keep-alive window is still left: the time remaining until ExpiresAt
// divided by the runtime's default_keep_alive (5m when unknown).
// 1.0 = a full window (or more) remains, 0.0 = expired or unknown.
// Always non-negative; clamped to [0, 1] at compute time.
WarmthScore float64 `json:"warmth_score,omitempty" yaml:"warmth_score,omitempty"`
}

// TurboQuantInfo records whether a node appears able to run a TurboQuant-like
Expand Down
36 changes: 36 additions & 0 deletions internal/placement/empirical.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,46 @@ func relevantResidentModels(n models.NodeFacts, reqs models.TaskRequirements) []
return relevant
}

// residentModelRank scores a node by the number of resident models that
// relevantResidentModels considers relevant to reqs; more matches rank
// higher. Warmth is deliberately not part of this count: it is ranked
// separately by modelWarmthRank.
func residentModelRank(n models.NodeFacts, reqs models.TaskRequirements) int {
	matches := relevantResidentModels(n, reqs)
	return len(matches)
}

// modelWarmthRank reduces the warmth of a node's relevant resident models
// to a bounded rank in {0, 1, 2}. It takes the highest WarmthScore among
// the models returned by relevantResidentModels and buckets it with
// warmthToRank: 0 = cold (no relevant model, or score at most 0.5),
// 1 = warm (> 0.5), 2 = hot (> 0.9). RankCandidates consults the result
// only as a tiebreaker, never as a primary signal.
func modelWarmthRank(n models.NodeFacts, reqs models.TaskRequirements) int {
	candidates := relevantResidentModels(n, reqs)
	if len(candidates) == 0 {
		return 0
	}

	var peak float64
	for i := range candidates {
		if w := candidates[i].WarmthScore; w > peak {
			peak = w
		}
	}
	return warmthToRank(peak)
}

// warmthToRank buckets a continuous warmth score into a coarse rank:
// 2 (hot) for scores strictly above 0.9, 1 (warm) for scores strictly
// above 0.5, and 0 (cold) for everything else, including the exact
// boundary values 0.5 and below.
func warmthToRank(score float64) int {
	if score > 0.9 {
		return 2
	}
	if score > 0.5 {
		return 1
	}
	return 0
}

func residentModelReason(n models.NodeFacts, reqs models.TaskRequirements) string {
modelsForReq := relevantResidentModels(n, reqs)
if len(modelsForReq) == 0 {
Expand Down
6 changes: 6 additions & 0 deletions internal/placement/ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func RankCandidates(candidates []models.NodeFacts, reqs models.TaskRequirements,
turboQuantRank int
unifiedMemoryRank int
pressureRank int
modelWarmthRank int
reservationRatio float64
clusterReservationShare float64
}
Expand Down Expand Up @@ -132,6 +133,7 @@ func RankCandidates(candidates []models.NodeFacts, reqs models.TaskRequirements,
turboQuantRank: turboQuantRank(n),
unifiedMemoryRank: unifiedMemoryRank(n, reqs),
pressureRank: pressureRank(pressureOf(n)),
modelWarmthRank: modelWarmthRank(n, reqs),
reservationRatio: reservationRatio(n),
clusterReservationShare: share,
}
Expand Down Expand Up @@ -176,6 +178,10 @@ func RankCandidates(candidates []models.NodeFacts, reqs models.TaskRequirements,
return keys[i].pressureRank < keys[j].pressureRank
}

if keys[i].modelWarmthRank != keys[j].modelWarmthRank {
return keys[i].modelWarmthRank > keys[j].modelWarmthRank
}

if keys[i].reservationRatio != keys[j].reservationRatio {
return keys[i].reservationRatio < keys[j].reservationRatio
}
Expand Down
Loading