diff --git a/cmd/topicmappr/commands/config.go b/cmd/topicmappr/commands/config.go index 78d49067..40c82a34 100644 --- a/cmd/topicmappr/commands/config.go +++ b/cmd/topicmappr/commands/config.go @@ -11,6 +11,7 @@ import ( "time" "github.com/DataDog/kafka-kit/v4/kafkazk" + "github.com/DataDog/kafka-kit/v4/kafkaadmin" "github.com/spf13/cobra" ) @@ -93,6 +94,30 @@ func initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix string) (kafkazk.Handler, return zk, nil } +func newKafkaAdminClient(cmd *cobra.Command) (kafkaadmin.KafkaAdmin, error) { + cfg := kafkaadmin.Config{ + BootstrapServers: cmd.Parent().Flag("kafka-addr").Value.String(), + } + + if flag := cmd.Parent().Flag("kafka-ssl-ca-location"); flag.Changed { + cfg.SSLCALocation = flag.Value.String() + } + if flag := cmd.Parent().Flag("kafka-security-protocol"); flag.Changed { + cfg.SecurityProtocol = flag.Value.String() + } + if flag := cmd.Parent().Flag("kafka-sasl-mechanism"); flag.Changed { + cfg.SASLMechanism = flag.Value.String() + } + if flag := cmd.Parent().Flag("kafka-sasl-username"); flag.Changed { + cfg.SASLUsername = flag.Value.String() + } + if flag := cmd.Parent().Flag("kafka-sasl-password"); flag.Changed { + cfg.SASLPassword = flag.Value.String() + } + + return kafkaadmin.NewClient(cfg) +} + // containsRegex takes a topic name reference and returns whether or not // it should be interpreted as regex. func containsRegex(t string) bool { diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 4d98f4e4..c05d3ca0 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -4,7 +4,6 @@ import ( "fmt" "os" - "github.com/DataDog/kafka-kit/v4/kafkaadmin" "github.com/spf13/cobra" ) @@ -57,8 +56,7 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() // Init kafkaadmin client. - bs := cmd.Parent().Flag("kafka-addr").Value.String() - ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs}) + ka, err := newKafkaAdminClient(cmd) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index cd43e61f..ccdf3995 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -6,7 +6,6 @@ import ( "regexp" "strings" - "github.com/DataDog/kafka-kit/v4/kafkaadmin" "github.com/DataDog/kafka-kit/v4/kafkazk" "github.com/spf13/cobra" @@ -153,8 +152,7 @@ func rebuild(cmd *cobra.Command, _ []string) { } // Init kafkaadmin client. - bs := cmd.Parent().Flag("kafka-addr").Value.String() - ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs}) + ka, err := newKafkaAdminClient(cmd) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/cmd/topicmappr/commands/root.go b/cmd/topicmappr/commands/root.go index 6bb6a5c0..ce848cbf 100644 --- a/cmd/topicmappr/commands/root.go +++ b/cmd/topicmappr/commands/root.go @@ -24,8 +24,15 @@ func Execute() { func init() { rootCmd.PersistentFlags().String("kafka-addr", "localhost:9092", "Kafka bootstrap address") + rootCmd.PersistentFlags().String("kafka-ssl-ca-location", "/etc/kafka/config/ca.crt", "Kafka ssl ca location") + rootCmd.PersistentFlags().String("kafka-security-protocol", "SASL_SSL", "Kafka security protocol") + rootCmd.PersistentFlags().String("kafka-sasl-mechanism", "PLAIN", "Kafka sasl mechanism") + rootCmd.PersistentFlags().String("kafka-sasl-username", "", "Kafka sasl username") + rootCmd.PersistentFlags().String("kafka-sasl-password", "", "Kafka sasl password") + rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string") rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)") rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics") + rootCmd.PersistentFlags().Bool("ignore-warns", false, "Produce a map even if warnings are encountered") } diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index 7e97975b..2287d151 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -4,8 +4,6 @@ import ( "fmt" "os" - "github.com/DataDog/kafka-kit/v4/kafkaadmin" - "github.com/spf13/cobra" ) @@ -55,8 +53,7 @@ func scale(cmd *cobra.Command, _ []string) { defer zk.Close() // Init kafkaadmin client. - bs := cmd.Parent().Flag("kafka-addr").Value.String() - ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs}) + ka, err := newKafkaAdminClient(cmd) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/go.sum b/go.sum index de825451..3bf66f66 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -248,6 +249,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=