Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions horizon/internal/inferflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config interface {
ExecuteFuncitonalTestRequest(ctx *gin.Context)
GetLatestRequest(ctx *gin.Context)
GetLoggingTTL(ctx *gin.Context)
GetFeatureSchema(ctx *gin.Context)
}

var (
Expand Down Expand Up @@ -365,3 +366,15 @@ func (c *V1) GetLoggingTTL(ctx *gin.Context) {
}
ctx.JSON(200, response)
}

func (c *V1)GetFeatureSchema(ctx *gin.Context) {
response, err := c.Config.GetFeatureSchema(handler.FeatureSchemaRequest{
ModelConfigId: ctx.Query("model_config_id"),
Version: strings.TrimSpace(ctx.Query("version")),
})
if err != nil {
ctx.JSON(api.NewBadRequestError(err.Error()).StatusCode, "Error getting feature schema")
return
}
ctx.JSON(200, response)
}
1 change: 1 addition & 0 deletions horizon/internal/inferflow/handler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ type Config interface {
ExecuteFuncitonalTestRequest(request ExecuteRequestFunctionalTestingRequest) (ExecuteRequestFunctionalTestingResponse, error)
GetLatestRequest(requestID string) (GetLatestRequestResponse, error)
GetLoggingTTL() (GetLoggingTTLResponse, error)
GetFeatureSchema(FeatureSchemaRequest) (FeatureSchemaResponse, error)
}
267 changes: 267 additions & 0 deletions horizon/internal/inferflow/handler/inferflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
inferflow_config "github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/inferflow/config"
inferflow_request "github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/inferflow/request"
service_deployable_config "github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/servicedeployableconfig"
configschemaclient "github.com/Meesho/BharatMLStack/horizon/pkg/configschemaclient"
"github.com/Meesho/BharatMLStack/horizon/pkg/grpc"
"github.com/Meesho/BharatMLStack/horizon/pkg/infra"
"github.com/Meesho/BharatMLStack/horizon/pkg/random"
Expand Down Expand Up @@ -1110,3 +1111,269 @@ func (m *InferFlow) GetLoggingTTL() (GetLoggingTTLResponse, error) {
Data: []int{30, 60, 90},
}, nil
}

func (m *InferFlow) GetFeatureSchema(request FeatureSchemaRequest) (FeatureSchemaResponse, error) {
version, err := strconv.Atoi(request.Version)
if err != nil {
return FeatureSchemaResponse{
Data: []inferflow.SchemaComponents{},
}, err
}
modelProxyRequests, err := m.InferFlowRequestRepo.GetByConfigIDandVersion(request.ModelConfigId, version)
if err != nil {
log.Error().Err(err).Str("model_config_id", request.ModelConfigId).Msg("Failed to get model proxy config")
return FeatureSchemaResponse{
Data: []inferflow.SchemaComponents{},
}, err
}
modelProxyConfig := modelProxyRequests[0].Payload
componentConfig := &modelProxyConfig.ConfigValue.ComponentConfig
responseConfig := &modelProxyConfig.ConfigValue.ResponseConfig

if version != componentConfig.CacheVersion {
return FeatureSchemaResponse{
Data: []inferflow.SchemaComponents{},
}, errors.New("cache version mismatch for model config: " + request.ModelConfigId)
}

clientComponentConfig := toClientComponentConfig(componentConfig)
clientResponseConfig := toClientResponseConfig(responseConfig)

response := configschemaclient.BuildFeatureSchema(clientComponentConfig, clientResponseConfig)

if responseConfig.LogSelectiveFeatures {
clientSchemaComponents := configschemaclient.ProcessResponseConfig(clientResponseConfig, response)
log.Info().Str("model_config_id", request.ModelConfigId).Int("schema_components_count", len(clientSchemaComponents)).Msg("Successfully generated feature schema")
return FeatureSchemaResponse{
Data: toInferflowSchemaComponents(clientSchemaComponents),
}, nil
}

log.Info().Str("model_config_id", request.ModelConfigId).Int("schema_components_count", len(response)).Msg("Successfully generated feature schema")
return FeatureSchemaResponse{
Data: toInferflowSchemaComponents(response),
}, nil
}

func toClientComponentConfig(config *inferflow.ComponentConfig) *configschemaclient.ComponentConfig {
if config == nil {
return nil
}

return &configschemaclient.ComponentConfig{
CacheEnabled: config.CacheEnabled,
CacheTTL: config.CacheTTL,
CacheVersion: config.CacheVersion,
FeatureComponents: toClientFeatureComponents(config.FeatureComponents),
RTPComponents: toClientRTPComponents(config.RTPComponents),
PredatorComponents: toClientPredatorComponents(config.PredatorComponents),
NumerixComponents: toClientNumerixComponents(config.NumerixComponents),
}
}

