Browse Source

Merge pull request #119 from Shopify/validate-offset

Validate the offset received from kafka when consuming
Evan Huus 11 years ago
parent
commit
3221de0278
1 changed files with 12 additions and 2 deletions
  1. 12 2
      consumer.go

+ 12 - 2
consumer.go

@@ -265,13 +265,23 @@ func (c *Consumer) fetchMessages() {
 
 
 		for _, msgBlock := range block.MsgSet.Messages {
 		for _, msgBlock := range block.MsgSet.Messages {
 			for _, msg := range msgBlock.Messages() {
 			for _, msg := range msgBlock.Messages() {
+
+				event := &ConsumerEvent{Topic: c.topic, Partition: c.partition}
+				if msg.Offset != c.offset {
+					event.Err = IncompleteResponse
+				} else {
+					event.Key = msg.Msg.Key
+					event.Value = msg.Msg.Value
+					event.Offset = msg.Offset
+					c.offset++
+				}
+
 				select {
 				select {
 				case <-c.stopper:
 				case <-c.stopper:
 					close(c.events)
 					close(c.events)
 					close(c.done)
 					close(c.done)
 					return
 					return
-				case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, Topic: c.topic, Partition: c.partition}:
-					c.offset++
+				case c.events <- event:
 				}
 				}
 			}
 			}
 		}
 		}