浏览代码

Merge pull request #1300 from andrejvanderzee/master

Added TLS options with client auth to kafka-console-producer and consumer
Vlad Gorodetsky 6 年之前
父节点
当前提交
cb940f0f2a

+ 24 - 6
tools/kafka-console-consumer/kafka-console-consumer.go

@@ -11,14 +11,20 @@ import (
 	"sync"
 	"sync"
 
 
 	"github.com/Shopify/sarama"
 	"github.com/Shopify/sarama"
+	"github.com/Shopify/sarama/tools/tls"
 )
 )
 
 
 var (
 var (
-	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
-	topic      = flag.String("topic", "", "REQUIRED: the topic to consume")
-	partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
-	offset     = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
-	verbose    = flag.Bool("verbose", false, "Whether to turn on sarama logging")
+	brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
+	topic         = flag.String("topic", "", "REQUIRED: the topic to consume")
+	partitions    = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
+	offset        = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
+	verbose       = flag.Bool("verbose", false, "Whether to turn on sarama logging")
+	tlsEnabled    = flag.Bool("tls-enabled", false, "Whether to enable TLS")
+	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
+	tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
+	tlsClientKey  = flag.String("tls-client-key", "", "Client key for client authentication (use with tls-enabled and -tls-client-cert)")
+
 	bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
 	bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
 
 
 	logger = log.New(os.Stderr, "", log.LstdFlags)
 	logger = log.New(os.Stderr, "", log.LstdFlags)
@@ -49,7 +55,19 @@ func main() {
 		printUsageErrorAndExit("-offset should be `oldest` or `newest`")
 		printUsageErrorAndExit("-offset should be `oldest` or `newest`")
 	}
 	}
 
 
-	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
+	config := sarama.NewConfig()
+	if *tlsEnabled {
+		tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
+		if err != nil {
+			printErrorAndExit(69, "Failed to create TLS config: %s", err)
+		}
+
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = tlsConfig
+		config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
+	}
+
+	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
 	if err != nil {
 	if err != nil {
 		printErrorAndExit(69, "Failed to start consumer: %s", err)
 		printErrorAndExit(69, "Failed to start consumer: %s", err)
 	}
 	}

+ 25 - 9
tools/kafka-console-producer/kafka-console-producer.go

@@ -9,19 +9,24 @@ import (
 	"strings"
 	"strings"
 
 
 	"github.com/Shopify/sarama"
 	"github.com/Shopify/sarama"
+	"github.com/Shopify/sarama/tools/tls"
 	"github.com/rcrowley/go-metrics"
 	"github.com/rcrowley/go-metrics"
 )
 )
 
 
 var (
 var (
-	brokerList  = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
-	topic       = flag.String("topic", "", "REQUIRED: the topic to produce to")
-	key         = flag.String("key", "", "The key of the message to produce. Can be empty.")
-	value       = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
-	partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
-	partition   = flag.Int("partition", -1, "The partition to produce to.")
-	verbose     = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
-	showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
-	silent      = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
+	brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
+	topic         = flag.String("topic", "", "REQUIRED: the topic to produce to")
+	key           = flag.String("key", "", "The key of the message to produce. Can be empty.")
+	value         = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
+	partitioner   = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
+	partition     = flag.Int("partition", -1, "The partition to produce to.")
+	verbose       = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
+	showMetrics   = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
+	silent        = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
+	tlsEnabled    = flag.Bool("tls-enabled", false, "Whether to enable TLS")
+	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
+	tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
+	tlsClientKey  = flag.String("tls-client-key", "", "Client key for client authentication (use with tls-enabled and -tls-client-cert)")
 
 
 	logger = log.New(os.Stderr, "", log.LstdFlags)
 	logger = log.New(os.Stderr, "", log.LstdFlags)
 )
 )
@@ -45,6 +50,17 @@ func main() {
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Return.Successes = true
 	config.Producer.Return.Successes = true
 
 
+	if *tlsEnabled {
+		tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
+		if err != nil {
+			printErrorAndExit(69, "Failed to create TLS config: %s", err)
+		}
+
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = tlsConfig
+		config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
+	}
+
 	switch *partitioner {
 	switch *partitioner {
 	case "":
 	case "":
 		if *partition >= 0 {
 		if *partition >= 0 {

+ 17 - 0
tools/tls/config.go

@@ -0,0 +1,17 @@
+package tls
+
+import "crypto/tls"
+
+func NewConfig(clientCert, clientKey string) (*tls.Config, error) {
+	tlsConfig := tls.Config{}
+
+	if clientCert != "" && clientKey != "" {
+		cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
+		if err != nil {
+			return &tlsConfig, err
+		}
+		tlsConfig.Certificates = []tls.Certificate{cert}
+	}
+
+	return &tlsConfig, nil
+}