Browse Source

Implement SASL/OAUTHBEARER support

Make changes according to first round of feedback
Mike Kaminski 7 years ago
parent
commit
c8690e4e10
4 changed files with 63 additions and 56 deletions
  1. 45 43
      broker.go
  2. 13 11
      config.go
  3. 4 1
      sasl_authenticate_request.go
  4. 1 1
      sasl_authenticate_response.go

+ 45 - 43
broker.go

@@ -46,22 +46,29 @@ type Broker struct {
 	brokerResponseSize     metrics.Histogram
 }
 
-// SaslMechanism specifies the SASL mechanism the client uses to authenticate with the broker
-type SaslMechanism string
+// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
+type SASLMechanism string
 
 const (
-	// SaslTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
-	SaslTypeOAuth = "OAUTHBEARER"
-	// SaslTypePlaintext represents the SASL/PLAIN mechanism
-	SaslTypePlaintext = "PLAIN"
+	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
+	SASLTypeOAuth = "OAUTHBEARER"
+	// SASLTypePlaintext represents the SASL/PLAIN mechanism
+	SASLTypePlaintext = "PLAIN"
+	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
+	// server negotiate SASL auth using opaque packets.
+	SASLHandshakeV0 = int16(0)
+	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
+	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
+	SASLHandshakeV1 = int16(1)
 )
 
