Browse Source

More cleanup, reorg, API tweaks

Evan Huus 12 years ago
parent
commit
1195e7f8d1
4 changed files with 54 additions and 11 deletions
  1. 1 1
      kafka/client.go
  2. 9 1
      kafka/errors.go
  3. 32 9
      kafka/producer.go
  4. 12 0
      protocol/produce_response.go

+ 1 - 1
kafka/client.go

@@ -49,7 +49,7 @@ func (client *Client) partitions(topic string) ([]int32, error) {
 	}
 	}
 
 
 	if partitions == nil {
 	if partitions == nil {
-		return nil, k.UNKNOWN_TOPIC_OR_PARTITION
+		return nil, NoSuchTopic
 	}
 	}
 
 
 	return partitions, nil
 	return partitions, nil

+ 9 - 1
kafka/errors.go

@@ -2,5 +2,13 @@ package kafka
 
 
 import "errors"
 import "errors"
 
 
-// Error returned when the client has run out of brokers to talk to (none of them are responding).
+// OutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
+// or otherwise failed to respond.
 var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
 var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
+
+// NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.
+var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")
+
+// IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
+// not contain the expected information.
+var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks.")

+ 32 - 9
kafka/producer.go

@@ -34,10 +34,10 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
 	return partitions[partitioner.Partition(key, len(partitions))], nil
 	return partitions[partitioner.Partition(key, len(partitions))], nil
 }
 }
 
 
-func (p *Producer) SendMessage(key, value Encoder) (*k.ProduceResponse, error) {
+func (p *Producer) safeSendMessage(key, value Encoder, retries int) error {
 	partition, err := p.choosePartition(key)
 	partition, err := p.choosePartition(key)
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return err
 	}
 	}
 
 
 	var keyBytes []byte
 	var keyBytes []byte
@@ -46,17 +46,17 @@ func (p *Producer) SendMessage(key, value Encoder) (*k.ProduceResponse, error) {
 	if key != nil {
 	if key != nil {
 		keyBytes, err = key.Encode()
 		keyBytes, err = key.Encode()
 		if err != nil {
 		if err != nil {
-			return nil, err
+			return err
 		}
 		}
 	}
 	}
 	valBytes, err = value.Encode()
 	valBytes, err = value.Encode()
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return err
 	}
 	}
 
 
 	broker, err := p.client.leader(p.topic, partition)
 	broker, err := p.client.leader(p.topic, partition)
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return err
 	}
 	}
 
 
 	request := &k.ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
 	request := &k.ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
@@ -64,12 +64,35 @@ func (p *Producer) SendMessage(key, value Encoder) (*k.ProduceResponse, error) {
 
 
 	response, err := broker.Produce(p.client.id, request)
 	response, err := broker.Produce(p.client.id, request)
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return err
 	}
 	}
 
 
-	return response, nil
+	block := response.GetBlock(&p.topic, partition)
+	if block == nil {
+		return IncompleteResponse
+	}
+
+	switch block.Err {
+	case k.NO_ERROR:
+		return nil
+	case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION:
+		if retries <= 0 {
+			return block.Err
+		}
+		err = p.client.cache.refreshTopic(p.topic)
+		if err != nil {
+			return err
+		}
+		return p.safeSendMessage(key, value, retries-1)
+	default:
+		return block.Err
+	}
+}
+
+func (p *Producer) SendMessage(key, value Encoder) error {
+	return p.safeSendMessage(key, value, 1)
 }
 }
 
 
-func (p *Producer) SendSimpleMessage(in string) (*k.ProduceResponse, error) {
-	return p.SendMessage(nil, encodableString(in))
+func (p *Producer) SendSimpleMessage(msg string) error {
+	return p.safeSendMessage(nil, encodableString(msg), 1)
 }
 }

+ 12 - 0
protocol/produce_response.go

@@ -60,3 +60,15 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 
 
 	return nil
 	return nil
 }
 }
+
+func (pr *ProduceResponse) GetBlock(topic *string, partition int32) *ProduceResponseBlock {
+	if pr.Blocks == nil {
+		return nil
+	}
+
+	if pr.Blocks[topic] == nil {
+		return nil
+	}
+
+	return pr.Blocks[topic][partition]
+}