|
|
@@ -3,9 +3,12 @@ package main
|
|
|
import (
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
|
|
+ "crypto/tls"
|
|
|
+ "crypto/x509"
|
|
|
"encoding/json"
|
|
|
"flag"
|
|
|
"fmt"
|
|
|
+ "io/ioutil"
|
|
|
"log"
|
|
|
"net/http"
|
|
|
"os"
|
|
|
@@ -14,14 +17,39 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- addr = flag.String("addr", ":8080", "The address to bind to")
|
|
|
- brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
|
|
|
- verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
|
|
|
+ addr = flag.String("addr", ":8080", "The address to bind to")
|
|
|
+ brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
|
|
|
+ verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
|
|
|
+ cert_file = flag.String("certificate", "", "The certificate file for client authentication")
|
|
|
+ key_file = flag.String("key", "", "The key file for client authentication")
|
|
|
+ ca_file = flag.String("ca", "", "The certificate authority file for client authentication")
|
|
|
+ tls_config *tls.Config
|
|
|
)
|
|
|
|
|
|
func main() {
|
|
|
flag.Parse()
|
|
|
|
|
|
+ if *cert_file != "" && *key_file != "" && *ca_file != "" {
|
|
|
+ cert, err := tls.LoadX509KeyPair(*cert_file, *key_file)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ caCert, err := ioutil.ReadFile(*ca_file)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ caCertPool := x509.NewCertPool()
|
|
|
+ caCertPool.AppendCertsFromPEM(caCert)
|
|
|
+
|
|
|
+ tls_config = &tls.Config{
|
|
|
+ Certificates: []tls.Certificate{cert},
|
|
|
+ // RootCAs: caCertPool,
|
|
|
+ InsecureSkipVerify: true,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if *verbose {
|
|
|
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
|
|
|
}
|
|
|
@@ -164,6 +192,10 @@ func newDataCollector(brokerList []string) sarama.SyncProducer {
|
|
|
config := sarama.NewConfig()
|
|
|
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
|
|
|
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
|
|
|
+ if tls_config != nil {
|
|
|
+ config.Net.TLS.Config = tls_config
|
|
|
+ config.Net.TLS.Enable = true
|
|
|
+ }
|
|
|
|
|
|
// On the broker side, you may want to change the following settings to get
|
|
|
// stronger consistency guarantees:
|
|
|
@@ -183,6 +215,10 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
|
|
|
// For the access log, we are looking for AP semantics, with high throughput.
|
|
|
// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
|
|
|
config := sarama.NewConfig()
|
|
|
+ if tls_config != nil {
|
|
|
+ config.Net.TLS.Enable = true
|
|
|
+ config.Net.TLS.Config = tls_config
|
|
|
+ }
|
|
|
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
|
|
|
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
|
|
|
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
|