func toClientResponseConfig(config *inferflow.ResponseConfig) *configschemaclient.ResponseConfig {
if config == nil {
return nil
}

return &configschemaclient.ResponseConfig{
LoggingPerc: config.LoggingPerc,
ModelSchemaPerc: config.ModelSchemaPerc,
Features: config.Features,
LogSelectiveFeatures: config.LogSelectiveFeatures,
LogBatchSize: config.LogBatchSize,
}
}

func toClientNumerixComponents(components []inferflow.NumerixComponent) []configschemaclient.NumerixComponent {
if len(components) == 0 {
return nil
}

result := make([]configschemaclient.NumerixComponent, len(components))
for i, c := range components {
result[i] = configschemaclient.NumerixComponent{
Component: c.Component,
ComponentID: c.ComponentID,
ScoreCol: c.ScoreCol,
ComputeID: c.ComputeID,
ScoreMapping: c.ScoreMapping,
DataType: c.DataType,
}
}
return result
}

func toClientFeatureComponents(components []inferflow.FeatureComponent) []configschemaclient.FeatureComponent {
if len(components) == 0 {
return nil
}

result := make([]configschemaclient.FeatureComponent, len(components))
for i, c := range components {
result[i] = configschemaclient.FeatureComponent{
Component: c.Component,
ComponentID: c.ComponentID,
ColNamePrefix: c.ColNamePrefix,
CompCacheEnabled: c.CompCacheEnabled,
CompCacheTTL: c.CompCacheTTL,
CompositeID: c.CompositeID,
FSKeys: toClientFSKeys(c.FSKeys),
FSRequest: toClientFSRequest(c.FSRequest),
FSFlattenRespKeys: c.FSFlattenRespKeys,
}
}
return result
}

func toClientRTPComponents(components []inferflow.RTPComponent) []configschemaclient.RTPComponent {
if len(components) == 0 {
return nil
}

result := make([]configschemaclient.RTPComponent, len(components))
for i, c := range components {
result[i] = configschemaclient.RTPComponent{
Component: c.Component,
ComponentID: c.ComponentID,
CompositeID: c.CompositeID,
FSKeys: toClientFSKeys(c.FSKeys),
FSRequest: toClientFSRequest(c.FSRequest),
FSFlattenRespKeys: c.FSFlattenRespKeys,
ColNamePrefix: c.ColNamePrefix,
CompCacheEnabled: c.CompCacheEnabled,
}
}
return result
}

func toClientPredatorComponents(components []inferflow.PredatorComponent) []configschemaclient.PredatorComponent {
if len(components) == 0 {
return nil
}

result := make([]configschemaclient.PredatorComponent, len(components))
for i, c := range components {
result[i] = configschemaclient.PredatorComponent{
Component: c.Component,
ComponentID: c.ComponentID,
ModelName: c.ModelName,
ModelEndPoint: c.ModelEndPoint,
Calibration: c.Calibration,
Deadline: c.Deadline,
BatchSize: c.BatchSize,
Inputs: toClientPredatorInputs(c.Inputs),
Outputs: toClientPredatorOutputs(c.Outputs),
RoutingConfig: toClientRoutingConfigs(c.RoutingConfig),
}
}
return result
}

func toClientFSKeys(keys []inferflow.FSKey) []configschemaclient.FSKey {
if len(keys) == 0 {
return nil
}

result := make([]configschemaclient.FSKey, len(keys))
for i, k := range keys {
result[i] = configschemaclient.FSKey{
Schema: k.Schema,
Col: k.Col,
}
}
return result
}

func toClientFSRequest(req *inferflow.FSRequest) *configschemaclient.FSRequest {
if req == nil {
return nil
}

return &configschemaclient.FSRequest{
Label: req.Label,
FeatureGroups: toClientFSFeatureGroups(req.FeatureGroups),
}
}

func toClientFSFeatureGroups(groups []inferflow.FSFeatureGroup) []configschemaclient.FSFeatureGroup {
if len(groups) == 0 {
return nil
}

result := make([]configschemaclient.FSFeatureGroup, len(groups))
for i, g := range groups {
result[i] = configschemaclient.FSFeatureGroup{
Label: g.Label,
Features: g.Features,
DataType: g.DataType,
}
}
return result
}

func toClientPredatorInputs(inputs []inferflow.PredatorInput) []configschemaclient.PredatorInput {
if len(inputs) == 0 {
return nil
}

result := make([]configschemaclient.PredatorInput, len(inputs))
for i, input := range inputs {
result[i] = configschemaclient.PredatorInput{
Name: input.Name,
Features: input.Features,
Dims: input.Dims,
DataType: input.DataType,
}
}
return result
}

