
Gracefully handle leader election (3rd time lucky)

The timer and retries should be in refreshTopics, nowhere else. Everything else
can just check for LEADER_NOT_AVAILABLE with the other recoverable errors (like
NOT_LEADER_FOR_PARTITION) and call refreshTopics in the same code path. Simpler,
safer, and saner.
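
Distilled into a standalone sketch, that pattern looks roughly like the following (the names here are invented stand-ins for illustration, not the commit's code; the real implementation is in kafka/client.go below):

package main

import (
	"errors"
	"fmt"
	"time"
)

// Invented stand-in for k.LEADER_NOT_AVAILABLE.
var errLeaderNotAvailable = errors.New("LEADER_NOT_AVAILABLE")

// refresh is the single place that owns the timer and the retry budget.
// fetch applies fresh metadata and reports which topics are still
// mid-election and therefore need another pass.
func refresh(topics []string, retries int, fetch func([]string) []string) error {
	stillElecting := fetch(topics)
	if len(stillElecting) == 0 {
		return nil
	}
	if retries <= 0 {
		return errLeaderNotAvailable
	}
	time.Sleep(250 * time.Millisecond) // wait for leader election
	return refresh(stillElecting, retries-1, fetch)
}

func main() {
	passes := 0
	err := refresh([]string{"events", "logs"}, 3, func([]string) []string {
		passes++
		if passes == 1 {
			return []string{"logs"} // pretend "logs" is mid-election on the first pass
		}
		return nil
	})
	fmt.Println(err) // <nil>: "logs" resolved on the second pass
}

Every other code path then treats LEADER_NOT_AVAILABLE exactly like UNKNOWN_TOPIC_OR_PARTITION or NOT_LEADER_FOR_PARTITION: refresh the metadata and retry, with no sleeping of its own.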
Evan Huus · 12 years ago · commit c058b830d6
3 changed files with 78 additions and 61 deletions:
  kafka/client.go    +58  -33
  kafka/consumer.go  +18  -1
  kafka/producer.go   +2  -27

kafka/client.go · +58 -33

@@ -5,6 +5,7 @@ import k "sarama/protocol"
 import (
 	"sort"
 	"sync"
+	"time"
 )
 
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
@@ -40,7 +41,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
 	client.brokers[tmp.ID()] = tmp
 
 	// do an initial fetch of all cluster metadata by specifying an empty list of topics
-	err = client.refreshTopics(make([]string, 0))
+	err = client.refreshTopics(make([]string, 0), 3)
 	if err != nil {
 		client.Close() // this closes tmp, since it's still in the brokers hash
 		return nil, err
@@ -72,18 +73,18 @@ func (client *Client) Close() {
 // if Go had the concept they would be marked 'protected'
 
 func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
-	leader, kerr := client.cachedLeader(topic, partition_id)
+	leader := client.cachedLeader(topic, partition_id)
 
-	if kerr != k.NO_ERROR {
+	if leader == nil {
 		err := client.refreshTopic(topic)
 		if err != nil {
 			return nil, err
 		}
-		leader, kerr = client.cachedLeader(topic, partition_id)
+		leader = client.cachedLeader(topic, partition_id)
 	}
 
-	if kerr != k.NO_ERROR {
-		return nil, kerr
+	if leader == nil {
+		return nil, k.UNKNOWN_TOPIC_OR_PARTITION
 	}
 
 	return leader, nil
@@ -117,14 +118,35 @@ func (client *Client) disconnectBroker(broker *k.Broker) {
 	go broker.Close()
 }
 
-func (client *Client) refreshTopics(topics []string) error {
+func (client *Client) refreshTopic(topic string) error {
+	tmp := make([]string, 1)
+	tmp[0] = topic
+	// we permit three retries by default, 'cause that seemed like a nice number
+	return client.refreshTopics(tmp, 3)
+}
+
+// truly private helper functions
+
+func (client *Client) refreshTopics(topics []string, retries int) error {
 	for broker := client.any(); broker != nil; broker = client.any() {
 		response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})
 
 		switch err.(type) {
 		case nil:
 			// valid response, use it
-			return client.update(response)
+			retry, err := client.update(response)
+			switch {
+			case err != nil:
+				return err
+			case len(retry) == 0:
+				return nil
+			default:
+				if retries <= 0 {
+					return k.LEADER_NOT_AVAILABLE
+				}
+				time.Sleep(250 * time.Millisecond) // wait for leader election
+				return client.refreshTopics(retry, retries-1)
+			}
 		case k.EncodingError:
 			// didn't even send, return the error
 			return err
@@ -137,14 +159,6 @@ func (client *Client) refreshTopics(topics []string) error {
 	return OutOfBrokers
 }
 
-func (client *Client) refreshTopic(topic string) error {
-	tmp := make([]string, 1)
-	tmp[0] = topic
-	return client.refreshTopics(tmp)
-}
-
-// truly private helper functions
-
 func (client *Client) any() *k.Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
@@ -156,23 +170,19 @@ func (client *Client) any() *k.Broker {
 	return nil
 }
 
-func (client *Client) cachedLeader(topic string, partition_id int32) (*k.Broker, k.KError) {
+func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
 	partitions := client.leaders[topic]
 	if partitions != nil {
 		leader, ok := partitions[partition_id]
-		if ok {
-			if leader == -1 {
-				return nil, k.LEADER_NOT_AVAILABLE
-			} else {
-				return client.brokers[leader], k.NO_ERROR
-			}
+		if ok && leader != -1 {
+			return client.brokers[leader]
 		}
 	}
 
-	return nil, k.UNKNOWN_TOPIC_OR_PARTITION
+	return nil
 }
 
 func (client *Client) cachedPartitions(topic string) []int32 {
@@ -193,7 +203,8 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 	return ret
 }
 
-func (client *Client) update(data *k.MetadataResponse) error {
+// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
+func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
 	// First discard brokers that we already know about. This avoids bouncing TCP connections,
 	// and especially avoids closing valid connections out from under other code which may be trying
 	// to use them. We only need a read-lock for this.
@@ -211,7 +222,7 @@ func (client *Client) update(data *k.MetadataResponse) error {
 	for _, broker := range newBrokers {
 		err := broker.Connect()
 		if err != nil {
-			return err
+			return nil, err
 		}
 	}
 
@@ -225,23 +236,37 @@ func (client *Client) update(data *k.MetadataResponse) error {
 		client.brokers[broker.ID()] = broker
 	}
 
+	toRetry := make(map[string]bool)
+
 	for _, topic := range data.Topics {
-		if topic.Err != k.NO_ERROR {
-			return topic.Err
+		switch topic.Err {
+		case k.NO_ERROR:
+			break
+		case k.LEADER_NOT_AVAILABLE:
+			toRetry[topic.Name] = true
+		default:
+			return nil, topic.Err
 		}
 		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
-			case k.NO_ERROR, k.LEADER_NOT_AVAILABLE:
+			case k.LEADER_NOT_AVAILABLE:
 				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
-				// partition is in the middle of leader election, so we save it anyways to avoid
-				// returning the stale leader (since our broker map should never have a broker with ID -1)
+				// partition is in the middle of leader election, so we fall through and save it
+				// anyway, to avoid returning a stale leader (since -1 isn't a valid broker ID)
+				toRetry[topic.Name] = true
+				fallthrough
+			case k.NO_ERROR:
 				client.leaders[topic.Name][partition.Id] = partition.Leader
 			default:
-				return partition.Err
+				return nil, partition.Err
 			}
 		}
 	}
 
-	return nil
+	ret := make([]string, 0, len(toRetry))
+	for topic := range toRetry {
+		ret = append(ret, topic)
+	}
+	return ret, nil
 }
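
One design note on the new update: toRetry is a map used as a set, because the same topic can be flagged twice (once via topic.Err and once via a partition's Err) but should only be re-requested once. A trivial runnable illustration (topic name invented):

package main

import "fmt"

func main() {
	toRetry := make(map[string]bool)
	toRetry["logs"] = true // flagged by the topic-level LEADER_NOT_AVAILABLE
	toRetry["logs"] = true // flagged again by a partition-level LEADER_NOT_AVAILABLE
	ret := make([]string, 0, len(toRetry))
	for topic := range toRetry {
		ret = append(ret, topic)
	}
	fmt.Println(ret) // [logs]: the topic is only re-requested once
}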

kafka/consumer.go · +18 -1

@@ -119,7 +119,24 @@ func (c *Consumer) fetchMessages() {
 			}
 		}
 
-		if block.Err != k.NO_ERROR {
+		switch block.Err {
+		case k.NO_ERROR:
+			break
+		case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION, k.LEADER_NOT_AVAILABLE:
+			err = c.client.refreshTopic(c.topic)
+			if err != nil {
+				select {
+				case <-c.stopper:
+					close(c.messages)
+					close(c.errors)
+					close(c.done)
+					return
+				case c.errors <- block.Err:
+					continue
+				}
+			}
+			continue
+		default:
 			select {
 			case <-c.stopper:
 				close(c.messages)

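The triage the consumer performs here, reduced to its essentials (a toy sketch with invented error codes, not the commit's code):

package main

import "fmt"

// Invented error codes standing in for the k.KError values above.
type kerror int

const (
	noErr kerror = iota
	unknownTopicOrPartition
	notLeaderForPartition
	leaderNotAvailable
	diskError
)

func main() {
	for _, e := range []kerror{noErr, leaderNotAvailable, diskError} {
		switch e {
		case noErr:
			fmt.Println("deliver the messages")
		case unknownTopicOrPartition, notLeaderForPartition, leaderNotAvailable:
			// recoverable: refresh metadata and quietly re-fetch; the error
			// only reaches the errors channel if the refresh itself fails
			fmt.Println("refresh topic metadata, then continue the fetch loop")
		default:
			// unrecoverable: surface it on the errors channel (or stop if asked)
			fmt.Println("push error", int(e), "to c.errors")
		}
	}
}
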
kafka/producer.go · +2 -27

@@ -1,7 +1,6 @@
 package kafka
 
 import k "sarama/protocol"
-import "time"
 
 // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
@@ -65,20 +64,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 	}
 
 	broker, err := p.client.leader(p.topic, partition)
-	switch t := err.(type) {
-	case k.KError:
-		if t == k.LEADER_NOT_AVAILABLE {
-			time.Sleep(250 * time.Millisecond) // wait for leader election
-			broker, err = p.client.leader(p.topic, partition)
-			if err != nil {
-				return err
-			}
-		} else {
-			return err
-		}
-	case nil:
-		break
-	default:
+	if err != nil {
 		return err
 	}
 
@@ -111,18 +97,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 	switch block.Err {
 	case k.NO_ERROR:
 		return nil
-	case k.LEADER_NOT_AVAILABLE:
-		if !retry {
-			return block.Err
-		}
-		// wait for leader election to finish
-		time.Sleep(250 * time.Millisecond)
-		err = p.client.refreshTopic(p.topic)
-		if err != nil {
-			return err
-		}
-		return p.safeSendMessage(key, value, false)
-	case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION:
+	case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION, k.LEADER_NOT_AVAILABLE:
 		if !retry {
 			return block.Err
 		}