mockbroker.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "net"
  7. "strconv"
  8. "testing"
  9. )
  10. // MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
  11. // accepts a single connection. It reads Kafka requests from that connection and returns each response
  12. // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
  13. // the server sleeps for 250ms instead of reading a request).
  14. //
  15. // When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
  16. // waiting for a response, the test panics.
  17. //
  18. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  19. // automatically as a convenience.
  20. type MockBroker struct {
  21. brokerID int
  22. port int32
  23. stopper chan bool
  24. expectations chan encoder
  25. listener net.Listener
  26. t *testing.T
  27. expecting encoder
  28. }
  29. func (b *MockBroker) BrokerID() int {
  30. return b.brokerID
  31. }
  32. func (b *MockBroker) Port() int32 {
  33. return b.port
  34. }
  35. func (b *MockBroker) Addr() string {
  36. return b.listener.Addr().String()
  37. }
  38. type rawExpectation []byte
  39. func (r rawExpectation) ResponseBytes() []byte {
  40. return r
  41. }
  42. func (b *MockBroker) Close() {
  43. if b.expecting != nil {
  44. b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %#v", b.BrokerID(), b.expecting)
  45. }
  46. close(b.expectations)
  47. <-b.stopper
  48. }
  49. func (b *MockBroker) serverLoop() (ok bool) {
  50. var (
  51. err error
  52. conn net.Conn
  53. )
  54. defer close(b.stopper)
  55. if conn, err = b.listener.Accept(); err != nil {
  56. return b.serverError(err, conn)
  57. }
  58. reqHeader := make([]byte, 4)
  59. resHeader := make([]byte, 8)
  60. for expectation := range b.expectations {
  61. b.expecting = expectation
  62. _, err = io.ReadFull(conn, reqHeader)
  63. b.expecting = nil
  64. if err != nil {
  65. return b.serverError(err, conn)
  66. }
  67. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  68. if len(body) < 10 {
  69. return b.serverError(errors.New("Kafka request too short."), conn)
  70. }
  71. if _, err = io.ReadFull(conn, body); err != nil {
  72. return b.serverError(err, conn)
  73. }
  74. response, err := encode(expectation)
  75. if err != nil {
  76. return false
  77. }
  78. if len(response) == 0 {
  79. continue
  80. }
  81. binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
  82. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  83. if _, err = conn.Write(resHeader); err != nil {
  84. return b.serverError(err, conn)
  85. }
  86. if _, err = conn.Write(response); err != nil {
  87. return b.serverError(err, conn)
  88. }
  89. }
  90. if err = conn.Close(); err != nil {
  91. return b.serverError(err, nil)
  92. }
  93. if err = b.listener.Close(); err != nil {
  94. b.t.Error(err)
  95. return false
  96. }
  97. return true
  98. }
  99. func (b *MockBroker) serverError(err error, conn net.Conn) bool {
  100. b.t.Error(err)
  101. if conn != nil {
  102. conn.Close()
  103. }
  104. b.listener.Close()
  105. return false
  106. }
  107. // New launches a fake Kafka broker. It takes a testing.T as provided by the
  108. // test framework and a channel of responses to use. If an error occurs it is
  109. // simply logged to the testing.T and the broker exits.
  110. func NewMockBroker(t *testing.T, brokerID int) *MockBroker {
  111. var err error
  112. broker := &MockBroker{
  113. stopper: make(chan bool),
  114. t: t,
  115. brokerID: brokerID,
  116. expectations: make(chan encoder, 512),
  117. }
  118. broker.listener, err = net.Listen("tcp", "localhost:0")
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
  123. if err != nil {
  124. t.Fatal(err)
  125. }
  126. tmp, err := strconv.ParseInt(portStr, 10, 32)
  127. if err != nil {
  128. t.Fatal(err)
  129. }
  130. broker.port = int32(tmp)
  131. go broker.serverLoop()
  132. return broker
  133. }
  134. func (b *MockBroker) Returns(e encoder) {
  135. b.expectations <- e
  136. }