|
|
@@ -20,9 +20,10 @@ var (
|
|
|
addr = flag.String("addr", ":8080", "The address to bind to")
|
|
|
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
|
|
|
verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
|
|
|
- cert_file = flag.String("certificate", "", "The optional certificate file for client authentication")
|
|
|
- key_file = flag.String("key", "", "The optional key file for client authentication")
|
|
|
- ca_file = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
|
|
|
+ certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
|
|
|
+ keyFile = flag.String("key", "", "The optional key file for client authentication")
|
|
|
+ caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
|
|
|
+ verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
|
|
|
)
|
|
|
|
|
|
func main() {
|
|
|
@@ -54,13 +55,13 @@ func main() {
|
|
|
}
|
|
|
|
|
|
func createTlsConfiguration() (t *tls.Config) {
|
|
|
- if *cert_file != "" && *key_file != "" && *ca_file != "" {
|
|
|
- cert, err := tls.LoadX509KeyPair(*cert_file, *key_file)
|
|
|
+ if *certFile != "" && *keyFile != "" && *caFile != "" {
|
|
|
+ cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
|
|
|
if err != nil {
|
|
|
log.Fatal(err)
|
|
|
}
|
|
|
|
|
|
- caCert, err := ioutil.ReadFile(*ca_file)
|
|
|
+ caCert, err := ioutil.ReadFile(*caFile)
|
|
|
if err != nil {
|
|
|
log.Fatal(err)
|
|
|
}
|
|
|
@@ -71,7 +72,7 @@ func createTlsConfiguration() (t *tls.Config) {
|
|
|
t = &tls.Config{
|
|
|
Certificates: []tls.Certificate{cert},
|
|
|
RootCAs: caCertPool,
|
|
|
- InsecureSkipVerify: true,
|
|
|
+ InsecureSkipVerify: *verifySsl,
|
|
|
}
|
|
|
}
|
|
|
// will be nil by default if nothing is provided
|
|
|
@@ -195,9 +196,9 @@ func newDataCollector(brokerList []string) sarama.SyncProducer {
|
|
|
config := sarama.NewConfig()
|
|
|
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
|
|
|
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
|
|
|
- tls_config := createTlsConfiguration()
|
|
|
- if tls_config != nil {
|
|
|
- config.Net.TLS.Config = tls_config
|
|
|
+ tlsConfig := createTlsConfiguration()
|
|
|
+ if tlsConfig != nil {
|
|
|
+ config.Net.TLS.Config = tlsConfig
|
|
|
config.Net.TLS.Enable = true
|
|
|
}
|
|
|
|
|
|
@@ -219,10 +220,10 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
|
|
|
// For the access log, we are looking for AP semantics, with high throughput.
|
|
|
// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
|
|
|
config := sarama.NewConfig()
|
|
|
- tls_config := createTlsConfiguration()
|
|
|
- if tls_config != nil {
|
|
|
+ tlsConfig := createTlsConfiguration()
|
|
|
+ if tlsConfig != nil {
|
|
|
config.Net.TLS.Enable = true
|
|
|
- config.Net.TLS.Config = tls_config
|
|
|
+ config.Net.TLS.Config = tlsConfig
|
|
|
}
|
|
|
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
|
|
|
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
|