|
|
@@ -122,6 +122,11 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
|
|
|
|
|
|
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
|
|
|
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
|
+
|
|
|
+ // NOTE:
|
|
|
+ // Do not move the code below to a goroutine.
|
|
|
+ // The `ConsumeClaim` itself is called within a goroutine, see:
|
|
|
+ // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
|
|
|
for message := range claim.Messages() {
|
|
|
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
|
|
|
session.MarkMessage(message, "")
|