package sarama import ( "bytes" "errors" "fmt" "net" "reflect" "testing" "time" "github.com/rcrowley/go-metrics" "gopkg.in/jcmturner/gokrb5.v7/krberror" ) func ExampleBroker() { broker := NewBroker("localhost:9092") err := broker.Open(nil) if err != nil { panic(err) } request := MetadataRequest{Topics: []string{"myTopic"}} response, err := broker.GetMetadata(&request) if err != nil { _ = broker.Close() panic(err) } fmt.Println("There are", len(response.Topics), "topics active in the cluster.") if err = broker.Close(); err != nil { panic(err) } } type mockEncoder struct { bytes []byte } func (m mockEncoder) encode(pe packetEncoder) error { return pe.putRawBytes(m.bytes) } type brokerMetrics struct { bytesRead int bytesWritten int } func TestBrokerAccessors(t *testing.T) { broker := NewBroker("abc:123") if broker.ID() != -1 { t.Error("New broker didn't have an ID of -1.") } if broker.Addr() != "abc:123" { t.Error("New broker didn't have the correct address") } if broker.Rack() != "" { t.Error("New broker didn't have an unknown rack.") } broker.id = 34 if broker.ID() != 34 { t.Error("Manually setting broker ID did not take effect.") } rack := "dc1" broker.rack = &rack if broker.Rack() != rack { t.Error("Manually setting broker rack did not take effect.") } } func TestSimpleBrokerCommunication(t *testing.T) { for _, tt := range brokerTestTable { Logger.Printf("Testing broker communication for %s", tt.name) mb := NewMockBroker(t, 0) mb.Returns(&mockEncoder{tt.response}) pendingNotify := make(chan brokerMetrics) // Register a callback to be notified about successful requests mb.SetNotifier(func(bytesRead, bytesWritten int) { pendingNotify <- brokerMetrics{bytesRead, bytesWritten} }) broker := NewBroker(mb.Addr()) // Set the broker id in order to validate local broker metrics broker.id = 0 conf := NewConfig() conf.Version = tt.version err := broker.Open(conf) if err != nil { t.Fatal(err) } tt.runner(t, broker) // Wait up to 500 ms for the remote broker to process the request and // notify us about the metrics timeout := 500 * time.Millisecond select { case mockBrokerMetrics := <-pendingNotify: validateBrokerMetrics(t, broker, mockBrokerMetrics) case <-time.After(timeout): t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) } mb.Close() err = broker.Close() if err != nil { t.Error(err) } } } var ErrTokenFailure = errors.New("Failure generating token") type TokenProvider struct { accessToken *AccessToken err error } func (t *TokenProvider) Token() (*AccessToken, error) { return t.accessToken, t.err } func newTokenProvider(token *AccessToken, err error) *TokenProvider { return &TokenProvider{ accessToken: token, err: err, } } func TestSASLOAuthBearer(t *testing.T) { testTable := []struct { name string authidentity string mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker expectClientErr bool // Expect an internal client-side error expectedBrokerError KError // Expected Kafka error returned by client tokProvider *TokenProvider }{ { name: "SASL/OAUTHBEARER OK server response", mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t). SetEnabledMechanisms([]string{SASLTypeOAuth}), mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t), expectClientErr: false, expectedBrokerError: ErrNoError, tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil), }, { name: "SASL/OAUTHBEARER authentication failure response", mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t). SetEnabledMechanisms([]string{SASLTypeOAuth}), mockSASLAuthResponse: NewMockSequence( // First, the broker response with a challenge NewMockSaslAuthenticateResponse(t). SetAuthBytes([]byte(`{"status":"invalid_request1"}`)), // Next, the client terminates the token exchange. Finally, the // broker responds with an error message. NewMockSaslAuthenticateResponse(t). SetAuthBytes([]byte(`{"status":"invalid_request2"}`)). SetError(ErrSASLAuthenticationFailed), ), expectClientErr: true, expectedBrokerError: ErrSASLAuthenticationFailed, tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil), }, { name: "SASL/OAUTHBEARER handshake failure response", mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t). SetEnabledMechanisms([]string{SASLTypeOAuth}). SetError(ErrSASLAuthenticationFailed), mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t), expectClientErr: true, expectedBrokerError: ErrSASLAuthenticationFailed, tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil), }, { name: "SASL/OAUTHBEARER token generation error", mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t). SetEnabledMechanisms([]string{SASLTypeOAuth}), mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t), expectClientErr: true, expectedBrokerError: ErrNoError, tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure), }, { name: "SASL/OAUTHBEARER invalid extension", mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t). SetEnabledMechanisms([]string{SASLTypeOAuth}), mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t), expectClientErr: true, expectedBrokerError: ErrNoError, tokProvider: newTokenProvider(&AccessToken{ Token: "access-token-123", Extensions: map[string]string{"auth": "auth-value"}, }, nil), }, } for i, test := range testTable { // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) mockBroker.SetHandlerByMap(map[string]MockResponse{ "SaslAuthenticateRequest": test.mockSASLAuthResponse, "SaslHandshakeRequest": test.mockSASLHandshakeResponse, }) // broker executes SASL requests against mockBroker broker := NewBroker(mockBroker.Addr()) broker.requestRate = metrics.NilMeter{} broker.outgoingByteRate = metrics.NilMeter{} broker.incomingByteRate = metrics.NilMeter{} broker.requestSize = metrics.NilHistogram{} broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} conf := NewConfig() conf.Net.SASL.Mechanism = SASLTypeOAuth conf.Net.SASL.TokenProvider = test.tokProvider broker.conf = conf dialer := net.Dialer{ Timeout: conf.Net.DialTimeout, KeepAlive: conf.Net.KeepAlive, LocalAddr: conf.Net.LocalAddr, } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) if err != nil { t.Fatal(err) } broker.conn = conn err = broker.authenticateViaSASL() if test.expectedBrokerError != ErrNoError { if test.expectedBrokerError != err { t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err) } } else if test.expectClientErr && err == nil { t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name) } else if !test.expectClientErr && err != nil { t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err) } mockBroker.Close() } } // A mock scram client. type MockSCRAMClient struct { done bool } func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) { return nil } func (m *MockSCRAMClient) Step(challenge string) (response string, err error) { if challenge == "" { return "ping", nil } if challenge == "pong" { m.done = true return "", nil } return "", errors.New("failed to authenticate :(") } func (m *MockSCRAMClient) Done() bool { return m.done } var _ SCRAMClient = &MockSCRAMClient{} func TestSASLSCRAMSHAXXX(t *testing.T) { testTable := []struct { name string mockHandshakeErr KError mockSASLAuthErr KError expectClientErr bool scramClient *MockSCRAMClient scramChallengeResp string }{ { name: "SASL/SCRAMSHAXXX successfull authentication", mockHandshakeErr: ErrNoError, scramClient: &MockSCRAMClient{}, scramChallengeResp: "pong", }, { name: "SASL/SCRAMSHAXXX SCRAM client step error client", mockHandshakeErr: ErrNoError, mockSASLAuthErr: ErrNoError, scramClient: &MockSCRAMClient{}, scramChallengeResp: "gong", expectClientErr: true, }, { name: "SASL/SCRAMSHAXXX server authentication error", mockHandshakeErr: ErrNoError, mockSASLAuthErr: ErrSASLAuthenticationFailed, scramClient: &MockSCRAMClient{}, scramChallengeResp: "pong", }, { name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism", mockHandshakeErr: ErrUnsupportedSASLMechanism, mockSASLAuthErr: ErrNoError, scramClient: &MockSCRAMClient{}, scramChallengeResp: "pong", }, } for i, test := range testTable { // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) broker := NewBroker(mockBroker.Addr()) // broker executes SASL requests against mockBroker broker.requestRate = metrics.NilMeter{} broker.outgoingByteRate = metrics.NilMeter{} broker.incomingByteRate = metrics.NilMeter{} broker.requestSize = metrics.NilHistogram{} broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp)) mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512}) if test.mockSASLAuthErr != ErrNoError { mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr) } if test.mockHandshakeErr != ErrNoError { mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr) } mockBroker.SetHandlerByMap(map[string]MockResponse{ "SaslAuthenticateRequest": mockSASLAuthResponse, "SaslHandshakeRequest": mockSASLHandshakeResponse, }) conf := NewConfig() conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512 conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient } broker.conf = conf dialer := net.Dialer{ Timeout: conf.Net.DialTimeout, KeepAlive: conf.Net.KeepAlive, LocalAddr: conf.Net.LocalAddr, } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) if err != nil { t.Fatal(err) } broker.conn = conn err = broker.authenticateViaSASL() if test.mockSASLAuthErr != ErrNoError { if test.mockSASLAuthErr != err { t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err) } } else if test.mockHandshakeErr != ErrNoError { if test.mockHandshakeErr != err { t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err) } } else if test.expectClientErr && err == nil { t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name) } else if !test.expectClientErr && err != nil { t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err) } mockBroker.Close() } } func TestSASLPlainAuth(t *testing.T) { testTable := []struct { name string authidentity string mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest expectClientErr bool // Expect an internal client-side error }{ { name: "SASL Plain OK server response", mockAuthErr: ErrNoError, mockHandshakeErr: ErrNoError, }, { name: "SASL Plain OK server response with authidentity", authidentity: "authid", mockAuthErr: ErrNoError, mockHandshakeErr: ErrNoError, }, { name: "SASL Plain authentication failure response", mockAuthErr: ErrSASLAuthenticationFailed, mockHandshakeErr: ErrNoError, }, { name: "SASL Plain handshake failure response", mockAuthErr: ErrNoError, mockHandshakeErr: ErrSASLAuthenticationFailed, }, } for i, test := range testTable { // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t). SetAuthBytes([]byte(`response_payload`)) if test.mockAuthErr != ErrNoError { mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr) } mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t). SetEnabledMechanisms([]string{SASLTypePlaintext}) if test.mockHandshakeErr != ErrNoError { mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr) } mockBroker.SetHandlerByMap(map[string]MockResponse{ "SaslAuthenticateRequest": mockSASLAuthResponse, "SaslHandshakeRequest": mockSASLHandshakeResponse, }) // broker executes SASL requests against mockBroker broker := NewBroker(mockBroker.Addr()) broker.requestRate = metrics.NilMeter{} broker.outgoingByteRate = metrics.NilMeter{} broker.incomingByteRate = metrics.NilMeter{} broker.requestSize = metrics.NilHistogram{} broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} conf := NewConfig() conf.Net.SASL.Mechanism = SASLTypePlaintext conf.Net.SASL.AuthIdentity = test.authidentity conf.Net.SASL.User = "token" conf.Net.SASL.Password = "password" conf.Net.SASL.Version = SASLHandshakeV1 broker.conf = conf broker.conf.Version = V1_0_0_0 dialer := net.Dialer{ Timeout: conf.Net.DialTimeout, KeepAlive: conf.Net.KeepAlive, LocalAddr: conf.Net.LocalAddr, } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) if err != nil { t.Fatal(err) } broker.conn = conn err = broker.authenticateViaSASL() if err == nil { for _, rr := range mockBroker.History() { switch r := rr.Request.(type) { case *SaslAuthenticateRequest: x := bytes.SplitN(r.SaslAuthBytes, []byte("\x00"), 3) if string(x[0]) != conf.Net.SASL.AuthIdentity { t.Errorf("[%d]:[%s] expected %s auth identity, got %s\n", i, test.name, conf.Net.SASL.AuthIdentity, x[0]) } if string(x[1]) != conf.Net.SASL.User { t.Errorf("[%d]:[%s] expected %s user, got %s\n", i, test.name, conf.Net.SASL.User, x[1]) } if string(x[2]) != conf.Net.SASL.Password { t.Errorf("[%d]:[%s] expected %s password, got %s\n", i, test.name, conf.Net.SASL.Password, x[2]) } } } } if test.mockAuthErr != ErrNoError { if test.mockAuthErr != err { t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err) } } else if test.mockHandshakeErr != ErrNoError { if test.mockHandshakeErr != err { t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err) } } else if test.expectClientErr && err == nil { t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name) } else if !test.expectClientErr && err != nil { t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err) } mockBroker.Close() } } // TestSASLReadTimeout ensures that the broker connection won't block forever // if the remote end never responds after the handshake func TestSASLReadTimeout(t *testing.T) { mockBroker := NewMockBroker(t, 0) defer mockBroker.Close() mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t). SetAuthBytes([]byte(`response_payload`)) mockBroker.SetHandlerByMap(map[string]MockResponse{ "SaslAuthenticateRequest": mockSASLAuthResponse, }) broker := NewBroker(mockBroker.Addr()) { broker.requestRate = metrics.NilMeter{} broker.outgoingByteRate = metrics.NilMeter{} broker.incomingByteRate = metrics.NilMeter{} broker.requestSize = metrics.NilHistogram{} broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} } conf := NewConfig() { conf.Net.ReadTimeout = time.Millisecond conf.Net.SASL.Mechanism = SASLTypePlaintext conf.Net.SASL.User = "token" conf.Net.SASL.Password = "password" conf.Net.SASL.Version = SASLHandshakeV1 } broker.conf = conf broker.conf.Version = V1_0_0_0 dialer := net.Dialer{} conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) if err != nil { t.Fatal(err) } broker.conn = conn err = broker.authenticateViaSASL() if err == nil { t.Errorf("should never happen - expected read timeout") } } func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { testTable := []struct { name string error error mockKerberosClient bool errorStage string badResponse bool badKeyChecksum bool }{ { name: "Kerberos authentication success", error: nil, mockKerberosClient: true, }, { name: "Kerberos login fails", error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+ "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+ "cation information was invalid - PREAUTH_FAILED"), mockKerberosClient: true, errorStage: "login", }, { name: "Kerberos service ticket fails", error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+ "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+ "cation information was invalid - PREAUTH_FAILED"), mockKerberosClient: true, errorStage: "service_ticket", }, { name: "Kerberos client creation fails", error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"), }, { name: "Bad server response, unmarshall key error", error: errors.New("bytes shorter than header length"), badResponse: true, mockKerberosClient: true, }, { name: "Bad token checksum", error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"), badResponse: false, badKeyChecksum: true, mockKerberosClient: true, }, } for i, test := range testTable { mockBroker := NewMockBroker(t, 0) // broker executes SASL requests against mockBroker mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte { return nil }) broker := NewBroker(mockBroker.Addr()) broker.requestRate = metrics.NilMeter{} broker.outgoingByteRate = metrics.NilMeter{} broker.incomingByteRate = metrics.NilMeter{} broker.requestSize = metrics.NilHistogram{} broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} broker.requestsInFlight = metrics.NilCounter{} conf := NewConfig() conf.Net.SASL.Mechanism = SASLTypeGSSAPI conf.Net.SASL.GSSAPI.ServiceName = "kafka" conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf" conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM" conf.Net.SASL.GSSAPI.Username = "kafka" conf.Net.SASL.GSSAPI.Password = "kafka" conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab" conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH broker.conf = conf broker.conf.Version = V1_0_0_0 dialer := net.Dialer{ Timeout: conf.Net.DialTimeout, KeepAlive: conf.Net.KeepAlive, LocalAddr: conf.Net.LocalAddr, } conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) if err != nil { t.Fatal(err) } gssapiHandler := KafkaGSSAPIHandler{ client: &MockKerberosClient{}, badResponse: test.badResponse, badKeyChecksum: test.badKeyChecksum, } mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI) broker.conn = conn if test.mockKerberosClient { broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) { return &MockKerberosClient{ mockError: test.error, errorStage: test.errorStage, }, nil } } else { broker.kerberosAuthenticator.NewKerberosClientFunc = nil } err = broker.authenticateViaSASL() if err != nil && test.error != nil { if test.error.Error() != err.Error() { t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err) } } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) { t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err) } mockBroker.Close() } } func TestBuildClientFirstMessage(t *testing.T) { testTable := []struct { name string token *AccessToken expected []byte expectError bool }{ { name: "Build SASL client initial response with two extensions", token: &AccessToken{ Token: "the-token", Extensions: map[string]string{ "x": "1", "y": "2", }, }, expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"), }, { name: "Build SASL client initial response with no extensions", token: &AccessToken{Token: "the-token"}, expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"), }, { name: "Build SASL client initial response using reserved extension", token: &AccessToken{ Token: "the-token", Extensions: map[string]string{ "auth": "auth-value", }, }, expected: []byte(""), expectError: true, }, } for i, test := range testTable { actual, err := buildClientFirstMessage(test.token) if !reflect.DeepEqual(test.expected, actual) { t.Errorf("Expected %s, got %s\n", test.expected, actual) } if test.expectError && err == nil { t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name) } if !test.expectError && err != nil { t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err) } } } // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake var brokerTestTable = []struct { version KafkaVersion name string response []byte runner func(*testing.T, *Broker) }{ {V0_10_0_0, "MetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := MetadataRequest{} response, err := broker.GetMetadata(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Metadata request got no response!") } }}, {V0_10_0_0, "ConsumerMetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ConsumerMetadataRequest{} response, err := broker.GetConsumerMetadata(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Consumer Metadata request got no response!") } }}, {V0_10_0_0, "ProduceRequest (NoResponse)", []byte{}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} request.RequiredAcks = NoResponse response, err := broker.Produce(&request) if err != nil { t.Error(err) } if response != nil { t.Error("Produce request with NoResponse got a response!") } }}, {V0_10_0_0, "ProduceRequest (WaitForLocal)", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} request.RequiredAcks = WaitForLocal response, err := broker.Produce(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Produce request without NoResponse got no response!") } }}, {V0_10_0_0, "FetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := FetchRequest{} response, err := broker.Fetch(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Fetch request got no response!") } }}, {V0_10_0_0, "OffsetFetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetFetchRequest{} response, err := broker.FetchOffset(&request) if err != nil { t.Error(err) } if response == nil { t.Error("OffsetFetch request got no response!") } }}, {V0_10_0_0, "OffsetCommitRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetCommitRequest{} response, err := broker.CommitOffset(&request) if err != nil { t.Error(err) } if response == nil { t.Error("OffsetCommit request got no response!") } }}, {V0_10_0_0, "OffsetRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetRequest{} response, err := broker.GetAvailableOffsets(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Offset request got no response!") } }}, {V0_10_0_0, "JoinGroupRequest", []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := JoinGroupRequest{} response, err := broker.JoinGroup(&request) if err != nil { t.Error(err) } if response == nil { t.Error("JoinGroup request got no response!") } }}, {V0_10_0_0, "SyncGroupRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := SyncGroupRequest{} response, err := broker.SyncGroup(&request) if err != nil { t.Error(err) } if response == nil { t.Error("SyncGroup request got no response!") } }}, {V0_10_0_0, "LeaveGroupRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := LeaveGroupRequest{} response, err := broker.LeaveGroup(&request) if err != nil { t.Error(err) } if response == nil { t.Error("LeaveGroup request got no response!") } }}, {V0_10_0_0, "HeartbeatRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := HeartbeatRequest{} response, err := broker.Heartbeat(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Heartbeat request got no response!") } }}, {V0_10_0_0, "ListGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ListGroupsRequest{} response, err := broker.ListGroups(&request) if err != nil { t.Error(err) } if response == nil { t.Error("ListGroups request got no response!") } }}, {V0_10_0_0, "DescribeGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DescribeGroupsRequest{} response, err := broker.DescribeGroups(&request) if err != nil { t.Error(err) } if response == nil { t.Error("DescribeGroups request got no response!") } }}, {V0_10_0_0, "ApiVersionsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ApiVersionsRequest{} response, err := broker.ApiVersions(&request) if err != nil { t.Error(err) } if response == nil { t.Error("ApiVersions request got no response!") } }}, {V1_1_0_0, "DeleteGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DeleteGroupsRequest{} response, err := broker.DeleteGroups(&request) if err != nil { t.Error(err) } if response == nil { t.Error("DeleteGroups request got no response!") } }}, } func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { metricValidators := newMetricValidators() mockBrokerBytesRead := mockBrokerMetrics.bytesRead mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten // Check that the number of bytes sent corresponds to what the mock broker received metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten)) if mockBrokerBytesWritten == 0 { // This a ProduceRequest with NoResponse metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0)) metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0)) metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0)) } else { metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1)) metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1)) metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten)) } // Check that the number of bytes received corresponds to what the mock broker sent metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead)) metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1)) metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1)) metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead)) // Check that there is no more requests in flight metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0)) // Run the validators metricValidators.run(t, broker.conf.MetricRegistry) }