-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstatmessageprocessor.go
More file actions
127 lines (110 loc) · 5.33 KB
/
statmessageprocessor.go
File metadata and controls
127 lines (110 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package stats
import (
"context"
"time"
"github.com/asecurityteam/messageprocessor"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/rs/xstats"
)
const (
consumedCounter = "kinesis.consumed"
consumerSuccessCounter = "kinesis.consumer_success"
consumerErrorCounter = "kinesis.consumer_error"
consumedSize = "kinesis.consumed_size"
consumerLag = "kinesis.consumer_lag.timing"
consumerTimingSuccess = "kinesis.consumer.timing.success"
consumerTimingFailure = "kinesis.consumer.timing.failure"
)
// StatMessageProcessorConfig is the config for creating a StatMessageProcessor
type StatMessageProcessorConfig struct {
ConsumedCounter string `description:"Name of overall kinesis record consumption metric."`
ConsumerSuccessCounter string `description:"Name of overall successful kinesis record consumption metric."`
ConsumerErrorCounter string `description:"Name of overall failed kinesis record consumption metric."`
ConsumedSize string `description:"Name of consumed kinesis record size metric."`
ConsumerLag string `description:"Name of lag time between kinesis production and consumption metric."`
ConsumerTimingSuccess string `description:"Name of time to process successful kinesis record metric."`
ConsumerTimingFailure string `description:"Name of time to process failed kinesis record metric."`
}
// Name of the config root.
func (*StatMessageProcessorConfig) Name() string {
return "consumerMetrics"
}
// StatMessageProcessorComponent implements the settings.Component interface.
type StatMessageProcessorComponent struct{}
// NewComponent populates default values.
func NewComponent() *StatMessageProcessorComponent {
return &StatMessageProcessorComponent{}
}
// Settings generates a config populated with defaults.
func (*StatMessageProcessorComponent) Settings() *StatMessageProcessorConfig {
return &StatMessageProcessorConfig{
ConsumedCounter: consumedCounter,
ConsumerSuccessCounter: consumerSuccessCounter,
ConsumerErrorCounter: consumerErrorCounter,
ConsumedSize: consumedSize,
ConsumerLag: consumerLag,
ConsumerTimingSuccess: consumerTimingSuccess,
ConsumerTimingFailure: consumerTimingFailure,
}
}
func (c *StatMessageProcessorComponent) New(_ context.Context, conf *StatMessageProcessorConfig) (func(messageprocessor.MessageProcessor) messageprocessor.MessageProcessor, error) { // nolint
return func(processor messageprocessor.MessageProcessor) messageprocessor.MessageProcessor {
return &StatMessageProcessor{
ConsumedCounter: conf.ConsumedCounter,
ConsumerSuccessCounter: conf.ConsumerSuccessCounter,
ConsumerErrorCounter: conf.ConsumerErrorCounter,
ConsumedSize: conf.ConsumedSize,
ConsumerLag: conf.ConsumerLag,
ConsumerTimingSuccess: conf.ConsumerTimingSuccess,
ConsumerTimingFailure: conf.ConsumerTimingFailure,
wrapped: processor,
}
}, nil
}
// StatMessageProcessor is wrapper around MessageProcessor to capture and emit Kinesis related stats
type StatMessageProcessor struct {
ConsumedCounter string
ConsumerSuccessCounter string
ConsumerErrorCounter string
ConsumedSize string
ConsumerLag string
ConsumerTimingSuccess string
ConsumerTimingFailure string
wrapped messageprocessor.MessageProcessor
}
// ProcessMessage injects an `xstats.XStater` into the request and invokes the
// wrapped `MessageProcessor.ProcessMessage`.
func (t StatMessageProcessor) ProcessMessage(ctx context.Context, record *kinesis.Record) messageprocessor.MessageProcessorError {
stat := xstats.FromContext(ctx)
// consumerLag - Time.Duration between the time immediately before the message is processed and its Record.ApproximateArrivalTimestamp,
//which is the timestamp of when the record was inserted into the Kinesis stream.
messageArrivalTimeStamp := record.ApproximateArrivalTimestamp
lagDuration := time.Since(messageArrivalTimeStamp.Local())
stat.Timing(t.ConsumerLag, lagDuration)
// consumedCounter - Incremented for every message received, regardless of success or failure
stat.Count(t.ConsumedCounter, 1)
// length of message body
stat.Count(t.ConsumedSize, float64(len(record.Data)))
var start = time.Now()
err := t.wrapped.ProcessMessage(ctx, record)
var end = time.Since(start)
if err == nil {
// consumerSuccessCounter - Incremented for every message processed successfully
stat.Count(t.ConsumerSuccessCounter, 1)
// consumerTimingSuccess - Time.Duration for processing of a message which is successfully processed by underlying Kinesis consumer
stat.Timing(t.ConsumerTimingSuccess, end)
} else {
// consumerFailure - Incremented for every message which is failed to be processed
stat.Count(t.ConsumerErrorCounter, 1)
// consumerTimingFailure - Time.Duration for processing of a message which underlying Kinesis consumer fails to process
stat.Timing(t.ConsumerTimingFailure, end)
}
return err
}
// NewStatMessageProcessor returns a function that wraps a `messageprocessor.MessageProcessor` in a
// `StatMessageProcessor` `messageprocessor.MessageProcessor`.
func NewStatMessageProcessor() func(messageprocessor.MessageProcessor) messageprocessor.MessageProcessor {
return func(processor messageprocessor.MessageProcessor) messageprocessor.MessageProcessor {
return &StatMessageProcessor{wrapped: processor}
}
}