|
|
@@ -264,10 +264,9 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
|
|
|
pc.l.Lock()
|
|
|
defer pc.l.Unlock()
|
|
|
|
|
|
- atomic.AddInt64(&pc.highWaterMarkOffset, 1)
|
|
|
msg.Topic = pc.topic
|
|
|
msg.Partition = pc.partition
|
|
|
- msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
|
|
|
+ msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
|
|
|
|
|
|
pc.messages <- msg
|
|
|
}
|