Преглед на файлове

Merge pull request #1240 from mkaminski1988/master

Implement SASL/OAUTHBEARER support
Vlad Gorodetsky преди 7 години
родител
ревизия
0a21d90df4
променени са 11 файла, в които са добавени 625 реда и са изтрити 9 реда
  1. 211 4
      broker.go
  2. 196 0
      broker_test.go
  3. 27 4
      config.go
  4. 21 0
      config_test.go
  5. 54 0
      mockresponses.go
  6. 2 0
      request.go
  7. 29 0
      sasl_authenticate_request.go
  8. 17 0
      sasl_authenticate_request_test.go
  9. 44 0
      sasl_authenticate_response.go
  10. 22 0
      sasl_authenticate_response_test.go
  11. 2 1
      sasl_handshake_request.go

+ 211 - 4
broker.go

@@ -6,7 +6,9 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"sort"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -46,6 +48,50 @@ type Broker struct {
 	brokerResponseSize     metrics.Histogram
 }
 
+// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
+type SASLMechanism string
+
+const (
+	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
+	SASLTypeOAuth = "OAUTHBEARER"
+	// SASLTypePlaintext represents the SASL/PLAIN mechanism
+	SASLTypePlaintext = "PLAIN"
+	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
+	// server negotiate SASL auth using opaque packets.
+	SASLHandshakeV0 = int16(0)
+	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
+	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
+	SASLHandshakeV1 = int16(1)
+	// SASLExtKeyAuth is the reserved extension key name sent as part of the
+	// SASL/OAUTHBEARER intial client response
+	SASLExtKeyAuth = "auth"
+)
+
+// AccessToken contains an access token used to authenticate a
+// SASL/OAUTHBEARER client along with associated metadata.
+type AccessToken struct {
+	// Token is the access token payload.
+	Token string
+	// Extensions is a optional map of arbitrary key-value pairs that can be
+	// sent with the SASL/OAUTHBEARER initial client response. These values are
+	// ignored by the SASL server if they are unexpected. This feature is only
+	// supported by Kafka >= 2.1.0.
+	Extensions map[string]string
+}
+
+// AccessTokenProvider is the interface that encapsulates how implementors
+// can generate access tokens for Kafka broker authentication.
+type AccessTokenProvider interface {
+	// Token returns an access token. The implementation should ensure token
+	// reuse so that multiple calls at connect time do not create multiple
+	// tokens. The implementation should also periodically refresh the token in
+	// order to guarantee that each call returns an unexpired token.  This
+	// method should not block indefinitely--a timeout error should be returned
+	// after a short period of inactivity so that the broker connection logic
+	// can log debugging information and retry.
+	Token() (*AccessToken, error)
+}
+
 type responsePromise struct {
 	requestTime   time.Time
 	correlationID int32
@@ -125,7 +171,9 @@ func (b *Broker) Open(conf *Config) error {
 		}
 
 		if conf.Net.SASL.Enable {
-			b.connErr = b.sendAndReceiveSASLPlainAuth()
+
+			b.connErr = b.authenticateViaSASL()
+
 			if b.connErr != nil {
 				err = b.conn.Close()
 				if err == nil {
@@ -744,8 +792,16 @@ func (b *Broker) responseReceiver() {
 	close(b.done)
 }
 
-func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
-	rb := &SaslHandshakeRequest{"PLAIN"}
+func (b *Broker) authenticateViaSASL() error {
+	if b.conf.Net.SASL.Mechanism == SASLTypeOAuth {
+		return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
+	}
+	return b.sendAndReceiveSASLPlainAuth()
+}
+
+func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
+	rb := &SaslHandshakeRequest{Mechanism: saslType, Version: version}
+
 	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
 	buf, err := encode(req, b.conf.MetricRegistry)
 	if err != nil {
@@ -814,7 +870,7 @@ func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
 // of responding to bad credentials but thats how its being done today.
 func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	if b.conf.Net.SASL.Handshake {
-		handshakeErr := b.sendAndReceiveSASLPlainHandshake()
+		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
 		if handshakeErr != nil {
 			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
 			return handshakeErr
@@ -853,6 +909,157 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	return nil
 }
 
+// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
+// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
+func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
+
+	if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
+		return err
+	}
+
+	token, err := provider.Token()
+
+	if err != nil {
+		return err
+	}
+
+	requestTime := time.Now()
+
+	correlationID := b.correlationID
+
+	bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
+
+	if err != nil {
+		return err
+	}
+
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
+
+	b.correlationID++
+
+	bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
+
+	if err != nil {
+		return err
+	}
+
+	requestLatency := time.Since(requestTime)
+	b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
+
+	return nil
+}
+
+// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
+// https://tools.ietf.org/html/rfc7628
+func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
+
+	var ext string
+
+	if token.Extensions != nil && len(token.Extensions) > 0 {
+		if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
+			return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
+		}
+		ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
+	}
+
+	resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
+
+	return resp, nil
+}
+
+// mapToString returns a list of key-value pairs ordered by key.
+// keyValSep separates the key from the value. elemSep separates each pair.
+func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
+
+	buf := make([]string, 0, len(extensions))
+
+	for k, v := range extensions {
+		buf = append(buf, k+keyValSep+v)
+	}
+
+	sort.Strings(buf)
+
+	return strings.Join(buf, elemSep)
+}
+
+func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
+
+	initialResp, err := buildClientInitialResponse(token)
+
+	if err != nil {
+		return 0, err
+	}
+
+	rb := &SaslAuthenticateRequest{initialResp}
+
+	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
+
+	buf, err := encode(req, b.conf.MetricRegistry)
+
+	if err != nil {
+		return 0, err
+	}
+
+	if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
+		return 0, err
+	}
+
+	return b.conn.Write(buf)
+}
+
+func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
+
+	buf := make([]byte, 8)
+
+	bytesRead, err := io.ReadFull(b.conn, buf)
+
+	if err != nil {
+		return bytesRead, err
+	}
+
+	header := responseHeader{}
+
+	err = decode(buf, &header)
+
+	if err != nil {
+		return bytesRead, err
+	}
+
+	if header.correlationID != correlationID {
+		return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
+	}
+
+	buf = make([]byte, header.length-4)
+
+	c, err := io.ReadFull(b.conn, buf)
+
+	bytesRead += c
+
+	if err != nil {
+		return bytesRead, err
+	}
+
+	res := &SaslAuthenticateResponse{}
+
+	if err := versionedDecode(buf, res, 0); err != nil {
+		return bytesRead, err
+	}
+
+	if err != nil {
+		return bytesRead, err
+	}
+
+	if res.Err != ErrNoError {
+		return bytesRead, res.Err
+	}
+
+	if len(res.SaslAuthBytes) > 0 {
+		Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
+	}
+
+	return bytesRead, nil
+}
+
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
 	b.updateRequestLatencyMetrics(requestLatency)
 	b.responseRate.Mark(1)

+ 196 - 0
broker_test.go

@@ -1,9 +1,14 @@
 package sarama
 
 import (
+	"errors"
 	"fmt"
+	"net"
+	"reflect"
 	"testing"
 	"time"
+
+	metrics "github.com/rcrowley/go-metrics"
 )
 
 func ExampleBroker() {
@@ -105,6 +110,197 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 
 }
 
+var ErrTokenFailure = errors.New("Failure generating token")
+
+type TokenProvider struct {
+	accessToken *AccessToken
+	err         error
+}
+
+func (t *TokenProvider) Token() (*AccessToken, error) {
+	return t.accessToken, t.err
+}
+
+func newTokenProvider(token *AccessToken, err error) *TokenProvider {
+	return &TokenProvider{
+		accessToken: token,
+		err:         err,
+	}
+}
+
+func TestSASLOAuthBearer(t *testing.T) {
+
+	testTable := []struct {
+		name             string
+		mockAuthErr      KError // Mock and expect error returned from SaslAuthenticateRequest
+		mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
+		expectClientErr  bool   // Expect an internal client-side error
+		tokProvider      *TokenProvider
+	}{
+		{
+			name:             "SASL/OAUTHBEARER OK server response",
+			mockAuthErr:      ErrNoError,
+			mockHandshakeErr: ErrNoError,
+			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
+		},
+		{
+			name:             "SASL/OAUTHBEARER authentication failure response",
+			mockAuthErr:      ErrSASLAuthenticationFailed,
+			mockHandshakeErr: ErrNoError,
+			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
+		},
+		{
+			name:             "SASL/OAUTHBEARER handshake failure response",
+			mockAuthErr:      ErrNoError,
+			mockHandshakeErr: ErrSASLAuthenticationFailed,
+			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
+		},
+		{
+			name:             "SASL/OAUTHBEARER token generation error",
+			mockAuthErr:      ErrNoError,
+			mockHandshakeErr: ErrNoError,
+			expectClientErr:  true,
+			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
+		},
+		{
+			name:             "SASL/OAUTHBEARER invalid extension",
+			mockAuthErr:      ErrNoError,
+			mockHandshakeErr: ErrNoError,
+			expectClientErr:  true,
+			tokProvider: newTokenProvider(&AccessToken{
+				Token:      "access-token-123",
+				Extensions: map[string]string{"auth": "auth-value"},
+			}, nil),
+		},
+	}
+
+	for i, test := range testTable {
+
+		// mockBroker mocks underlying network logic and broker responses
+		mockBroker := NewMockBroker(t, 0)
+
+		mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
+			SetAuthBytes([]byte(`response_payload`))
+
+		if test.mockAuthErr != ErrNoError {
+			mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
+		}
+
+		mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
+			SetEnabledMechanisms([]string{SASLTypeOAuth})
+
+		if test.mockHandshakeErr != ErrNoError {
+			mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
+		}
+
+		mockBroker.SetHandlerByMap(map[string]MockResponse{
+			"SaslAuthenticateRequest": mockSASLAuthResponse,
+			"SaslHandshakeRequest":    mockSASLHandshakeResponse,
+		})
+
+		// broker executes SASL requests against mockBroker
+		broker := NewBroker(mockBroker.Addr())
+		broker.requestRate = metrics.NilMeter{}
+		broker.outgoingByteRate = metrics.NilMeter{}
+		broker.incomingByteRate = metrics.NilMeter{}
+		broker.requestSize = metrics.NilHistogram{}
+		broker.responseSize = metrics.NilHistogram{}
+		broker.responseRate = metrics.NilMeter{}
+		broker.requestLatency = metrics.NilHistogram{}
+
+		conf := NewConfig()
+		conf.Net.SASL.Mechanism = SASLTypeOAuth
+		conf.Net.SASL.TokenProvider = test.tokProvider
+
+		broker.conf = conf
+
+		dialer := net.Dialer{
+			Timeout:   conf.Net.DialTimeout,
+			KeepAlive: conf.Net.KeepAlive,
+			LocalAddr: conf.Net.LocalAddr,
+		}
+
+		conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
+
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		broker.conn = conn
+
+		err = broker.authenticateViaSASL()
+
+		if test.mockAuthErr != ErrNoError {
+			if test.mockAuthErr != err {
+				t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
+			}
+		} else if test.mockHandshakeErr != ErrNoError {
+			if test.mockHandshakeErr != err {
+				t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
+			}
+		} else if test.expectClientErr && err == nil {
+			t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
+		} else if !test.expectClientErr && err != nil {
+			t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
+		}
+
+		mockBroker.Close()
+	}
+}
+
+func TestBuildClientInitialResponse(t *testing.T) {
+
+	testTable := []struct {
+		name        string
+		token       *AccessToken
+		expected    []byte
+		expectError bool
+	}{
+		{
+			name: "Build SASL client initial response with two extensions",
+			token: &AccessToken{
+				Token: "the-token",
+				Extensions: map[string]string{
+					"x": "1",
+					"y": "2",
+				},
+			},
+			expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
+		},
+		{
+			name:     "Build SASL client initial response with no extensions",
+			token:    &AccessToken{Token: "the-token"},
+			expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
+		},
+		{
+			name: "Build SASL client initial response using reserved extension",
+			token: &AccessToken{
+				Token: "the-token",
+				Extensions: map[string]string{
+					"auth": "auth-value",
+				},
+			},
+			expected:    []byte(""),
+			expectError: true,
+		},
+	}
+
+	for i, test := range testTable {
+
+		actual, err := buildClientInitialResponse(test.token)
+
+		if !reflect.DeepEqual(test.expected, actual) {
+			t.Errorf("Expected %s, got %s\n", test.expected, actual)
+		}
+		if test.expectError && err == nil {
+			t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
+		}
+		if !test.expectError && err != nil {
+			t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
+		}
+	}
+}
+
 // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
 var brokerTestTable = []struct {
 	version  KafkaVersion

+ 27 - 4
config.go

@@ -54,6 +54,9 @@ type Config struct {
 			// Whether or not to use SASL authentication when connecting to the broker
 			// (defaults to false).
 			Enable bool
+			// SASLMechanism is the name of the enabled SASL mechanism.
+			// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
+			Mechanism SASLMechanism
 			// Whether or not to send the Kafka SASL handshake first if enabled
 			// (defaults to true). You should only set this to false if you're using
 			// a non-Kafka SASL proxy.
@@ -61,6 +64,11 @@ type Config struct {
 			//username and password for SASL/PLAIN authentication
 			User     string
 			Password string
+			// TokenProvider is a user-defined callback for generating
+			// access tokens for SASL/OAUTHBEARER auth. See the
+			// AccessTokenProvider interface docs for proper implementation
+			// guidelines.
+			TokenProvider AccessTokenProvider
 		}
 
 		// KeepAlive specifies the keep-alive period for an active network connection.
@@ -454,10 +462,25 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Net.WriteTimeout must be > 0")
 	case c.Net.KeepAlive < 0:
 		return ConfigurationError("Net.KeepAlive must be >= 0")
-	case c.Net.SASL.Enable == true && c.Net.SASL.User == "":
-		return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
-	case c.Net.SASL.Enable == true && c.Net.SASL.Password == "":
-		return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
+	case c.Net.SASL.Enable:
+		// For backwards compatibility, empty mechanism value defaults to PLAIN
+		isSASLPlain := len(c.Net.SASL.Mechanism) == 0 || c.Net.SASL.Mechanism == SASLTypePlaintext
+		if isSASLPlain {
+			if c.Net.SASL.User == "" {
+				return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
+			}
+			if c.Net.SASL.Password == "" {
+				return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
+			}
+		} else if c.Net.SASL.Mechanism == SASLTypeOAuth {
+			if c.Net.SASL.TokenProvider == nil {
+				return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.User.TokenProvider")
+			}
+		} else {
+			msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s` and `%s`",
+				SASLTypeOAuth, SASLTypePlaintext)
+			return ConfigurationError(msg)
+		}
 	}
 
 	// validate the Admin values

+ 21 - 0
config_test.go

@@ -33,6 +33,13 @@ func TestEmptyClientIDConfigValidates(t *testing.T) {
 	}
 }
 
+type DummyTokenProvider struct {
+}
+
+func (t *DummyTokenProvider) Token() (*AccessToken, error) {
+	return &AccessToken{Token: "access-token-string"}, nil
+}
+
 func TestNetConfigValidates(t *testing.T) {
 	tests := []struct {
 		name string
@@ -78,6 +85,20 @@ func TestNetConfigValidates(t *testing.T) {
 				cfg.Net.SASL.Password = ""
 			},
 			"Net.SASL.Password must not be empty when SASL is enabled"},
+		{"SASL.Mechanism - Invalid mechanism type",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
+				cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
+			},
+			"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER` and `PLAIN`"},
+		{"SASL.Mechanism.OAUTHBEARER - Missing token provider",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.Mechanism = SASLTypeOAuth
+				cfg.Net.SASL.TokenProvider = nil
+			},
+			"An AccessTokenProvider instance must be provided to Net.SASL.User.TokenProvider"},
 	}
 
 	for i, test := range tests {

+ 54 - 0
mockresponses.go

@@ -706,10 +706,64 @@ func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
 	return res
 }
 
+type MockSaslAuthenticateResponse struct {
+	t             TestReporter
+	kerror        KError
+	saslAuthBytes []byte
+}
+
+func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
+	return &MockSaslAuthenticateResponse{t: t}
+}
+
+func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
+	res := &SaslAuthenticateResponse{}
+	res.Err = msar.kerror
+	res.SaslAuthBytes = msar.saslAuthBytes
+	return res
+}
+
+func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
+	msar.kerror = kerror
+	return msar
+}
+
+func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
+	msar.saslAuthBytes = saslAuthBytes
+	return msar
+}
+
 type MockDeleteAclsResponse struct {
 	t TestReporter
 }
 
