consumer.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. package mocks
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/Shopify/sarama"
  6. )
  7. // Consumer implements sarama's Consumer interface for testing purposes.
  8. // Before you can start consuming from this consumer, you have to register
  9. // topic/partitions using ExpectConsumePartition, and set expectations on them.
  10. type Consumer struct {
  11. l sync.Mutex
  12. t ErrorReporter
  13. config *sarama.Config
  14. partitionConsumers map[string]map[int32]*PartitionConsumer
  15. metadata map[string][]int32
  16. }
  17. // NewConsumer returns a new mock Consumer instance. The t argument should
  18. // be the *testing.T instance of your test method. An error will be written to it if
  19. // an expectation is violated. The config argument is currently unused and can be set to nil.
  20. func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
  21. if config == nil {
  22. config = sarama.NewConfig()
  23. }
  24. c := &Consumer{
  25. t: t,
  26. config: config,
  27. partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
  28. }
  29. return c
  30. }
  31. ///////////////////////////////////////////////////
  32. // Consumer interface implementation
  33. ///////////////////////////////////////////////////
  34. // ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
  35. // Before you can start consuming a partition, you have to set expectations on it using
  36. // ExpectConsumePartition. You can only consume a partition once per consumer.
  37. func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
  38. c.l.Lock()
  39. defer c.l.Unlock()
  40. if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
  41. c.t.Errorf("No expectations set for %s/%d", topic, partition)
  42. return nil, errOutOfExpectations
  43. }
  44. pc := c.partitionConsumers[topic][partition]
  45. if pc.consumed {
  46. return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
  47. }
  48. if pc.offset != AnyOffset && pc.offset != offset {
  49. c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
  50. }
  51. pc.consumed = true
  52. return pc, nil
  53. }
  54. // Topics returns a list of topics, as registered with SetMetadata
  55. func (c *Consumer) Topics() ([]string, error) {
  56. c.l.Lock()
  57. defer c.l.Unlock()
  58. if c.metadata == nil {
  59. c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
  60. return nil, sarama.ErrOutOfBrokers
  61. }
  62. var result []string
  63. for topic := range c.metadata {
  64. result = append(result, topic)
  65. }
  66. return result, nil
  67. }
  68. // Partitions returns the list of parititons for the given topic, as registered with SetMetadata
  69. func (c *Consumer) Partitions(topic string) ([]int32, error) {
  70. c.l.Lock()
  71. defer c.l.Unlock()
  72. if c.metadata == nil {
  73. c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
  74. return nil, sarama.ErrOutOfBrokers
  75. }
  76. if c.metadata[topic] == nil {
  77. return nil, sarama.ErrUnknownTopicOrPartition
  78. }
  79. return c.metadata[topic], nil
  80. }
  81. // Close implements the Close method from the sarama.Consumer interface. It will close
  82. // all registered PartitionConsumer instances.
  83. func (c *Consumer) Close() error {
  84. c.l.Lock()
  85. defer c.l.Unlock()
  86. for _, partitions := range c.partitionConsumers {
  87. for _, partitionConsumer := range partitions {
  88. _ = partitionConsumer.Close()
  89. }
  90. }
  91. return nil
  92. }
  93. ///////////////////////////////////////////////////
  94. // Expectation API
  95. ///////////////////////////////////////////////////
  96. // SetTopicMetadata sets the clusters topic/partition metadata,
  97. // which will be returned by Topics() and Partitions().
  98. func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
  99. c.l.Lock()
  100. defer c.l.Unlock()
  101. c.metadata = metadata
  102. }
  103. // ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
  104. // The registered PartitionConsumer will be returned, so you can set expectations
  105. // on it using method chaining. Once a topic/partition is registered, you are
  106. // expected to start consuming it using ConsumePartition. If that doesn't happen,
  107. // an error will be written to the error reporter once the mock consumer is closed. It will
  108. // also expect that the
  109. func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
  110. c.l.Lock()
  111. defer c.l.Unlock()
  112. if c.partitionConsumers[topic] == nil {
  113. c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
  114. }
  115. if c.partitionConsumers[topic][partition] == nil {
  116. c.partitionConsumers[topic][partition] = &PartitionConsumer{
  117. t: c.t,
  118. topic: topic,
  119. partition: partition,
  120. offset: offset,
  121. messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
  122. errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
  123. }
  124. }
  125. return c.partitionConsumers[topic][partition]
  126. }
  127. ///////////////////////////////////////////////////
  128. // PartitionConsumer mock type
  129. ///////////////////////////////////////////////////
  130. // PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
  131. // It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
  132. // registered first using the Consumer's ExpectConsumePartition method. Before consuming the
  133. // Errors and Messages channel, you should specify what values will be provided on these
  134. // channels using YieldMessage and YieldError.
  135. type PartitionConsumer struct {
  136. l sync.Mutex
  137. t ErrorReporter
  138. topic string
  139. partition int32
  140. offset int64
  141. messages chan *sarama.ConsumerMessage
  142. errors chan *sarama.ConsumerError
  143. singleClose sync.Once
  144. consumed bool
  145. errorsShouldBeDrained bool
  146. messagesShouldBeDrained bool
  147. highWaterMarkOffset int64
  148. }
  149. ///////////////////////////////////////////////////
  150. // PartitionConsumer interface implementation
  151. ///////////////////////////////////////////////////
  152. // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
  153. func (pc *PartitionConsumer) AsyncClose() {
  154. pc.singleClose.Do(func() {
  155. close(pc.messages)
  156. close(pc.errors)
  157. })
  158. }
  159. // Close implements the Close method from the sarama.PartitionConsumer interface. It will
  160. // verify whether the partition consumer was actually started.
  161. func (pc *PartitionConsumer) Close() error {
  162. if !pc.consumed {
  163. pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
  164. return errPartitionConsumerNotStarted
  165. }
  166. if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
  167. pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
  168. }
  169. if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
  170. pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
  171. }
  172. pc.AsyncClose()
  173. var (
  174. closeErr error
  175. wg sync.WaitGroup
  176. )
  177. wg.Add(1)
  178. go func() {
  179. defer wg.Done()
  180. var errs = make(sarama.ConsumerErrors, 0)
  181. for err := range pc.errors {
  182. errs = append(errs, err)
  183. }
  184. if len(errs) > 0 {
  185. closeErr = errs
  186. }
  187. }()
  188. wg.Add(1)
  189. go func() {
  190. defer wg.Done()
  191. for _ = range pc.messages {
  192. // drain
  193. }
  194. }()
  195. wg.Wait()
  196. return closeErr
  197. }
  198. // Errors implements the Errors method from the sarama.PartitionConsumer interface.
  199. func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
  200. return pc.errors
  201. }
  202. // Messages implements the Messages method from the sarama.PartitionConsumer interface.
  203. func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
  204. return pc.messages
  205. }
  206. func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
  207. return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
  208. }
  209. ///////////////////////////////////////////////////
  210. // Expectation API
  211. ///////////////////////////////////////////////////
  212. // YieldMessage will yield a messages Messages channel of this partition consumer
  213. // when it is consumed. By default, the mock consumer will not verify whether this
  214. // message was consumed from the Messages channel, because there are legitimate
  215. // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
  216. // verify that the channel is empty on close.
  217. func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
  218. pc.l.Lock()
  219. defer pc.l.Unlock()
  220. msg.Topic = pc.topic
  221. msg.Partition = pc.partition
  222. msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
  223. pc.messages <- msg
  224. }
  225. // YieldError will yield an error on the Errors channel of this partition consumer
  226. // when it is consumed. By default, the mock consumer will not verify whether this error was
  227. // consumed from the Errors channel, because there are legitimate reasons for this
  228. // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
  229. // the channel is empty on close.
  230. func (pc *PartitionConsumer) YieldError(err error) {
  231. pc.errors <- &sarama.ConsumerError{
  232. Topic: pc.topic,
  233. Partition: pc.partition,
  234. Err: err,
  235. }
  236. }
  237. // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
  238. // that the messages channel will be fully drained when Close is called. If this
  239. // expectation is not met, an error is reported to the error reporter.
  240. func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() {
  241. pc.messagesShouldBeDrained = true
  242. }
  243. // ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
  244. // that the errors channel will be fully drained when Close is called. If this
  245. // expectation is not met, an error is reported to the error reporter.
  246. func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
  247. pc.errorsShouldBeDrained = true
  248. }