diff --git a/horizon/cmd/mcp-server/main.go b/horizon/cmd/mcp-server/main.go new file mode 100644 index 00000000..dd256803 --- /dev/null +++ b/horizon/cmd/mcp-server/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "os" + "strings" + + "github.com/Meesho/BharatMLStack/horizon/internal/configs" + mcpserver "github.com/Meesho/BharatMLStack/horizon/internal/mcp" + onlinefeaturestore "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store" + ofsConfig "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store/config" + ofsHandler "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store/handler" + "github.com/Meesho/BharatMLStack/horizon/pkg/etcd" + "github.com/Meesho/BharatMLStack/horizon/pkg/infra" + "github.com/Meesho/BharatMLStack/horizon/pkg/logger" + "github.com/mark3labs/mcp-go/server" + "github.com/rs/zerolog/log" +) + +// AppConfig holds the application configuration for the MCP server. +type AppConfig struct { + Configs configs.Configs + DynamicConfigs configs.DynamicConfigs +} + +func (cfg *AppConfig) GetStaticConfig() interface{} { + return &cfg.Configs +} + +func (cfg *AppConfig) GetDynamicConfig() interface{} { + return &cfg.DynamicConfigs +} + +func main() { + var appConfig AppConfig + + configs.InitConfig(&appConfig) + logger.Init(appConfig.Configs) + infra.InitDBConnectors(appConfig.Configs) + etcd.InitFromAppName(&ofsConfig.FeatureRegistry{}, appConfig.Configs.OnlineFeatureStoreAppName, appConfig.Configs) + onlinefeaturestore.Init(appConfig.Configs) + + configHandler := ofsHandler.NewConfigHandler(1) + if configHandler == nil { + log.Fatal().Msg("failed to initialize online-feature-store config handler") + } + + mcpSrv := mcpserver.NewServer(configHandler) + + addr := os.Getenv("MCP_ADDR") + if addr == "" { + addr = ":8080" + } + + var httpOpts []server.StreamableHTTPOption + + if strings.EqualFold(os.Getenv("MCP_TLS_ENABLED"), "true") { + certFile := os.Getenv("MCP_TLS_CERT_FILE") + keyFile := os.Getenv("MCP_TLS_KEY_FILE") + if certFile == "" || keyFile == "" { + log.Fatal().Msg("MCP_TLS_CERT_FILE and MCP_TLS_KEY_FILE must be set when MCP_TLS_ENABLED=true") + } + httpOpts = append(httpOpts, server.WithTLSCert(certFile, keyFile)) + log.Info().Str("addr", addr).Msg("Starting Horizon MCP server with TLS (Streamable HTTP)") + } else { + log.Info().Str("addr", addr).Msg("Starting Horizon MCP server (Streamable HTTP)") + } + + httpServer := server.NewStreamableHTTPServer(mcpSrv, httpOpts...) + if err := httpServer.Start(addr); err != nil { + log.Fatal().Err(err).Msg("MCP server error") + } +} diff --git a/horizon/go.mod b/horizon/go.mod index ddb290ee..08b919be 100644 --- a/horizon/go.mod +++ b/horizon/go.mod @@ -17,6 +17,7 @@ require ( github.com/google/go-github/v53 v53.2.0 github.com/google/uuid v1.6.0 github.com/maolinc/copier v0.0.0-20230308122822-96b2f568544f + github.com/mark3labs/mcp-go v0.44.0 github.com/robfig/cron/v3 v3.0.1 github.com/rs/zerolog v1.34.0 github.com/spf13/viper v1.20.1 @@ -48,6 +49,8 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/bytedance/sonic v1.14.0 // indirect github.com/bytedance/sonic/loader v0.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -83,11 +86,13 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/invopop/jsonschema v0.13.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -109,6 +114,8 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/zeebo/errs v1.4.0 // indirect go.etcd.io/etcd/api/v3 v3.5.12 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.12 // indirect diff --git a/horizon/go.sum b/horizon/go.sum index 2066e511..ba58a716 100644 --- a/horizon/go.sum +++ b/horizon/go.sum @@ -37,6 +37,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 h1:wPbRQzjjwFc0ih8puEVAOFGELsn1zoIIYdxvML7mDxA= github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8/go.mod h1:I0gYDMZ6Z5GRU7l58bNFSkPTFN6Yl12dsUlAZ8xy98g= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= @@ -45,6 +47,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bradleyfalzon/ghinstallation/v2 v2.17.0 h1:SmbUK/GxpAspRjSQbB6ARvH+ArzlNzTtHydNyXUQ6zg= github.com/bradleyfalzon/ghinstallation/v2 v2.17.0/go.mod h1:vuD/xvJT9Y+ZVZRv4HQ42cMyPFIYqpc7AbB4Gvt/DlY= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= @@ -151,10 +155,13 @@ github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81 github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= +github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -170,8 +177,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/maolinc/copier v0.0.0-20230308122822-96b2f568544f h1:sRTOY+RyQBvYIXUD64+jD+rrdJ3DmrKu/QD3s+CwQeI= github.com/maolinc/copier v0.0.0-20230308122822-96b2f568544f/go.mod h1:kZ+zAWCoPv9+nPxdwQjK7cdfhlKZH2+q3PjvdpsXR7Q= +github.com/mark3labs/mcp-go v0.44.0 h1:OlYfcVviAnwNN40QZUrrzU0QZjq3En7rCU5X09a/B7I= +github.com/mark3labs/mcp-go v0.44.0/go.mod h1:YnJfOL382MIWDx1kMY+2zsRHU/q78dBg9aFb8W6Thdw= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= @@ -245,8 +256,12 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA= github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= +github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/horizon/horizon b/horizon/horizon index a80bc40a..56437857 100755 Binary files a/horizon/horizon and b/horizon/horizon differ diff --git a/horizon/internal/mcp/server.go b/horizon/internal/mcp/server.go new file mode 100644 index 00000000..79199c30 --- /dev/null +++ b/horizon/internal/mcp/server.go @@ -0,0 +1,52 @@ +package mcp + +import ( + "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store/handler" + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" +) + +// NewServer creates and configures a new MCP server with all +// Horizon online-feature-store discovery tools registered. +func NewServer(configHandler handler.Config) *server.MCPServer { + s := server.NewMCPServer( + "horizon-ofs-mcp", + "1.0.0", + server.WithToolCapabilities(false), + server.WithRecovery(), + ) + + tools := NewToolHandlers(configHandler) + registerTools(s, tools) + return s +} + +func registerTools(s *server.MCPServer, t *ToolHandlers) { + listEntities := mcp.NewTool("list_entities", + mcp.WithDescription("List all registered entity labels in the online feature store"), + ) + s.AddTool(listEntities, t.ListEntities) + + getEntityDetails := mcp.NewTool("get_entity_details", + mcp.WithDescription("Get detailed configuration for all entities including keys and cache settings"), + ) + s.AddTool(getEntityDetails, t.GetEntityDetails) + + listFeatureGroups := mcp.NewTool("list_feature_groups", + mcp.WithDescription("List feature groups for a given entity with their labels and data types"), + mcp.WithString("entity_label", + mcp.Required(), + mcp.Description("The entity label to list feature groups for"), + ), + ) + s.AddTool(listFeatureGroups, t.ListFeatureGroups) + + getFeatureGroupDetails := mcp.NewTool("get_feature_group_details", + mcp.WithDescription("Get detailed configuration for feature groups including features, versions, TTL, and cache settings"), + mcp.WithString("entity_label", + mcp.Required(), + mcp.Description("The entity label to retrieve feature group details for"), + ), + ) + s.AddTool(getFeatureGroupDetails, t.GetFeatureGroupDetails) +} diff --git a/horizon/internal/mcp/tools.go b/horizon/internal/mcp/tools.go new file mode 100644 index 00000000..c7726329 --- /dev/null +++ b/horizon/internal/mcp/tools.go @@ -0,0 +1,104 @@ +package mcp + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/Meesho/BharatMLStack/horizon/internal/online-feature-store/handler" + "github.com/mark3labs/mcp-go/mcp" +) + +// ToolHandlers holds the handler.Config dependency and implements +// all MCP tool handler functions for Horizon discovery tools. +type ToolHandlers struct { + config handler.Config +} + +// NewToolHandlers creates a new ToolHandlers with the given config handler. +func NewToolHandlers(config handler.Config) *ToolHandlers { + return &ToolHandlers{config: config} +} + +// ListEntities returns all registered entity labels. +func (t *ToolHandlers) ListEntities(_ context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) { + entities, err := t.config.GetAllEntities() + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to list entities: %v", err)), nil + } + + data, err := json.Marshal(entities) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to marshal entities: %v", err)), nil + } + return mcp.NewToolResultText(string(data)), nil +} + +// GetEntityDetails returns full configuration for all entities +// including keys, in-memory cache, and distributed cache settings. +func (t *ToolHandlers) GetEntityDetails(_ context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) { + entities, err := t.config.RetrieveEntities() + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to retrieve entity details: %v", err)), nil + } + + data, err := json.Marshal(entities) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to marshal entity details: %v", err)), nil + } + return mcp.NewToolResultText(string(data)), nil +} + +// ListFeatureGroups returns feature groups for a given entity +// with their labels and data types. +func (t *ToolHandlers) ListFeatureGroups(_ context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + entityLabel, err := request.RequireString("entity_label") + if err != nil { + return mcp.NewToolResultError("entity_label is required"), nil + } + + featureGroups, err := t.config.RetrieveFeatureGroups(entityLabel) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to list feature groups: %v", err)), nil + } + + type fgSummary struct { + Label string `json:"label"` + DataType string `json:"data-type"` + } + var summaries []fgSummary + if featureGroups != nil { + for _, fg := range *featureGroups { + summaries = append(summaries, fgSummary{ + Label: fg.FeatureGroupLabel, + DataType: string(fg.DataType), + }) + } + } + + data, err := json.Marshal(summaries) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to marshal feature groups: %v", err)), nil + } + return mcp.NewToolResultText(string(data)), nil +} + +// GetFeatureGroupDetails returns full feature group configuration +// including features, active version, TTL, and cache settings. +func (t *ToolHandlers) GetFeatureGroupDetails(_ context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + entityLabel, err := request.RequireString("entity_label") + if err != nil { + return mcp.NewToolResultError("entity_label is required"), nil + } + + featureGroups, err := t.config.RetrieveFeatureGroups(entityLabel) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to retrieve feature group details: %v", err)), nil + } + + data, err := json.Marshal(featureGroups) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to marshal feature group details: %v", err)), nil + } + return mcp.NewToolResultText(string(data)), nil +} diff --git a/horizon/mcp-server b/horizon/mcp-server new file mode 100755 index 00000000..f988a4e7 Binary files /dev/null and b/horizon/mcp-server differ diff --git a/online-feature-store/api-server b/online-feature-store/api-server new file mode 100755 index 00000000..ac3868be Binary files /dev/null and b/online-feature-store/api-server differ diff --git a/online-feature-store/cmd/mcp-server/main.go b/online-feature-store/cmd/mcp-server/main.go new file mode 100644 index 00000000..da611671 --- /dev/null +++ b/online-feature-store/cmd/mcp-server/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "os" + "strings" + + "github.com/Meesho/BharatMLStack/online-feature-store/internal/config" + featureConfig "github.com/Meesho/BharatMLStack/online-feature-store/internal/config" + "github.com/Meesho/BharatMLStack/online-feature-store/internal/data/repositories/provider" + "github.com/Meesho/BharatMLStack/online-feature-store/internal/handler/feature" + mcpserver "github.com/Meesho/BharatMLStack/online-feature-store/internal/mcp" + "github.com/Meesho/BharatMLStack/online-feature-store/internal/system" + pkgConfig "github.com/Meesho/BharatMLStack/online-feature-store/pkg/config" + "github.com/Meesho/BharatMLStack/online-feature-store/pkg/etcd" + "github.com/Meesho/BharatMLStack/online-feature-store/pkg/infra" + "github.com/Meesho/BharatMLStack/online-feature-store/pkg/logger" + "github.com/Meesho/BharatMLStack/online-feature-store/pkg/metric" + "github.com/mark3labs/mcp-go/server" + "github.com/rs/zerolog/log" +) + +const configManagerVersion = 1 +const normalizedEntitiesWatchPath = "/entities" +const registeredClientsWatchPath = "/security/reader" +const cbWatchPath = "/circuitbreaker" + +func main() { + pkgConfig.InitEnv() + logger.Init() + etcd.Init(configManagerVersion, &config.FeatureRegistry{}) + metric.Init() + infra.InitDBConnectors() + system.Init() + featureConfig.InitEtcDBridge() + + configManager := featureConfig.Instance(featureConfig.DefaultVersion) + configManager.UpdateCBConfigs() + if err := etcd.Instance().RegisterWatchPathCallback(cbWatchPath, configManager.UpdateCBConfigs); err != nil { + log.Error().Err(err).Msg("Error registering watch path callback for circuit breaker") + } + configManager.GetNormalizedEntities() + if err := etcd.Instance().RegisterWatchPathCallback(normalizedEntitiesWatchPath, configManager.GetNormalizedEntities); err != nil { + log.Error().Err(err).Msg("Error registering watch path callback for normalized entities") + } + configManager.RegisterClients() + if err := etcd.Instance().RegisterWatchPathCallback(registeredClientsWatchPath, configManager.RegisterClients); err != nil { + log.Error().Err(err).Msg("Error registering watch path callback for registered clients") + } + provider.InitProvider(configManager, etcd.Instance()) + + handler := feature.InitV1() + + mcpSrv := mcpserver.NewServer(handler) + + addr := os.Getenv("MCP_ADDR") + if addr == "" { + addr = ":8080" + } + + var httpOpts []server.StreamableHTTPOption + + if strings.EqualFold(os.Getenv("MCP_TLS_ENABLED"), "true") { + certFile := os.Getenv("MCP_TLS_CERT_FILE") + keyFile := os.Getenv("MCP_TLS_KEY_FILE") + if certFile == "" || keyFile == "" { + log.Fatal().Msg("MCP_TLS_CERT_FILE and MCP_TLS_KEY_FILE must be set when MCP_TLS_ENABLED=true") + } + httpOpts = append(httpOpts, server.WithTLSCert(certFile, keyFile)) + log.Info().Str("addr", addr).Msg("Starting OFS MCP server with TLS (Streamable HTTP)") + } else { + log.Info().Str("addr", addr).Msg("Starting OFS MCP server (Streamable HTTP)") + } + + httpServer := server.NewStreamableHTTPServer(mcpSrv, httpOpts...) + if err := httpServer.Start(addr); err != nil { + log.Fatal().Err(err).Msg("MCP server error") + } +} diff --git a/online-feature-store/go.mod b/online-feature-store/go.mod index 0cbec9a2..774b2a76 100644 --- a/online-feature-store/go.mod +++ b/online-feature-store/go.mod @@ -12,6 +12,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/gocql/gocql v1.7.0 github.com/klauspost/compress v1.17.11 + github.com/mark3labs/mcp-go v0.44.0 github.com/panjf2000/gnet/v2 v2.9.2 github.com/redis/go-redis/v9 v9.17.2 github.com/rs/zerolog v1.34.0 @@ -28,7 +29,9 @@ require ( require ( github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/bits-and-blooms/bitset v1.14.3 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cloudwego/base64x v0.1.4 // indirect @@ -49,10 +52,13 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/invopop/jsonschema v0.13.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect @@ -73,6 +79,8 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.etcd.io/etcd/api/v3 v3.5.17 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.17 // indirect diff --git a/online-feature-store/go.sum b/online-feature-store/go.sum index 81bf12cf..355555f7 100644 --- a/online-feature-store/go.sum +++ b/online-feature-store/go.sum @@ -45,6 +45,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= @@ -59,6 +61,8 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/buger/goterm v1.0.4 h1:Z9YvGmOih81P0FbVtEYTFF6YsSgxSUKEhf/f9bTMXbY= github.com/buger/goterm v1.0.4/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= @@ -225,6 +229,8 @@ github.com/in-toto/in-toto-golang v0.5.0 h1:hb8bgwr0M2hGdDsLjkJ3ZqJ8JFLL/tgYdAxF github.com/in-toto/in-toto-golang v0.5.0/go.mod h1:/Rq0IZHLV7Ku5gielPT4wPHJfH1GdHMCq8+WPxw8/BE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= +github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -256,6 +262,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mark3labs/mcp-go v0.44.0 h1:OlYfcVviAnwNN40QZUrrzU0QZjq3En7rCU5X09a/B7I= +github.com/mark3labs/mcp-go v0.44.0/go.mod h1:YnJfOL382MIWDx1kMY+2zsRHU/q78dBg9aFb8W6Thdw= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= @@ -419,6 +427,8 @@ github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= @@ -427,6 +437,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= +github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/online-feature-store/internal/mcp/server.go b/online-feature-store/internal/mcp/server.go new file mode 100644 index 00000000..29e02b71 --- /dev/null +++ b/online-feature-store/internal/mcp/server.go @@ -0,0 +1,50 @@ +package mcp + +import ( + "github.com/Meesho/BharatMLStack/online-feature-store/pkg/proto/retrieve" + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" +) + +// NewServer creates and configures a new MCP server with all +// online-feature-store tools registered. +func NewServer(handler retrieve.FeatureServiceServer) *server.MCPServer { + s := server.NewMCPServer( + "ofs-mcp", + "1.0.0", + server.WithToolCapabilities(false), + server.WithRecovery(), + ) + + tools := NewToolHandlers(handler) + registerTools(s, tools) + return s +} + +func registerTools(s *server.MCPServer, t *ToolHandlers) { + retrieveDecoded := mcp.NewTool("retrieve_decoded_features", + mcp.WithDescription("Retrieve feature values in human-readable decoded format for given entity keys across one or more feature groups"), + mcp.WithString("entity_label", + mcp.Required(), + mcp.Description("The entity label to retrieve features for (e.g. 'user', 'catalog')"), + ), + mcp.WithArray("feature_groups", + mcp.Required(), + mcp.Description("Array of feature groups to retrieve. Each element must have 'label' (string) and 'feature_labels' (array of strings)"), + ), + mcp.WithArray("keys_schema", + mcp.Required(), + mcp.Description("Column names for the entity keys (e.g. ['user_id'])"), + ), + mcp.WithArray("keys", + mcp.Required(), + mcp.Description("Array of entity keys. Each element is an array of key values corresponding to keys_schema"), + ), + ) + s.AddTool(retrieveDecoded, t.RetrieveDecodedFeatures) + + healthCheck := mcp.NewTool("health_check", + mcp.WithDescription("Check if the online feature store is healthy and reachable"), + ) + s.AddTool(healthCheck, t.HealthCheck) +} diff --git a/online-feature-store/internal/mcp/tools.go b/online-feature-store/internal/mcp/tools.go new file mode 100644 index 00000000..873421d6 --- /dev/null +++ b/online-feature-store/internal/mcp/tools.go @@ -0,0 +1,183 @@ +package mcp + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "github.com/Meesho/BharatMLStack/online-feature-store/pkg/proto/retrieve" + "github.com/mark3labs/mcp-go/mcp" + "google.golang.org/grpc/metadata" +) + +const ( + callerIDEnvVar = "OFS_CALLER_ID" + authTokenEnvVar = "OFS_AUTH_TOKEN" + + callerIDHeader = "online-feature-store-caller-id" + authTokenHeader = "online-feature-store-auth-token" +) + +// ToolHandlers holds the feature handler dependency and implements +// all MCP tool handler functions for the online feature store. +type ToolHandlers struct { + handler retrieve.FeatureServiceServer +} + +// NewToolHandlers creates a new ToolHandlers with the given feature service handler. +func NewToolHandlers(handler retrieve.FeatureServiceServer) *ToolHandlers { + return &ToolHandlers{handler: handler} +} + +// RetrieveDecodedFeatures retrieves feature values in human-readable format. +func (t *ToolHandlers) RetrieveDecodedFeatures(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + args := request.GetArguments() + + entityLabel, ok := args["entity_label"].(string) + if !ok || entityLabel == "" { + return mcp.NewToolResultError("entity_label is required and must be a string"), nil + } + + query := &retrieve.Query{ + EntityLabel: entityLabel, + } + + // Parse keys_schema + keysSchemaRaw, ok := args["keys_schema"].([]interface{}) + if !ok { + return mcp.NewToolResultError("keys_schema is required and must be an array of strings"), nil + } + for _, v := range keysSchemaRaw { + s, ok := v.(string) + if !ok { + return mcp.NewToolResultError("each element in keys_schema must be a string"), nil + } + query.KeysSchema = append(query.KeysSchema, s) + } + + // Parse feature_groups + fgRaw, ok := args["feature_groups"].([]interface{}) + if !ok { + return mcp.NewToolResultError("feature_groups is required and must be an array"), nil + } + for _, fgItem := range fgRaw { + fgMap, ok := fgItem.(map[string]interface{}) + if !ok { + return mcp.NewToolResultError("each feature_group must be an object with 'label' and 'feature_labels'"), nil + } + label, ok := fgMap["label"].(string) + if !ok || label == "" { + return mcp.NewToolResultError("each feature_group must have a 'label' string field"), nil + } + labelsRaw, ok := fgMap["feature_labels"].([]interface{}) + if !ok { + return mcp.NewToolResultError("each feature_group must have a 'feature_labels' array field"), nil + } + fg := &retrieve.FeatureGroup{Label: label} + for _, l := range labelsRaw { + s, ok := l.(string) + if !ok { + return mcp.NewToolResultError("each feature_label must be a string"), nil + } + fg.FeatureLabels = append(fg.FeatureLabels, s) + } + query.FeatureGroups = append(query.FeatureGroups, fg) + } + + // Parse keys + keysRaw, ok := args["keys"].([]interface{}) + if !ok { + return mcp.NewToolResultError("keys is required and must be an array"), nil + } + for _, keyItem := range keysRaw { + keyArr, ok := keyItem.([]interface{}) + if !ok { + return mcp.NewToolResultError("each key must be an array of strings"), nil + } + key := &retrieve.Keys{} + for _, k := range keyArr { + s, ok := k.(string) + if !ok { + return mcp.NewToolResultError("each key value must be a string"), nil + } + key.Cols = append(key.Cols, s) + } + query.Keys = append(query.Keys, key) + } + + // Inject auth metadata into context + authCtx := injectAuthContext(ctx) + + result, err := t.handler.RetrieveDecodedResult(authCtx, query) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to retrieve decoded features: %v", err)), nil + } + + // Build a JSON-friendly response + type decodedRow struct { + Keys []string `json:"keys"` + Columns []string `json:"columns"` + } + type featureInfo struct { + Label string `json:"label"` + ColumnIdx int32 `json:"column_idx"` + } + type schemaInfo struct { + FeatureGroupLabel string `json:"feature_group_label"` + Features []featureInfo `json:"features"` + } + type response struct { + KeysSchema []string `json:"keys_schema"` + FeatureSchemas []schemaInfo `json:"feature_schemas"` + Rows []decodedRow `json:"rows"` + } + + resp := response{ + KeysSchema: result.KeysSchema, + } + for _, fs := range result.FeatureSchemas { + si := schemaInfo{FeatureGroupLabel: fs.FeatureGroupLabel} + for _, f := range fs.Features { + si.Features = append(si.Features, featureInfo{ + Label: f.Label, + ColumnIdx: f.ColumnIdx, + }) + } + resp.FeatureSchemas = append(resp.FeatureSchemas, si) + } + for _, row := range result.Rows { + resp.Rows = append(resp.Rows, decodedRow{ + Keys: row.Keys, + Columns: row.Columns, + }) + } + + data, err := json.Marshal(resp) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("failed to marshal response: %v", err)), nil + } + return mcp.NewToolResultText(string(data)), nil +} + +// HealthCheck verifies the feature store is operational. +func (t *ToolHandlers) HealthCheck(_ context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) { + return mcp.NewToolResultText(`{"status":"ok"}`), nil +} + +// injectAuthContext reads OFS_CALLER_ID and OFS_AUTH_TOKEN from env +// and injects them as gRPC incoming metadata on the context. +func injectAuthContext(ctx context.Context) context.Context { + callerID := os.Getenv(callerIDEnvVar) + authToken := os.Getenv(authTokenEnvVar) + + if callerID == "" || authToken == "" { + return ctx + } + + md := metadata.New(map[string]string{ + callerIDHeader: callerID, + authTokenHeader: authToken, + }) + return metadata.NewIncomingContext(ctx, md) +} diff --git a/online-feature-store/mcp-server b/online-feature-store/mcp-server new file mode 100755 index 00000000..8ced628e Binary files /dev/null and b/online-feature-store/mcp-server differ