123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- package sarama
- import (
- "fmt"
- "testing"
- "time"
- )
- func ExampleBroker() {
- broker := NewBroker("localhost:9092")
- err := broker.Open(nil)
- if err != nil {
- panic(err)
- }
- request := MetadataRequest{Topics: []string{"myTopic"}}
- response, err := broker.GetMetadata(&request)
- if err != nil {
- _ = broker.Close()
- panic(err)
- }
- fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
- if err = broker.Close(); err != nil {
- panic(err)
- }
- }
- type mockEncoder struct {
- bytes []byte
- }
- func (m mockEncoder) encode(pe packetEncoder) error {
- return pe.putRawBytes(m.bytes)
- }
- type brokerMetrics struct {
- bytesRead int
- bytesWritten int
- }
- func TestBrokerAccessors(t *testing.T) {
- broker := NewBroker("abc:123")
- if broker.ID() != -1 {
- t.Error("New broker didn't have an ID of -1.")
- }
- if broker.Addr() != "abc:123" {
- t.Error("New broker didn't have the correct address")
- }
- if broker.Rack() != "" {
- t.Error("New broker didn't have an unknown rack.")
- }
- broker.id = 34
- if broker.ID() != 34 {
- t.Error("Manually setting broker ID did not take effect.")
- }
- rack := "dc1"
- broker.rack = &rack
- if broker.Rack() != rack {
- t.Error("Manually setting broker rack did not take effect.")
- }
- }
- func TestSimpleBrokerCommunication(t *testing.T) {
- for _, tt := range brokerTestTable {
- Logger.Printf("Testing broker communication for %s", tt.name)
- mb := NewMockBroker(t, 0)
- mb.Returns(&mockEncoder{tt.response})
- pendingNotify := make(chan brokerMetrics)
-
- mb.SetNotifier(func(bytesRead, bytesWritten int) {
- pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
- })
- broker := NewBroker(mb.Addr())
-
- broker.id = 0
- conf := NewConfig()
- conf.Version = tt.version
- err := broker.Open(conf)
- if err != nil {
- t.Fatal(err)
- }
- tt.runner(t, broker)
-
-
- timeout := 500 * time.Millisecond
- select {
- case mockBrokerMetrics := <-pendingNotify:
- validateBrokerMetrics(t, broker, mockBrokerMetrics)
- case <-time.After(timeout):
- t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
- }
- mb.Close()
- err = broker.Close()
- if err != nil {
- t.Error(err)
- }
- }
- }
- var brokerTestTable = []struct {
- version KafkaVersion
- name string
- response []byte
- runner func(*testing.T, *Broker)
- }{
- {V0_10_0_0,
- "MetadataRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := MetadataRequest{}
- response, err := broker.GetMetadata(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Metadata request got no response!")
- }
- }},
- {V0_10_0_0,
- "ConsumerMetadataRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ConsumerMetadataRequest{}
- response, err := broker.GetConsumerMetadata(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Consumer Metadata request got no response!")
- }
- }},
- {V0_10_0_0,
- "ProduceRequest (NoResponse)",
- []byte{},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = NoResponse
- response, err := broker.Produce(&request)
- if err != nil {
- t.Error(err)
- }
- if response != nil {
- t.Error("Produce request with NoResponse got a response!")
- }
- }},
- {V0_10_0_0,
- "ProduceRequest (WaitForLocal)",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = WaitForLocal
- response, err := broker.Produce(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Produce request without NoResponse got no response!")
- }
- }},
- {V0_10_0_0,
- "FetchRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := FetchRequest{}
- response, err := broker.Fetch(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Fetch request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetFetchRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetFetchRequest{}
- response, err := broker.FetchOffset(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetFetch request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetCommitRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetCommitRequest{}
- response, err := broker.CommitOffset(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetCommit request got no response!")
- }
- }},
- {V0_10_0_0,
- "OffsetRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetRequest{}
- response, err := broker.GetAvailableOffsets(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Offset request got no response!")
- }
- }},
- {V0_10_0_0,
- "JoinGroupRequest",
- []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := JoinGroupRequest{}
- response, err := broker.JoinGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("JoinGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "SyncGroupRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := SyncGroupRequest{}
- response, err := broker.SyncGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("SyncGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "LeaveGroupRequest",
- []byte{0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := LeaveGroupRequest{}
- response, err := broker.LeaveGroup(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("LeaveGroup request got no response!")
- }
- }},
- {V0_10_0_0,
- "HeartbeatRequest",
- []byte{0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := HeartbeatRequest{}
- response, err := broker.Heartbeat(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Heartbeat request got no response!")
- }
- }},
- {V0_10_0_0,
- "ListGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ListGroupsRequest{}
- response, err := broker.ListGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("ListGroups request got no response!")
- }
- }},
- {V0_10_0_0,
- "DescribeGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := DescribeGroupsRequest{}
- response, err := broker.DescribeGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("DescribeGroups request got no response!")
- }
- }},
- {V0_10_0_0,
- "ApiVersionsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ApiVersionsRequest{}
- response, err := broker.ApiVersions(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("ApiVersions request got no response!")
- }
- }},
- {V1_1_0_0,
- "DeleteGroupsRequest",
- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := DeleteGroupsRequest{}
- response, err := broker.DeleteGroups(&request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("DeleteGroups request got no response!")
- }
- }},
- }
- func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
- metricValidators := newMetricValidators()
- mockBrokerBytesRead := mockBrokerMetrics.bytesRead
- mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
-
- metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
- if mockBrokerBytesWritten == 0 {
-
- metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
- } else {
- metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
- }
-
- metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
- metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
- metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
- metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
-
- metricValidators.run(t, broker.conf.MetricRegistry)
- }
|