123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995 |
- package sarama
- import (
- "bytes"
- "errors"
- "fmt"
- "net"
- "reflect"
- "testing"
- "time"
- "github.com/rcrowley/go-metrics"
- "gopkg.in/jcmturner/gokrb5.v7/krberror"
- )
- func ExampleBroker() {
- broker := NewBroker("localhost:9092")
- err := broker.Open(nil)
- if err != nil {
- panic(err)
- }
- request := MetadataRequest{Topics: []string{"myTopic"}}
- response, err := broker.GetMetadata(&request)
- if err != nil {
- _ = broker.Close()
- panic(err)
- }
- fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
- if err = broker.Close(); err != nil {
- panic(err)
- }
- }
- type mockEncoder struct {
- bytes []byte
- }
- func (m mockEncoder) encode(pe packetEncoder) error {
- return pe.putRawBytes(m.bytes)
- }
- type brokerMetrics struct {
- bytesRead int
- bytesWritten int
- }
- func TestBrokerAccessors(t *testing.T) {
- broker := NewBroker("abc:123")
- if broker.ID() != -1 {
- t.Error("New broker didn't have an ID of -1.")
- }
- if broker.Addr() != "abc:123" {
- t.Error("New broker didn't have the correct address")
- }
- if broker.Rack() != "" {
- t.Error("New broker didn't have an unknown rack.")
- }
- broker.id = 34
- if broker.ID() != 34 {
- t.Error("Manually setting broker ID did not take effect.")
- }
- rack := "dc1"
- broker.rack = &rack
- if broker.Rack() != rack {
- t.Error("Manually setting broker rack did not take effect.")
- }
- }
- func TestSimpleBrokerCommunication(t *testing.T) {
- for _, tt := range brokerTestTable {
- Logger.Printf("Testing broker communication for %s", tt.name)
- mb := NewMockBroker(t, 0)
- mb.Returns(&mockEncoder{tt.response})
- pendingNotify := make(chan brokerMetrics)
- // Register a callback to be notified about successful requests
- mb.SetNotifier(func(bytesRead, bytesWritten int) {
- pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
- })
- broker := NewBroker(mb.Addr())
- // Set the broker id in order to validate local broker metrics
- broker.id = 0
- conf := NewConfig()
- conf.Version = tt.version
- err := broker.Open(conf)
- if err != nil {
- t.Fatal(err)
- }
- tt.runner(t, broker)
- // Wait up to 500 ms for the remote broker to process the request and
- // notify us about the metrics
- timeout := 500 * time.Millisecond
- select {
- case mockBrokerMetrics := <-pendingNotify:
- validateBrokerMetrics(t, broker, mockBrokerMetrics)
- case <-time.After(timeout):
- t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
- }
- mb.Close()
- err = broker.Close()
- if err != nil {
- t.Error(err)
- }
- }
- }
- var ErrTokenFailure = errors.New("Failure generating token")
- type TokenProvider struct {
- accessToken *AccessToken
- err error
- }
- func (t *TokenProvider) Token() (*AccessToken, error) {
- return t.accessToken, t.err
- }
- func newTokenProvider(token *AccessToken, err error) *TokenProvider {
- return &TokenProvider{
- accessToken: token,
- err: err,
- }
- }
- func TestSASLOAuthBearer(t *testing.T) {
- testTable := []struct {
- name string
- authidentity string
- mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
- mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
- expectClientErr bool // Expect an internal client-side error
- expectedBrokerError KError // Expected Kafka error returned by client
- tokProvider *TokenProvider
- }{
- {
- name: "SASL/OAUTHBEARER OK server response",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: false,
- expectedBrokerError: ErrNoError,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
- },
- {
- name: "SASL/OAUTHBEARER authentication failure response",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSequence(
- // First, the broker response with a challenge
- NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
- // Next, the client terminates the token exchange. Finally, the
- // broker responds with an error message.
- NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
- SetError(ErrSASLAuthenticationFailed),
- ),
- expectClientErr: true,
- expectedBrokerError: ErrSASLAuthenticationFailed,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
- },
- {
- name: "SASL/OAUTHBEARER handshake failure response",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}).
- SetError(ErrSASLAuthenticationFailed),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: true,
- expectedBrokerError: ErrSASLAuthenticationFailed,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
- },
- {
- name: "SASL/OAUTHBEARER token generation error",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: true,
- expectedBrokerError: ErrNoError,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
- },
- {
- name: "SASL/OAUTHBEARER invalid extension",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: true,
- expectedBrokerError: ErrNoError,
- tokProvider: newTokenProvider(&AccessToken{
- Token: "access-token-123",
- Extensions: map[string]string{"auth": "auth-value"},
- }, nil),
- },
- }
- for i, test := range testTable {
- // mockBroker mocks underlying network logic and broker responses
- mockBroker := NewMockBroker(t, 0)
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": test.mockSASLAuthResponse,
- "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
- })
- // broker executes SASL requests against mockBroker
- broker := NewBroker(mockBroker.Addr())
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypeOAuth
- conf.Net.SASL.TokenProvider = test.tokProvider
- broker.conf = conf
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if test.expectedBrokerError != ErrNoError {
- if test.expectedBrokerError != err {
- t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
- }
- } else if test.expectedBrokerError != ErrNoError {
- if test.expectedBrokerError != err {
- t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.expectedBrokerError, err)
- }
- } else if test.expectClientErr && err == nil {
- t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
- } else if !test.expectClientErr && err != nil {
- t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
- }
- mockBroker.Close()
- }
- }
- // A mock scram client.
- type MockSCRAMClient struct {
- done bool
- }
- func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
- return nil
- }
- func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
- if challenge == "" {
- return "ping", nil
- }
- if challenge == "pong" {
- m.done = true
- return "", nil
- }
- return "", errors.New("failed to authenticate :(")
- }
- func (m *MockSCRAMClient) Done() bool {
- return m.done
- }
- var _ SCRAMClient = &MockSCRAMClient{}
- func TestSASLSCRAMSHAXXX(t *testing.T) {
- testTable := []struct {
- name string
- mockHandshakeErr KError
- mockSASLAuthErr KError
- expectClientErr bool
- scramClient *MockSCRAMClient
- scramChallengeResp string
- }{
- {
- name: "SASL/SCRAMSHAXXX successfull authentication",
- mockHandshakeErr: ErrNoError,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "pong",
- },
- {
- name: "SASL/SCRAMSHAXXX SCRAM client step error client",
- mockHandshakeErr: ErrNoError,
- mockSASLAuthErr: ErrNoError,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "gong",
- expectClientErr: true,
- },
- {
- name: "SASL/SCRAMSHAXXX server authentication error",
- mockHandshakeErr: ErrNoError,
- mockSASLAuthErr: ErrSASLAuthenticationFailed,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "pong",
- },
- {
- name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
- mockHandshakeErr: ErrUnsupportedSASLMechanism,
- mockSASLAuthErr: ErrNoError,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "pong",
- },
- }
- for i, test := range testTable {
- // mockBroker mocks underlying network logic and broker responses
- mockBroker := NewMockBroker(t, 0)
- broker := NewBroker(mockBroker.Addr())
- // broker executes SASL requests against mockBroker
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
- mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
- if test.mockSASLAuthErr != ErrNoError {
- mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
- }
- if test.mockHandshakeErr != ErrNoError {
- mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
- }
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": mockSASLAuthResponse,
- "SaslHandshakeRequest": mockSASLHandshakeResponse,
- })
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
- conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
- broker.conf = conf
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if test.mockSASLAuthErr != ErrNoError {
- if test.mockSASLAuthErr != err {
- t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
- }
- } else if test.mockHandshakeErr != ErrNoError {
- if test.mockHandshakeErr != err {
- t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
- }
- } else if test.expectClientErr && err == nil {
- t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
- } else if !test.expectClientErr && err != nil {
- t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
- }
- mockBroker.Close()
- }
- }
- func TestSASLPlainAuth(t *testing.T) {
- testTable := []struct {
- name string
- authidentity string
- mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
- mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
- expectClientErr bool // Expect an internal client-side error
- }{
- {
- name: "SASL Plain OK server response",
- mockAuthErr: ErrNoError,
- mockHandshakeErr: ErrNoError,
- },
- {
- name: "SASL Plain OK server response with authidentity",
- authidentity: "authid",
- mockAuthErr: ErrNoError,
- mockHandshakeErr: ErrNoError,
- },
- {
- name: "SASL Plain authentication failure response",
- mockAuthErr: ErrSASLAuthenticationFailed,
- mockHandshakeErr: ErrNoError,
- },
- {
- name: "SASL Plain handshake failure response",
- mockAuthErr: ErrNoError,
- mockHandshakeErr: ErrSASLAuthenticationFailed,
- },
- }
- for i, test := range testTable {
- // mockBroker mocks underlying network logic and broker responses
- mockBroker := NewMockBroker(t, 0)
- mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`response_payload`))
- if test.mockAuthErr != ErrNoError {
- mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
- }
- mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypePlaintext})
- if test.mockHandshakeErr != ErrNoError {
- mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
- }
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": mockSASLAuthResponse,
- "SaslHandshakeRequest": mockSASLHandshakeResponse,
- })
- // broker executes SASL requests against mockBroker
- broker := NewBroker(mockBroker.Addr())
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypePlaintext
- conf.Net.SASL.AuthIdentity = test.authidentity
- conf.Net.SASL.User = "token"
- conf.Net.SASL.Password = "password"
- conf.Net.SASL.Version = SASLHandshakeV1
- broker.conf = conf
- broker.conf.Version = V1_0_0_0
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if err == nil {
- for _, rr := range mockBroker.History() {
- switch r := rr.Request.(type) {
- case *SaslAuthenticateRequest:
- x := bytes.SplitN(r.SaslAuthBytes, []byte("\x00"), 3)
- if string(x[0]) != conf.Net.SASL.AuthIdentity {
- t.Errorf("[%d]:[%s] expected %s auth identity, got %s\n", i, test.name, conf.Net.SASL.AuthIdentity, x[0])
- }
- if string(x[1]) != conf.Net.SASL.User {
- t.Errorf("[%d]:[%s] expected %s user, got %s\n", i, test.name, conf.Net.SASL.User, x[1])
- }
- if string(x[2]) != conf.Net.SASL.Password {
- t.Errorf("[%d]:[%s] expected %s password, got %s\n", i, test.name, conf.Net.SASL.Password, x[2])
- }
- }
- }
- }
- if test.mockAuthErr != ErrNoError {
- if test.mockAuthErr != err {
- t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
- }
- } else if test.mockHandshakeErr != ErrNoError {
- if test.mockHandshakeErr != err {
- t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
- }
- } else if test.expectClientErr && err == nil {
- t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
- } else if !test.expectClientErr && err != nil {
- t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
- }
- mockBroker.Close()
- }
- }
- // TestSASLReadTimeout ensures that the broker connection won't block forever
- // if the remote end never responds after the handshake
- func TestSASLReadTimeout(t *testing.T) {
- mockBroker := NewMockBroker(t, 0)
- defer mockBroker.Close()
- mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`response_payload`))
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": mockSASLAuthResponse,
- })
- broker := NewBroker(mockBroker.Addr())
- {
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- }
- conf := NewConfig()
- {
- conf.Net.ReadTimeout = time.Millisecond
- conf.Net.SASL.Mechanism = SASLTypePlaintext
- conf.Net.SASL.User = "token"
- conf.Net.SASL.Password = "password"
- conf.Net.SASL.Version = SASLHandshakeV1
- }
- broker.conf = conf
- broker.conf.Version = V1_0_0_0
- dialer := net.Dialer{}
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if err == nil {
- t.Errorf("should never happen - expected read timeout")
- }
- }
- func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
- testTable := []struct {
- name string
- error error
- mockKerberosClient bool
- errorStage string
- badResponse bool
- badKeyChecksum bool
- }{
- {
- name: "Kerberos authentication success",
- error: nil,
- mockKerberosClient: true,
- },
- {
- name: "Kerberos login fails",
- error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
- "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
- "cation information was invalid - PREAUTH_FAILED"),
- mockKerberosClient: true,
- errorStage: "login",
- },
- {
- name: "Kerberos service ticket fails",
- error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
- "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
- "cation information was invalid - PREAUTH_FAILED"),
- mockKerberosClient: true,
- errorStage: "service_ticket",
- },
- {
- name: "Kerberos client creation fails",
- error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
- },
- {
- name: "Bad server response, unmarshall key error",
- error: errors.New("bytes shorter than header length"),
- badResponse: true,
- mockKerberosClient: true,
- },
- {
- name: "Bad token checksum",
- error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
- badResponse: false,
- badKeyChecksum: true,
- mockKerberosClient: true,
- },
- }
- for i, test := range testTable {
- mockBroker := NewMockBroker(t, 0)
- // broker executes SASL requests against mockBroker
- mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
- return nil
- })
- broker := NewBroker(mockBroker.Addr())
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypeGSSAPI
- conf.Net.SASL.GSSAPI.ServiceName = "kafka"
- conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
- conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
- conf.Net.SASL.GSSAPI.Username = "kafka"
- conf.Net.SASL.GSSAPI.Password = "kafka"
- conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
- conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
- broker.conf = conf
- broker.conf.Version = V1_0_0_0
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- gssapiHandler := KafkaGSSAPIHandler{
- client: &MockKerberosClient{},
- badResponse: test.badResponse,
- badKeyChecksum: test.badKeyChecksum,
- }
- mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
- broker.conn = conn
- if test.mockKerberosClient {
- broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
- return &MockKerberosClient{
- mockError: test.error,
- errorStage: test.errorStage,
- }, nil
- }
- } else {
- broker.kerberosAuthenticator.NewKerberosClientFunc = nil
- }
- err = broker.authenticateViaSASL()
- if err != nil && test.error != nil {
- if test.error.Error() != err.Error() {
- t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
- }
- } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
- t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
- }
- mockBroker.Close()
- }
- }
- func TestBuildClientFirstMessage(t *testing.T) {
- testTable := []struct {
- name string
- token *AccessToken
- expected []byte
- expectError bool
- }{
- {
- name: "Build SASL client initial response with two extensions",
- token: &AccessToken{
- Token: "the-token",
- Extensions: map[string]string{
- "x": "1",
- "y": "2",
- },
- },
- expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
- },
- {
- name: "Build SASL client initial response with no extensions",
- token: &AccessToken{Token: "the-token"},
- expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
- },
- {
- name: "Build SASL client initial response using reserved extension",
- token: &AccessToken{
- Token: "the-token",
- Extensions: map[string]string{
- "auth": "auth-value",
- },
- },
- expected: []byte(""),
- expectError: true,
- },
- }
- for i, test := range testTable {
- actual, err := buildClientFirstMessage(test.token)
- if !reflect.DeepEqual(test.expected, actual) {
- t.Errorf("Expected %s, got %s\n", test.expected, actual)
- }
- if test.expectError && err == nil {
- t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
- }
- if !test.expectError && err != nil {
- t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
- }
- }
- }
- // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
- var brokerTestTable = []struct {
- version KafkaVersion
- name string
- response []byte
- runner func(*testing.T, *Broker)
- }{
- {V0_10_0_0,
- "MetadataRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := MetadataRequest{}
- response, err := broker.GetMetadata(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Metadata request got no response!")
- }
- }},
- {V0_10_0_0,
- "ConsumerMetadataRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ConsumerMetadataRequest{}
- response, err := broker.GetConsumerMetadata(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Consumer Metadata request got no response!")
- }
- }},
- {V0_10_0_0,
- "ProduceRequest (NoResponse)",
- []byte{},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = NoResponse
- response, err := broker.Produce(&request)
- if err != nil {
- t.Error(err)
- }
- if response != nil {
- t.Error("Produce request with NoResponse got a response!")
- }
- }},
- {V0_10_0_0,
- "ProduceRequest (WaitForLocal)",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = WaitForLocal
- response, err := broker.Produce(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Produce request without NoResponse got no response!")
- }
- }},
- {V0_10_0_0,
- "FetchRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := FetchRequest{}
- response, err := broker.Fetch(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Fetch request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetFetchRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetFetchRequest{}
- response, err := broker.FetchOffset(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetFetch request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetCommitRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetCommitRequest{}
- response, err := broker.CommitOffset(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetCommit request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetRequest{}
- response, err := broker.GetAvailableOffsets(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Offset request got no response!")
- }
- }},
- {V0_10_0_0,
- "JoinGroupRequest",
- []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := JoinGroupRequest{}
- response, err := broker.JoinGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("JoinGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "SyncGroupRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := SyncGroupRequest{}
- response, err := broker.SyncGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("SyncGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "LeaveGroupRequest",
- []byte{0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := LeaveGroupRequest{}
- response, err := broker.LeaveGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("LeaveGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "HeartbeatRequest",
- []byte{0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := HeartbeatRequest{}
- response, err := broker.Heartbeat(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Heartbeat request got no response!")
- }
- }},
- {V0_10_0_0,
- "ListGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ListGroupsRequest{}
- response, err := broker.ListGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("ListGroups request got no response!")
- }
- }},
- {V0_10_0_0,
- "DescribeGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := DescribeGroupsRequest{}
- response, err := broker.DescribeGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("DescribeGroups request got no response!")
- }
- }},
- {V0_10_0_0,
- "ApiVersionsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ApiVersionsRequest{}
- response, err := broker.ApiVersions(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("ApiVersions request got no response!")
- }
- }},
- {V1_1_0_0,
- "DeleteGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := DeleteGroupsRequest{}
- response, err := broker.DeleteGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("DeleteGroups request got no response!")
- }
- }},
- }
- func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
- metricValidators := newMetricValidators()
- mockBrokerBytesRead := mockBrokerMetrics.bytesRead
- mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
- // Check that the number of bytes sent corresponds to what the mock broker received
- metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
- if mockBrokerBytesWritten == 0 {
- // This a ProduceRequest with NoResponse
- metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
- } else {
- metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
- }
- // Check that the number of bytes received corresponds to what the mock broker sent
- metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
- metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
- // Run the validators
- metricValidators.run(t, broker.conf.MetricRegistry)
- }
|