|
|
@@ -60,7 +60,6 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
|
|
|
}
|
|
|
|
|
|
pc.consumed = true
|
|
|
- go pc.handleExpectations()
|
|
|
return pc, nil
|
|
|
}
|
|
|
|
|
|
@@ -141,13 +140,12 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
|
|
|
|
|
|
if c.partitionConsumers[topic][partition] == nil {
|
|
|
c.partitionConsumers[topic][partition] = &PartitionConsumer{
|
|
|
- t: c.t,
|
|
|
- topic: topic,
|
|
|
- partition: partition,
|
|
|
- offset: offset,
|
|
|
- expectations: make(chan *consumerExpectation, 1000),
|
|
|
- messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
|
|
|
- errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
|
|
|
+ t: c.t,
|
|
|
+ topic: topic,
|
|
|
+ partition: partition,
|
|
|
+ offset: offset,
|
|
|
+ messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
|
|
|
+ errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -169,7 +167,6 @@ type PartitionConsumer struct {
|
|
|
topic string
|
|
|
partition int32
|
|
|
offset int64
|
|
|
- expectations chan *consumerExpectation
|
|
|
messages chan *sarama.ConsumerMessage
|
|
|
errors chan *sarama.ConsumerError
|
|
|
singleClose sync.Once
|
|
|
@@ -179,32 +176,6 @@ type PartitionConsumer struct {
|
|
|
highWaterMarkOffset int64
|
|
|
}
|
|
|
|
|
|
-func (pc *PartitionConsumer) handleExpectations() {
|
|
|
- pc.l.Lock()
|
|
|
- defer pc.l.Unlock()
|
|
|
-
|
|
|
- for ex := range pc.expectations {
|
|
|
- if ex.Err != nil {
|
|
|
- pc.errors <- &sarama.ConsumerError{
|
|
|
- Topic: pc.topic,
|
|
|
- Partition: pc.partition,
|
|
|
- Err: ex.Err,
|
|
|
- }
|
|
|
- } else {
|
|
|
- atomic.AddInt64(&pc.highWaterMarkOffset, 1)
|
|
|
-
|
|
|
- ex.Msg.Topic = pc.topic
|
|
|
- ex.Msg.Partition = pc.partition
|
|
|
- ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
|
|
|
-
|
|
|
- pc.messages <- ex.Msg
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- close(pc.messages)
|
|
|
- close(pc.errors)
|
|
|
-}
|
|
|
-
|
|
|
///////////////////////////////////////////////////
|
|
|
// PartitionConsumer interface implementation
|
|
|
///////////////////////////////////////////////////
|
|
|
@@ -212,7 +183,8 @@ func (pc *PartitionConsumer) handleExpectations() {
|
|
|
// AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
|
|
|
func (pc *PartitionConsumer) AsyncClose() {
|
|
|
pc.singleClose.Do(func() {
|
|
|
- close(pc.expectations)
|
|
|
+ close(pc.messages)
|
|
|
+ close(pc.errors)
|
|
|
})
|
|
|
}
|
|
|
|
|
|
@@ -289,7 +261,13 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
|
|
|
// reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
|
|
|
// verify that the channel is empty on close.
|
|
|
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
|
|
|
- pc.expectations <- &consumerExpectation{Msg: msg}
|
|
|
+ pc.highWaterMarkOffset += 1
|
|
|
+
|
|
|
+ msg.Topic = pc.topic
|
|
|
+ msg.Partition = pc.partition
|
|
|
+ msg.Offset = pc.highWaterMarkOffset
|
|
|
+
|
|
|
+ pc.messages <- msg
|
|
|
}
|
|
|
|
|
|
// YieldError will yield an error on the Errors channel of this partition consumer
|
|
|
@@ -298,7 +276,11 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
|
|
|
// not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
|
|
|
// the channel is empty on close.
|
|
|
func (pc *PartitionConsumer) YieldError(err error) {
|
|
|
- pc.expectations <- &consumerExpectation{Err: err}
|
|
|
+ pc.errors <- &sarama.ConsumerError{
|
|
|
+ Topic: pc.topic,
|
|
|
+ Partition: pc.partition,
|
|
|
+ Err: err,
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
|