async_producer.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package mocks
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. // AsyncProducer implements sarama's Producer interface for testing purposes.
  7. // Before you can send messages to it's Input channel, you have to set expectations
  8. // so it knows how to handle the input. This way you can easily test success and
  9. // failure scenarios.
  10. type AsyncProducer struct {
  11. l sync.Mutex
  12. t ErrorReporter
  13. expectations []*producerExpectation
  14. closed chan struct{}
  15. input chan *sarama.ProducerMessage
  16. successes chan *sarama.ProducerMessage
  17. errors chan *sarama.ProducerError
  18. lastOffset int64
  19. }
  20. // NewAsyncProducer instantiates a new Producer mock. The t argument should
  21. // be the *testing.T instance of your test method. An error will be written to it if
  22. // an expectation is violated. The config argument is used to determine whether it
  23. // should ack successes on the Successes channel.
  24. func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
  25. if config == nil {
  26. config = sarama.NewConfig()
  27. }
  28. mp := &AsyncProducer{
  29. t: t,
  30. closed: make(chan struct{}, 0),
  31. expectations: make([]*producerExpectation, 0),
  32. input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
  33. successes: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
  34. errors: make(chan *sarama.ProducerError, config.ChannelBufferSize),
  35. }
  36. go func() {
  37. defer func() {
  38. close(mp.successes)
  39. close(mp.errors)
  40. }()
  41. for msg := range mp.input {
  42. mp.l.Lock()
  43. if mp.expectations == nil || len(mp.expectations) == 0 {
  44. mp.expectations = nil
  45. mp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
  46. } else {
  47. expectation := mp.expectations[0]
  48. mp.expectations = mp.expectations[1:]
  49. if expectation.Result == errProduceSuccess {
  50. mp.lastOffset++
  51. if config.Producer.Return.Successes {
  52. msg.Offset = mp.lastOffset
  53. mp.successes <- msg
  54. }
  55. } else {
  56. if config.Producer.Return.Errors {
  57. mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
  58. }
  59. }
  60. }
  61. mp.l.Unlock()
  62. }
  63. mp.l.Lock()
  64. if len(mp.expectations) > 0 {
  65. mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
  66. }
  67. mp.l.Unlock()
  68. close(mp.closed)
  69. }()
  70. return mp
  71. }
  72. ////////////////////////////////////////////////
  73. // Implement Producer interface
  74. ////////////////////////////////////////////////
  75. // AsyncClose corresponds with the AsyncClose method of sarama's Producer implementation.
  76. // By closing a mock producer, you also tell it that no more input will be provided, so it will
  77. // write an error to the test state if there's any remaining expectations.
  78. func (mp *AsyncProducer) AsyncClose() {
  79. close(mp.input)
  80. }
  81. // Close corresponds with the Close method of sarama's Producer implementation.
  82. // By closing a mock producer, you also tell it that no more input will be provided, so it will
  83. // write an error to the test state if there's any remaining expectations.
  84. func (mp *AsyncProducer) Close() error {
  85. mp.AsyncClose()
  86. <-mp.closed
  87. return nil
  88. }
  89. // Input corresponds with the Input method of sarama's Producer implementation.
  90. // You have to set expectations on the mock producer before writing messages to the Input
  91. // channel, so it knows how to handle them. If there is no more remaining expectations and
  92. // a messages is written to the Input channel, the mock producer will write an error to the test
  93. // state object.
  94. func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage {
  95. return mp.input
  96. }
  97. // Successes corresponds with the Successes method of sarama's Producer implementation.
  98. func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage {
  99. return mp.successes
  100. }
  101. // Errors corresponds with the Errors method of sarama's Producer implementation.
  102. func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
  103. return mp.errors
  104. }
  105. ////////////////////////////////////////////////
  106. // Setting expectations
  107. ////////////////////////////////////////////////
  108. // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
  109. // on the input channel. The mock producer will handle the message as if it is produced successfully,
  110. // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
  111. // is set to true.
  112. func (mp *AsyncProducer) ExpectInputAndSucceed() {
  113. mp.l.Lock()
  114. defer mp.l.Unlock()
  115. mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess})
  116. }
  117. // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
  118. // on the input channel. The mock producer will handle the message as if it failed to produce
  119. // successfully. This means it will make a ProducerError available on the Errors channel.
  120. func (mp *AsyncProducer) ExpectInputAndFail(err error) {
  121. mp.l.Lock()
  122. defer mp.l.Unlock()
  123. mp.expectations = append(mp.expectations, &producerExpectation{Result: err})
  124. }