|
|
@@ -17,39 +17,17 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- addr = flag.String("addr", ":8080", "The address to bind to")
|
|
|
- brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
|
|
|
- verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
|
|
|
- cert_file = flag.String("certificate", "", "The certificate file for client authentication")
|
|
|
- key_file = flag.String("key", "", "The key file for client authentication")
|
|
|
- ca_file = flag.String("ca", "", "The certificate authority file for client authentication")
|
|
|
- tls_config *tls.Config
|
|
|
+ addr = flag.String("addr", ":8080", "The address to bind to")
|
|
|
+ brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
|
|
|
+ verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
|
|
|
+ cert_file = flag.String("certificate", "", "The certificate file for client authentication")
|
|
|
+ key_file = flag.String("key", "", "The key file for client authentication")
|
|
|
+ ca_file = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
|
|
|
)
|
|
|
|
|
|
func main() {
|
|
|
flag.Parse()
|
|
|
|
|
|
- if *cert_file != "" && *key_file != "" && *ca_file != "" {
|
|
|
- cert, err := tls.LoadX509KeyPair(*cert_file, *key_file)
|
|
|
- if err != nil {
|
|
|
- log.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- caCert, err := ioutil.ReadFile(*ca_file)
|
|
|
- if err != nil {
|
|
|
- log.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- caCertPool := x509.NewCertPool()
|
|
|
- caCertPool.AppendCertsFromPEM(caCert)
|
|
|
-
|
|
|
- tls_config = &tls.Config{
|
|
|
- Certificates: []tls.Certificate{cert},
|
|
|
- // RootCAs: caCertPool,
|
|
|
- InsecureSkipVerify: true,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
if *verbose {
|
|
|
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
|
|
|
}
|
|
|
@@ -75,6 +53,31 @@ func main() {
|
|
|
log.Fatal(server.Run(*addr))
|
|
|
}
|
|
|
|
|
|
+func createTlsConfiguration() (t *tls.Config) {
|
|
|
+ if *cert_file != "" && *key_file != "" && *ca_file != "" {
|
|
|
+ cert, err := tls.LoadX509KeyPair(*cert_file, *key_file)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ caCert, err := ioutil.ReadFile(*ca_file)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ caCertPool := x509.NewCertPool()
|
|
|
+ caCertPool.AppendCertsFromPEM(caCert)
|
|
|
+
|
|
|
+ t = &tls.Config{
|
|
|
+ Certificates: []tls.Certificate{cert},
|
|
|
+ RootCAs: caCertPool,
|
|
|
+ InsecureSkipVerify: true,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // will be nil by default if nothing is provided
|
|
|
+ return t
|
|
|
+}
|
|
|
+
|
|
|
type Server struct {
|
|
|
DataCollector sarama.SyncProducer
|
|
|
AccessLogProducer sarama.AsyncProducer
|
|
|
@@ -192,6 +195,7 @@ func newDataCollector(brokerList []string) sarama.SyncProducer {
|
|
|
config := sarama.NewConfig()
|
|
|
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
|
|
|
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
|
|
|
+ tls_config := createTlsConfiguration()
|
|
|
if tls_config != nil {
|
|
|
config.Net.TLS.Config = tls_config
|
|
|
config.Net.TLS.Enable = true
|
|
|
@@ -215,6 +219,7 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
|
|
|
// For the access log, we are looking for AP semantics, with high throughput.
|
|
|
// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
|
|
|
config := sarama.NewConfig()
|
|
|
+ tls_config := createTlsConfiguration()
|
|
|
if tls_config != nil {
|
|
|
config.Net.TLS.Enable = true
|
|
|
config.Net.TLS.Config = tls_config
|