Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
363041b
internal complete code migration
pavan-adari-meesho Jan 30, 2026
614d483
internal components interface methods
pavan-adari-meesho Feb 2, 2026
c3762fd
revert pre-commit config
pavan-adari-meesho Feb 2, 2026
bb796ed
removed redundant structs
pavan-adari-meesho Feb 2, 2026
f380623
error formatting fixes
pavan-adari-meesho Feb 3, 2026
c7ee967
predator sync
pavan-adari-meesho Feb 3, 2026
1aa8d32
predator init fixes
pavan-adari-meesho Feb 3, 2026
19c0d77
code rabbit issues
pavan-adari-meesho Feb 3, 2026
0423a9e
bulk delete changes
pavan-adari-meesho Feb 3, 2026
9754fcb
schema client separation and fixes
pavan-adari-meesho Feb 3, 2026
b845141
removing redundant functions and error formatting
pavan-adari-meesho Feb 3, 2026
2199ed2
minor bug fix
pavan-adari-meesho Feb 3, 2026
8a8157a
further refractoring for coderabbit changes
pavan-adari-meesho Feb 3, 2026
340983e
model name extraction fix
pavan-adari-meesho Feb 4, 2026
8ecb0ea
return error on no model files found
pavan-adari-meesho Feb 4, 2026
649e89d
capitilization
pavan-adari-meesho Feb 4, 2026
4ec6b12
schema client refractor
pavan-adari-meesho Feb 5, 2026
1303975
int to preprod name convention
pavan-adari-meesho Feb 5, 2026
7135c92
etcd name fixes and refractors
pavan-adari-meesho Feb 9, 2026
a07f6f2
refractors and dev toggle script fix
pavan-adari-meesho Feb 9, 2026
16aeb6c
predator handler refractor and gcs client minor fixes
pavan-adari-meesho Feb 10, 2026
2968ade
inferflow refractor into multiple files
pavan-adari-meesho Feb 10, 2026
a254d78
Edit instance count created
paras-agarwal-meesho Feb 10, 2026
bac72ce
Instance count updation while preserving original formatting
paras-agarwal-meesho Feb 11, 2026
17f3abf
replaceInstanceCountInConfigPreservingFormat refined and test cases c…
paras-agarwal-meesho Feb 11, 2026
b83d40a
Real config.pbtxt test cases added
paras-agarwal-meesho Feb 12, 2026
efcb9b6
Merge branch 'develop' into feature/edit-instance-count
paras-agarwal-meesho Feb 13, 2026
9b9f087
Outdated code removed
paras-agarwal-meesho Feb 13, 2026
57f1b94
Regex usage replaced with proto for horizon
paras-agarwal-meesho Feb 20, 2026
d16a7f2
Refactoring
paras-agarwal-meesho Feb 20, 2026
dec6fbc
Merge branch 'develop' into feature/edit-instance-count
paras-agarwal-meesho Feb 20, 2026
f5993a1
Improvements
paras-agarwal-meesho Feb 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 33 additions & 42 deletions horizon/internal/externalcall/gcs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"cloud.google.com/go/storage"
"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
"github.com/rs/zerolog/log"
"google.golang.org/api/iterator"
"google.golang.org/protobuf/encoding/prototext"
)

type GCSClientInterface interface {
Expand Down Expand Up @@ -268,7 +269,10 @@ func (g *GCSClient) transferSingleConfigFile(objAttrs storage.ObjectAttrs, srcBu

// Replace model name
log.Info().Msgf("Processing config.pbtxt file: %s -> %s", objAttrs.Name, destObjectPath)
modified := replaceModelNameInConfig(content, destModelName)
modified, err := ReplaceModelNameInConfig(content, destModelName)
if err != nil {
return fmt.Errorf("failed to replace model name: %w", err)
}

// Upload modified content
destWriter := g.client.Bucket(destBucket).Object(destObjectPath).NewWriter(g.ctx)
Expand Down Expand Up @@ -464,49 +468,36 @@ func (g *GCSClient) TransferAndDeleteFolder(srcBucket, srcPath, srcModelName, de

// replaceModelNameInConfig modifies only the top-level `name:` field in config.pbtxt content
// It replaces only the first occurrence to avoid modifying nested names in inputs/outputs/instance_groups
func replaceModelNameInConfig(data []byte, destModelName string) []byte {
content := string(data)
lines := strings.Split(content, "\n")

for i, line := range lines {
trimmed := strings.TrimSpace(line)
// Match top-level "name:" field - should be at the start of line (or minimal indentation)
// Skip nested names which are typically indented with 2+ spaces
if strings.HasPrefix(trimmed, "name:") {
// Check indentation: top-level fields have minimal/no indentation
leadingWhitespace := len(line) - len(strings.TrimLeft(line, " \t"))
// Skip if heavily indented (nested field)
if leadingWhitespace >= 2 {
continue
}
func ReplaceModelNameInConfig(data []byte, destModelName string) ([]byte, error) {
if destModelName == "" {
return nil, fmt.Errorf("destination model name cannot be empty")
}

// Match the first occurrence of name: "value" pattern
namePattern := regexp.MustCompile(`name\s*:\s*"([^"]+)"`)
matches := namePattern.FindStringSubmatch(line)
if len(matches) > 1 {
oldModelName := matches[1]
// Replace only the FIRST occurrence to avoid replacing nested names
loc := namePattern.FindStringIndex(line)
if loc != nil {
// Replace only the matched portion (first occurrence)
before := line[:loc[0]]
matched := line[loc[0]:loc[1]]
after := line[loc[1]:]
// Replace the value inside quotes while preserving the "name:" format
valuePattern := regexp.MustCompile(`"([^"]+)"`)
valueReplaced := valuePattern.ReplaceAllString(matched, fmt.Sprintf(`"%s"`, destModelName))
lines[i] = before + valueReplaced + after
} else {
// Fallback: replace all (shouldn't happen with valid input)
lines[i] = namePattern.ReplaceAllString(line, fmt.Sprintf(`name: "%s"`, destModelName))
}
log.Info().Msgf("Replacing top-level model name in config.pbtxt: '%s' -> '%s'", oldModelName, destModelName)
break
}
}
var modelConfig protogen.ModelConfig

if err := prototext.Unmarshal(data, &modelConfig); err != nil {
return nil, fmt.Errorf("failed to parse config.pbtxt: %w", err)
}

return []byte(strings.Join(lines, "\n"))
oldModelName := modelConfig.Name
modelConfig.Name = destModelName

opts := prototext.MarshalOptions{
Multiline: true,
Indent: " ",
}

out, err := opts.Marshal(&modelConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal updated config.pbtxt: %w", err)
}

log.Info().
Str("old_model_name", oldModelName).
Str("new_model_name", destModelName).
Msg("replaced top-level model name in config.pbtxt")

return out, nil
}

func (g *GCSClient) ListFolders(bucket, prefix string) ([]string, error) {
Expand Down
64 changes: 37 additions & 27 deletions horizon/internal/externalcall/gcs_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,60 @@ import (

func TestReplaceModelNameInConfig(t *testing.T) {
tests := []struct {
name string
data []byte
destModelName string
expectContains string
name string
data []byte
destModelName string
wantTopLevel string
wantNested string
expectError bool
}{
{
name: "replaces top-level name only",
data: []byte(`name: "old_model"
data: []byte(`
name: "old_model"
instance_group {
name: "old_model"
name: "nested_model"
}
`),
destModelName: "new_model",
expectContains: `name: "new_model"`,
destModelName: "new_model",
wantTopLevel: `"new_model"`,
wantNested: "nested_model",
},
{
name: "preserves nested name with indentation",
data: []byte(`name: "top_level"
instance_group {
name: "nested_name"
}
`),
destModelName: "replaced",
expectContains: `name: "replaced"`,
name: "single line config",
data: []byte(`name: "single_model"` + "\n"),
destModelName: "replaced_model",
wantTopLevel: `"replaced_model"`,
},
{
name: "single line config",
data: []byte(`name: "single_model"` + "\n"),
destModelName: "replaced_model",
expectContains: `name: "replaced_model"`,
name: "empty dest model name returns error",
data: []byte(`name: "some_model"`),
destModelName: "",
expectError: true,
},
{
name: "no name field returns unchanged",
data: []byte(`platform: "tensorflow"
version: 1
`),
name: "malformed pbtxt returns error",
data: []byte(`name: "unclosed`),
destModelName: "any",
expectContains: `platform: "tensorflow"`,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := replaceModelNameInConfig(tt.data, tt.destModelName)
assert.Contains(t, string(got), tt.expectContains)
got, err := ReplaceModelNameInConfig(tt.data, tt.destModelName)

if tt.expectError {
require.Error(t, err)
return
}

require.NoError(t, err)
assert.Contains(t, string(got), tt.wantTopLevel)

if tt.wantNested != "" {
assert.Contains(t, string(got), tt.wantNested)
}
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion horizon/internal/predator/handler/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package handler
import (
"encoding/json"
"time"

"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
)

type Payload struct {
Expand Down Expand Up @@ -169,7 +171,7 @@ type ModelParamsResponse struct {
Backend string `json:"backend"`
DynamicBatchingEnabled bool `json:"dynamic_batching_enabled"`
Platform string `json:"platform"`
EnsembleScheduling *ModelEnsembling `json:"ensemble_scheduling,omitempty"`
EnsembleScheduling *protogen.ModelEnsembling `json:"ensemble_scheduling,omitempty"`
}

type RequestGenerationRequest struct {
Expand Down
11 changes: 6 additions & 5 deletions horizon/internal/predator/handler/predator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/Meesho/BharatMLStack/horizon/pkg/random"
"github.com/Meesho/BharatMLStack/horizon/pkg/serializer"
"github.com/rs/zerolog/log"
"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
)

type Predator struct {
Expand Down Expand Up @@ -353,7 +354,7 @@ func (p *Predator) FetchModelConfig(req FetchModelConfigRequest) (ModelParamsRes
}
}

var modelConfig ModelConfig
var modelConfig protogen.ModelConfig
if err := prototext.Unmarshal(configData, &modelConfig); err != nil {
return ModelParamsResponse{}, http.StatusInternalServerError, fmt.Errorf(errUnmarshalProtoFormat, err)
}
Expand Down Expand Up @@ -391,7 +392,7 @@ func parseModelPath(modelPath string) (bucket, objectPath string) {
return parts[0], parts[1]
}

func validateModelConfig(cfg *ModelConfig) error {
func validateModelConfig(cfg *protogen.ModelConfig) error {
switch {
case cfg.Name == constant.EmptyString:
return errors.New(errModelNameMissing)
Expand Down Expand Up @@ -423,7 +424,7 @@ func convertFields(name string, dims []int64, dataType string) (IO, bool) {
}, true
}

func convertInputWithFeatures(fields []*ModelInput, featureMap map[string][]string) []IO {
func convertInputWithFeatures(fields []*protogen.ModelInput, featureMap map[string][]string) []IO {
ios := make([]IO, 0, len(fields))
for _, f := range fields {
if io, ok := convertFields(f.Name, f.Dims, f.DataType.String()); ok {
Expand All @@ -437,7 +438,7 @@ func convertInputWithFeatures(fields []*ModelInput, featureMap map[string][]stri
return ios
}

func convertOutput(fields []*ModelOutput) []IO {
func convertOutput(fields []*protogen.ModelOutput) []IO {
ios := make([]IO, 0, len(fields))
for _, f := range fields {
if io, ok := convertFields(f.Name, f.Dims, f.DataType.String()); ok {
Expand All @@ -447,7 +448,7 @@ func convertOutput(fields []*ModelOutput) []IO {
return ios
}

func createModelParamsResponse(modelConfig *ModelConfig, objectPath string, inputs, outputs []IO) ModelParamsResponse {
func createModelParamsResponse(modelConfig *protogen.ModelConfig, objectPath string, inputs, outputs []IO) ModelParamsResponse {
var resp ModelParamsResponse

if len(modelConfig.InstanceGroup) > 0 {
Expand Down
56 changes: 56 additions & 0 deletions horizon/internal/predator/handler/predator_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"path"
"strings"
"time"

Expand All @@ -13,7 +14,9 @@ import (
"github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/predatorconfig"
"github.com/Meesho/BharatMLStack/horizon/internal/repositories/sql/predatorrequest"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/encoding/prototext"
"gorm.io/gorm"
"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
)

func (p *Predator) processRequest(requestIdPayloadMap map[uint]*Payload, predatorRequestList []predatorrequest.PredatorRequest, req ApproveRequest) {
Expand Down Expand Up @@ -191,6 +194,12 @@ func (p *Predator) processEditGCSCopyStage(requestIdPayloadMap map[uint]*Payload
} else {
configBucket := pred.GcsConfigBucket
configPath := pred.GcsConfigBasePath
if configBucket != "" && configPath != "" && payload.MetaData.InstanceCount > 0 {
if err := p.updateInstanceCountInConfigSource(configBucket, configPath, modelName, payload.MetaData.InstanceCount); err != nil {
log.Error().Err(err).Msgf("Failed to update instance count in config-source for model %s", modelName)
return transferredGcsModelData, err
}
}
Comment on lines +197 to +202
Copy link

@coderabbitai coderabbitai bot Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Config-source update lacks rollback on later failures.

If the instance_count update succeeds but the subsequent GCS transfer / DB update / restart fails, the config-source stays mutated while the request is marked failed. Consider deferring this update until success or capturing the original config and restoring it on any downstream failure.

πŸ€– Prompt for AI Agents
In `@horizon/internal/predator/handler/predator_approval.go` around lines 196 -
201, The config-source update (call to updateInstanceCountInConfigSource) is
applied before downstream steps (GCS transfer, DB update, restart) and is not
rolled back on failure; either move the update to after those steps succeed or
implement a rollback: read and cache the original config before calling
updateInstanceCountInConfigSource (use the same
configBucket/configPath/modelName context and the local variable
transferredGcsModelData for flow), then if any subsequent operation (the GCS
transfer, DB update, or restart code paths that follow this block) returns an
error, call a restore function to write the cached original config back;
alternatively, defer performing updateInstanceCountInConfigSource until all
downstream operations succeed and then call it as the final step.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutation is not harmful and rollback will cause more read writes.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paras-agarwal-meesho, understood. I acknowledge your decision to proceed without rollback for the config-source update to minimize read/write operations.


✏️ Learnings added
Learnt from: paras-agarwal-meesho
Repo: Meesho/BharatMLStack PR: 289
File: horizon/internal/predator/handler/predator_approval.go:196-201
Timestamp: 2026-02-17T07:03:43.427Z
Learning: In the Predator handler (horizon/internal/predator/handler), config-source mutations (e.g., instance count updates) are intentionally not rolled back on downstream failures to avoid additional read/write operations, as these mutations are considered non-harmful even if the request ultimately fails.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

if err := p.GcsClient.TransferFolderWithSplitSources(
sourceBucket, sourceBasePath, configBucket, configPath,
sourceModelName, targetBucket, targetPath, modelName,
Expand All @@ -212,6 +221,53 @@ func (p *Predator) processEditGCSCopyStage(requestIdPayloadMap map[uint]*Payload
return transferredGcsModelData, nil
}

func (p *Predator) updateInstanceCountInConfigSource(bucket, basePath, modelName string, instanceCount int) error {
if modelName == "" {
return fmt.Errorf("model name is empty, required to update instance count in config-source")
}

configPath := path.Join(basePath, modelName, configFile)
configData, err := p.GcsClient.ReadFile(bucket, configPath)
if err != nil {
return fmt.Errorf("failed to read config.pbtxt from config-source for model %s: %w", modelName, err)
}

var modelConfig protogen.ModelConfig
if err := prototext.Unmarshal(configData, &modelConfig); err != nil {
return fmt.Errorf("failed to parse config.pbtxt from config-source for model %s: %w", modelName, err)
}
if len(modelConfig.InstanceGroup) == 0 {
return fmt.Errorf("%s (model %s)", errNoInstanceGroup, modelName)
}

currentCount := modelConfig.InstanceGroup[0].Count
if currentCount == int32(instanceCount) {
log.Info().
Str("model", modelName).
Int("instance_count", instanceCount).
Msg("instance_count unchanged, skipping config update")
return nil
}

modelConfig.InstanceGroup[0].Count = int32(instanceCount)

opts := prototext.MarshalOptions{
Multiline: true,
Indent: " ",
}

newConfigData, err := opts.Marshal(&modelConfig)
if err != nil {
return fmt.Errorf("failed to marshal config.pbtxt for model %s: %w", modelName, err)
}
if err := p.GcsClient.UploadFile(bucket, configPath, newConfigData); err != nil {
return fmt.Errorf("failed to upload config.pbtxt to config-source for model %s: %w", modelName, err)
}

log.Info().Msgf("Updated instance_count to %d in config-source for model %s", instanceCount, modelName)
return nil
}

// processEditDBUpdateStage updates predator config for edit approval
// This updates the existing predator config with new config.pbtxt and metadata.json changes
func (p *Predator) processEditDBUpdateStage(requestIdPayloadMap map[uint]*Payload, predatorRequestList []predatorrequest.PredatorRequest, approvedBy string) error {
Expand Down
15 changes: 10 additions & 5 deletions horizon/internal/predator/handler/predator_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/Meesho/BharatMLStack/horizon/internal/externalcall"
pred "github.com/Meesho/BharatMLStack/horizon/internal/predator"
"github.com/Meesho/BharatMLStack/horizon/internal/predator/proto/protogen"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/encoding/prototext"
)
Expand Down Expand Up @@ -88,8 +89,10 @@ func (p *Predator) copyConfigToProdConfigSource(gcsPath, modelName string) error
return fmt.Errorf("failed to read config.pbtxt from source: %w", err)
}

updatedConfigData := p.replaceModelNameInConfigPreservingFormat(configData, modelName)

updatedConfigData, err := externalcall.ReplaceModelNameInConfig(configData, modelName)
if err != nil {
return fmt.Errorf("failed to replace model name in config: %w", err)
}
destConfigPath := path.Join(pred.GcsConfigBasePath, modelName, configFile)
if err := p.GcsClient.UploadFile(pred.GcsConfigBucket, destConfigPath, updatedConfigData); err != nil {
return fmt.Errorf("failed to upload config.pbtxt to config source: %w", err)
Expand Down Expand Up @@ -437,7 +440,7 @@ func (p *Predator) validateModelConfiguration(gcsPath string) error {
return fmt.Errorf("failed to read config.pbtxt from %s/%s: %w", srcBucket, configPath, err)
}

var modelConfig ModelConfig
var modelConfig protogen.ModelConfig
if err := prototext.Unmarshal(configData, &modelConfig); err != nil {
return fmt.Errorf("failed to parse config.pbtxt as proto: %w", err)
}
Expand Down Expand Up @@ -481,8 +484,10 @@ func (p *Predator) copyConfigToNewNameInConfigSource(oldModelName, newModelName
return fmt.Errorf("failed to read config.pbtxt from %s: %w", srcConfigPath, err)
}

updatedConfigData := p.replaceModelNameInConfigPreservingFormat(configData, newModelName)

updatedConfigData, err := externalcall.ReplaceModelNameInConfig(configData, newModelName)
if err != nil {
return fmt.Errorf("failed to replace model name in config: %w", err)
}
if err := p.GcsClient.UploadFile(pred.GcsConfigBucket, destConfigPath, updatedConfigData); err != nil {
return fmt.Errorf("failed to upload config.pbtxt to %s: %w", destConfigPath, err)
}
Expand Down
Loading