@@ -122,6 +122,11 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+
+ // NOTE:
+ // Do not move the code below to a goroutine.
+ // The `ConsumeClaim` itself is called within a goroutine, see:
+ // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")