|
@@ -13,7 +13,6 @@ import (
|
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
|
- "github.com/pkg/errors"
|
|
|
|
|
"github.com/rcrowley/go-metrics"
|
|
"github.com/rcrowley/go-metrics"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
@@ -780,14 +779,10 @@ func (b *Broker) responseReceiver() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (b *Broker) authenticateViaSASL() error {
|
|
func (b *Broker) authenticateViaSASL() error {
|
|
|
- switch b.conf.Net.SASL.Mechanism {
|
|
|
|
|
- case SASLTypeOAuth:
|
|
|
|
|
|
|
+ if b.conf.Net.SASL.Mechanism == SASLTypeOAuth {
|
|
|
return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider, b.conf.Net.SASL.Extensions)
|
|
return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider, b.conf.Net.SASL.Extensions)
|
|
|
- case SASLTypePlaintext:
|
|
|
|
|
- return b.sendAndReceiveSASLPlainAuth()
|
|
|
|
|
- default:
|
|
|
|
|
- return b.sendAndReceiveSASLPlainAuth()
|
|
|
|
|
}
|
|
}
|
|
|
|
|
+ return b.sendAndReceiveSASLPlainAuth()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
|
|
func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
|
|
@@ -1020,7 +1015,7 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if header.correlationID != correlationID {
|
|
if header.correlationID != correlationID {
|
|
|
- return bytesRead, errors.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
|
|
|
|
|
|
|
+ return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
buf = make([]byte, header.length-4)
|
|
buf = make([]byte, header.length-4)
|