+type MockSaslHandshakeResponse struct {
+	enabledMechanisms []string
+	kerror            KError
+	t                 TestReporter
+}
+
+func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
+	return &MockSaslHandshakeResponse{t: t}
+}
+
+func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
+	res := &SaslHandshakeResponse{}
+	res.Err = mshr.kerror
+	res.EnabledMechanisms = mshr.enabledMechanisms
+	return res
+}
+
+func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
+	mshr.kerror = kerror
+	return mshr
+}
+
+func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
+	mshr.enabledMechanisms = enabledMechanisms
+	return mshr
+}
+
 func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
 	return &MockDeleteAclsResponse{t: t}
 }

+ 2 - 0
request.go

@@ -140,6 +140,8 @@ func allocateBody(key, version int16) protocolBody {
 		return &DescribeConfigsRequest{}
 	case 33:
 		return &AlterConfigsRequest{}
+	case 36:
+		return &SaslAuthenticateRequest{}
 	case 37:
 		return &CreatePartitionsRequest{}
 	case 42:

+ 29 - 0
sasl_authenticate_request.go

@@ -0,0 +1,29 @@
+package sarama
+
+type SaslAuthenticateRequest struct {
+	SaslAuthBytes []byte
+}
+
+// APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
+const APIKeySASLAuth = 36
+
+func (r *SaslAuthenticateRequest) encode(pe packetEncoder) error {
+	return pe.putBytes(r.SaslAuthBytes)
+}
+
+func (r *SaslAuthenticateRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.SaslAuthBytes, err = pd.getBytes()
+	return err
+}
+
+func (r *SaslAuthenticateRequest) key() int16 {
+	return APIKeySASLAuth
+}
+
+func (r *SaslAuthenticateRequest) version() int16 {
+	return 0
+}
+
+func (r *SaslAuthenticateRequest) requiredVersion() KafkaVersion {
+	return V1_0_0_0
+}

+ 17 - 0
sasl_authenticate_request_test.go

@@ -0,0 +1,17 @@
+package sarama
+
+import "testing"
+
+var (
+	saslAuthenticateRequest = []byte{
+		0, 0, 0, 3, 'f', 'o', 'o',
+	}
+)
+
+func TestSaslAuthenticateRequest(t *testing.T) {
+	var request *SaslAuthenticateRequest
+
+	request = new(SaslAuthenticateRequest)
+	request.SaslAuthBytes = []byte(`foo`)
+	testRequest(t, "basic", request, saslAuthenticateRequest)
+}

+ 44 - 0
sasl_authenticate_response.go

@@ -0,0 +1,44 @@
+package sarama
+
+type SaslAuthenticateResponse struct {
+	Err           KError
+	ErrorMessage  *string
+	SaslAuthBytes []byte
+}
+
+func (r *SaslAuthenticateResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	if err := pe.putNullableString(r.ErrorMessage); err != nil {
+		return err
+	}
+	return pe.putBytes(r.SaslAuthBytes)
+}
+
+func (r *SaslAuthenticateResponse) decode(pd packetDecoder, version int16) error {
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+
+	r.Err = KError(kerr)
+
+	if r.ErrorMessage, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	r.SaslAuthBytes, err = pd.getBytes()
+
+	return err
+}
+
+func (r *SaslAuthenticateResponse) key() int16 {
+	return APIKeySASLAuth
+}
+
+func (r *SaslAuthenticateResponse) version() int16 {
+	return 0
+}
+
+func (r *SaslAuthenticateResponse) requiredVersion() KafkaVersion {
+	return V1_0_0_0
+}

+ 22 - 0
sasl_authenticate_response_test.go

@@ -0,0 +1,22 @@
+package sarama
+
+import "testing"
+
+var (
+	saslAuthenticatResponseErr = []byte{
+		0, 58,
+		0, 3, 'e', 'r', 'r',
+		0, 0, 0, 3, 'm', 's', 'g',
+	}
+)
+
+func TestSaslAuthenticateResponse(t *testing.T) {
+
+	response := new(SaslAuthenticateResponse)
+	response.Err = ErrSASLAuthenticationFailed
+	msg := "err"
+	response.ErrorMessage = &msg
+	response.SaslAuthBytes = []byte(`msg`)
+
+	testResponse(t, "authenticate reponse", response, saslAuthenticatResponseErr)
+}

+ 2 - 1
sasl_handshake_request.go

@@ -2,6 +2,7 @@ package sarama
 
 type SaslHandshakeRequest struct {
 	Mechanism string
+	Version   int16
 }
 
 func (r *SaslHandshakeRequest) encode(pe packetEncoder) error {
@@ -25,7 +26,7 @@ func (r *SaslHandshakeRequest) key() int16 {
 }
 
 func (r *SaslHandshakeRequest) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion {