12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015 |
- package sarama
- import (
- "bytes"
- "errors"
- "fmt"
- "net"
- "reflect"
- "testing"
- "time"
- "github.com/rcrowley/go-metrics"
- "gopkg.in/jcmturner/gokrb5.v7/krberror"
- )
- func ExampleBroker() {
- broker := NewBroker("localhost:9092")
- err := broker.Open(nil)
- if err != nil {
- panic(err)
- }
- request := MetadataRequest{Topics: []string{"myTopic"}}
- response, err := broker.GetMetadata(&request)
- if err != nil {
- _ = broker.Close()
- panic(err)
- }
- fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
- if err = broker.Close(); err != nil {
- panic(err)
- }
- }
- type mockEncoder struct {
- bytes []byte
- }
- func (m mockEncoder) encode(pe packetEncoder) error {
- return pe.putRawBytes(m.bytes)
- }
- func (m mockEncoder) headerVersion() int16 {
- return 0
- }
// brokerMetrics holds the two byte counts that the mock broker passes to the
// notifier callback registered in TestSimpleBrokerCommunication.
type brokerMetrics struct {
	bytesRead, bytesWritten int
}
- func TestBrokerAccessors(t *testing.T) {
- broker := NewBroker("abc:123")
- if broker.ID() != -1 {
- t.Error("New broker didn't have an ID of -1.")
- }
- if broker.Addr() != "abc:123" {
- t.Error("New broker didn't have the correct address")
- }
- if broker.Rack() != "" {
- t.Error("New broker didn't have an unknown rack.")
- }
- broker.id = 34
- if broker.ID() != 34 {
- t.Error("Manually setting broker ID did not take effect.")
- }
- rack := "dc1"
- broker.rack = &rack
- if broker.Rack() != rack {
- t.Error("Manually setting broker rack did not take effect.")
- }
- }
- func TestSimpleBrokerCommunication(t *testing.T) {
- for _, tt := range brokerTestTable {
- t.Run(tt.name, func(t *testing.T) {
- Logger.Printf("Testing broker communication for %s", tt.name)
- mb := NewMockBroker(t, 0)
- mb.Returns(&mockEncoder{tt.response})
- pendingNotify := make(chan brokerMetrics)
- // Register a callback to be notified about successful requests
- mb.SetNotifier(func(bytesRead, bytesWritten int) {
- pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
- })
- broker := NewBroker(mb.Addr())
- // Set the broker id in order to validate local broker metrics
- broker.id = 0
- conf := NewConfig()
- conf.Version = tt.version
- err := broker.Open(conf)
- if err != nil {
- t.Fatal(err)
- }
- tt.runner(t, broker)
- // Wait up to 500 ms for the remote broker to process the request and
- // notify us about the metrics
- timeout := 500 * time.Millisecond
- select {
- case mockBrokerMetrics := <-pendingNotify:
- validateBrokerMetrics(t, broker, mockBrokerMetrics)
- case <-time.After(timeout):
- t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
- }
- mb.Close()
- err = broker.Close()
- if err != nil {
- t.Error(err)
- }
- })
- }
- }
- var ErrTokenFailure = errors.New("Failure generating token")
- type TokenProvider struct {
- accessToken *AccessToken
- err error
- }
- func (t *TokenProvider) Token() (*AccessToken, error) {
- return t.accessToken, t.err
- }
- func newTokenProvider(token *AccessToken, err error) *TokenProvider {
- return &TokenProvider{
- accessToken: token,
- err: err,
- }
- }
- func TestSASLOAuthBearer(t *testing.T) {
- testTable := []struct {
- name string
- authidentity string
- mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
- mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
- expectClientErr bool // Expect an internal client-side error
- expectedBrokerError KError // Expected Kafka error returned by client
- tokProvider *TokenProvider
- }{
- {
- name: "SASL/OAUTHBEARER OK server response",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: false,
- expectedBrokerError: ErrNoError,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
- },
- {
- name: "SASL/OAUTHBEARER authentication failure response",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSequence(
- // First, the broker response with a challenge
- NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
- // Next, the client terminates the token exchange. Finally, the
- // broker responds with an error message.
- NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
- SetError(ErrSASLAuthenticationFailed),
- ),
- expectClientErr: true,
- expectedBrokerError: ErrSASLAuthenticationFailed,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
- },
- {
- name: "SASL/OAUTHBEARER handshake failure response",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}).
- SetError(ErrSASLAuthenticationFailed),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: true,
- expectedBrokerError: ErrSASLAuthenticationFailed,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
- },
- {
- name: "SASL/OAUTHBEARER token generation error",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: true,
- expectedBrokerError: ErrNoError,
- tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
- },
- {
- name: "SASL/OAUTHBEARER invalid extension",
- mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypeOAuth}),
- mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
- expectClientErr: true,
- expectedBrokerError: ErrNoError,
- tokProvider: newTokenProvider(&AccessToken{
- Token: "access-token-123",
- Extensions: map[string]string{"auth": "auth-value"},
- }, nil),
- },
- }
- for i, test := range testTable {
- t.Run(test.name, func(t *testing.T) {
- // mockBroker mocks underlying network logic and broker responses
- mockBroker := NewMockBroker(t, 0)
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": test.mockSASLAuthResponse,
- "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
- })
- // broker executes SASL requests against mockBroker
- broker := NewBroker(mockBroker.Addr())
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- broker.requestsInFlight = metrics.NilCounter{}
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypeOAuth
- conf.Net.SASL.TokenProvider = test.tokProvider
- broker.conf = conf
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if test.expectedBrokerError != ErrNoError {
- if test.expectedBrokerError != err {
- t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
- }
- } else if test.expectClientErr && err == nil {
- t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
- } else if !test.expectClientErr && err != nil {
- t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
- }
- mockBroker.Close()
- })
- }
- }
// MockSCRAMClient is a scripted SCRAM client for tests. It answers the empty
// challenge with "ping", treats the challenge "pong" as the end of the
// exchange, and rejects every other challenge.
type MockSCRAMClient struct {
	done bool
}

