Browse Source

Add mocks.Consumer implementation.

Willem van Bergen 10 years ago
parent
commit
43dbec2ed7
3 changed files with 298 additions and 1 deletions
  1. 169 0
      mocks/consumer.go
  2. 121 0
      mocks/consumer_test.go
  3. 8 1
      mocks/mocks.go

+ 169 - 0
mocks/consumer.go

@@ -0,0 +1,169 @@
+package mocks
+
+import (
+	"sync"
+
+	"github.com/Shopify/sarama"
+)
+
+type Consumer struct {
+	l                  sync.Mutex
+	t                  ErrorReporter
+	config             *sarama.Config
+	partitionConsumers map[string]map[int32]*PartitionConsumer
+}
+
+type PartitionConsumer struct {
+	l            sync.Mutex
+	t            ErrorReporter
+	topic        string
+	partition    int32
+	expectations chan *consumerExpectation
+	messages     chan *sarama.ConsumerMessage
+	errors       chan *sarama.ConsumerError
+	consumed     bool
+}
+
+func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
+	if config == nil {
+		config = sarama.NewConfig()
+	}
+
+	c := &Consumer{
+		t:                  t,
+		config:             config,
+		partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
+	}
+	return c
+}
+
+func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
+		c.t.Errorf("No expectations set for %s/%d", topic, partition)
+		return nil, errOutOfExpectations
+	}
+
+	pc := c.partitionConsumers[topic][partition]
+	pc.consumed = true
+	go pc.handleExpectations()
+	return pc, nil
+}
+
+// Close implements the Close method from the sarama.Consumer interface.
+func (c *Consumer) Close() error {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	for _, partitions := range c.partitionConsumers {
+		for _, partitionConsumer := range partitions {
+			_ = partitionConsumer.Close()
+		}
+	}
+
+	return nil
+}
+
+func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	if c.partitionConsumers[topic] == nil {
+		c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
+	}
+
+	if c.partitionConsumers[topic][partition] == nil {
+		c.partitionConsumers[topic][partition] = &PartitionConsumer{
+			t:            c.t,
+			topic:        topic,
+			partition:    partition,
+			expectations: make(chan *consumerExpectation, 1000),
+			messages:     make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			errors:       make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
+		}
+	}
+
+	return c.partitionConsumers[topic][partition]
+}
+
+// AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
+func (pc *PartitionConsumer) AsyncClose() {
+	close(pc.expectations)
+}
+
+// Close implements the Close method from the sarama.PartitionConsumer interface.
+func (pc *PartitionConsumer) Close() error {
+	if !pc.consumed {
+		pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
+	}
+
+	pc.AsyncClose()
+
+	var errs = make(sarama.ConsumerErrors, 0)
+	go func() {
+		for err := range pc.errors {
+			errs = append(errs, err)
+		}
+	}()
+
+	go func() {
+		for _ = range pc.messages {
+			// drain
+		}
+	}()
+
+	pc.l.Lock()
+	pc.l.Unlock()
+
+	if len(errs) > 0 {
+		return errs
+	}
+	return nil
+}
+
+// Errors implements the Errors method from the sarama.PartitionConsumer interface.
+func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
+	return pc.errors
+}
+
+// Messages implements the Messages method from the sarama.PartitionConsumer interface.
+func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
+	return pc.messages
+}
+
+func (pc *PartitionConsumer) handleExpectations() {
+	pc.l.Lock()
+	defer pc.l.Unlock()
+
+	var offset int64
+	for ex := range pc.expectations {
+		if ex.Err != nil {
+			pc.errors <- &sarama.ConsumerError{
+				Topic:     pc.topic,
+				Partition: pc.partition,
+				Err:       ex.Err,
+			}
+		} else {
+			offset++
+
+			ex.Msg.Topic = pc.topic
+			ex.Msg.Partition = pc.partition
+			ex.Msg.Offset = offset
+
+			pc.messages <- ex.Msg
+		}
+	}
+
+	close(pc.messages)
+	close(pc.errors)
+}
+
+func (pc *PartitionConsumer) ExpectMessage(msg *sarama.ConsumerMessage) {
+	pc.expectations <- &consumerExpectation{Msg: msg}
+}
+
+func (pc *PartitionConsumer) ExpectError(err error) {
+	pc.expectations <- &consumerExpectation{Err: err}
+}

+ 121 - 0
mocks/consumer_test.go

