Browse Source

OnPartition -> ExpectConsumePartition.

Willem van Bergen 10 years ago
parent
commit
532c0656d3
3 changed files with 60 additions and 33 deletions
  1. 50 25
      mocks/consumer.go
  2. 8 8
      mocks/consumer_test.go
  3. 2 0
      mocks/mocks.go

+ 50 - 25
mocks/consumer.go

@@ -8,7 +8,7 @@ import (
 
 
 // Consumer implements sarama's Consumer interface for testing purposes.
 // Consumer implements sarama's Consumer interface for testing purposes.
 // Before you can start consuming from this consumer, you have to register
 // Before you can start consuming from this consumer, you have to register
-// topic/partitions using OnPartition, and set expectations on them.
+// topic/partitions using ExpectConsumePartition, and set expectations on them.
 type Consumer struct {
 type Consumer struct {
 	l                  sync.Mutex
 	l                  sync.Mutex
 	t                  ErrorReporter
 	t                  ErrorReporter
@@ -37,8 +37,8 @@ func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
 ///////////////////////////////////////////////////
 ///////////////////////////////////////////////////
 
 
 // ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
 // ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
-// Before you can start consuming a partition, you have to set expectations on it using OnPartition.
-// You can only consume a partition once per consumer.
+// Before you can start consuming a partition, you have to set expectations on it using
+// ExpectConsumePartition. You can only consume a partition once per consumer.
 func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
 func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
 	c.l.Lock()
 	c.l.Lock()
 	defer c.l.Unlock()
 	defer c.l.Unlock()
@@ -53,6 +53,10 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
 		return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
 		return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
 	}
 	}
 
 
+	if pc.offset != AnyOffset && pc.offset != offset {
+		c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
+	}
+
 	pc.consumed = true
 	pc.consumed = true
 	go pc.handleExpectations()
 	go pc.handleExpectations()
 	return pc, nil
 	return pc, nil
@@ -77,12 +81,13 @@ func (c *Consumer) Close() error {
 // Expectation API
 // Expectation API
 ///////////////////////////////////////////////////
 ///////////////////////////////////////////////////
 
 
-// OnPartition will register a topic/partition, so you can set expectations on it.
+// ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
 // The registered PartitionConsumer will be returned, so you can set expectations
 // The registered PartitionConsumer will be returned, so you can set expectations
 // on it using method chanining. Once a topic/partition is registered, you are
 // on it using method chanining. Once a topic/partition is registered, you are
 // expected to start consuming it using ConsumePartition. If that doesn't happen,
 // expected to start consuming it using ConsumePartition. If that doesn't happen,
-// an error will be written to the error reporter once the mock consumer is closed.
-func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer {
+// an error will be written to the error reporter once the mock consumer is closed. It will
+// also expect that the
+func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
 	c.l.Lock()
 	c.l.Lock()
 	defer c.l.Unlock()
 	defer c.l.Unlock()
 
 
@@ -95,6 +100,7 @@ func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer
 			t:            c.t,
 			t:            c.t,
 			topic:        topic,
 			topic:        topic,
 			partition:    partition,
 			partition:    partition,
+			offset:       offset,
 			expectations: make(chan *consumerExpectation, 1000),
 			expectations: make(chan *consumerExpectation, 1000),
 			messages:     make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
 			messages:     make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
 			errors:       make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
 			errors:       make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
@@ -110,18 +116,21 @@ func (c *Consumer) OnPartition(topic string, partition int32) *PartitionConsumer
 
 
 // PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
 // PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
 // It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
 // It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
-// registered first using the Consumer's OnPartition method. Before consuming the
-// Errors and Messages channel, you should set expectations on it using
-// ExpectMessage and ExpectError.
+// registered first using the Consumer's ExpectConsumePartition method. Before consuming the
+// Errors and Messages channel, you should specify what values will be provided on these
+// channels using YieldMessage and YieldError.
 type PartitionConsumer struct {
 type PartitionConsumer struct {
-	l            sync.Mutex
-	t            ErrorReporter
-	topic        string
-	partition    int32
-	expectations chan *consumerExpectation
-	messages     chan *sarama.ConsumerMessage
-	errors       chan *sarama.ConsumerError
-	consumed     bool
+	l                       sync.Mutex
+	t                       ErrorReporter
+	topic                   string
+	partition               int32
+	offset                  int64
+	expectations            chan *consumerExpectation
+	messages                chan *sarama.ConsumerMessage
+	errors                  chan *sarama.ConsumerError
+	consumed                bool
+	errorsShouldBeDrained   bool
+	messagesShouldBeDrained bool
 }
 }
 
 
 func (pc *PartitionConsumer) handleExpectations() {
 func (pc *PartitionConsumer) handleExpectations() {
@@ -168,6 +177,14 @@ func (pc *PartitionConsumer) Close() error {
 		return errPartitionConsumerNotStarted
 		return errPartitionConsumerNotStarted
 	}
 	}
 
 
+	if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
+		pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
+	}
+
+	if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
+		pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
+	}
+
 	pc.AsyncClose()
 	pc.AsyncClose()
 
 
 	var (
 	var (
@@ -215,18 +232,26 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
 // Expectation API
 // Expectation API
 ///////////////////////////////////////////////////
 ///////////////////////////////////////////////////
 
 
-// ExpectMessage will make sure a message will be placed on the Messages channel
-// of this partition consumer. The mock consumer will not verify whether this message
+// YieldMessage will yield a messages Messages channel of this partition consumer
+// when it is consumed. The mock consumer will not verify whether this message
 // was consumed from the Messages channel, because there are legitimate reasons for
 // was consumed from the Messages channel, because there are legitimate reasons for
 // this not to happen.
 // this not to happen.
-func (pc *PartitionConsumer) ExpectMessage(msg *sarama.ConsumerMessage) {
+func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
 	pc.expectations <- &consumerExpectation{Msg: msg}
 	pc.expectations <- &consumerExpectation{Msg: msg}
 }
 }
 
 
-// ExpectError will make sure an error will be placed on the Errors channel
-// of this partition consumer. The mock consumer will not verify whether this error
-// was consumed from the Errors channel, because there are legitimate reasons for
-// this not to happen.
-func (pc *PartitionConsumer) ExpectError(err error) {
+// YieldError will yield an error on the Errors channel of this partition consumer
+// when it is consumed. The mock consumer will not verify whether this error was
+// consumed from the Errors channel, because there are legitimate reasons for this
+// not to happen.
+func (pc *PartitionConsumer) YieldError(err error) {
 	pc.expectations <- &consumerExpectation{Err: err}
 	pc.expectations <- &consumerExpectation{Err: err}
 }
 }
+
+func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
+	pc.errorsShouldBeDrained = true
+}
+
+func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() {
+	pc.messagesShouldBeDrained = true
+}

+ 8 - 8
mocks/consumer_test.go

@@ -26,10 +26,10 @@ func TestConsumerHandlesExpectations(t *testing.T) {
 		}
 		}
 	}()
 	}()
 
 
-	consumer.OnPartition("test", 0).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
-	consumer.OnPartition("test", 0).ExpectError(sarama.ErrOutOfBrokers)
-	consumer.OnPartition("test", 1).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")})
-	consumer.OnPartition("other", 0).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello other")})
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
+	consumer.ExpectConsumePartition("test", 1, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")})
+	consumer.ExpectConsumePartition("other", 0, AnyOffset).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello other")})
 
 
 	pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
 	pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
 	if err != nil {
 	if err != nil {
@@ -53,7 +53,7 @@ func TestConsumerHandlesExpectations(t *testing.T) {
 		t.Error("Message was not as expected:", test1_msg)
 		t.Error("Message was not as expected:", test1_msg)
 	}
 	}
 
 
-	pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetOldest)
+	pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetNewest)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -65,8 +65,8 @@ func TestConsumerHandlesExpectations(t *testing.T) {
 
 
 func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) {
 func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) {
 	consumer := NewConsumer(t, nil)
 	consumer := NewConsumer(t, nil)
-	consumer.OnPartition("test", 0).ExpectError(sarama.ErrOutOfBrokers)
-	consumer.OnPartition("test", 0).ExpectError(sarama.ErrOutOfBrokers)
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
 
 
 	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
 	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
 	if err != nil {
 	if err != nil {
@@ -109,7 +109,7 @@ func TestConsumerWithoutExpectationsOnPartition(t *testing.T) {
 func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) {
 func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) {
 	trm := newTestReporterMock()
 	trm := newTestReporterMock()
 	consumer := NewConsumer(trm, nil)
 	consumer := NewConsumer(trm, nil)
-	consumer.OnPartition("test", 0).ExpectMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
 
 
 	if err := consumer.Close(); err != nil {
 	if err := consumer.Close(); err != nil {
 		t.Error("No error expected on close, but found:", err)
 		t.Error("No error expected on close, but found:", err)

+ 2 - 0
mocks/mocks.go

@@ -31,6 +31,8 @@ var (
 	errPartitionConsumerNotStarted       = errors.New("The partition consumer was never started")
 	errPartitionConsumerNotStarted       = errors.New("The partition consumer was never started")
 )
 )
 
 
+const AnyOffset int64 = -1000
+
 type producerExpectation struct {
 type producerExpectation struct {
 	Result error
 	Result error
 }
 }