Browse Source

Merge pull request #555 from aaronkavlie-wf/yield_immediately

Send messages and errors directly in consumer mock
Willem van Bergen 10 years ago
parent
commit
b81dd26dcd
1 changed files with 20 additions and 39 deletions
  1. 20 39
      mocks/consumer.go

+ 20 - 39
mocks/consumer.go

@@ -60,7 +60,6 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
 	}
 	}
 
 
 	pc.consumed = true
 	pc.consumed = true
-	go pc.handleExpectations()
 	return pc, nil
 	return pc, nil
 }
 }
 
 
@@ -141,13 +140,12 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
 
 
 	if c.partitionConsumers[topic][partition] == nil {
 	if c.partitionConsumers[topic][partition] == nil {
 		c.partitionConsumers[topic][partition] = &PartitionConsumer{
 		c.partitionConsumers[topic][partition] = &PartitionConsumer{
-			t:            c.t,
-			topic:        topic,
-			partition:    partition,
-			offset:       offset,
-			expectations: make(chan *consumerExpectation, 1000),
-			messages:     make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
-			errors:       make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
+			t:         c.t,
+			topic:     topic,
+			partition: partition,
+			offset:    offset,
+			messages:  make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			errors:    make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
 		}
 		}
 	}
 	}
 
 
@@ -164,12 +162,10 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
 // Errors and Messages channel, you should specify what values will be provided on these
 // Errors and Messages channel, you should specify what values will be provided on these
 // channels using YieldMessage and YieldError.
 // channels using YieldMessage and YieldError.
 type PartitionConsumer struct {
 type PartitionConsumer struct {
-	l                       sync.Mutex
 	t                       ErrorReporter
 	t                       ErrorReporter
 	topic                   string
 	topic                   string
 	partition               int32
 	partition               int32
 	offset                  int64
 	offset                  int64
-	expectations            chan *consumerExpectation
 	messages                chan *sarama.ConsumerMessage
 	messages                chan *sarama.ConsumerMessage
 	errors                  chan *sarama.ConsumerError
 	errors                  chan *sarama.ConsumerError
 	singleClose             sync.Once
 	singleClose             sync.Once
@@ -179,32 +175,6 @@ type PartitionConsumer struct {
 	highWaterMarkOffset     int64
 	highWaterMarkOffset     int64
 }
 }
 
 
-func (pc *PartitionConsumer) handleExpectations() {
-	pc.l.Lock()
-	defer pc.l.Unlock()
-
-	for ex := range pc.expectations {
-		if ex.Err != nil {
-			pc.errors <- &sarama.ConsumerError{
-				Topic:     pc.topic,
-				Partition: pc.partition,
-				Err:       ex.Err,
-			}
-		} else {
-			atomic.AddInt64(&pc.highWaterMarkOffset, 1)
-
-			ex.Msg.Topic = pc.topic
-			ex.Msg.Partition = pc.partition
-			ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
-
-			pc.messages <- ex.Msg
-		}
-	}
-
-	close(pc.messages)
-	close(pc.errors)
-}
-
 ///////////////////////////////////////////////////
 ///////////////////////////////////////////////////
 // PartitionConsumer interface implementation
 // PartitionConsumer interface implementation
 ///////////////////////////////////////////////////
 ///////////////////////////////////////////////////
@@ -212,7 +182,8 @@ func (pc *PartitionConsumer) handleExpectations() {
 // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
 // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
 func (pc *PartitionConsumer) AsyncClose() {
 func (pc *PartitionConsumer) AsyncClose() {
 	pc.singleClose.Do(func() {
 	pc.singleClose.Do(func() {
-		close(pc.expectations)
+		close(pc.messages)
+		close(pc.errors)
 	})
 	})
 }
 }
 
 
@@ -289,7 +260,13 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
 // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
 // reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
 // verify that the channel is empty on close.
 // verify that the channel is empty on close.
 func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
 func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
-	pc.expectations <- &consumerExpectation{Msg: msg}
+	atomic.AddInt64(&pc.highWaterMarkOffset, 1)
+
+	msg.Topic = pc.topic
+	msg.Partition = pc.partition
+	msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
+
+	pc.messages <- msg
 }
 }
 
 
 // YieldError will yield an error on the Errors channel of this partition consumer
 // YieldError will yield an error on the Errors channel of this partition consumer
@@ -298,7 +275,11 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
 // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
 // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
 // the channel is empty on close.
 // the channel is empty on close.
 func (pc *PartitionConsumer) YieldError(err error) {
 func (pc *PartitionConsumer) YieldError(err error) {
-	pc.expectations <- &consumerExpectation{Err: err}
+	pc.errors <- &sarama.ConsumerError{
+		Topic:     pc.topic,
+		Partition: pc.partition,
+		Err:       err,
+	}
 }
 }
 
 
 // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
 // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer