|
|
@@ -1,6 +1,7 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
+ "crypto/tls"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"net"
|
|
|
@@ -68,7 +69,15 @@ func (b *Broker) Open(conf *Config) error {
|
|
|
go withRecover(func() {
|
|
|
defer b.lock.Unlock()
|
|
|
|
|
|
- b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
|
|
|
+ dialer := &net.Dialer{
|
|
|
+ Timeout: conf.Net.DialTimeout,
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.Net.TLS.Enable {
|
|
|
+ b.conn, b.connErr = tls.DialWithDialer(dialer, "tcp", b.addr, conf.Net.TLS.Config)
|
|
|
+ } else {
|
|
|
+ b.conn, b.connErr = dialer.Dial("tcp", b.addr)
|
|
|
+ }
|
|
|
if b.connErr != nil {
|
|
|
b.conn = nil
|
|
|
atomic.StoreInt32(&b.opened, 0)
|