|
|
@@ -162,6 +162,7 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
|
|
|
// Errors and Messages channel, you should specify what values will be provided on these
|
|
|
// channels using YieldMessage and YieldError.
|
|
|
type PartitionConsumer struct {
|
|
|
+ l sync.Mutex
|
|
|
t ErrorReporter
|
|
|
topic string
|
|
|
partition int32
|
|
|
@@ -260,8 +261,10 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
|
|
|
// reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
|
|
|
// verify that the channel is empty on close.
|
|
|
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
|
|
|
- atomic.AddInt64(&pc.highWaterMarkOffset, 1)
|
|
|
+ pc.l.Lock()
|
|
|
+ defer pc.l.Unlock()
|
|
|
|
|
|
+ atomic.AddInt64(&pc.highWaterMarkOffset, 1)
|
|
|
msg.Topic = pc.topic
|
|
|
msg.Partition = pc.partition
|
|
|
msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
|