sync_producer.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package mocks
  2. import (
  3. "sync"
  4. "github.com/Shopify/sarama"
  5. )
  6. // SyncProducer implements sarama's SyncProducer interface for testing purposes.
  7. // Before you can use it, you have to set expectations on the mock SyncProducer
  8. // to tell it how to handle calls to SendMessage, so you can easily test success
  9. // and failure scenarios.
  10. type SyncProducer struct {
  11. l sync.Mutex
  12. t ErrorReporter
  13. expectations []*producerExpectation
  14. lastOffset int64
  15. }
  16. // NewSyncProducer instantiates a new SyncProducer mock. The t argument should
  17. // be the *testing.T instance of your test method. An error will be written to it if
  18. // an expectation is violated. The config argument is currently unused, but is
  19. // maintained to be compatible with the async Producer.
  20. func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
  21. return &SyncProducer{
  22. t: t,
  23. expectations: make([]*producerExpectation, 0),
  24. }
  25. }
  26. ////////////////////////////////////////////////
  27. // Implement SyncProducer interface
  28. ////////////////////////////////////////////////
  29. // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
  30. // You have to set expectations on the mock producer before calling SendMessage, so it knows
  31. // how to handle them. You can set a function in each expectation so that the message value
  32. // checked by this function and an error is returned if the match fails.
  33. // If there is no more remaining expectation when SendMessage is called,
  34. // the mock producer will write an error to the test state object.
  35. func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
  36. sp.l.Lock()
  37. defer sp.l.Unlock()
  38. if len(sp.expectations) > 0 {
  39. expectation := sp.expectations[0]
  40. sp.expectations = sp.expectations[1:]
  41. if expectation.CheckFunction != nil {
  42. val, err := msg.Value.Encode()
  43. if err != nil {
  44. sp.t.Errorf("Input message encoding failed: %s", err.Error())
  45. return -1, -1, err
  46. }
  47. errCheck := expectation.CheckFunction(val)
  48. if errCheck != nil {
  49. sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
  50. return -1, -1, errCheck
  51. }
  52. }
  53. if expectation.Result == errProduceSuccess {
  54. sp.lastOffset++
  55. msg.Offset = sp.lastOffset
  56. return 0, msg.Offset, nil
  57. }
  58. return -1, -1, expectation.Result
  59. }
  60. sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
  61. return -1, -1, errOutOfExpectations
  62. }
  63. // SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation.
  64. // You have to set expectations on the mock producer before calling SendMessages, so it knows
  65. // how to handle them. If there is no more remaining expectations when SendMessages is called,
  66. // the mock producer will write an error to the test state object.
  67. func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
  68. sp.l.Lock()
  69. defer sp.l.Unlock()
  70. if len(sp.expectations) >= len(msgs) {
  71. expectations := sp.expectations[0:len(msgs)]
  72. sp.expectations = sp.expectations[len(msgs):]
  73. for i, expectation := range expectations {
  74. if expectation.CheckFunction != nil {
  75. val, err := msgs[i].Value.Encode()
  76. if err != nil {
  77. sp.t.Errorf("Input message encoding failed: %s", err.Error())
  78. return err
  79. }
  80. errCheck := expectation.CheckFunction(val)
  81. if errCheck != nil {
  82. sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
  83. return errCheck
  84. }
  85. }
  86. if expectation.Result != errProduceSuccess {
  87. return expectation.Result
  88. }
  89. }
  90. return nil
  91. }
  92. sp.t.Errorf("Insufficient expectations set on this mock producer to handle the input messages.")
  93. return errOutOfExpectations
  94. }
  95. // Close corresponds with the Close method of sarama's SyncProducer implementation.
  96. // By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow,
  97. // so it will write an error to the test state if there's any remaining expectations.
  98. func (sp *SyncProducer) Close() error {
  99. sp.l.Lock()
  100. defer sp.l.Unlock()
  101. if len(sp.expectations) > 0 {
  102. sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
  103. }
  104. return nil
  105. }
  106. ////////////////////////////////////////////////
  107. // Setting expectations
  108. ////////////////////////////////////////////////
  109. // ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
  110. // will be called. The mock producer will first call the given function to check the message value.
  111. // It will cascade the error of the function, if any, or handle the message as if it produced
  112. // successfully, i.e. by returning a valid partition, and offset, and a nil error.
  113. func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) {
  114. sp.l.Lock()
  115. defer sp.l.Unlock()
  116. sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
  117. }
  118. // ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be
  119. // called. The mock producer will first call the given function to check the message value.
  120. // It will cascade the error of the function, if any, or handle the message as if it failed
  121. // to produce successfully, i.e. by returning the provided error.
  122. func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) {
  123. sp.l.Lock()
  124. defer sp.l.Unlock()
  125. sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
  126. }
  127. // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
  128. // called. The mock producer will handle the message as if it produced successfully, i.e. by
  129. // returning a valid partition, and offset, and a nil error.
  130. func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
  131. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil)
  132. }
  133. // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
  134. // called. The mock producer will handle the message as if it failed to produce
  135. // successfully, i.e. by returning the provided error.
  136. func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
  137. sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err)
  138. }