77 "github.com/uber-go/tally/v4"
88 "github.com/uber/submitqueue/core/consumer"
99 "github.com/uber/submitqueue/core/errs"
10+ "github.com/uber/submitqueue/core/metrics"
1011 "github.com/uber/submitqueue/entity"
1112 "github.com/uber/submitqueue/extension/storage"
1213 "go.uber.org/zap"
@@ -49,8 +50,9 @@ func NewController(
4950// Process processes a conclude delivery from the queue.
5051// Deserializes the batch and completes the pipeline processing.
5152// Returns nil to ack (success), or error to nack (retry).
52- func (c * Controller ) Process (ctx context.Context , delivery consumer.Delivery ) error {
53- c .metricsScope .Counter ("received" ).Inc (1 )
53+ func (c * Controller ) Process (ctx context.Context , delivery consumer.Delivery ) (retErr error ) {
54+ op := metrics .Begin (c .metricsScope , "process" )
55+ defer func () { op .Complete (retErr ) }()
5456
5557 msg := delivery .Message ()
5658
@@ -63,8 +65,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
6365 "attempt" , delivery .Attempt (),
6466 "error" , err ,
6567 )
66- c .metricsScope .Counter ("deserialize_errors" ).Inc (1 )
6768 // Non-retryable: malformed messages will never succeed regardless of retry count
69+ metrics .NamedCounter (c .metricsScope , "process" , "deserialize_errors" , 1 )
6870 return fmt .Errorf ("failed to deserialize batch: %w" , err )
6971 }
7072
@@ -88,7 +90,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
8890 "batch_id" , batch .ID ,
8991 "state" , string (batch .State ),
9092 )
91- c .metricsScope . Counter ( " unexpected_state_errors"). Inc ( 1 )
93+ metrics . NamedCounter ( c .metricsScope , "process" , " unexpected_state_errors", 1 )
9294 return fmt .Errorf ("unexpected batch state %q for batch %s: %w" , batch .State , batch .ID , err )
9395 }
9496
@@ -101,7 +103,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
101103 "request_id" , requestID ,
102104 "error" , err ,
103105 )
104- c .metricsScope . Counter ( " request_store_errors"). Inc ( 1 )
106+ metrics . NamedCounter ( c .metricsScope , "process" , " request_store_errors", 1 )
105107 return errs .NewRetryableError (fmt .Errorf ("failed to get request %s: %w" , requestID , err ))
106108 }
107109
@@ -113,7 +115,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
113115 "to_state" , string (requestState ),
114116 "error" , err ,
115117 )
116- c .metricsScope . Counter ( " request_update_errors"). Inc ( 1 )
118+ metrics . NamedCounter ( c .metricsScope , "process" , " request_update_errors", 1 )
117119 return errs .NewRetryableError (fmt .Errorf ("failed to update request %s state to %s: %w" , requestID , requestState , err ))
118120 }
119121
@@ -124,8 +126,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
124126 )
125127 }
126128
127- c .metricsScope .Counter ("processed" ).Inc (1 )
128-
129129 return nil // Success - message will be acked
130130}
131131
0 commit comments