From 24ee1a188733775833daccd41ed62bfabb5cd8cb Mon Sep 17 00:00:00 2001 From: zhongxibiao Date: Wed, 1 Jul 2020 14:55:50 +0800 Subject: [PATCH] =?UTF-8?q?Kafka=E9=98=9F=E5=88=97=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E9=93=BE=E8=B7=AF=E8=BF=BD=E8=B8=AA=E5=AE=9E=E7=8E=B0=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/net/trace/noop.go | 3 +++ pkg/net/trace/span.go | 14 ++++++++++++-- pkg/net/trace/tracer.go | 3 +++ pkg/queue/databus/consumer.go | 25 ++++++++++++++++++++++--- pkg/queue/databus/test/test.go | 2 +- 5 files changed, 41 insertions(+), 6 deletions(-) diff --git a/pkg/net/trace/noop.go b/pkg/net/trace/noop.go index b60a32b..e5f2125 100644 --- a/pkg/net/trace/noop.go +++ b/pkg/net/trace/noop.go @@ -45,3 +45,6 @@ func (n noopspan) Visit(func(k, v string)) {} func (n noopspan) SetTitle(string) {} func (n noopspan) String() string { return "" } + +//重置TraceID信息 +func (n noopspan) ResetTraceInfo(traceID string){} \ No newline at end of file diff --git a/pkg/net/trace/span.go b/pkg/net/trace/span.go index b03aaa2..717ac38 100644 --- a/pkg/net/trace/span.go +++ b/pkg/net/trace/span.go @@ -2,9 +2,10 @@ package trace import ( "fmt" - "time" - protogen "github.com/mapgoo-lab/atreus/pkg/net/trace/proto" + "strconv" + "strings" + "time" ) const ( @@ -139,3 +140,12 @@ func (s *Span) SetTitle(operationName string) { func (s *Span) String() string { return s.context.String() } + +//重置TraceID信息 +func(s *Span) ResetTraceInfo(traceID string){ + tids := strings.Split(traceID, ":") + if len(tids) == 4 { + s.context.TraceID, _ = strconv.ParseUint(tids[0], 16, 64) + s.context.ParentID, _ = strconv.ParseUint(tids[1], 16, 64) + } +} \ No newline at end of file diff --git a/pkg/net/trace/tracer.go b/pkg/net/trace/tracer.go index 20e71de..7b43ef6 100644 --- a/pkg/net/trace/tracer.go +++ b/pkg/net/trace/tracer.go @@ -91,4 +91,7 @@ type Trace interface { // SetTitle reset trace title SetTitle(title string) + + //重置TraceID信息 + ResetTraceInfo(traceID string) } diff --git a/pkg/queue/databus/consumer.go b/pkg/queue/databus/consumer.go index a9a2cf4..75204a9 100644 --- a/pkg/queue/databus/consumer.go +++ b/pkg/queue/databus/consumer.go @@ -4,15 +4,19 @@ import ( "context" "fmt" "github.com/Shopify/sarama" + "github.com/mapgoo-lab/atreus/pkg/conf/env" + "github.com/mapgoo-lab/atreus/pkg/log" + "github.com/mapgoo-lab/atreus/pkg/net/criticality" + "github.com/mapgoo-lab/atreus/pkg/net/metadata" + "github.com/mapgoo-lab/atreus/pkg/net/trace" "os" "time" - "github.com/mapgoo-lab/atreus/pkg/log" ) //使用者必须实现的接口 type ConsumerDeal interface { //数据处理的实现 - DealMessage(data []byte) error + DealMessage(data []byte,ctx context.Context) error //消费组增加消费者的消息通知 Setup(topicAndPartitions map[string][]int32, memberId string, generationId int32) @@ -125,11 +129,26 @@ func (handle consumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) err } func (handle consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - err := handle.event.deal.DealMessage(msg.Value) + var ctx,tre= getTraceContextTrace() + err := handle.event.deal.DealMessage(msg.Value,ctx) if err != nil { log.Info("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset) } + tre.Finish(nil) sess.MarkMessage(msg, "") } return nil } + +func getTraceContextTrace() (ctx context.Context,tre trace.Trace) { + md := metadata.MD{ + metadata.RemoteIP: env.Hostname, + metadata.Criticality: string(criticality.Critical), + metadata.Caller: env.AppID, + } + + tre = trace.New(fmt.Sprintf("/%s.Consumer",env.AppID)) + ctx1 := metadata.NewContext(context.Background(), md) + ctx= trace.NewContext(ctx1, tre) + return +} diff --git a/pkg/queue/databus/test/test.go b/pkg/queue/databus/test/test.go index cd7fe0c..b7033a8 100644 --- a/pkg/queue/databus/test/test.go +++ b/pkg/queue/databus/test/test.go @@ -11,7 +11,7 @@ import ( type ConsumerDealHandle struct {} -func (handle ConsumerDealHandle) DealMessage(data []byte) error { +func (handle ConsumerDealHandle) DealMessage(data []byte,ctx context.Context) error { //解包proto var req *pb.EventReq = new(pb.EventReq) err := proto.Unmarshal(data, req)