func toClientPredatorOutputs(outputs []inferflow.PredatorOutput) []configschemaclient.PredatorOutput {
if len(outputs) == 0 {
return nil
}

result := make([]configschemaclient.PredatorOutput, len(outputs))
for i, output := range outputs {
result[i] = configschemaclient.PredatorOutput{
Name: output.Name,
ModelScores: output.ModelScores,
ModelScoresDims: output.ModelScoresDims,
DataType: output.DataType,
}
}
return result
}

func toClientRoutingConfigs(configs []inferflow.RoutingConfig) []configschemaclient.RoutingConfig {
if len(configs) == 0 {
return nil
}

result := make([]configschemaclient.RoutingConfig, len(configs))
for i, c := range configs {
result[i] = configschemaclient.RoutingConfig{
ModelName: c.ModelName,
ModelEndpoint: c.ModelEndpoint,
RoutingPercentage: c.RoutingPercentage,
}
}
return result
}

func toInferflowSchemaComponents(components []configschemaclient.SchemaComponents) []inferflow.SchemaComponents {
if len(components) == 0 {
return nil
}

result := make([]inferflow.SchemaComponents, len(components))
for i, c := range components {
result[i] = inferflow.SchemaComponents{
FeatureName: c.FeatureName,
FeatureType: c.FeatureType,
FeatureSize: c.FeatureSize,
}
}
return result
}
9 changes: 9 additions & 0 deletions horizon/internal/inferflow/handler/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,12 @@ type RTPFeatureGroup struct {
Features []string `json:"features"`
DataType string `json:"dataType"`
}

type FeatureSchemaRequest struct {
ModelConfigId string `json:"model_config_id"`
Version string `json:"version"`
}

type FeatureSchemaResponse struct {
Data []dbModel.SchemaComponents `json:"data"`
}
1 change: 1 addition & 0 deletions horizon/internal/inferflow/route/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func Init() {
register.GET("/logging-ttl", controller.NewConfigController().GetLoggingTTL)
register.PATCH("/delete", controller.NewConfigController().Delete)
register.GET("/latestRequest/:config_id", controller.NewConfigController().GetLatestRequest)
register.GET("/get_feature_schema", controller.NewConfigController().GetFeatureSchema)
}

discovery := v1.Group("/inferflow-config-discovery")
Expand Down
3 changes: 2 additions & 1 deletion horizon/internal/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func (m *MiddlewareHandler) AuthMiddleware() gin.HandlerFunc {
if strings.HasPrefix(c.Request.URL.Path, "/login") ||
strings.HasPrefix(c.Request.URL.Path, "/register") ||
strings.HasPrefix(c.Request.URL.Path, "/health") ||
strings.HasPrefix(c.Request.URL.Path, "/api/1.0/fs-config") {
strings.HasPrefix(c.Request.URL.Path, "/api/1.0/fs-config")||
strings.HasPrefix(c.Request.URL.Path, "/api/v1/inferflow-config-registry/get_feature_schema") {
c.Next()
return
}
Expand Down
10 changes: 10 additions & 0 deletions horizon/internal/repositories/sql/inferflow/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ type TestResults struct {
Message string `json:"message"`
}

type GetSchemaResponse struct {
Components []SchemaComponents
}

type SchemaComponents struct {
FeatureName string `json:"feature_name"`
FeatureType string `json:"feature_type"`
FeatureSize any `json:"feature_size"`
}

func (p *Payload) Scan(value interface{}) error {
if value == nil {
*p = Payload{}
Expand Down
13 changes: 13 additions & 0 deletions horizon/internal/repositories/sql/inferflow/request/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Repository interface {
Deactivate(configID string) error
GetLatestPendingRequestByConfigID(configID string) ([]Table, error)
GetApprovedRequestsByConfigID(configID string) ([]Table, error)
GetByConfigIDandVersion(configID string, version int) ([]Table, error)
}

type InferflowRequest struct {
Expand Down Expand Up @@ -144,3 +145,15 @@ func (g *InferflowRequest) GetLatestPendingRequestByConfigID(configID string) ([
Find(&tables)
return tables, result.Error
}

func (g *InferflowRequest) GetByConfigIDandVersion(configID string, version int) ([]Table, error) {
var tables []Table
result := g.db.Where("config_id = ? AND version = ? AND status = ?", configID, version, "APPROVED").Find(&tables)
if result.Error != nil {
return nil, result.Error
}
if len(tables) == 0 {
return nil, errors.New("no request found with the given config_id and version")
}
return tables, result.Error
}
Loading