// StarterStopper starts/stops the forwarding.
type StarterStopper interface {
	// Start starts the forwarder.
	// NOTE(review): the returned receive-only channel presumably signals
	// that forwarding has ended; confirm against the implementations.
	Start() <-chan struct{}
	// Stop stops the forwarder.
	Stop()
	// ForceStop forces stopping the forwarder.
	ForceStop()
}
- calls the cancel function
- closes any open clients
Kafka: return make([]error, len(offsets))
Generator: return make([]error, len(offsets))
HTTP: return make([]error, len(offsets))
Nats: return make([]error, len(offsets))
RedisStream:
// Issue a single XAck call covering every entry in strOffsets for br.Stream / br.Group.
// NOTE(review): strOffsets and errs are declared before this excerpt; presumably
// strOffsets is the string form of offsets and errs has len(offsets) — confirm.
if err := br.Client.XAck(RedisContext, br.Stream, br.Group, strOffsets...).Err(); err != nil {
// The call failed as a whole, so the same error is recorded for every offset.
for i := 0; i < len(offsets); i++ {
errs[i] = err // 'errs' is indexed the same as 'offsets'
}
// The metrics hook is optional; it is only invoked when set.
if br.Metrics.AckErrorsAdd != nil {
br.Metrics.AckErrorsAdd(len(strOffsets))
}
} else {
// Success path: count the acknowledged offsets, again only if the hook is set.
if br.Metrics.AcksAdd != nil {
br.Metrics.AcksAdd(len(strOffsets))
}
}
// On success errs is returned without having been written to in this excerpt.
return errs
- When do we call start?
`NewProcessorCommand` starts the `SourceProcessor`, which starts the `sourcer`
udsGRPCClient is started at SourceProcessor, so it should be