|
|
@@ -67,6 +67,17 @@ const (
|
|
|
SASLExtKeyAuth = "auth"
|
|
|
)
|
|
|
|
|
|
+// AccessToken contains an access token used to authenticate a
|
|
|
+// SASL/OAUTHBEARER client along with associated metadata.
|
|
|
+type AccessToken struct {
|
|
|
+ // Token is the access token payload.
|
|
|
+ Token string
|
|
|
+ // Extensions is a optional map of arbitrary key-value pairs that can be
|
|
|
+ // sent with the SASL/OAUTHBEARER initial client response. These values are
|
|
|
+ // ignored by the SASL server if they are unexpected.
|
|
|
+ Extensions map[string]string
|
|
|
+}
|
|
|
+
|
|
|
// AccessTokenProvider is the interface that encapsulates how implementors
|
|
|
// can generate access tokens for Kafka broker authentication.
|
|
|
type AccessTokenProvider interface {
|
|
|
@@ -75,7 +86,7 @@ type AccessTokenProvider interface {
|
|
|
// not block indefinitely. A timeout error should be returned after a short
|
|
|
// period of inactivity so that the broker connection logic can log
|
|
|
// debugging information and retry.
|
|
|
- Token() (string, error)
|
|
|
+ Token() (*AccessToken, error)
|
|
|
}
|
|
|
|
|
|
type responsePromise struct {
|
|
|
@@ -780,7 +791,7 @@ func (b *Broker) responseReceiver() {
|
|
|
|
|
|
func (b *Broker) authenticateViaSASL() error {
|
|
|
if b.conf.Net.SASL.Mechanism == SASLTypeOAuth {
|
|
|
- return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider, b.conf.Net.SASL.Extensions)
|
|
|
+ return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
|
|
|
}
|
|
|
return b.sendAndReceiveSASLPlainAuth()
|
|
|
}
|
|
|
@@ -897,13 +908,13 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
|
|
|
// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
|
|
|
// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
|
|
|
-func (b *Broker) sendAndReceiveSASLOAuth(tokenProvider AccessTokenProvider, extensions map[string]string) error {
|
|
|
+func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
|
|
|
|
|
|
if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- token, err := tokenProvider.Token()
|
|
|
+ token, err := provider.Token()
|
|
|
|
|
|
if err != nil {
|
|
|
return err
|
|
|
@@ -913,7 +924,7 @@ func (b *Broker) sendAndReceiveSASLOAuth(tokenProvider AccessTokenProvider, exte
|
|
|
|
|
|
correlationID := b.correlationID
|
|
|
|
|
|
- bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, extensions, correlationID)
|
|
|
+ bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
|
|
|
|
|
|
if err != nil {
|
|
|
return err
|
|
|
@@ -937,19 +948,18 @@ func (b *Broker) sendAndReceiveSASLOAuth(tokenProvider AccessTokenProvider, exte
|
|
|
|
|
|
// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
|
|
|
// https://tools.ietf.org/html/rfc7628
|
|
|
-func buildClientInitialResponse(bearerToken string, extensions map[string]string) ([]byte, error) {
|
|
|
-
|
|
|
- if _, ok := extensions[SASLExtKeyAuth]; ok {
|
|
|
- return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
|
|
|
- }
|
|
|
+func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
|
|
|
|
|
|
ext := ""
|
|
|
|
|
|
- if len(extensions) > 0 {
|
|
|
- ext = "\x01" + mapToString(extensions, "=", "\x01")
|
|
|
+ if token.Extensions != nil && len(token.Extensions) > 0 {
|
|
|
+ if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
|
|
|
+ return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
|
|
|
+ }
|
|
|
+ ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
|
|
|
}
|
|
|
|
|
|
- resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", bearerToken, ext))
|
|
|
+ resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
|
|
|
|
|
|
return resp, nil
|
|
|
}
|
|
|
@@ -969,9 +979,9 @@ func mapToString(extensions map[string]string, keyValSep string, elemSep string)
|
|
|
return strings.Join(buf, elemSep)
|
|
|
}
|
|
|
|
|
|
-func (b *Broker) sendSASLOAuthBearerClientResponse(bearerToken string, extensions map[string]string, correlationID int32) (int, error) {
|
|
|
+func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
|
|
|
|
|
|
- initialResp, err := buildClientInitialResponse(bearerToken, extensions)
|
|
|
+ initialResp, err := buildClientInitialResponse(token)
|
|
|
|
|
|
if err != nil {
|
|
|
return 0, err
|