mockbroker.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "net"
  7. "strconv"
  8. )
  // TestState is the minimal reporting surface the mock broker needs from a
  // test harness. *testing.T satisfies it, so tests normally pass their own t.
  type TestState interface {
  	// Non-formatting reporters.
  	Error(args ...interface{})
  	Fatal(args ...interface{})

  	// Printf-style reporters.
  	Errorf(format string, args ...interface{})
  	Fatalf(format string, args ...interface{})
  }
  16. // MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
  17. // accepts a single connection. It reads Kafka requests from that connection and returns each response
  18. // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
  19. // the server sleeps for 250ms instead of reading a request).
  20. //
  21. // When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
  22. // waiting for a response, the test panics.
  23. //
  24. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  25. // automatically as a convenience.
  26. type MockBroker struct {
  27. brokerID int32
  28. port int32
  29. stopper chan bool
  30. expectations chan encoder
  31. listener net.Listener
  32. t TestState
  33. }
  34. func (b *MockBroker) BrokerID() int32 {
  35. return b.brokerID
  36. }
  37. func (b *MockBroker) Port() int32 {
  38. return b.port
  39. }
  40. func (b *MockBroker) Addr() string {
  41. return b.listener.Addr().String()
  42. }
  43. func (b *MockBroker) Close() {
  44. if len(b.expectations) > 0 {
  45. b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
  46. }
  47. close(b.expectations)
  48. <-b.stopper
  49. }
  50. func (b *MockBroker) serverLoop() (ok bool) {
  51. var (
  52. err error
  53. conn net.Conn
  54. )
  55. defer close(b.stopper)
  56. if conn, err = b.listener.Accept(); err != nil {
  57. return b.serverError(err, conn)
  58. }
  59. reqHeader := make([]byte, 4)
  60. resHeader := make([]byte, 8)
  61. for expectation := range b.expectations {
  62. _, err = io.ReadFull(conn, reqHeader)
  63. if err != nil {
  64. return b.serverError(err, conn)
  65. }
  66. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  67. if len(body) < 10 {
  68. return b.serverError(errors.New("Kafka request too short."), conn)
  69. }
  70. if _, err = io.ReadFull(conn, body); err != nil {
  71. return b.serverError(err, conn)
  72. }
  73. response, err := encode(expectation)
  74. if err != nil {
  75. return false
  76. }
  77. if len(response) == 0 {
  78. continue
  79. }
  80. binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
  81. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  82. if _, err = conn.Write(resHeader); err != nil {
  83. return b.serverError(err, conn)
  84. }
  85. if _, err = conn.Write(response); err != nil {
  86. return b.serverError(err, conn)
  87. }
  88. }
  89. if err = conn.Close(); err != nil {
  90. return b.serverError(err, nil)
  91. }
  92. if err = b.listener.Close(); err != nil {
  93. b.t.Error(err)
  94. return false
  95. }
  96. return true
  97. }
  98. func (b *MockBroker) serverError(err error, conn net.Conn) bool {
  99. b.t.Error(err)
  100. if conn != nil {
  101. if err := conn.Close(); err != nil {
  102. b.t.Error(err)
  103. }
  104. }
  105. if err := b.listener.Close(); err != nil {
  106. b.t.Error(err)
  107. }
  108. return false
  109. }
  110. // NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
  111. // test framework and a channel of responses to use. If an error occurs it is
  112. // simply logged to the TestState and the broker exits.
  113. func NewMockBroker(t TestState, brokerID int32) *MockBroker {
  114. return NewMockBrokerAddr(t, brokerID, "localhost:0")
  115. }
  116. // NewMockBrokerAddr behaves like NewMockBroker but listens on the address you give
  117. // it rather than just some ephemeral port.
  118. func NewMockBrokerAddr(t TestState, brokerID int32, addr string) *MockBroker {
  119. var err error
  120. broker := &MockBroker{
  121. stopper: make(chan bool),
  122. t: t,
  123. brokerID: brokerID,
  124. expectations: make(chan encoder, 512),
  125. }
  126. broker.listener, err = net.Listen("tcp", addr)
  127. if err != nil {
  128. t.Fatal(err)
  129. }
  130. _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
  131. if err != nil {
  132. t.Fatal(err)
  133. }
  134. tmp, err := strconv.ParseInt(portStr, 10, 32)
  135. if err != nil {
  136. t.Fatal(err)
  137. }
  138. broker.port = int32(tmp)
  139. go broker.serverLoop()
  140. return broker
  141. }
  142. func (b *MockBroker) Returns(e encoder) {
  143. b.expectations <- e
  144. }