123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- package mocks
- import (
- "sync"
- "github.com/Shopify/sarama"
- )
- // SyncProducer implements sarama's SyncProducer interface for testing purposes.
- // Before you can use it, you have to set expectations on the mock SyncProducer
- // to tell it how to handle calls to SendMessage, so you can easily test success
- // and failure scenarios.
- type SyncProducer struct { // mock state; methods take l before touching the other fields
- l sync.Mutex // guards expectations and lastOffset
- t ErrorReporter // receives Errorf calls when an expectation is violated or left unconsumed
- expectations []*producerExpectation // FIFO queue: Expect* methods append, SendMessage/SendMessages consume from the front
- lastOffset int64 // offset handed to the most recent successful SendMessage; incremented before each assignment
- }
- // NewSyncProducer instantiates a new SyncProducer mock. The t argument should
- // be the *testing.T instance of your test method. An error will be written to it if
- // an expectation is violated. The config argument is currently unused, but is
- // maintained to be compatible with the async Producer.
- func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
- return &SyncProducer{
- t: t,
- expectations: make([]*producerExpectation, 0),
- }
- }
- ////////////////////////////////////////////////
- // Implement SyncProducer interface
- ////////////////////////////////////////////////
- // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
- // You have to set expectations on the mock producer before calling SendMessage, so it knows
- // how to handle them. You can set a function in each expectation so that the message value
- // checked by this function and an error is returned if the match fails.
- // If there is no more remaining expectation when SendMessage is called,
- // the mock producer will write an error to the test state object.
- func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
- sp.l.Lock()
- defer sp.l.Unlock()
- if len(sp.expectations) > 0 {
- expectation := sp.expectations[0]
- sp.expectations = sp.expectations[1:]
- if expectation.CheckFunction != nil {
- val, err := msg.Value.Encode()
- if err != nil {
- sp.t.Errorf("Input message encoding failed: %s", err.Error())
- return -1, -1, err
- }
- errCheck := expectation.CheckFunction(val)
- if errCheck != nil {
- sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
- return -1, -1, errCheck
- }
- }
- if expectation.Result == errProduceSuccess {
- sp.lastOffset++
- msg.Offset = sp.lastOffset
- return 0, msg.Offset, nil
- }
- return -1, -1, expectation.Result
- }
- sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
- return -1, -1, errOutOfExpectations
- }
- // SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation.
- // You have to set expectations on the mock producer before calling SendMessages, so it knows
- // how to handle them. If there is no more remaining expectations when SendMessages is called,
- // the mock producer will write an error to the test state object.
- func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
- sp.l.Lock()
- defer sp.l.Unlock()
- if len(sp.expectations) >= len(msgs) {
- expectations := sp.expectations[0:len(msgs)]
- sp.expectations = sp.expectations[len(msgs):]
- for i, expectation := range expectations {
- if expectation.CheckFunction != nil {
- val, err := msgs[i].Value.Encode()
- if err != nil {
- sp.t.Errorf("Input message encoding failed: %s", err.Error())
- return err
- }
- errCheck := expectation.CheckFunction(val)
- if errCheck != nil {
- sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
- return errCheck
- }
- }
- if expectation.Result != errProduceSuccess {
- return expectation.Result
- }
- }
- return nil
- }
- sp.t.Errorf("Insufficient expectations set on this mock producer to handle the input messages.")
- return errOutOfExpectations
- }
- // Close corresponds with the Close method of sarama's SyncProducer implementation.
- // By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow,
- // so it will write an error to the test state if there's any remaining expectations.
- func (sp *SyncProducer) Close() error {
- sp.l.Lock()
- defer sp.l.Unlock()
- if len(sp.expectations) > 0 {
- sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
- }
- return nil
- }
- ////////////////////////////////////////////////
- // Setting expectations
- ////////////////////////////////////////////////
- // ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
- // will be called. The mock producer will first call the given function to check the message value.
- // It will cascade the error of the function, if any, or handle the message as if it produced
- // successfully, i.e. by returning a valid partition, and offset, and a nil error.
- func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) {
- sp.l.Lock()
- defer sp.l.Unlock()
- sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
- }
- // ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be
- // called. The mock producer will first call the given function to check the message value.
- // It will cascade the error of the function, if any, or handle the message as if it failed
- // to produce successfully, i.e. by returning the provided error.
- func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) {
- sp.l.Lock()
- defer sp.l.Unlock()
- sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
- }
- // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
- // called. The mock producer will handle the message as if it produced successfully, i.e. by
- // returning a valid partition, and offset, and a nil error.
- func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
- sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil)
- }
- // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
- // called. The mock producer will handle the message as if it failed to produce
- // successfully, i.e. by returning the provided error.
- func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
- sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err)
- }
|