mockbroker_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "net"
  7. "strconv"
  8. "testing"
  9. "time"
  10. )
  11. // mockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
  12. // accepts a single connection. It reads Kafka requests from that connection and returns each response
  13. // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
  14. // the server sleeps for 250ms instead of reading a request).
  15. //
  16. // When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
  17. // waiting for a response, the test panics.
  18. //
  19. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  20. // automatically as a convenience.
  21. type mockBroker struct {
  22. brokerID int32
  23. port int32
  24. stopper chan bool
  25. expectations chan encoder
  26. listener net.Listener
  27. t *testing.T
  28. latency time.Duration
  29. }
  30. func (b *mockBroker) SetLatency(latency time.Duration) {
  31. b.latency = latency
  32. }
  33. func (b *mockBroker) BrokerID() int32 {
  34. return b.brokerID
  35. }
  36. func (b *mockBroker) Port() int32 {
  37. return b.port
  38. }
  39. func (b *mockBroker) Addr() string {
  40. return b.listener.Addr().String()
  41. }
  42. func (b *mockBroker) Close() {
  43. if len(b.expectations) > 0 {
  44. b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
  45. }
  46. close(b.expectations)
  47. <-b.stopper
  48. }
  49. func (b *mockBroker) serverLoop() (ok bool) {
  50. var (
  51. err error
  52. conn net.Conn
  53. )
  54. defer close(b.stopper)
  55. if conn, err = b.listener.Accept(); err != nil {
  56. return b.serverError(err, conn)
  57. }
  58. reqHeader := make([]byte, 4)
  59. resHeader := make([]byte, 8)
  60. for expectation := range b.expectations {
  61. _, err = io.ReadFull(conn, reqHeader)
  62. if err != nil {
  63. return b.serverError(err, conn)
  64. }
  65. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  66. if len(body) < 10 {
  67. return b.serverError(errors.New("Kafka request too short."), conn)
  68. }
  69. if _, err = io.ReadFull(conn, body); err != nil {
  70. return b.serverError(err, conn)
  71. }
  72. if b.latency > 0 {
  73. time.Sleep(b.latency)
  74. }
  75. response, err := encode(expectation)
  76. if err != nil {
  77. return false
  78. }
  79. if len(response) == 0 {
  80. continue
  81. }
  82. binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
  83. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  84. if _, err = conn.Write(resHeader); err != nil {
  85. return b.serverError(err, conn)
  86. }
  87. if _, err = conn.Write(response); err != nil {
  88. return b.serverError(err, conn)
  89. }
  90. }
  91. if err = conn.Close(); err != nil {
  92. return b.serverError(err, nil)
  93. }
  94. if err = b.listener.Close(); err != nil {
  95. b.t.Error(err)
  96. return false
  97. }
  98. return true
  99. }
  100. func (b *mockBroker) serverError(err error, conn net.Conn) bool {
  101. b.t.Error(err)
  102. if conn != nil {
  103. if err := conn.Close(); err != nil {
  104. b.t.Error(err)
  105. }
  106. }
  107. if err := b.listener.Close(); err != nil {
  108. b.t.Error(err)
  109. }
  110. return false
  111. }
  112. // newMockBroker launches a fake Kafka broker. It takes a *testing.T as provided by the
  113. // test framework and a channel of responses to use. If an error occurs it is
  114. // simply logged to the *testing.T and the broker exits.
  115. func newMockBroker(t *testing.T, brokerID int32) *mockBroker {
  116. return newMockBrokerAddr(t, brokerID, "localhost:0")
  117. }
  118. // newMockBrokerAddr behaves like newMockBroker but listens on the address you give
  119. // it rather than just some ephemeral port.
  120. func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
  121. var err error
  122. broker := &mockBroker{
  123. stopper: make(chan bool),
  124. t: t,
  125. brokerID: brokerID,
  126. expectations: make(chan encoder, 512),
  127. }
  128. broker.listener, err = net.Listen("tcp", addr)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. tmp, err := strconv.ParseInt(portStr, 10, 32)
  137. if err != nil {
  138. t.Fatal(err)
  139. }
  140. broker.port = int32(tmp)
  141. go broker.serverLoop()
  142. return broker
  143. }
  144. func (b *mockBroker) Returns(e encoder) {
  145. b.expectations <- e
  146. }