Browse Source

Add support for SASL plain text authentication

Shriram Rajagopalan 9 years ago
parent
commit
3834ba1ce0
2 changed files with 54 additions and 0 deletions
  1. 34 0
      broker.go
  2. 20 0
      config.go

+ 34 - 0
broker.go

@@ -9,6 +9,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"bytes"
+	"encoding/binary"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -82,6 +84,38 @@ func (b *Broker) Open(conf *Config) error {
 		b.conn = newBufConn(b.conn)
 
 		b.conf = conf
+
+		if conf.Net.SASL.Enable {
+			//
+			// Begin SASL/PLAIN authentication
+			//
+			authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+			buf := new(bytes.Buffer)
+
+			err = binary.Write(buf, binary.BigEndian, int32(len(authBytes)))
+			if err != nil {
+				Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error())
+			}
+
+			err = binary.Write(buf, binary.BigEndian, authBytes)
+			if err != nil {
+				Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error())
+			}
+
+			b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
+			b.conn.Write(buf.Bytes())
+
+			header := make([]byte, 4)
+			n, err := io.ReadFull(b.conn, header)
+			if err != nil {
+				Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error())
+			}
+			Logger.Printf("SASL authentication successful:\n%v\n%v\n%v", n, header, string(header))
+			//
+			// End SASL/PLAIN authentication
+			//
+		}
+
 		b.done = make(chan bool)
 		b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
 

+ 20 - 0
config.go

@@ -33,6 +33,17 @@ type Config struct {
 			Config *tls.Config
 		}
 
+		// SASL based authentication with broker. While there are multiple SASL authentication methods
+		// the current implementation is limited to plaintext (SASL/PLAIN) authentication
+		SASL struct {
+			// Whether or not to use SASL authentication when connecting to the broker
+			// (defaults to false).
+			Enable bool
+			//username and password for SASL/PLAIN authentication
+			User string
+			Password string
+		}
+
 		// KeepAlive specifies the keep-alive period for an active network connection.
 		// If zero, keep-alives are disabled. (default is 0: disabled).
 		KeepAlive time.Duration
@@ -222,6 +233,7 @@ func NewConfig() *Config {
 	c.Net.DialTimeout = 30 * time.Second
 	c.Net.ReadTimeout = 30 * time.Second
 	c.Net.WriteTimeout = 30 * time.Second
+	c.Net.SASL.Enable = false
 
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond
@@ -256,6 +268,14 @@ func (c *Config) Validate() error {
 	if c.Net.TLS.Enable == false && c.Net.TLS.Config != nil {
 		Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
 	}
+	if c.Net.SASL.Enable == false {
+		if c.Net.SASL.User != "" {
+			Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
+		}
+		if c.Net.SASL.Password != "" {
+			Logger.Println("Net.SASL is disabled but a non-empty password was provided.")
+		}
+	}
 	if c.Producer.RequiredAcks > 1 {
 		Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
 	}