Browse Source

Handle new errors from kafka 0.8.2

Per the discussion on PR #313, there are a few changes needed to handle all the
errors that can be returned by kafka 0.8.2:
- adjust comments and error return in the client when faced with a blank topic,
  to reflect the fact that this is now fixed upstream and to match upstream's
  error message
- retry ErrNotEnoughReplicas and ErrNotEnoughReplicasAfterAppend in the producer
Evan Huus 11 years ago
parent
commit
74cd0ea353
2 changed files with 4 additions and 3 deletions
  1. 2 2
      client.go
  2. 2 1
      producer.go

+ 2 - 2
client.go

@@ -515,12 +515,12 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 		return ErrClosedClient
 	}
 
-	// Kafka will throw exceptions on an empty topic and not return a proper
+	// Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
 	// error. This handles the case by returning an error instead of sending it
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	for _, topic := range topics {
 		if len(topic) == 0 {
-			return ErrUnknownTopicOrPartition
+			return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
 		}
 	}
 

+ 2 - 1
producer.go

@@ -629,7 +629,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 						}
 						p.returnSuccesses(msgs)
 					}
-				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut:
+				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+					ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
 						broker.ID(), topic, partition, block.Err)
 					if currentRetries[topic] == nil {