Browse Source

Merge pull request #1366 from rubenvp8510/kerberos_support

Add kerberos support
Vlad Gorodetsky 5 years ago
parent
commit
41f00bce7f
11 changed files with 916 additions and 53 deletions
  1. 14 2
      broker.go
  2. 122 1
      broker_test.go
  3. 31 2
      config.go
  4. 81 1
      config_test.go
  5. 6 0
      go.mod
  6. 12 0
      go.sum
  7. 257 0
      gssapi_kerberos.go
  8. 51 0
      kerberos_client.go
  9. 99 0
      kerberos_client_test.go
  10. 120 47
      mockbroker.go
  11. 123 0
      mockkerberos.go

+ 14 - 2
broker.go

@@ -4,6 +4,7 @@ import (
 	"crypto/tls"
 	"encoding/binary"
 	"fmt"
+	metrics "github.com/rcrowley/go-metrics"
 	"io"
 	"net"
 	"sort"
@@ -12,8 +13,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
-
-	metrics "github.com/rcrowley/go-metrics"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -47,6 +46,8 @@ type Broker struct {
 	brokerOutgoingByteRate metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseSize     metrics.Histogram
+
+	kerberosAuthenticator GSSAPIKerberosAuth
 }
 
 // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
@@ -61,6 +62,7 @@ const (
 	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
 	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
 	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
+	SASLTypeGSSAPI      = "GSSAPI"
 	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
 	// server negotiate SASL auth using opaque packets.
 	SASLHandshakeV0 = int16(0)
@@ -844,11 +846,21 @@ func (b *Broker) authenticateViaSASL() error {
 		return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
 	case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
 		return b.sendAndReceiveSASLSCRAMv1()
+	case SASLTypeGSSAPI:
+		return b.sendAndReceiveKerberos()
 	default:
 		return b.sendAndReceiveSASLPlainAuth()
 	}
 }
 
+func (b *Broker) sendAndReceiveKerberos() error {
+	b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
+	if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
+		b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
+	}
+	return b.kerberosAuthenticator.Authorize(b)
+}
+
 func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
 	rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
 

+ 122 - 1
broker_test.go

@@ -3,12 +3,13 @@ package sarama
 import (
 	"errors"
 	"fmt"
+	"gopkg.in/jcmturner/gokrb5.v7/krberror"
 	"net"
 	"reflect"
 	"testing"
 	"time"
 
-	metrics "github.com/rcrowley/go-metrics"
+	"github.com/rcrowley/go-metrics"
 )
 
 func ExampleBroker() {
@@ -477,6 +478,126 @@ func TestSASLPlainAuth(t *testing.T) {
 	}
 }
 
