mockbroker_test.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package sarama
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "reflect"
  9. "strconv"
  10. "sync"
  11. "testing"
  12. "time"
  13. "github.com/davecgh/go-spew/spew"
  14. )
  15. const (
  16. expectationTimeout = 500 * time.Millisecond
  17. )
  18. type requestHandlerFunc func(req *request) (res encoder)
  19. // mockBroker is a mock Kafka broker. It consists of a TCP server on a
  20. // kernel-selected localhost port that can accept many connections. It reads
  21. // Kafka requests from that connection and passes them to the user specified
  22. // handler function (see SetHandler) that generates respective responses. If
  23. // the handler has not been explicitly specified then the broker returns
  24. // responses set by the Returns function in the exact order they were provided.
  25. // (if a response has a len of 0, nothing is sent, and the client request will
  26. // timeout in this case).
  27. //
  28. // When running tests with one of these, it is strongly recommended to specify
  29. // a timeout to `go test` so that if the broker hangs waiting for a response,
  30. // the test panics.
  31. //
  32. // It is not necessary to prefix message length or correlation ID to your
  33. // response bytes, the server does that automatically as a convenience.
  34. type mockBroker struct {
  35. brokerID int32
  36. port int32
  37. closing chan none
  38. stopper chan none
  39. expectations chan encoder
  40. listener net.Listener
  41. t *testing.T
  42. latency time.Duration
  43. handler requestHandlerFunc
  44. history []RequestResponse
  45. lock sync.Mutex
  46. }
  47. type RequestResponse struct {
  48. Request requestBody
  49. Response encoder
  50. }
  51. func (b *mockBroker) SetLatency(latency time.Duration) {
  52. b.latency = latency
  53. }
  54. // SetHandler sets the specified function as the request handler. Whenever
  55. // a mock broker reads a request from the wire it passes the request to the
  56. // function and sends back whatever the handler function returns.
  57. func (b *mockBroker) SetHandler(handler requestHandlerFunc) {
  58. b.lock.Lock()
  59. b.handler = handler
  60. b.lock.Unlock()
  61. }
  62. func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
  63. b.SetHandler(func(req *request) (res encoder) {
  64. reqTypeName := reflect.TypeOf(req.body).Elem().Name()
  65. mockResponse := handlerMap[reqTypeName]
  66. if mockResponse == nil {
  67. return nil
  68. }
  69. return mockResponse.For(req.body)
  70. })
  71. }
  72. func (b *mockBroker) BrokerID() int32 {
  73. return b.brokerID
  74. }
  75. func (b *mockBroker) History() []RequestResponse {
  76. b.lock.Lock()
  77. history := make([]RequestResponse, len(b.history))
  78. copy(history, b.history)
  79. b.lock.Unlock()
  80. return history
  81. }
  82. func (b *mockBroker) Port() int32 {
  83. return b.port
  84. }
  85. func (b *mockBroker) Addr() string {
  86. return b.listener.Addr().String()
  87. }
  88. func (b *mockBroker) Close() {
  89. close(b.expectations)
  90. if len(b.expectations) > 0 {
  91. buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
  92. for e := range b.expectations {
  93. _, _ = buf.WriteString(spew.Sdump(e))
  94. }
  95. b.t.Error(buf.String())
  96. }
  97. close(b.closing)
  98. <-b.stopper
  99. }
  100. func (b *mockBroker) serverLoop() {
  101. defer close(b.stopper)
  102. var err error
  103. var conn net.Conn
  104. go func() {
  105. <-b.closing
  106. safeClose(b.t, b.listener)
  107. }()
  108. wg := &sync.WaitGroup{}
  109. i := 0
  110. for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
  111. wg.Add(1)
  112. go b.handleRequests(conn, i, wg)
  113. i++
  114. }
  115. wg.Wait()
  116. Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
  117. }
  118. func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
  119. defer wg.Done()
  120. defer func() {
  121. _ = conn.Close()
  122. }()
  123. Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
  124. var err error
  125. abort := make(chan none)
  126. defer close(abort)
  127. go func() {
  128. select {
  129. case <-b.closing:
  130. _ = conn.Close()
  131. case <-abort:
  132. }
  133. }()
  134. resHeader := make([]byte, 8)
  135. for {
  136. req, err := decodeRequest(conn)
  137. if err != nil {
  138. Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
  139. b.serverError(err)
  140. break
  141. }
  142. if b.latency > 0 {
  143. time.Sleep(b.latency)
  144. }
  145. b.lock.Lock()
  146. res := b.handler(req)
  147. b.history = append(b.history, RequestResponse{req.body, res})
  148. b.lock.Unlock()
  149. if res == nil {
  150. Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
  151. continue
  152. }
  153. Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
  154. encodedRes, err := encode(res)
  155. if err != nil {
  156. b.serverError(err)
  157. break
  158. }
  159. if len(encodedRes) == 0 {
  160. continue
  161. }
  162. binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
  163. binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
  164. if _, err = conn.Write(resHeader); err != nil {
  165. b.serverError(err)
  166. break
  167. }
  168. if _, err = conn.Write(encodedRes); err != nil {
  169. b.serverError(err)
  170. break
  171. }
  172. }
  173. Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
  174. }
  175. func (b *mockBroker) defaultRequestHandler(req *request) (res encoder) {
  176. select {
  177. case res, ok := <-b.expectations:
  178. if !ok {
  179. return nil
  180. }
  181. return res
  182. case <-time.After(expectationTimeout):
  183. return nil
  184. }
  185. }
  186. func (b *mockBroker) serverError(err error) {
  187. isConnectionClosedError := false
  188. if _, ok := err.(*net.OpError); ok {
  189. isConnectionClosedError = true
  190. } else if err == io.EOF {
  191. isConnectionClosedError = true
  192. } else if err.Error() == "use of closed network connection" {
  193. isConnectionClosedError = true
  194. }
  195. if isConnectionClosedError {
  196. return
  197. }
  198. b.t.Errorf(err.Error())
  199. }
  200. // newMockBroker launches a fake Kafka broker. It takes a *testing.T as provided by the
  201. // test framework and a channel of responses to use. If an error occurs it is
  202. // simply logged to the *testing.T and the broker exits.
  203. func newMockBroker(t *testing.T, brokerID int32) *mockBroker {
  204. return newMockBrokerAddr(t, brokerID, "localhost:0")
  205. }
  206. // newMockBrokerAddr behaves like newMockBroker but listens on the address you give
  207. // it rather than just some ephemeral port.
  208. func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
  209. var err error
  210. broker := &mockBroker{
  211. closing: make(chan none),
  212. stopper: make(chan none),
  213. t: t,
  214. brokerID: brokerID,
  215. expectations: make(chan encoder, 512),
  216. }
  217. broker.handler = broker.defaultRequestHandler
  218. broker.listener, err = net.Listen("tcp", addr)
  219. if err != nil {
  220. t.Fatal(err)
  221. }
  222. Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
  223. _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
  224. if err != nil {
  225. t.Fatal(err)
  226. }
  227. tmp, err := strconv.ParseInt(portStr, 10, 32)
  228. if err != nil {
  229. t.Fatal(err)
  230. }
  231. broker.port = int32(tmp)
  232. go broker.serverLoop()
  233. return broker
  234. }
  235. func (b *mockBroker) Returns(e encoder) {
  236. b.expectations <- e
  237. }