Browse Source

added SASLVersion in config (#1410)

Prune Sebastien THOMAS 6 năm trước cách đây
mục cha
commit
c82066c158
3 tập tin đã thay đổi với 10 bổ sung7 xóa
  1. 5 7
      broker.go
  2. 1 0
      broker_test.go
  3. 4 0
      config.go

+ 5 - 7
broker.go

@@ -4,7 +4,6 @@ import (
 	"crypto/tls"
 	"encoding/binary"
 	"fmt"
-	metrics "github.com/rcrowley/go-metrics"
 	"io"
 	"net"
 	"sort"
@@ -13,6 +12,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	metrics "github.com/rcrowley/go-metrics"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -944,19 +945,16 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	// default to V0 to allow for backward compatability when SASL is enabled
 	// but not the handshake
-	saslHandshake := SASLHandshakeV0
 	if b.conf.Net.SASL.Handshake {
-		if b.conf.Version.IsAtLeast(V1_0_0_0) {
-			saslHandshake = SASLHandshakeV1
-		}
-		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, saslHandshake)
+
+		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
 		if handshakeErr != nil {
 			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
 			return handshakeErr
 		}
 	}
 
-	if saslHandshake == SASLHandshakeV1 {
+	if b.conf.Net.SASL.Version == SASLHandshakeV1 {
 		return b.sendAndReceiveV1SASLPlainAuth()
 	}
 	return b.sendAndReceiveV0SASLPlainAuth()

+ 1 - 0
broker_test.go

@@ -441,6 +441,7 @@ func TestSASLPlainAuth(t *testing.T) {
 		conf.Net.SASL.Mechanism = SASLTypePlaintext
 		conf.Net.SASL.User = "token"
 		conf.Net.SASL.Password = "password"
+		conf.Net.SASL.Version = SASLHandshakeV1
 
 		broker.conf = conf
 		broker.conf.Version = V1_0_0_0

+ 4 - 0
config.go

@@ -58,6 +58,9 @@ type Config struct {
 			// SASLMechanism is the name of the enabled SASL mechanism.
 			// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
 			Mechanism SASLMechanism
+			// Version is the SASL Protocol Version to use
+			// Kafka > 1.x should use V1, except on Azure EventHub which use V0
+			Version int16
 			// Whether or not to send the Kafka SASL handshake first if enabled
 			// (defaults to true). You should only set this to false if you're using
 			// a non-Kafka SASL proxy.
@@ -398,6 +401,7 @@ func NewConfig() *Config {
 	c.Net.ReadTimeout = 30 * time.Second
 	c.Net.WriteTimeout = 30 * time.Second
 	c.Net.SASL.Handshake = true
+	c.Net.SASL.Version = SASLHandshakeV0
 
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond