Evan Huus 12 лет назад
Родитель
Сommit
453e7459cc
3 измененных файлов с 31 добавлено и 21 удалено
  1. 11 9
      kafka/client.go
  2. 10 6
      kafka/consumer.go
  3. 10 6
      kafka/producer.go

+ 11 - 9
kafka/client.go

@@ -8,6 +8,8 @@ package kafka
 import k "sarama/protocol"
 
 import (
+	"sarama/encoding"
+	"sarama/types"
 	"sort"
 	"sync"
 	"time"
@@ -89,7 +91,7 @@ func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error
 	}
 
 	if leader == nil {
-		return nil, k.UNKNOWN_TOPIC_OR_PARTITION
+		return nil, types.UNKNOWN_TOPIC_OR_PARTITION
 	}
 
 	return leader, nil
@@ -136,8 +138,8 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
 	for broker := client.any(); broker != nil; broker = client.any() {
 		response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})
 
-		switch err.(type) {
-		case nil:
+		switch {
+		case err == nil:
 			// valid response, use it
 			retry, err := client.update(response)
 			switch {
@@ -147,12 +149,12 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
 				return nil
 			default:
 				if retries <= 0 {
-					return k.LEADER_NOT_AVAILABLE
+					return types.LEADER_NOT_AVAILABLE
 				}
 				time.Sleep(250 * time.Millisecond) // wait for leader election
 				return client.refreshTopics(retry, retries-1)
 			}
-		case k.EncodingError:
+		case err == encoding.EncodingError:
 			// didn't even send, return the error
 			return err
 		}
@@ -245,9 +247,9 @@ func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
 
 	for _, topic := range data.Topics {
 		switch topic.Err {
-		case k.NO_ERROR:
+		case types.NO_ERROR:
 			break
-		case k.LEADER_NOT_AVAILABLE:
+		case types.LEADER_NOT_AVAILABLE:
 			toRetry[topic.Name] = true
 		default:
 			return nil, topic.Err
@@ -255,13 +257,13 @@ func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
 		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
-			case k.LEADER_NOT_AVAILABLE:
+			case types.LEADER_NOT_AVAILABLE:
 				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
 				// partition is in the middle of leader election, so we fallthrough to save it
 				// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
 				toRetry[topic.Name] = true
 				fallthrough
-			case k.NO_ERROR:
+			case types.NO_ERROR:
 				client.leaders[topic.Name][partition.Id] = partition.Leader
 			default:
 				return nil, partition.Err

+ 10 - 6
kafka/consumer.go

@@ -1,6 +1,10 @@
 package kafka
 
 import k "sarama/protocol"
+import (
+	"sarama/encoding"
+	"sarama/types"
+)
 
 // Consumer processes Kafka messages from a given topic and partition.
 // You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
@@ -95,15 +99,15 @@ func (c *Consumer) fetchMessages() {
 		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
 
 		response, err := c.broker.Fetch(c.client.id, request)
-		switch err.(type) {
-		case k.EncodingError:
+		switch {
+		case err == nil:
+			break
+		case err == encoding.EncodingError:
 			if c.sendError(err) {
 				continue
 			} else {
 				return
 			}
-		case nil:
-			break
 		default:
 			c.client.disconnectBroker(c.broker)
 			c.broker, err = c.client.leader(c.topic, c.partition)
@@ -124,9 +128,9 @@ func (c *Consumer) fetchMessages() {
 		}
 
 		switch block.Err {
-		case k.NO_ERROR:
+		case types.NO_ERROR:
 			break
-		case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION, k.LEADER_NOT_AVAILABLE:
+		case types.UNKNOWN_TOPIC_OR_PARTITION, types.NOT_LEADER_FOR_PARTITION, types.LEADER_NOT_AVAILABLE:
 			err = c.client.refreshTopic(c.topic)
 			if c.sendError(err) {
 				continue

+ 10 - 6
kafka/producer.go

@@ -1,6 +1,10 @@
 package kafka
 
 import k "sarama/protocol"
+import (
+	"sarama/encoding"
+	"sarama/types"
+)
 
 // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
@@ -74,15 +78,15 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 		return err
 	}
 
-	request := &k.ProduceRequest{RequiredAcks: k.WAIT_FOR_LOCAL, Timeout: 0}
+	request := &k.ProduceRequest{RequiredAcks: types.WAIT_FOR_LOCAL, Timeout: 0}
 	request.AddMessage(p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
 
 	response, err := broker.Produce(p.client.id, request)
-	switch err.(type) {
-	case k.EncodingError:
-		return err
+	switch err {
 	case nil:
 		break
+	case encoding.EncodingError:
+		return err
 	default:
 		if !retry {
 			return err
@@ -101,9 +105,9 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 	}
 
 	switch block.Err {
-	case k.NO_ERROR:
+	case types.NO_ERROR:
 		return nil
-	case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION, k.LEADER_NOT_AVAILABLE:
+	case types.UNKNOWN_TOPIC_OR_PARTITION, types.NOT_LEADER_FOR_PARTITION, types.LEADER_NOT_AVAILABLE:
 		if !retry {
 			return block.Err
 		}