Browse Source

Add basic MockProducer.

Willem van Bergen 10 years ago
parent
commit
78850bba10
2 changed files with 208 additions and 0 deletions
  1. 113 0
      kafkamocks/mock_producer.go
  2. 95 0
      kafkamocks/mock_producer_test.go

+ 113 - 0
kafkamocks/mock_producer.go

@@ -0,0 +1,113 @@
+package kafkamocks
+
+import (
+	"errors"
+
+	"github.com/Shopify/sarama"
+)
+
+type TestReporter interface {
+	Errorf(string, ...interface{})
+}
+
+type KafkaProducer interface {
+	AsyncClose()
+	Close() error
+	Input() chan<- *sarama.ProducerMessage
+	Successes() <-chan *sarama.ProducerMessage
+	Errors() <-chan *sarama.ProducerError
+}
+
+var (
+	errProduceSuccess    error = nil
+	errOutOfExpectations       = errors.New("No more expectations set on MockProducer")
+)
+
+type MockProducerExpectation struct {
+	Result error
+}
+
+type MockProducer struct {
+	expectations []*MockProducerExpectation
+	closed       chan struct{}
+	input        chan *sarama.ProducerMessage
+	successes    chan *sarama.ProducerMessage
+	errors       chan *sarama.ProducerError
+}
+
+func NewMockProducer(t TestReporter, config *sarama.Config) *MockProducer {
+	if config == nil {
+		config = sarama.NewConfig()
+	}
+	mp := &MockProducer{
+		closed:       make(chan struct{}, 0),
+		expectations: make([]*MockProducerExpectation, 0),
+		input:        make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
+		successes:    make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
+		errors:       make(chan *sarama.ProducerError, config.ChannelBufferSize),
+	}
+
+	go func() {
+		defer func() {
+			close(mp.successes)
+			close(mp.errors)
+		}()
+
+		for msg := range mp.input {
+			if mp.expectations == nil || len(mp.expectations) == 0 {
+				mp.expectations = nil
+				t.Errorf("No more expectation set on this mock producer to handle the input message.")
+			} else {
+				expectation := mp.expectations[0]
+				mp.expectations = mp.expectations[1:]
+				if expectation.Result == errProduceSuccess && config.Producer.AckSuccesses {
+					mp.successes <- msg
+				} else {
+					mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
+				}
+			}
+		}
+
+		if len(mp.expectations) > 0 {
+			t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
+		}
+
+		close(mp.closed)
+	}()
+
+	return mp
+}
+
+// Implement KafkaProducer interface
+
+func (mp *MockProducer) AsyncClose() {
+	close(mp.input)
+}
+
+func (mp *MockProducer) Close() error {
+	mp.AsyncClose()
+	<-mp.closed
+	return nil
+}
+
+func (mp *MockProducer) Input() chan<- *sarama.ProducerMessage {
+	return mp.input
+}
+
+func (mp *MockProducer) Successes() <-chan *sarama.ProducerMessage {
+	return mp.successes
+}
+
+func (mp *MockProducer) Errors() <-chan *sarama.ProducerError {
+	return mp.errors
+}
+
+// Setting expectations
+
+func (mp *MockProducer) ExpectInputAndSucceed() {
+	mp.expectations = append(mp.expectations, &MockProducerExpectation{Result: errProduceSuccess})
+}
+
+func (mp *MockProducer) ExpectInputAndFail(err error) {
+	mp.expectations = append(mp.expectations, &MockProducerExpectation{Result: err})
+}

+ 95 - 0
kafkamocks/mock_producer_test.go

@@ -0,0 +1,95 @@
+package kafkamocks
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/Shopify/sarama"
+)
+
+type testReporterMock struct {
+	errors []string
+}
+
+func newTestReporterMock() *testReporterMock {
+	return &testReporterMock{errors: make([]string, 0)}
+}
+
+func (trm *testReporterMock) Errorf(format string, args ...interface{}) {
+	trm.errors = append(trm.errors, fmt.Sprintf(format, args...))
+}
+
+func TestMockProducerImplementsKafkaProducer(t *testing.T) {
+	var mp interface{} = &MockProducer{}
+	if _, ok := mp.(KafkaProducer); !ok {
+		t.Error("MockProducer should implement the KafkaProducer interface.")
+	}
+}
+
+func TestSaramaProducerImplementsKafkaProducer(t *testing.T) {
+	var sp interface{} = &sarama.Producer{}
+	if _, ok := sp.(KafkaProducer); !ok {
+		t.Error("sarama.Producer should implement the KafkaProducer interface.")
+	}
+}
+
+func TestReturnExpectationsToChannels(t *testing.T) {
+	config := sarama.NewConfig()
+	config.Producer.AckSuccesses = true
+	mp := NewMockProducer(t, config)
+
+	mp.ExpectInputAndSucceed()
+	mp.ExpectInputAndSucceed()
+	mp.ExpectInputAndFail(sarama.ErrOutOfBrokers)
+
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test 1"}
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test 2"}
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test 3"}
+
+	msg1 := <-mp.Successes()
+	msg2 := <-mp.Successes()
+	err1 := <-mp.Errors()
+
+	if msg1.Topic != "test 1" {
+		t.Error("Expected message 1 to be returned first")
+	}
+
+	if msg2.Topic != "test 2" {
+		t.Error("Expected message 2 to be returned second")
+	}
+
+	if err1.Msg.Topic != "test 3" || err1.Err != sarama.ErrOutOfBrokers {
+		t.Error("Expected message 3 to be returned as error")
+	}
+
+	mp.Close()
+}
+
+func TestTooFewExpectations(t *testing.T) {
+	trm := newTestReporterMock()
+	mp := NewMockProducer(trm, nil)
+	mp.ExpectInputAndSucceed()
+
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
+
+	mp.Close()
+
+	if len(trm.errors) != 1 {
+		t.Error("Expected to report an error")
+	}
+}
+
+func TestTooManyExpectations(t *testing.T) {
+	trm := newTestReporterMock()
+	mp := NewMockProducer(trm, nil)
+	mp.ExpectInputAndSucceed()
+	mp.ExpectInputAndFail(sarama.ErrOutOfBrokers)
+
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
+	mp.Close()
+
+	if len(trm.errors) != 1 {
+		t.Error("Expected to report an error")
+	}
+}