sync_producer.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package mocks
  2. import (
  3. "github.com/Shopify/sarama"
  4. "sync"
  5. )
  6. // SyncProducer implements sarama's SyncProducer interface for testing purposes.
  7. // Before you can use it, you have to set expectations on the mock SyncProducer
  8. // to tell it how to handle calls to SendMessage, so you can easily test success
  9. // and failure scenarios.
  10. type SyncProducer struct {
  11. l sync.Mutex
  12. t ErrorReporter
  13. expectations []*producerExpectation
  14. lastOffset int64
  15. }
  16. // NewSyncProducer instantiates a new SyncProducer mock. The t argument should
  17. // be the *testing.T instance of your test method. An error will be written to it if
  18. // an expectation is violated. The config argument is currently unused, but is
  19. // maintained to be compatible with the async Producer.
  20. func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
  21. return &SyncProducer{
  22. t: t,
  23. expectations: make([]*producerExpectation, 0),
  24. }
  25. }
  26. ////////////////////////////////////////////////
  27. // Implement SyncProducer interface
  28. ////////////////////////////////////////////////
  29. // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
  30. // You have to set expectations on the mock producer before calling SendMessage, so it knows
  31. // how to handle them. If there is no more remaining expectations when SendMessage is called,
  32. // the mock producer will write an error to the test state object.
  33. func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
  34. sp.l.Lock()
  35. defer sp.l.Unlock()
  36. if len(sp.expectations) > 0 {
  37. expectation := sp.expectations[0]
  38. sp.expectations = sp.expectations[1:]
  39. if expectation.Result == errProduceSuccess {
  40. sp.lastOffset++
  41. msg.Offset = sp.lastOffset
  42. return 0, msg.Offset, nil
  43. } else {
  44. return -1, -1, expectation.Result
  45. }
  46. } else {
  47. sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
  48. return -1, -1, errOutOfExpectations
  49. }
  50. }
  51. // Close corresponds with the Close method of sarama's SyncProducer implementation.
  52. // By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow,
  53. // so it will write an error to the test state if there's any remaining expectations.
  54. func (sp *SyncProducer) Close() error {
  55. sp.l.Lock()
  56. defer sp.l.Unlock()
  57. if len(sp.expectations) > 0 {
  58. sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
  59. }
  60. return nil
  61. }
  62. ////////////////////////////////////////////////
  63. // Setting expectations
  64. ////////////////////////////////////////////////
  65. // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
  66. // called. The mock producer will handle the message as if it produced successfully, i.e. by
  67. // returning a valid partition, and offset, and a nil error.
  68. func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
  69. sp.l.Lock()
  70. defer sp.l.Unlock()
  71. sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess})
  72. }
  73. // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
  74. // called. The mock producer will handle the message as if it failed to produce
  75. // successfully, i.e. by returning the provided error.
  76. func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
  77. sp.l.Lock()
  78. defer sp.l.Unlock()
  79. sp.expectations = append(sp.expectations, &producerExpectation{Result: err})
  80. }