package sarama import ( "fmt" "testing" ) func ExampleBroker() error { broker := NewBroker("localhost:9092") err := broker.Open(nil) if err != nil { return err } request := MetadataRequest{Topics: []string{"myTopic"}} response, err := broker.GetMetadata(&request) if err != nil { _ = broker.Close() return err } fmt.Println("There are", len(response.Topics), "topics active in the cluster.") return broker.Close() } type mockEncoder struct { bytes []byte } func (m mockEncoder) encode(pe packetEncoder) error { return pe.putRawBytes(m.bytes) } func TestBrokerAccessors(t *testing.T) { broker := NewBroker("abc:123") if broker.ID() != -1 { t.Error("New broker didn't have an ID of -1.") } if broker.Addr() != "abc:123" { t.Error("New broker didn't have the correct address") } broker.id = 34 if broker.ID() != 34 { t.Error("Manually setting broker ID did not take effect.") } } func TestSimpleBrokerCommunication(t *testing.T) { mb := newMockBroker(t, 0) defer mb.Close() broker := NewBroker(mb.Addr()) err := broker.Open(nil) if err != nil { t.Fatal(err) } for _, tt := range brokerTestTable { mb.Returns(&mockEncoder{tt.response}) } for _, tt := range brokerTestTable { tt.runner(t, broker) } err = broker.Close() if err != nil { t.Error(err) } } // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake var brokerTestTable = []struct { response []byte runner func(*testing.T, *Broker) }{ {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := MetadataRequest{} response, err := broker.GetMetadata(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Metadata request got no response!") } }}, {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ConsumerMetadataRequest{} response, err := broker.GetConsumerMetadata(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Consumer Metadata request got no response!") } }}, {[]byte{}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} request.RequiredAcks = NoResponse response, err := broker.Produce(&request) if err != nil { t.Error(err) } if response != nil { t.Error("Produce request with NoResponse got a response!") } }}, {[]byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} request.RequiredAcks = WaitForLocal response, err := broker.Produce(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Produce request without NoResponse got no response!") } }}, {[]byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := FetchRequest{} response, err := broker.Fetch(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Fetch request got no response!") } }}, {[]byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetFetchRequest{} response, err := broker.FetchOffset(&request) if err != nil { t.Error(err) } if response == nil { t.Error("OffsetFetch request got no response!") } }}, {[]byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetCommitRequest{} response, err := broker.CommitOffset(&request) if err != nil { t.Error(err) } if response == nil { t.Error("OffsetCommit request got no response!") } }}, {[]byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetRequest{} response, err := broker.GetAvailableOffsets(&request) if err != nil { t.Error(err) } if response == nil { t.Error("Offset request got no response!") } }}, }