|
|
@@ -123,6 +123,11 @@ func (b *Broker) Open(conf *Config) error {
|
|
|
}
|
|
|
|
|
|
if conf.Net.SASL.Enable {
|
|
|
+ b.connErr = b.sendAndReceiveSASLPlainHandshake()
|
|
|
+ if b.connErr != nil {
|
|
|
+ Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
|
|
|
+ return
|
|
|
+ }
|
|
|
b.connErr = b.sendAndReceiveSASLPlainAuth()
|
|
|
if b.connErr != nil {
|
|
|
err = b.conn.Close()
|
|
|
@@ -523,6 +528,53 @@ func (b *Broker) responseReceiver() {
|
|
|
close(b.done)
|
|
|
}
|
|
|
|
|
|
+func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
|
|
|
+ rb := &SaslHandshakeRequest{"PLAIN"}
|
|
|
+ req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
|
|
|
+ buf, err := encode(req)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ bytes, err := b.conn.Write(buf)
|
|
|
+ b.updateOutgoingCommunicationMetrics(bytes)
|
|
|
+ if err != nil {
|
|
|
+ Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ b.correlationID++
|
|
|
+ //wait for the response
|
|
|
+ header := make([]byte, 8) // response header
|
|
|
+ n, err := io.ReadFull(b.conn, header)
|
|
|
+ b.updateIncomingCommunicationMetrics(n)
|
|
|
+ length := binary.BigEndian.Uint32(header[:4])
|
|
|
+ payload := make([]byte, length-4)
|
|
|
+ n, err = io.ReadFull(b.conn, payload)
|
|
|
+ if err != nil {
|
|
|
+ Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ b.updateIncomingCommunicationMetrics(n)
|
|
|
+ res := &SaslHandshakeResponse{}
|
|
|
+ err = versionedDecode(payload, res, 0)
|
|
|
+ if err != nil {
|
|
|
+ Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if res.Err != ErrNoError {
|
|
|
+ Logger.Printf("Invalid SASL Mechanism : %s\n", err.Error())
|
|
|
+ return res.Err
|
|
|
+ }
|
|
|
+ Logger.Print("Successful SASL handshake")
|
|
|
+ return nil
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
|
|
|
// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
|
|
|
//
|
|
|
@@ -542,6 +594,12 @@ func (b *Broker) responseReceiver() {
|
|
|
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
|
|
|
// of responding to bad credentials but thats how its being done today.
|
|
|
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
+
|
|
|
+ handshakeErr := b.sendAndReceiveSASLPlainHandshake()
|
|
|
+ if handshakeErr != nil {
|
|
|
+ Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
|
|
|
+ return handshakeErr
|
|
|
+ }
|
|
|
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
|
|
|
authBytes := make([]byte, length+4) //4 byte length header + auth data
|
|
|
binary.BigEndian.PutUint32(authBytes, uint32(length))
|