|
@@ -85,7 +85,7 @@ func (b *Broker) Open(conf *Config) error {
|
|
|
b.conf = conf
|
|
b.conf = conf
|
|
|
|
|
|
|
|
if conf.Net.SASL.Enable {
|
|
if conf.Net.SASL.Enable {
|
|
|
- b.connErr = b.doSASLPlainAuth()
|
|
|
|
|
|
|
+ b.connErr = b.sendAndReceiveSASLPlainAuth()
|
|
|
if b.connErr != nil {
|
|
if b.connErr != nil {
|
|
|
err = b.conn.Close()
|
|
err = b.conn.Close()
|
|
|
if err == nil {
|
|
if err == nil {
|
|
@@ -490,7 +490,7 @@ func (b *Broker) responseReceiver() {
|
|
|
// When credentials are valid, Kafka returns a 4 byte array of null characters.
|
|
// When credentials are valid, Kafka returns a 4 byte array of null characters.
|
|
|
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
|
|
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
|
|
|
// of responding to bad credentials but thats how its being done today.
|
|
// of responding to bad credentials but thats how its being done today.
|
|
|
-func (b *Broker) doSASLPlainAuth() error {
|
|
|
|
|
|
|
+func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
|
|
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
|
|
|
authBytes := make([]byte, length+4) //4 byte length header + auth data
|
|
authBytes := make([]byte, length+4) //4 byte length header + auth data
|
|
|
binary.BigEndian.PutUint32(authBytes, uint32(length))
|
|
binary.BigEndian.PutUint32(authBytes, uint32(length))
|