+func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
+
+	testTable := []struct {
+		name               string
+		error              error
+		mockKerberosClient bool
+		errorStage         string
+		badResponse        bool
+		badKeyChecksum     bool
+	}{
+		{
+			name:               "Kerberos authentication success",
+			error:              nil,
+			mockKerberosClient: true,
+		},
+		{
+			name: "Kerberos login fails",
+			error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
+				"kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
+				"cation information was invalid - PREAUTH_FAILED"),
+			mockKerberosClient: true,
+			errorStage:         "login",
+		},
+		{
+			name: "Kerberos service ticket fails",
+			error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
+				"kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
+				"cation information was invalid - PREAUTH_FAILED"),
+			mockKerberosClient: true,
+			errorStage:         "service_ticket",
+		},
+		{
+			name:  "Kerberos client creation fails",
+			error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
+		},
+		{
+			name:               "Bad server response, unmarshall key error",
+			error:              errors.New("bytes shorter than header length"),
+			badResponse:        true,
+			mockKerberosClient: true,
+		},
+		{
+			name:               "Bad token checksum",
+			error:              errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
+			badResponse:        false,
+			badKeyChecksum:     true,
+			mockKerberosClient: true,
+		},
+	}
+	for i, test := range testTable {
+		mockBroker := NewMockBroker(t, 0)
+		// broker executes SASL requests against mockBroker
+
+		mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
+			return nil
+		})
+		broker := NewBroker(mockBroker.Addr())
+		broker.requestRate = metrics.NilMeter{}
+		broker.outgoingByteRate = metrics.NilMeter{}
+		broker.incomingByteRate = metrics.NilMeter{}
+		broker.requestSize = metrics.NilHistogram{}
+		broker.responseSize = metrics.NilHistogram{}
+		broker.responseRate = metrics.NilMeter{}
+		broker.requestLatency = metrics.NilHistogram{}
+		conf := NewConfig()
+		conf.Net.SASL.Mechanism = SASLTypeGSSAPI
+		conf.Net.SASL.GSSAPI.ServiceName = "kafka"
+		conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
+		conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
+		conf.Net.SASL.GSSAPI.Username = "kafka"
+		conf.Net.SASL.GSSAPI.Password = "kafka"
+		conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
+		conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+		broker.conf = conf
+		broker.conf.Version = V1_0_0_0
+		dialer := net.Dialer{
+			Timeout:   conf.Net.DialTimeout,
+			KeepAlive: conf.Net.KeepAlive,
+			LocalAddr: conf.Net.LocalAddr,
+		}
+
+		conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
+
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		gssapiHandler := KafkaGSSAPIHandler{
+			client:         &MockKerberosClient{},
+			badResponse:    test.badResponse,
+			badKeyChecksum: test.badKeyChecksum,
+		}
+		mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
+		broker.conn = conn
+		if test.mockKerberosClient {
+			broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
+				return &MockKerberosClient{
+					mockError:  test.error,
+					errorStage: test.errorStage,
+				}, nil
+			}
+		} else {
+			broker.kerberosAuthenticator.NewKerberosClientFunc = nil
+		}
+
+		err = broker.authenticateViaSASL()
+
+		if err != nil && test.error != nil {
+			if test.error.Error() != err.Error() {
+				t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
+			}
+		} else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
+			t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
+		}
+
+		mockBroker.Close()
+	}
+
+}
+
 func TestBuildClientInitialResponse(t *testing.T) {
 
 	testTable := []struct {

+ 31 - 2
config.go

@@ -75,6 +75,8 @@ type Config struct {
 			// AccessTokenProvider interface docs for proper implementation
 			// guidelines.
 			TokenProvider AccessTokenProvider
+
+			GSSAPI GSSAPIConfig
 		}
 
 		// KeepAlive specifies the keep-alive period for an active network connection.
@@ -527,9 +529,36 @@ func (c *Config) Validate() error {
 			if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
 				return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
 			}
+		case SASLTypeGSSAPI:
+			if c.Net.SASL.GSSAPI.ServiceName == "" {
+				return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
+			}
+
+			if c.Net.SASL.GSSAPI.AuthType == KRB5_USER_AUTH {
+				if c.Net.SASL.GSSAPI.Password == "" {
+					return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
+						"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
+				}
+			} else if c.Net.SASL.GSSAPI.AuthType == KRB5_KEYTAB_AUTH {
+				if c.Net.SASL.GSSAPI.KeyTabPath == "" {
+					return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
+						" and  Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
+				}
+			} else {
+				return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH")
+			}
+			if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
+				return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
+			}
+			if c.Net.SASL.GSSAPI.Username == "" {
+				return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
+			}
+			if c.Net.SASL.GSSAPI.Realm == "" {
+				return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
+			}
 		default:
-			msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s` and `%s`",
-				SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512)
+			msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
+				SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
 			return ConfigurationError(msg)
 		}
 	}

+ 81 - 1
config_test.go

@@ -91,7 +91,7 @@ func TestNetConfigValidates(t *testing.T) {
 				cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
 				cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
 			},
-			"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256` and `SCRAM-SHA-512`"},
+			"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"},
 		{"SASL.Mechanism.OAUTHBEARER - Missing token provider",
 			func(cfg *Config) {
 				cfg.Net.SASL.Enable = true
@@ -117,6 +117,86 @@ func TestNetConfigValidates(t *testing.T) {
 				cfg.Net.SASL.Password = "stong_password"
 			},
 			"A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
+		{"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
+				cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+				cfg.Net.SASL.GSSAPI.Username = "sarama"
+				cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
+				cfg.Net.SASL.GSSAPI.Realm = "kafka"
+				cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+			},
+			"Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
+				"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"},
+		{"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
+				cfg.Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH
+				cfg.Net.SASL.GSSAPI.Username = "sarama"
+				cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
+				cfg.Net.SASL.GSSAPI.Realm = "kafka"
+				cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+			},
+			"Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
+				" and  Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"},
+		{"SASL.Mechanism GSSAPI (Kerberos) - Missing username",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
+				cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+				cfg.Net.SASL.GSSAPI.Password = "sarama"
+				cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
+				cfg.Net.SASL.GSSAPI.Realm = "kafka"
+				cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+			},
+			"Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"},
+		{"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
+				cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+				cfg.Net.SASL.GSSAPI.Username = "sarama"
+				cfg.Net.SASL.GSSAPI.Password = "sarama"
+				cfg.Net.SASL.GSSAPI.Realm = "kafka"
+				cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+			},
+			"Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"},
+		{"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
+				cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
+				cfg.Net.SASL.GSSAPI.Username = "sarama"
+				cfg.Net.SASL.GSSAPI.Password = "sarama"
+				cfg.Net.SASL.GSSAPI.Realm = "kafka"
+				cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+			},
+			"Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"},
+		{"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
+				cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
+				cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+				cfg.Net.SASL.GSSAPI.Username = "sarama"
+				cfg.Net.SASL.GSSAPI.Password = "sarama"
+				cfg.Net.SASL.GSSAPI.Realm = "kafka"
+			},
+			"Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"},
+		{"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm",
+			func(cfg *Config) {
+				cfg.Net.SASL.Enable = true
+				cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
+				cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
+				cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+				cfg.Net.SASL.GSSAPI.Username = "sarama"
+				cfg.Net.SASL.GSSAPI.Password = "sarama"
+				cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+
+			},
+			"Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"},
 	}
 
 	for i, test := range tests {

+ 6 - 0
go.mod

@@ -8,6 +8,8 @@ require (
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
 	github.com/eapache/queue v1.1.0
 	github.com/golang/snappy v0.0.1 // indirect
+	github.com/hashicorp/go-uuid v1.0.1 // indirect
+	github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03
 	github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41
 	github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
 	github.com/stretchr/testify v1.3.0
@@ -15,4 +17,8 @@ require (
 	github.com/xdg/stringprep v1.0.0 // indirect
 	golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
 	golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
+	gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
+	gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
+	gopkg.in/jcmturner/gokrb5.v7 v7.2.3
+	gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
 )

+ 12 - 0
go.sum

@@ -13,6 +13,10 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
+github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
+github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
 github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
 github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
 github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
@@ -37,3 +41,11 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
+gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
+gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
+gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
+gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
+gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=

+ 257 - 0
gssapi_kerberos.go

@@ -0,0 +1,257 @@
+package sarama
+
+import (
+	"encoding/binary"
+	"fmt"
+	"github.com/jcmturner/gofork/encoding/asn1"
+	"gopkg.in/jcmturner/gokrb5.v7/asn1tools"
+	"gopkg.in/jcmturner/gokrb5.v7/gssapi"
+	"gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype"
+	"gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
+	"gopkg.in/jcmturner/gokrb5.v7/messages"
+	"gopkg.in/jcmturner/gokrb5.v7/types"
+	"io"
+	"strings"
+	"time"
+)
+
+const (
+	TOK_ID_KRB_AP_REQ   = 256
+	GSS_API_GENERIC_TAG = 0x60
+	KRB5_USER_AUTH      = 1
+	KRB5_KEYTAB_AUTH    = 2
+	GSS_API_INITIAL     = 1
+	GSS_API_VERIFY      = 2
+	GSS_API_FINISH      = 3
+)
+
+type GSSAPIConfig struct {
+	AuthType           int
+	KeyTabPath         string
+	KerberosConfigPath string
+	ServiceName        string
+	Username           string
+	Password           string
+	Realm              string
+}
+
+type GSSAPIKerberosAuth struct {
+	Config                *GSSAPIConfig
+	ticket                messages.Ticket
+	encKey                types.EncryptionKey
+	NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
+	step                  int
+}
+
+type KerberosClient interface {
+	Login() error
+	GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
+	Domain() string
+	CName() types.PrincipalName
+	Destroy()
+}
+
+/*
+*
+* Appends length in big endian before payload, and send it to kafka
+*
+ */
+
+func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
+	length := len(payload)
+	finalPackage := make([]byte, length+4) //4 byte length header + payload
+	copy(finalPackage[4:], payload)
+	binary.BigEndian.PutUint32(finalPackage, uint32(length))
+	bytes, err := broker.conn.Write(finalPackage)
+	if err != nil {
+		return bytes, err
+	}
+	return bytes, nil
+}
+
+/*
+*
+* Read length (4 bytes) and then read the payload
+*
+ */
+
+func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
+	bytesRead := 0
+	lengthInBytes := make([]byte, 4)
+	bytes, err := io.ReadFull(broker.conn, lengthInBytes)
+	if err != nil {
+		return nil, bytesRead, err
+	}
+	bytesRead += bytes
+	payloadLength := binary.BigEndian.Uint32(lengthInBytes)
+	payloadBytes := make([]byte, payloadLength)         // buffer for read..
+	bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
+	if err != nil {
+		return payloadBytes, bytesRead, err
+	}
+	bytesRead += bytes
+	return payloadBytes, bytesRead, nil
+}
+
+func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
+	a := make([]byte, 24)
+	flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
+	binary.LittleEndian.PutUint32(a[:4], 16)
+	for _, i := range flags {
+		f := binary.LittleEndian.Uint32(a[20:24])
+		f |= uint32(i)
+		binary.LittleEndian.PutUint32(a[20:24], f)
+	}
+	return a
+}
+
+/*
+*
+* Construct Kerberos AP_REQ package, conforming to RFC-4120
+* https://tools.ietf.org/html/rfc4120#page-84
+*
+ */
+func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
+	domain string, cname types.PrincipalName,
+	ticket messages.Ticket,
+	sessionKey types.EncryptionKey) ([]byte, error) {
+	auth, err := types.NewAuthenticator(domain, cname)
+	if err != nil {
+		return nil, err
+	}
+	auth.Cksum = types.Checksum{
+		CksumType: chksumtype.GSSAPI,
+		Checksum:  krbAuth.newAuthenticatorChecksum(),
+	}
+	APReq, err := messages.NewAPReq(
+		ticket,
+		sessionKey,
+		auth,
+	)
+	if err != nil {
+		return nil, err
+	}
+	aprBytes := make([]byte, 2)
+	binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
+	tb, err := APReq.Marshal()
+	if err != nil {
+		return nil, err
+	}
+	aprBytes = append(aprBytes, tb...)
+	return aprBytes, nil
+}
+
+/*
+*
+*	Append the GSS-API header to the payload, conforming to RFC-2743
+*	Section 3.1, Mechanism-Independent Token Format
+*
+*	https://tools.ietf.org/html/rfc2743#page-81
+*
+*	GSSAPIHeader + <specific mechanism payload>
+*
+ */
+func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
+	oidBytes, err := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5))
+	if err != nil {
+		return nil, err
+	}
+	tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
+	GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
+	GSSHeader = append(GSSHeader, oidBytes...)
+	GSSPackage := append(GSSHeader, payload...)
+	return GSSPackage, nil
+}
+
+func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
+	switch krbAuth.step {
+	case GSS_API_INITIAL:
+		aprBytes, err := krbAuth.createKrb5Token(
+			kerberosClient.Domain(),
+			kerberosClient.CName(),
+			krbAuth.ticket,
+			krbAuth.encKey)
+		if err != nil {
+			return nil, err
+		}
+		krbAuth.step = GSS_API_VERIFY
+		return krbAuth.appendGSSAPIHeader(aprBytes)
+	case GSS_API_VERIFY:
+		wrapTokenReq := gssapi.WrapToken{}
+		if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
+			return nil, err
+		}
+		// Validate response.
+		isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
+		if !isValid {
+			return nil, err
+		}
+
+		wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
+		if err != nil {
+			return nil, err
+		}
+		krbAuth.step = GSS_API_FINISH
+		return wrapTokenResponse.Marshal()
+	}
+	return nil, nil
+}
+
+/* This does the handshake for authorization */
+func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
+
+	kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
+	if err != nil {
+		Logger.Printf("Kerberos client error: %s", err)
+		return err
+	}
+
+	err = kerberosClient.Login()
+	if err != nil {
+		Logger.Printf("Kerberos client error: %s", err)
+		return err
+	}
+	// Construct SPN using serviceName and host
+	// SPN format: <SERVICE>/<FQDN>
+
+	host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
+	spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
+
+	ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
+
+	if err != nil {
+		Logger.Printf("Error getting Kerberos service ticket : %s", err)
+		return err
+	}
+	krbAuth.ticket = ticket
+	krbAuth.encKey = encKey
+	krbAuth.step = GSS_API_INITIAL
+	var receivedBytes []byte = nil
+	defer kerberosClient.Destroy()
+	for {
+		packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
+		if err != nil {
+			Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
+			return err
+		}
+		requestTime := time.Now()
+		bytesWritten, err := krbAuth.writePackage(broker, packBytes)
+		if err != nil {
+			Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
+			return err
+		}
+		broker.updateOutgoingCommunicationMetrics(bytesWritten)
+		if krbAuth.step == GSS_API_VERIFY {
+			var bytesRead = 0
+			receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
+			requestLatency := time.Since(requestTime)
+			broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
+			if err != nil {
+				Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
+				return err
+			}
+		} else if krbAuth.step == GSS_API_FINISH {
+			return nil
+		}
+	}
+}

