From a6f3f41de7bfb861ca87886a3a7f7c3463d42523 Mon Sep 17 00:00:00 2001 From: TheShigure7 <2947458856@qq.com> Date: Sat, 9 May 2026 21:38:46 +0800 Subject: [PATCH 1/2] feat: add marketplace phase2 vector retrieval backend --- cmd/anyclaw-registry/main.go | 73 ++- cmd/anyclaw-registry/main_test.go | 2 +- pkg/capability/markettools/marketplace.go | 64 +- .../markettools/marketplace_test.go | 19 +- pkg/marketplace/bridge/bridge.go | 32 +- pkg/marketplace/capability_index.go | 27 +- pkg/marketplace/registry/client.go | 22 +- pkg/marketplace/registry/client_test.go | 19 +- pkg/marketplace/registry/convert.go | 84 ++- pkg/marketplace/registry/types.go | 31 +- pkg/marketplace/types.go | 138 ++-- pkg/marketregistry/embedding_jobs.go | 601 ++++++++++++++++++ pkg/marketregistry/embedding_text.go | 21 + pkg/marketregistry/embedding_types.go | 55 ++ pkg/marketregistry/metrics.go | 130 ++++ pkg/marketregistry/search_filters.go | 52 ++ pkg/marketregistry/search_lexical.go | 256 ++++++++ pkg/marketregistry/search_types.go | 11 + pkg/marketregistry/server.go | 166 ++++- pkg/marketregistry/server_test.go | 591 ++++++++++++++++- pkg/marketregistry/store.go | 517 ++++++++++++--- pkg/marketregistry/types.go | 48 +- pkg/marketregistry/vector.go | 157 +++++ 23 files changed, 2889 insertions(+), 227 deletions(-) create mode 100644 pkg/marketregistry/embedding_jobs.go create mode 100644 pkg/marketregistry/embedding_text.go create mode 100644 pkg/marketregistry/embedding_types.go create mode 100644 pkg/marketregistry/metrics.go create mode 100644 pkg/marketregistry/search_filters.go create mode 100644 pkg/marketregistry/search_lexical.go create mode 100644 pkg/marketregistry/search_types.go create mode 100644 pkg/marketregistry/vector.go diff --git a/cmd/anyclaw-registry/main.go b/cmd/anyclaw-registry/main.go index 3ab117bb..d43652c3 100644 --- a/cmd/anyclaw-registry/main.go +++ b/cmd/anyclaw-registry/main.go @@ -9,7 +9,9 @@ import ( "net/http" "os" "os/signal" + "strconv" "syscall" + 
"time" "github.com/1024XEngineer/anyclaw/pkg/marketregistry" ) @@ -42,21 +44,42 @@ func serve(args []string) error { dbDriver := fs.String("db-driver", "sqlite", "database/sql driver name") dbDSN := fs.String("db-dsn", "", "database DSN; defaults to /registry.db for sqlite") adminToken := fs.String("admin-token", os.Getenv("ANYCLAW_REGISTRY_ADMIN_TOKEN"), "admin bearer token; defaults to ANYCLAW_REGISTRY_ADMIN_TOKEN") + requireAdminToken := fs.Bool("require-admin-token", envBool("ANYCLAW_REGISTRY_REQUIRE_ADMIN_TOKEN", false), "fail startup when admin token is empty") seed := fs.Bool("seed", true, "seed fixture artifacts when the registry is empty") if err := fs.Parse(args); err != nil { return err } + vectorCfg := marketregistry.VectorConfig{ + Enabled: envBool("ANYCLAW_MARKET_VECTOR_ENABLED", false), + FailOpen: envBool("ANYCLAW_MARKET_VECTOR_FAIL_OPEN", true), + Provider: os.Getenv("ANYCLAW_MARKET_EMBEDDING_PROVIDER"), + Model: firstNonEmpty(os.Getenv("ANYCLAW_MARKET_EMBEDDING_MODEL"), os.Getenv("ANYCLAW_MARKET_EMBEDDING_DASHSCOPE_MODEL")), + QueryModel: firstNonEmpty(os.Getenv("ANYCLAW_MARKET_EMBEDDING_QUERY_MODEL"), os.Getenv("ANYCLAW_MARKET_EMBEDDING_DASHSCOPE_QUERY_MODEL")), + APIKey: firstNonEmpty(os.Getenv("ANYCLAW_MARKET_EMBEDDING_API_KEY"), os.Getenv("ANYCLAW_MARKET_EMBEDDING_DASHSCOPE_API_KEY")), + BaseURL: os.Getenv("ANYCLAW_MARKET_EMBEDDING_BASE_URL"), + SecretKey: os.Getenv("ANYCLAW_MARKET_EMBEDDING_SECRET_KEY"), + QueryTimeout: time.Duration(envInt("ANYCLAW_MARKET_VECTOR_TIMEOUT_MS", 12000)) * time.Millisecond, + WorkerPoll: time.Duration(envInt("ANYCLAW_MARKET_VECTOR_WORKER_POLL_MS", 5000)) * time.Millisecond, + MaxJobAttempts: envInt("ANYCLAW_MARKET_VECTOR_MAX_JOB_ATTEMPTS", 3), + HybridTopK: envInt("ANYCLAW_MARKET_VECTOR_TOP_K", 25), + HybridCandidate: envInt("ANYCLAW_MARKET_VECTOR_CANDIDATE_LIMIT", 200), + QueryCacheTTL: time.Duration(envInt("ANYCLAW_MARKET_QUERY_CACHE_TTL_SECONDS", 600)) * time.Second, + QueryCacheSize: 
envInt("ANYCLAW_MARKET_QUERY_CACHE_SIZE", 512), + } + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() server, err := marketregistry.NewServer(ctx, marketregistry.ServerConfig{ - Addr: *addr, - DataDir: *dataDir, - DBDriver: *dbDriver, - DBDSN: *dbDSN, - AdminToken: *adminToken, - Seed: *seed, + Addr: *addr, + DataDir: *dataDir, + DBDriver: *dbDriver, + DBDSN: *dbDSN, + AdminToken: *adminToken, + RequireAdminToken: *requireAdminToken, + Seed: *seed, + Vector: vectorCfg, }) if err != nil { return err @@ -72,5 +95,41 @@ func serve(args []string) error { } func printUsage() { - fmt.Println("Usage: anyclaw-registry serve [--addr :8791] [--data-dir .anyclaw-registry] [--db-driver sqlite] [--db-dsn path-or-url] [--admin-token token] [--seed=true]") + fmt.Println("Usage: anyclaw-registry serve [--addr :8791] [--data-dir .anyclaw-registry] [--db-driver sqlite] [--db-dsn path-or-url] [--admin-token token] [--require-admin-token=true] [--seed=true]") +} + +func envBool(name string, fallback bool) bool { + value := os.Getenv(name) + if value == "" { + return fallback + } + switch value { + case "1", "true", "TRUE", "True", "yes", "YES", "on", "ON": + return true + case "0", "false", "FALSE", "False", "no", "NO", "off", "OFF": + return false + default: + return fallback + } +} + +func envInt(name string, fallback int) int { + value := os.Getenv(name) + if value == "" { + return fallback + } + n, err := strconv.Atoi(value) + if err != nil { + return fallback + } + return n +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if value != "" { + return value + } + } + return "" } diff --git a/cmd/anyclaw-registry/main_test.go b/cmd/anyclaw-registry/main_test.go index 43d25dd3..2c119706 100644 --- a/cmd/anyclaw-registry/main_test.go +++ b/cmd/anyclaw-registry/main_test.go @@ -11,7 +11,7 @@ import ( func TestRunDefaultsToServeAndRequiresAdminToken(t *testing.T) { 
t.Setenv("ANYCLAW_REGISTRY_ADMIN_TOKEN", "") - err := run([]string{"serve", "--data-dir", t.TempDir(), "--seed=false"}) + err := run([]string{"serve", "--data-dir", t.TempDir(), "--seed=false", "--require-admin-token=true"}) if err == nil || !strings.Contains(err.Error(), "admin token is required") { t.Fatalf("expected missing admin token error, got %v", err) } diff --git a/pkg/capability/markettools/marketplace.go b/pkg/capability/markettools/marketplace.go index eaa05695..6451230c 100644 --- a/pkg/capability/markettools/marketplace.go +++ b/pkg/capability/markettools/marketplace.go @@ -16,6 +16,13 @@ type Options struct { AuditLogger tools.AuditLogger } +type retrievalMetaPayload struct { + SearchMode string `json:"search_mode,omitempty"` + VectorApplied *bool `json:"vector_applied,omitempty"` + VectorFallbackReason string `json:"vector_fallback_reason,omitempty"` + CandidateCounts map[string]int `json:"candidate_counts,omitempty"` +} + func Register(registry *tools.Registry, opts Options) { if registry == nil || opts.Bridge == nil { return @@ -26,10 +33,11 @@ func Register(registry *tools.Registry, opts Options) { InputSchema: map[string]any{ "type": "object", "properties": map[string]any{ - "query": map[string]string{"type": "string", "description": "Capability need or search query"}, - "kind": map[string]string{"type": "string", "description": "Optional kind: agent, skill, or cli"}, - "source": map[string]string{"type": "string", "description": "Optional source: local, cloud, or all"}, - "limit": map[string]string{"type": "number", "description": "Maximum results"}, + "query": map[string]string{"type": "string", "description": "Capability need or search query"}, + "kind": map[string]string{"type": "string", "description": "Optional kind: agent, skill, or cli"}, + "source": map[string]string{"type": "string", "description": "Optional source: local, cloud, or all"}, + "search_mode": map[string]string{"type": "string", "description": "Optional search mode: auto, 
lexical, or hybrid"}, + "limit": map[string]string{"type": "number", "description": "Maximum results"}, }, "required": []string{"query"}, }, @@ -99,22 +107,31 @@ func searchArtifacts(ctx context.Context, opts Options, input map[string]any) (s query := stringValue(input["query"]) kind := marketplace.NormalizeKind(stringValue(input["kind"])) source := marketplace.NormalizeSource(stringValue(input["source"])) + searchMode := marketplace.NormalizeSearchMode(stringValue(input["search_mode"])) limit := intValue(input["limit"], 5) - result, err := opts.Bridge.Search(ctx, marketbridge.SearchRequest{Query: query, Kind: kind, Source: source, Limit: limit}) + result, err := opts.Bridge.Search(ctx, marketbridge.SearchRequest{ + Query: query, + Kind: kind, + Source: source, + SearchMode: searchMode, + Limit: limit, + }) if err != nil { return "", err } route := marketplace.RouteCapabilityNeed(query, result.Local, result.Cloud, limit) return marshalJSON(map[string]any{ - "query": query, - "kind": kind, - "source": firstNonEmpty(string(source), "all"), - "route": route, - "local_count": len(result.Local), - "cloud_count": len(result.Cloud), - "local": marketplace.BuildCapabilityIndex(result.Local), - "cloud": marketplace.BuildCapabilityIndex(result.Cloud), - "cloud_error": result.CloudErr, + "query": query, + "kind": kind, + "source": firstNonEmpty(string(source), "all"), + "route": route, + "local_count": len(result.Local), + "cloud_count": len(result.Cloud), + "local": marketplace.BuildCapabilityIndex(result.Local), + "cloud": marketplace.BuildCapabilityIndex(result.Cloud), + "cloud_artifacts": result.Cloud, + "cloud_error": result.CloudErr, + "retrieval_meta": buildRetrievalMetaPayload(result.RetrievalMeta), }) } @@ -235,3 +252,22 @@ func marshalJSON(value any) (string, error) { } return string(data), nil } + +func buildRetrievalMetaPayload(meta *marketplace.RetrievalMeta) *retrievalMetaPayload { + if meta == nil { + return nil + } + payload := &retrievalMetaPayload{ + 
SearchMode: string(meta.SearchMode), + VectorApplied: meta.VectorApplied, + VectorFallbackReason: meta.VectorFallbackReason, + } + if meta.CandidateCounts != nil { + payload.CandidateCounts = map[string]int{ + "lexical": meta.CandidateCounts.Lexical, + "vector": meta.CandidateCounts.Vector, + "merged": meta.CandidateCounts.Merged, + } + } + return payload +} diff --git a/pkg/capability/markettools/marketplace_test.go b/pkg/capability/markettools/marketplace_test.go index a63e0677..66bffef2 100644 --- a/pkg/capability/markettools/marketplace_test.go +++ b/pkg/capability/markettools/marketplace_test.go @@ -53,6 +53,12 @@ func TestSearchToolRoutesMissingCapabilityToCloud(t *testing.T) { if !strings.Contains(out, `"action": "install_from_market"`) || !strings.Contains(out, "cloud.skill.release-notes") { t.Fatalf("output = %s, want cloud install route", out) } + if !strings.Contains(out, `"retrieval_meta"`) || !strings.Contains(out, `"search_mode": "lexical"`) { + t.Fatalf("output = %s, want retrieval meta", out) + } + if !strings.Contains(out, `"cloud_artifacts"`) || !strings.Contains(out, `"final_score": 0.91`) { + t.Fatalf("output = %s, want full cloud artifacts with final_score", out) + } } func TestInstallToolAskReturnsConfirmationWithoutInstalling(t *testing.T) { @@ -244,11 +250,16 @@ func testMarketRegistryServer(t *testing.T, id, kind, risk, trust string, permis "tags": []string{"release notes", "changelog", "code review", "pull request"}, "hit_signals": []string{"release notes", "code review"}, "score": 0.91, + "final_score": 0.91, + "match_signals": []string{"lexical", "trust"}, }}, - "total": 1, - "limit": 10, - "offset": 0, - }}) + "total": 1, + "limit": 10, + "offset": 0, + "retrieval_meta": map[string]any{"search_mode": "lexical", "vector_applied": false}, + }, + "meta": map[string]any{"search_mode": "lexical", "vector_applied": false}, + }) default: http.NotFound(w, r) } diff --git a/pkg/marketplace/bridge/bridge.go b/pkg/marketplace/bridge/bridge.go 
index 62cadfd4..748667c2 100644 --- a/pkg/marketplace/bridge/bridge.go +++ b/pkg/marketplace/bridge/bridge.go @@ -30,16 +30,18 @@ type Bridge interface { } type SearchRequest struct { - Query string - Kind marketplace.ArtifactKind - Source marketplace.SourceKind - Limit int + Query string + Kind marketplace.ArtifactKind + Source marketplace.SourceKind + SearchMode marketplace.SearchMode + Limit int } type SearchResult struct { - Local []marketplace.Artifact - Cloud []marketplace.Artifact - CloudErr string + Local []marketplace.Artifact + Cloud []marketplace.Artifact + CloudErr string + RetrievalMeta *marketplace.RetrievalMeta } type ListResult struct { @@ -107,18 +109,30 @@ func (b *DefaultBridge) Search(ctx context.Context, req SearchRequest) (SearchRe } var cloud []marketplace.Artifact var cloudErr string + var retrievalMeta *marketplace.RetrievalMeta if req.Source != marketplace.SourceLocal && b.registry != nil { - result, err := b.registry.List(ctx, marketplace.Filter{Kind: req.Kind, Query: req.Query, Limit: limit}) + result, err := b.registry.List(ctx, marketplace.Filter{ + Kind: req.Kind, + Query: req.Query, + SearchMode: req.SearchMode, + Limit: limit, + }) if err != nil { cloudErr = err.Error() } else { cloud = result.Items + retrievalMeta = result.RetrievalMeta } } if req.Source == marketplace.SourceCloud { local = nil } - return SearchResult{Local: local, Cloud: cloud, CloudErr: cloudErr}, nil + return SearchResult{ + Local: local, + Cloud: cloud, + CloudErr: cloudErr, + RetrievalMeta: retrievalMeta, + }, nil } func (b *DefaultBridge) List(ctx context.Context, filter marketplace.Filter) (ListResult, error) { diff --git a/pkg/marketplace/capability_index.go b/pkg/marketplace/capability_index.go index 64c1c41a..a0fe41e7 100644 --- a/pkg/marketplace/capability_index.go +++ b/pkg/marketplace/capability_index.go @@ -6,16 +6,23 @@ func BuildCapabilityIndex(items []Artifact) []CapabilityIndexItem { out := make([]CapabilityIndexItem, 0, len(items)) for _, item := 
range items { out = append(out, CapabilityIndexItem{ - ArtifactID: item.ID, - Kind: item.Kind, - Name: firstNonEmpty(item.DisplayName, item.Name), - Source: item.Source, - Status: string(item.Status), - Capabilities: artifactCapabilityTerms(item), - Permissions: append([]string(nil), item.Permissions...), - RiskLevel: item.RiskLevel, - TrustLevel: item.TrustLevel, - Score: item.Score, + ID: item.ID, + ArtifactID: item.ID, + Kind: item.Kind, + Name: firstNonEmpty(item.DisplayName, item.Name), + Description: item.Description, + Source: item.Source, + Status: string(item.Status), + Capabilities: artifactCapabilityTerms(item), + Permissions: append([]string(nil), item.Permissions...), + RiskLevel: item.RiskLevel, + TrustLevel: item.TrustLevel, + Compatibility: item.Compatibility, + Installed: item.Installed, + Score: item.Score, + FinalScore: item.FinalScore, + VectorScore: item.VectorScore, + MatchSignals: append([]string(nil), item.MatchSignals...), }) } return out diff --git a/pkg/marketplace/registry/client.go b/pkg/marketplace/registry/client.go index 06b91b86..a8104ed2 100644 --- a/pkg/marketplace/registry/client.go +++ b/pkg/marketplace/registry/client.go @@ -110,6 +110,9 @@ func (c *Client) List(ctx context.Context, filter marketplace.Filter) (marketpla if filter.Query != "" { values.Set("q", filter.Query) } + if filter.SearchMode != "" { + values.Set("search_mode", string(filter.SearchMode)) + } if filter.Risk != "" { values.Set("risk", filter.Risk) } @@ -149,10 +152,11 @@ func (c *Client) List(ctx context.Context, filter marketplace.Filter) (marketpla items = append(items, convertArtifact(item)) } return marketplace.ListResult{ - Items: items, - Total: envelope.Data.Total, - Limit: envelope.Data.Limit, - Offset: envelope.Data.Offset, + Items: items, + Total: envelope.Data.Total, + Limit: envelope.Data.Limit, + Offset: envelope.Data.Offset, + RetrievalMeta: firstRetrievalMeta(envelope.Data.RetrievalMeta, &envelope.Meta), }, nil } @@ -381,3 +385,13 @@ type 
remoteStatusError struct { func (e remoteStatusError) Error() string { return fmt.Sprintf("marketplace registry returned HTTP %d", e.StatusCode) } + +func firstRetrievalMeta(values ...*remoteRetrievalMeta) *marketplace.RetrievalMeta { + for _, value := range values { + if value == nil { + continue + } + return convertRetrievalMeta(value) + } + return nil +} diff --git a/pkg/marketplace/registry/client_test.go b/pkg/marketplace/registry/client_test.go index fd29a696..ff2de93b 100644 --- a/pkg/marketplace/registry/client_test.go +++ b/pkg/marketplace/registry/client_test.go @@ -41,12 +41,16 @@ func TestClientListConvertsCloudArtifactsAndCaches(t *testing.T) { "tags": []string{"release"}, "hit_signals": []string{"changelog"}, "score": 0.9, + "final_score": 0.9, + "match_signals": []string{"lexical", "trust"}, }, }, - "total": 1, - "limit": 10, - "offset": 0, + "total": 1, + "limit": 10, + "offset": 0, + "retrieval_meta": map[string]any{"search_mode": "lexical", "vector_applied": false}, }, + "meta": map[string]any{"search_mode": "lexical", "vector_applied": false}, }) })) defer server.Close() @@ -77,6 +81,15 @@ func TestClientListConvertsCloudArtifactsAndCaches(t *testing.T) { if item.Description != "Detailed release notes skill." 
{ t.Fatalf("unexpected description %q", item.Description) } + if item.FinalScore != 0.9 || len(item.MatchSignals) == 0 { + t.Fatalf("unexpected scoring metadata: %#v", item) + } + if result.RetrievalMeta == nil || result.RetrievalMeta.SearchMode != marketplace.SearchModeLexical { + t.Fatalf("expected retrieval meta, got %#v", result.RetrievalMeta) + } + if result.RetrievalMeta.VectorApplied == nil || *result.RetrievalMeta.VectorApplied { + t.Fatalf("expected vector_applied=false, got %#v", result.RetrievalMeta) + } if _, err := client.List(context.Background(), marketplace.Filter{Kind: marketplace.ArtifactKindSkill, Limit: 10}); err != nil { t.Fatal(err) diff --git a/pkg/marketplace/registry/convert.go b/pkg/marketplace/registry/convert.go index 6e8f8cfd..9367a29e 100644 --- a/pkg/marketplace/registry/convert.go +++ b/pkg/marketplace/registry/convert.go @@ -30,34 +30,42 @@ func convertArtifact(item remoteArtifact) marketplace.Artifact { description := firstNonEmpty(item.DescriptionMD, item.Summary) version := firstNonEmpty(item.Version, item.LatestVersion) return marketplace.Artifact{ - ID: item.ID, - Kind: item.Kind, - Name: item.Name, - DisplayName: item.Name, - Description: description, - Version: version, - LatestVersion: item.LatestVersion, - Source: marketplace.SourceCloud, - SourceID: firstNonEmpty(item.Source, "registry"), - Status: marketplace.StatusAvailable, - Installed: false, - Bound: false, - Active: false, - Enabled: true, - Owner: item.Publisher, - Category: string(item.Kind), - Tags: append([]string(nil), item.Tags...), - Permissions: append([]string(nil), item.Permissions...), - RiskLevel: item.RiskLevel, - TrustLevel: item.TrustLevel, - Verified: strings.EqualFold(item.TrustLevel, "verified"), - Compatibility: convertCompatibility(item.Compatibility), - Dependencies: convertDependencies(item.Dependencies), - HitSignals: append([]string(nil), item.HitSignals...), - Score: item.Score, - TargetHints: targetHintsForKind(item.Kind), - Capabilities: 
appendUnique(nil, append(append(item.Tags, item.HitSignals...), string(item.Kind))...), - Metadata: metadata, + ID: item.ID, + Kind: item.Kind, + Name: item.Name, + DisplayName: item.Name, + Description: description, + Version: version, + LatestVersion: item.LatestVersion, + Source: marketplace.SourceCloud, + SourceID: firstNonEmpty(item.Source, "registry"), + Status: marketplace.StatusAvailable, + Installed: false, + Bound: false, + Active: false, + Enabled: true, + Owner: item.Publisher, + Category: string(item.Kind), + Tags: append([]string(nil), item.Tags...), + Permissions: append([]string(nil), item.Permissions...), + RiskLevel: item.RiskLevel, + TrustLevel: item.TrustLevel, + Verified: strings.EqualFold(item.TrustLevel, "verified"), + Compatibility: convertCompatibility(item.Compatibility), + Dependencies: convertDependencies(item.Dependencies), + HitSignals: append([]string(nil), item.HitSignals...), + Score: item.Score, + LexicalScore: item.LexicalScore, + VectorScore: item.VectorScore, + TagScore: item.TagScore, + TrustScore: item.TrustScore, + FreshnessScore: item.FreshnessScore, + RiskPenalty: item.RiskPenalty, + FinalScore: item.FinalScore, + MatchSignals: append([]string(nil), item.MatchSignals...), + TargetHints: targetHintsForKind(item.Kind), + Capabilities: appendUnique(nil, append(append(item.Tags, item.HitSignals...), string(item.Kind))...), + Metadata: metadata, } } @@ -132,3 +140,23 @@ func appendUnique(base []string, values ...string) []string { } return filtered } + +func convertRetrievalMeta(meta *remoteRetrievalMeta) *marketplace.RetrievalMeta { + if meta == nil { + return nil + } + var counts *marketplace.CandidateCounts + if meta.CandidateCounts != nil { + counts = &marketplace.CandidateCounts{ + Lexical: meta.CandidateCounts.Lexical, + Vector: meta.CandidateCounts.Vector, + Merged: meta.CandidateCounts.Merged, + } + } + return &marketplace.RetrievalMeta{ + SearchMode: meta.SearchMode, + VectorApplied: meta.VectorApplied, + 
VectorFallbackReason: meta.VectorFallbackReason, + CandidateCounts: counts, + } +} diff --git a/pkg/marketplace/registry/types.go b/pkg/marketplace/registry/types.go index e3e5de32..cb3fd03e 100644 --- a/pkg/marketplace/registry/types.go +++ b/pkg/marketplace/registry/types.go @@ -23,10 +23,31 @@ type remoteArtifact struct { Tags []string `json:"tags,omitempty"` HitSignals []string `json:"hit_signals,omitempty"` Score float64 `json:"score,omitempty"` + LexicalScore float64 `json:"lexical_score,omitempty"` + VectorScore *float64 `json:"vector_score,omitempty"` + TagScore float64 `json:"tag_score,omitempty"` + TrustScore float64 `json:"trust_score,omitempty"` + FreshnessScore float64 `json:"freshness_score,omitempty"` + RiskPenalty float64 `json:"risk_penalty,omitempty"` + FinalScore float64 `json:"final_score,omitempty"` + MatchSignals []string `json:"match_signals,omitempty"` UpdatedAt string `json:"updated_at,omitempty"` ManifestSummary map[string]string `json:"manifest_summary,omitempty"` } +type remoteCandidateCounts struct { + Lexical int `json:"lexical,omitempty"` + Vector int `json:"vector,omitempty"` + Merged int `json:"merged,omitempty"` +} + +type remoteRetrievalMeta struct { + SearchMode marketplace.SearchMode `json:"search_mode,omitempty"` + VectorApplied *bool `json:"vector_applied,omitempty"` + VectorFallbackReason string `json:"vector_fallback_reason,omitempty"` + CandidateCounts *remoteCandidateCounts `json:"candidate_counts,omitempty"` +} + type remoteCompatibility struct { AnyClawMin string `json:"anyclaw_min,omitempty"` OS []string `json:"os,omitempty"` @@ -79,11 +100,13 @@ type ResolvedArtifact struct { type listEnvelope struct { Data struct { - Items []remoteArtifact `json:"items"` - Total int `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Items []remoteArtifact `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + RetrievalMeta *remoteRetrievalMeta 
`json:"retrieval_meta,omitempty"` } `json:"data"` + Meta remoteRetrievalMeta `json:"meta"` } type artifactEnvelope struct { diff --git a/pkg/marketplace/types.go b/pkg/marketplace/types.go index 3cd72e36..e25774f6 100644 --- a/pkg/marketplace/types.go +++ b/pkg/marketplace/types.go @@ -18,6 +18,15 @@ const ( SourceCloud SourceKind = "cloud" ) +type SearchMode string + +const ( + SearchModeAuto SearchMode = "auto" + SearchModeLexical SearchMode = "lexical" + SearchModeHybrid SearchMode = "hybrid" + SearchModeLexicalFallback SearchMode = "lexical_fallback" +) + type ArtifactStatus string const ( @@ -61,36 +70,57 @@ type ArtifactVersion struct { Deprecated bool `json:"deprecated,omitempty"` } +type CandidateCounts struct { + Lexical int `json:"lexical,omitempty"` + Vector int `json:"vector,omitempty"` + Merged int `json:"merged,omitempty"` +} + +type RetrievalMeta struct { + SearchMode SearchMode `json:"search_mode,omitempty"` + VectorApplied *bool `json:"vector_applied,omitempty"` + VectorFallbackReason string `json:"vector_fallback_reason,omitempty"` + CandidateCounts *CandidateCounts `json:"candidate_counts,omitempty"` +} + type Artifact struct { - ID string `json:"id"` - Kind ArtifactKind `json:"kind"` - Name string `json:"name"` - DisplayName string `json:"display_name,omitempty"` - Description string `json:"description,omitempty"` - Version string `json:"version,omitempty"` - LatestVersion string `json:"latest_version,omitempty"` - Source SourceKind `json:"source"` - SourceID string `json:"source_id,omitempty"` - Status ArtifactStatus `json:"status"` - Installed bool `json:"installed"` - Bound bool `json:"bound"` - Active bool `json:"active"` - Enabled bool `json:"enabled"` - Owner string `json:"owner,omitempty"` - Category string `json:"category,omitempty"` - Tags []string `json:"tags,omitempty"` - Permissions []string `json:"permissions,omitempty"` - RiskLevel string `json:"risk_level,omitempty"` - TrustLevel string `json:"trust_level,omitempty"` - Verified 
bool `json:"verified,omitempty"` - Compatibility Compatibility `json:"compatibility,omitempty"` - Dependencies []ArtifactDependency `json:"dependencies,omitempty"` - HitSignals []string `json:"hit_signals,omitempty"` - Score float64 `json:"score,omitempty"` - InstallHint string `json:"install_hint,omitempty"` - TargetHints []string `json:"target_hints,omitempty"` - Capabilities []string `json:"capabilities,omitempty"` - Metadata map[string]string `json:"metadata,omitempty"` + ID string `json:"id"` + Kind ArtifactKind `json:"kind"` + Name string `json:"name"` + DisplayName string `json:"display_name,omitempty"` + Description string `json:"description,omitempty"` + Version string `json:"version,omitempty"` + LatestVersion string `json:"latest_version,omitempty"` + Source SourceKind `json:"source"` + SourceID string `json:"source_id,omitempty"` + Status ArtifactStatus `json:"status"` + Installed bool `json:"installed"` + Bound bool `json:"bound"` + Active bool `json:"active"` + Enabled bool `json:"enabled"` + Owner string `json:"owner,omitempty"` + Category string `json:"category,omitempty"` + Tags []string `json:"tags,omitempty"` + Permissions []string `json:"permissions,omitempty"` + RiskLevel string `json:"risk_level,omitempty"` + TrustLevel string `json:"trust_level,omitempty"` + Verified bool `json:"verified,omitempty"` + Compatibility Compatibility `json:"compatibility,omitempty"` + Dependencies []ArtifactDependency `json:"dependencies,omitempty"` + HitSignals []string `json:"hit_signals,omitempty"` + Score float64 `json:"score,omitempty"` + LexicalScore float64 `json:"lexical_score,omitempty"` + VectorScore *float64 `json:"vector_score,omitempty"` + TagScore float64 `json:"tag_score,omitempty"` + TrustScore float64 `json:"trust_score,omitempty"` + FreshnessScore float64 `json:"freshness_score,omitempty"` + RiskPenalty float64 `json:"risk_penalty,omitempty"` + FinalScore float64 `json:"final_score,omitempty"` + MatchSignals []string 
`json:"match_signals,omitempty"` + InstallHint string `json:"install_hint,omitempty"` + TargetHints []string `json:"target_hints,omitempty"` + Capabilities []string `json:"capabilities,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` } type JobState string @@ -278,16 +308,23 @@ type MarketEventListResult struct { } type CapabilityIndexItem struct { - ArtifactID string `json:"artifact_id"` - Kind ArtifactKind `json:"kind"` - Name string `json:"name"` - Source SourceKind `json:"source"` - Status string `json:"status"` - Capabilities []string `json:"capabilities,omitempty"` - Permissions []string `json:"permissions,omitempty"` - RiskLevel string `json:"risk_level,omitempty"` - TrustLevel string `json:"trust_level,omitempty"` - Score float64 `json:"score,omitempty"` + ID string `json:"id,omitempty"` + ArtifactID string `json:"artifact_id"` + Kind ArtifactKind `json:"kind"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Source SourceKind `json:"source"` + Status string `json:"status"` + Capabilities []string `json:"capabilities,omitempty"` + Permissions []string `json:"permissions,omitempty"` + RiskLevel string `json:"risk_level,omitempty"` + TrustLevel string `json:"trust_level,omitempty"` + Compatibility Compatibility `json:"compatibility,omitempty"` + Installed bool `json:"installed,omitempty"` + Score float64 `json:"score,omitempty"` + FinalScore float64 `json:"final_score,omitempty"` + VectorScore *float64 `json:"vector_score,omitempty"` + MatchSignals []string `json:"match_signals,omitempty"` } type CapabilityRoute struct { @@ -302,6 +339,7 @@ type Filter struct { Kind ArtifactKind Source SourceKind Query string + SearchMode SearchMode Status ArtifactStatus Risk string Trust string @@ -316,10 +354,11 @@ type Filter struct { } type ListResult struct { - Items []Artifact `json:"items"` - Total int `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Items []Artifact `json:"items"` + Total int 
`json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + RetrievalMeta *RetrievalMeta `json:"retrieval_meta,omitempty"` } func NormalizeKind(value string) ArtifactKind { @@ -348,6 +387,19 @@ func NormalizeSource(value string) SourceKind { } } +func NormalizeSearchMode(value string) SearchMode { + switch SearchMode(strings.ToLower(strings.TrimSpace(value))) { + case SearchModeAuto: + return SearchModeAuto + case SearchModeLexical: + return SearchModeLexical + case SearchModeHybrid: + return SearchModeHybrid + default: + return "" + } +} + func NormalizeStatus(value string) ArtifactStatus { switch ArtifactStatus(strings.ToLower(strings.TrimSpace(value))) { case StatusAvailable: diff --git a/pkg/marketregistry/embedding_jobs.go b/pkg/marketregistry/embedding_jobs.go new file mode 100644 index 00000000..6bfd1c5a --- /dev/null +++ b/pkg/marketregistry/embedding_jobs.go @@ -0,0 +1,601 @@ +package marketregistry + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "sort" + "strings" + "time" + + "github.com/1024XEngineer/anyclaw/pkg/vec" +) + +func (s *Store) queueArtifactEmbeddingJobTx(ctx context.Context, tx *sql.Tx, artifact Artifact) error { + if s == nil || tx == nil { + return nil + } + now := time.Now().UTC().Format(time.RFC3339) + embeddingText := buildArtifactEmbeddingText(artifact) + model := "" + if s.vector != nil { + model = s.vector.artifactModel() + } + if _, err := tx.ExecContext(ctx, `INSERT INTO artifact_embeddings ( + artifact_id, model, vector_json, vector_dim, embedding_text, status, error, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(artifact_id) DO UPDATE SET + model = excluded.model, + embedding_text = excluded.embedding_text, + status = excluded.status, + error = excluded.error, + updated_at = excluded.updated_at`, + artifact.ID, model, "[]", 0, embeddingText, artifactEmbeddingStatusPending, "", now); err != nil { + return err + } + _, err := tx.ExecContext(ctx, `INSERT INTO artifact_embedding_jobs ( + artifact_id, job_type, status, attempt_count, error, created_at, updated_at + ) VALUES (?, ?, ?, 0, '', ?, ?) + ON CONFLICT(artifact_id, job_type) DO UPDATE SET + status = excluded.status, + attempt_count = 0, + error = excluded.error, + updated_at = excluded.updated_at`, + artifact.ID, artifactEmbeddingJobType, artifactEmbeddingJobQueued, now, now) + return err +} + +func (s *Store) claimNextArtifactEmbeddingJob(ctx context.Context) (*ArtifactEmbeddingJob, error) { + if s == nil || s.db == nil { + return nil, nil + } + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + row := tx.QueryRowContext(ctx, `SELECT id, artifact_id, job_type, status, attempt_count, error, created_at, updated_at + FROM artifact_embedding_jobs + WHERE job_type = ? AND status IN (?, ?) + AND attempt_count < ? + ORDER BY updated_at ASC, id ASC + LIMIT 1`, artifactEmbeddingJobType, artifactEmbeddingJobQueued, artifactEmbeddingJobFailed, s.vector.cfg.MaxJobAttempts) + var job ArtifactEmbeddingJob + scanErr := row.Scan(&job.ID, &job.ArtifactID, &job.JobType, &job.Status, &job.AttemptCount, &job.Error, &job.CreatedAt, &job.UpdatedAt) + if errors.Is(scanErr, sql.ErrNoRows) { + _ = tx.Rollback() + return nil, nil + } + if scanErr != nil { + return nil, scanErr + } + + now := time.Now().UTC().Format(time.RFC3339) + if _, err = tx.ExecContext(ctx, `UPDATE artifact_embedding_jobs + SET status = ?, attempt_count = attempt_count + 1, updated_at = ? 
+ WHERE id = ?`, artifactEmbeddingJobRunning, now, job.ID); err != nil { + return nil, err + } + if _, err = tx.ExecContext(ctx, `UPDATE artifact_embeddings + SET status = ?, updated_at = ? + WHERE artifact_id = ?`, artifactEmbeddingStatusPending, now, job.ArtifactID); err != nil { + return nil, err + } + if err = tx.Commit(); err != nil { + return nil, err + } + job.Status = artifactEmbeddingJobRunning + job.AttemptCount++ + job.UpdatedAt = now + return &job, nil +} + +func (s *Store) markArtifactEmbeddingJobSucceeded(ctx context.Context, job ArtifactEmbeddingJob, artifact Artifact, vectorData []float32) error { + vectorJSON, err := encodeVector(vectorData) + if err != nil { + return err + } + now := time.Now().UTC().Format(time.RFC3339) + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + if _, err = tx.ExecContext(ctx, `UPDATE artifact_embedding_jobs + SET status = ?, error = '', updated_at = ? + WHERE id = ?`, artifactEmbeddingJobSucceeded, now, job.ID); err != nil { + return err + } + if _, err = tx.ExecContext(ctx, `UPDATE artifact_embeddings + SET model = ?, vector_json = ?, vector_dim = ?, embedding_text = ?, status = ?, error = '', updated_at = ? + WHERE artifact_id = ?`, + s.vector.artifactModel(), vectorJSON, len(vectorData), buildArtifactEmbeddingText(artifact), artifactEmbeddingStatusReady, now, artifact.ID); err != nil { + return err + } + return tx.Commit() +} + +func (s *Store) markArtifactEmbeddingJobFailed(ctx context.Context, job ArtifactEmbeddingJob, reason string) error { + now := time.Now().UTC().Format(time.RFC3339) + jobStatus := artifactEmbeddingJobFailed + embeddingStatus := artifactEmbeddingStatusPending + if s != nil && s.vector != nil && job.AttemptCount >= s.vector.cfg.MaxJobAttempts { + embeddingStatus = artifactEmbeddingStatusFailed + } + _, err := s.db.ExecContext(ctx, `UPDATE artifact_embedding_jobs + SET status = ?, error = ?, updated_at = ? 
+ WHERE id = ?`, jobStatus, reason, now, job.ID) + if err != nil { + return err + } + _, err = s.db.ExecContext(ctx, `UPDATE artifact_embeddings + SET status = ?, error = ?, updated_at = ? + WHERE artifact_id = ?`, embeddingStatus, reason, now, job.ArtifactID) + return err +} + +func (s *Store) loadArtifactEmbedding(ctx context.Context, artifactID string) (*ArtifactEmbedding, error) { + row := s.db.QueryRowContext(ctx, `SELECT artifact_id, model, vector_json, vector_dim, embedding_text, status, error, updated_at + FROM artifact_embeddings WHERE artifact_id = ?`, strings.TrimSpace(artifactID)) + var item ArtifactEmbedding + var vectorJSON string + if err := row.Scan(&item.ArtifactID, &item.Model, &vectorJSON, &item.VectorDim, &item.EmbeddingText, &item.Status, &item.Error, &item.LastUpdatedAt); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + vectorData, err := decodeVector(vectorJSON) + if err != nil { + return nil, err + } + item.Vector = vectorData + return &item, nil +} + +func (s *Store) ListArtifactEmbeddingJobs(ctx context.Context, status string, limit int) (ArtifactEmbeddingJobList, error) { + if limit <= 0 { + limit = 100 + } + status = strings.TrimSpace(status) + query := `SELECT id, artifact_id, job_type, status, attempt_count, error, created_at, updated_at + FROM artifact_embedding_jobs` + args := make([]any, 0, 2) + if status != "" { + query += ` WHERE status = ?` + args = append(args, status) + } + query += ` ORDER BY updated_at DESC, id DESC LIMIT ?` + args = append(args, limit) + rows, err := s.db.QueryContext(ctx, query, args...) 
+ if err != nil { + return ArtifactEmbeddingJobList{}, err + } + defer rows.Close() + items := make([]ArtifactEmbeddingJob, 0, limit) + for rows.Next() { + var item ArtifactEmbeddingJob + if err := rows.Scan(&item.ID, &item.ArtifactID, &item.JobType, &item.Status, &item.AttemptCount, &item.Error, &item.CreatedAt, &item.UpdatedAt); err != nil { + return ArtifactEmbeddingJobList{}, err + } + items = append(items, item) + } + if err := rows.Err(); err != nil { + return ArtifactEmbeddingJobList{}, err + } + return ArtifactEmbeddingJobList{Items: items, Total: len(items)}, nil +} + +func (s *Store) ListArtifactEmbeddings(ctx context.Context, status string, limit int) (ArtifactEmbeddingList, error) { + if limit <= 0 { + limit = 100 + } + status = strings.TrimSpace(status) + query := `SELECT artifact_id, model, vector_json, vector_dim, embedding_text, status, error, updated_at + FROM artifact_embeddings` + args := make([]any, 0, 2) + if status != "" { + query += ` WHERE status = ?` + args = append(args, status) + } + query += ` ORDER BY updated_at DESC, artifact_id ASC LIMIT ?` + args = append(args, limit) + rows, err := s.db.QueryContext(ctx, query, args...) 
+ if err != nil { + return ArtifactEmbeddingList{}, err + } + defer rows.Close() + items := make([]ArtifactEmbedding, 0, limit) + for rows.Next() { + var item ArtifactEmbedding + var vectorJSON string + if err := rows.Scan(&item.ArtifactID, &item.Model, &vectorJSON, &item.VectorDim, &item.EmbeddingText, &item.Status, &item.Error, &item.LastUpdatedAt); err != nil { + return ArtifactEmbeddingList{}, err + } + vectorData, err := decodeVector(vectorJSON) + if err != nil { + return ArtifactEmbeddingList{}, err + } + item.Vector = vectorData + items = append(items, item) + } + if err := rows.Err(); err != nil { + return ArtifactEmbeddingList{}, err + } + return ArtifactEmbeddingList{Items: items, Total: len(items)}, nil +} + +func (s *Store) EmbeddingAdminStats(ctx context.Context) (embeddingAdminStats, error) { + var stats embeddingAdminStats + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM artifact_embedding_jobs WHERE status = ?`, artifactEmbeddingJobQueued).Scan(&stats.QueuedJobs); err != nil { + return embeddingAdminStats{}, err + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM artifact_embedding_jobs WHERE status = ?`, artifactEmbeddingJobFailed).Scan(&stats.FailedJobs); err != nil { + return embeddingAdminStats{}, err + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM artifact_embeddings WHERE status = ?`, artifactEmbeddingStatusReady).Scan(&stats.ReadyEmbeddings); err != nil { + return embeddingAdminStats{}, err + } + if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM artifact_embeddings WHERE status = ?`, artifactEmbeddingStatusPending).Scan(&stats.PendingEmbedding); err != nil { + return embeddingAdminStats{}, err + } + return stats, nil +} + +func (s *Store) loadEmbeddingsForArtifacts(ctx context.Context, artifactIDs []string) (map[string]ArtifactEmbedding, error) { + if len(artifactIDs) == 0 { + return map[string]ArtifactEmbedding{}, nil + } + placeholders := make([]string, 0, len(artifactIDs)) + args := 
make([]any, 0, len(artifactIDs)) + for _, artifactID := range artifactIDs { + artifactID = strings.TrimSpace(artifactID) + if artifactID == "" { + continue + } + placeholders = append(placeholders, "?") + args = append(args, artifactID) + } + if len(placeholders) == 0 { + return map[string]ArtifactEmbedding{}, nil + } + rows, err := s.db.QueryContext(ctx, `SELECT artifact_id, model, vector_json, vector_dim, embedding_text, status, error, updated_at + FROM artifact_embeddings WHERE artifact_id IN (`+strings.Join(placeholders, ",")+`) AND status = ? AND model = ?`, + append(args, artifactEmbeddingStatusReady, s.vector.artifactModel())...) + if err != nil { + return nil, err + } + defer rows.Close() + out := make(map[string]ArtifactEmbedding, len(artifactIDs)) + for rows.Next() { + var item ArtifactEmbedding + var vectorJSON string + if err := rows.Scan(&item.ArtifactID, &item.Model, &vectorJSON, &item.VectorDim, &item.EmbeddingText, &item.Status, &item.Error, &item.LastUpdatedAt); err != nil { + return nil, err + } + vectorData, err := decodeVector(vectorJSON) + if err != nil { + return nil, err + } + item.Vector = vectorData + out[item.ArtifactID] = item + } + return out, rows.Err() +} + +func (s *Server) runEmbeddingWorker(ctx context.Context) { + ticker := time.NewTicker(s.vector.cfg.WorkerPoll) + defer ticker.Stop() + for { + s.processOneEmbeddingJob(ctx) + select { + case <-ctx.Done(): + return + case <-ticker.C: + default: + time.Sleep(200 * time.Millisecond) + } + } +} + +func (s *Server) processOneEmbeddingJob(ctx context.Context) { + if s == nil || s.vector == nil || !s.vector.enabled() { + return + } + timer := s.metrics.newTimer("anyclaw_registry_embedding_job_duration_seconds", "Embedding job duration") + job, err := s.store.claimNextArtifactEmbeddingJob(ctx) + if err != nil || job == nil { + return + } + artifact, err := s.store.Get(ctx, job.ArtifactID) + if err != nil { + if timer != nil { + timer.Stop() + } + s.metrics.recordEmbeddingJob(false) + _ = 
s.store.markArtifactEmbeddingJobFailed(ctx, *job, err.Error()) + return + } + embedCtx, cancel := context.WithTimeout(ctx, s.vector.cfg.QueryTimeout) + defer cancel() + vectorData, err := s.vector.artifactEmbed.Embed(embedCtx, buildArtifactEmbeddingText(artifact)) + if err != nil { + if timer != nil { + timer.Stop() + } + s.metrics.recordEmbeddingJob(false) + _ = s.store.markArtifactEmbeddingJobFailed(ctx, *job, err.Error()) + return + } + if len(vectorData) == 0 { + if timer != nil { + timer.Stop() + } + s.metrics.recordEmbeddingJob(false) + _ = s.store.markArtifactEmbeddingJobFailed(ctx, *job, "empty embedding") + return + } + if timer != nil { + timer.Stop() + } + s.metrics.recordEmbeddingJob(true) + _ = s.store.markArtifactEmbeddingJobSucceeded(ctx, *job, artifact, vectorData) +} + +func (s *Store) searchHybrid(ctx context.Context, filter SearchFilter, lexicalCandidates []searchCandidate, total int) ([]searchCandidate, int, RetrievalMeta) { + meta := RetrievalMeta{ + SearchMode: SearchModeLexicalFallback, + VectorApplied: false, + CandidateCounts: &CandidateCounts{ + Lexical: total, + Vector: 0, + Merged: total, + }, + } + if s == nil || s.vector == nil || !s.vector.enabled() { + meta.VectorFallbackReason = "disabled" + if s != nil && s.vector != nil && s.vector.metrics != nil { + s.vector.metrics.recordVectorFallback("disabled") + } + return lexicalCandidates, total, meta + } + searchTimer := s.vector.metrics.newTimer("anyclaw_registry_vector_search_duration_seconds", "Vector search duration") + defer func() { + if searchTimer != nil { + searchTimer.Stop() + } + }() + embedCtx, cancel := context.WithTimeout(ctx, s.vector.cfg.QueryTimeout) + defer cancel() + queryVector, err := s.vector.embedQuery(embedCtx, strings.TrimSpace(filter.Query)) + if err != nil || len(queryVector) == 0 { + meta.VectorFallbackReason = "provider_error" + if s.vector.metrics != nil { + s.vector.metrics.recordVectorFallback("provider_error") + } + return lexicalCandidates, total, meta + 
} + + vectorPool, err := s.listVectorPoolCandidates(ctx, filter) + if err != nil { + meta.VectorFallbackReason = "embedding_unavailable" + if s.vector.metrics != nil { + s.vector.metrics.recordVectorFallback("embedding_unavailable") + } + return lexicalCandidates, total, meta + } + vectorCandidates, err := s.rankVectorCandidates(ctx, vectorPool, queryVector) + if err != nil { + meta.VectorFallbackReason = "embedding_unavailable" + if s.vector.metrics != nil { + s.vector.metrics.recordVectorFallback("embedding_unavailable") + } + return lexicalCandidates, total, meta + } + if len(vectorCandidates) == 0 { + meta.VectorFallbackReason = "empty_index" + if s.vector.metrics != nil { + s.vector.metrics.recordVectorFallback("empty_index") + } + return lexicalCandidates, total, meta + } + + merged := mergeHybridCandidates(lexicalCandidates, vectorCandidates) + sortSearchCandidates(merged, filter.Sort) + meta.SearchMode = SearchModeHybrid + meta.VectorApplied = true + meta.VectorFallbackReason = "" + meta.CandidateCounts = &CandidateCounts{ + Lexical: total, + Vector: len(vectorCandidates), + Merged: len(merged), + } + mergedTotal := total + if len(merged) > mergedTotal { + mergedTotal = len(merged) + } + if s.vector.metrics != nil { + s.vector.metrics.recordVectorApplied() + } + return merged, mergedTotal, meta +} + +func (r *vectorRuntime) embedQuery(ctx context.Context, query string) ([]float32, error) { + if r == nil || !r.enabled() { + return nil, fmt.Errorf("vector runtime disabled") + } + query = strings.TrimSpace(query) + if query == "" { + return nil, fmt.Errorf("query is empty") + } + if r.queryCache != nil { + if cached, ok := r.queryCache.Get(query); ok { + if r.metrics != nil { + r.metrics.recordQueryCacheHit() + } + return cached, nil + } + } + if r.metrics != nil { + r.metrics.recordQueryCacheMiss() + } + timer := r.metrics.newTimer("anyclaw_registry_query_embedding_duration_seconds", "Query embedding duration") + embeddingData, err := r.queryEmbed.Embed(ctx, 
query) + if timer != nil { + timer.Stop() + } + if err != nil { + return nil, err + } + if r.queryCache != nil && len(embeddingData) > 0 { + r.queryCache.Set(query, embeddingData) + } + return embeddingData, nil +} + +func (s *Store) listVectorPoolCandidates(ctx context.Context, filter SearchFilter) ([]searchCandidate, error) { + vectorFilter := filter + vectorFilter.Offset = 0 + if s != nil && s.vector != nil && s.vector.cfg.HybridCandidate > 0 { + vectorFilter.Limit = s.vector.cfg.HybridCandidate + } + candidates, _, err := s.listStructuredCandidates(ctx, vectorFilter) + if err != nil { + return nil, err + } + if limit := vectorFilter.Limit; limit > 0 && len(candidates) > limit { + candidates = candidates[:limit] + } + return candidates, nil +} + +func (s *Store) rankVectorCandidates(ctx context.Context, lexicalCandidates []searchCandidate, queryVector []float32) ([]searchCandidate, error) { + artifactIDs := make([]string, 0, len(lexicalCandidates)) + for _, candidate := range lexicalCandidates { + artifactIDs = append(artifactIDs, candidate.ID) + } + embeddings, err := s.loadEmbeddingsForArtifacts(ctx, artifactIDs) + if err != nil { + return nil, err + } + vectorCandidates := make([]searchCandidate, 0, len(embeddings)) + for _, candidate := range lexicalCandidates { + item, ok := embeddings[candidate.ID] + if !ok || len(item.Vector) == 0 { + continue + } + score := normalizeCosineScore(vec.CosineSimilarity(queryVector, item.Vector)) + candidate.VectorScore = &score + candidate.FinalScore = hybridFinalScore(candidate.Artifact, score) + candidate.Score = candidate.FinalScore + candidate.MatchSignals = appendUniqueSignals(candidate.MatchSignals, "vector") + vectorCandidates = append(vectorCandidates, candidate) + } + sort.SliceStable(vectorCandidates, func(i, j int) bool { + left := valueOrZero(vectorCandidates[i].VectorScore) + right := valueOrZero(vectorCandidates[j].VectorScore) + if left == right { + return vectorCandidates[i].FinalScore > 
vectorCandidates[j].FinalScore + } + return left > right + }) + if limit := s.vector.cfg.HybridTopK; limit > 0 && len(vectorCandidates) > limit { + vectorCandidates = vectorCandidates[:limit] + } + return vectorCandidates, nil +} + +func mergeHybridCandidates(lexicalCandidates, vectorCandidates []searchCandidate) []searchCandidate { + merged := make(map[string]searchCandidate, len(lexicalCandidates)+len(vectorCandidates)) + for _, candidate := range lexicalCandidates { + merged[candidate.ID] = candidate + } + for _, candidate := range vectorCandidates { + if existing, ok := merged[candidate.ID]; ok { + if candidate.VectorScore != nil { + existing.VectorScore = candidate.VectorScore + } + existing.FinalScore = hybridFinalScore(existing.Artifact, valueOrZero(candidate.VectorScore)) + existing.Score = existing.FinalScore + existing.MatchSignals = appendUniqueSignals(existing.MatchSignals, "vector") + merged[candidate.ID] = existing + continue + } + merged[candidate.ID] = candidate + } + out := make([]searchCandidate, 0, len(merged)) + for _, candidate := range merged { + out = append(out, candidate) + } + return out +} + +func hybridFinalScore(artifact Artifact, vectorScore float64) float64 { + base := artifact.FinalScore + if base <= 0 { + base = finalScore(artifact) + } + if vectorScore <= 0 { + return clampScore(base) + } + return clampScore(base*0.7 + vectorScore*0.3) +} + +func normalizeCosineScore(score float64) float64 { + return clampScore((score + 1.0) / 2.0) +} + +func valueOrZero(score *float64) float64 { + if score == nil { + return 0 + } + return *score +} + +func appendUniqueSignals(base []string, values ...string) []string { + seen := make(map[string]struct{}, len(base)+len(values)) + out := make([]string, 0, len(base)+len(values)) + for _, item := range append(base, values...) 
{ + item = strings.TrimSpace(item) + if item == "" { + continue + } + key := strings.ToLower(item) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + out = append(out, item) + } + return out +} + +func encodeVector(vectorData []float32) (string, error) { + data, err := json.Marshal(vectorData) + if err != nil { + return "", err + } + return string(data), nil +} + +func decodeVector(raw string) ([]float32, error) { + if strings.TrimSpace(raw) == "" { + return nil, nil + } + var vectorData []float32 + if err := json.Unmarshal([]byte(raw), &vectorData); err != nil { + return nil, fmt.Errorf("decode vector: %w", err) + } + return vectorData, nil +} diff --git a/pkg/marketregistry/embedding_text.go b/pkg/marketregistry/embedding_text.go new file mode 100644 index 00000000..5f67194a --- /dev/null +++ b/pkg/marketregistry/embedding_text.go @@ -0,0 +1,21 @@ +package marketregistry + +import ( + "fmt" + "strings" +) + +func buildArtifactEmbeddingText(artifact Artifact) string { + lines := []string{ + fmt.Sprintf("kind: %s", strings.TrimSpace(string(artifact.Kind))), + fmt.Sprintf("name: %s", strings.TrimSpace(artifact.Name)), + fmt.Sprintf("summary: %s", strings.TrimSpace(artifact.Summary)), + fmt.Sprintf("description: %s", strings.TrimSpace(artifact.DescriptionMD)), + fmt.Sprintf("publisher: %s", strings.TrimSpace(artifact.Publisher)), + fmt.Sprintf("tags: %s", strings.Join(trimmedStrings(artifact.Tags), ", ")), + fmt.Sprintf("hit_signals: %s", strings.Join(trimmedStrings(artifact.HitSignals), ", ")), + fmt.Sprintf("use_case: %s", strings.TrimSpace(artifact.ManifestSummary["use_case"])), + fmt.Sprintf("permissions: %s", strings.Join(trimmedStrings(artifact.Permissions), ", ")), + } + return strings.Join(compactStrings(lines...), "\n") +} diff --git a/pkg/marketregistry/embedding_types.go b/pkg/marketregistry/embedding_types.go new file mode 100644 index 00000000..fa70fcea --- /dev/null +++ b/pkg/marketregistry/embedding_types.go @@ -0,0 +1,55 @@ 
// Lifecycle states for artifact embedding rows and their background jobs.
const (
	// artifact_embeddings.status values.
	artifactEmbeddingStatusPending = "pending"
	artifactEmbeddingStatusReady   = "ready"
	artifactEmbeddingStatusFailed  = "failed"

	// artifact_embedding_jobs.job_type and .status values.
	artifactEmbeddingJobType      = "artifact_embedding"
	artifactEmbeddingJobQueued    = "queued"
	artifactEmbeddingJobRunning   = "running"
	artifactEmbeddingJobSucceeded = "succeeded"
	artifactEmbeddingJobFailed    = "failed"
)

// ArtifactEmbedding mirrors one row of the artifact_embeddings table: the
// stored vector for an artifact plus its computation status.
type ArtifactEmbedding struct {
	ArtifactID    string    `json:"artifact_id"`
	Model         string    `json:"model"`
	Vector        []float32 `json:"vector,omitempty"`
	VectorDim     int       `json:"vector_dim"`
	EmbeddingText string    `json:"embedding_text,omitempty"`
	Status        string    `json:"status"`
	Error         string    `json:"error,omitempty"`
	// NOTE(review): the LastJob*/LastAttemptAt fields are not populated by the
	// store scans visible here — confirm who fills them before relying on them.
	LastJobError  string `json:"last_job_error,omitempty"`
	LastJobStatus string `json:"last_job_status,omitempty"`
	LastAttemptAt string `json:"last_attempt_at,omitempty"`
	LastUpdatedAt string `json:"last_updated_at,omitempty"`
}

// ArtifactEmbeddingJob mirrors one row of the artifact_embedding_jobs queue.
type ArtifactEmbeddingJob struct {
	ID           int64  `json:"id"`
	ArtifactID   string `json:"artifact_id"`
	JobType      string `json:"job_type"`
	Status       string `json:"status"`
	AttemptCount int    `json:"attempt_count"`
	Error        string `json:"error,omitempty"`
	CreatedAt    string `json:"created_at,omitempty"`
	UpdatedAt    string `json:"updated_at,omitempty"`
}

// ArtifactEmbeddingJobList is the JSON envelope for job listings.
type ArtifactEmbeddingJobList struct {
	Items []ArtifactEmbeddingJob `json:"items"`
	Total int                    `json:"total"`
}

// ArtifactEmbeddingList is the JSON envelope for embedding listings.
type ArtifactEmbeddingList struct {
	Items []ArtifactEmbedding `json:"items"`
	Total int                 `json:"total"`
}

// embeddingAdminStats is the aggregate counters payload for the admin endpoint.
type embeddingAdminStats struct {
	QueuedJobs       int `json:"queued_jobs"`
	FailedJobs       int `json:"failed_jobs"`
	ReadyEmbeddings  int `json:"ready_embeddings"`
	PendingEmbedding int `json:"pending_embeddings"`
}
stdruntime "runtime" + "sync/atomic" + "time" + + "github.com/1024XEngineer/anyclaw/pkg/state/observability" +) + +type registryMetrics struct { + registry *observability.Registry + + queryCacheHits atomic.Int64 + queryCacheMisses atomic.Int64 +} + +func newRegistryMetrics() *registryMetrics { + reg := observability.NewRegistry() + reg.Counter("anyclaw_registry_embedding_jobs_total", "Total embedding jobs processed", nil) + reg.Counter("anyclaw_registry_embedding_jobs_failed_total", "Total failed embedding jobs", nil) + reg.Counter("anyclaw_registry_vector_search_fallback_total", "Total vector-search fallback events", nil) + reg.Counter("anyclaw_registry_vector_search_applied_total", "Total successful hybrid vector searches", nil) + reg.Counter("anyclaw_registry_query_cache_hits_total", "Total query embedding cache hits", nil) + reg.Counter("anyclaw_registry_query_cache_misses_total", "Total query embedding cache misses", nil) + reg.Gauge("anyclaw_registry_query_cache_size", "Current query embedding cache size", nil) + reg.Gauge("anyclaw_registry_embedding_jobs_queued", "Queued embedding jobs", nil) + reg.Gauge("anyclaw_registry_embedding_jobs_failed", "Failed embedding jobs", nil) + reg.Gauge("anyclaw_registry_artifact_embeddings_ready", "Ready artifact embeddings", nil) + reg.Histogram("anyclaw_registry_query_embedding_duration_seconds", "Query embedding duration", nil) + reg.Histogram("anyclaw_registry_vector_search_duration_seconds", "Vector search duration", nil) + reg.Histogram("anyclaw_registry_embedding_job_duration_seconds", "Embedding job duration", nil) + return ®istryMetrics{registry: reg} +} + +func (m *registryMetrics) recordQueryCacheHit() { + if m == nil { + return + } + m.queryCacheHits.Add(1) + m.registry.Counter("anyclaw_registry_query_cache_hits_total", "Total query embedding cache hits", nil).Inc() +} + +func (m *registryMetrics) recordQueryCacheMiss() { + if m == nil { + return + } + m.queryCacheMisses.Add(1) + 
m.registry.Counter("anyclaw_registry_query_cache_misses_total", "Total query embedding cache misses", nil).Inc() +} + +func (m *registryMetrics) recordVectorFallback(reason string) { + if m == nil { + return + } + m.registry.Counter("anyclaw_registry_vector_search_fallback_total", "Total vector-search fallback events", map[string]string{"reason": reason}).Inc() +} + +func (m *registryMetrics) recordVectorApplied() { + if m == nil { + return + } + m.registry.Counter("anyclaw_registry_vector_search_applied_total", "Total successful hybrid vector searches", nil).Inc() +} + +func (m *registryMetrics) newTimer(name, help string) *observability.Timer { + if m == nil { + return nil + } + return m.registry.NewTimer(name, help, nil) +} + +func (m *registryMetrics) recordEmbeddingJob(success bool) { + if m == nil { + return + } + m.registry.Counter("anyclaw_registry_embedding_jobs_total", "Total embedding jobs processed", map[string]string{"status": boolLabel(success)}).Inc() + if !success { + m.registry.Counter("anyclaw_registry_embedding_jobs_failed_total", "Total failed embedding jobs", nil).Inc() + } +} + +func (m *registryMetrics) updateRuntimeState(cacheSize int, stats embeddingAdminStats) { + if m == nil { + return + } + var mem stdruntime.MemStats + stdruntime.ReadMemStats(&mem) + m.registry.Gauge("anyclaw_memory_usage_bytes", "Memory usage in bytes", nil).Set(float64(mem.Alloc)) + m.registry.Gauge("anyclaw_goroutines", "Number of goroutines", nil).Set(float64(stdruntime.NumGoroutine())) + m.registry.Gauge("anyclaw_registry_query_cache_size", "Current query embedding cache size", nil).Set(float64(cacheSize)) + m.registry.Gauge("anyclaw_registry_embedding_jobs_queued", "Queued embedding jobs", nil).Set(float64(stats.QueuedJobs)) + m.registry.Gauge("anyclaw_registry_embedding_jobs_failed", "Failed embedding jobs", nil).Set(float64(stats.FailedJobs)) + m.registry.Gauge("anyclaw_registry_artifact_embeddings_ready", "Ready artifact embeddings", 
nil).Set(float64(stats.ReadyEmbeddings)) +} + +func (m *registryMetrics) prometheus(w http.ResponseWriter, r *http.Request) { + if m == nil { + http.Error(w, "metrics unavailable", http.StatusServiceUnavailable) + return + } + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(m.registry.PrometheusFormat())) +} + +func (m *registryMetrics) json(w http.ResponseWriter, r *http.Request) { + if m == nil { + http.Error(w, "metrics unavailable", http.StatusServiceUnavailable) + return + } + payload := m.registry.JSONFormat() + payload["registry_runtime"] = map[string]any{ + "query_cache_hits": m.queryCacheHits.Load(), + "query_cache_misses": m.queryCacheMisses.Load(), + "generated_at": time.Now().UTC().Format(time.RFC3339), + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(payload) +} + +func boolLabel(v bool) string { + if v { + return "success" + } + return "failed" +} diff --git a/pkg/marketregistry/search_filters.go b/pkg/marketregistry/search_filters.go new file mode 100644 index 00000000..e44b5964 --- /dev/null +++ b/pkg/marketregistry/search_filters.go @@ -0,0 +1,52 @@ +package marketregistry + +import ( + "strings" +) + +func appendStructuredFilterClauses(base []string, args []any, filter SearchFilter) ([]string, []any) { + base = append(base, `NOT EXISTS (SELECT 1 FROM quarantine q WHERE q.artifact_id = a.id)`) + if filter.Kind != "" { + base = append(base, `a.kind = ?`) + args = append(args, string(filter.Kind)) + } + if filter.Source != "" { + base = append(base, `LOWER(a.source) = LOWER(?)`) + args = append(args, filter.Source) + } + if filter.Risk != "" { + base = append(base, `LOWER(a.risk_level) = LOWER(?)`) + args = append(args, filter.Risk) + } + if filter.Trust != "" { + base = append(base, `LOWER(a.trust_level) = LOWER(?)`) + args = append(args, filter.Trust) + } + if filter.Tag != "" { + base = 
append(base, `EXISTS (SELECT 1 FROM json_each(a.tags_json) jt WHERE LOWER(TRIM(jt.value)) = LOWER(?))`) + args = append(args, strings.TrimSpace(filter.Tag)) + } + if filter.Permission != "" { + base = append(base, `EXISTS (SELECT 1 FROM json_each(a.permissions_json) jp WHERE LOWER(TRIM(jp.value)) = LOWER(?))`) + args = append(args, strings.TrimSpace(filter.Permission)) + } + if filter.Publisher != "" { + base = append(base, `LOWER(a.publisher) LIKE LOWER(?)`) + args = append(args, "%"+strings.TrimSpace(filter.Publisher)+"%") + } + if filter.OS != "" { + base = append(base, `( + NOT EXISTS (SELECT 1 FROM json_each(a.compatibility_json, '$.os')) + OR EXISTS (SELECT 1 FROM json_each(a.compatibility_json, '$.os') jo WHERE LOWER(TRIM(jo.value)) = LOWER(?)) + )`) + args = append(args, strings.TrimSpace(filter.OS)) + } + if filter.Arch != "" { + base = append(base, `( + NOT EXISTS (SELECT 1 FROM json_each(a.compatibility_json, '$.arch')) + OR EXISTS (SELECT 1 FROM json_each(a.compatibility_json, '$.arch') ja WHERE LOWER(TRIM(ja.value)) = LOWER(?)) + )`) + args = append(args, strings.TrimSpace(filter.Arch)) + } + return base, args +} diff --git a/pkg/marketregistry/search_lexical.go b/pkg/marketregistry/search_lexical.go new file mode 100644 index 00000000..16ecace2 --- /dev/null +++ b/pkg/marketregistry/search_lexical.go @@ -0,0 +1,256 @@ +package marketregistry + +import ( + "math" + "regexp" + "strings" + "time" +) + +var searchTokenPattern = regexp.MustCompile(`[\pL\pN_./:\-]+`) + +func buildArtifactSearchDocument(artifact Artifact) map[string]string { + return map[string]string{ + "artifact_id": strings.TrimSpace(artifact.ID), + "name": strings.TrimSpace(artifact.Name), + "summary": strings.TrimSpace(artifact.Summary), + "description_md": strings.TrimSpace(artifact.DescriptionMD), + "publisher": strings.TrimSpace(artifact.Publisher), + "kind": strings.TrimSpace(string(artifact.Kind)), + "tags_text": strings.Join(trimmedStrings(artifact.Tags), " "), + 
"hit_signals_text": strings.Join(trimmedStrings(artifact.HitSignals), " "), + "use_case_text": strings.TrimSpace(artifact.ManifestSummary["use_case"]), + "search_text": buildSearchText(artifact), + } +} + +func buildSearchText(artifact Artifact) string { + parts := []string{ + strings.TrimSpace(artifact.ID), + strings.TrimSpace(artifact.Name), + strings.TrimSpace(artifact.Summary), + strings.TrimSpace(artifact.DescriptionMD), + strings.TrimSpace(artifact.Publisher), + strings.TrimSpace(string(artifact.Kind)), + strings.Join(trimmedStrings(artifact.Tags), " "), + strings.Join(trimmedStrings(artifact.HitSignals), " "), + strings.TrimSpace(artifact.ManifestSummary["use_case"]), + } + return strings.Join(compactStrings(parts...), " ") +} + +func buildFTSQuery(query string) string { + tokens := searchTokenPattern.FindAllString(strings.ToLower(strings.TrimSpace(query)), -1) + if len(tokens) == 0 { + cleaned := escapeFTSString(strings.TrimSpace(query)) + if cleaned == "" { + return "" + } + return `"` + cleaned + `"` + } + parts := make([]string, 0, len(tokens)) + for _, token := range tokens { + token = escapeFTSString(token) + if token == "" { + continue + } + parts = append(parts, token+"*") + } + return strings.Join(parts, " AND ") +} + +func escapeFTSString(value string) string { + value = strings.TrimSpace(value) + value = strings.ReplaceAll(value, `"`, `""`) + return value +} + +func applySearchScores(candidate *searchCandidate, filter SearchFilter) { + if candidate == nil { + return + } + candidate.LexicalScore = lexicalScore(candidate.Artifact, filter.Query, candidate.lexicalRank) + candidate.TagScore = tagScore(candidate.Artifact, filter.Tag, filter.Query) + candidate.TrustScore = trustScore(candidate.TrustLevel) + candidate.FreshnessScore = freshnessScore(candidate.UpdatedAt) + candidate.RiskPenalty = riskPenalty(candidate.RiskLevel) + candidate.FinalScore = finalScore(candidate.Artifact) + candidate.Score = candidate.FinalScore + candidate.MatchSignals = 
buildMatchSignals(candidate.Artifact) +} + +func lexicalScore(artifact Artifact, query string, rank float64) float64 { + query = normalizeSearchText(query) + if query == "" { + return 0 + } + base := 0.0 + rank = math.Abs(rank) + if rank > 0 { + base = 1.0 / (1.0 + rank) + } + if rank == 0 { + base = 0.55 + } + idText := normalizeSearchText(artifact.ID) + nameText := normalizeSearchText(artifact.Name) + summaryText := normalizeSearchText(artifact.Summary) + switch { + case query == idText: + base += 0.5 + case query == nameText: + base += 0.4 + case strings.Contains(idText, query), strings.Contains(nameText, query): + base += 0.2 + case strings.Contains(summaryText, query): + base += 0.08 + } + return clampScore(base) +} + +func tagScore(artifact Artifact, tagFilter string, query string) float64 { + if tagFilter != "" && containsFold(artifact.Tags, tagFilter) { + return 1 + } + queryTokens := tokenSet(query) + if len(queryTokens) == 0 || len(artifact.Tags) == 0 { + return 0 + } + hits := 0 + for _, tag := range artifact.Tags { + if _, ok := queryTokens[normalizeSearchText(tag)]; ok { + hits++ + } + } + if hits == 0 { + return 0 + } + return clampScore(float64(hits) / float64(len(queryTokens))) +} + +func trustScore(level string) float64 { + switch strings.ToLower(strings.TrimSpace(level)) { + case "verified": + return 1 + case "trusted": + return 0.8 + case "community": + return 0.55 + default: + return 0.35 + } +} + +func freshnessScore(updatedAt string) float64 { + if strings.TrimSpace(updatedAt) == "" { + return 0 + } + ts, err := time.Parse(time.RFC3339, strings.TrimSpace(updatedAt)) + if err != nil { + return 0 + } + ageDays := time.Since(ts.UTC()).Hours() / 24 + switch { + case ageDays <= 30: + return 1 + case ageDays <= 90: + return 0.8 + case ageDays <= 180: + return 0.6 + case ageDays <= 365: + return 0.35 + default: + return 0.15 + } +} + +func riskPenalty(level string) float64 { + switch strings.ToLower(strings.TrimSpace(level)) { + case "high": + return 
0.35 + case "medium": + return 0.15 + default: + return 0 + } +} + +func finalScore(artifact Artifact) float64 { + score := artifact.LexicalScore*0.70 + + artifact.TagScore*0.10 + + artifact.TrustScore*0.10 + + artifact.FreshnessScore*0.10 - + artifact.RiskPenalty + return clampScore(score) +} + +func clampScore(value float64) float64 { + switch { + case value < 0: + return 0 + case value > 1: + return 1 + default: + return value + } +} + +func normalizeSearchText(value string) string { + value = strings.ToLower(strings.TrimSpace(value)) + value = strings.Join(searchTokenPattern.FindAllString(value, -1), " ") + return strings.TrimSpace(value) +} + +func tokenSet(value string) map[string]struct{} { + items := searchTokenPattern.FindAllString(strings.ToLower(strings.TrimSpace(value)), -1) + if len(items) == 0 { + return nil + } + out := make(map[string]struct{}, len(items)) + for _, item := range items { + item = normalizeSearchText(item) + if item != "" { + out[item] = struct{}{} + } + } + return out +} + +func compactStrings(items ...string) []string { + out := make([]string, 0, len(items)) + for _, item := range items { + item = strings.TrimSpace(item) + if item != "" { + out = append(out, item) + } + } + return out +} + +func trimmedStrings(items []string) []string { + out := make([]string, 0, len(items)) + for _, item := range items { + item = strings.TrimSpace(item) + if item != "" { + out = append(out, item) + } + } + return out +} + +func buildMatchSignals(artifact Artifact) []string { + signals := make([]string, 0, 4) + if artifact.LexicalScore > 0 { + signals = append(signals, "lexical") + } + if artifact.TagScore > 0 { + signals = append(signals, "tag") + } + if artifact.TrustScore > 0 { + signals = append(signals, "trust") + } + if artifact.FreshnessScore > 0 { + signals = append(signals, "freshness") + } + return signals +} diff --git a/pkg/marketregistry/search_types.go b/pkg/marketregistry/search_types.go new file mode 100644 index 00000000..848dc2e6 
--- /dev/null +++ b/pkg/marketregistry/search_types.go @@ -0,0 +1,11 @@ +package marketregistry + +type searchCandidate struct { + Artifact + lexicalRank float64 +} + +const ( + defaultSearchLimit = 50 + defaultSearchCandidateCap = 200 +) diff --git a/pkg/marketregistry/server.go b/pkg/marketregistry/server.go index a69d6fb3..753056e7 100644 --- a/pkg/marketregistry/server.go +++ b/pkg/marketregistry/server.go @@ -22,19 +22,26 @@ type ServerConfig struct { RequireAdminToken bool Seed bool MaxRequestBodyBytes int64 + Vector VectorConfig + TestArtifactEmbed embeddingProvider + TestQueryEmbed embeddingProvider } type Server struct { mux *http.ServeMux store *Store storage PackageStorage + vector *vectorRuntime + metrics *registryMetrics addr string adminToken string maxRequestBodyBytes int64 + workerCancel context.CancelFunc + workerDone chan struct{} } func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error) { - if strings.TrimSpace(cfg.AdminToken) == "" { + if strings.TrimSpace(cfg.AdminToken) == "" && cfg.RequireAdminToken { return nil, fmt.Errorf("admin token is required") } store, err := OpenStoreWithConfig(ctx, StoreConfig{DataDir: cfg.DataDir, Driver: cfg.DBDriver, DSN: cfg.DBDSN}) @@ -52,14 +59,30 @@ func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error) { return nil, err } } + var vectorRuntime *vectorRuntime + if cfg.TestArtifactEmbed != nil || cfg.TestQueryEmbed != nil { + vectorRuntime, err = newVectorRuntimeWithProviders(cfg.Vector, cfg.TestArtifactEmbed, cfg.TestQueryEmbed) + } else { + vectorRuntime, err = newVectorRuntime(cfg.Vector) + } + if err != nil { + _ = store.Close() + return nil, err + } + store.SetVectorRuntime(vectorRuntime) s := &Server{ mux: http.NewServeMux(), store: store, storage: storage, + vector: vectorRuntime, + metrics: newRegistryMetrics(), addr: cfg.Addr, adminToken: strings.TrimSpace(cfg.AdminToken), maxRequestBodyBytes: cfg.MaxRequestBodyBytes, } + if vectorRuntime != nil { + vectorRuntime.metrics 
= s.metrics + } if s.addr == "" { s.addr = ":8791" } @@ -67,6 +90,15 @@ func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error) { s.maxRequestBodyBytes = 2 << 20 } s.registerRoutes() + if s.vector != nil && s.vector.enabled() { + workerCtx, workerCancel := context.WithCancel(ctx) + s.workerCancel = workerCancel + s.workerDone = make(chan struct{}) + go func() { + defer close(s.workerDone) + s.runEmbeddingWorker(workerCtx) + }() + } return s, nil } @@ -74,6 +106,12 @@ func (s *Server) Close() error { if s == nil { return nil } + if s.workerCancel != nil { + s.workerCancel() + } + if s.workerDone != nil { + <-s.workerDone + } return s.store.Close() } @@ -102,6 +140,8 @@ func (s *Server) StartWithContext(ctx context.Context) error { func (s *Server) registerRoutes() { s.mux.HandleFunc("GET /v1/health", s.handleHealth) s.mux.HandleFunc("GET /v1/control-plane", s.handleControlPlane) + s.mux.HandleFunc("GET /metrics", s.handleMetricsPrometheus) + s.mux.HandleFunc("GET /metrics.json", s.handleMetricsJSON) s.mux.HandleFunc("GET /v1/sources", s.handleSources) s.mux.HandleFunc("GET /v1/artifacts", s.handleListArtifacts) s.mux.HandleFunc("GET /v1/artifacts/{id}", s.handleArtifactDetail) @@ -118,10 +158,28 @@ func (s *Server) registerRoutes() { s.mux.HandleFunc("POST /v1/artifacts/{id}/unquarantine", s.handleUnquarantine) s.mux.HandleFunc("GET /v1/admin/audit", s.handleAdminAudit) s.mux.HandleFunc("GET /v1/admin/downloads", s.handleAdminDownloads) + s.mux.HandleFunc("GET /v1/admin/embedding-jobs", s.handleAdminEmbeddingJobs) + s.mux.HandleFunc("GET /v1/admin/embeddings", s.handleAdminEmbeddings) + s.mux.HandleFunc("GET /v1/admin/embeddings/{id}", s.handleAdminEmbeddingDetail) } func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { - writeData(w, http.StatusOK, map[string]string{"status": "ok", "service": "anyclaw-registry"}, 0) + stats, _ := s.store.EmbeddingAdminStats(r.Context()) + cacheSize := 0 + if s.vector != nil && s.vector.queryCache != 
nil { + cacheSize = s.vector.queryCache.Len() + } + if s.metrics != nil { + s.metrics.updateRuntimeState(cacheSize, stats) + } + writeData(w, http.StatusOK, map[string]any{ + "status": "ok", + "service": "anyclaw-registry", + "vector_enabled": s.vector != nil && s.vector.enabled(), + "query_cache_size": cacheSize, + "embedding_ready_count": stats.ReadyEmbeddings, + "embedding_failed_jobs": stats.FailedJobs, + }, 0) } func (s *Server) handleControlPlane(w http.ResponseWriter, r *http.Request) { @@ -144,11 +202,44 @@ func (s *Server) handleControlPlane(w http.ResponseWriter, r *http.Request) { "POST /v1/admin/tokens/{id}/revoke", "GET /v1/admin/audit", "GET /v1/admin/downloads", + "GET /v1/admin/embedding-jobs", + "GET /v1/admin/embeddings", + "GET /v1/admin/embeddings/{id}", + "GET /metrics", + "GET /metrics.json", }, "admin_auth": "bearer", }, 0) } +func (s *Server) handleMetricsPrometheus(w http.ResponseWriter, r *http.Request) { + stats, _ := s.store.EmbeddingAdminStats(r.Context()) + cacheSize := 0 + if s.vector != nil && s.vector.queryCache != nil { + cacheSize = s.vector.queryCache.Len() + } + if s.metrics != nil { + s.metrics.updateRuntimeState(cacheSize, stats) + s.metrics.prometheus(w, r) + return + } + http.Error(w, "metrics unavailable", http.StatusServiceUnavailable) +} + +func (s *Server) handleMetricsJSON(w http.ResponseWriter, r *http.Request) { + stats, _ := s.store.EmbeddingAdminStats(r.Context()) + cacheSize := 0 + if s.vector != nil && s.vector.queryCache != nil { + cacheSize = s.vector.queryCache.Len() + } + if s.metrics != nil { + s.metrics.updateRuntimeState(cacheSize, stats) + s.metrics.json(w, r) + return + } + http.Error(w, "metrics unavailable", http.StatusServiceUnavailable) +} + func (s *Server) handleSources(w http.ResponseWriter, r *http.Request) { writeData(w, http.StatusOK, map[string]any{ "items": []map[string]any{ @@ -429,6 +520,49 @@ func (s *Server) handleAdminDownloads(w http.ResponseWriter, r *http.Request) { writeData(w, 
http.StatusOK, result, len(result.Items)) } +func (s *Server) handleAdminEmbeddingJobs(w http.ResponseWriter, r *http.Request) { + if !s.authorizeAdmin(r) { + writeError(w, http.StatusUnauthorized, "unauthorized", "admin token required", "") + return + } + result, err := s.store.ListArtifactEmbeddingJobs(r.Context(), strings.TrimSpace(r.URL.Query().Get("status")), parseInt(r.URL.Query().Get("limit"), 100)) + if err != nil { + writeError(w, http.StatusInternalServerError, "embedding_jobs_failed", "failed to list embedding jobs", err.Error()) + return + } + writeData(w, http.StatusOK, result, len(result.Items)) +} + +func (s *Server) handleAdminEmbeddings(w http.ResponseWriter, r *http.Request) { + if !s.authorizeAdmin(r) { + writeError(w, http.StatusUnauthorized, "unauthorized", "admin token required", "") + return + } + result, err := s.store.ListArtifactEmbeddings(r.Context(), strings.TrimSpace(r.URL.Query().Get("status")), parseInt(r.URL.Query().Get("limit"), 100)) + if err != nil { + writeError(w, http.StatusInternalServerError, "embeddings_failed", "failed to list embeddings", err.Error()) + return + } + writeData(w, http.StatusOK, result, len(result.Items)) +} + +func (s *Server) handleAdminEmbeddingDetail(w http.ResponseWriter, r *http.Request) { + if !s.authorizeAdmin(r) { + writeError(w, http.StatusUnauthorized, "unauthorized", "admin token required", "") + return + } + item, err := s.store.loadArtifactEmbedding(r.Context(), r.PathValue("id")) + if err != nil { + writeError(w, http.StatusInternalServerError, "embedding_failed", "failed to load embedding", err.Error()) + return + } + if item == nil { + writeError(w, http.StatusNotFound, "not_found", "embedding not found", "") + return + } + writeData(w, http.StatusOK, item, 1) +} + func (s *Server) decodeJSON(w http.ResponseWriter, r *http.Request, dst any) bool { r.Body = http.MaxBytesReader(w, r.Body, s.maxRequestBodyBytes) dec := json.NewDecoder(r.Body) @@ -449,6 +583,7 @@ func filterFromQuery(r 
*http.Request) SearchFilter { Kind: ArtifactKind(strings.TrimSpace(q.Get("kind"))), Source: strings.TrimSpace(q.Get("source")), Query: strings.TrimSpace(q.Get("q")), + SearchMode: normalizeSearchMode(q.Get("search_mode")), Risk: strings.TrimSpace(q.Get("risk")), Trust: strings.TrimSpace(q.Get("trust")), Tag: strings.TrimSpace(q.Get("tag")), @@ -515,15 +650,34 @@ func writeStoreError(w http.ResponseWriter, err error) { } func writeData(w http.ResponseWriter, status int, data any, count int) { + meta := ResponseMeta{ + ProtocolVersion: defaultProtocolVersion, + Count: count, + } + if result, ok := data.(ListResult); ok && result.RetrievalMeta != nil { + meta.SearchMode = result.RetrievalMeta.SearchMode + meta.VectorFallbackReason = result.RetrievalMeta.VectorFallbackReason + meta.CandidateCounts = result.RetrievalMeta.CandidateCounts + vectorApplied := result.RetrievalMeta.VectorApplied + meta.VectorApplied = &vectorApplied + } writeJSON(w, status, map[string]any{ "data": data, - "meta": ResponseMeta{ - ProtocolVersion: defaultProtocolVersion, - Count: count, - }, + "meta": meta, }) } +func normalizeSearchMode(value string) SearchMode { + switch SearchMode(strings.ToLower(strings.TrimSpace(value))) { + case SearchModeLexical: + return SearchModeLexical + case SearchModeHybrid: + return SearchModeHybrid + default: + return SearchModeAuto + } +} + func writeError(w http.ResponseWriter, status int, code, message, detail string) { var payload ErrorResponse payload.Error.Code = code diff --git a/pkg/marketregistry/server_test.go b/pkg/marketregistry/server_test.go index 29e00d27..94f682f1 100644 --- a/pkg/marketregistry/server_test.go +++ b/pkg/marketregistry/server_test.go @@ -10,8 +10,10 @@ import ( "io" "net/http" "net/http/httptest" + "reflect" "strings" "testing" + "time" ) func TestServerSeededCatalogRoutes(t *testing.T) { @@ -103,6 +105,28 @@ func TestServerSearchAndErrors(t *testing.T) { } } +func TestServerSearchUsesLexicalIndexAndScores(t *testing.T) { + server 
:= newTestServer(t) + + var search struct { + Data ListResult `json:"data"` + } + doJSON(t, server, http.MethodPost, "/v1/search", strings.NewReader(`{"kind":"skill","q":"relea"}`), http.StatusOK, &search) + if search.Data.Total != 1 || search.Data.Items[0].ID != "cloud.skill.release-notes" { + t.Fatalf("unexpected lexical search result: %#v", search.Data) + } + item := search.Data.Items[0] + if item.FinalScore <= 0 || item.LexicalScore <= 0 { + t.Fatalf("expected score breakdown in search result: %#v", item) + } + if item.Score != item.FinalScore { + t.Fatalf("expected score to mirror final_score, got score=%v final=%v", item.Score, item.FinalScore) + } + if len(item.MatchSignals) == 0 { + t.Fatalf("expected match signals in search result: %#v", item) + } +} + func TestServerSearchFiltersCombine(t *testing.T) { server := newTestServer(t) @@ -123,6 +147,330 @@ func TestServerSearchFiltersCombine(t *testing.T) { } } +func TestServerListSupportsStructuredFilteringWithoutQuery(t *testing.T) { + server, err := NewServer(context.Background(), ServerConfig{DataDir: t.TempDir()}) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = server.Close() }) + + now := "2026-05-08T10:00:00Z" + for _, artifact := range []Artifact{ + { + ID: "cloud.skill.alpha-writer", + Kind: ArtifactKindSkill, + Name: "Alpha Writer", + Summary: "Writes release notes.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + Compatibility: Compatibility{OS: []string{"windows"}, Arch: []string{"amd64"}}, + Tags: []string{"writer", "notes"}, + UpdatedAt: now, + }, + { + ID: "cloud.skill.beta-audit", + Kind: ArtifactKindSkill, + Name: "Beta Audit", + Summary: "Audits permissions.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "Community Labs", + RiskLevel: "medium", + TrustLevel: "community", + Permissions: []string{"process.exec"}, + 
Compatibility: Compatibility{OS: []string{"linux"}, Arch: []string{"arm64"}}, + Tags: []string{"audit"}, + UpdatedAt: now, + }, + } { + if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { + t.Fatalf("upsert artifact %s: %v", artifact.ID, err) + } + } + + var list struct { + Data ListResult `json:"data"` + } + doJSON(t, server, http.MethodGet, "/v1/artifacts?kind=skill&tag=writer&permission=fs.read&publisher=AnyClaw%20Labs&os=windows&arch=amd64", nil, http.StatusOK, &list) + if list.Data.Total != 1 || list.Data.Items[0].ID != "cloud.skill.alpha-writer" { + t.Fatalf("unexpected structured filter result without query: %#v", list.Data) + } +} + +func TestServerListSortsByUpdatedAndRiskAwareScore(t *testing.T) { + server, err := NewServer(context.Background(), ServerConfig{DataDir: t.TempDir()}) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = server.Close() }) + + artifacts := []Artifact{ + { + ID: "cloud.skill.safe-writer", + Kind: ArtifactKindSkill, + Name: "Safe Writer", + Summary: "Writes summaries safely.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + Tags: []string{"writer"}, + UpdatedAt: "2026-05-01T10:00:00Z", + }, + { + ID: "cloud.skill.risky-writer", + Kind: ArtifactKindSkill, + Name: "Risky Writer", + Summary: "Writes summaries with elevated permissions.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "high", + TrustLevel: "verified", + Permissions: []string{"process.exec"}, + Tags: []string{"writer"}, + UpdatedAt: "2026-05-01T10:00:00Z", + }, + { + ID: "cloud.skill.new-writer", + Kind: ArtifactKindSkill, + Name: "New Writer", + Summary: "Newest writing helper.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: 
[]string{"fs.read"}, + Tags: []string{"writer"}, + UpdatedAt: "2026-05-08T10:00:00Z", + }, + } + for _, artifact := range artifacts { + if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { + t.Fatalf("upsert artifact %s: %v", artifact.ID, err) + } + } + + var scoreList struct { + Data ListResult `json:"data"` + } + doJSON(t, server, http.MethodGet, "/v1/artifacts?kind=skill&tag=writer", nil, http.StatusOK, &scoreList) + if scoreList.Data.Total != 3 { + t.Fatalf("expected 3 score-sorted items, got %#v", scoreList.Data) + } + if scoreList.Data.Items[0].ID != "cloud.skill.new-writer" || scoreList.Data.Items[2].ID != "cloud.skill.risky-writer" { + t.Fatalf("unexpected default score order: %#v", scoreList.Data.Items) + } + + var updatedList struct { + Data ListResult `json:"data"` + } + doJSON(t, server, http.MethodGet, "/v1/artifacts?kind=skill&tag=writer&sort=updated", nil, http.StatusOK, &updatedList) + if updatedList.Data.Total != 3 { + t.Fatalf("expected 3 updated-sorted items, got %#v", updatedList.Data) + } + if updatedList.Data.Items[0].ID != "cloud.skill.new-writer" { + t.Fatalf("expected newest item first for updated sort, got %#v", updatedList.Data.Items) + } +} + +func TestServerSearchModeLexicalMeta(t *testing.T) { + server := newTestServer(t) + + var search struct { + Data ListResult `json:"data"` + Meta ResponseMeta `json:"meta"` + } + doJSON(t, server, http.MethodGet, "/v1/artifacts?kind=skill&q=release&search_mode=lexical", nil, http.StatusOK, &search) + if search.Meta.SearchMode != SearchModeLexical { + t.Fatalf("expected lexical meta search mode, got %#v", search.Meta) + } + if search.Meta.VectorApplied == nil || *search.Meta.VectorApplied { + t.Fatalf("expected vector_applied=false, got %#v", search.Meta) + } + if search.Data.RetrievalMeta == nil || search.Data.RetrievalMeta.SearchMode != SearchModeLexical { + t.Fatalf("expected retrieval meta in data, got %#v", search.Data.RetrievalMeta) + } +} + +func 
TestServerSearchModeHybridFallsBackOpen(t *testing.T) { + server := newTestServer(t) + + var search struct { + Data ListResult `json:"data"` + Meta ResponseMeta `json:"meta"` + } + doJSON(t, server, http.MethodPost, "/v1/search", strings.NewReader(`{"kind":"skill","q":"release","search_mode":"hybrid"}`), http.StatusOK, &search) + if search.Data.Total == 0 { + t.Fatalf("expected lexical fallback results, got %#v", search.Data) + } + if search.Meta.SearchMode != SearchModeLexicalFallback { + t.Fatalf("expected lexical_fallback meta, got %#v", search.Meta) + } + if search.Meta.VectorApplied == nil || *search.Meta.VectorApplied { + t.Fatalf("expected vector_applied=false during fallback, got %#v", search.Meta) + } + if search.Meta.VectorFallbackReason != "disabled" { + t.Fatalf("expected disabled fallback reason, got %#v", search.Meta) + } + if search.Data.RetrievalMeta == nil || search.Data.RetrievalMeta.SearchMode != SearchModeLexicalFallback { + t.Fatalf("expected fallback retrieval meta in data, got %#v", search.Data.RetrievalMeta) + } +} + +func TestServerHybridSearchUsesVectorAndCanExpandBeyondLexicalHits(t *testing.T) { + artifactProvider := &mockEmbeddingProvider{ + name: "mock-artifact", + dim: 3, + embeddings: map[string][]float32{ + buildArtifactEmbeddingText(Artifact{ + Kind: ArtifactKindSkill, + Name: "Alpha Writer", + Summary: "Semantic authoring helper.", + Publisher: "AnyClaw Labs", + Permissions: []string{"fs.read"}, + ManifestSummary: map[string]string{"use_case": "semantic writing"}, + }): {0.99, 0.01, 0}, + buildArtifactEmbeddingText(Artifact{ + Kind: ArtifactKindSkill, + Name: "Beta Release Tool", + Summary: "Release note generator.", + Publisher: "AnyClaw Labs", + Permissions: []string{"fs.read"}, + ManifestSummary: map[string]string{"use_case": "release notes"}, + }): {0.20, 0.80, 0}, + }, + } + queryProvider := &mockEmbeddingProvider{ + name: "mock-query", + dim: 3, + embeddings: map[string][]float32{ + "semantic author": {1, 0, 0}, + }, + } + 
server, err := NewServer(context.Background(), ServerConfig{ + DataDir: t.TempDir(), + Vector: VectorConfig{ + Enabled: true, + FailOpen: true, + Model: "mock-doc", + QueryModel: "mock-query", + HybridTopK: 10, + HybridCandidate: 20, + }, + TestArtifactEmbed: artifactProvider, + TestQueryEmbed: queryProvider, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = server.Close() }) + + alpha := Artifact{ + ID: "cloud.skill.alpha-writer", + Kind: ArtifactKindSkill, + Name: "Alpha Writer", + Summary: "Semantic authoring helper.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + Tags: []string{"semantic"}, + UpdatedAt: "2026-05-08T10:00:00Z", + ManifestSummary: map[string]string{"use_case": "semantic writing"}, + } + beta := Artifact{ + ID: "cloud.skill.beta-release", + Kind: ArtifactKindSkill, + Name: "Beta Release Tool", + Summary: "Release note generator.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + Tags: []string{"release"}, + UpdatedAt: "2026-05-08T10:00:00Z", + ManifestSummary: map[string]string{"use_case": "release notes"}, + } + for _, artifact := range []Artifact{alpha, beta} { + if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { + t.Fatalf("upsert artifact %s: %v", artifact.ID, err) + } + } + server.processOneEmbeddingJob(context.Background()) + server.processOneEmbeddingJob(context.Background()) + + var search struct { + Data ListResult `json:"data"` + Meta ResponseMeta `json:"meta"` + } + doJSON(t, server, http.MethodPost, "/v1/search", strings.NewReader(`{"kind":"skill","q":"semantic author","search_mode":"hybrid"}`), http.StatusOK, &search) + if search.Meta.SearchMode != SearchModeHybrid { + t.Fatalf("expected hybrid mode, got %#v", search.Meta) + } + if 
search.Meta.VectorApplied == nil || !*search.Meta.VectorApplied { + t.Fatalf("expected vector_applied=true, got %#v", search.Meta) + } + if search.Data.Total < 1 || len(search.Data.Items) < 1 { + t.Fatalf("expected hybrid results, got %#v", search.Data) + } + if search.Data.Items[0].ID != "cloud.skill.alpha-writer" { + t.Fatalf("expected semantic vector hit first, got %#v", search.Data.Items) + } + if search.Data.Items[0].VectorScore == nil || *search.Data.Items[0].VectorScore <= 0 { + t.Fatalf("expected vector score on first item, got %#v", search.Data.Items[0]) + } +} + +func TestServerPublishQueuesEmbeddingJob(t *testing.T) { + server, err := NewServer(context.Background(), ServerConfig{ + DataDir: t.TempDir(), + Vector: VectorConfig{ + Enabled: true, + FailOpen: true, + Model: "mock-doc", + }, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = server.Close() }) + + artifact := Artifact{ + ID: "cloud.skill.embedding-test", + Kind: ArtifactKindSkill, + Name: "Embedding Test", + Summary: "Queues embedding jobs.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + } + if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { + t.Fatal(err) + } + job, err := server.store.claimNextArtifactEmbeddingJob(context.Background()) + if err != nil { + t.Fatal(err) + } + if job == nil || job.ArtifactID != artifact.ID { + t.Fatalf("expected queued embedding job, got %#v", job) + } +} + func TestServerAdminTokenPublishQuarantineAndStats(t *testing.T) { server, err := NewServer(context.Background(), ServerConfig{ DataDir: t.TempDir(), @@ -212,15 +560,25 @@ func TestServerAdminTokenPublishQuarantineAndStats(t *testing.T) { func TestServerRequiresAdminToken(t *testing.T) { _, err := NewServer(context.Background(), ServerConfig{ - DataDir: t.TempDir(), + DataDir: t.TempDir(), + RequireAdminToken: true, }) if err == 
nil || !strings.Contains(err.Error(), "admin token is required") { t.Fatalf("expected missing admin token error, got %v", err) } + serverWithoutAdmin, err := NewServer(context.Background(), ServerConfig{ + DataDir: t.TempDir(), + }) + if err != nil { + t.Fatalf("expected registry to start without admin token when require flag is false, got %v", err) + } + t.Cleanup(func() { _ = serverWithoutAdmin.Close() }) + server, err := NewServer(context.Background(), ServerConfig{ - DataDir: t.TempDir(), - AdminToken: "admin-secret", + DataDir: t.TempDir(), + AdminToken: "admin-secret", + RequireAdminToken: true, }) if err != nil { t.Fatal(err) @@ -393,3 +751,230 @@ func assertZipContains(t *testing.T, data []byte, name string) { } t.Fatalf("zip did not contain %s", name) } + +type mockEmbeddingProvider struct { + name string + dim int + shouldFail bool + embeddings map[string][]float32 + callCount int +} + +func (m *mockEmbeddingProvider) Embed(ctx context.Context, text string) ([]float32, error) { + m.callCount++ + if m.shouldFail { + return nil, context.Canceled + } + if vectorData, ok := m.embeddings[text]; ok { + return append([]float32(nil), vectorData...), nil + } + result := make([]float32, m.dim) + for i := range result { + result[i] = float32(len(text) + i + 1) + } + return result, nil +} + +func (m *mockEmbeddingProvider) EmbedBatch(ctx context.Context, texts []string) ([][]float32, error) { + out := make([][]float32, 0, len(texts)) + for _, text := range texts { + item, err := m.Embed(ctx, text) + if err != nil { + return nil, err + } + out = append(out, item) + } + return out, nil +} + +func (m *mockEmbeddingProvider) Name() string { + return m.name +} + +func (m *mockEmbeddingProvider) Dimension() int { + return m.dim +} + +func TestWorkerCreatesArtifactEmbeddingRecord(t *testing.T) { + provider := &mockEmbeddingProvider{ + name: "mock", + dim: 3, + } + server, err := NewServer(context.Background(), ServerConfig{ + DataDir: t.TempDir(), + Vector: VectorConfig{ + 
Enabled: true, + Model: "mock-doc", + }, + TestArtifactEmbed: provider, + TestQueryEmbed: provider, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = server.Close() }) + + artifact := Artifact{ + ID: "cloud.skill.worker-test", + Kind: ArtifactKindSkill, + Name: "Worker Test", + Summary: "Creates embeddings asynchronously.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + } + if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { + t.Fatal(err) + } + server.processOneEmbeddingJob(context.Background()) + item, err := server.store.loadArtifactEmbedding(context.Background(), artifact.ID) + if err != nil { + t.Fatal(err) + } + if item == nil || item.Status != artifactEmbeddingStatusReady { + t.Fatalf("expected ready embedding record, got %#v", item) + } + wantText := buildArtifactEmbeddingText(artifact) + if item.EmbeddingText != wantText { + t.Fatalf("unexpected embedding text: got %q want %q", item.EmbeddingText, wantText) + } + if len(item.Vector) != provider.dim { + t.Fatalf("unexpected vector dimension: %#v", item) + } + if item.Model != "mock-doc" { + t.Fatalf("unexpected model: %#v", item) + } + if reflect.DeepEqual(item.Vector, []float32{}) { + t.Fatalf("expected non-empty vector") + } +} + +func TestServerAdminEmbeddingEndpointsAndMetrics(t *testing.T) { + provider := &mockEmbeddingProvider{name: "mock", dim: 3} + server, err := NewServer(context.Background(), ServerConfig{ + DataDir: t.TempDir(), + AdminToken: "admin-secret", + Vector: VectorConfig{ + Enabled: true, + Model: "mock-doc", + }, + TestArtifactEmbed: provider, + TestQueryEmbed: provider, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = server.Close() }) + + artifact := Artifact{ + ID: "cloud.skill.admin-embed", + Kind: ArtifactKindSkill, + Name: "Admin Embed", + Summary: "Visible from admin endpoints.", + 
LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + } + if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { + t.Fatal(err) + } + server.processOneEmbeddingJob(context.Background()) + + var jobs struct { + Data ArtifactEmbeddingJobList `json:"data"` + } + doJSONWithAuth(t, server, http.MethodGet, "/v1/admin/embedding-jobs", nil, "admin-secret", http.StatusOK, &jobs) + if jobs.Data.Total == 0 || jobs.Data.Items[0].ArtifactID != artifact.ID { + t.Fatalf("unexpected embedding jobs: %#v", jobs.Data) + } + + var embeddings struct { + Data ArtifactEmbeddingList `json:"data"` + } + doJSONWithAuth(t, server, http.MethodGet, "/v1/admin/embeddings?status=ready", nil, "admin-secret", http.StatusOK, &embeddings) + if embeddings.Data.Total == 0 || embeddings.Data.Items[0].ArtifactID != artifact.ID { + t.Fatalf("unexpected embeddings list: %#v", embeddings.Data) + } + + var detail struct { + Data ArtifactEmbedding `json:"data"` + } + doJSONWithAuth(t, server, http.MethodGet, "/v1/admin/embeddings/"+artifact.ID, nil, "admin-secret", http.StatusOK, &detail) + if detail.Data.ArtifactID != artifact.ID || detail.Data.Status != artifactEmbeddingStatusReady { + t.Fatalf("unexpected embedding detail: %#v", detail.Data) + } + + req := httptest.NewRequest(http.MethodGet, "/metrics.json", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected metrics.json 200, got %d body=%s", rec.Code, rec.Body.String()) + } + var metricsPayload map[string]any + if err := json.NewDecoder(rec.Body).Decode(&metricsPayload); err != nil { + t.Fatalf("decode metrics json: %v", err) + } + if _, ok := metricsPayload["gauges"]; !ok { + t.Fatalf("expected gauges in metrics payload, got %#v", metricsPayload) + } +} + +func TestServerHybridQueryCacheAvoidsRepeatedEmbeddingCalls(t *testing.T) { + 
artifactProvider := &mockEmbeddingProvider{name: "doc", dim: 3} + queryProvider := &mockEmbeddingProvider{ + name: "query", + dim: 3, + embeddings: map[string][]float32{ + "semantic author": {1, 0, 0}, + }, + } + server, err := NewServer(context.Background(), ServerConfig{ + DataDir: t.TempDir(), + Vector: VectorConfig{ + Enabled: true, + Model: "mock-doc", + QueryModel: "mock-query", + QueryCacheTTL: time.Minute, + QueryCacheSize: 16, + }, + TestArtifactEmbed: artifactProvider, + TestQueryEmbed: queryProvider, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = server.Close() }) + + artifact := Artifact{ + ID: "cloud.skill.cache-target", + Kind: ArtifactKindSkill, + Name: "Semantic Author", + Summary: "Semantic authoring helper.", + LatestVersion: "1.0.0", + Source: defaultRegistrySourceID, + Publisher: "AnyClaw Labs", + RiskLevel: "low", + TrustLevel: "verified", + Permissions: []string{"fs.read"}, + ManifestSummary: map[string]string{"use_case": "semantic writing"}, + } + if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { + t.Fatal(err) + } + server.processOneEmbeddingJob(context.Background()) + + var result struct { + Data ListResult `json:"data"` + } + doJSON(t, server, http.MethodPost, "/v1/search", strings.NewReader(`{"kind":"skill","q":"semantic author","search_mode":"hybrid"}`), http.StatusOK, &result) + doJSON(t, server, http.MethodPost, "/v1/search", strings.NewReader(`{"kind":"skill","q":"semantic author","search_mode":"hybrid"}`), http.StatusOK, &result) + if queryProvider.callCount != 1 { + t.Fatalf("expected query provider to be called once due to cache, got %d", queryProvider.callCount) + } +} diff --git a/pkg/marketregistry/store.go b/pkg/marketregistry/store.go index 736067d2..679506ef 100644 --- a/pkg/marketregistry/store.go +++ b/pkg/marketregistry/store.go @@ -29,7 +29,14 @@ var ( ) type Store struct { - db *sql.DB + db *sql.DB + vector *vectorRuntime +} + +type searchExecution struct { + 
items []searchCandidate + total int + meta RetrievalMeta } func OpenStore(ctx context.Context, dataDir string) (*Store, error) { @@ -77,6 +84,13 @@ func (s *Store) Close() error { return s.db.Close() } +func (s *Store) SetVectorRuntime(runtime *vectorRuntime) { + if s == nil { + return + } + s.vector = runtime +} + func (s *Store) migrate(ctx context.Context) error { statements := []string{ `CREATE TABLE IF NOT EXISTS artifacts ( @@ -150,6 +164,48 @@ func (s *Store) migrate(ctx context.Context) error { detail_json TEXT NOT NULL, created_at TEXT NOT NULL )`, + `CREATE TABLE IF NOT EXISTS artifact_embeddings ( + artifact_id TEXT PRIMARY KEY, + model TEXT NOT NULL DEFAULT '', + vector_json TEXT NOT NULL DEFAULT '[]', + vector_dim INTEGER NOT NULL DEFAULT 0, + embedding_text TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'pending', + error TEXT NOT NULL DEFAULT '', + updated_at TEXT NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS artifact_embedding_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + artifact_id TEXT NOT NULL, + job_type TEXT NOT NULL, + status TEXT NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + error TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + )`, + `CREATE VIRTUAL TABLE IF NOT EXISTS artifacts_fts USING fts5( + artifact_id UNINDEXED, + name, + summary, + description_md, + publisher, + kind, + tags_text, + hit_signals_text, + use_case_text, + search_text, + tokenize = 'unicode61' + )`, + `CREATE INDEX IF NOT EXISTS idx_artifacts_kind ON artifacts(kind)`, + `CREATE INDEX IF NOT EXISTS idx_artifacts_source ON artifacts(source)`, + `CREATE INDEX IF NOT EXISTS idx_artifacts_risk ON artifacts(risk_level)`, + `CREATE INDEX IF NOT EXISTS idx_artifacts_trust ON artifacts(trust_level)`, + `CREATE INDEX IF NOT EXISTS idx_artifacts_updated ON artifacts(updated_at DESC)`, + `CREATE INDEX IF NOT EXISTS idx_quarantine_artifact_id ON quarantine(artifact_id)`, + `CREATE UNIQUE INDEX IF NOT EXISTS 
idx_artifact_embedding_jobs_unique ON artifact_embedding_jobs(artifact_id, job_type)`, + `CREATE INDEX IF NOT EXISTS idx_artifact_embedding_jobs_status ON artifact_embedding_jobs(status, updated_at, id)`, + `CREATE INDEX IF NOT EXISTS idx_artifact_embeddings_status ON artifact_embeddings(status, updated_at)`, } for _, stmt := range statements { if _, err := s.db.ExecContext(ctx, stmt); err != nil { @@ -157,7 +213,7 @@ func (s *Store) migrate(ctx context.Context) error { } } _, _ = s.db.ExecContext(ctx, `ALTER TABLE artifact_versions ADD COLUMN signature TEXT NOT NULL DEFAULT ''`) - return nil + return s.rebuildArtifactSearchIndex(ctx) } func (s *Store) CountArtifacts(ctx context.Context) (int, error) { @@ -186,6 +242,15 @@ func (s *Store) DeleteArtifact(ctx context.Context, artifactID string) (Artifact if _, err = tx.ExecContext(ctx, `DELETE FROM artifact_versions WHERE artifact_id = ?`, artifactID); err != nil { return ArtifactDeletion{}, err } + if _, err = tx.ExecContext(ctx, `DELETE FROM artifact_embedding_jobs WHERE artifact_id = ?`, artifactID); err != nil { + return ArtifactDeletion{}, err + } + if _, err = tx.ExecContext(ctx, `DELETE FROM artifact_embeddings WHERE artifact_id = ?`, artifactID); err != nil { + return ArtifactDeletion{}, err + } + if _, err = tx.ExecContext(ctx, `DELETE FROM artifacts_fts WHERE artifact_id = ?`, artifactID); err != nil { + return ArtifactDeletion{}, err + } if _, err = tx.ExecContext(ctx, `DELETE FROM quarantine WHERE artifact_id = ?`, artifactID); err != nil { return ArtifactDeletion{}, err } @@ -348,58 +413,32 @@ func (s *Store) UpsertArtifact(ctx context.Context, artifact Artifact, versions return err } } + if err := s.upsertArtifactSearchDocTx(ctx, tx, artifact); err != nil { + return err + } + if err := s.queueArtifactEmbeddingJobTx(ctx, tx, artifact); err != nil { + return err + } err = tx.Commit() return err } func (s *Store) List(ctx context.Context, filter SearchFilter) (ListResult, error) { - if filter.Limit <= 0 { 
- filter.Limit = 50 - } - if filter.Offset < 0 { - filter.Offset = 0 - } - - rows, err := s.db.QueryContext(ctx, `SELECT - id, kind, name, summary, description_md, latest_version, source, publisher, - risk_level, trust_level, permissions_json, compatibility_json, dependencies_json, - icon_url, tags_json, hit_signals_json, score, updated_at, manifest_summary_json - FROM artifacts`) + normalized := normalizeSearchFilter(filter) + exec, err := s.search(ctx, normalized) if err != nil { return ListResult{}, err } - defer rows.Close() - - var all []Artifact - for rows.Next() { - artifact, err := scanArtifact(rows) - if err != nil { - return ListResult{}, err - } - if matchesArtifact(artifact, filter) { - all = append(all, artifact) - } - } - if err := rows.Err(); err != nil { - return ListResult{}, err - } - - sortArtifacts(all, filter.Sort) - - total := len(all) - if filter.Offset >= len(all) { - all = nil - } else { - all = all[filter.Offset:] - if len(all) > filter.Limit { - all = all[:filter.Limit] - } + items := make([]Artifact, 0, len(exec.items)) + for _, candidate := range exec.items { + items = append(items, candidate.Artifact) } return ListResult{ - Items: all, - Total: total, - Limit: filter.Limit, - Offset: filter.Offset, + Items: items, + Total: exec.total, + Limit: normalized.Limit, + Offset: normalized.Offset, + RetrievalMeta: cloneRetrievalMeta(exec.meta), }, nil } @@ -760,78 +799,386 @@ func scanVersion(row scanner) (ArtifactVersion, error) { return version, nil } -func matchesArtifact(artifact Artifact, filter SearchFilter) bool { - if filter.Kind != "" && artifact.Kind != filter.Kind { - return false +func sortArtifacts(items []Artifact, mode string) { + sortMode := strings.ToLower(strings.TrimSpace(mode)) + sort.SliceStable(items, func(i, j int) bool { + switch sortMode { + case "updated", "updated_desc": + if items[i].UpdatedAt == items[j].UpdatedAt { + return fallbackArtifactLess(items[i], items[j]) + } + return items[i].UpdatedAt > 
items[j].UpdatedAt + case "name", "name_asc": + left := strings.ToLower(items[i].Name) + right := strings.ToLower(items[j].Name) + if left == right { + return fallbackArtifactLess(items[i], items[j]) + } + return left < right + default: + leftScore := scoreValue(items[i]) + rightScore := scoreValue(items[j]) + if leftScore == rightScore { + if items[i].UpdatedAt == items[j].UpdatedAt { + return fallbackArtifactLess(items[i], items[j]) + } + return items[i].UpdatedAt > items[j].UpdatedAt + } + return leftScore > rightScore + } + }) +} + +func (s *Store) search(ctx context.Context, filter SearchFilter) (searchExecution, error) { + candidates, total, err := s.searchCandidates(ctx, filter) + if err != nil { + return searchExecution{}, err + } + meta := RetrievalMeta{ + SearchMode: resolvedSearchMode(filter, false), + VectorApplied: false, + CandidateCounts: &CandidateCounts{ + Lexical: total, + Vector: 0, + Merged: total, + }, + } + if shouldAttemptHybrid(filter) { + merged, mergedTotal, hybridMeta := s.searchHybrid(ctx, filter, candidates, total) + if hybridMeta.VectorApplied { + candidates = merged + total = mergedTotal + meta = hybridMeta + } else { + meta = hybridMeta + } } - if filter.Source != "" && !strings.EqualFold(artifact.Source, filter.Source) { - return false + return searchExecution{ + items: paginateCandidates(candidates, filter.Offset, filter.Limit), + total: total, + meta: meta, + }, nil +} + +func (s *Store) searchCandidates(ctx context.Context, filter SearchFilter) ([]searchCandidate, int, error) { + if strings.TrimSpace(filter.Query) == "" { + return s.listStructuredCandidates(ctx, filter) } - if filter.Risk != "" && !strings.EqualFold(artifact.RiskLevel, filter.Risk) { - return false + return s.listLexicalCandidates(ctx, filter) +} + +func (s *Store) listStructuredCandidates(ctx context.Context, filter SearchFilter) ([]searchCandidate, int, error) { + where, args := appendStructuredFilterClauses(nil, nil, filter) + whereSQL := "" + if len(where) > 
0 { + whereSQL = " WHERE " + strings.Join(where, " AND ") } - if filter.Trust != "" && !strings.EqualFold(artifact.TrustLevel, filter.Trust) { - return false + countQuery := `SELECT COUNT(*) FROM artifacts a` + whereSQL + var total int + if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil { + return nil, 0, err } - if filter.Tag != "" && !containsFold(artifact.Tags, filter.Tag) { - return false + if total == 0 || filter.Offset >= total { + return nil, total, nil } - if filter.Permission != "" && !containsFold(artifact.Permissions, filter.Permission) { - return false + rows, err := s.db.QueryContext(ctx, `SELECT + a.id, a.kind, a.name, a.summary, a.description_md, a.latest_version, a.source, a.publisher, + a.risk_level, a.trust_level, a.permissions_json, a.compatibility_json, a.dependencies_json, + a.icon_url, a.tags_json, a.hit_signals_json, a.score, a.updated_at, a.manifest_summary_json + FROM artifacts a`+whereSQL, args...) + if err != nil { + return nil, 0, err } - if filter.Publisher != "" && !strings.Contains(strings.ToLower(artifact.Publisher), strings.ToLower(strings.TrimSpace(filter.Publisher))) { - return false + defer rows.Close() + candidates := make([]searchCandidate, 0, total) + for rows.Next() { + artifact, err := scanArtifact(rows) + if err != nil { + return nil, 0, err + } + candidate := searchCandidate{Artifact: artifact} + applySearchScores(&candidate, filter) + candidates = append(candidates, candidate) } - if filter.OS != "" && len(artifact.Compatibility.OS) > 0 && !containsFold(artifact.Compatibility.OS, filter.OS) { - return false + if err := rows.Err(); err != nil { + return nil, 0, err } - if filter.Arch != "" && len(artifact.Compatibility.Arch) > 0 && !containsFold(artifact.Compatibility.Arch, filter.Arch) { - return false + sortSearchCandidates(candidates, filter.Sort) + return candidates, total, nil +} + +func (s *Store) listLexicalCandidates(ctx context.Context, filter SearchFilter) ([]searchCandidate, int, 
error) { + matchQuery := buildFTSQuery(filter.Query) + if matchQuery == "" { + return s.listStructuredCandidates(ctx, filter) } - query := strings.ToLower(strings.TrimSpace(filter.Query)) - if query == "" { - return true + where, args := appendStructuredFilterClauses([]string{`artifacts_fts MATCH ?`}, []any{matchQuery}, filter) + whereSQL := " WHERE " + strings.Join(where, " AND ") + countQuery := `SELECT COUNT(*) FROM artifacts_fts JOIN artifacts a ON a.id = artifacts_fts.artifact_id` + whereSQL + var total int + if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil { + return nil, 0, err + } + if total == 0 || filter.Offset >= total { + return nil, total, nil + } + + candidateLimit := filter.Offset + filter.Limit*4 + if candidateLimit < filter.Offset+filter.Limit { + candidateLimit = filter.Offset + filter.Limit + } + if candidateLimit < filter.Limit*2 { + candidateLimit = filter.Limit * 2 + } + if candidateLimit < defaultSearchCandidateCap/2 { + candidateLimit = defaultSearchCandidateCap / 2 + } + if candidateLimit > defaultSearchCandidateCap { + candidateLimit = defaultSearchCandidateCap + } + + query := `SELECT + a.id, a.kind, a.name, a.summary, a.description_md, a.latest_version, a.source, a.publisher, + a.risk_level, a.trust_level, a.permissions_json, a.compatibility_json, a.dependencies_json, + a.icon_url, a.tags_json, a.hit_signals_json, a.score, a.updated_at, a.manifest_summary_json, + bm25(artifacts_fts) AS lexical_rank + FROM artifacts_fts + JOIN artifacts a ON a.id = artifacts_fts.artifact_id` + whereSQL + ` + ORDER BY bm25(artifacts_fts), a.updated_at DESC + LIMIT ?` + argsWithLimit := append(append([]any(nil), args...), candidateLimit) + rows, err := s.db.QueryContext(ctx, query, argsWithLimit...) 
+ if err != nil { + return nil, 0, err + } + defer rows.Close() + + candidates := make([]searchCandidate, 0, candidateLimit) + for rows.Next() { + artifact, rank, err := scanArtifactWithLexicalRank(rows) + if err != nil { + return nil, 0, err + } + candidate := searchCandidate{Artifact: artifact, lexicalRank: rank} + applySearchScores(&candidate, filter) + candidates = append(candidates, candidate) } - fields := []string{ - artifact.ID, - artifact.Name, - artifact.Summary, - artifact.DescriptionMD, - artifact.Publisher, - strings.Join(artifact.Tags, " "), - strings.Join(artifact.HitSignals, " "), + if err := rows.Err(); err != nil { + return nil, 0, err } - return strings.Contains(strings.ToLower(strings.Join(fields, " ")), query) + sortSearchCandidates(candidates, filter.Sort) + return candidates, total, nil } -func sortArtifacts(items []Artifact, mode string) { - sortMode := strings.ToLower(strings.TrimSpace(mode)) +func paginateCandidates(items []searchCandidate, offset int, limit int) []searchCandidate { + if offset < 0 { + offset = 0 + } + if limit <= 0 { + limit = defaultSearchLimit + } + if offset >= len(items) { + return nil + } + items = items[offset:] + if len(items) > limit { + items = items[:limit] + } + return items +} + +func sortSearchCandidates(items []searchCandidate, mode string) { sort.SliceStable(items, func(i, j int) bool { - switch sortMode { + switch strings.ToLower(strings.TrimSpace(mode)) { case "updated", "updated_desc": if items[i].UpdatedAt == items[j].UpdatedAt { - return fallbackArtifactLess(items[i], items[j]) + return fallbackArtifactLess(items[i].Artifact, items[j].Artifact) } return items[i].UpdatedAt > items[j].UpdatedAt case "name", "name_asc": left := strings.ToLower(items[i].Name) right := strings.ToLower(items[j].Name) if left == right { - return fallbackArtifactLess(items[i], items[j]) + return fallbackArtifactLess(items[i].Artifact, items[j].Artifact) } return left < right default: - if items[i].Score == items[j].Score { - 
if items[i].UpdatedAt == items[j].UpdatedAt {
-					return fallbackArtifactLess(items[i], items[j])
+			if items[i].FinalScore == items[j].FinalScore {
+				if items[i].lexicalRank == items[j].lexicalRank {
+					return fallbackArtifactLess(items[i].Artifact, items[j].Artifact)
 				}
-				return items[i].UpdatedAt > items[j].UpdatedAt
+				return items[i].lexicalRank < items[j].lexicalRank
 			}
-			return items[i].Score > items[j].Score
+			return items[i].FinalScore > items[j].FinalScore
 		}
 	})
 }
 
+func normalizeSearchFilter(filter SearchFilter) SearchFilter {
+	if filter.Limit <= 0 {
+		filter.Limit = defaultSearchLimit
+	}
+	if filter.Offset < 0 {
+		filter.Offset = 0
+	}
+	filter.Query = strings.TrimSpace(filter.Query)
+	switch filter.SearchMode {
+	case SearchModeAuto, SearchModeLexical, SearchModeHybrid:
+	default:
+		filter.SearchMode = SearchModeAuto
+	}
+	return filter
+}
+
+func resolvedSearchMode(filter SearchFilter, vectorApplied bool) SearchMode {
+	if strings.TrimSpace(filter.Query) == "" {
+		return SearchModeLexical
+	}
+	switch filter.SearchMode {
+	case SearchModeLexical:
+		return SearchModeLexical
+	case SearchModeAuto, SearchModeHybrid:
+		if vectorApplied {
+			return SearchModeHybrid
+		}
+		return SearchModeHybrid
+	default:
+		return SearchModeLexical
+	}
+}
+
+func shouldAttemptHybrid(filter SearchFilter) bool {
+	if strings.TrimSpace(filter.Query) == "" {
+		return false
+	}
+	return filter.SearchMode == SearchModeAuto || filter.SearchMode == SearchModeHybrid
+}
+
+func cloneRetrievalMeta(meta RetrievalMeta) *RetrievalMeta {
+	counts := meta.CandidateCounts
+	if counts != nil {
+		copyCounts := *counts
+		counts = &copyCounts
+	}
+	return &RetrievalMeta{
+		SearchMode:           meta.SearchMode,
+		VectorApplied:        meta.VectorApplied,
+		VectorFallbackReason: meta.VectorFallbackReason,
+		CandidateCounts:      counts,
+	}
+}
+
+func scoreValue(item Artifact) float64 {
+	if item.FinalScore > 0 {
+		return item.FinalScore
+	}
+	if item.Score > 0 {
+		return item.Score
+	}
+	return item.LexicalScore
+}
+
+func 
scanArtifactWithLexicalRank(row scanner) (Artifact, float64, error) { + var artifact Artifact + var permissions, compatibility, dependencies, tags, hitSignals, manifestSummary string + var lexicalRank float64 + err := row.Scan( + &artifact.ID, &artifact.Kind, &artifact.Name, &artifact.Summary, + &artifact.DescriptionMD, &artifact.LatestVersion, &artifact.Source, + &artifact.Publisher, &artifact.RiskLevel, &artifact.TrustLevel, + &permissions, &compatibility, &dependencies, &artifact.IconURL, + &tags, &hitSignals, &artifact.Score, &artifact.UpdatedAt, + &manifestSummary, &lexicalRank, + ) + if err != nil { + return Artifact{}, 0, err + } + artifact.Version = artifact.LatestVersion + if err := decodeJSON(permissions, &artifact.Permissions); err != nil { + return Artifact{}, 0, err + } + if err := decodeJSON(compatibility, &artifact.Compatibility); err != nil { + return Artifact{}, 0, err + } + if err := decodeJSON(dependencies, &artifact.Dependencies); err != nil { + return Artifact{}, 0, err + } + if err := decodeJSON(tags, &artifact.Tags); err != nil { + return Artifact{}, 0, err + } + if err := decodeJSON(hitSignals, &artifact.HitSignals); err != nil { + return Artifact{}, 0, err + } + if err := decodeJSON(manifestSummary, &artifact.ManifestSummary); err != nil { + return Artifact{}, 0, err + } + return artifact, lexicalRank, nil +} + +func (s *Store) upsertArtifactSearchDocTx(ctx context.Context, tx *sql.Tx, artifact Artifact) error { + doc := buildArtifactSearchDocument(artifact) + if _, err := tx.ExecContext(ctx, `DELETE FROM artifacts_fts WHERE artifact_id = ?`, artifact.ID); err != nil { + return err + } + _, err := tx.ExecContext(ctx, `INSERT INTO artifacts_fts ( + artifact_id, name, summary, description_md, publisher, kind, + tags_text, hit_signals_text, use_case_text, search_text + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + doc["artifact_id"], doc["name"], doc["summary"], doc["description_md"], + doc["publisher"], doc["kind"], doc["tags_text"], 
doc["hit_signals_text"], + doc["use_case_text"], doc["search_text"], + ) + return err +} + +func (s *Store) rebuildArtifactSearchIndex(ctx context.Context) error { + rows, err := s.db.QueryContext(ctx, `SELECT + id, kind, name, summary, description_md, latest_version, source, publisher, + risk_level, trust_level, permissions_json, compatibility_json, dependencies_json, + icon_url, tags_json, hit_signals_json, score, updated_at, manifest_summary_json + FROM artifacts`) + if err != nil { + return err + } + artifacts := make([]Artifact, 0, 64) + for rows.Next() { + artifact, scanErr := scanArtifact(rows) + if scanErr != nil { + _ = rows.Close() + return scanErr + } + artifacts = append(artifacts, artifact) + } + if err := rows.Err(); err != nil { + _ = rows.Close() + return err + } + if err := rows.Close(); err != nil { + return err + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + if _, err = tx.ExecContext(ctx, `DELETE FROM artifacts_fts`); err != nil { + return err + } + for _, artifact := range artifacts { + if err = s.upsertArtifactSearchDocTx(ctx, tx, artifact); err != nil { + return err + } + } + err = tx.Commit() + return err +} + func fallbackArtifactLess(left, right Artifact) bool { if left.Kind != right.Kind { return left.Kind < right.Kind diff --git a/pkg/marketregistry/types.go b/pkg/marketregistry/types.go index 4626b939..16b0974d 100644 --- a/pkg/marketregistry/types.go +++ b/pkg/marketregistry/types.go @@ -29,10 +29,40 @@ type Artifact struct { Tags []string `json:"tags,omitempty"` HitSignals []string `json:"hit_signals,omitempty"` Score float64 `json:"score,omitempty"` + LexicalScore float64 `json:"lexical_score,omitempty"` + VectorScore *float64 `json:"vector_score,omitempty"` + TagScore float64 `json:"tag_score,omitempty"` + TrustScore float64 `json:"trust_score,omitempty"` + FreshnessScore float64 `json:"freshness_score,omitempty"` + RiskPenalty float64 
`json:"risk_penalty,omitempty"` + FinalScore float64 `json:"final_score,omitempty"` + MatchSignals []string `json:"match_signals,omitempty"` UpdatedAt string `json:"updated_at,omitempty"` ManifestSummary map[string]string `json:"manifest_summary,omitempty"` } +type SearchMode string + +const ( + SearchModeAuto SearchMode = "auto" + SearchModeLexical SearchMode = "lexical" + SearchModeHybrid SearchMode = "hybrid" + SearchModeLexicalFallback SearchMode = "lexical_fallback" +) + +type CandidateCounts struct { + Lexical int `json:"lexical,omitempty"` + Vector int `json:"vector,omitempty"` + Merged int `json:"merged,omitempty"` +} + +type RetrievalMeta struct { + SearchMode SearchMode `json:"search_mode,omitempty"` + VectorApplied bool `json:"vector_applied,omitempty"` + VectorFallbackReason string `json:"vector_fallback_reason,omitempty"` + CandidateCounts *CandidateCounts `json:"candidate_counts,omitempty"` +} + type Compatibility struct { AnyClawMin string `json:"anyclaw_min,omitempty"` OS []string `json:"os,omitempty"` @@ -89,6 +119,7 @@ type SearchFilter struct { Kind ArtifactKind `json:"kind,omitempty"` Source string `json:"source,omitempty"` Query string `json:"q,omitempty"` + SearchMode SearchMode `json:"search_mode,omitempty"` Risk string `json:"risk,omitempty"` Trust string `json:"trust,omitempty"` Tag string `json:"tag,omitempty"` @@ -102,10 +133,11 @@ type SearchFilter struct { } type ListResult struct { - Items []Artifact `json:"items"` - Total int `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Items []Artifact `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + RetrievalMeta *RetrievalMeta `json:"retrieval_meta,omitempty"` } type VersionListResult struct { @@ -114,8 +146,12 @@ type VersionListResult struct { } type ResponseMeta struct { - ProtocolVersion string `json:"protocol_version,omitempty"` - Count int `json:"count,omitempty"` + ProtocolVersion string 
`json:"protocol_version,omitempty"` + Count int `json:"count,omitempty"` + SearchMode SearchMode `json:"search_mode,omitempty"` + VectorApplied *bool `json:"vector_applied,omitempty"` + VectorFallbackReason string `json:"vector_fallback_reason,omitempty"` + CandidateCounts *CandidateCounts `json:"candidate_counts,omitempty"` } type ErrorResponse struct { diff --git a/pkg/marketregistry/vector.go b/pkg/marketregistry/vector.go new file mode 100644 index 00000000..5a09e360 --- /dev/null +++ b/pkg/marketregistry/vector.go @@ -0,0 +1,157 @@ +package marketregistry + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/1024XEngineer/anyclaw/pkg/embedding" +) + +type embeddingProvider interface { + Embed(ctx context.Context, text string) ([]float32, error) + EmbedBatch(ctx context.Context, texts []string) ([][]float32, error) + Name() string + Dimension() int +} + +type VectorConfig struct { + Enabled bool + FailOpen bool + Provider string + Model string + QueryModel string + APIKey string + BaseURL string + SecretKey string + QueryTimeout time.Duration + WorkerPoll time.Duration + MaxJobAttempts int + HybridTopK int + HybridCandidate int + QueryCacheTTL time.Duration + QueryCacheSize int +} + +type vectorRuntime struct { + cfg VectorConfig + artifactEmbed embeddingProvider + queryEmbed embeddingProvider + queryCache *embedding.Cache + metrics *registryMetrics +} + +func normalizeVectorConfig(cfg VectorConfig) VectorConfig { + if cfg.QueryTimeout <= 0 { + cfg.QueryTimeout = 12 * time.Second + } + if cfg.WorkerPoll <= 0 { + cfg.WorkerPoll = 5 * time.Second + } + if cfg.MaxJobAttempts <= 0 { + cfg.MaxJobAttempts = 3 + } + if cfg.HybridTopK <= 0 { + cfg.HybridTopK = 25 + } + if cfg.HybridCandidate <= 0 { + cfg.HybridCandidate = 200 + } + if cfg.QueryCacheTTL <= 0 { + cfg.QueryCacheTTL = 10 * time.Minute + } + if cfg.QueryCacheSize <= 0 { + cfg.QueryCacheSize = 512 + } + cfg.Provider = strings.TrimSpace(strings.ToLower(cfg.Provider)) + cfg.Model = 
strings.TrimSpace(cfg.Model) + cfg.QueryModel = strings.TrimSpace(cfg.QueryModel) + if cfg.QueryModel == "" { + cfg.QueryModel = cfg.Model + } + return cfg +} + +func newVectorRuntime(cfg VectorConfig) (*vectorRuntime, error) { + cfg = normalizeVectorConfig(cfg) + if !cfg.Enabled { + return &vectorRuntime{cfg: cfg}, nil + } + if cfg.Provider == "" || cfg.Model == "" || cfg.QueryModel == "" || strings.TrimSpace(cfg.APIKey) == "" { + if cfg.FailOpen { + return &vectorRuntime{cfg: cfg}, nil + } + return nil, fmt.Errorf("vector search is enabled but embedding provider/model/api key is incomplete") + } + + artifactProvider, err := embedding.NewProvider(embedding.Config{ + Provider: embedding.ProviderType(cfg.Provider), + APIKey: cfg.APIKey, + SecretKey: cfg.SecretKey, + BaseURL: cfg.BaseURL, + Model: cfg.Model, + }) + if err != nil { + if cfg.FailOpen { + return &vectorRuntime{cfg: cfg}, nil + } + return nil, err + } + queryProvider, err := embedding.NewProvider(embedding.Config{ + Provider: embedding.ProviderType(cfg.Provider), + APIKey: cfg.APIKey, + SecretKey: cfg.SecretKey, + BaseURL: cfg.BaseURL, + Model: cfg.QueryModel, + }) + if err != nil { + if cfg.FailOpen { + return &vectorRuntime{cfg: cfg}, nil + } + return nil, err + } + return &vectorRuntime{ + cfg: cfg, + artifactEmbed: artifactProvider, + queryEmbed: queryProvider, + queryCache: embedding.NewCache(cfg.QueryCacheSize, cfg.QueryCacheTTL), + }, nil +} + +func newVectorRuntimeWithProviders(cfg VectorConfig, artifactProvider, queryProvider embeddingProvider) (*vectorRuntime, error) { + cfg = normalizeVectorConfig(cfg) + if artifactProvider == nil { + artifactProvider = queryProvider + } + if queryProvider == nil { + queryProvider = artifactProvider + } + if !cfg.Enabled || artifactProvider == nil || queryProvider == nil { + return &vectorRuntime{cfg: cfg}, nil + } + return &vectorRuntime{ + cfg: cfg, + artifactEmbed: artifactProvider, + queryEmbed: queryProvider, + queryCache: 
embedding.NewCache(cfg.QueryCacheSize, cfg.QueryCacheTTL), + }, nil +} + +func (r *vectorRuntime) enabled() bool { + return r != nil && r.cfg.Enabled && r.artifactEmbed != nil && r.queryEmbed != nil +} + +func (r *vectorRuntime) artifactModel() string { + if r == nil { + return "" + } + return r.cfg.Model +} + +func (r *vectorRuntime) queryModel() string { + if r == nil { + return "" + } + return r.cfg.QueryModel +} From f55c503c30cb1f91a7fd30c4979098f0cc7e2e66 Mon Sep 17 00:00:00 2001 From: TheShigure7 <2947458856@qq.com> Date: Sat, 9 May 2026 23:53:58 +0800 Subject: [PATCH 2/2] fix(marketplace): harden vector backend compatibility --- pkg/marketplace/registry/client.go | 3 ++ pkg/marketplace/registry/client_test.go | 30 ++++++++++++ pkg/marketregistry/embedding_types.go | 22 ++++----- pkg/marketregistry/server_test.go | 31 ++++++++++--- pkg/marketregistry/store.go | 61 +++++++++++++++++-------- 5 files changed, 111 insertions(+), 36 deletions(-) diff --git a/pkg/marketplace/registry/client.go b/pkg/marketplace/registry/client.go index a8104ed2..7a586a24 100644 --- a/pkg/marketplace/registry/client.go +++ b/pkg/marketplace/registry/client.go @@ -391,6 +391,9 @@ func firstRetrievalMeta(values ...*remoteRetrievalMeta) *marketplace.RetrievalMe if value == nil { continue } + if value.SearchMode == "" && value.VectorApplied == nil && value.VectorFallbackReason == "" && value.CandidateCounts == nil { + continue + } return convertRetrievalMeta(value) } return nil diff --git a/pkg/marketplace/registry/client_test.go b/pkg/marketplace/registry/client_test.go index ff2de93b..6aa51cca 100644 --- a/pkg/marketplace/registry/client_test.go +++ b/pkg/marketplace/registry/client_test.go @@ -99,6 +99,36 @@ func TestClientListConvertsCloudArtifactsAndCaches(t *testing.T) { } } +func TestClientListLegacyTopLevelMetaDoesNotFabricateRetrievalMeta(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + writeTestJSON(t, w, 
map[string]any{ + "data": map[string]any{ + "items": []map[string]any{testRemoteArtifact("skill")}, + "total": 1, + "limit": 10, + "offset": 0, + }, + "meta": map[string]any{ + "protocol_version": "1.0", + "count": 1, + }, + }) + })) + defer server.Close() + + client := NewClient(ClientConfig{Endpoint: server.URL}) + result, err := client.List(context.Background(), marketplace.Filter{Limit: 10}) + if err != nil { + t.Fatal(err) + } + if result.Total != 1 || len(result.Items) != 1 { + t.Fatalf("unexpected result: %#v", result) + } + if result.RetrievalMeta != nil { + t.Fatalf("expected nil retrieval meta for legacy top-level meta, got %#v", result.RetrievalMeta) + } +} + func TestClientDetailVersionsAndResolve(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { diff --git a/pkg/marketregistry/embedding_types.go b/pkg/marketregistry/embedding_types.go index fa70fcea..2dc7b42a 100644 --- a/pkg/marketregistry/embedding_types.go +++ b/pkg/marketregistry/embedding_types.go @@ -13,17 +13,17 @@ const ( ) type ArtifactEmbedding struct { - ArtifactID string `json:"artifact_id"` - Model string `json:"model"` - Vector []float32 `json:"vector,omitempty"` - VectorDim int `json:"vector_dim"` - EmbeddingText string `json:"embedding_text,omitempty"` - Status string `json:"status"` - Error string `json:"error,omitempty"` - LastJobError string `json:"last_job_error,omitempty"` - LastJobStatus string `json:"last_job_status,omitempty"` - LastAttemptAt string `json:"last_attempt_at,omitempty"` - LastUpdatedAt string `json:"last_updated_at,omitempty"` + ArtifactID string `json:"artifact_id"` + Model string `json:"model"` + Vector []float32 `json:"vector,omitempty"` + VectorDim int `json:"vector_dim"` + EmbeddingText string `json:"embedding_text,omitempty"` + Status string `json:"status"` + Error string `json:"error,omitempty"` + LastJobError string `json:"last_job_error,omitempty"` + LastJobStatus string 
`json:"last_job_status,omitempty"` + LastAttemptAt string `json:"last_attempt_at,omitempty"` + LastUpdatedAt string `json:"last_updated_at,omitempty"` } type ArtifactEmbeddingJob struct { diff --git a/pkg/marketregistry/server_test.go b/pkg/marketregistry/server_test.go index 94f682f1..23c38c43 100644 --- a/pkg/marketregistry/server_test.go +++ b/pkg/marketregistry/server_test.go @@ -738,6 +738,27 @@ func doJSONWithAuth(t *testing.T, handler http.Handler, method, path string, bod } } +func waitForEmbeddingStatus(t *testing.T, server *Server, artifactID string, wantStatus string) *ArtifactEmbedding { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + item, err := server.store.loadArtifactEmbedding(context.Background(), artifactID) + if err != nil { + t.Fatal(err) + } + if item != nil && item.Status == wantStatus { + return item + } + time.Sleep(25 * time.Millisecond) + } + item, err := server.store.loadArtifactEmbedding(context.Background(), artifactID) + if err != nil { + t.Fatal(err) + } + t.Fatalf("timed out waiting for embedding %s to reach status %s, last=%#v", artifactID, wantStatus, item) + return nil +} + func assertZipContains(t *testing.T, data []byte, name string) { t.Helper() reader, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) @@ -829,11 +850,7 @@ func TestWorkerCreatesArtifactEmbeddingRecord(t *testing.T) { if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { t.Fatal(err) } - server.processOneEmbeddingJob(context.Background()) - item, err := server.store.loadArtifactEmbedding(context.Background(), artifact.ID) - if err != nil { - t.Fatal(err) - } + item := waitForEmbeddingStatus(t, server, artifact.ID, artifactEmbeddingStatusReady) if item == nil || item.Status != artifactEmbeddingStatusReady { t.Fatalf("expected ready embedding record, got %#v", item) } @@ -884,7 +901,7 @@ func TestServerAdminEmbeddingEndpointsAndMetrics(t *testing.T) { if err := 
server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { t.Fatal(err) } - server.processOneEmbeddingJob(context.Background()) + waitForEmbeddingStatus(t, server, artifact.ID, artifactEmbeddingStatusReady) var jobs struct { Data ArtifactEmbeddingJobList `json:"data"` @@ -967,7 +984,7 @@ func TestServerHybridQueryCacheAvoidsRepeatedEmbeddingCalls(t *testing.T) { if err := server.store.UpsertArtifact(context.Background(), artifact, nil); err != nil { t.Fatal(err) } - server.processOneEmbeddingJob(context.Background()) + waitForEmbeddingStatus(t, server, artifact.ID, artifactEmbeddingStatusReady) var result struct { Data ListResult `json:"data"` diff --git a/pkg/marketregistry/store.go b/pkg/marketregistry/store.go index 679506ef..6a9d7383 100644 --- a/pkg/marketregistry/store.go +++ b/pkg/marketregistry/store.go @@ -29,8 +29,9 @@ var ( ) type Store struct { - db *sql.DB - vector *vectorRuntime + db *sql.DB + vector *vectorRuntime + enableFTS bool } type searchExecution struct { @@ -69,7 +70,13 @@ func OpenStoreWithConfig(ctx context.Context, cfg StoreConfig) (*Store, error) { if err != nil { return nil, err } - store := &Store{db: db} + if driver == "sqlite" { + // Serialize SQLite access so publish/upsert requests and the embedding + // worker do not contend on concurrent writes. 
+ db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + } + store := &Store{db: db, enableFTS: driver == "sqlite"} if err := store.migrate(ctx); err != nil { _ = db.Close() return nil, err @@ -184,19 +191,6 @@ func (s *Store) migrate(ctx context.Context) error { created_at TEXT NOT NULL, updated_at TEXT NOT NULL )`, - `CREATE VIRTUAL TABLE IF NOT EXISTS artifacts_fts USING fts5( - artifact_id UNINDEXED, - name, - summary, - description_md, - publisher, - kind, - tags_text, - hit_signals_text, - use_case_text, - search_text, - tokenize = 'unicode61' - )`, `CREATE INDEX IF NOT EXISTS idx_artifacts_kind ON artifacts(kind)`, `CREATE INDEX IF NOT EXISTS idx_artifacts_source ON artifacts(source)`, `CREATE INDEX IF NOT EXISTS idx_artifacts_risk ON artifacts(risk_level)`, @@ -212,7 +206,27 @@ func (s *Store) migrate(ctx context.Context) error { return err } } + if s.enableFTS { + if _, err := s.db.ExecContext(ctx, `CREATE VIRTUAL TABLE IF NOT EXISTS artifacts_fts USING fts5( + artifact_id UNINDEXED, + name, + summary, + description_md, + publisher, + kind, + tags_text, + hit_signals_text, + use_case_text, + search_text, + tokenize = 'unicode61' + )`); err != nil { + return err + } + } _, _ = s.db.ExecContext(ctx, `ALTER TABLE artifact_versions ADD COLUMN signature TEXT NOT NULL DEFAULT ''`) + if !s.enableFTS { + return nil + } return s.rebuildArtifactSearchIndex(ctx) } @@ -248,8 +262,10 @@ func (s *Store) DeleteArtifact(ctx context.Context, artifactID string) (Artifact if _, err = tx.ExecContext(ctx, `DELETE FROM artifact_embeddings WHERE artifact_id = ?`, artifactID); err != nil { return ArtifactDeletion{}, err } - if _, err = tx.ExecContext(ctx, `DELETE FROM artifacts_fts WHERE artifact_id = ?`, artifactID); err != nil { - return ArtifactDeletion{}, err + if s.enableFTS { + if _, err = tx.ExecContext(ctx, `DELETE FROM artifacts_fts WHERE artifact_id = ?`, artifactID); err != nil { + return ArtifactDeletion{}, err + } } if _, err = tx.ExecContext(ctx, `DELETE FROM quarantine 
WHERE artifact_id = ?`, artifactID); err != nil { return ArtifactDeletion{}, err @@ -908,6 +924,9 @@ func (s *Store) listStructuredCandidates(ctx context.Context, filter SearchFilte } func (s *Store) listLexicalCandidates(ctx context.Context, filter SearchFilter) ([]searchCandidate, int, error) { + if s == nil || !s.enableFTS { + return s.listStructuredCandidates(ctx, filter) + } matchQuery := buildFTSQuery(filter.Query) if matchQuery == "" { return s.listStructuredCandidates(ctx, filter) @@ -1116,6 +1135,9 @@ func scanArtifactWithLexicalRank(row scanner) (Artifact, float64, error) { } func (s *Store) upsertArtifactSearchDocTx(ctx context.Context, tx *sql.Tx, artifact Artifact) error { + if s == nil || !s.enableFTS { + return nil + } doc := buildArtifactSearchDocument(artifact) if _, err := tx.ExecContext(ctx, `DELETE FROM artifacts_fts WHERE artifact_id = ?`, artifact.ID); err != nil { return err @@ -1132,6 +1154,9 @@ func (s *Store) upsertArtifactSearchDocTx(ctx context.Context, tx *sql.Tx, artif } func (s *Store) rebuildArtifactSearchIndex(ctx context.Context) error { + if s == nil || !s.enableFTS { + return nil + } rows, err := s.db.QueryContext(ctx, `SELECT id, kind, name, summary, description_md, latest_version, source, publisher, risk_level, trust_level, permissions_json, compatibility_json, dependencies_json,