consumer.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package mocks
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. type Consumer struct {
  7. l sync.Mutex
  8. t ErrorReporter
  9. config *sarama.Config
  10. partitionConsumers map[string]map[int32]*PartitionConsumer
  11. }
  12. type PartitionConsumer struct {
  13. l sync.Mutex
  14. t ErrorReporter
  15. topic string
  16. partition int32
  17. expectations chan *consumerExpectation
  18. messages chan *sarama.ConsumerMessage
  19. errors chan *sarama.ConsumerError
  20. consumed bool
  21. }
  22. func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
  23. if config == nil {
  24. config = sarama.NewConfig()
  25. }
  26. c := &Consumer{
  27. t: t,
  28. config: config,
  29. partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
  30. }
  31. return c
  32. }
  33. func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
  34. c.l.Lock()
  35. defer c.l.Unlock()
  36. if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
  37. c.t.Errorf("No expectations set for %s/%d", topic, partition)
  38. return nil, errOutOfExpectations
  39. }
  40. pc := c.partitionConsumers[topic][partition]
  41. pc.consumed = true
  42. go pc.handleExpectations()
  43. return pc, nil
  44. }
  45. // Close implements the Close method from the sarama.Consumer interface.
  46. func (c *Consumer) Close() error {
  47. c.l.Lock()
  48. defer c.l.Unlock()
  49. for _, partitions := range c.partitionConsumers {
  50. for _, partitionConsumer := range partitions {
  51. _ = partitionConsumer.Close()
  52. }
  53. }
  54. return nil
  55. }
  56. func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer {
  57. c.l.Lock()
  58. defer c.l.Unlock()
  59. if c.partitionConsumers[topic] == nil {
  60. c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
  61. }
  62. if c.partitionConsumers[topic][partition] == nil {
  63. c.partitionConsumers[topic][partition] = &PartitionConsumer{
  64. t: c.t,
  65. topic: topic,
  66. partition: partition,
  67. expectations: make(chan *consumerExpectation, 1000),
  68. messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
  69. errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
  70. }
  71. }
  72. return c.partitionConsumers[topic][partition]
  73. }
  74. // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
  75. func (pc *PartitionConsumer) AsyncClose() {
  76. close(pc.expectations)
  77. }
  78. // Close implements the Close method from the sarama.PartitionConsumer interface.
  79. func (pc *PartitionConsumer) Close() error {
  80. if !pc.consumed {
  81. pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
  82. }
  83. pc.AsyncClose()
  84. var errs = make(sarama.ConsumerErrors, 0)
  85. go func() {
  86. for err := range pc.errors {
  87. errs = append(errs, err)
  88. }
  89. }()
  90. go func() {
  91. for _ = range pc.messages {
  92. // drain
  93. }
  94. }()
  95. pc.l.Lock()
  96. pc.l.Unlock()
  97. if len(errs) > 0 {
  98. return errs
  99. }
  100. return nil
  101. }
  102. // Errors implements the Errors method from the sarama.PartitionConsumer interface.
  103. func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
  104. return pc.errors
  105. }
  106. // Messages implements the Messages method from the sarama.PartitionConsumer interface.
  107. func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
  108. return pc.messages
  109. }
  110. func (pc *PartitionConsumer) handleExpectations() {
  111. pc.l.Lock()
  112. defer pc.l.Unlock()
  113. var offset int64
  114. for ex := range pc.expectations {
  115. if ex.Err != nil {
  116. pc.errors <- &sarama.ConsumerError{
  117. Topic: pc.topic,
  118. Partition: pc.partition,
  119. Err: ex.Err,
  120. }
  121. } else {
  122. offset++
  123. ex.Msg.Topic = pc.topic
  124. ex.Msg.Partition = pc.partition
  125. ex.Msg.Offset = offset
  126. pc.messages <- ex.Msg
  127. }
  128. }
  129. close(pc.messages)
  130. close(pc.errors)
  131. }
  132. func (pc *PartitionConsumer) ExpectMessage(msg *sarama.ConsumerMessage) {
  133. pc.expectations <- &consumerExpectation{Msg: msg}
  134. }
  135. func (pc *PartitionConsumer) ExpectError(err error) {
  136. pc.expectations <- &consumerExpectation{Err: err}
  137. }