Explorar el Código

consumer: silently skip messages already consumed

It seems that sometimes the broker can return us messages we've already seen,
and that this is legitimate, so we shouldn't produce errors, just quietly ignore
them.

Possibly fixes #166 pending confirmation from upstream
(https://issues.apache.org/jira/browse/KAFKA-1744).
Evan Huus hace 11 años
padre
commit
64e556f2c9
Se han modificado 1 ficheros con 3 adiciones y 1 borrados
  1. 3 1
      consumer.go

+ 3 - 1
consumer.go

@@ -267,7 +267,9 @@ func (c *Consumer) fetchMessages() {
 			for _, msg := range msgBlock.Messages() {
 			for _, msg := range msgBlock.Messages() {
 
 
 				event := &ConsumerEvent{Topic: c.topic, Partition: c.partition}
 				event := &ConsumerEvent{Topic: c.topic, Partition: c.partition}
-				if msg.Offset != c.offset {
+				if msg.Offset < c.offset {
+					continue
+				} else if msg.Offset > c.offset {
 					event.Err = IncompleteResponse
 					event.Err = IncompleteResponse
 				} else {
 				} else {
 					event.Key = msg.Msg.Key
 					event.Key = msg.Msg.Key