async_producer.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package mocks
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. // AsyncProducer implements sarama's Producer interface for testing purposes.
  7. // Before you can send messages to it's Input channel, you have to set expectations
  8. // so it knows how to handle the input; it returns an error if the number of messages
  9. // received is bigger then the number of expectations set. You can also set a
  10. // function in each expectation so that the message value is checked by this function
  11. // and an error is returned if the match fails.
  12. type AsyncProducer struct {
  13. l sync.Mutex
  14. t ErrorReporter
  15. expectations []*producerExpectation
  16. closed chan struct{}
  17. input chan *sarama.ProducerMessage
  18. successes chan *sarama.ProducerMessage
  19. errors chan *sarama.ProducerError
  20. lastOffset int64
  21. }
  22. // NewAsyncProducer instantiates a new Producer mock. The t argument should
  23. // be the *testing.T instance of your test method. An error will be written to it if
  24. // an expectation is violated. The config argument is used to determine whether it
  25. // should ack successes on the Successes channel.
  26. func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
  27. if config == nil {
  28. config = sarama.NewConfig()
  29. }
  30. mp := &AsyncProducer{
  31. t: t,
  32. closed: make(chan struct{}),
  33. expectations: make([]*producerExpectation, 0),
  34. input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
  35. successes: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
  36. errors: make(chan *sarama.ProducerError, config.ChannelBufferSize),
  37. }
  38. go func() {
  39. defer func() {
  40. close(mp.successes)
  41. close(mp.errors)
  42. close(mp.closed)
  43. }()
  44. for msg := range mp.input {
  45. mp.l.Lock()
  46. if mp.expectations == nil || len(mp.expectations) == 0 {
  47. mp.expectations = nil
  48. mp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
  49. } else {
  50. expectation := mp.expectations[0]
  51. mp.expectations = mp.expectations[1:]
  52. if expectation.CheckFunction != nil {
  53. if val, err := msg.Value.Encode(); err != nil {
  54. mp.t.Errorf("Input message encoding failed: %s", err.Error())
  55. mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
  56. } else {
  57. err = expectation.CheckFunction(val)
  58. if err != nil {
  59. mp.t.Errorf("Check function returned an error: %s", err.Error())
  60. mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
  61. }
  62. }
  63. }
  64. if expectation.Result == errProduceSuccess {
  65. mp.lastOffset++
  66. if config.Producer.Return.Successes {
  67. msg.Offset = mp.lastOffset
  68. mp.successes <- msg
  69. }
  70. } else {
  71. if config.Producer.Return.Errors {
  72. mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
  73. }
  74. }
  75. }
  76. mp.l.Unlock()
  77. }
  78. mp.l.Lock()
  79. if len(mp.expectations) > 0 {
  80. mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
  81. }
  82. mp.l.Unlock()
  83. }()
  84. return mp
  85. }
  86. ////////////////////////////////////////////////
  87. // Implement Producer interface
  88. ////////////////////////////////////////////////
  89. // AsyncClose corresponds with the AsyncClose method of sarama's Producer implementation.
  90. // By closing a mock producer, you also tell it that no more input will be provided, so it will
  91. // write an error to the test state if there's any remaining expectations.
  92. func (mp *AsyncProducer) AsyncClose() {
  93. close(mp.input)
  94. }
  95. // Close corresponds with the Close method of sarama's Producer implementation.
  96. // By closing a mock producer, you also tell it that no more input will be provided, so it will
  97. // write an error to the test state if there's any remaining expectations.
  98. func (mp *AsyncProducer) Close() error {
  99. mp.AsyncClose()
  100. <-mp.closed
  101. return nil
  102. }
  103. // Input corresponds with the Input method of sarama's Producer implementation.
  104. // You have to set expectations on the mock producer before writing messages to the Input
  105. // channel, so it knows how to handle them. If there is no more remaining expectations and
  106. // a messages is written to the Input channel, the mock producer will write an error to the test
  107. // state object.
  108. func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage {
  109. return mp.input
  110. }
  111. // Successes corresponds with the Successes method of sarama's Producer implementation.
  112. func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage {
  113. return mp.successes
  114. }
  115. // Errors corresponds with the Errors method of sarama's Producer implementation.
  116. func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
  117. return mp.errors
  118. }
  119. ////////////////////////////////////////////////
  120. // Setting expectations
  121. ////////////////////////////////////////////////
  122. // ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
  123. // will be provided on the input channel. The mock producer will call the given function to check
  124. // the message value. If an error is returned it will be made available on the Errors channel
  125. // otherwise the mock will handle the message as if it produced successfully, i.e. it will make
  126. // it available on the Successes channel if the Producer.Return.Successes setting is set to true.
  127. func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) {
  128. mp.l.Lock()
  129. defer mp.l.Unlock()
  130. mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
  131. }
  132. // ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message
  133. // will be provided on the input channel. The mock producer will first call the given function to
  134. // check the message value. If an error is returned it will be made available on the Errors channel
  135. // otherwise the mock will handle the message as if it failed to produce successfully. This means
  136. // it will make a ProducerError available on the Errors channel.
  137. func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) {
  138. mp.l.Lock()
  139. defer mp.l.Unlock()
  140. mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
  141. }
  142. // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
  143. // on the input channel. The mock producer will handle the message as if it is produced successfully,
  144. // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
  145. // is set to true.
  146. func (mp *AsyncProducer) ExpectInputAndSucceed() {
  147. mp.ExpectInputWithCheckerFunctionAndSucceed(nil)
  148. }
  149. // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
  150. // on the input channel. The mock producer will handle the message as if it failed to produce
  151. // successfully. This means it will make a ProducerError available on the Errors channel.
  152. func (mp *AsyncProducer) ExpectInputAndFail(err error) {
  153. mp.ExpectInputWithCheckerFunctionAndFail(nil, err)
  154. }