consumer.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package mocks
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/Shopify/sarama"
  6. )
  7. // Consumer implements sarama's Consumer interface for testing purposes.
  8. // Before you can start consuming from this consumer, you have to register
  9. // topic/partitions using ExpectConsumePartition, and set expectations on them.
  10. type Consumer struct {
  11. l sync.Mutex
  12. t ErrorReporter
  13. config *sarama.Config
  14. partitionConsumers map[string]map[int32]*PartitionConsumer
  15. metadata map[string][]int32
  16. }
  17. // NewConsumer returns a new mock Consumer instance. The t argument should
  18. // be the *testing.T instance of your test method. An error will be written to it if
  19. // an expectation is violated. The config argument is currently unused and can be set to nil.
  20. func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
  21. if config == nil {
  22. config = sarama.NewConfig()
  23. }
  24. c := &Consumer{
  25. t: t,
  26. config: config,
  27. partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
  28. }
  29. return c
  30. }
  31. ///////////////////////////////////////////////////
  32. // Consumer interface implementation
  33. ///////////////////////////////////////////////////
  34. // ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
  35. // Before you can start consuming a partition, you have to set expectations on it using
  36. // ExpectConsumePartition. You can only consume a partition once per consumer.
  37. func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
  38. c.l.Lock()
  39. defer c.l.Unlock()
  40. if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
  41. c.t.Errorf("No expectations set for %s/%d", topic, partition)
  42. return nil, errOutOfExpectations
  43. }
  44. pc := c.partitionConsumers[topic][partition]
  45. if pc.consumed {
  46. return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
  47. }
  48. if pc.offset != AnyOffset && pc.offset != offset {
  49. c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
  50. }
  51. pc.consumed = true
  52. return pc, nil
  53. }
  54. // Topics returns a list of topics, as registered with SetMetadata
  55. func (c *Consumer) Topics() ([]string, error) {
  56. c.l.Lock()
  57. defer c.l.Unlock()
  58. if c.metadata == nil {
  59. c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
  60. return nil, sarama.ErrOutOfBrokers
  61. }
  62. var result []string
  63. for topic := range c.metadata {
  64. result = append(result, topic)
  65. }
  66. return result, nil
  67. }
  68. // Partitions returns the list of parititons for the given topic, as registered with SetMetadata
  69. func (c *Consumer) Partitions(topic string) ([]int32, error) {
  70. c.l.Lock()
  71. defer c.l.Unlock()
  72. if c.metadata == nil {
  73. c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
  74. return nil, sarama.ErrOutOfBrokers
  75. }
  76. if c.metadata[topic] == nil {
  77. return nil, sarama.ErrUnknownTopicOrPartition
  78. }
  79. return c.metadata[topic], nil
  80. }
  81. func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 {
  82. c.l.Lock()
  83. defer c.l.Unlock()
  84. hwms := make(map[string]map[int32]int64, len(c.partitionConsumers))
  85. for topic, partitionConsumers := range c.partitionConsumers {
  86. hwm := make(map[int32]int64, len(partitionConsumers))
  87. for partition, pc := range partitionConsumers {
  88. hwm[partition] = pc.HighWaterMarkOffset()
  89. }
  90. hwms[topic] = hwm
  91. }
  92. return hwms
  93. }
  94. // Close implements the Close method from the sarama.Consumer interface. It will close
  95. // all registered PartitionConsumer instances.
  96. func (c *Consumer) Close() error {
  97. c.l.Lock()
  98. defer c.l.Unlock()
  99. for _, partitions := range c.partitionConsumers {
  100. for _, partitionConsumer := range partitions {
  101. _ = partitionConsumer.Close()
  102. }
  103. }
  104. return nil
  105. }
  106. ///////////////////////////////////////////////////
  107. // Expectation API
  108. ///////////////////////////////////////////////////
  109. // SetTopicMetadata sets the clusters topic/partition metadata,
  110. // which will be returned by Topics() and Partitions().
  111. func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
  112. c.l.Lock()
  113. defer c.l.Unlock()
  114. c.metadata = metadata
  115. }
  116. // ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
  117. // The registered PartitionConsumer will be returned, so you can set expectations
  118. // on it using method chaining. Once a topic/partition is registered, you are
  119. // expected to start consuming it using ConsumePartition. If that doesn't happen,
  120. // an error will be written to the error reporter once the mock consumer is closed. It will
  121. // also expect that the
  122. func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
  123. c.l.Lock()
  124. defer c.l.Unlock()
  125. if c.partitionConsumers[topic] == nil {
  126. c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
  127. }
  128. if c.partitionConsumers[topic][partition] == nil {
  129. c.partitionConsumers[topic][partition] = &PartitionConsumer{
  130. t: c.t,
  131. topic: topic,
  132. partition: partition,
  133. offset: offset,
  134. messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
  135. errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
  136. }
  137. }
  138. return c.partitionConsumers[topic][partition]
  139. }
  140. ///////////////////////////////////////////////////
  141. // PartitionConsumer mock type
  142. ///////////////////////////////////////////////////
  143. // PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
  144. // It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
  145. // registered first using the Consumer's ExpectConsumePartition method. Before consuming the
  146. // Errors and Messages channel, you should specify what values will be provided on these
  147. // channels using YieldMessage and YieldError.
  148. type PartitionConsumer struct {
  149. l sync.Mutex
  150. t ErrorReporter
  151. topic string
  152. partition int32
  153. offset int64
  154. messages chan *sarama.ConsumerMessage
  155. errors chan *sarama.ConsumerError
  156. singleClose sync.Once
  157. consumed bool
  158. errorsShouldBeDrained bool
  159. messagesShouldBeDrained bool
  160. highWaterMarkOffset int64
  161. }
  162. ///////////////////////////////////////////////////
  163. // PartitionConsumer interface implementation
  164. ///////////////////////////////////////////////////
  165. // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
  166. func (pc *PartitionConsumer) AsyncClose() {
  167. pc.singleClose.Do(func() {
  168. close(pc.messages)
  169. close(pc.errors)
  170. })
  171. }
  172. // Close implements the Close method from the sarama.PartitionConsumer interface. It will
  173. // verify whether the partition consumer was actually started.
  174. func (pc *PartitionConsumer) Close() error {
  175. if !pc.consumed {
  176. pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
  177. return errPartitionConsumerNotStarted
  178. }
  179. if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
  180. pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
  181. }
  182. if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
  183. pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
  184. }
  185. pc.AsyncClose()
  186. var (
  187. closeErr error
  188. wg sync.WaitGroup
  189. )
  190. wg.Add(1)
  191. go func() {
  192. defer wg.Done()
  193. var errs = make(sarama.ConsumerErrors, 0)
  194. for err := range pc.errors {
  195. errs = append(errs, err)
  196. }
  197. if len(errs) > 0 {
  198. closeErr = errs
  199. }
  200. }()
  201. wg.Add(1)
  202. go func() {
  203. defer wg.Done()
  204. for range pc.messages {
  205. // drain
  206. }
  207. }()
  208. wg.Wait()
  209. return closeErr
  210. }
  211. // Errors implements the Errors method from the sarama.PartitionConsumer interface.
  212. func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
  213. return pc.errors
  214. }
  215. // Messages implements the Messages method from the sarama.PartitionConsumer interface.
  216. func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
  217. return pc.messages
  218. }
  219. func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
  220. return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
  221. }
  222. ///////////////////////////////////////////////////
  223. // Expectation API
  224. ///////////////////////////////////////////////////
  225. // YieldMessage will yield a messages Messages channel of this partition consumer
  226. // when it is consumed. By default, the mock consumer will not verify whether this
  227. // message was consumed from the Messages channel, because there are legitimate
  228. // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
  229. // verify that the channel is empty on close.
  230. func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
  231. pc.l.Lock()
  232. defer pc.l.Unlock()
  233. msg.Topic = pc.topic
  234. msg.Partition = pc.partition
  235. msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
  236. pc.messages <- msg
  237. }
  238. // YieldError will yield an error on the Errors channel of this partition consumer
  239. // when it is consumed. By default, the mock consumer will not verify whether this error was
  240. // consumed from the Errors channel, because there are legitimate reasons for this
  241. // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
  242. // the channel is empty on close.
  243. func (pc *PartitionConsumer) YieldError(err error) {
  244. pc.errors <- &sarama.ConsumerError{
  245. Topic: pc.topic,
  246. Partition: pc.partition,
  247. Err: err,
  248. }
  249. }
  250. // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
  251. // that the messages channel will be fully drained when Close is called. If this
  252. // expectation is not met, an error is reported to the error reporter.
  253. func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() {
  254. pc.messagesShouldBeDrained = true
  255. }
  256. // ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
  257. // that the errors channel will be fully drained when Close is called. If this
  258. // expectation is not met, an error is reported to the error reporter.
  259. func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
  260. pc.errorsShouldBeDrained = true
  261. }