Selaa lähdekoodia

refactor SASL PLAIN auth into its own function

Shriram Rajagopalan 9 vuotta sitten
vanhempi
commit
5ac5287ba1
1 muutettua tiedostoa jossa 56 lisäystä ja 26 poistoa
  1. 56 26
      broker.go

+ 56 - 26
broker.go

@@ -86,34 +86,14 @@ func (b *Broker) Open(conf *Config) error {
 		b.conf = conf
 
 		if conf.Net.SASL.Enable {
-			//
-			// Begin SASL/PLAIN authentication
-			//
-			authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
-			buf := new(bytes.Buffer)
-
-			err = binary.Write(buf, binary.BigEndian, int32(len(authBytes)))
-			if err != nil {
-				Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error())
-			}
-
-			err = binary.Write(buf, binary.BigEndian, authBytes)
-			if err != nil {
-				Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error())
-			}
-
-			b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
-			b.conn.Write(buf.Bytes())
-
-			header := make([]byte, 4)
-			n, err := io.ReadFull(b.conn, header)
+			err = b.doSASLPlainAuth()
 			if err != nil {
-				Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error())
+				Logger.Printf("SASL authentication with broker %s failed\n", b.addr)
+				_ = b.Close()
+				return
+			} else {
+				Logger.Printf("SASL authentication with broker %s succeeded\n", b.addr)
 			}
-			Logger.Printf("SASL authentication successful:\n%v\n%v\n%v", n, header, string(header))
-			//
-			// End SASL/PLAIN authentication
-			//
 		}
 
 		b.done = make(chan bool)
@@ -488,3 +468,53 @@ func (b *Broker) responseReceiver() {
 	}
 	close(b.done)
 }
+
+// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
+// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
+//
+// In SASL Plain, Kafka expects the auth header to be in the following format
+// Message format (from https://tools.ietf.org/html/rfc4616):
+//
+//   message   = [authzid] UTF8NUL authcid UTF8NUL passwd
+//   authcid   = 1*SAFE ; MUST accept up to 255 octets
+//   authzid   = 1*SAFE ; MUST accept up to 255 octets
+//   passwd    = 1*SAFE ; MUST accept up to 255 octets
+//   UTF8NUL   = %x00 ; UTF-8 encoded NUL character
+//
+//   SAFE      = UTF1 / UTF2 / UTF3 / UTF4
+//                  ;; any UTF-8 encoded Unicode character except NUL
+//
+// When credentials are valid, Kafka returns a 4 byte array of null characters.
+// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
+// of responding to bad credentials but thats how its being done today. 
+func (b *Broker) doSASLPlainAuth() error {
+	authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+	buf := new(bytes.Buffer)
+
+	err := binary.Write(buf, binary.BigEndian, int32(len(authBytes)))
+	if err != nil {
+		Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error())
+		return err
+	}
+
+	err = binary.Write(buf, binary.BigEndian, authBytes)
+	if err != nil {
+		Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error())
+		return err
+	}
+
+	b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
+	b.conn.Write(buf.Bytes())
+
+	header := make([]byte, 4)
+	n, err := io.ReadFull(b.conn, header)
+	// If the credentials are valid, we would get a 4 byte response filled with null characters.
+	// Otherwise, the broker closes the connection and we get an EOF
+	if err != nil {
+		Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error())
+		return err
+	}
+
+	Logger.Printf("SASL authentication successful:%v - %v", n, header)
+	return nil
+}