+ 51 - 0
kerberos_client.go

@@ -0,0 +1,51 @@
+package sarama
+
+import (
+	krb5client "gopkg.in/jcmturner/gokrb5.v7/client"
+	krb5config "gopkg.in/jcmturner/gokrb5.v7/config"
+	"gopkg.in/jcmturner/gokrb5.v7/keytab"
+	"gopkg.in/jcmturner/gokrb5.v7/types"
+)
+
+type KerberosGoKrb5Client struct {
+	krb5client.Client
+}
+
+func (c *KerberosGoKrb5Client) Domain() string {
+	return c.Credentials.Domain()
+}
+
+func (c *KerberosGoKrb5Client) CName() types.PrincipalName {
+	return c.Credentials.CName()
+}
+
+/*
+*
+* Create kerberos client used to obtain TGT and TGS tokens
+* used gokrb5 library, which is a pure go kerberos client with
+* some GSS-API capabilities, and SPNEGO support. Kafka does not use SPNEGO
+* it uses pure Kerberos 5 solution (RFC-4121 and RFC-4120).
+*
+ */
+func NewKerberosClient(config *GSSAPIConfig) (KerberosClient, error) {
+	cfg, err := krb5config.Load(config.KerberosConfigPath)
+	if err != nil {
+		return nil, err
+	}
+	return createClient(config, cfg)
+}
+
+func createClient(config *GSSAPIConfig, cfg *krb5config.Config) (KerberosClient, error) {
+	var client *krb5client.Client
+	if config.AuthType == KRB5_KEYTAB_AUTH {
+		kt, err := keytab.Load(config.KeyTabPath)
+		if err != nil {
+			return nil, err
+		}
+		client = krb5client.NewClientWithKeytab(config.Username, config.Realm, kt, cfg)
+	} else {
+		client = krb5client.NewClientWithPassword(config.Username,
+			config.Realm, config.Password, cfg)
+	}
+	return &KerberosGoKrb5Client{*client}, nil
+}

