| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package protocol
- import (
- "encoding/binary"
- "io"
- "net"
- "strconv"
- "testing"
- )
// FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
//
// It takes a testing.T as provided by the test framework and a channel of responses to use.
// It opens a TCP listener on a kernel-selected localhost port and spawns a goroutine that accepts
// a single connection. For every value received from responses, the goroutine reads one
// length-prefixed request from that connection and then writes the response back (if the response
// is nil, the request is still consumed but nothing is sent).
//
// On success it returns the port on which it is listening and a 'done' channel which is closed
// when the goroutine exits. If the listener cannot be set up, an error is returned and the
// listener is released. If an error occurs *in* the goroutine it is simply reported on the
// testing.T, both sockets are closed and the goroutine exits. There is also a StopFakeServer
// helper that leads to this recommended pattern in tests:
//
//	port, done, err := FakeKafkaServer(t, responses)
//	if err != nil {
//		t.Fatal(err)
//	}
//	defer StopFakeServer(responses, done)
//
// When running tests like this, it is strongly recommended to specify a -timeout to `go test` so
// that if the test hangs waiting for a response, it automatically panics.
//
// It is not necessary to prefix message length or correlation ID to your response bytes, the
// server does that automatically as a convenience: the correlation ID is copied from bytes 4-8
// of the request body.
func FakeKafkaServer(t *testing.T, responses <-chan []byte) (int32, <-chan bool, error) {
	ln, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return 0, nil, err
	}

	// Recover the kernel-selected port from the listener's address.
	_, portStr, err := net.SplitHostPort(ln.Addr().String())
	if err != nil {
		_ = ln.Close() // best effort; the parse error is what the caller needs to see
		return 0, nil, err
	}
	tmp, err := strconv.ParseInt(portStr, 10, 32)
	if err != nil {
		_ = ln.Close() // best effort; the parse error is what the caller needs to see
		return 0, nil, err
	}
	port := int32(tmp)

	done := make(chan bool)
	go func() {
		defer close(done)

		conn, err := ln.Accept()
		if err != nil {
			// Accept produced no connection, so conn is nil and must not be
			// closed; only the listener needs releasing.
			t.Error(err)
			_ = ln.Close()
			return
		}

		// abort reports a failure on the testing.T and releases both sockets.
		// Close errors are ignored here because the reported failure is the
		// primary one.
		abort := func(args ...interface{}) {
			t.Error(args...)
			_ = conn.Close()
			_ = ln.Close()
		}

		reqHeader := make([]byte, 4) // big-endian request length
		resHeader := make([]byte, 8) // big-endian response length + correlation ID
		for response := range responses {
			if _, err := io.ReadFull(conn, reqHeader); err != nil {
				abort(err)
				return
			}

			// Validate the announced size before allocating the body buffer.
			size := binary.BigEndian.Uint32(reqHeader)
			if size < 10 {
				abort("Kafka request too short.")
				return
			}
			body := make([]byte, size)
			if _, err := io.ReadFull(conn, body); err != nil {
				abort(err)
				return
			}

			// A nil response means "consume the request, answer nothing".
			if response == nil {
				continue
			}

			binary.BigEndian.PutUint32(resHeader, uint32(len(response)))
			// Echo the request's correlation ID back to the client.
			binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
			if _, err := conn.Write(resHeader); err != nil {
				abort(err)
				return
			}
			if _, err := conn.Write(response); err != nil {
				abort(err)
				return
			}
		}

		// Normal shutdown: the responses channel was closed. Close errors are
		// worth reporting on this path.
		if err := conn.Close(); err != nil {
			t.Error(err)
			_ = ln.Close()
			return
		}
		if err := ln.Close(); err != nil {
			t.Error(err)
		}
	}()

	return port, done, nil
}
// StopFakeServer shuts down a server started with FakeKafkaServer. It closes the responses
// channel and then blocks until a value arrives on done or done is closed.
func StopFakeServer(responses chan []byte, done <-chan bool) {
	// Closing the channel ends the server goroutine's range loop over responses.
	close(responses)

	// Wait for the server goroutine to signal that it has finished.
	<-done
}
- func TestBrokerEquals(t *testing.T) {
- var b1, b2 *Broker
- b1 = nil
- b2 = nil
- if !b1.Equals(b2) {
- t.Error("Two nil brokers didn't compare equal.")
- }
- b1 = NewBroker("abc", 123)
- if b1.Equals(b2) {
- t.Error("Non-nil and nil brokers compared equal.")
- }
- if b2.Equals(b1) {
- t.Error("Nil and non-nil brokers compared equal.")
- }
- b2 = NewBroker("abc", 1234)
- if b1.Equals(b2) || b2.Equals(b1) {
- t.Error("Brokers with different ports compared equal.")
- }
- b2 = NewBroker("abcd", 123)
- if b1.Equals(b2) || b2.Equals(b1) {
- t.Error("Brokers with different hosts compared equal.")
- }
- b2 = NewBroker("abc", 123)
- b2.id = -2
- if b1.Equals(b2) || b2.Equals(b1) {
- t.Error("Brokers with different ids compared equal.")
- }
- b2.id = -1
- if !b1.Equals(b2) || !b2.Equals(b1) {
- t.Error("Similar brokers did not compare equal.")
- }
- }
- func TestBrokerID(t *testing.T) {
- broker := NewBroker("abc", 123)
- if broker.ID() != -1 {
- t.Error("New broker didn't have an ID of -1.")
- }
- broker.id = 34
- if broker.ID() != 34 {
- t.Error("Manually setting broker ID did not take effect.")
- }
- }
- func TestBrokerConnectClose(t *testing.T) {
- responses := make(chan []byte)
- port, done, err := FakeKafkaServer(t, responses)
- if err != nil {
- t.Fatal(err)
- }
- defer StopFakeServer(responses, done)
- broker := NewBroker("localhost", port)
- err = broker.Connect()
- if err != nil {
- t.Fatal(err)
- }
- err = broker.Close()
- if err != nil {
- t.Error(err)
- }
- }
|