broker_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package protocol
  2. import (
  3. "encoding/binary"
  4. "io"
  5. "net"
  6. "strconv"
  7. "testing"
  8. )
  9. // FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
  10. // It takes a testing.T as provided by the test framework, a set of responses, and a done channel.
  11. // It spawns a TCP server on a kernel-selected localhost port, then spawns a goroutine that reads Kafka requests
  12. // from that port and returns each provided response in order (if a response is nil, nothing is sent).
  13. // It returns the port on which it is listening or an error (if an error is returned, the goroutine is not spawned,
  14. // if an error occurs *in* the goroutine it is simply logged to the testing.T and the goroutine exits).
  15. // When the goroutine finishes, it closes the done channel. The using test must read from the done channel as its
  16. // last step to ensure the number of requests and provided responses lined up correctly.
  17. //
  18. // When running tests with this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
  19. // waiting for a response, it automatically panics.
  20. //
  21. // It is not necessary to prefix message length to your response bytes, the server does that automatically as a convenience.
  22. func FakeKafkaServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, error) {
  23. ln, err := net.Listen("tcp", "localhost:0")
  24. if err != nil {
  25. return 0, err
  26. }
  27. _, portStr, err := net.SplitHostPort(ln.Addr().String())
  28. if err != nil {
  29. return 0, err
  30. }
  31. tmp, err := strconv.ParseInt(portStr, 10, 32)
  32. if err != nil {
  33. return 0, err
  34. }
  35. port := int32(tmp)
  36. go func() {
  37. defer close(done)
  38. conn, err := ln.Accept()
  39. if err != nil {
  40. t.Error(err)
  41. conn.Close()
  42. ln.Close()
  43. return
  44. }
  45. for _, response := range responses {
  46. header := make([]byte, 4)
  47. _, err := io.ReadFull(conn, header)
  48. if err != nil {
  49. t.Error(err)
  50. conn.Close()
  51. ln.Close()
  52. return
  53. }
  54. body := make([]byte, binary.BigEndian.Uint32(header))
  55. _, err = io.ReadFull(conn, body)
  56. if err != nil {
  57. t.Error(err)
  58. conn.Close()
  59. ln.Close()
  60. return
  61. }
  62. if response == nil {
  63. continue
  64. }
  65. binary.BigEndian.PutUint32(header, uint32(len(response)))
  66. _, err = conn.Write(header)
  67. if err != nil {
  68. t.Error(err)
  69. conn.Close()
  70. ln.Close()
  71. return
  72. }
  73. _, err = conn.Write(response)
  74. if err != nil {
  75. t.Error(err)
  76. conn.Close()
  77. ln.Close()
  78. return
  79. }
  80. }
  81. err = conn.Close()
  82. if err != nil {
  83. t.Error(err)
  84. ln.Close()
  85. return
  86. }
  87. err = ln.Close()
  88. if err != nil {
  89. t.Error(err)
  90. return
  91. }
  92. }()
  93. return port, nil
  94. }
  95. func TestBrokerEquals(t *testing.T) {
  96. var b1, b2 *Broker
  97. b1 = nil
  98. b2 = nil
  99. if !b1.Equals(b2) {
  100. t.Error("Two nil brokers didn't compare equal.")
  101. }
  102. b1 = NewBroker("abc", 123)
  103. if b1.Equals(b2) {
  104. t.Error("Non-nil and nil brokers compared equal.")
  105. }
  106. if b2.Equals(b1) {
  107. t.Error("Nil and non-nil brokers compared equal.")
  108. }
  109. b2 = NewBroker("abc", 1234)
  110. if b1.Equals(b2) || b2.Equals(b1) {
  111. t.Error("Brokers with different ports compared equal.")
  112. }
  113. b2 = NewBroker("abcd", 123)
  114. if b1.Equals(b2) || b2.Equals(b1) {
  115. t.Error("Brokers with different hosts compared equal.")
  116. }
  117. b2 = NewBroker("abc", 123)
  118. b2.id = -2
  119. if b1.Equals(b2) || b2.Equals(b1) {
  120. t.Error("Brokers with different ids compared equal.")
  121. }
  122. b2.id = -1
  123. if !b1.Equals(b2) || !b2.Equals(b1) {
  124. t.Error("Similar brokers did not compare equal.")
  125. }
  126. }
  127. func TestBrokerID(t *testing.T) {
  128. broker := NewBroker("abc", 123)
  129. if broker.ID() != -1 {
  130. t.Error("New broker didn't have an ID of -1.")
  131. }
  132. broker.id = 34
  133. if broker.ID() != 34 {
  134. t.Error("Manually setting broker ID did not take effect.")
  135. }
  136. }
  137. func TestBrokerConnectClose(t *testing.T) {
  138. done := make(chan bool)
  139. port, err := FakeKafkaServer(t, [][]byte{}, done)
  140. if err != nil {
  141. t.Error(err)
  142. return
  143. }
  144. broker := NewBroker("localhost", port)
  145. err = broker.Connect()
  146. if err != nil {
  147. t.Error(err)
  148. return
  149. }
  150. err = broker.Close()
  151. if err != nil {
  152. t.Error(err)
  153. return
  154. }
  155. <-done
  156. }