+ 99 - 0
kerberos_client_test.go

@@ -0,0 +1,99 @@
+package sarama
+
+import (
+	"errors"
+	krbcfg "gopkg.in/jcmturner/gokrb5.v7/config"
+	"gopkg.in/jcmturner/gokrb5.v7/test/testdata"
+	"testing"
+)
+
+/*
+ * Minimum requirement for client creation
+ * we are not testing the client itself, we only test that the client is created
+ * properly.
+ *
+ */
+const TEST_KRB5CONF = `
+[libdefaults]
+default_realm = EXAMPLE.COM
+
+[realms]
+EXAMPLE.COM = {
+kdc = kerberos.example.com
+admin_server = kerberos.example.com
+}
+
+[domain_realm]
+.example.com = EXAMPLE.COM
+example.com = EXAMPLE.COM
+`
+
+func TestFaildToCreateKerberosConfig(t *testing.T) {
+	expectedErr := errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory")
+	clientConfig := NewConfig()
+	clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI
+	clientConfig.Net.SASL.Enable = true
+	clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka"
+	clientConfig.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
+	clientConfig.Net.SASL.GSSAPI.Username = "client"
+	clientConfig.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+	clientConfig.Net.SASL.GSSAPI.Password = "qwerty"
+	clientConfig.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
+	_, err := NewKerberosClient(&clientConfig.Net.SASL.GSSAPI)
+	// Expect to create client with password
+	if err.Error() != expectedErr.Error() {
+		t.Errorf("Expected error:%s, got:%s.", err, expectedErr)
+	}
+}
+
+func TestCreateWithPassword(t *testing.T) {
+	kerberosConfig, err := krbcfg.NewConfigFromString(testdata.TEST_KRB5CONF)
+	if err != nil {
+		t.Fatal(err)
+	}
+	expectedDoman := "EXAMPLE.COM"
+	expectedCName := "client"
+
+	clientConfig := NewConfig()
+	clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI
+	clientConfig.Net.SASL.Enable = true
+	clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka"
+	clientConfig.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
+	clientConfig.Net.SASL.GSSAPI.Username = "client"
+	clientConfig.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
+	clientConfig.Net.SASL.GSSAPI.Password = "qwerty"
+	clientConfig.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+	client, _ := createClient(&clientConfig.Net.SASL.GSSAPI, kerberosConfig)
+	// Expect to create client with password
+	if client == nil {
+		t.Errorf("Expected client not nil")
+	}
+	if client.Domain() != expectedDoman {
+		t.Errorf("Client domain: %s, got: %s", expectedDoman, client.Domain())
+	}
+	if client.CName().NameString[0] != expectedCName {
+		t.Errorf("Client domain:%s, got: %s", expectedCName, client.CName().NameString[0])
+	}
+}
+
+func TestCreateWithKeyTab(t *testing.T) {
+	kerberosConfig, err := krbcfg.NewConfigFromString(testdata.TEST_KRB5CONF)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Expect to try to create a client with keytab and fails with "o such file or directory" error
+	expectedErr := errors.New("open nonexist.keytab: no such file or directory")
+	clientConfig := NewConfig()
+	clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI
+	clientConfig.Net.SASL.Enable = true
+	clientConfig.Net.SASL.GSSAPI.ServiceName = "kafka"
+	clientConfig.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
+	clientConfig.Net.SASL.GSSAPI.Username = "client"
+	clientConfig.Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH
+	clientConfig.Net.SASL.GSSAPI.KeyTabPath = "nonexist.keytab"
+	clientConfig.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
+	_, err = createClient(&clientConfig.Net.SASL.GSSAPI, kerberosConfig)
+	if err.Error() != expectedErr.Error() {
+		t.Errorf("Expected error:%s, got:%s.", err, expectedErr)
+	}
+}

