broker_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package protocol
  2. import (
  3. "encoding/binary"
  4. "io"
  5. "net"
  6. "strconv"
  7. "testing"
  8. )
  9. // FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
  10. // It takes a testing.T as provided by the test framework, a channel of responses, and a done channel.
  11. // It spawns a TCP server on a kernel-selected localhost port, then spawns a goroutine that reads Kafka requests
  12. // from that port and returns each provided response in order (if a response is nil, nothing is sent).
  13. // It returns the port on which it is listening or an error (if an error is returned, the goroutine is not spawned,
  14. // if an error occurs *in* the goroutine it is simply logged to the testing.T and the goroutine exits).
  15. // When the responses channel closes, it closes the done channel and exits. The calling test must read from the done
  16. // channel as its last step to ensure the number of requests and provided responses lined up correctly.
  17. //
  18. // When running tests with this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
  19. // waiting for a response, it automatically panics.
  20. //
  21. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  22. // automatically as a convenience.
  23. func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (int32, error) {
  24. ln, err := net.Listen("tcp", "localhost:0")
  25. if err != nil {
  26. return 0, err
  27. }
  28. _, portStr, err := net.SplitHostPort(ln.Addr().String())
  29. if err != nil {
  30. return 0, err
  31. }
  32. tmp, err := strconv.ParseInt(portStr, 10, 32)
  33. if err != nil {
  34. return 0, err
  35. }
  36. port := int32(tmp)
  37. go func() {
  38. defer close(done)
  39. conn, err := ln.Accept()
  40. if err != nil {
  41. t.Error(err)
  42. conn.Close()
  43. ln.Close()
  44. return
  45. }
  46. reqHeader := make([]byte, 4)
  47. resHeader := make([]byte, 8)
  48. for response := range responses {
  49. _, err := io.ReadFull(conn, reqHeader)
  50. if err != nil {
  51. t.Error(err)
  52. conn.Close()
  53. ln.Close()
  54. return
  55. }
  56. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  57. if len(body) < 10 {
  58. t.Error("Kafka request too short.")
  59. conn.Close()
  60. ln.Close()
  61. return
  62. }
  63. _, err = io.ReadFull(conn, body)
  64. if err != nil {
  65. t.Error(err)
  66. conn.Close()
  67. ln.Close()
  68. return
  69. }
  70. if response == nil {
  71. continue
  72. }
  73. binary.BigEndian.PutUint32(resHeader, uint32(len(response)))
  74. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  75. _, err = conn.Write(resHeader)
  76. if err != nil {
  77. t.Error(err)
  78. conn.Close()
  79. ln.Close()
  80. return
  81. }
  82. _, err = conn.Write(response)
  83. if err != nil {
  84. t.Error(err)
  85. conn.Close()
  86. ln.Close()
  87. return
  88. }
  89. }
  90. err = conn.Close()
  91. if err != nil {
  92. t.Error(err)
  93. ln.Close()
  94. return
  95. }
  96. err = ln.Close()
  97. if err != nil {
  98. t.Error(err)
  99. return
  100. }
  101. }()
  102. return port, nil
  103. }
  104. func TestBrokerEquals(t *testing.T) {
  105. var b1, b2 *Broker
  106. b1 = nil
  107. b2 = nil
  108. if !b1.Equals(b2) {
  109. t.Error("Two nil brokers didn't compare equal.")
  110. }
  111. b1 = NewBroker("abc", 123)
  112. if b1.Equals(b2) {
  113. t.Error("Non-nil and nil brokers compared equal.")
  114. }
  115. if b2.Equals(b1) {
  116. t.Error("Nil and non-nil brokers compared equal.")
  117. }
  118. b2 = NewBroker("abc", 1234)
  119. if b1.Equals(b2) || b2.Equals(b1) {
  120. t.Error("Brokers with different ports compared equal.")
  121. }
  122. b2 = NewBroker("abcd", 123)
  123. if b1.Equals(b2) || b2.Equals(b1) {
  124. t.Error("Brokers with different hosts compared equal.")
  125. }
  126. b2 = NewBroker("abc", 123)
  127. b2.id = -2
  128. if b1.Equals(b2) || b2.Equals(b1) {
  129. t.Error("Brokers with different ids compared equal.")
  130. }
  131. b2.id = -1
  132. if !b1.Equals(b2) || !b2.Equals(b1) {
  133. t.Error("Similar brokers did not compare equal.")
  134. }
  135. }
  136. func TestBrokerID(t *testing.T) {
  137. broker := NewBroker("abc", 123)
  138. if broker.ID() != -1 {
  139. t.Error("New broker didn't have an ID of -1.")
  140. }
  141. broker.id = 34
  142. if broker.ID() != 34 {
  143. t.Error("Manually setting broker ID did not take effect.")
  144. }
  145. }
  146. func TestBrokerConnectClose(t *testing.T) {
  147. done := make(chan bool)
  148. responses := make(chan []byte)
  149. port, err := FakeKafkaServer(t, responses, done)
  150. if err != nil {
  151. t.Error(err)
  152. return
  153. }
  154. broker := NewBroker("localhost", port)
  155. err = broker.Connect()
  156. if err != nil {
  157. t.Error(err)
  158. return
  159. }
  160. err = broker.Close()
  161. if err != nil {
  162. t.Error(err)
  163. return
  164. }
  165. close(responses)
  166. <-done
  167. }