-// OAuthBearerTokenProvider is an interface that encapsualtes bearer token creation in an
-// unopinionated way. Users are free to decide how to construct bearer tokens and how often
-// they are be refreshed.
+// OAuthBearerTokenProvider is the interface that encapsulates how implementors
+// can generate bearer tokens sent to Kafka brokers for authentication.
 type OAuthBearerTokenProvider interface {
-	// Return a valid bearer token.
-	Token() []byte
+	// Token returns a bearer token. Because this method may be called multiple
+	// times, the implementor must ensure that each invocation returns a new,
+	// unexpired token.
+	Token() (string, error)
 }
 
 type responsePromise struct {
@@ -145,9 +152,9 @@ func (b *Broker) Open(conf *Config) error {
 		if conf.Net.SASL.Enable {
 
 			switch conf.Net.SASL.Mechanism {
-			case SaslTypeOAuth:
+			case SASLTypeOAuth:
 				b.connErr = b.sendAndReceiveSASLOAuth(conf.Net.SASL.TokenProvider)
-			case SaslTypePlaintext:
+			case SASLTypePlaintext:
 				b.connErr = b.sendAndReceiveSASLPlainAuth()
 			default:
 				b.connErr = b.sendAndReceiveSASLPlainAuth()
@@ -772,7 +779,7 @@ func (b *Broker) responseReceiver() {
 }
 
 func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
-	rb := &SaslHandshakeRequest{saslType, version}
+	rb := &SaslHandshakeRequest{Mechanism: saslType, Version: version}
 
 	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
 	buf, err := encode(req, b.conf.MetricRegistry)
@@ -842,7 +849,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) err
 // of responding to bad credentials but thats how its being done today.
 func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	if b.conf.Net.SASL.Handshake {
-		handshakeErr := b.sendAndReceiveSASLHandshake(SaslTypePlaintext, 0)
+		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
 		if handshakeErr != nil {
 			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
 			return handshakeErr
@@ -885,21 +892,23 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
 func (b *Broker) sendAndReceiveSASLOAuth(tokenProvider OAuthBearerTokenProvider) error {
 
-	// This version allows us to wrap tokens in the Kafka protocol, as opposed
-	// to sending opaque packets
-	handshakeVersion := int16(1)
-
-	if err := b.sendAndReceiveSASLHandshake(SaslTypeOAuth, handshakeVersion); err != nil {
-		Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
+	if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
 		return err
 	}
 
 	requestTime := time.Now()
 
-	var bytesWritten int
-	var err error
+	var (
+		bytesWritten int
+		err          error
+		token        string
+	)
+
+	if token, err = tokenProvider.Token(); err != nil {
+		return err
+	}
 
-	if bytesWritten, err = b.sendSASLOAuthBearerClientResponse(tokenProvider.Token()); err != nil {
+	if bytesWritten, err = b.sendSASLOAuthBearerClientResponse(token); err != nil {
 		return err
 	}
 
@@ -917,23 +926,21 @@ func (b *Broker) sendAndReceiveSASLOAuth(tokenProvider OAuthBearerTokenProvider)
 	return nil
 }
 
-func (b *Broker) sendSASLOAuthBearerClientResponse(bearerToken []byte) (int, error) {
+func (b *Broker) sendSASLOAuthBearerClientResponse(bearerToken string) (int, error) {
 
 	// Initial client response as described by RFC-7628
 	// https://tools.ietf.org/html/rfc7628
-	oauthRequest := []byte(`n,,`)
-	oauthRequest = append(oauthRequest, '\x01')
-	oauthRequest = append(oauthRequest, []byte(`auth=Bearer `)...)
-	oauthRequest = append(oauthRequest, bearerToken...)
-	oauthRequest = append(oauthRequest, '\x01')
-	oauthRequest = append(oauthRequest, '\x01')
+
+	oauthRequest := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s\x01\x01", bearerToken))
 
 	rb := &SaslAuthenticateRequest{oauthRequest}
 
 	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
 
-	var buf []byte
-	var err error
+	var (
+		buf []byte
+		err error
+	)
 
 	buf, err = encode(req, b.conf.MetricRegistry)
 
@@ -948,7 +955,6 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(bearerToken []byte) (int, err
 	var bytesWritten int
 
 	if bytesWritten, err = b.conn.Write(buf); err != nil {
-		Logger.Printf("Failed to send SASL/OAUTHBEARER initial request %s: %s\n", b.addr, err.Error())
 		return 0, err
 	}
 
@@ -959,14 +965,15 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(bearerToken []byte) (int, err
 
 func (b *Broker) receiveSASLOAuthBearerServerResponse() (int, error) {
 
-	var bytesRead int
-	var err error
-	totalBytesRead := 0
+	var (
+		bytesRead      int
+		err            error
+		totalBytesRead int
+	)
 
 	header := make([]byte, 8)
 
 	if bytesRead, err = io.ReadFull(b.conn, header); err != nil {
-		Logger.Printf("Failed to read SASL/OAUTHBEARER intitial response header : %s\n", err.Error())
 		return 0, err
 	}
 
@@ -976,7 +983,6 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse() (int, error) {
 	payload := make([]byte, length-4)
 
 	if bytesRead, err = io.ReadFull(b.conn, payload); err != nil {
-		Logger.Printf("Failed to read SASL/OAUTHBEARER intitial response payload : %s\n", err.Error())
 		return bytesRead, err
 	}
 
@@ -985,17 +991,13 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse() (int, error) {
 	res := &SaslAuthenticateResponse{}
 
 	if err := versionedDecode(payload, res, 0); err != nil {
-		Logger.Printf("Failed to parse SASL/OAUTHBEARER intitial response : %s\n", err.Error())
 		return bytesRead, err
 	}
 
 	if res.Err != ErrNoError {
-		Logger.Printf("Invalid SASL/OAUTHBEARER request : %s\n", res.Err.Error())
 		return bytesRead, res.Err
 	}
 
-	Logger.Print("Successfully authenticated via SASL/OAUTHBEARER")
-
 	return totalBytesRead, nil
 }
 

+ 13 - 11
config.go

@@ -54,8 +54,9 @@ type Config struct {
 			// Whether or not to use SASL authentication when connecting to the broker
 			// (defaults to false).
 			Enable bool
-			// The type of SASL mechanism to enable. Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN)
-			Mechanism SaslMechanism
+			// SASLMechanism is the name of the enabled SASL mechanism.
+			// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
+			Mechanism SASLMechanism
 			// Whether or not to send the Kafka SASL handshake first if enabled
 			// (defaults to true). You should only set this to false if you're using
 			// a non-Kafka SASL proxy.
@@ -63,9 +64,10 @@ type Config struct {
 			//username and password for SASL/PLAIN authentication
 			User     string
 			Password string
-			// TokenProvider is a bearer token generator for the OAUTHBEARER flow. You can define an instance of
-			// OAuthBearerTokenProvider that generates authentication tokens according to your Kafka cluster's
-			// configuration.
+			// TokenProvider is a bearer token generator for the OAUTHBEARER
+			// flow. You can define an instance of OAuthBearerTokenProvider
+			// that generates authentication tokens according to your Kafka
+			// cluster's configuration.
 			TokenProvider OAuthBearerTokenProvider
 		}
 
@@ -460,26 +462,26 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Net.WriteTimeout must be > 0")
 	case c.Net.KeepAlive < 0:
 		return ConfigurationError("Net.KeepAlive must be >= 0")
-	case c.Net.SASL.Enable == true:
+	case c.Net.SASL.Enable:
 		// For backwards compatibility, empty mechanism value defaults to PLAIN
-		isSaslPlain := len(c.Net.SASL.Mechanism) == 0 || c.Net.SASL.Mechanism == SaslTypePlaintext
-		if isSaslPlain {
+		isSASLPlain := len(c.Net.SASL.Mechanism) == 0 || c.Net.SASL.Mechanism == SASLTypePlaintext
+		if isSASLPlain {
 			if c.Net.SASL.User == "" {
 				return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
 			}
 			if c.Net.SASL.Password == "" {
 				return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
 			}
-		} else if c.Net.SASL.Mechanism == SaslTypeOAuth {
+		} else if c.Net.SASL.Mechanism == SASLTypeOAuth {
 			if c.Net.SASL.TokenProvider == nil {
 				return ConfigurationError("A OAuthBearerTokenProvider instance must be provided to Net.SASL.User.TokenProvider")
 			}
 			if !c.Net.SASL.Handshake {
-				Logger.Println("A SASL hanshake is required for SASL/OAUTHBEARER, ignoring disabled handshake config")
+				Logger.Println("A SASL handshake is required for SASL/OAUTHBEARER, ignoring disabled handshake config")
 			}
 		} else {
 			msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s` and `%s`",
-				SaslTypeOAuth, SaslTypePlaintext)
+				SASLTypeOAuth, SASLTypePlaintext)
 			return ConfigurationError(msg)
 		}
 	}

+ 4 - 1
sasl_authenticate_request.go

@@ -4,6 +4,9 @@ type SaslAuthenticateRequest struct {
 	SaslAuthBytes []byte
 }
 
+// APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
+const APIKeySASLAuth = 36
+
 func (r *SaslAuthenticateRequest) encode(pe packetEncoder) error {
 	if err := pe.putBytes(r.SaslAuthBytes); err != nil {
 		return err
@@ -21,7 +24,7 @@ func (r *SaslAuthenticateRequest) decode(pd packetDecoder, version int16) (err e
 }
 
 func (r *SaslAuthenticateRequest) key() int16 {
-	return 36
+	return APIKeySASLAuth
 }
 
 func (r *SaslAuthenticateRequest) version() int16 {

+ 1 - 1
sasl_authenticate_response.go

@@ -34,7 +34,7 @@ func (r *SaslAuthenticateResponse) decode(pd packetDecoder, version int16) error
 }
 
 func (r *SaslAuthenticateResponse) key() int16 {
-	return 36
+	return APIKeySASLAuth
 }
 
 func (r *SaslAuthenticateResponse) version() int16 {