consumer.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package mocks
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. // Consumer implements sarama's Consumer interface for testing purposes.
  7. // Before you can start consuming from this consumer, you have to register
  8. // topic/partitions using ExpectConsumePartition, and set expectations on them.
  9. type Consumer struct {
  10. l sync.Mutex
  11. t ErrorReporter
  12. config *sarama.Config
  13. partitionConsumers map[string]map[int32]*PartitionConsumer
  14. }
  15. // NewConsumer returns a new mock Consumer instance. The t argument should
  16. // be the *testing.T instance of your test method. An error will be written to it if
  17. // an expectation is violated. The config argument is currently unused and can be set to nil.
  18. func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
  19. if config == nil {
  20. config = sarama.NewConfig()
  21. }
  22. c := &Consumer{
  23. t: t,
  24. config: config,
  25. partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
  26. }
  27. return c
  28. }
  29. ///////////////////////////////////////////////////
  30. // Consumer interface implementation
  31. ///////////////////////////////////////////////////
  32. // ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
  33. // Before you can start consuming a partition, you have to set expectations on it using
  34. // ExpectConsumePartition. You can only consume a partition once per consumer.
  35. func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
  36. c.l.Lock()
  37. defer c.l.Unlock()
  38. if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
  39. c.t.Errorf("No expectations set for %s/%d", topic, partition)
  40. return nil, errOutOfExpectations
  41. }
  42. pc := c.partitionConsumers[topic][partition]
  43. if pc.consumed {
  44. return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
  45. }
  46. if pc.offset != AnyOffset && pc.offset != offset {
  47. c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
  48. }
  49. pc.consumed = true
  50. go pc.handleExpectations()
  51. return pc, nil
  52. }
  53. // Close implements the Close method from the sarama.Consumer interface. It will close
  54. // all registered PartitionConsumer instances.
  55. func (c *Consumer) Close() error {
  56. c.l.Lock()
  57. defer c.l.Unlock()
  58. for _, partitions := range c.partitionConsumers {
  59. for _, partitionConsumer := range partitions {
  60. _ = partitionConsumer.Close()
  61. }
  62. }
  63. return nil
  64. }
  65. ///////////////////////////////////////////////////
  66. // Expectation API
  67. ///////////////////////////////////////////////////
  68. // ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
  69. // The registered PartitionConsumer will be returned, so you can set expectations
  70. // on it using method chanining. Once a topic/partition is registered, you are
  71. // expected to start consuming it using ConsumePartition. If that doesn't happen,
  72. // an error will be written to the error reporter once the mock consumer is closed. It will
  73. // also expect that the
  74. func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
  75. c.l.Lock()
  76. defer c.l.Unlock()
  77. if c.partitionConsumers[topic] == nil {
  78. c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
  79. }
  80. if c.partitionConsumers[topic][partition] == nil {
  81. c.partitionConsumers[topic][partition] = &PartitionConsumer{
  82. t: c.t,
  83. topic: topic,
  84. partition: partition,
  85. offset: offset,
  86. expectations: make(chan *consumerExpectation, 1000),
  87. messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
  88. errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
  89. }
  90. }
  91. return c.partitionConsumers[topic][partition]
  92. }
  93. ///////////////////////////////////////////////////
  94. // PartitionConsumer mock type
  95. ///////////////////////////////////////////////////
  96. // PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
  97. // It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
  98. // registered first using the Consumer's ExpectConsumePartition method. Before consuming the
  99. // Errors and Messages channel, you should specify what values will be provided on these
  100. // channels using YieldMessage and YieldError.
  101. type PartitionConsumer struct {
  102. l sync.Mutex
  103. t ErrorReporter
  104. topic string
  105. partition int32
  106. offset int64
  107. expectations chan *consumerExpectation
  108. messages chan *sarama.ConsumerMessage
  109. errors chan *sarama.ConsumerError
  110. singleClose sync.Once
  111. consumed bool
  112. errorsShouldBeDrained bool
  113. messagesShouldBeDrained bool
  114. }
  115. func (pc *PartitionConsumer) handleExpectations() {
  116. pc.l.Lock()
  117. defer pc.l.Unlock()
  118. var offset int64
  119. for ex := range pc.expectations {
  120. if ex.Err != nil {
  121. pc.errors <- &sarama.ConsumerError{
  122. Topic: pc.topic,
  123. Partition: pc.partition,
  124. Err: ex.Err,
  125. }
  126. } else {
  127. offset++
  128. ex.Msg.Topic = pc.topic
  129. ex.Msg.Partition = pc.partition
  130. ex.Msg.Offset = offset
  131. pc.messages <- ex.Msg
  132. }
  133. }
  134. close(pc.messages)
  135. close(pc.errors)
  136. }
  137. ///////////////////////////////////////////////////
  138. // PartitionConsumer interface implementation
  139. ///////////////////////////////////////////////////
  140. // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
  141. func (pc *PartitionConsumer) AsyncClose() {
  142. pc.singleClose.Do(func() {
  143. close(pc.expectations)
  144. })
  145. }
  146. // Close implements the Close method from the sarama.PartitionConsumer interface. It will
  147. // verify whether the partition consumer was actually started.
  148. func (pc *PartitionConsumer) Close() error {
  149. if !pc.consumed {
  150. pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
  151. return errPartitionConsumerNotStarted
  152. }
  153. if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
  154. pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
  155. }
  156. if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
  157. pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
  158. }
  159. pc.AsyncClose()
  160. var (
  161. closeErr error
  162. wg sync.WaitGroup
  163. )
  164. wg.Add(1)
  165. go func() {
  166. defer wg.Done()
  167. var errs = make(sarama.ConsumerErrors, 0)
  168. for err := range pc.errors {
  169. errs = append(errs, err)
  170. }
  171. if len(errs) > 0 {
  172. closeErr = errs
  173. }
  174. }()
  175. wg.Add(1)
  176. go func() {
  177. defer wg.Done()
  178. for _ = range pc.messages {
  179. // drain
  180. }
  181. }()
  182. wg.Wait()
  183. return closeErr
  184. }
  185. // Errors implements the Errors method from the sarama.PartitionConsumer interface.
  186. func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
  187. return pc.errors
  188. }
  189. // Messages implements the Messages method from the sarama.PartitionConsumer interface.
  190. func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
  191. return pc.messages
  192. }
  193. ///////////////////////////////////////////////////
  194. // Expectation API
  195. ///////////////////////////////////////////////////
  196. // YieldMessage will yield a messages Messages channel of this partition consumer
  197. // when it is consumed. By default, the mock consumer will not verify whether this
  198. // message was consumed from the Messages channel, because there are legitimate
  199. // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
  200. // verify that the channel is empty on close.
  201. func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
  202. pc.expectations <- &consumerExpectation{Msg: msg}
  203. }
  204. // YieldError will yield an error on the Errors channel of this partition consumer
  205. // when it is consumed. By default, the mock consumer will not verify whether this error was
  206. // consumed from the Errors channel, because there are legitimate reasons for this
  207. // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
  208. // the channel is empty on close.
  209. func (pc *PartitionConsumer) YieldError(err error) {
  210. pc.expectations <- &consumerExpectation{Err: err}
  211. }
  212. // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
  213. // that the messages channel will be fully drained when Close is called. If this
  214. // expectation is not met, an error is reported to the error reporter.
  215. func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() {
  216. pc.messagesShouldBeDrained = true
  217. }
  218. // ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
  219. // that the errors channel will be fully drained when Close is called. If this
  220. // expectation is not met, an error is reported to the error reporter.
  221. func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
  222. pc.errorsShouldBeDrained = true
  223. }