Browse Source

Rebase fixes for metrics changes

Also add a configuration so that non-kafka SASL proxies continue to work, since
that was the original reason we added SASL.
Evan Huus 9 years ago
parent
commit
c4cc2603da
2 changed files with 20 additions and 9 deletions
  1. 15 9
      broker.go
  2. 5 0
      config.go

+ 15 - 9
broker.go

@@ -526,7 +526,7 @@ func (b *Broker) responseReceiver() {
 func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
 func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
 	rb := &SaslHandshakeRequest{"PLAIN"}
 	rb := &SaslHandshakeRequest{"PLAIN"}
 	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
 	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
-	buf, err := encode(req)
+	buf, err := encode(req, b.conf.MetricRegistry)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -536,6 +536,7 @@ func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
 		return err
 		return err
 	}
 	}
 
 
+	requestTime := time.Now()
 	bytes, err := b.conn.Write(buf)
 	bytes, err := b.conn.Write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
 	if err != nil {
@@ -545,16 +546,19 @@ func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
 	b.correlationID++
 	b.correlationID++
 	//wait for the response
 	//wait for the response
 	header := make([]byte, 8) // response header
 	header := make([]byte, 8) // response header
-	n, err := io.ReadFull(b.conn, header)
-	b.updateIncomingCommunicationMetrics(n)
+	_, err = io.ReadFull(b.conn, header)
+	if err != nil {
+		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
+		return err
+	}
 	length := binary.BigEndian.Uint32(header[:4])
 	length := binary.BigEndian.Uint32(header[:4])
 	payload := make([]byte, length-4)
 	payload := make([]byte, length-4)
-	n, err = io.ReadFull(b.conn, payload)
+	n, err := io.ReadFull(b.conn, payload)
 	if err != nil {
 	if err != nil {
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		return err
 		return err
 	}
 	}
-	b.updateIncomingCommunicationMetrics(n)
+	b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
 	res := &SaslHandshakeResponse{}
 	res := &SaslHandshakeResponse{}
 	err = versionedDecode(payload, res, 0)
 	err = versionedDecode(payload, res, 0)
 	if err != nil {
 	if err != nil {
@@ -588,10 +592,12 @@ func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
 // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
 // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
 // of responding to bad credentials but thats how its being done today.
 // of responding to bad credentials but thats how its being done today.
 func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 func (b *Broker) sendAndReceiveSASLPlainAuth() error {
-	handshakeErr := b.sendAndReceiveSASLPlainHandshake()
-	if handshakeErr != nil {
-		Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
-		return handshakeErr
+	if b.conf.Net.SASL.Handshake {
+		handshakeErr := b.sendAndReceiveSASLPlainHandshake()
+		if handshakeErr != nil {
+			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
+			return handshakeErr
+		}
 	}
 	}
 	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
 	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
 	authBytes := make([]byte, length+4) //4 byte length header + auth data
 	authBytes := make([]byte, length+4) //4 byte length header + auth data

+ 5 - 0
config.go

@@ -43,6 +43,10 @@ type Config struct {
 			// Whether or not to use SASL authentication when connecting to the broker
 			// Whether or not to use SASL authentication when connecting to the broker
 			// (defaults to false).
 			// (defaults to false).
 			Enable bool
 			Enable bool
+			// Whether or not to send the Kafka SASL handshake first if enabled
+			// (defaults to true). You should only set this to false if you're using
+			// a non-Kafka SASL proxy.
+			Handshake bool
 			//username and password for SASL/PLAIN authentication
 			//username and password for SASL/PLAIN authentication
 			User     string
 			User     string
 			Password string
 			Password string
@@ -251,6 +255,7 @@ func NewConfig() *Config {
 	c.Net.DialTimeout = 30 * time.Second
 	c.Net.DialTimeout = 30 * time.Second
 	c.Net.ReadTimeout = 30 * time.Second
 	c.Net.ReadTimeout = 30 * time.Second
 	c.Net.WriteTimeout = 30 * time.Second
 	c.Net.WriteTimeout = 30 * time.Second
+	c.Net.SASL.Handshake = true
 
 
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond