Skip to content

feat(controller) Score controller scores batches using heuristic scorer#121

Closed
manjari25 wants to merge 3 commits into
mainfrom
manjari/batch-score-controller
Closed

feat(controller) Score controller scores batches using heuristic scorer#121
manjari25 wants to merge 3 commits into
mainfrom
manjari/batch-score-controller

Conversation

@manjari25
Copy link
Copy Markdown
Contributor

@manjari25 manjari25 commented Mar 5, 2026

Summary

feat(controller) Score controller scores batches using heuristic scorer

What?

  • Add concept of Score to batches
    • Add Score field to batch entity
    • Batch store schema change
    • Update existing Batch store methods to account for new schema
    • Add new UpdateScoreAndState method
  • Add heuristic scorer to orchestrator service
  • Use this scorer in the batch scorer controller to score a batch
  • Once scored, update the batch store with the score and state
  • Use newly added core/metrics pkg for metric emission

Why?

  • Having a score associated with a batch is needed so that in the speculation stage; speculation path scores can be computed based on this number.
  • A speculation path score will be computed based on the success/failure score of all batches in the path.

Test Plan

Issues

@manjari25 manjari25 marked this pull request as ready for review March 5, 2026 01:38
@manjari25 manjari25 requested review from a team, behinddwalls and sbalabanov as code owners March 5, 2026 01:38
Comment thread entity/batch.go
func (b Batch) WithScoreAndState(score float32, state BatchState) Batch {
b.Score = score
b.State = state
b.Version++
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need to increment the version? It should only be incremented after a successful atomic persistence (i.e. a write to the database).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I see where it is used (after the update), the comment could reflect that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

Copy link
Copy Markdown
Collaborator

@behinddwalls behinddwalls Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am wondering if this should be in entity at all?

to me it seems like we probably need this to be part of contract at storage layer itself which accepts, entity, oldVersion, newVresion... I am not fan of implicit things as they hide away and lead to confusion/issues down the line..

Thoughts?

func newScorer(scope tally.Scope) scorer.Scorer {
return heuristicscorer.New(
[]heuristicscorer.Bucket{
{Min: 0, Max: 100, Score: 1.0},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add "TODO" for the real one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

queue VARCHAR(255) NOT NULL,
contains JSON NOT NULL,
dependencies JSON NOT NULL,
score FLOAT NOT NULL DEFAULT 0,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid db defaults, leave it to application

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

"error", err,
)
metrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1)
return errs.NewRetryableError(fmt.Errorf("failed to get request %s: %w", batch.Contains[0], err))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only explicitly known errors should be converted to retryable.
For example, if database connection fails, it is retryable.
If database schema mismatch, it is not retryable.
Being not retryable is a default to avoid retry storms.
For now we can just bubble up the error unchanged with a TODO to make a classification later.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't only schema mismatch though right? Get from the store could fail for any other reason as well. Do we not want to retry in this case?

Copy link
Copy Markdown
Contributor

@sbalabanov sbalabanov Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our default should be to not retry. Retryable errors need to be explicitly opted in as such.
For now we can put TODO here, return generic err and classify later, I envision there may be a general helper framework needed to transform i.e. mysql errors to retryable or not.

// Update batch store with score and transition state to speculating
if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, batchScore, entity.BatchStateSpeculating); err != nil {
if errors.Is(err, storage.ErrVersionMismatch) {
c.logger.Errorw("version mismatch updating batch score",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is expected by some workflow path, should be warning at most

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we expect this by any workflow path...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case just bubble the error up without processing?


// Update batch store with score and transition state to speculating
if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, batchScore, entity.BatchStateSpeculating); err != nil {
if errors.Is(err, storage.ErrVersionMismatch) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we probably should create wrappers for it in storage.go

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will work on this in a different PR (so we can get this one out before the grand mock refactor).


// Publish to speculate topic
if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil {
if err := c.publish(ctx, consumer.TopicKeySpeculate, scored); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we publish batch_id, not the whole object

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do - that is abstracted in func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error function below

metrics.NamedCounter(c.metricsScope, "process", "request_not_found_errors", 1)
return fmt.Errorf("request %s not found: %w", batch.Contains[0], err)
}
c.logger.Errorw("failed to get request",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to log if bubble up

request, err := c.store.GetRequestStore().Get(ctx, batch.Contains[0])
if err != nil {
if storage.IsNotFound(err) {
c.logger.Errorw("request not found",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to log if bubble up

// Look up the request to get its Change
request, err := c.store.GetRequestStore().Get(ctx, batch.Contains[0])
if err != nil {
if storage.IsNotFound(err) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to process it separately? why not to just to bubble up? This is not an expected state, even when states are (eventually) inconsistent this should not be true.

// Score the change
score, err := c.scorer.Score(ctx, request.Change)
if err != nil {
c.logger.Errorw("failed to score change",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto
plz check other usages

"error", err,
)
metrics.NamedCounter(c.metricsScope, "process", "batch_store_errors", 1)
return errs.NewRetryableError(fmt.Errorf("failed to update batch %s score: %w", batch.ID, err))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, plz check other usages

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants