|
|
@@ -162,7 +162,6 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
|
|
|
// Errors and Messages channel, you should specify what values will be provided on these
|
|
|
// channels using YieldMessage and YieldError.
|
|
|
type PartitionConsumer struct {
|
|
|
- l sync.Mutex
|
|
|
t ErrorReporter
|
|
|
topic string
|
|
|
partition int32
|
|
|
@@ -261,11 +260,11 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
|
|
|
// reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
|
|
|
// verify that the channel is empty on close.
|
|
|
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
|
|
|
- pc.highWaterMarkOffset += 1
|
|
|
+ atomic.AddInt64(&pc.highWaterMarkOffset, 1)
|
|
|
|
|
|
msg.Topic = pc.topic
|
|
|
msg.Partition = pc.partition
|
|
|
- msg.Offset = pc.highWaterMarkOffset
|
|
|
+ msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
|
|
|
|
|
|
pc.messages <- msg
|
|
|
}
|