mockbroker.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "net"
  7. "strconv"
  8. )
  // TestState abstracts the failure-reporting part of a test handle so that the
  // mock broker does not depend on a concrete test type. *testing.T is one
  // implementation; the four methods mirror its identically named methods.
  type TestState interface {
  	// Error and Errorf report a failure.
  	Error(args ...interface{})
  	Errorf(format string, args ...interface{})

  	// Fatal and Fatalf report a failure that prevents the caller from going on.
  	Fatal(args ...interface{})
  	Fatalf(format string, args ...interface{})
  }
  16. // MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
  17. // accepts a single connection. It reads Kafka requests from that connection and returns each response
  18. // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
  19. // the server sleeps for 250ms instead of reading a request).
  20. //
  21. // When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
  22. // waiting for a response, the test panics.
  23. //
  24. // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
  25. // automatically as a convenience.
  26. type MockBroker struct {
  27. brokerID int32
  28. port int32
  29. stopper chan bool
  30. expectations chan encoder
  31. listener net.Listener
  32. t TestState
  33. }
  34. func (b *MockBroker) BrokerID() int32 {
  35. return b.brokerID
  36. }
  37. func (b *MockBroker) Port() int32 {
  38. return b.port
  39. }
  40. func (b *MockBroker) Addr() string {
  41. return b.listener.Addr().String()
  42. }
  43. func (b *MockBroker) Close() {
  44. if len(b.expectations) > 0 {
  45. b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
  46. }
  47. close(b.expectations)
  48. <-b.stopper
  49. }
  50. func (b *MockBroker) serverLoop() (ok bool) {
  51. var (
  52. err error
  53. conn net.Conn
  54. )
  55. defer close(b.stopper)
  56. if conn, err = b.listener.Accept(); err != nil {
  57. return b.serverError(err, conn)
  58. }
  59. reqHeader := make([]byte, 4)
  60. resHeader := make([]byte, 8)
  61. for expectation := range b.expectations {
  62. _, err = io.ReadFull(conn, reqHeader)
  63. if err != nil {
  64. return b.serverError(err, conn)
  65. }
  66. body := make([]byte, binary.BigEndian.Uint32(reqHeader))
  67. if len(body) < 10 {
  68. return b.serverError(errors.New("Kafka request too short."), conn)
  69. }
  70. if _, err = io.ReadFull(conn, body); err != nil {
  71. return b.serverError(err, conn)
  72. }
  73. response, err := encode(expectation)
  74. if err != nil {
  75. return false
  76. }
  77. if len(response) == 0 {
  78. continue
  79. }
  80. binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
  81. binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
  82. if _, err = conn.Write(resHeader); err != nil {
  83. return b.serverError(err, conn)
  84. }
  85. if _, err = conn.Write(response); err != nil {
  86. return b.serverError(err, conn)
  87. }
  88. }
  89. if err = conn.Close(); err != nil {
  90. return b.serverError(err, nil)
  91. }
  92. if err = b.listener.Close(); err != nil {
  93. b.t.Error(err)
  94. return false
  95. }
  96. return true
  97. }
  98. func (b *MockBroker) serverError(err error, conn net.Conn) bool {
  99. b.t.Error(err)
  100. if conn != nil {
  101. if err := conn.Close(); err != nil {
  102. b.t.Error(err)
  103. }
  104. }
  105. if err := b.listener.Close(); err != nil {
  106. b.t.Error(err)
  107. }
  108. return false
  109. }
  110. // NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
  111. // test framework and a channel of responses to use. If an error occurs it is
  112. // simply logged to the TestState and the broker exits.
  113. func NewMockBroker(t TestState, brokerID int32) *MockBroker {
  114. var err error
  115. broker := &MockBroker{
  116. stopper: make(chan bool),
  117. t: t,
  118. brokerID: brokerID,
  119. expectations: make(chan encoder, 512),
  120. }
  121. broker.listener, err = net.Listen("tcp", "localhost:0")
  122. if err != nil {
  123. t.Fatal(err)
  124. }
  125. _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. tmp, err := strconv.ParseInt(portStr, 10, 32)
  130. if err != nil {
  131. t.Fatal(err)
  132. }
  133. broker.port = int32(tmp)
  134. go broker.serverLoop()
  135. return broker
  136. }
  137. func (b *MockBroker) Returns(e encoder) {
  138. b.expectations <- e
  139. }