|
|
@@ -1,7 +1,6 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
- "bytes"
|
|
|
"crypto/tls"
|
|
|
"encoding/binary"
|
|
|
"fmt"
|
|
|
@@ -488,23 +487,13 @@ func (b *Broker) responseReceiver() {
|
|
|
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
|
|
|
// of responding to bad credentials but thats how its being done today.
|
|
|
func (b *Broker) doSASLPlainAuth() error {
|
|
|
- authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
|
|
|
- buf := new(bytes.Buffer)
|
|
|
-
|
|
|
- err := binary.Write(buf, binary.BigEndian, int32(len(authBytes)))
|
|
|
- if err != nil {
|
|
|
- Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error())
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- err = binary.Write(buf, binary.BigEndian, authBytes)
|
|
|
- if err != nil {
|
|
|
- Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error())
|
|
|
- return err
|
|
|
- }
|
|
|
+ length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
|
|
|
+ authBytes := make([]byte, length + 4) //4 byte length header + auth data
|
|
|
+ binary.BigEndian.PutUint32(authBytes, uint32(length))
|
|
|
+ copy(authBytes[4:], []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password))
|
|
|
|
|
|
b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
|
|
|
- b.conn.Write(buf.Bytes())
|
|
|
+ b.conn.Write(authBytes)
|
|
|
|
|
|
header := make([]byte, 4)
|
|
|
n, err := io.ReadFull(b.conn, header)
|