+ 120 - 47
mockbroker.go

@@ -18,6 +18,8 @@ const (
 	expectationTimeout = 500 * time.Millisecond
 )
 
+type GSSApiHandlerFunc func([]byte) []byte
+
 type requestHandlerFunc func(req *request) (res encoder)
 
 // RequestNotifierFunc is invoked when a mock broker processes a request successfully
@@ -49,18 +51,19 @@ type RequestNotifierFunc func(bytesRead, bytesWritten int)
 // It is not necessary to prefix message length or correlation ID to your
 // response bytes, the server does that automatically as a convenience.
 type MockBroker struct {
-	brokerID     int32
-	port         int32
-	closing      chan none
-	stopper      chan none
-	expectations chan encoder
-	listener     net.Listener
-	t            TestReporter
-	latency      time.Duration
-	handler      requestHandlerFunc
-	notifier     RequestNotifierFunc
-	history      []RequestResponse
-	lock         sync.Mutex
+	brokerID      int32
+	port          int32
+	closing       chan none
+	stopper       chan none
+	expectations  chan encoder
+	listener      net.Listener
+	t             TestReporter
+	latency       time.Duration
+	handler       requestHandlerFunc
+	notifier      RequestNotifierFunc
+	history       []RequestResponse
+	lock          sync.Mutex
+	gssApiHandler GSSApiHandlerFunc
 }
 
 // RequestResponse represents a Request/Response pair processed by MockBroker.
