|
@@ -189,7 +189,6 @@ func (b *Broker) Open(conf *Config) error {
|
|
|
}
|
|
|
|
|
|
if conf.Net.SASL.Enable {
|
|
|
-
|
|
|
b.connErr = b.authenticateViaSASL()
|
|
|
|
|
|
if b.connErr != nil {
|
|
@@ -961,7 +960,6 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
|
|
|
|
|
|
if b.conf.Net.SASL.Handshake {
|
|
|
-
|
|
|
handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
|
|
|
if handshakeErr != nil {
|
|
|
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
|
|
@@ -977,7 +975,6 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
|
|
|
|
|
|
func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
|
|
|
-
|
|
|
length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
|
|
|
authBytes := make([]byte, length+4)
|
|
|
binary.BigEndian.PutUint32(authBytes, uint32(length))
|
|
@@ -1068,7 +1065,6 @@ func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
|
|
|
|
|
|
|
|
|
func (b *Broker) sendClientMessage(message []byte) (bool, error) {
|
|
|
-
|
|
|
requestTime := time.Now()
|
|
|
correlationID := b.correlationID
|
|
|
|
|
@@ -1108,7 +1104,6 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
|
|
|
msg, err := scramClient.Step("")
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
|
|
|
-
|
|
|
}
|
|
|
|
|
|
for !scramClient.Done() {
|
|
@@ -1228,7 +1223,6 @@ func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, erro
|
|
|
}
|
|
|
|
|
|
func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
|
|
|
-
|
|
|
rb := &SaslAuthenticateRequest{initialResp}
|
|
|
|
|
|
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
|
|
@@ -1303,7 +1297,6 @@ func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
|
|
|
if b.brokerRequestLatency != nil {
|
|
|
b.brokerRequestLatency.Update(requestLatencyInMs)
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
|
|
@@ -1322,7 +1315,6 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
|
|
|
if b.brokerRequestSize != nil {
|
|
|
b.brokerRequestSize.Update(requestSize)
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
func (b *Broker) registerMetrics() {
|