@@ -0,0 +1,121 @@
+package mocks
+
+import (
+	"testing"
+
+	"github.com/Shopify/sarama"
+)
+
+func TestMockConsumerImplementsConsumerInterface(t *testing.T) {
+	var c interface{} = &Consumer{}
+	if _, ok := c.(sarama.Consumer); !ok {
+		t.Error("The mock consumer should implement the sarama.Consumer interface.")
+	}
+
+	var pc interface{} = &PartitionConsumer{}
+	if _, ok := pc.(sarama.PartitionConsumer); !ok {
+		t.Error("The mock partitionconsumer should implement the sarama.PartitionConsumer interface.")
+	}
+}
+
+func TestConsumerHandlesExpectations(t *testing.T) {
+	consumer := NewConsumer(t, nil)
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			t.Error(err)
+		}
+	}()
+
+	consumer.OnPartition("test", 0).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
+	consumer.OnPartition("test", 0).ExpectError(sarama.ErrOutOfBrokers)
+	consumer.OnPartition("test", 1).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")})
+	consumer.OnPartition("other", 0).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello other")})
+
+	pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	test0_msg := <-pc_test0.Messages()
+	if test0_msg.Topic != "test" || test0_msg.Partition != 0 || string(test0_msg.Value) != "hello world" {
+		t.Error("Message was not as expected:", test0_msg)
+	}
+	test0_err := <-pc_test0.Errors()
+	if test0_err.Err != sarama.ErrOutOfBrokers {
+		t.Error("Expected sarama.ErrOutOfBrokers, found:", test0_err.Err)
+	}
+
+	pc_test1, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	test1_msg := <-pc_test1.Messages()
+	if test1_msg.Topic != "test" || test1_msg.Partition != 1 || string(test1_msg.Value) != "hello world again" {
+		t.Error("Message was not as expected:", test1_msg)
+	}
+
+	pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	other0_msg := <-pc_other0.Messages()
+	if other0_msg.Topic != "other" || other0_msg.Partition != 0 || string(other0_msg.Value) != "hello other" {
+		t.Error("Message was not as expected:", other0_msg)
+	}
+}
+
+func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) {
+	consumer := NewConsumer(t, nil)
+	consumer.OnPartition("test", 0).ExpectError(sarama.ErrOutOfBrokers)
+	consumer.OnPartition("test", 0).ExpectError(sarama.ErrOutOfBrokers)
+
+	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-pc.Messages():
+		t.Error("Did not epxect a message on the messages channel.")
+	case err := <-pc.Errors():
+		if err.Err != sarama.ErrOutOfBrokers {
+			t.Error("Expected sarama.ErrOutOfBrokers, found", err)
+		}
+	}
+
+	errs := pc.Close().(sarama.ConsumerErrors)
+	if len(errs) != 1 && errs[0].Err != sarama.ErrOutOfBrokers {
+		t.Error("Expected Close to return the remaining sarama.ErrOutOfBrokers")
+	}
+}
+
+func TestConsumerWithoutExpectationsOnPartition(t *testing.T) {
+	trm := newTestReporterMock()
+	consumer := NewConsumer(trm, nil)
+
+	_, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
+	if err != errOutOfExpectations {
+		t.Error("Expected ConsumePartition to return errOutOfExpectations")
+	}
+
+	if err := consumer.Close(); err != nil {
+		t.Error("No error expected on close, but found:", err)
+	}
+
+	if len(trm.errors) != 1 {
+		t.Errorf("Expected an expectation failure to be set on the error reporter.")
+	}
+}
+
+func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) {
+	trm := newTestReporterMock()
+	consumer := NewConsumer(trm, nil)
+	consumer.OnPartition("test", 0).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
+
+	if err := consumer.Close(); err != nil {
+		t.Error("No error expected on close, but found:", err)
+	}
+
+	if len(trm.errors) != 1 {
+		t.Errorf("Expected an expectation failure to be set on the error reporter.")
+	}
+}

+ 8 - 1
mocks/mocks.go

@@ -15,6 +15,8 @@ package mocks
 
 import (
 	"errors"
+
+	"github.com/Shopify/sarama"
 )
 
 // A simple interface that includes the testing.T methods we use to report
@@ -25,9 +27,14 @@ type ErrorReporter interface {
 
 var (
 	errProduceSuccess    error = nil
-	errOutOfExpectations       = errors.New("No more expectations set on mock producer")
+	errOutOfExpectations       = errors.New("No more expectations set on mock")
 )
 
 type producerExpectation struct {
 	Result error
 }
+
+type consumerExpectation struct {
+	Err error
+	Msg *sarama.ConsumerMessage
+}