mockbroker.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "net"
  7. "strconv"
  8. "time"
  9. )
  // TestState is the small reporting interface the mock broker needs from a
  // test harness; *testing.T satisfies it. The broker reports problems through
  // Error/Errorf while serving and through Fatal when it cannot be set up.
  type TestState interface {
  	// Plain-argument reporters.
  	Error(args ...interface{})
  	Fatal(args ...interface{})

  	// Format-string reporters.
  	Errorf(format string, args ...interface{})
  	Fatalf(format string, args ...interface{})
  }
  17. // MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
  18. // accepts a single connection. It reads Kafka requests from that connection and returns each response
  19. // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
  20. // the server sleeps for 250ms instead of reading a request).
  21. //
  22. // When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
  23. // waiting for a response, the test panics.
  24. //
  25. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  26. // automatically as a convenience.
  27. type MockBroker struct {
  28. brokerID int32
  29. port int32
  30. stopper chan bool
  31. expectations chan encoder
  32. listener net.Listener
  33. t TestState
  34. latency time.Duration
  35. }
  36. func (b *MockBroker) SetLatency(latency time.Duration) {
  37. b.latency = latency
  38. }
  39. func (b *MockBroker) BrokerID() int32 {
  40. return b.brokerID
  41. }
  42. func (b *MockBroker) Port() int32 {
  43. return b.port
  44. }
  45. func (b *MockBroker) Addr() string {
  46. return b.listener.Addr().String()
  47. }
  48. func (b *MockBroker) Close() {
  49. if len(b.expectations) > 0 {
  50. b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
  51. }
  52. close(b.expectations)
  53. <-b.stopper
  54. }
  55. func (b *MockBroker) serverLoop() (ok bool) {
  56. var (
  57. err error
  58. conn net.Conn
  59. )
  60. defer close(b.stopper)
  61. if conn, err = b.listener.Accept(); err != nil {
  62. return b.serverError(err, conn)
  63. }
  64. reqHeader := make([]byte, 4)
  65. resHeader := make([]byte, 8)
  66. for expectation := range b.expectations {
  67. _, err = io.ReadFull(conn, reqHeader)
  68. if err != nil {
  69. return b.serverError(err, conn)
  70. }
  71. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  72. if len(body) < 10 {
  73. return b.serverError(errors.New("Kafka request too short."), conn)
  74. }
  75. if _, err = io.ReadFull(conn, body); err != nil {
  76. return b.serverError(err, conn)
  77. }
  78. if b.latency > 0 {
  79. time.Sleep(b.latency)
  80. }
  81. response, err := encode(expectation)
  82. if err != nil {
  83. return false
  84. }
  85. if len(response) == 0 {
  86. continue
  87. }
  88. binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
  89. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  90. if _, err = conn.Write(resHeader); err != nil {
  91. return b.serverError(err, conn)
  92. }
  93. if _, err = conn.Write(response); err != nil {
  94. return b.serverError(err, conn)
  95. }
  96. }
  97. if err = conn.Close(); err != nil {
  98. return b.serverError(err, nil)
  99. }
  100. if err = b.listener.Close(); err != nil {
  101. b.t.Error(err)
  102. return false
  103. }
  104. return true
  105. }
  106. func (b *MockBroker) serverError(err error, conn net.Conn) bool {
  107. b.t.Error(err)
  108. if conn != nil {
  109. if err := conn.Close(); err != nil {
  110. b.t.Error(err)
  111. }
  112. }
  113. if err := b.listener.Close(); err != nil {
  114. b.t.Error(err)
  115. }
  116. return false
  117. }
  118. // NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
  119. // test framework and a channel of responses to use. If an error occurs it is
  120. // simply logged to the TestState and the broker exits.
  121. func NewMockBroker(t TestState, brokerID int32) *MockBroker {
  122. return NewMockBrokerAddr(t, brokerID, "localhost:0")
  123. }
  124. // NewMockBrokerAddr behaves like NewMockBroker but listens on the address you give
  125. // it rather than just some ephemeral port.
  126. func NewMockBrokerAddr(t TestState, brokerID int32, addr string) *MockBroker {
  127. var err error
  128. broker := &MockBroker{
  129. stopper: make(chan bool),
  130. t: t,
  131. brokerID: brokerID,
  132. expectations: make(chan encoder, 512),
  133. }
  134. broker.listener, err = net.Listen("tcp", addr)
  135. if err != nil {
  136. t.Fatal(err)
  137. }
  138. _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
  139. if err != nil {
  140. t.Fatal(err)
  141. }
  142. tmp, err := strconv.ParseInt(portStr, 10, 32)
  143. if err != nil {
  144. t.Fatal(err)
  145. }
  146. broker.port = int32(tmp)
  147. go broker.serverLoop()
  148. return broker
  149. }
  150. func (b *MockBroker) Returns(e encoder) {
  151. b.expectations <- e
  152. }