123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- package sarama
- import (
- "encoding/binary"
- "fmt"
- "io"
- "net"
- "strconv"
- "testing"
- "time"
- )
- type MockBroker struct {
- port int32
- stopper chan bool
- responses chan []byte
- listener net.Listener
- t *testing.T
- }
- func (b *MockBroker) Port() int32 {
- return b.port
- }
- func (b *MockBroker) Addr() string {
- return b.listener.Addr().String()
- }
- func (b *MockBroker) Close() {
- close(b.responses)
- <-b.stopper
- }
- func (b *MockBroker) serverLoop() {
- defer close(b.stopper)
- conn, err := b.listener.Accept()
- if err != nil {
- b.t.Error(err)
- conn.Close()
- b.listener.Close()
- return
- }
- reqHeader := make([]byte, 4)
- resHeader := make([]byte, 8)
- for response := range b.responses {
- if response == nil {
- time.Sleep(250 * time.Millisecond)
- continue
- }
- _, err := io.ReadFull(conn, reqHeader)
- if err != nil {
- b.t.Error(err)
- conn.Close()
- b.listener.Close()
- return
- }
- body := make([]byte, binary.BigEndian.Uint32(reqHeader))
- if len(body) < 10 {
- b.t.Error("Kafka request too short.")
- conn.Close()
- b.listener.Close()
- return
- }
- _, err = io.ReadFull(conn, body)
- if err != nil {
- b.t.Error(err)
- conn.Close()
- b.listener.Close()
- return
- }
- if len(response) == 0 {
- continue
- }
- binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
- binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
- _, err = conn.Write(resHeader)
- if err != nil {
- b.t.Error(err)
- conn.Close()
- b.listener.Close()
- return
- }
- _, err = conn.Write(response)
- if err != nil {
- b.t.Error(err)
- conn.Close()
- b.listener.Close()
- return
- }
- }
- err = conn.Close()
- if err != nil {
- b.t.Error(err)
- b.listener.Close()
- return
- }
- err = b.listener.Close()
- if err != nil {
- b.t.Error(err)
- return
- }
- }
- func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
- var err error
- broker := new(MockBroker)
- broker.stopper = make(chan bool)
- broker.responses = responses
- broker.t = t
- broker.listener, err = net.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatal(err)
- }
- _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- tmp, err := strconv.ParseInt(portStr, 10, 32)
- if err != nil {
- t.Fatal(err)
- }
- broker.port = int32(tmp)
- go broker.serverLoop()
- return broker
- }
- func ExampleBroker() error {
- broker := NewBroker("localhost:9092")
- err := broker.Open(4)
- if err != nil {
- return err
- }
- defer broker.Close()
- request := MetadataRequest{Topics: []string{"myTopic"}}
- response, err := broker.GetMetadata("myClient", &request)
- if err != nil {
- return err
- }
- fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
- return nil
- }
- func TestBrokerAccessors(t *testing.T) {
- broker := NewBroker("abc:123")
- if broker.ID() != -1 {
- t.Error("New broker didn't have an ID of -1.")
- }
- if broker.Addr() != "abc:123" {
- t.Error("New broker didn't have the correct address")
- }
- broker.id = 34
- if broker.ID() != 34 {
- t.Error("Manually setting broker ID did not take effect.")
- }
- }
- func TestSimpleBrokerCommunication(t *testing.T) {
- responses := make(chan []byte)
- mockBroker := NewMockBroker(t, responses)
- defer mockBroker.Close()
- broker := NewBroker(mockBroker.Addr())
- err := broker.Open(4)
- if err != nil {
- t.Fatal(err)
- }
- go func() {
- for _, tt := range brokerTestTable {
- responses <- tt.response
- }
- }()
- for _, tt := range brokerTestTable {
- tt.runner(t, broker)
- }
- err = broker.Close()
- if err != nil {
- t.Error(err)
- }
- }
- var brokerTestTable = []struct {
- response []byte
- runner func(*testing.T, *Broker)
- }{
- {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := MetadataRequest{}
- response, err := broker.GetMetadata("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Metadata request got no response!")
- }
- }},
- {[]byte{},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = NoResponse
- response, err := broker.Produce("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response != nil {
- t.Error("Produce request with NoResponse got a response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := ProduceRequest{}
- request.RequiredAcks = WaitForLocal
- response, err := broker.Produce("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Produce request without NoResponse got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := FetchRequest{}
- response, err := broker.Fetch("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Fetch request got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetFetchRequest{}
- response, err := broker.FetchOffset("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetFetch request got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetCommitRequest{}
- response, err := broker.CommitOffset("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("OffsetCommit request got no response!")
- }
- }},
- {[]byte{0x00, 0x00, 0x00, 0x00},
- func(t *testing.T, broker *Broker) {
- request := OffsetRequest{}
- response, err := broker.GetAvailableOffsets("clientID", &request)
- if err != nil {
- t.Error(err)
- }
- if response == nil {
- t.Error("Offset request got no response!")
- }
- }},
- }
|