// Begin ignores its three arguments and always succeeds.
func (m *MockSCRAMClient) Begin(_, _, _ string) (err error) {
	return nil
}

// Step returns the next client message for challenge. The "pong" challenge
// marks the exchange as done; any challenge other than "" or "pong" yields an
// error.
func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
	switch challenge {
	case "":
		return "ping", nil
	case "pong":
		m.done = true
		return "", nil
	default:
		return "", errors.New("failed to authenticate :(")
	}
}

// Done reports whether Step has already seen the "pong" challenge.
func (m *MockSCRAMClient) Done() bool {
	return m.done
}
- var _ SCRAMClient = &MockSCRAMClient{}
- func TestSASLSCRAMSHAXXX(t *testing.T) {
- testTable := []struct {
- name string
- mockHandshakeErr KError
- mockSASLAuthErr KError
- expectClientErr bool
- scramClient *MockSCRAMClient
- scramChallengeResp string
- }{
- {
- name: "SASL/SCRAMSHAXXX successful authentication",
- mockHandshakeErr: ErrNoError,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "pong",
- },
- {
- name: "SASL/SCRAMSHAXXX SCRAM client step error client",
- mockHandshakeErr: ErrNoError,
- mockSASLAuthErr: ErrNoError,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "gong",
- expectClientErr: true,
- },
- {
- name: "SASL/SCRAMSHAXXX server authentication error",
- mockHandshakeErr: ErrNoError,
- mockSASLAuthErr: ErrSASLAuthenticationFailed,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "pong",
- },
- {
- name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
- mockHandshakeErr: ErrUnsupportedSASLMechanism,
- mockSASLAuthErr: ErrNoError,
- scramClient: &MockSCRAMClient{},
- scramChallengeResp: "pong",
- },
- }
- for i, test := range testTable {
- t.Run(test.name, func(t *testing.T) {
- // mockBroker mocks underlying network logic and broker responses
- mockBroker := NewMockBroker(t, 0)
- broker := NewBroker(mockBroker.Addr())
- // broker executes SASL requests against mockBroker
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- broker.requestsInFlight = metrics.NilCounter{}
- mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
- mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
- if test.mockSASLAuthErr != ErrNoError {
- mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
- }
- if test.mockHandshakeErr != ErrNoError {
- mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
- }
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": mockSASLAuthResponse,
- "SaslHandshakeRequest": mockSASLHandshakeResponse,
- })
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
- conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
- broker.conf = conf
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if test.mockSASLAuthErr != ErrNoError {
- if test.mockSASLAuthErr != err {
- t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
- }
- } else if test.mockHandshakeErr != ErrNoError {
- if test.mockHandshakeErr != err {
- t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
- }
- } else if test.expectClientErr && err == nil {
- t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
- } else if !test.expectClientErr && err != nil {
- t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
- }
- mockBroker.Close()
- })
- }
- }
- func TestSASLPlainAuth(t *testing.T) {
- testTable := []struct {
- name string
- authidentity string
- mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
- mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
- expectClientErr bool // Expect an internal client-side error
- }{
- {
- name: "SASL Plain OK server response",
- mockAuthErr: ErrNoError,
- mockHandshakeErr: ErrNoError,
- },
- {
- name: "SASL Plain OK server response with authidentity",
- authidentity: "authid",
- mockAuthErr: ErrNoError,
- mockHandshakeErr: ErrNoError,
- },
- {
- name: "SASL Plain authentication failure response",
- mockAuthErr: ErrSASLAuthenticationFailed,
- mockHandshakeErr: ErrNoError,
- },
- {
- name: "SASL Plain handshake failure response",
- mockAuthErr: ErrNoError,
- mockHandshakeErr: ErrSASLAuthenticationFailed,
- },
- }
- for i, test := range testTable {
- t.Run(test.name, func(t *testing.T) {
- // mockBroker mocks underlying network logic and broker responses
- mockBroker := NewMockBroker(t, 0)
- mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`response_payload`))
- if test.mockAuthErr != ErrNoError {
- mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
- }
- mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
- SetEnabledMechanisms([]string{SASLTypePlaintext})
- if test.mockHandshakeErr != ErrNoError {
- mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
- }
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": mockSASLAuthResponse,
- "SaslHandshakeRequest": mockSASLHandshakeResponse,
- })
- // broker executes SASL requests against mockBroker
- broker := NewBroker(mockBroker.Addr())
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- broker.requestsInFlight = metrics.NilCounter{}
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypePlaintext
- conf.Net.SASL.AuthIdentity = test.authidentity
- conf.Net.SASL.User = "token"
- conf.Net.SASL.Password = "password"
- conf.Net.SASL.Version = SASLHandshakeV1
- broker.conf = conf
- broker.conf.Version = V1_0_0_0
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if err == nil {
- for _, rr := range mockBroker.History() {
- switch r := rr.Request.(type) {
- case *SaslAuthenticateRequest:
- x := bytes.SplitN(r.SaslAuthBytes, []byte("\x00"), 3)
- if string(x[0]) != conf.Net.SASL.AuthIdentity {
- t.Errorf("[%d]:[%s] expected %s auth identity, got %s\n", i, test.name, conf.Net.SASL.AuthIdentity, x[0])
- }
- if string(x[1]) != conf.Net.SASL.User {
- t.Errorf("[%d]:[%s] expected %s user, got %s\n", i, test.name, conf.Net.SASL.User, x[1])
- }
- if string(x[2]) != conf.Net.SASL.Password {
- t.Errorf("[%d]:[%s] expected %s password, got %s\n", i, test.name, conf.Net.SASL.Password, x[2])
- }
- }
- }
- }
- if test.mockAuthErr != ErrNoError {
- if test.mockAuthErr != err {
- t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
- }
- } else if test.mockHandshakeErr != ErrNoError {
- if test.mockHandshakeErr != err {
- t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
- }
- } else if test.expectClientErr && err == nil {
- t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
- } else if !test.expectClientErr && err != nil {
- t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
- }
- mockBroker.Close()
- })
- }
- }
- // TestSASLReadTimeout ensures that the broker connection won't block forever
- // if the remote end never responds after the handshake
- func TestSASLReadTimeout(t *testing.T) {
- mockBroker := NewMockBroker(t, 0)
- defer mockBroker.Close()
- mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
- SetAuthBytes([]byte(`response_payload`))
- mockBroker.SetHandlerByMap(map[string]MockResponse{
- "SaslAuthenticateRequest": mockSASLAuthResponse,
- })
- broker := NewBroker(mockBroker.Addr())
- {
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- broker.requestsInFlight = metrics.NilCounter{}
- }
- conf := NewConfig()
- {
- conf.Net.ReadTimeout = time.Millisecond
- conf.Net.SASL.Mechanism = SASLTypePlaintext
- conf.Net.SASL.User = "token"
- conf.Net.SASL.Password = "password"
- conf.Net.SASL.Version = SASLHandshakeV1
- }
- broker.conf = conf
- broker.conf.Version = V1_0_0_0
- dialer := net.Dialer{}
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- broker.conn = conn
- err = broker.authenticateViaSASL()
- if err == nil {
- t.Errorf("should never happen - expected read timeout")
- }
- }
- func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
- testTable := []struct {
- name string
- error error
- mockKerberosClient bool
- errorStage string
- badResponse bool
- badKeyChecksum bool
- }{
- {
- name: "Kerberos authentication success",
- error: nil,
- mockKerberosClient: true,
- },
- {
- name: "Kerberos login fails",
- error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
- "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
- "cation information was invalid - PREAUTH_FAILED"),
- mockKerberosClient: true,
- errorStage: "login",
- },
- {
- name: "Kerberos service ticket fails",
- error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
- "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
- "cation information was invalid - PREAUTH_FAILED"),
- mockKerberosClient: true,
- errorStage: "service_ticket",
- },
- {
- name: "Kerberos client creation fails",
- error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
- },
- {
- name: "Bad server response, unmarshall key error",
- error: errors.New("bytes shorter than header length"),
- badResponse: true,
- mockKerberosClient: true,
- },
- {
- name: "Bad token checksum",
- error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
- badResponse: false,
- badKeyChecksum: true,
- mockKerberosClient: true,
- },
- }
- for i, test := range testTable {
- t.Run(test.name, func(t *testing.T) {
- mockBroker := NewMockBroker(t, 0)
- // broker executes SASL requests against mockBroker
- mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
- return nil
- })
- broker := NewBroker(mockBroker.Addr())
- broker.requestRate = metrics.NilMeter{}
- broker.outgoingByteRate = metrics.NilMeter{}
- broker.incomingByteRate = metrics.NilMeter{}
- broker.requestSize = metrics.NilHistogram{}
- broker.responseSize = metrics.NilHistogram{}
- broker.responseRate = metrics.NilMeter{}
- broker.requestLatency = metrics.NilHistogram{}
- broker.requestsInFlight = metrics.NilCounter{}
- conf := NewConfig()
- conf.Net.SASL.Mechanism = SASLTypeGSSAPI
- conf.Net.SASL.GSSAPI.ServiceName = "kafka"
- conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
- conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
- conf.Net.SASL.GSSAPI.Username = "kafka"
- conf.Net.SASL.GSSAPI.Password = "kafka"
- conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
- conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
- broker.conf = conf
- broker.conf.Version = V1_0_0_0
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
- conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- gssapiHandler := KafkaGSSAPIHandler{
- client: &MockKerberosClient{},
- badResponse: test.badResponse,
- badKeyChecksum: test.badKeyChecksum,
- }
- mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
- broker.conn = conn
- if test.mockKerberosClient {
- broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
- return &MockKerberosClient{
- mockError: test.error,
- errorStage: test.errorStage,
- }, nil
- }
- } else {
- broker.kerberosAuthenticator.NewKerberosClientFunc = nil
- }
- err = broker.authenticateViaSASL()
- if err != nil && test.error != nil {
- if test.error.Error() != err.Error() {
- t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
- }
- } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
- t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
- }
- mockBroker.Close()
- })
- }
- }
- func TestBuildClientFirstMessage(t *testing.T) {
- testTable := []struct {
- name string
- token *AccessToken
- expected []byte
- expectError bool
- }{
- {
- name: "Build SASL client initial response with two extensions",
- token: &AccessToken{
- Token: "the-token",
- Extensions: map[string]string{
- "x": "1",
- "y": "2",
- },
- },
- expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
- },
- {
- name: "Build SASL client initial response with no extensions",
- token: &AccessToken{Token: "the-token"},
- expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
- },
- {
- name: "Build SASL client initial response using reserved extension",
- token: &AccessToken{
- Token: "the-token",
- Extensions: map[string]string{
- "auth": "auth-value",
- },
- },
- expected: []byte(""),
- expectError: true,
- },
- }
- for i, test := range testTable {
- t.Run(test.name, func(t *testing.T) {
- actual, err := buildClientFirstMessage(test.token)
- if !reflect.DeepEqual(test.expected, actual) {
- t.Errorf("Expected %s, got %s\n", test.expected, actual)
- }
- if test.expectError && err == nil {
- t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
- }
- if !test.expectError && err != nil {
- t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
- }
- })
- }
- }
- // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
- var brokerTestTable = []struct {
- version KafkaVersion
- name string
- response []byte
- runner func(*testing.T, *Broker)
- }{
- {V0_10_0_0,
- "MetadataRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := MetadataRequest{}
- response, err := broker.GetMetadata(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Metadata request got no response!")
- }
- }},
- {V0_10_0_0,
- "ConsumerMetadataRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ConsumerMetadataRequest{}
- response, err := broker.GetConsumerMetadata(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Consumer Metadata request got no response!")
- }
- }},
- {V0_10_0_0,
- "ProduceRequest (NoResponse)",
- []byte{},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = NoResponse
- response, err := broker.Produce(&request)
- if err != nil {
- t.Error(err)
- }
- if response != nil {
- t.Error("Produce request with NoResponse got a response!")
- }
- }},
- {V0_10_0_0,
- "ProduceRequest (WaitForLocal)",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = WaitForLocal
- response, err := broker.Produce(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Produce request without NoResponse got no response!")
- }
- }},
- {V0_10_0_0,
- "FetchRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := FetchRequest{}
- response, err := broker.Fetch(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Fetch request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetFetchRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetFetchRequest{}
- response, err := broker.FetchOffset(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetFetch request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetCommitRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetCommitRequest{}
- response, err := broker.CommitOffset(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetCommit request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetRequest{}
- response, err := broker.GetAvailableOffsets(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Offset request got no response!")
- }
- }},
- {V0_10_0_0,
- "JoinGroupRequest",
- []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := JoinGroupRequest{}
- response, err := broker.JoinGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("JoinGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "SyncGroupRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := SyncGroupRequest{}
- response, err := broker.SyncGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("SyncGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "LeaveGroupRequest",
- []byte{0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := LeaveGroupRequest{}
- response, err := broker.LeaveGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("LeaveGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "HeartbeatRequest",
- []byte{0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := HeartbeatRequest{}
- response, err := broker.Heartbeat(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Heartbeat request got no response!")
- }
- }},
- {V0_10_0_0,
- "ListGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ListGroupsRequest{}
- response, err := broker.ListGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("ListGroups request got no response!")
- }
- }},
- {V0_10_0_0,
- "DescribeGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := DescribeGroupsRequest{}
- response, err := broker.DescribeGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("DescribeGroups request got no response!")
- }
- }},
- {V0_10_0_0,
- "ApiVersionsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ApiVersionsRequest{}
- response, err := broker.ApiVersions(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("ApiVersions request got no response!")
- }
- }},
- {V1_1_0_0,
- "DeleteGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := DeleteGroupsRequest{}
- response, err := broker.DeleteGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("DeleteGroups request got no response!")
- }
- }},
- }
- func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
- metricValidators := newMetricValidators()
- mockBrokerBytesRead := mockBrokerMetrics.bytesRead
- mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
- // Check that the number of bytes sent corresponds to what the mock broker received
- metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
- if mockBrokerBytesWritten == 0 {
- // This a ProduceRequest with NoResponse
- metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
- } else {
- metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
- }
- // Check that the number of bytes received corresponds to what the mock broker sent
- metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
- metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
- // Check that there is no more requests in flight
- metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
- // Run the validators
- metricValidators.run(t, broker.conf.MetricRegistry)
- }
|