broker_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package protocol
  2. import (
  3. "encoding/binary"
  4. "io"
  5. "net"
  6. "strconv"
  7. "testing"
  8. )
  9. // FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
  10. // It takes a testing.T as provided by the test framework and a channel of responses to use.
  11. // It spawns a TCP server on a kernel-selected localhost port, then spawns a goroutine that reads Kafka requests
  12. // from that port and returns each provided response in order (if a response is nil, nothing is sent).
  13. // When the server is successfully created, it returns the port on which it is listening and a 'done' channel
  14. // which it will close when it exits. Otherwise it will return an error (if an error occurs *in* the goroutine it
  15. // is simply logged to the testing.T and the goroutine exits). There is also a StopFakeServer helper that leads to
  16. // this recommended pattern in tests:
  17. //
  18. // port, done, err := FakeKafkaServer(t, responses)
  19. // if err != nil {
  20. // t.Fatal(err)
  21. // }
  22. // defer StopFakeServer(responses, done)
  23. //
  24. // When running tests like this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
  25. // waiting for a response, it automatically panics.
  26. //
  27. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  28. // automatically as a convenience.
  29. func FakeKafkaServer(t *testing.T, responses <-chan []byte) (int32, <-chan bool, error) {
  30. ln, err := net.Listen("tcp", "localhost:0")
  31. if err != nil {
  32. return 0, nil, err
  33. }
  34. _, portStr, err := net.SplitHostPort(ln.Addr().String())
  35. if err != nil {
  36. return 0, nil, err
  37. }
  38. tmp, err := strconv.ParseInt(portStr, 10, 32)
  39. if err != nil {
  40. return 0, nil, err
  41. }
  42. port := int32(tmp)
  43. done := make(chan bool)
  44. go func() {
  45. defer close(done)
  46. conn, err := ln.Accept()
  47. if err != nil {
  48. t.Error(err)
  49. conn.Close()
  50. ln.Close()
  51. return
  52. }
  53. reqHeader := make([]byte, 4)
  54. resHeader := make([]byte, 8)
  55. for response := range responses {
  56. _, err := io.ReadFull(conn, reqHeader)
  57. if err != nil {
  58. t.Error(err)
  59. conn.Close()
  60. ln.Close()
  61. return
  62. }
  63. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  64. if len(body) < 10 {
  65. t.Error("Kafka request too short.")
  66. conn.Close()
  67. ln.Close()
  68. return
  69. }
  70. _, err = io.ReadFull(conn, body)
  71. if err != nil {
  72. t.Error(err)
  73. conn.Close()
  74. ln.Close()
  75. return
  76. }
  77. if response == nil {
  78. continue
  79. }
  80. binary.BigEndian.PutUint32(resHeader, uint32(len(response)))
  81. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  82. _, err = conn.Write(resHeader)
  83. if err != nil {
  84. t.Error(err)
  85. conn.Close()
  86. ln.Close()
  87. return
  88. }
  89. _, err = conn.Write(response)
  90. if err != nil {
  91. t.Error(err)
  92. conn.Close()
  93. ln.Close()
  94. return
  95. }
  96. }
  97. err = conn.Close()
  98. if err != nil {
  99. t.Error(err)
  100. ln.Close()
  101. return
  102. }
  103. err = ln.Close()
  104. if err != nil {
  105. t.Error(err)
  106. return
  107. }
  108. }()
  109. return port, done, nil
  110. }
  111. func StopFakeServer(responses chan []byte, done <-chan bool) {
  112. close(responses)
  113. <-done
  114. }
  115. func TestBrokerEquals(t *testing.T) {
  116. var b1, b2 *Broker
  117. b1 = nil
  118. b2 = nil
  119. if !b1.Equals(b2) {
  120. t.Error("Two nil brokers didn't compare equal.")
  121. }
  122. b1 = NewBroker("abc", 123)
  123. if b1.Equals(b2) {
  124. t.Error("Non-nil and nil brokers compared equal.")
  125. }
  126. if b2.Equals(b1) {
  127. t.Error("Nil and non-nil brokers compared equal.")
  128. }
  129. b2 = NewBroker("abc", 1234)
  130. if b1.Equals(b2) || b2.Equals(b1) {
  131. t.Error("Brokers with different ports compared equal.")
  132. }
  133. b2 = NewBroker("abcd", 123)
  134. if b1.Equals(b2) || b2.Equals(b1) {
  135. t.Error("Brokers with different hosts compared equal.")
  136. }
  137. b2 = NewBroker("abc", 123)
  138. b2.id = -2
  139. if b1.Equals(b2) || b2.Equals(b1) {
  140. t.Error("Brokers with different ids compared equal.")
  141. }
  142. b2.id = -1
  143. if !b1.Equals(b2) || !b2.Equals(b1) {
  144. t.Error("Similar brokers did not compare equal.")
  145. }
  146. }
  147. func TestBrokerID(t *testing.T) {
  148. broker := NewBroker("abc", 123)
  149. if broker.ID() != -1 {
  150. t.Error("New broker didn't have an ID of -1.")
  151. }
  152. broker.id = 34
  153. if broker.ID() != 34 {
  154. t.Error("Manually setting broker ID did not take effect.")
  155. }
  156. }
  157. func TestBrokerConnectClose(t *testing.T) {
  158. responses := make(chan []byte)
  159. port, done, err := FakeKafkaServer(t, responses)
  160. if err != nil {
  161. t.Fatal(err)
  162. }
  163. defer StopFakeServer(responses, done)
  164. broker := NewBroker("localhost", port)
  165. err = broker.Connect()
  166. if err != nil {
  167. t.Fatal(err)
  168. }
  169. err = broker.Close()
  170. if err != nil {
  171. t.Error(err)
  172. }
  173. }