Browse Source

Add tests for channel draining expectations.

Willem van Bergen 10 năm trước cách đây
mục cha
commit
abfb8a5a6f
2 tập tin đã thay đổi với 92 bổ sung10 xóa
  1. 21 10
      mocks/consumer.go
  2. 71 0
      mocks/consumer_test.go

+ 21 - 10
mocks/consumer.go

@@ -128,6 +128,7 @@ type PartitionConsumer struct {
 	expectations            chan *consumerExpectation
 	messages                chan *sarama.ConsumerMessage
 	errors                  chan *sarama.ConsumerError
+	singleClose             sync.Once
 	consumed                bool
 	errorsShouldBeDrained   bool
 	messagesShouldBeDrained bool
@@ -166,7 +167,9 @@ func (pc *PartitionConsumer) handleExpectations() {
 
 // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
 func (pc *PartitionConsumer) AsyncClose() {
-	close(pc.expectations)
+	pc.singleClose.Do(func() {
+		close(pc.expectations)
+	})
 }
 
 // Close implements the Close method from the sarama.PartitionConsumer interface. It will
@@ -233,25 +236,33 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
 ///////////////////////////////////////////////////
 
 // YieldMessage will yield a messages Messages channel of this partition consumer
-// when it is consumed. The mock consumer will not verify whether this message
-// was consumed from the Messages channel, because there are legitimate reasons for
-// this not to happen.
+// when it is consumed. By default, the mock consumer will not verify whether this
+// message was consumed from the Messages channel, because there are legitimate
+// reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
+// verify that the channel is empty on close.
 func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
 	pc.expectations <- &consumerExpectation{Msg: msg}
 }
 
 // YieldError will yield an error on the Errors channel of this partition consumer
-// when it is consumed. The mock consumer will not verify whether this error was
+// when it is consumed. By default, the mock consumer will not verify whether this error was
 // consumed from the Errors channel, because there are legitimate reasons for this
-// not to happen.
+// not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
+// the channel is empty on close.
 func (pc *PartitionConsumer) YieldError(err error) {
 	pc.expectations <- &consumerExpectation{Err: err}
 }
 
-func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
-	pc.errorsShouldBeDrained = true
-}
-
+// ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
+// that the messages channel will be fully drained when Close is called. If this
+// expectation is not met, an error is reported to the error reporter.
 func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() {
 	pc.messagesShouldBeDrained = true
 }
+
+// ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
+// that the errors channel will be fully drained when Close is called. If this
+// expectation is not met, an error is reported to the error reporter.
+func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() {
+	pc.errorsShouldBeDrained = true
+}

+ 71 - 0
mocks/consumer_test.go

@@ -119,3 +119,74 @@ func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) {
 		t.Errorf("Expected an expectation failure to be set on the error reporter.")
 	}
 }
+
+func TestConsumerWithWrongOffsetExpectation(t *testing.T) {
+	trm := newTestReporterMock()
+	consumer := NewConsumer(trm, nil)
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
+
+	_, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
+	if err != nil {
+		t.Error("Did not expect error, found:", err)
+	}
+
+	if len(trm.errors) != 1 {
+		t.Errorf("Expected an expectation failure to be set on the error reporter.")
+	}
+
+	if err := consumer.Close(); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) {
+	trm := newTestReporterMock()
+	consumer := NewConsumer(trm, nil)
+	pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
+	pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
+	pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
+	pcmock.ExpectMessagesDrainedOnClose()
+
+	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
+	if err != nil {
+		t.Error(err)
+	}
+
+	// consume first message, not second one
+	<-pc.Messages()
+
+	if err := consumer.Close(); err != nil {
+		t.Error(err)
+	}
+
+	if len(trm.errors) != 1 {
+		t.Errorf("Expected an expectation failure to be set on the error reporter.")
+	}
+}
+
+func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) {
+	trm := newTestReporterMock()
+	consumer := NewConsumer(trm, nil)
+
+	pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
+	pcmock.YieldError(sarama.ErrInvalidMessage)
+	pcmock.YieldError(sarama.ErrInvalidMessage)
+	pcmock.ExpectErrorsDrainedOnClose()
+
+	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
+	if err != nil {
+		t.Error(err)
+	}
+
+	// consume first and second error,
+	<-pc.Errors()
+	<-pc.Errors()
+
+	if err := consumer.Close(); err != nil {
+		t.Error(err)
+	}
+
+	if len(trm.errors) != 0 {
+		t.Errorf("Expected ano expectation failures to be set on the error reporter.")
+	}
+}