@@ -47,6 +47,10 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
}
pc := c.partitionConsumers[topic][partition]
+ if pc.consumed {
+ return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
+ }
+
pc.consumed = true
go pc.handleExpectations()
return pc, nil