consumer.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package mocks
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. // Consumer implements sarama's Consumer interface for testing purposes.
  7. // Before you can start consuming from this consumer, you have to register
  8. // topic/partitions using ExpectConsumePartition, and set expectations on them.
  9. type Consumer struct {
  10. l sync.Mutex
  11. t ErrorReporter
  12. config *sarama.Config
  13. partitionConsumers map[string]map[int32]*PartitionConsumer
  14. metadata map[string][]int32
  15. }
  16. // NewConsumer returns a new mock Consumer instance. The t argument should
  17. // be the *testing.T instance of your test method. An error will be written to it if
  18. // an expectation is violated. The config argument is currently unused and can be set to nil.
  19. func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
  20. if config == nil {
  21. config = sarama.NewConfig()
  22. }
  23. c := &Consumer{
  24. t: t,
  25. config: config,
  26. partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
  27. }
  28. return c
  29. }
  30. ///////////////////////////////////////////////////
  31. // Consumer interface implementation
  32. ///////////////////////////////////////////////////
  33. // ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
  34. // Before you can start consuming a partition, you have to set expectations on it using
  35. // ExpectConsumePartition. You can only consume a partition once per consumer.
  36. func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
  37. c.l.Lock()
  38. defer c.l.Unlock()
  39. if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
  40. c.t.Errorf("No expectations set for %s/%d", topic, partition)
  41. return nil, errOutOfExpectations
  42. }
  43. pc := c.partitionConsumers[topic][partition]
  44. if pc.consumed {
  45. return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
  46. }
  47. if pc.offset != AnyOffset && pc.offset != offset {
  48. c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
  49. }
  50. pc.consumed = true
  51. go pc.handleExpectations()
  52. return pc, nil
  53. }
  54. // Topics returns a list of topics, as registered with SetMetadata
  55. func (c *Consumer) Topics() ([]string, error) {
  56. c.l.Lock()
  57. defer c.l.Unlock()
  58. if c.metadata == nil {
  59. c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
  60. return nil, sarama.ErrOutOfBrokers
  61. }
  62. var result []string
  63. for topic, _ := range c.metadata {
  64. result = append(result, topic)
  65. }
  66. return result, nil
  67. }
  68. // Partitions returns the list of parititons for the given topic, as registered with SetMetadata
  69. func (c *Consumer) Partitions(topic string) ([]int32, error) {
  70. c.l.Lock()
  71. defer c.l.Unlock()
  72. if c.metadata == nil {
  73. c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
  74. return nil, sarama.ErrOutOfBrokers
  75. }
  76. if c.metadata[topic] == nil {
  77. return nil, sarama.ErrUnknownTopicOrPartition
  78. }
  79. return c.metadata[topic], nil
  80. }
  81. // Close implements the Close method from the sarama.Consumer interface. It will close
  82. // all registered PartitionConsumer instances.
  83. func (c *Consumer) Close() error {
  84. c.l.Lock()
  85. defer c.l.Unlock()
  86. for _, partitions := range c.partitionConsumers {
  87. for _, partitionConsumer := range partitions {
  88. _ = partitionConsumer.Close()
  89. }
  90. }
  91. return nil
  92. }
  93. ///////////////////////////////////////////////////
  94. // Expectation API
  95. ///////////////////////////////////////////////////
  96. // SetMetadata sets the clusters topic/partition metadata,
  97. // which will be returned by Topics() and Partitions().
  98. func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
  99. c.l.Lock()
  100. defer c.l.Unlock()
  101. c.metadata = metadata
  102. }
  103. // ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
  104. // The registered PartitionConsumer will be returned, so you can set expectations
  105. // on it using method chanining. Once a topic/partition is registered, you are
  106. // expected to start consuming it using ConsumePartition. If that doesn't happen,
  107. // an error will be written to the error reporter once the mock consumer is closed. It will
  108. // also expect that the
  109. func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
  110. c.l.Lock()
  111. defer c.l.Unlock()
  112. if c.partitionConsumers[topic] == nil {
  113. c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
  114. }
  115. if c.partitionConsumers[topic][partition] == nil {
  116. c.partitionConsumers[topic][partition] = &PartitionConsumer{
  117. t: c.t,
  118. topic: topic,
  119. partition: partition,
  120. offset: offset,
  121. expectations: make(chan *consumerExpectation, 1000),
  122. messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
  123. errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
  124. }
  125. }
  126. return c.partitionConsumers[topic][partition]
  127. }
  128. ///////////////////////////////////////////////////
  129. // PartitionConsumer mock type
  130. ///////////////////////////////////////////////////
  131. // PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
  132. // It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
  133. // registered first using the Consumer's ExpectConsumePartition method. Before consuming the
  134. // Errors and Messages channel, you should specify what values will be provided on these
  135. // channels using YieldMessage and YieldError.
  136. type PartitionConsumer struct {
  137. l sync.Mutex
  138. t ErrorReporter
  139. topic string
  140. partition int32
  141. offset int64
  142. expectations chan *consumerExpectation
  143. messages chan *sarama.ConsumerMessage
  144. errors chan *sarama.ConsumerError
  145. singleClose sync.Once
  146. consumed bool
  147. errorsShouldBeDrained bool
  148. messagesShouldBeDrained bool
  149. }
  150. func (pc *PartitionConsumer) handleExpectations() {
  151. pc.l.Lock()
  152. defer pc.l.Unlock()
  153. var offset int64
  154. for ex := range pc.expectations {
  155. if ex.Err != nil {
  156. pc.errors <- &sarama.ConsumerError{
  157. Topic: pc.topic,
  158. Partition: pc.partition,
  159. Err: ex.Err,
  160. }
  161. } else {
  162. offset++
  163. ex.Msg.Topic = pc.topic
  164. ex.Msg.Partition = pc.partition
  165. ex.Msg.Offset = offset
  166. pc.messages <- ex.Msg
  167. }
  168. }
  169. close(pc.messages)
  170. close(pc.errors)
  171. }
  172. ///////////////////////////////////////////////////
  173. // PartitionConsumer interface implementation
  174. ///////////////////////////////////////////////////
  175. // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
  176. func (pc *PartitionConsumer) AsyncClose() {
  177. pc.singleClose.Do(func() {
  178. close(pc.expectations)
  179. })
  180. }
  181. // Close implements the Close method from the sarama.PartitionConsumer interface. It will
  182. // verify whether the partition consumer was actually started.
  183. func (pc *PartitionConsumer) Close() error {
  184. if !pc.consumed {
  185. pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
  186. return errPartitionConsumerNotStarted
  187. }
  188. if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
  189. pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
  190. }
  191. if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
  192. pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
  193. }
  194. pc.AsyncClose()
  195. var (
  196. closeErr error
  197. wg sync.WaitGroup
  198. )
  199. wg.Add(1)
  200. go func() {
  201. defer wg.Done()
  202. var errs = make(sarama.ConsumerErrors, 0)
  203. for err := range pc.errors {
  204. errs = append(errs, err)
  205. }
  206. if len(errs) > 0 {
  207. closeErr = errs
  208. }
  209. }()
  210. wg.Add(1)
  211. go func() {
  212. defer wg.Done()
  213. for _ = range pc.messages {
  214. // drain
  215. }
  216. }()
  217. wg.Wait()
  218. return closeErr
  219. }
  220. // Errors implements the Errors method from the sarama.PartitionConsumer interface.
  221. func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
  222. return pc.errors
  223. }
  224. // Messages implements the Messages method from the sarama.PartitionConsumer interface.
  225. func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
  226. return pc.messages
  227. }
  228. ///////////////////////////////////////////////////
  229. // Expectation API
  230. ///////////////////////////////////////////////////
  231. // YieldMessage will yield a messages Messages channel of this partition consumer
  232. // when it is consumed. By default, the mock consumer will not verify whether this
  233. // message was consumed from the Messages channel, because there are legitimate
  234. // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
  235. // verify that the channel is empty on close.
  236. func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
  237. pc.expectations <- &consumerExpectation{Msg: msg}
  238. }
  239. // YieldError will yield an error on the Errors channel of this partition consumer
  240. // when it is consumed. By default, the mock consumer will not verify whether this error was
  241. // consumed from the Errors channel, because there are legitimate reasons for this
  242. // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
  243. // the channel is empty on close.
  244. func (pc *PartitionConsumer) YieldError(err error) {
  245. pc.expectations <- &consumerExpectation{Err: err}
  246. }
  247. // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
  248. // that the messages channel will be fully drained when Close is called. If this
  249. // expectation is not met, an error is reported to the error reporter.
  250. func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() {
  251. pc.messagesShouldBeDrained = true
  252. }
  253. // ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
  254. // that the errors channel will be fully drained when Close is called. If this
  255. // expectation is not met, an error is reported to the error reporter.
  256. func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
  257. pc.errorsShouldBeDrained = true
  258. }