consumer.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package mocks
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. type Consumer struct {
  7. l sync.Mutex
  8. t ErrorReporter
  9. config *sarama.Config
  10. partitionConsumers map[string]map[int32]*PartitionConsumer
  11. }
  12. type PartitionConsumer struct {
  13. l sync.Mutex
  14. t ErrorReporter
  15. topic string
  16. partition int32
  17. expectations chan *consumerExpectation
  18. messages chan *sarama.ConsumerMessage
  19. errors chan *sarama.ConsumerError
  20. consumed bool
  21. }
  22. func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
  23. if config == nil {
  24. config = sarama.NewConfig()
  25. }
  26. c := &Consumer{
  27. t: t,
  28. config: config,
  29. partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
  30. }
  31. return c
  32. }
  33. func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
  34. c.l.Lock()
  35. defer c.l.Unlock()
  36. if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
  37. c.t.Errorf("No expectations set for %s/%d", topic, partition)
  38. return nil, errOutOfExpectations
  39. }
  40. pc := c.partitionConsumers[topic][partition]
  41. pc.consumed = true
  42. go pc.handleExpectations()
  43. return pc, nil
  44. }
  45. // Close implements the Close method from the sarama.Consumer interface.
  46. func (c *Consumer) Close() error {
  47. c.l.Lock()
  48. defer c.l.Unlock()
  49. for _, partitions := range c.partitionConsumers {
  50. for _, partitionConsumer := range partitions {
  51. _ = partitionConsumer.Close()
  52. }
  53. }
  54. return nil
  55. }
  56. func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer {
  57. c.l.Lock()
  58. defer c.l.Unlock()
  59. if c.partitionConsumers[topic] == nil {
  60. c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
  61. }
  62. if c.partitionConsumers[topic][partition] == nil {
  63. c.partitionConsumers[topic][partition] = &PartitionConsumer{
  64. t: c.t,
  65. topic: topic,
  66. partition: partition,
  67. expectations: make(chan *consumerExpectation, 1000),
  68. messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
  69. errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
  70. }
  71. }
  72. return c.partitionConsumers[topic][partition]
  73. }
  74. // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
  75. func (pc *PartitionConsumer) AsyncClose() {
  76. close(pc.expectations)
  77. }
  78. // Close implements the Close method from the sarama.PartitionConsumer interface.
  79. func (pc *PartitionConsumer) Close() error {
  80. if !pc.consumed {
  81. pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
  82. return errPartitionConsumerNotStarted
  83. }
  84. pc.AsyncClose()
  85. var (
  86. closeErr error
  87. wg sync.WaitGroup
  88. )
  89. wg.Add(1)
  90. go func() {
  91. defer wg.Done()
  92. var errs = make(sarama.ConsumerErrors, 0)
  93. for err := range pc.errors {
  94. errs = append(errs, err)
  95. }
  96. if len(errs) > 0 {
  97. closeErr = errs
  98. }
  99. }()
  100. wg.Add(1)
  101. go func() {
  102. defer wg.Done()
  103. for _ = range pc.messages {
  104. // drain
  105. }
  106. }()
  107. wg.Wait()
  108. return closeErr
  109. }
  110. // Errors implements the Errors method from the sarama.PartitionConsumer interface.
  111. func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
  112. return pc.errors
  113. }
  114. // Messages implements the Messages method from the sarama.PartitionConsumer interface.
  115. func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
  116. return pc.messages
  117. }
  118. func (pc *PartitionConsumer) handleExpectations() {
  119. pc.l.Lock()
  120. defer pc.l.Unlock()
  121. var offset int64
  122. for ex := range pc.expectations {
  123. if ex.Err != nil {
  124. pc.errors <- &sarama.ConsumerError{
  125. Topic: pc.topic,
  126. Partition: pc.partition,
  127. Err: ex.Err,
  128. }
  129. } else {
  130. offset++
  131. ex.Msg.Topic = pc.topic
  132. ex.Msg.Partition = pc.partition
  133. ex.Msg.Offset = offset
  134. pc.messages <- ex.Msg
  135. }
  136. }
  137. close(pc.messages)
  138. close(pc.errors)
  139. }
  140. func (pc *PartitionConsumer) ExpectMessage(msg *sarama.ConsumerMessage) {
  141. pc.expectations <- &consumerExpectation{Msg: msg}
  142. }
  143. func (pc *PartitionConsumer) ExpectError(err error) {
  144. pc.expectations <- &consumerExpectation{Err: err}
  145. }