From 7f797f4d9c860a4eb722ca73d583ef11eb9782cb Mon Sep 17 00:00:00 2001 From: dhruvgupta-meesho Date: Tue, 23 Dec 2025 10:48:38 +0530 Subject: [PATCH 1/4] added schema api changes --- .../inferflow/controller/controller.go | 13 + horizon/internal/inferflow/handler/config.go | 1 + .../internal/inferflow/handler/inferflow.go | 261 ++++++++++++++++++ horizon/internal/inferflow/handler/models.go | 9 + horizon/internal/inferflow/route/router.go | 1 + horizon/internal/middleware/middleware.go | 3 +- .../repositories/sql/inferflow/models.go | 10 + .../sql/inferflow/request/repository.go | 13 + horizon/pkg/configschemaclient/client.go | 245 ++++++++++++++++ horizon/pkg/configschemaclient/types.go | 119 ++++++++ 10 files changed, 674 insertions(+), 1 deletion(-) create mode 100644 horizon/pkg/configschemaclient/client.go create mode 100644 horizon/pkg/configschemaclient/types.go diff --git a/horizon/internal/inferflow/controller/controller.go b/horizon/internal/inferflow/controller/controller.go index 0d62160f..c71ab8df 100644 --- a/horizon/internal/inferflow/controller/controller.go +++ b/horizon/internal/inferflow/controller/controller.go @@ -26,6 +26,7 @@ type Config interface { ExecuteFuncitonalTestRequest(ctx *gin.Context) GetLatestRequest(ctx *gin.Context) GetLoggingTTL(ctx *gin.Context) + GetFeatureSchema(ctx *gin.Context) } var ( @@ -365,3 +366,15 @@ func (c *V1) GetLoggingTTL(ctx *gin.Context) { } ctx.JSON(200, response) } + +func (c *V1)GetFeatureSchema(ctx *gin.Context) { + response, err := c.Config.GetFeatureSchema(handler.FeatureSchemaRequest{ + ModelConfigId: ctx.Query("model_config_id"), + Version: strings.TrimSpace(ctx.Query("version")), + }) + if err != nil { + ctx.JSON(api.NewBadRequestError(err.Error()).StatusCode, "Error getting feature schema") + return + } + ctx.JSON(200, response) +} diff --git a/horizon/internal/inferflow/handler/config.go b/horizon/internal/inferflow/handler/config.go index 829ae74e..2c543893 100644 --- a/horizon/internal/inferflow/handler/config.go +++ b/horizon/internal/inferflow/handler/config.go @@ -16,4 +16,5 @@ type Config interface { ExecuteFuncitonalTestRequest(request ExecuteRequestFunctionalTestingRequest) (ExecuteRequestFunctionalTestingResponse, error) GetLatestRequest(requestID string) (GetLatestRequestResponse, error) GetLoggingTTL() (GetLoggingTTLResponse, error) + GetFeatureSchema(FeatureSchemaRequest) (FeatureSchemaResponse, error) } diff --git a/horizon/internal/inferflow/handler/inferflow.go b/horizon/internal/inferflow/handler/inferflow.go index 1376a510..1ee165f4 100644 --- a/horizon/internal/inferflow/handler/inferflow.go +++ b/horizon/internal/inferflow/handler/inferflow.go @@ -17,6 +17,7 @@ import ( inferflow_config "github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/inferflow/config" inferflow_request "github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/inferflow/request" service_deployable_config "github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/servicedeployableconfig" + configschemaclient "github.com/Meesho/BharatMLStack/horizon/pkg/configschemaclient" "github.com/Meesho/BharatMLStack/horizon/pkg/grpc" "github.com/Meesho/BharatMLStack/horizon/pkg/infra" "github.com/Meesho/BharatMLStack/horizon/pkg/random" @@ -1110,3 +1111,263 @@ func (m *InferFlow) GetLoggingTTL() (GetLoggingTTLResponse, error) { Data: []int{30, 60, 90}, }, nil } + +func (m *InferFlow) GetFeatureSchema(request FeatureSchemaRequest) (FeatureSchemaResponse, error) { + version, err := strconv.Atoi(request.Version) + if err != nil { + return FeatureSchemaResponse{ + Data: []inferflow.SchemaComponents{}, + }, err + } + modelProxyRequests, err := m.InferFlowRequestRepo.GetByConfigIDandVersion(request.ModelConfigId, version) + if err != nil { + log.Error().Err(err).Str("model_config_id", request.ModelConfigId).Msg("Failed to get model proxy config") + return FeatureSchemaResponse{ + Data: []inferflow.SchemaComponents{}, + }, err + } + modelProxyConfig := modelProxyRequests[0].Payload + componentConfig := &modelProxyConfig.ConfigValue.ComponentConfig + responseConfig := &modelProxyConfig.ConfigValue.ResponseConfig + + clientComponentConfig := toClientComponentConfig(componentConfig) + clientResponseConfig := toClientResponseConfig(responseConfig) + + response := configschemaclient.BuildFeatureSchema(clientComponentConfig, clientResponseConfig) + + if responseConfig.LogSelectiveFeatures { + clientSchemaComponents := configschemaclient.ProcessResponseConfig(clientResponseConfig, response) + log.Info().Str("model_config_id", request.ModelConfigId).Int("schema_components_count", len(clientSchemaComponents)).Msg("Successfully generated feature schema") + return FeatureSchemaResponse{ + Data: toInferflowSchemaComponents(clientSchemaComponents), + }, nil + } + + log.Info().Str("model_config_id", request.ModelConfigId).Int("schema_components_count", len(response)).Msg("Successfully generated feature schema") + return FeatureSchemaResponse{ + Data: toInferflowSchemaComponents(response), + }, nil +} + +func toClientComponentConfig(config *inferflow.ComponentConfig) *configschemaclient.ComponentConfig { + if config == nil { + return nil + } + + return &configschemaclient.ComponentConfig{ + CacheEnabled: config.CacheEnabled, + CacheTTL: config.CacheTTL, + CacheVersion: config.CacheVersion, + FeatureComponents: toClientFeatureComponents(config.FeatureComponents), + RTPComponents: toClientRTPComponents(config.RTPComponents), + PredatorComponents: toClientPredatorComponents(config.PredatorComponents), + NumerixComponents: toClientNumerixComponents(config.NumerixComponents), + } +} + +func toClientResponseConfig(config *inferflow.ResponseConfig) *configschemaclient.ResponseConfig { + if config == nil { + return nil + } + + return &configschemaclient.ResponseConfig{ + LoggingPerc: config.LoggingPerc, + ModelSchemaPerc: config.ModelSchemaPerc, + Features: config.Features, + LogSelectiveFeatures: config.LogSelectiveFeatures, + LogBatchSize: config.LogBatchSize, + } +} + +func toClientNumerixComponents(components []inferflow.NumerixComponent) []configschemaclient.NumerixComponent { + if len(components) == 0 { + return nil + } + + result := make([]configschemaclient.NumerixComponent, len(components)) + for i, c := range components { + result[i] = configschemaclient.NumerixComponent{ + Component: c.Component, + ComponentID: c.ComponentID, + ScoreCol: c.ScoreCol, + ComputeID: c.ComputeID, + ScoreMapping: c.ScoreMapping, + DataType: c.DataType, + } + } + return result +} + +func toClientFeatureComponents(components []inferflow.FeatureComponent) []configschemaclient.FeatureComponent { + if len(components) == 0 { + return nil + } + + result := make([]configschemaclient.FeatureComponent, len(components)) + for i, c := range components { + result[i] = configschemaclient.FeatureComponent{ + Component: c.Component, + ComponentID: c.ComponentID, + ColNamePrefix: c.ColNamePrefix, + CompCacheEnabled: c.CompCacheEnabled, + CompCacheTTL: c.CompCacheTTL, + CompositeID: c.CompositeID, + FSKeys: toClientFSKeys(c.FSKeys), + FSRequest: toClientFSRequest(c.FSRequest), + FSFlattenRespKeys: c.FSFlattenRespKeys, + } + } + return result +} + +func toClientRTPComponents(components []inferflow.RTPComponent) []configschemaclient.RTPComponent { + if len(components) == 0 { + return nil + } + + result := make([]configschemaclient.RTPComponent, len(components)) + for i, c := range components { + result[i] = configschemaclient.RTPComponent{ + Component: c.Component, + ComponentID: c.ComponentID, + CompositeID: c.CompositeID, + FSKeys: toClientFSKeys(c.FSKeys), + FSRequest: toClientFSRequest(c.FSRequest), + FSFlattenRespKeys: c.FSFlattenRespKeys, + ColNamePrefix: c.ColNamePrefix, + CompCacheEnabled: c.CompCacheEnabled, + } + } + return result +} + +func toClientPredatorComponents(components []inferflow.PredatorComponent) []configschemaclient.PredatorComponent { + if len(components) == 0 { + return nil + } + + result := make([]configschemaclient.PredatorComponent, len(components)) + for i, c := range components { + result[i] = configschemaclient.PredatorComponent{ + Component: c.Component, + ComponentID: c.ComponentID, + ModelName: c.ModelName, + ModelEndPoint: c.ModelEndPoint, + Calibration: c.Calibration, + Deadline: c.Deadline, + BatchSize: c.BatchSize, + Inputs: toClientPredatorInputs(c.Inputs), + Outputs: toClientPredatorOutputs(c.Outputs), + RoutingConfig: toClientRoutingConfigs(c.RoutingConfig), + } + } + return result +} + +func toClientFSKeys(keys []inferflow.FSKey) []configschemaclient.FSKey { + if len(keys) == 0 { + return nil + } + + result := make([]configschemaclient.FSKey, len(keys)) + for i, k := range keys { + result[i] = configschemaclient.FSKey{ + Schema: k.Schema, + Col: k.Col, + } + } + return result +} + +func toClientFSRequest(req *inferflow.FSRequest) *configschemaclient.FSRequest { + if req == nil { + return nil + } + + return &configschemaclient.FSRequest{ + Label: req.Label, + FeatureGroups: toClientFSFeatureGroups(req.FeatureGroups), + } +} + +func toClientFSFeatureGroups(groups []inferflow.FSFeatureGroup) []configschemaclient.FSFeatureGroup { + if len(groups) == 0 { + return nil + } + + result := make([]configschemaclient.FSFeatureGroup, len(groups)) + for i, g := range groups { + result[i] = configschemaclient.FSFeatureGroup{ + Label: g.Label, + Features: g.Features, + DataType: g.DataType, + } + } + return result +} + +func toClientPredatorInputs(inputs []inferflow.PredatorInput) []configschemaclient.PredatorInput { + if len(inputs) == 0 { + return nil + } + + result := make([]configschemaclient.PredatorInput, len(inputs)) + for i, input := range inputs { + result[i] = configschemaclient.PredatorInput{ + Name: input.Name, + Features: input.Features, + Dims: input.Dims, + DataType: input.DataType, + } + } + return result +} + +func toClientPredatorOutputs(outputs []inferflow.PredatorOutput) []configschemaclient.PredatorOutput { + if len(outputs) == 0 { + return nil + } + + result := make([]configschemaclient.PredatorOutput, len(outputs)) + for i, output := range outputs { + result[i] = configschemaclient.PredatorOutput{ + Name: output.Name, + ModelScores: output.ModelScores, + ModelScoresDims: output.ModelScoresDims, + DataType: output.DataType, + } + } + return result +} + +func toClientRoutingConfigs(configs []inferflow.RoutingConfig) []configschemaclient.RoutingConfig { + if len(configs) == 0 { + return nil + } + + result := make([]configschemaclient.RoutingConfig, len(configs)) + for i, c := range configs { + result[i] = configschemaclient.RoutingConfig{ + ModelName: c.ModelName, + ModelEndpoint: c.ModelEndpoint, + RoutingPercentage: c.RoutingPercentage, + } + } + return result +} + +func toInferflowSchemaComponents(components []configschemaclient.SchemaComponents) []inferflow.SchemaComponents { + if len(components) == 0 { + return nil + } + + result := make([]inferflow.SchemaComponents, len(components)) + for i, c := range components { + result[i] = inferflow.SchemaComponents{ + FeatureName: c.FeatureName, + FeatureType: c.FeatureType, + FeatureSize: c.FeatureSize, + } + } + return result +} diff --git a/horizon/internal/inferflow/handler/models.go b/horizon/internal/inferflow/handler/models.go index c595ba93..f1155403 100644 --- a/horizon/internal/inferflow/handler/models.go +++ b/horizon/internal/inferflow/handler/models.go @@ -445,3 +445,12 @@ type RTPFeatureGroup struct { Features []string `json:"features"` DataType string `json:"dataType"` } + +type FeatureSchemaRequest struct { + ModelConfigId string `json:"model_config_id"` + Version string `json:"version"` +} + +type FeatureSchemaResponse struct { + Data []dbModel.SchemaComponents `json:"data"` +} \ No newline at end of file diff --git a/horizon/internal/inferflow/route/router.go b/horizon/internal/inferflow/route/router.go index 8483c6a8..429ed761 100644 --- a/horizon/internal/inferflow/route/router.go +++ b/horizon/internal/inferflow/route/router.go @@ -25,6 +25,7 @@ func Init() { register.GET("/logging-ttl", controller.NewConfigController().GetLoggingTTL) register.PATCH("/delete", controller.NewConfigController().Delete) register.GET("/latestRequest/:config_id", controller.NewConfigController().GetLatestRequest) + register.GET("/get_feature_schema", controller.NewConfigController().GetFeatureSchema) } discovery := v1.Group("/inferflow-config-discovery") diff --git a/horizon/internal/middleware/middleware.go b/horizon/internal/middleware/middleware.go index 1642b666..7daaaee1 100644 --- a/horizon/internal/middleware/middleware.go +++ b/horizon/internal/middleware/middleware.go @@ -98,7 +98,8 @@ func (m *MiddlewareHandler) AuthMiddleware() gin.HandlerFunc { if strings.HasPrefix(c.Request.URL.Path, "/login") || strings.HasPrefix(c.Request.URL.Path, "/register") || strings.HasPrefix(c.Request.URL.Path, "/health") || - strings.HasPrefix(c.Request.URL.Path, "/api/1.0/fs-config") { + strings.HasPrefix(c.Request.URL.Path, "/api/1.0/fs-config")|| + strings.HasPrefix(c.Request.URL.Path, "/api/v1/inferflow-config-registry/get_feature_schema") { c.Next() return } diff --git a/horizon/internal/repositories/sql/inferflow/models.go b/horizon/internal/repositories/sql/inferflow/models.go index 68ecce49..0e7db82a 100644 --- a/horizon/internal/repositories/sql/inferflow/models.go +++ b/horizon/internal/repositories/sql/inferflow/models.go @@ -163,6 +163,16 @@ type TestResults struct { Message string `json:"message"` } +type GetSchemaResponse struct { + Components []SchemaComponents +} + +type SchemaComponents struct { + FeatureName string `json:"feature_name"` + FeatureType string `json:"feature_type"` + FeatureSize any `json:"feature_size"` +} + func (p *Payload) Scan(value interface{}) error { if value == nil { *p = Payload{} diff --git a/horizon/internal/repositories/sql/inferflow/request/repository.go b/horizon/internal/repositories/sql/inferflow/request/repository.go index dbe5d34d..29bc0ac3 100644 --- a/horizon/internal/repositories/sql/inferflow/request/repository.go +++ b/horizon/internal/repositories/sql/inferflow/request/repository.go @@ -23,6 +23,7 @@ type Repository interface { Deactivate(configID string) error GetLatestPendingRequestByConfigID(configID string) ([]Table, error) GetApprovedRequestsByConfigID(configID string) ([]Table, error) + GetByConfigIDandVersion(configID string, version int) ([]Table, error) } type InferflowRequest struct { @@ -144,3 +145,15 @@ func (g *InferflowRequest) GetLatestPendingRequestByConfigID(configID string) ([ Find(&tables) return tables, result.Error } + +func (g *InferflowRequest) GetByConfigIDandVersion(configID string, version int) ([]Table, error) { + var tables []Table + result := g.db.Where("config_id = ? AND version = ? AND status = ?", configID, version, "APPROVED").Find(&tables) + if result.Error != nil { + return nil, result.Error + } + if len(tables) == 0 { + return nil, errors.New("no request found with the given config_id and version") + } + return tables, result.Error +} diff --git a/horizon/pkg/configschemaclient/client.go b/horizon/pkg/configschemaclient/client.go new file mode 100644 index 00000000..5e655eb6 --- /dev/null +++ b/horizon/pkg/configschemaclient/client.go @@ -0,0 +1,245 @@ +package configschemaclient + +import ( + "strings" +) + +// BuildFeatureSchema builds a feature schema from the component and response configs. +// It processes components in order: Iris Output → Iris Input → FS → RTP → Predator Input → Predator Output → Response Config +func BuildFeatureSchema(componentConfig *ComponentConfig, responseConfig *ResponseConfig) []SchemaComponents { + if componentConfig == nil { + return nil + } + + existingFeatures := make(map[string]bool) + var response []SchemaComponents + + addUniqueComponents := func(components []SchemaComponents) { + for _, component := range components { + if !existingFeatures[component.FeatureName] { + response = append(response, component) + existingFeatures[component.FeatureName] = true + } + } + } + + addOrUpdateComponents := func(components []SchemaComponents) { + for _, component := range components { + if !existingFeatures[component.FeatureName] { + component.FeatureType = "String" + response = append(response, component) + existingFeatures[component.FeatureName] = true + } + } + } + + // 1. Iris Output + addUniqueComponents(processIrisOutput(componentConfig.NumerixComponents)) + + // 2. Iris Input + addUniqueComponents(processIrisInput(componentConfig.NumerixComponents)) + + // 3. FS (Feature Store) + addUniqueComponents(processFS(componentConfig.FeatureComponents)) + + // 4. RTP (Real Time Pricing) + addUniqueComponents(processRTP(componentConfig.RTPComponents)) + + // 5. Predator Input (only add if not already present) + addOrUpdateComponents(processPredatorInput(componentConfig.PredatorComponents)) + + // 6. Predator Output (only add if not already present) + addOrUpdateComponents(processPredatorOutput(componentConfig.PredatorComponents)) + + // 7. Response Config features + responseSchemaComponents := ProcessResponseConfig(responseConfig, response) + addUniqueComponents(responseSchemaComponents) + + return response +} + +func processIrisOutput(irisComponents []NumerixComponent) []SchemaComponents { + if len(irisComponents) == 0 { + return nil + } + + var response []SchemaComponents + for _, irisComponent := range irisComponents { + response = append(response, SchemaComponents{ + FeatureName: irisComponent.ScoreCol, + FeatureType: irisComponent.DataType, + FeatureSize: 1, + }) + } + return response +} + +func processIrisInput(irisComponents []NumerixComponent) []SchemaComponents { + if len(irisComponents) == 0 { + return nil + } + + var response []SchemaComponents + for _, irisComponent := range irisComponents { + for irisInput, featureName := range irisComponent.ScoreMapping { + inputParts := strings.Split(irisInput, "@") + response = append(response, SchemaComponents{ + FeatureName: featureName, + FeatureType: inputParts[1], + FeatureSize: 1, + }) + } + } + return response +} + +func getFeatureName(prefix, entityLabel, fgLabel, feature string) string { + featureName := "" + if prefix != "" { + featureName = prefix + } + if entityLabel != "" { + featureName = featureName + entityLabel + ":" + } + if fgLabel != "" { + featureName = featureName + fgLabel + ":" + } + return featureName + feature +} + +func processFS(featureComponents []FeatureComponent) []SchemaComponents { + if len(featureComponents) == 0 { + return nil + } + + var response []SchemaComponents + for _, featureComponent := range featureComponents { + if featureComponent.FSRequest == nil { + continue + } + for _, featureGroup := range featureComponent.FSRequest.FeatureGroups { + for _, feature := range featureGroup.Features { + response = append(response, SchemaComponents{ + FeatureName: getFeatureName(featureComponent.ColNamePrefix, featureComponent.FSRequest.Label, featureGroup.Label, feature), + FeatureType: featureGroup.DataType, + FeatureSize: 1, + }) + } + } + } + return response +} + +func processRTP(rtpComponents []RTPComponent) []SchemaComponents { + if len(rtpComponents) == 0 { + return nil + } + + var response []SchemaComponents + for _, rtpComponent := range rtpComponents { + if rtpComponent.FSRequest == nil { + continue + } + for _, featureGroup := range rtpComponent.FSRequest.FeatureGroups { + for _, feature := range featureGroup.Features { + response = append(response, SchemaComponents{ + FeatureName: getFeatureName(rtpComponent.ColNamePrefix, rtpComponent.FSRequest.Label, featureGroup.Label, feature), + FeatureType: featureGroup.DataType, + FeatureSize: 1, + }) + } + } + } + return response +} + +func processPredatorOutput(predatorComponents []PredatorComponent) []SchemaComponents { + if len(predatorComponents) == 0 { + return nil + } + + var response []SchemaComponents + for _, predatorComponent := range predatorComponents { + for _, output := range predatorComponent.Outputs { + for index, modelScore := range output.ModelScores { + var featureSize any = 1 + dataType := output.DataType + if index < len(output.ModelScoresDims) { + featureSize, dataType = getPredatorFeatureTypeAndSize(output.DataType, output.ModelScoresDims[index]) + } + response = append(response, SchemaComponents{ + FeatureName: modelScore, + FeatureType: dataType, + FeatureSize: featureSize, + }) + } + } + } + return response +} + +func processPredatorInput(predatorComponents []PredatorComponent) []SchemaComponents { + if len(predatorComponents) == 0 { + return nil + } + + var response []SchemaComponents + for _, predatorComponent := range predatorComponents { + for _, input := range predatorComponent.Inputs { + for _, feature := range input.Features { + size, dataType := getPredatorFeatureTypeAndSize(input.DataType, input.Dims) + response = append(response, SchemaComponents{ + FeatureName: feature, + FeatureType: dataType, + FeatureSize: size, + }) + } + } + } + return response +} + +func getPredatorFeatureTypeAndSize(dataType string, shape []int) (int, string) { + if len(shape) == 1 && shape[0] == 1 { + return 1, dataType + } + if len(shape) == 2 && shape[0] == -1 { + return shape[1], dataType + "Vector" + } + if len(shape) > 0 { + return shape[0], dataType + "Vector" + } + return 1, dataType +} + +// ProcessResponseConfig processes the response config and builds schema components +// based on the features specified in the response config. +func ProcessResponseConfig(responseConfig *ResponseConfig, schemaComponents []SchemaComponents) []SchemaComponents { + if responseConfig == nil || len(responseConfig.Features) == 0 { + return nil + } + + var response []SchemaComponents + + schemaMap := make(map[string]SchemaComponents) + for _, component := range schemaComponents { + schemaMap[component.FeatureName] = component + } + + for _, feature := range responseConfig.Features { + if existingComponent, exists := schemaMap[feature]; exists { + response = append(response, SchemaComponents{ + FeatureName: feature, + FeatureType: existingComponent.FeatureType, + FeatureSize: existingComponent.FeatureSize, + }) + } else { + response = append(response, SchemaComponents{ + FeatureName: feature, + FeatureType: "String", + FeatureSize: 1, + }) + } + } + return response +} diff --git a/horizon/pkg/configschemaclient/types.go b/horizon/pkg/configschemaclient/types.go new file mode 100644 index 00000000..b75d7c20 --- /dev/null +++ b/horizon/pkg/configschemaclient/types.go @@ -0,0 +1,119 @@ +package configschemaclient + +// SchemaComponents represents a feature schema component +type SchemaComponents struct { + FeatureName string `json:"feature_name"` + FeatureType string `json:"feature_type"` + FeatureSize any `json:"feature_size"` +} + +// ComponentConfig contains all component configurations +type ComponentConfig struct { + CacheEnabled bool `json:"cache_enabled"` + CacheTTL int `json:"cache_ttl"` + CacheVersion int `json:"cache_version"` + FeatureComponents []FeatureComponent `json:"feature_components"` + RTPComponents []RTPComponent `json:"real_time_pricing_feature_components,omitempty"` + PredatorComponents []PredatorComponent `json:"predator_components"` + NumerixComponents []NumerixComponent `json:"numerix_components"` +} + +// ResponseConfig contains response configuration +type ResponseConfig struct { + LoggingPerc int `json:"logging_perc"` + ModelSchemaPerc int `json:"model_schema_features_perc"` + Features []string `json:"features"` + LogSelectiveFeatures bool `json:"log_features"` + LogBatchSize int `json:"log_batch_size"` +} + +// NumerixComponent represents a Numerix/Iris component +type NumerixComponent struct { + Component string `json:"component"` + ComponentID string `json:"component_id"` + ScoreCol string `json:"score_col"` + ComputeID string `json:"compute_id"` + ScoreMapping map[string]string `json:"score_mapping"` + DataType string `json:"data_type"` +} + +// FeatureComponent represents a feature store component +type FeatureComponent struct { + Component string `json:"component"` + ComponentID string `json:"component_id"` + ColNamePrefix string `json:"col_name_prefix,omitempty"` + CompCacheEnabled bool `json:"comp_cache_enabled"` + CompCacheTTL int `json:"comp_cache_ttl,omitempty"` + CompositeID bool `json:"composite_id,omitempty"` + FSKeys []FSKey `json:"fs_keys"` + FSRequest *FSRequest `json:"fs_request"` + FSFlattenRespKeys []string `json:"fs_flatten_resp_keys"` +} + +// RTPComponent represents a real-time pricing component +type RTPComponent struct { + Component string `json:"component"` + ComponentID string `json:"component_id"` + CompositeID bool `json:"composite_id"` + FSKeys []FSKey `json:"fs_keys"` + FSRequest *FSRequest `json:"fs_request"` + FSFlattenRespKeys []string `json:"fs_flatten_resp_keys"` + ColNamePrefix string `json:"col_name_prefix"` + CompCacheEnabled bool `json:"comp_cache_enabled"` +} + +// PredatorComponent represents a Predator model component +type PredatorComponent struct { + Component string `json:"component"` + ComponentID string `json:"component_id"` + ModelName string `json:"model_name"` + ModelEndPoint string `json:"model_end_point"` + Calibration string `json:"calibration,omitempty"` + Deadline int `json:"deadline"` + BatchSize int `json:"batch_size"` + Inputs []PredatorInput `json:"inputs"` + Outputs []PredatorOutput `json:"outputs"` + RoutingConfig []RoutingConfig `json:"route_config,omitempty"` +} + +// PredatorInput represents input configuration for Predator +type PredatorInput struct { + Name string `json:"name"` + Features []string `json:"features"` + Dims []int `json:"shape"` + DataType string `json:"data_type"` +} + +// PredatorOutput represents output configuration for Predator +type PredatorOutput struct { + Name string `json:"name"` + ModelScores []string `json:"model_scores"` + ModelScoresDims [][]int `json:"model_scores_dims"` + DataType string `json:"data_type"` +} + +// RoutingConfig represents routing configuration +type RoutingConfig struct { + ModelName string `json:"model_name"` + ModelEndpoint string `json:"model_endpoint"` + RoutingPercentage float32 `json:"routing_percentage"` +} + +// FSKey represents a feature store key +type FSKey struct { + Schema string `json:"schema"` + Col string `json:"col"` +} + +// FSRequest represents a feature store request +type FSRequest struct { + Label string `json:"label"` + FeatureGroups []FSFeatureGroup `json:"featureGroups"` +} + +// FSFeatureGroup represents a feature group +type FSFeatureGroup struct { + Label string `json:"label"` + Features []string `json:"features"` + DataType string `json:"data_type"` +} From d6e3b9049012e59171415b3f859575747a291a6d Mon Sep 17 00:00:00 2001 From: dhruvgupta-meesho Date: Tue, 23 Dec 2025 15:57:47 +0530 Subject: [PATCH 2/4] changed order --- horizon/pkg/configschemaclient/client.go | 28 ++++++++++-------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/horizon/pkg/configschemaclient/client.go b/horizon/pkg/configschemaclient/client.go index 5e655eb6..ebdddc38 100644 --- a/horizon/pkg/configschemaclient/client.go +++ b/horizon/pkg/configschemaclient/client.go @@ -5,7 +5,7 @@ import ( ) // BuildFeatureSchema builds a feature schema from the component and response configs. -// It processes components in order: Iris Output → Iris Input → FS → RTP → Predator Input → Predator Output → Response Config +// It processes components in order: FS → RTP → Iris Output → Predator Output → Iris Input → Predator Input func BuildFeatureSchema(componentConfig *ComponentConfig, responseConfig *ResponseConfig) []SchemaComponents { if componentConfig == nil { return nil @@ -33,27 +33,23 @@ func BuildFeatureSchema(componentConfig *ComponentConfig, responseConfig *Respon } } - // 1. Iris Output - addUniqueComponents(processIrisOutput(componentConfig.NumerixComponents)) - - // 2. Iris Input - addUniqueComponents(processIrisInput(componentConfig.NumerixComponents)) - - // 3. FS (Feature Store) + // 1. FS (Feature Store) addUniqueComponents(processFS(componentConfig.FeatureComponents)) - // 4. RTP (Real Time Pricing) + // 2. RTP (Real Time Pricing) addUniqueComponents(processRTP(componentConfig.RTPComponents)) - // 5. Predator Input (only add if not already present) - addOrUpdateComponents(processPredatorInput(componentConfig.PredatorComponents)) + // 3. Iris Output + addUniqueComponents(processIrisOutput(componentConfig.NumerixComponents)) + + // 4. Predator Output + addUniqueComponents(processPredatorOutput(componentConfig.PredatorComponents)) - // 6. Predator Output (only add if not already present) - addOrUpdateComponents(processPredatorOutput(componentConfig.PredatorComponents)) + // 5. Iris Input (only add if not already present) + addOrUpdateComponents(processIrisInput(componentConfig.NumerixComponents)) - // 7. Response Config features - responseSchemaComponents := ProcessResponseConfig(responseConfig, response) - addUniqueComponents(responseSchemaComponents) + // 6. Predator Input (only add if not already present) + addOrUpdateComponents(processPredatorInput(componentConfig.PredatorComponents)) return response } From c702384432929ff0dbd2832c491ce8279680ab08 Mon Sep 17 00:00:00 2001 From: dhruvgupta-meesho Date: Tue, 23 Dec 2025 17:25:28 +0530 Subject: [PATCH 3/4] added cache version check --- horizon/internal/inferflow/handler/inferflow.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/horizon/internal/inferflow/handler/inferflow.go b/horizon/internal/inferflow/handler/inferflow.go index 1ee165f4..ba380f78 100644 --- a/horizon/internal/inferflow/handler/inferflow.go +++ b/horizon/internal/inferflow/handler/inferflow.go @@ -1130,6 +1130,18 @@ func (m *InferFlow) GetFeatureSchema(request FeatureSchemaRequest) (FeatureSchem componentConfig := &modelProxyConfig.ConfigValue.ComponentConfig responseConfig := &modelProxyConfig.ConfigValue.ResponseConfig + requestVersionInt, err := strconv.Atoi(request.Version) + if err != nil { + return FeatureSchemaResponse{ + Data: []inferflow.SchemaComponents{}, + }, err + } + if requestVersionInt != componentConfig.CacheVersion { + return FeatureSchemaResponse{ + Data: []inferflow.SchemaComponents{}, + }, errors.New("cache version mismatch for model config: " + request.ModelConfigId) + } + clientComponentConfig := toClientComponentConfig(componentConfig) clientResponseConfig := toClientResponseConfig(responseConfig) From 6d22c030df3a9390eec543bead7824fb35e15917 Mon Sep 17 00:00:00 2001 From: dhruvgupta-meesho Date: Tue, 23 Dec 2025 17:26:54 +0530 Subject: [PATCH 4/4] bug fix --- horizon/internal/inferflow/handler/inferflow.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/horizon/internal/inferflow/handler/inferflow.go b/horizon/internal/inferflow/handler/inferflow.go index ba380f78..e1917acc 100644 --- a/horizon/internal/inferflow/handler/inferflow.go +++ b/horizon/internal/inferflow/handler/inferflow.go @@ -1130,13 +1130,7 @@ func (m *InferFlow) GetFeatureSchema(request FeatureSchemaRequest) (FeatureSchem componentConfig := &modelProxyConfig.ConfigValue.ComponentConfig responseConfig := &modelProxyConfig.ConfigValue.ResponseConfig - requestVersionInt, err := strconv.Atoi(request.Version) - if err != nil { - return FeatureSchemaResponse{ - Data: []inferflow.SchemaComponents{}, - }, err - } - if requestVersionInt != componentConfig.CacheVersion { + if version != componentConfig.CacheVersion { return FeatureSchemaResponse{ Data: []inferflow.SchemaComponents{}, }, errors.New("cache version mismatch for model config: " + request.ModelConfigId)