feat(controller) Score controller scores batches using heuristic scorer#121
feat(controller) Score controller scores batches using heuristic scorer#121manjari25 wants to merge 3 commits into
Conversation
| func (b Batch) WithScoreAndState(score float32, state BatchState) Batch { | ||
| b.Score = score | ||
| b.State = state | ||
| b.Version++ |
There was a problem hiding this comment.
why need to increment the version? It should only be incremented after a successful atomic persistence (i.e. a write to the database).
There was a problem hiding this comment.
Now I see where it is used (after the update), the comment could reflect that.
There was a problem hiding this comment.
i am wondering if this should be in entity at all?
to me it seems like we probably need this to be part of contract at storage layer itself which accepts, entity, oldVersion, newVresion... I am not fan of implicit things as they hide away and lead to confusion/issues down the line..
Thoughts?
| func newScorer(scope tally.Scope) scorer.Scorer { | ||
| return heuristicscorer.New( | ||
| []heuristicscorer.Bucket{ | ||
| {Min: 0, Max: 100, Score: 1.0}, |
There was a problem hiding this comment.
add "TODO" for the real one?
| queue VARCHAR(255) NOT NULL, | ||
| contains JSON NOT NULL, | ||
| dependencies JSON NOT NULL, | ||
| score FLOAT NOT NULL DEFAULT 0, |
There was a problem hiding this comment.
avoid db defaults, leave it to application
| "error", err, | ||
| ) | ||
| metrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to get request %s: %w", batch.Contains[0], err)) |
There was a problem hiding this comment.
Only explicitly known errors should be converted to retryable.
For example, if database connection fails, it is retryable.
If database schema mismatch, it is not retryable.
Being not retryable is a default to avoid retry storms.
For now we can just bubble up the error unchanged with a TODO to make a classification later.
There was a problem hiding this comment.
This isn't only schema mismatch though right? Get from the store could fail for any other reason as well. Do we not want to retry in this case?
There was a problem hiding this comment.
Our default should be to not retry. Retryable errors need to be explicitly opted in as such.
For now we can put TODO here, return generic err and classify later, I envision there may be a general helper framework needed to transform i.e. mysql errors to retryable or not.
| // Update batch store with score and transition state to speculating | ||
| if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, batchScore, entity.BatchStateSpeculating); err != nil { | ||
| if errors.Is(err, storage.ErrVersionMismatch) { | ||
| c.logger.Errorw("version mismatch updating batch score", |
There was a problem hiding this comment.
If it is expected by some workflow path, should be warning at most
There was a problem hiding this comment.
I don't think we expect this by any workflow path...
There was a problem hiding this comment.
In this case just bubble the error up without processing?
|
|
||
| // Update batch store with score and transition state to speculating | ||
| if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, batchScore, entity.BatchStateSpeculating); err != nil { | ||
| if errors.Is(err, storage.ErrVersionMismatch) { |
There was a problem hiding this comment.
nit: we probably should create wrappers for it in storage.go
There was a problem hiding this comment.
Will work on this in a different PR (so we can get this one out before the grand mock refactor).
|
|
||
| // Publish to speculate topic | ||
| if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { | ||
| if err := c.publish(ctx, consumer.TopicKeySpeculate, scored); err != nil { |
There was a problem hiding this comment.
I think we publish batch_id, not the whole object
There was a problem hiding this comment.
We do - that is abstracted in func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error function below
| metrics.NamedCounter(c.metricsScope, "process", "request_not_found_errors", 1) | ||
| return fmt.Errorf("request %s not found: %w", batch.Contains[0], err) | ||
| } | ||
| c.logger.Errorw("failed to get request", |
There was a problem hiding this comment.
no need to log if bubble up
| request, err := c.store.GetRequestStore().Get(ctx, batch.Contains[0]) | ||
| if err != nil { | ||
| if storage.IsNotFound(err) { | ||
| c.logger.Errorw("request not found", |
There was a problem hiding this comment.
no need to log if bubble up
| // Look up the request to get its Change | ||
| request, err := c.store.GetRequestStore().Get(ctx, batch.Contains[0]) | ||
| if err != nil { | ||
| if storage.IsNotFound(err) { |
There was a problem hiding this comment.
do we need to process it separately? why not to just to bubble up? This is not an expected state, even when states are (eventually) inconsistent this should not be true.
| // Score the change | ||
| score, err := c.scorer.Score(ctx, request.Change) | ||
| if err != nil { | ||
| c.logger.Errorw("failed to score change", |
There was a problem hiding this comment.
ditto
plz check other usages
| "error", err, | ||
| ) | ||
| metrics.NamedCounter(c.metricsScope, "process", "batch_store_errors", 1) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to update batch %s score: %w", batch.ID, err)) |
There was a problem hiding this comment.
ditto, plz check other usages
Summary
feat(controller) Score controller scores batches using heuristic scorer
What?
Why?
Test Plan
Issues