Browse Source

consumer: more robust handling of message offsets

- Only skip messages before the requested offset at the beginning of a set
- Error if we don't receive any legitimate messages in the set.
Evan Huus 11 years ago
parent
commit
6c7468fa03
1 changed files with 21 additions and 5 deletions
  1. 21 5
      consumer.go

+ 21 - 5
consumer.go

@@ -264,18 +264,24 @@ func (c *Consumer) fetchMessages() {
 		}
 		}
 
 
 		for _, msgBlock := range block.MsgSet.Messages {
 		for _, msgBlock := range block.MsgSet.Messages {
+			atLeastOne := false
+			prelude := true
+
 			for _, msg := range msgBlock.Messages() {
 			for _, msg := range msgBlock.Messages() {
+				if prelude && msg.Offset < c.offset {
+					continue
+				}
+				prelude = false
 
 
 				event := &ConsumerEvent{Topic: c.topic, Partition: c.partition}
 				event := &ConsumerEvent{Topic: c.topic, Partition: c.partition}
-				if msg.Offset < c.offset {
-					continue
-				} else if msg.Offset > c.offset {
-					event.Err = IncompleteResponse
-				} else {
+				if msg.Offset == c.offset {
+					atLeastOne = true
 					event.Key = msg.Msg.Key
 					event.Key = msg.Msg.Key
 					event.Value = msg.Msg.Value
 					event.Value = msg.Msg.Value
 					event.Offset = msg.Offset
 					event.Offset = msg.Offset
 					c.offset++
 					c.offset++
+				} else {
+					event.Err = IncompleteResponse
 				}
 				}
 
 
 				select {
 				select {
@@ -286,6 +292,16 @@ func (c *Consumer) fetchMessages() {
 				case c.events <- event:
 				case c.events <- event:
 				}
 				}
 			}
 			}
+
+			if !atLeastOne {
+				select {
+				case <-c.stopper:
+					close(c.events)
+					close(c.done)
+					return
+				case c.events <- &ConsumerEvent{Topic: c.topic, Partition: c.partition, Err: IncompleteResponse}:
+				}
+			}
 		}
 		}
 	}
 	}
 }
 }