Forráskód Böngészése

Return a sane error if the partitioner gives a bad partition.

Evan Huus 12 éve
szülő
commit
de961cf388
2 módosított fájl, 11 hozzáadás és 1 törlés
  1. 4 0
      kafka/errors.go
  2. 7 1
      kafka/producer.go

+ 4 - 0
kafka/errors.go

@@ -12,3 +12,7 @@ var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")
 // IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
 // not contain the expected information.
 var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks.")
+
+// InvalidPartition is the error returned when a partitioner returns an invalid partition index
+// (meaning one outside of the range [0...numPartitions-1]).
+var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")

+ 7 - 1
kafka/producer.go

@@ -40,7 +40,13 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
 		partitioner = p.partitioner
 	}
 
-	return partitions[partitioner.Partition(key, len(partitions))], nil
+	choice := partitioner.Partition(key, len(partitions))
+
+	if choice >= len(partitions) {
+		return -1, InvalidPartition
+	}
+
+	return partitions[choice], nil
 }
 
 func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {