consumer.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package mocks
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/Shopify/sarama"
  6. )
  7. // Consumer implements sarama's Consumer interface for testing purposes.
  8. // Before you can start consuming from this consumer, you have to register
  9. // topic/partitions using ExpectConsumePartition, and set expectations on them.
  10. type Consumer struct {
  11. l sync.Mutex
  12. t ErrorReporter
  13. config *sarama.Config
  14. partitionConsumers map[string]map[int32]*PartitionConsumer
  15. metadata map[string][]int32
  16. }
  17. // NewConsumer returns a new mock Consumer instance. The t argument should
  18. // be the *testing.T instance of your test method. An error will be written to it if
  19. // an expectation is violated. The config argument is currently unused and can be set to nil.
  20. func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
  21. if config == nil {
  22. config = sarama.NewConfig()
  23. }
  24. c := &Consumer{
  25. t: t,
  26. config: config,
  27. partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
  28. }
  29. return c
  30. }
  31. ///////////////////////////////////////////////////
  32. // Consumer interface implementation
  33. ///////////////////////////////////////////////////
  34. // ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
  35. // Before you can start consuming a partition, you have to set expectations on it using
  36. // ExpectConsumePartition. You can only consume a partition once per consumer.
  37. func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
  38. c.l.Lock()
  39. defer c.l.Unlock()
  40. if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
  41. c.t.Errorf("No expectations set for %s/%d", topic, partition)
  42. return nil, errOutOfExpectations
  43. }
  44. pc := c.partitionConsumers[topic][partition]
  45. if pc.consumed {
  46. return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
  47. }
  48. if pc.offset != AnyOffset && pc.offset != offset {
  49. c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
  50. }
  51. pc.consumed = true
  52. go pc.handleExpectations()
  53. return pc, nil
  54. }
  55. // Topics returns a list of topics, as registered with SetMetadata
  56. func (c *Consumer) Topics() ([]string, error) {
  57. c.l.Lock()
  58. defer c.l.Unlock()
  59. if c.metadata == nil {
  60. c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
  61. return nil, sarama.ErrOutOfBrokers
  62. }
  63. var result []string
  64. for topic, _ := range c.metadata {
  65. result = append(result, topic)
  66. }
  67. return result, nil
  68. }
  69. // Partitions returns the list of parititons for the given topic, as registered with SetMetadata
  70. func (c *Consumer) Partitions(topic string) ([]int32, error) {
  71. c.l.Lock()
  72. defer c.l.Unlock()
  73. if c.metadata == nil {
  74. c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
  75. return nil, sarama.ErrOutOfBrokers
  76. }
  77. if c.metadata[topic] == nil {
  78. return nil, sarama.ErrUnknownTopicOrPartition
  79. }
  80. return c.metadata[topic], nil
  81. }
  82. // Close implements the Close method from the sarama.Consumer interface. It will close
  83. // all registered PartitionConsumer instances.
  84. func (c *Consumer) Close() error {
  85. c.l.Lock()
  86. defer c.l.Unlock()
  87. for _, partitions := range c.partitionConsumers {
  88. for _, partitionConsumer := range partitions {
  89. _ = partitionConsumer.Close()
  90. }
  91. }
  92. return nil
  93. }
  94. ///////////////////////////////////////////////////
  95. // Expectation API
  96. ///////////////////////////////////////////////////
  97. // SetMetadata sets the clusters topic/partition metadata,
  98. // which will be returned by Topics() and Partitions().
  99. func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
  100. c.l.Lock()
  101. defer c.l.Unlock()
  102. c.metadata = metadata
  103. }
  104. // ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
  105. // The registered PartitionConsumer will be returned, so you can set expectations
  106. // on it using method chanining. Once a topic/partition is registered, you are
  107. // expected to start consuming it using ConsumePartition. If that doesn't happen,
  108. // an error will be written to the error reporter once the mock consumer is closed. It will
  109. // also expect that the
  110. func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
  111. c.l.Lock()
  112. defer c.l.Unlock()
  113. if c.partitionConsumers[topic] == nil {
  114. c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
  115. }
  116. if c.partitionConsumers[topic][partition] == nil {
  117. c.partitionConsumers[topic][partition] = &PartitionConsumer{
  118. t: c.t,
  119. topic: topic,
  120. partition: partition,
  121. offset: offset,
  122. expectations: make(chan *consumerExpectation, 1000),
  123. messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
  124. errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
  125. }
  126. }
  127. return c.partitionConsumers[topic][partition]
  128. }
  129. ///////////////////////////////////////////////////
  130. // PartitionConsumer mock type
  131. ///////////////////////////////////////////////////
  132. // PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
  133. // It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
  134. // registered first using the Consumer's ExpectConsumePartition method. Before consuming the
  135. // Errors and Messages channel, you should specify what values will be provided on these
  136. // channels using YieldMessage and YieldError.
  137. type PartitionConsumer struct {
  138. l sync.Mutex
  139. t ErrorReporter
  140. topic string
  141. partition int32
  142. offset int64
  143. expectations chan *consumerExpectation
  144. messages chan *sarama.ConsumerMessage
  145. errors chan *sarama.ConsumerError
  146. singleClose sync.Once
  147. consumed bool
  148. errorsShouldBeDrained bool
  149. messagesShouldBeDrained bool
  150. highWaterMarkOffset int64
  151. }
  152. func (pc *PartitionConsumer) handleExpectations() {
  153. pc.l.Lock()
  154. defer pc.l.Unlock()
  155. for ex := range pc.expectations {
  156. if ex.Err != nil {
  157. pc.errors <- &sarama.ConsumerError{
  158. Topic: pc.topic,
  159. Partition: pc.partition,
  160. Err: ex.Err,
  161. }
  162. } else {
  163. atomic.AddInt64(&pc.highWaterMarkOffset, 1)
  164. ex.Msg.Topic = pc.topic
  165. ex.Msg.Partition = pc.partition
  166. ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
  167. pc.messages <- ex.Msg
  168. }
  169. }
  170. close(pc.messages)
  171. close(pc.errors)
  172. }
  173. ///////////////////////////////////////////////////
  174. // PartitionConsumer interface implementation
  175. ///////////////////////////////////////////////////
  176. // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
  177. func (pc *PartitionConsumer) AsyncClose() {
  178. pc.singleClose.Do(func() {
  179. close(pc.expectations)
  180. })
  181. }
  182. // Close implements the Close method from the sarama.PartitionConsumer interface. It will
  183. // verify whether the partition consumer was actually started.
  184. func (pc *PartitionConsumer) Close() error {
  185. if !pc.consumed {
  186. pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
  187. return errPartitionConsumerNotStarted
  188. }
  189. if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
  190. pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
  191. }
  192. if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
  193. pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
  194. }
  195. pc.AsyncClose()
  196. var (
  197. closeErr error
  198. wg sync.WaitGroup
  199. )
  200. wg.Add(1)
  201. go func() {
  202. defer wg.Done()
  203. var errs = make(sarama.ConsumerErrors, 0)
  204. for err := range pc.errors {
  205. errs = append(errs, err)
  206. }
  207. if len(errs) > 0 {
  208. closeErr = errs
  209. }
  210. }()
  211. wg.Add(1)
  212. go func() {
  213. defer wg.Done()
  214. for _ = range pc.messages {
  215. // drain
  216. }
  217. }()
  218. wg.Wait()
  219. return closeErr
  220. }
  221. // Errors implements the Errors method from the sarama.PartitionConsumer interface.
  222. func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
  223. return pc.errors
  224. }
  225. // Messages implements the Messages method from the sarama.PartitionConsumer interface.
  226. func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
  227. return pc.messages
  228. }
  229. func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
  230. return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
  231. }
  232. ///////////////////////////////////////////////////
  233. // Expectation API
  234. ///////////////////////////////////////////////////
  235. // YieldMessage will yield a messages Messages channel of this partition consumer
  236. // when it is consumed. By default, the mock consumer will not verify whether this
  237. // message was consumed from the Messages channel, because there are legitimate
  238. // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
  239. // verify that the channel is empty on close.
  240. func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
  241. pc.expectations <- &consumerExpectation{Msg: msg}
  242. }
  243. // YieldError will yield an error on the Errors channel of this partition consumer
  244. // when it is consumed. By default, the mock consumer will not verify whether this error was
  245. // consumed from the Errors channel, because there are legitimate reasons for this
  246. // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
  247. // the channel is empty on close.
  248. func (pc *PartitionConsumer) YieldError(err error) {
  249. pc.expectations <- &consumerExpectation{Err: err}
  250. }
  251. // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
  252. // that the messages channel will be fully drained when Close is called. If this
  253. // expectation is not met, an error is reported to the error reporter.
  254. func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() {
  255. pc.messagesShouldBeDrained = true
  256. }
  257. // ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
  258. // that the errors channel will be fully drained when Close is called. If this
  259. // expectation is not met, an error is reported to the error reporter.
  260. func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
  261. pc.errorsShouldBeDrained = true
  262. }