Browse Source

More error handling in consumer

Evan Huus 12 years ago
parent
commit
1a4ee4cbad
1 changed files with 19 additions and 1 deletions
  1. 19 1
      kafka/consumer.go

+ 19 - 1
kafka/consumer.go

@@ -76,7 +76,8 @@ func (c *Consumer) fetchMessages() {
 		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
 
 		response, err := c.broker.Fetch(c.client.id, request)
-		if err != nil {
+		switch err.(type) {
+		case k.EncodingError:
 			select {
 			case <-c.stopper:
 				close(c.messages)
@@ -86,6 +87,23 @@ func (c *Consumer) fetchMessages() {
 			case c.errors <- err:
 				continue
 			}
+		case nil:
+			break
+		default:
+			c.client.disconnectBroker(c.broker)
+			c.broker, err = c.client.leader(c.topic, c.partition)
+			if err != nil {
+				select {
+				case <-c.stopper:
+					close(c.messages)
+					close(c.errors)
+					close(c.done)
+					return
+				case c.errors <- err:
+					continue
+				}
+			}
+			continue
 		}
 
 		block := response.GetBlock(c.topic, c.partition)