Browse Source

Merge pull request #493 from uovobw/support-ssl

Add configuration to support broker SSL
Evan Huus 9 years ago
parent
commit
b973736745
3 changed files with 60 additions and 5 deletions
  1. 6 1
      broker.go
  2. 9 1
      config.go
  3. 45 3
      examples/http_server/http_server.go

+ 6 - 1
broker.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"crypto/tls"
 	"fmt"
 	"io"
 	"net"
@@ -73,7 +74,11 @@ func (b *Broker) Open(conf *Config) error {
 			KeepAlive: conf.Net.KeepAlive,
 		}
 
-		b.conn, b.connErr = dialer.Dial("tcp", b.addr)
+		if conf.Net.TLS.Enable {
+			b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
+		} else {
+			b.conn, b.connErr = dialer.Dial("tcp", b.addr)
+		}
 		if b.connErr != nil {
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)

+ 9 - 1
config.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "time"
+import (
+	"crypto/tls"
+	"time"
+)
 
 // Config is used to pass multiple configuration options to Sarama's constructors.
 type Config struct {
@@ -13,6 +16,11 @@ type Config struct {
 		ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
 		WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
 
+		TLS struct {
+			Enable bool        // Whether or not to use TLS when connecting to the broker (defaults to false).
+			Config *tls.Config // The TLS configuration to use for secure connections if enabled (defaults to nil).
+		}
+
 		// KeepAlive specifies the keep-alive period for an active network connection.
 		// If zero, keep-alives are disabled. (default is 0: disabled).
 		KeepAlive time.Duration

+ 45 - 3
examples/http_server/http_server.go

@@ -3,9 +3,12 @@ package main
 import (
 	"github.com/Shopify/sarama"
 
+	"crypto/tls"
+	"crypto/x509"
 	"encoding/json"
 	"flag"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"net/http"
 	"os"
@@ -14,9 +17,13 @@ import (
 )
 
 var (
-	addr    = flag.String("addr", ":8080", "The address to bind to")
-	brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
-	verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
+	addr      = flag.String("addr", ":8080", "The address to bind to")
+	brokers   = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
+	verbose   = flag.Bool("verbose", false, "Turn on Sarama logging")
+	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
+	keyFile   = flag.String("key", "", "The optional key file for client authentication")
+	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
+	verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
 )
 
 func main() {
@@ -47,6 +54,31 @@ func main() {
 	log.Fatal(server.Run(*addr))
 }
 
+func createTlsConfiguration() (t *tls.Config) {
+	if *certFile != "" && *keyFile != "" && *caFile != "" {
+		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		caCert, err := ioutil.ReadFile(*caFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		caCertPool := x509.NewCertPool()
+		caCertPool.AppendCertsFromPEM(caCert)
+
+		t = &tls.Config{
+			Certificates:       []tls.Certificate{cert},
+			RootCAs:            caCertPool,
+			InsecureSkipVerify: *verifySsl,
+		}
+	}
+	// will be nil by default if nothing is provided
+	return t
+}
+
 type Server struct {
 	DataCollector     sarama.SyncProducer
 	AccessLogProducer sarama.AsyncProducer
@@ -164,6 +196,11 @@ func newDataCollector(brokerList []string) sarama.SyncProducer {
 	config := sarama.NewConfig()
 	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
 	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
+	tlsConfig := createTlsConfiguration()
+	if tlsConfig != nil {
+		config.Net.TLS.Config = tlsConfig
+		config.Net.TLS.Enable = true
+	}
 
 	// On the broker side, you may want to change the following settings to get
 	// stronger consistency guarantees:
@@ -183,6 +220,11 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
 	// For the access log, we are looking for AP semantics, with high throughput.
 	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
 	config := sarama.NewConfig()
+	tlsConfig := createTlsConfiguration()
+	if tlsConfig != nil {
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = tlsConfig
+	}
 	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
 	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
 	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms