Willem van Bergen 10 years ago
parent
commit
61ca52d100
1 changed files with 88 additions and 39 deletions
  1. 88 39
      mocks/consumer.go

+ 88 - 39
mocks/consumer.go

@@ -6,6 +6,9 @@ import (
 	"github.com/Shopify/sarama"
 	"github.com/Shopify/sarama"
 )
 )
 
 
+// Consumer implements sarama's Consumer interface for testing purposes.
+// Before you can start consuming from this consumer, you have to register
+// topic/partitions using OnPartition, and set expectations on them.
 type Consumer struct {
 type Consumer struct {
 	l                  sync.Mutex
 	l                  sync.Mutex
 	t                  ErrorReporter
 	t                  ErrorReporter
@@ -13,17 +16,9 @@ type Consumer struct {
 	partitionConsumers map[string]map[int32]*PartitionConsumer
 	partitionConsumers map[string]map[int32]*PartitionConsumer
 }
 }
 
 
-type PartitionConsumer struct {
-	l            sync.Mutex
-	t            ErrorReporter
-	topic        string
-	partition    int32
-	expectations chan *consumerExpectation
-	messages     chan *sarama.ConsumerMessage
-	errors       chan *sarama.ConsumerError
-	consumed     bool
-}
-
+// NewConsumer returns a new mock Consumer instance. The t argument should
+// be the *testing.T instance of your test method. An error will be written to it if
+// an expectation is violated. The config argument is currently unused and can be set to nil.
 func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
 func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
 	if config == nil {
 	if config == nil {
 		config = sarama.NewConfig()
 		config = sarama.NewConfig()
@@ -37,6 +32,13 @@ func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
 	return c
 	return c
 }
 }
 
 
+///////////////////////////////////////////////////
+// Consumer interface implementation
+///////////////////////////////////////////////////
+
+// ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
+// Before you can start consuming a partition, you have to set expectations on it using OnPartition.
+// You can only consume a partition once per consumer.
 func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
 func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
 	c.l.Lock()
 	c.l.Lock()
 	defer c.l.Unlock()
 	defer c.l.Unlock()
@@ -56,7 +58,8 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
 	return pc, nil
 	return pc, nil
 }
 }
 
 
-// Close implements the Close method from the sarama.Consumer interface.
+// Close implements the Close method from the sarama.Consumer interface. It will close
+// all registered PartitionConsumer instances.
 func (c *Consumer) Close() error {
 func (c *Consumer) Close() error {
 	c.l.Lock()
 	c.l.Lock()
 	defer c.l.Unlock()
 	defer c.l.Unlock()
@@ -70,6 +73,15 @@ func (c *Consumer) Close() error {
 	return nil
 	return nil
 }
 }
 
 
+///////////////////////////////////////////////////
+// Expectation API
+///////////////////////////////////////////////////
+
+// OnPartition will register a topic/partition, so you can set expectations on it.
+// The registered PartitionConsumer will be returned, so you can set expectations
+// on it using method chanining. Once a topic/partition is registered, you are
+// expected to start consuming it using ConsumePartition. If that doesn't happen,
+// an error will be written to the error reporter once the mock consumer is closed.
 func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer {
 func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer {
 	c.l.Lock()
 	c.l.Lock()
 	defer c.l.Unlock()
 	defer c.l.Unlock()
@@ -92,12 +104,64 @@ func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer
 	return c.partitionConsumers[topic][partition]
 	return c.partitionConsumers[topic][partition]
 }
 }
 
 
+///////////////////////////////////////////////////
+// PartitionConsumer mock type
+///////////////////////////////////////////////////
+
+// PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
+// It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
+// registered first using the Consumer's OnPartition method. Before consuming the
+// Errors and Messages channel, you should set expectations on it using
+// ExpectMessage and ExpectError.
+type PartitionConsumer struct {
+	l            sync.Mutex
+	t            ErrorReporter
+	topic        string
+	partition    int32
+	expectations chan *consumerExpectation
+	messages     chan *sarama.ConsumerMessage
+	errors       chan *sarama.ConsumerError
+	consumed     bool
+}
+
+func (pc *PartitionConsumer) handleExpectations() {
+	pc.l.Lock()
+	defer pc.l.Unlock()
+
+	var offset int64
+	for ex := range pc.expectations {
+		if ex.Err != nil {
+			pc.errors <- &sarama.ConsumerError{
+				Topic:     pc.topic,
+				Partition: pc.partition,
+				Err:       ex.Err,
+			}
+		} else {
+			offset++
+
+			ex.Msg.Topic = pc.topic
+			ex.Msg.Partition = pc.partition
+			ex.Msg.Offset = offset
+
+			pc.messages <- ex.Msg
+		}
+	}
+
+	close(pc.messages)
+	close(pc.errors)
+}
+
+///////////////////////////////////////////////////
+// PartitionConsumer interface implementation
+///////////////////////////////////////////////////
+
 // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
 // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
 func (pc *PartitionConsumer) AsyncClose() {
 func (pc *PartitionConsumer) AsyncClose() {
 	close(pc.expectations)
 	close(pc.expectations)
 }
 }
 
 
-// Close implements the Close method from the sarama.PartitionConsumer interface.
+// Close implements the Close method from the sarama.PartitionConsumer interface. It will
+// verify whether the partition consumer was actually started.
 func (pc *PartitionConsumer) Close() error {
 func (pc *PartitionConsumer) Close() error {
 	if !pc.consumed {
 	if !pc.consumed {
 		pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
 		pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
@@ -147,37 +211,22 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
 	return pc.messages
 	return pc.messages
 }
 }
 
 
-func (pc *PartitionConsumer) handleExpectations() {
-	pc.l.Lock()
-	defer pc.l.Unlock()
-
-	var offset int64
-	for ex := range pc.expectations {
-		if ex.Err != nil {
-			pc.errors <- &sarama.ConsumerError{
-				Topic:     pc.topic,
-				Partition: pc.partition,
-				Err:       ex.Err,
-			}
-		} else {
-			offset++
-
-			ex.Msg.Topic = pc.topic
-			ex.Msg.Partition = pc.partition
-			ex.Msg.Offset = offset
-
-			pc.messages <- ex.Msg
-		}
-	}
-
-	close(pc.messages)
-	close(pc.errors)
-}
+///////////////////////////////////////////////////
+// Expectation API
+///////////////////////////////////////////////////
 
 
+// ExpectMessage will make sure a message will be placed on the Messages channel
+// of this partition consumer. The mock consumer will not verify whether this message
+// was consumed from the Messages channel, because there are legitimate reasons for
+// this not to happen.
 func (pc *PartitionConsumer) ExpectMessage(msg *sarama.ConsumerMessage) {
 func (pc *PartitionConsumer) ExpectMessage(msg *sarama.ConsumerMessage) {
 	pc.expectations <- &consumerExpectation{Msg: msg}
 	pc.expectations <- &consumerExpectation{Msg: msg}
 }
 }
 
 
+// ExpectError will make sure an error will be placed on the Errors channel
+// of this partition consumer. The mock consumer will not verify whether this error
+// was consumed from the Errors channel, because there are legitimate reasons for
+// this not to happen.
 func (pc *PartitionConsumer) ExpectError(err error) {
 func (pc *PartitionConsumer) ExpectError(err error) {
 	pc.expectations <- &consumerExpectation{Err: err}
 	pc.expectations <- &consumerExpectation{Err: err}
 }
 }