From 3a79fd62299bc91068ba265518db61a3edb978e0 Mon Sep 17 00:00:00 2001 From: MuLin Date: Fri, 20 Aug 2021 13:32:01 +0800 Subject: [PATCH] Add reportType "once" in dialout_client --- dialout/dialout_client/dialout_client.go | 42 +++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/dialout/dialout_client/dialout_client.go b/dialout/dialout_client/dialout_client.go index 7e46cdf9d..410fdb930 100644 --- a/dialout/dialout_client/dialout_client.go +++ b/dialout/dialout_client/dialout_client.go @@ -396,7 +396,7 @@ restart: //Remote server might go down, in that case we restart with next destin } cs.Close() cs.w.Wait() - // Don't restart immediatly + // Don't restart immediately time.Sleep(clientCfg.RetryInterval) goto restart @@ -404,6 +404,46 @@ restart: //Remote server might go down, in that case we restart with next destin log.V(1).Infof("%v exiting publishRun routine for destination %s", cs, dest) return } + case Once: + select { + default: + spbValues, err := cs.dc.Get(nil) + if err != nil { + log.V(2).Infof("Data read error %v for %v", err, cs) + } + var updates []*gpb.Update + var spbValue *spb.Value + for _, spbValue = range spbValues { + update := &gpb.Update{ + Path: spbValue.GetPath(), + Val: spbValue.GetVal(), + } + updates = append(updates, update) + } + rs := &gpb.SubscribeResponse_Update{ + Update: &gpb.Notification{ + Timestamp: spbValue.GetTimestamp(), + Prefix: cs.prefix, + Update: updates, + }, + } + response := &gpb.SubscribeResponse{Response: rs} + + log.V(6).Infof("cs %s sending \n\t%v \n To %s", cs.name, response, dest) + err = pub.Send(response) + if err != nil { + log.V(1).Infof("Client %v pub Send error:%v, cs.conTryCnt %v", cs.name, err, cs.conTryCnt) + cs.Close() + // Retry + goto restart + } + log.V(6).Infof("cs %s to %s done", cs.name, dest) + cs.sendMsg++ + c.sendMsg++ + case <-cs.stop: + log.V(1).Infof("%v exiting publishRun routine for destination %s", cs, dest) + return + } default: log.V(1).Infof("Unsupported report type %s in %v ", cs.reportType, cs) }