Browse Source

Added TLS option with client auth to kafka-console-producer and kafka-console-consumer.

Andrej van der Zee 6 years ago
parent
commit
14ee58c814

+ 38 - 6
tools/kafka-console-consumer/kafka-console-consumer.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"crypto/tls"
 	"flag"
 	"fmt"
 	"log"
@@ -14,11 +15,16 @@ import (
 )
 
 var (
-	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
-	topic      = flag.String("topic", "", "REQUIRED: the topic to consume")
-	partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
-	offset     = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
-	verbose    = flag.Bool("verbose", false, "Whether to turn on sarama logging")
+	brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
+	topic         = flag.String("topic", "", "REQUIRED: the topic to consume")
+	partitions    = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
+	offset        = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
+	verbose       = flag.Bool("verbose", false, "Whether to turn on sarama logging")
+	tlsEnabled    = flag.Bool("tls-enabled", false, "Whether to enable TLS")
+	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
+	clientCert    = flag.String("client-cert", "", "The client cert for client authentication (use with -tls-enabled and -client-key)")
+	clientKey     = flag.String("client-key", "", "The client key for client authentication (use with tls-enabled and -client-cert)")
+
 	bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
 
 	logger = log.New(os.Stderr, "", log.LstdFlags)
@@ -49,7 +55,19 @@ func main() {
 		printUsageErrorAndExit("-offset should be `oldest` or `newest`")
 	}
 
-	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
+	config := sarama.NewConfig()
+	if *tlsEnabled {
+		tlsConfig, err := newTLSConfig(*clientCert, *clientKey)
+		if err != nil {
+			printErrorAndExit(69, "Failed to create TLS config: %s", err)
+		}
+
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = tlsConfig
+		config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
+	}
+
+	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
 	if err != nil {
 		printErrorAndExit(69, "Failed to start consumer: %s", err)
 	}
@@ -143,3 +161,17 @@ func printUsageErrorAndExit(format string, values ...interface{}) {
 	flag.PrintDefaults()
 	os.Exit(64)
 }
+
+func newTLSConfig(clientCert, clientKey string) (*tls.Config, error) {
+	tlsConfig := tls.Config{}
+
+	if clientCert != "" && clientKey != "" {
+		cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
+		if err != nil {
+			return &tlsConfig, err
+		}
+		tlsConfig.Certificates = []tls.Certificate{cert}
+	}
+
+	return &tlsConfig, nil
+}

+ 40 - 9
tools/kafka-console-producer/kafka-console-producer.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"crypto/tls"
 	"flag"
 	"fmt"
 	"io/ioutil"
@@ -13,15 +14,20 @@ import (
 )
 
 var (
-	brokerList  = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
-	topic       = flag.String("topic", "", "REQUIRED: the topic to produce to")
-	key         = flag.String("key", "", "The key of the message to produce. Can be empty.")
-	value       = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
-	partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
-	partition   = flag.Int("partition", -1, "The partition to produce to.")
-	verbose     = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
-	showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
-	silent      = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
+	brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
+	topic         = flag.String("topic", "", "REQUIRED: the topic to produce to")
+	key           = flag.String("key", "", "The key of the message to produce. Can be empty.")
+	value         = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
+	partitioner   = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
+	partition     = flag.Int("partition", -1, "The partition to produce to.")
+	verbose       = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
+	showMetrics   = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
+	silent        = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
+	tlsEnabled    = flag.Bool("tls-enabled", false, "Whether to enable TLS")
+	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
+	clientCert    = flag.String("client-cert", "", "The client cert for client authentication (use with -tls-enabled and -client-key)")
+	clientKey     = flag.String("client-key", "", "The client key for client authentication (use with tls-enabled and -client-cert)")
+	ca            = flag.String("ca", "", "The CA that signed TLS server cert (use with -tls-enabled)")
 
 	logger = log.New(os.Stderr, "", log.LstdFlags)
 )
@@ -45,6 +51,17 @@ func main() {
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Return.Successes = true
 
+	if *tlsEnabled {
+		tlsConfig, err := newTLSConfig(*clientCert, *clientKey)
+		if err != nil {
+			printErrorAndExit(69, "Failed to create TLS config: %s", err)
+		}
+
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = tlsConfig
+		config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
+	}
+
 	switch *partitioner {
 	case "":
 		if *partition >= 0 {
@@ -122,3 +139,17 @@ func stdinAvailable() bool {
 	stat, _ := os.Stdin.Stat()
 	return (stat.Mode() & os.ModeCharDevice) == 0
 }
+
+func newTLSConfig(clientCert, clientKey string) (*tls.Config, error) {
+	tlsConfig := tls.Config{}
+
+	if clientCert != "" && clientKey != "" {
+		cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
+		if err != nil {
+			return &tlsConfig, err
+		}
+		tlsConfig.Certificates = []tls.Certificate{cert}
+	}
+
+	return &tlsConfig, nil
+}