123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package sarama
- import (
- "fmt"
- "github.com/Shopify/sarama/mockbroker"
- "testing"
- )
- func ExampleBroker() error {
- broker := NewBroker("localhost:9092")
- err := broker.Open(4)
- if err != nil {
- return err
- }
- defer broker.Close()
- request := MetadataRequest{Topics: []string{"myTopic"}}
- response, err := broker.GetMetadata("myClient", &request)
- if err != nil {
- return err
- }
- fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
- return nil
- }
- func TestBrokerAccessors(t *testing.T) {
- broker := NewBroker("abc:123")
- if broker.ID() != -1 {
- t.Error("New broker didn't have an ID of -1.")
- }
- if broker.Addr() != "abc:123" {
- t.Error("New broker didn't have the correct address")
- }
- broker.id = 34
- if broker.ID() != 34 {
- t.Error("Manually setting broker ID did not take effect.")
- }
- }
- func TestSimpleBrokerCommunication(t *testing.T) {
- responses := make(chan []byte)
- mockBroker := mockbroker.New(t, responses)
- defer mockBroker.Close()
- broker := NewBroker(mockBroker.Addr())
- err := broker.Open(4)
- if err != nil {
- t.Fatal(err)
- }
- go func() {
- for _, tt := range brokerTestTable {
- responses <- tt.response
- }
- }()
- for _, tt := range brokerTestTable {
- tt.runner(t, broker)
- }
- err = broker.Close()
- if err != nil {
- t.Error(err)
- }
- }
- var brokerTestTable = []struct {
- response []byte
- runner func(*testing.T, *Broker)
- }{
- {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := MetadataRequest{}
- response, err := broker.GetMetadata("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Metadata request got no response!")
- }
- }},
- {[]byte{},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = NoResponse
- response, err := broker.Produce("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response != nil {
- t.Error("Produce request with NoResponse got a response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = WaitForLocal
- response, err := broker.Produce("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Produce request without NoResponse got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := FetchRequest{}
- response, err := broker.Fetch("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Fetch request got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetFetchRequest{}
- response, err := broker.FetchOffset("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetFetch request got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetCommitRequest{}
- response, err := broker.CommitOffset("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetCommit request got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetRequest{}
- response, err := broker.GetAvailableOffsets("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Offset request got no response!")
- }
- }},
- }
|