@@ -173,6 +176,43 @@ func (b *MockBroker) serverLoop() {
 	Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
 }
 
+func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
+	b.gssApiHandler = handler
+}
+
+func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
+	var (
+		bytesRead   int
+		lengthBytes = make([]byte, 4)
+	)
+
+	if _, err := io.ReadFull(r, lengthBytes); err != nil {
+		return nil, err
+	}
+
+	bytesRead += len(lengthBytes)
+	length := int32(binary.BigEndian.Uint32(lengthBytes))
+
+	if length <= 4 || length > MaxRequestSize {
+		return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
+	}
+
+	encodedReq := make([]byte, length)
+	if _, err := io.ReadFull(r, encodedReq); err != nil {
+		return nil, err
+	}
+
+	bytesRead += len(encodedReq)
+
+	fullBytes := append(lengthBytes, encodedReq...)
+
+	return fullBytes, nil
+}
+
+func (b *MockBroker) isGSSAPI(buffer []byte) bool {
+	return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
+}
+
 func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
 	defer wg.Done()
 	defer func() {
@@ -192,59 +232,92 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 	}()
 
 	resHeader := make([]byte, 8)
+	var bytesWritten int
+	var bytesRead int
 	for {
-		req, bytesRead, err := decodeRequest(conn)
+
+		buffer, err := b.readToBytes(conn)
 		if err != nil {
-			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
+			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
 			b.serverError(err)
 			break
 		}
 
-		if b.latency > 0 {
-			time.Sleep(b.latency)
-		}
+		bytesWritten = 0
+		if !b.isGSSAPI(buffer) {
 
-		b.lock.Lock()
-		res := b.handler(req)
-		b.history = append(b.history, RequestResponse{req.body, res})
-		b.lock.Unlock()
+			req, br, err := decodeRequest(bytes.NewReader(buffer))
+			bytesRead = br
+			if err != nil {
+				Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
+				b.serverError(err)
+				break
+			}
 
-		if res == nil {
-			Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
-			continue
-		}
-		Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
+			if b.latency > 0 {
+				time.Sleep(b.latency)
+			}
 
-		encodedRes, err := encode(res, nil)
-		if err != nil {
-			b.serverError(err)
-			break
-		}
-		if len(encodedRes) == 0 {
 			b.lock.Lock()
-			if b.notifier != nil {
-				b.notifier(bytesRead, 0)
-			}
+			res := b.handler(req)
+			b.history = append(b.history, RequestResponse{req.body, res})
 			b.lock.Unlock()
-			continue
-		}
 
-		binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
-		binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
-		if _, err = conn.Write(resHeader); err != nil {
-			b.serverError(err)
-			break
-		}
-		if _, err = conn.Write(encodedRes); err != nil {
-			b.serverError(err)
-			break
+			if res == nil {
+				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
+				continue
+			}
+			Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
+
+			encodedRes, err := encode(res, nil)
+			if err != nil {
+				b.serverError(err)
+				break
+			}
+			if len(encodedRes) == 0 {
+				b.lock.Lock()
+				if b.notifier != nil {
+					b.notifier(bytesRead, 0)
+				}
+				b.lock.Unlock()
+				continue
+			}
+
+			binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
+			binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
+			if _, err = conn.Write(resHeader); err != nil {
+				b.serverError(err)
+				break
+			}
+			if _, err = conn.Write(encodedRes); err != nil {
+				b.serverError(err)
+				break
+			}
+			bytesWritten = len(resHeader) + len(encodedRes)
+
+		} else {
+			// GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
+			// Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
+			b.lock.Lock()
+			res := b.gssApiHandler(buffer)
+			b.lock.Unlock()
+			if res == nil {
+				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
+				continue
+			}
+			if _, err = conn.Write(res); err != nil {
+				b.serverError(err)
+				break
+			}
+			bytesWritten = len(res)
 		}
 
 		b.lock.Lock()
 		if b.notifier != nil {
-			b.notifier(bytesRead, len(resHeader)+len(encodedRes))
+			b.notifier(bytesRead, bytesWritten)
 		}
 		b.lock.Unlock()
+
 	}
 	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
 }

