@@ -16,6 +16,7 @@ package batch
1616
1717import (
1818 "context"
19+ "errors"
1920 "fmt"
2021
2122 "github.com/uber-go/tally/v4"
@@ -72,12 +73,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
7273
7374 msg := delivery .Message ()
7475
75- // Deserialize request entity
76- request , err := entity .RequestFromBytes (msg .Payload )
76+ // Deserialize request ID from payload
77+ rid , err := entity .RequestIDFromBytes (msg .Payload )
7778 if err != nil {
7879 c .metricsScope .Counter ("deserialize_errors" ).Inc (1 )
79- // Non-retryable: malformed messages will never succeed regardless of retry count
80- return fmt .Errorf ("failed to deserialize request: %w" , err )
80+ return fmt .Errorf ("failed to deserialize request ID: %w" , err )
81+ }
82+
83+ // Fetch request from storage
84+ request , err := c .store .GetRequestStore ().Get (ctx , rid .ID )
85+ if err != nil {
86+ c .metricsScope .Counter ("storage_errors" ).Inc (1 )
87+ return fmt .Errorf ("failed to get request %s: %w" , rid .ID , err )
8188 }
8289
8390 c .logger .Infow ("received batch event" ,
@@ -139,11 +146,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
139146 if err == nil {
140147 bd .Dependents = append (existing .Dependents , bd .Dependents ... )
141148 }
149+
150+ if err := c .store .GetBatchDependentStore ().Create (ctx , bd ); err != nil && ! errors .Is (err , storage .ErrAlreadyExists ) {
151+ c .metricsScope .Counter ("batch_dependent_store_errors" ).Inc (1 )
152+ return fmt .Errorf ("failed to create batch dependent for batchID=%s: %w" , dep .ID , err )
153+ }
142154 }
143155
144- // TODO:
145- // - Add batch to DB
146- // - Add to batch dependent DB
156+ // Persist batch to storage (idempotent — ErrAlreadyExists means a retry)
157+ if err := c .store .GetBatchStore ().Create (ctx , batch ); err != nil && ! errors .Is (err , storage .ErrAlreadyExists ) {
158+ c .metricsScope .Counter ("batch_store_errors" ).Inc (1 )
159+ return fmt .Errorf ("failed to create batch: %w" , err )
160+ }
147161
148162 c .logger .Infow ("batch created" ,
149163 "batch_id" , batch .ID ,
@@ -153,7 +167,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
153167 )
154168
155169 // Publish to score topic
156- if err := c .publish (ctx , consumer .TopicKeyScore , batch ); err != nil {
170+ if err := c .publish (ctx , consumer .TopicKeyScore , batch . ID , batch . Queue ); err != nil {
157171 c .metricsScope .Counter ("publish_errors" ).Inc (1 )
158172 return fmt .Errorf ("failed to publish to score: %w" , err )
159173 }
@@ -168,14 +182,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
168182 return nil // Success - message will be acked
169183}
170184
171- // publish publishes a batch to the specified topic key.
172- func (c * Controller ) publish (ctx context.Context , key consumer.TopicKey , batch entity.Batch ) error {
173- payload , err := batch .ToBytes ()
185+ // publish publishes a batch ID to the specified topic key.
186+ func (c * Controller ) publish (ctx context.Context , key consumer.TopicKey , batchID string , partitionKey string ) error {
187+ bid := entity.BatchID {ID : batchID }
188+ payload , err := bid .ToBytes ()
174189 if err != nil {
175- return fmt .Errorf ("failed to serialize batch: %w" , err )
190+ return fmt .Errorf ("failed to serialize batch ID : %w" , err )
176191 }
177192
178- msg := entityqueue .NewMessage (batch . ID , payload , batch . Queue , nil )
193+ msg := entityqueue .NewMessage (batchID , payload , partitionKey , nil )
179194
180195 q , ok := c .registry .Queue (key )
181196 if ! ok {
0 commit comments