mockbroker.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "net"
  7. "strconv"
  8. )
  9. // TestState is a generic interface for a test state, implemented e.g. by testing.T
  10. type TestState interface {
  11. Error(args ...interface{})
  12. Fatal(args ...interface{})
  13. Fatalf(format string, args ...interface{})
  14. }
  15. // MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
  16. // accepts a single connection. It reads Kafka requests from that connection and returns each response
  17. // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
  18. // the server sleeps for 250ms instead of reading a request).
  19. //
  20. // When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
  21. // waiting for a response, the test panics.
  22. //
  23. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  24. // automatically as a convenience.
  25. type MockBroker struct {
  26. brokerID int
  27. port int32
  28. stopper chan bool
  29. expectations chan encoder
  30. listener net.Listener
  31. t TestState
  32. expecting encoder
  33. }
  34. func (b *MockBroker) BrokerID() int {
  35. return b.brokerID
  36. }
  37. func (b *MockBroker) Port() int32 {
  38. return b.port
  39. }
  40. func (b *MockBroker) Addr() string {
  41. return b.listener.Addr().String()
  42. }
  43. type rawExpectation []byte
  44. func (r rawExpectation) ResponseBytes() []byte {
  45. return r
  46. }
  47. func (b *MockBroker) Close() {
  48. if b.expecting != nil {
  49. b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %#v", b.BrokerID(), b.expecting)
  50. }
  51. close(b.expectations)
  52. <-b.stopper
  53. }
  54. func (b *MockBroker) serverLoop() (ok bool) {
  55. var (
  56. err error
  57. conn net.Conn
  58. )
  59. defer close(b.stopper)
  60. if conn, err = b.listener.Accept(); err != nil {
  61. return b.serverError(err, conn)
  62. }
  63. reqHeader := make([]byte, 4)
  64. resHeader := make([]byte, 8)
  65. for expectation := range b.expectations {
  66. b.expecting = expectation
  67. _, err = io.ReadFull(conn, reqHeader)
  68. b.expecting = nil
  69. if err != nil {
  70. return b.serverError(err, conn)
  71. }
  72. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  73. if len(body) < 10 {
  74. return b.serverError(errors.New("Kafka request too short."), conn)
  75. }
  76. if _, err = io.ReadFull(conn, body); err != nil {
  77. return b.serverError(err, conn)
  78. }
  79. response, err := encode(expectation)
  80. if err != nil {
  81. return false
  82. }
  83. if len(response) == 0 {
  84. continue
  85. }
  86. binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
  87. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  88. if _, err = conn.Write(resHeader); err != nil {
  89. return b.serverError(err, conn)
  90. }
  91. if _, err = conn.Write(response); err != nil {
  92. return b.serverError(err, conn)
  93. }
  94. }
  95. if err = conn.Close(); err != nil {
  96. return b.serverError(err, nil)
  97. }
  98. if err = b.listener.Close(); err != nil {
  99. b.t.Error(err)
  100. return false
  101. }
  102. return true
  103. }
  104. func (b *MockBroker) serverError(err error, conn net.Conn) bool {
  105. b.t.Error(err)
  106. if conn != nil {
  107. conn.Close()
  108. }
  109. b.listener.Close()
  110. return false
  111. }
  112. // NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
  113. // test framework and a channel of responses to use. If an error occurs it is
  114. // simply logged to the TestState and the broker exits.
  115. func NewMockBroker(t TestState, brokerID int) *MockBroker {
  116. var err error
  117. broker := &MockBroker{
  118. stopper: make(chan bool),
  119. t: t,
  120. brokerID: brokerID,
  121. expectations: make(chan encoder, 512),
  122. }
  123. broker.listener, err = net.Listen("tcp", "localhost:0")
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
  128. if err != nil {
  129. t.Fatal(err)
  130. }
  131. tmp, err := strconv.ParseInt(portStr, 10, 32)
  132. if err != nil {
  133. t.Fatal(err)
  134. }
  135. broker.port = int32(tmp)
  136. go broker.serverLoop()
  137. return broker
  138. }
  139. func (b *MockBroker) Returns(e encoder) {
  140. b.expectations <- e
  141. }