+ 123 - 0
mockkerberos.go

@@ -0,0 +1,123 @@
+package sarama
+
+import (
+	"encoding/binary"
+	"encoding/hex"
+	"gopkg.in/jcmturner/gokrb5.v7/credentials"
+	"gopkg.in/jcmturner/gokrb5.v7/gssapi"
+	"gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
+	"gopkg.in/jcmturner/gokrb5.v7/messages"
+	"gopkg.in/jcmturner/gokrb5.v7/types"
+)
+
+type KafkaGSSAPIHandler struct {
+	client         *MockKerberosClient
+	badResponse    bool
+	badKeyChecksum bool
+}
+
+func (h *KafkaGSSAPIHandler) MockKafkaGSSAPI(buffer []byte) []byte {
+	// Default payload used for verify
+	err := h.client.Login() // Mock client construct keys when login
+	if err != nil {
+		return nil
+	}
+	if h.badResponse { // Returns trash
+		return []byte{0x00, 0x00, 0x00, 0x01, 0xAD}
+	}
+
+	var pack = gssapi.WrapToken{
+		Flags:     KRB5_USER_AUTH,
+		EC:        12,
+		RRC:       0,
+		SndSeqNum: 3398292281,
+		Payload:   []byte{0x11, 0x00}, // 1100
+	}
+	// Compute checksum
+	if h.badKeyChecksum {
+		pack.CheckSum = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
+	} else {
+		err = pack.SetCheckSum(h.client.ASRep.DecryptedEncPart.Key, keyusage.GSSAPI_ACCEPTOR_SEAL)
+		if err != nil {
+			return nil
+		}
+	}
+
+	packBytes, err := pack.Marshal()
+	if err != nil {
+		return nil
+	}
+	lenBytes := len(packBytes)
+	response := make([]byte, lenBytes+4)
+	copy(response[4:], packBytes)
+	binary.BigEndian.PutUint32(response, uint32(lenBytes))
+	return response
+}
+
+type MockKerberosClient struct {
+	asReqBytes  string
+	asRepBytes  string
+	ASRep       messages.ASRep
+	credentials *credentials.Credentials
+	mockError   error
+	errorStage  string
+}
+
+func (c *MockKerberosClient) Login() error {
+	if c.errorStage == "login" && c.mockError != nil {
+		return c.mockError
+	}
+	c.asRepBytes = "6b8202e9308202e5a003020105a10302010ba22b30293027a103020113a220041e301c301aa003020112a1131b114" +
+		"558414d504c452e434f4d636c69656e74a30d1b0b4558414d504c452e434f4da4133011a003020101a10a30081b06636c69656e7" +
+		"4a5820156618201523082014ea003020105a10d1b0b4558414d504c452e434f4da220301ea003020102a11730151b066b7262746" +
+		"7741b0b4558414d504c452e434f4da382011430820110a003020112a103020101a28201020481ffdb9891175d106818e61008c51" +
+		"d0b3462bca92f3bf9d4cfa82de4c4d7aff9994ec87c573e3a3d54dcb2bb79618c76f2bf4a3d006f90d5bdbd049bc18f48be39203" +
+		"549ca02acaf63f292b12404f9b74c34b83687119d8f56552ccc0c50ebee2a53bb114c1b4619bb1d5d31f0f49b4d40a08a9b4c046" +
+		"2e1398d0b648be1c0e50c552ad16e1d8d8e74263dd0bf0ec591e4797dfd40a9a1be4ae830d03a306e053fd7586fef84ffc5e4a83" +
+		"7c3122bf3e6a40fe87e84019f6283634461b955712b44a5f7386c278bff94ec2c2dc0403247e29c2450e853471ceababf9b8911f" +
+		"997f2e3010b046d2c49eb438afb0f4c210821e80d4ffa4c9521eb895dcd68610b3feaa682012c30820128a003020112a282011f0" +
+		"482011bce73cbce3f1dd17661c412005f0f2257c756fe8e98ff97e6ec24b7bab66e5fd3a3827aeeae4757af0c6e892948122d8b2" +
+		"03c8df48df0ef5d142d0e416d688f11daa0fcd63d96bdd431d02b8e951c664eeff286a2be62383d274a04016d5f0e141da58cb86" +
+		"331de64063062f4f885e8e9ce5b181ca2fdc67897c5995e0ae1ae0c171a64493ff7bd91bc6d89cd4fce1e2b3ea0a10e34b0d5eda" +
+		"aa38ee727b50c5632ed1d2f2b457908e616178d0d80b72af209fb8ac9dbaa1768fa45931392b36b6d8c12400f8ded2efaa0654d0" +
+		"da1db966e8b5aab4706c800f95d559664646041fdb38b411c62fc0fbe0d25083a28562b0e1c8df16e62e9d5626b0addee489835f" +
+		"eedb0f26c05baa596b69b17f47920aa64b29dc77cfcc97ba47885"
+	apRepBytes, err := hex.DecodeString(c.asRepBytes)
+	if err != nil {
+		return err
+	}
+	err = c.ASRep.Unmarshal(apRepBytes)
+	if err != nil {
+		return err
+	}
+	c.credentials = credentials.New("client", "EXAMPLE.COM").WithPassword("qwerty")
+	_, err = c.ASRep.DecryptEncPart(c.credentials)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *MockKerberosClient) GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error) {
+	if c.errorStage == "service_ticket" && c.mockError != nil {
+		return messages.Ticket{}, types.EncryptionKey{}, c.mockError
+	}
+	return c.ASRep.Ticket, c.ASRep.DecryptedEncPart.Key, nil
+}
+
+func (c *MockKerberosClient) Domain() string {
+	return "EXAMPLE.COM"
+}
+func (c *MockKerberosClient) CName() types.PrincipalName {
+	var p = types.PrincipalName{
+		NameType: KRB5_USER_AUTH,
+		NameString: []string{
+			"kafka",
+			"kafka",
+		},
+	}
+	return p
+}
+func (c *MockKerberosClient) Destroy() {
+	// Do nothing.
+}