Browse Source

Also prefix KError instances with Err.

Willem van Bergen 10 years ago
parent
commit
87c89f868c

+ 19 - 19
client.go

@@ -108,7 +108,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 	switch err {
 	case nil:
 		break
-	case LeaderNotAvailable, ReplicaNotAvailable:
+	case ErrLeaderNotAvailable, ErrReplicaNotAvailable:
 		// indicates that maybe part of the cluster is down, but is not fatal to creating the client
 		Logger.Println(err)
 	default:
@@ -194,7 +194,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 	}
 
 	if partitions == nil {
-		return nil, UnknownTopicOrPartition
+		return nil, ErrUnknownTopicOrPartition
 	}
 
 	return partitions, nil
@@ -213,7 +213,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
 	// partition is undergoing leader election simultaneously. Callers have to be able to handle
 	// this function returning an empty slice (which is a valid return value) but catching it
-	// here the first time (note we *don't* catch it below where we return UnknownTopicOrPartition) triggers
+	// here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
 	// a metadata refresh as a nicety so callers can just try again and don't have to manually
 	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
 	if len(partitions) == 0 {
@@ -225,7 +225,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 	}
 
 	if partitions == nil {
-		return nil, UnknownTopicOrPartition
+		return nil, ErrUnknownTopicOrPartition
 	}
 
 	return partitions, nil
@@ -243,7 +243,7 @@ func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
 		return nil, err
 	}
 
-	if metadata.Err == ReplicaNotAvailable {
+	if metadata.Err == ErrReplicaNotAvailable {
 		return nil, metadata.Err
 	}
 	return dupeAndSort(metadata.Replicas), nil
@@ -263,7 +263,7 @@ func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32,
 		return nil, err
 	}
 
-	if metadata.Err == ReplicaNotAvailable {
+	if metadata.Err == ErrReplicaNotAvailable {
 		return nil, metadata.Err
 	}
 	return dupeAndSort(metadata.Isr), nil
@@ -317,11 +317,11 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim
 	if block == nil {
 		return -1, ErrIncompleteResponse
 	}
-	if block.Err != NoError {
+	if block.Err != ErrNoError {
 		return -1, block.Err
 	}
 	if len(block.Offsets) != 1 {
-		return -1, OffsetOutOfRange
+		return -1, ErrOffsetOutOfRange
 	}
 
 	return block.Offsets[0], nil
@@ -415,7 +415,7 @@ func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMe
 	}
 
 	if metadata == nil {
-		return nil, UnknownTopicOrPartition
+		return nil, ErrUnknownTopicOrPartition
 	}
 
 	return metadata, nil
@@ -454,7 +454,7 @@ func (client *Client) setPartitionCache(topic string, partitionSet partitionType
 
 	ret := make([]int32, 0, len(partitions))
 	for _, partition := range partitions {
-		if partitionSet == writablePartitions && partition.Err == LeaderNotAvailable {
+		if partitionSet == writablePartitions && partition.Err == ErrLeaderNotAvailable {
 			continue
 		}
 		ret = append(ret, partition.ID)
@@ -472,18 +472,18 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er
 	if partitions != nil {
 		metadata, ok := partitions[partitionID]
 		if ok {
-			if metadata.Err == LeaderNotAvailable {
-				return nil, LeaderNotAvailable
+			if metadata.Err == ErrLeaderNotAvailable {
+				return nil, ErrLeaderNotAvailable
 			}
 			b := client.brokers[metadata.Leader]
 			if b == nil {
-				return nil, LeaderNotAvailable
+				return nil, ErrLeaderNotAvailable
 			}
 			return b, nil
 		}
 	}
 
-	return nil, UnknownTopicOrPartition
+	return nil, ErrUnknownTopicOrPartition
 }
 
 // core metadata update logic
@@ -520,7 +520,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	for _, topic := range topics {
 		if len(topic) == 0 {
-			return UnknownTopicOrPartition
+			return ErrUnknownTopicOrPartition
 		}
 	}
 
@@ -570,7 +570,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	return ErrOutOfBrokers
 }
 
-// if no fatal error, returns a list of topics that need retrying due to LeaderNotAvailable
+// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
 func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
@@ -600,9 +600,9 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	var err error
 	for _, topic := range data.Topics {
 		switch topic.Err {
-		case NoError:
+		case ErrNoError:
 			break
-		case LeaderNotAvailable:
+		case ErrLeaderNotAvailable:
 			toRetry[topic.Name] = true
 		default:
 			err = topic.Err
@@ -612,7 +612,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		delete(client.cachedPartitionsResults, topic.Name)
 		for _, partition := range topic.Partitions {
 			client.metadata[topic.Name][partition.ID] = partition
-			if partition.Err == LeaderNotAvailable {
+			if partition.Err == ErrLeaderNotAvailable {
 				toRetry[topic.Name] = true
 			}
 		}

+ 5 - 5
client_test.go

@@ -42,8 +42,8 @@ func TestCachedPartitions(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewClientConfig()
@@ -100,8 +100,8 @@ func TestClientMetadata(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewClientConfig()
@@ -173,7 +173,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	seedBroker.Returns(metadataResponse1)
 
 	metadataResponse2 := new(MetadataResponse)
-	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse2)
 
 	client, err := NewClient("clientID", []string{seedBroker.Addr()}, nil)

+ 2 - 2
consumer.go

@@ -525,12 +525,12 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 
 func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {
 	switch block.Err {
-	case NoError:
+	case ErrNoError:
 		break
 	default:
 		child.sendError(block.Err)
 		fallthrough
-	case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+	case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
 		// doesn't belong to us, redispatch it
 		child.trigger <- none{}
 		delete(w.subscriptions, child)

+ 2 - 2
consumer_metadata_response_test.go

@@ -21,7 +21,7 @@ func TestConsumerMetadataResponseError(t *testing.T) {
 
 	testDecodable(t, "error", &response, consumerMetadataResponseError)
 
-	if response.Err != OffsetsLoadInProgress {
+	if response.Err != ErrOffsetsLoadInProgress {
 		t.Error("Decoding produced incorrect error value.")
 	}
 
@@ -43,7 +43,7 @@ func TestConsumerMetadataResponseSuccess(t *testing.T) {
 
 	testDecodable(t, "success", &response, consumerMetadataResponseSuccess)
 
-	if response.Err != NoError {
+	if response.Err != ErrNoError {
 		t.Error("Decoding produced error value where there was none.")
 	}
 

+ 11 - 11
consumer_test.go

@@ -27,7 +27,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	for i := 0; i <= 10; i++ {
@@ -77,7 +77,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	offsetResponse := new(OffsetResponse)
@@ -125,7 +125,7 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	fetchResponse := new(FetchResponse)
@@ -175,8 +175,8 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
 	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	// launch test goroutines
@@ -229,13 +229,13 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 
 	// leader0 says no longer leader of partition 0
 	fetchResponse = new(FetchResponse)
-	fetchResponse.AddError("my_topic", 0, NotLeaderForPartition)
+	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
 	leader0.Returns(fetchResponse)
 
 	// metadata assigns both partitions to leader1
 	metadataResponse = new(MetadataResponse)
-	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
 
@@ -259,13 +259,13 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
 	}
-	fetchResponse.AddError("my_topic", 1, NotLeaderForPartition)
+	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
 	leader1.Returns(fetchResponse)
 
 	// metadata assigns 0 to leader1 and 1 to leader0
 	metadataResponse = new(MetadataResponse)
-	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
 

+ 34 - 34
errors.go

@@ -66,62 +66,62 @@ type KError int16
 
 // Numeric error codes returned by the Kafka server.
 const (
-	NoError                         KError = 0
-	Unknown                         KError = -1
-	OffsetOutOfRange                KError = 1
-	InvalidMessage                  KError = 2
-	UnknownTopicOrPartition         KError = 3
-	InvalidMessageSize              KError = 4
-	LeaderNotAvailable              KError = 5
-	NotLeaderForPartition           KError = 6
-	RequestTimedOut                 KError = 7
-	BrokerNotAvailable              KError = 8
-	ReplicaNotAvailable             KError = 9
-	MessageSizeTooLarge             KError = 10
-	StaleControllerEpochCode        KError = 11
-	OffsetMetadataTooLarge          KError = 12
-	OffsetsLoadInProgress           KError = 14
-	ConsumerCoordinatorNotAvailable KError = 15
-	NotCoordinatorForConsumer       KError = 16
+	ErrNoError                         KError = 0
+	ErrUnknown                         KError = -1
+	ErrOffsetOutOfRange                KError = 1
+	ErrInvalidMessage                  KError = 2
+	ErrUnknownTopicOrPartition         KError = 3
+	ErrInvalidMessageSize              KError = 4
+	ErrLeaderNotAvailable              KError = 5
+	ErrNotLeaderForPartition           KError = 6
+	ErrRequestTimedOut                 KError = 7
+	ErrBrokerNotAvailable              KError = 8
+	ErrReplicaNotAvailable             KError = 9
+	ErrMessageSizeTooLarge             KError = 10
+	ErrStaleControllerEpochCode        KError = 11
+	ErrOffsetMetadataTooLarge          KError = 12
+	ErrOffsetsLoadInProgress           KError = 14
+	ErrConsumerCoordinatorNotAvailable KError = 15
+	ErrNotCoordinatorForConsumer       KError = 16
 )
 
 func (err KError) Error() string {
 	// Error messages stolen/adapted from
 	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 	switch err {
-	case NoError:
+	case ErrNoError:
 		return "kafka server: Not an error, why are you printing me?"
-	case Unknown:
+	case ErrUnknown:
 		return "kafka server: Unexpected (unknown?) server error."
-	case OffsetOutOfRange:
+	case ErrOffsetOutOfRange:
 		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
-	case InvalidMessage:
+	case ErrInvalidMessage:
 		return "kafka server: Message contents does not match its CRC."
-	case UnknownTopicOrPartition:
+	case ErrUnknownTopicOrPartition:
 		return "kafka server: Request was for a topic or partition that does not exist on this broker."
-	case InvalidMessageSize:
+	case ErrInvalidMessageSize:
 		return "kafka server: The message has a negative size."
-	case LeaderNotAvailable:
+	case ErrLeaderNotAvailable:
 		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
-	case NotLeaderForPartition:
+	case ErrNotLeaderForPartition:
 		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
-	case RequestTimedOut:
+	case ErrRequestTimedOut:
 		return "kafka server: Request exceeded the user-specified time limit in the request."
-	case BrokerNotAvailable:
+	case ErrBrokerNotAvailable:
 		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
-	case ReplicaNotAvailable:
+	case ErrReplicaNotAvailable:
 		return "kafka server: Replica infomation not available, one or more brokers are down."
-	case MessageSizeTooLarge:
+	case ErrMessageSizeTooLarge:
 		return "kafka server: Message was too large, server rejected it to avoid allocation error."
-	case StaleControllerEpochCode:
+	case ErrStaleControllerEpochCode:
 		return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
-	case OffsetMetadataTooLarge:
+	case ErrOffsetMetadataTooLarge:
 		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
-	case OffsetsLoadInProgress:
+	case ErrOffsetsLoadInProgress:
 		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
-	case ConsumerCoordinatorNotAvailable:
+	case ErrConsumerCoordinatorNotAvailable:
 		return "kafka server: Offset's topic has not yet been created."
-	case NotCoordinatorForConsumer:
+	case ErrNotCoordinatorForConsumer:
 		return "kafka server: Request was for a consumer group that is not coordinated by this broker."
 	}
 

+ 1 - 1
fetch_response_test.go

@@ -54,7 +54,7 @@ func TestOneMessageFetchResponse(t *testing.T) {
 	if block == nil {
 		t.Fatal("GetBlock didn't return block.")
 	}
-	if block.Err != OffsetOutOfRange {
+	if block.Err != ErrOffsetOutOfRange {
 		t.Error("Decoding didn't produce correct error code.")
 	}
 	if block.HighWaterMarkOffset != 0x10101010 {

+ 3 - 3
metadata_response_test.go

@@ -88,7 +88,7 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Fatal("Decoding produced", len(response.Topics), "topics where there were two!")
 	}
 
-	if response.Topics[0].Err != NoError {
+	if response.Topics[0].Err != ErrNoError {
 		t.Error("Decoding produced invalid topic 0 error.")
 	}
 
@@ -100,7 +100,7 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Fatal("Decoding produced invalid partition count for topic 0.")
 	}
 
-	if response.Topics[0].Partitions[0].Err != InvalidMessageSize {
+	if response.Topics[0].Partitions[0].Err != ErrInvalidMessageSize {
 		t.Error("Decoding produced invalid topic 0 partition 0 error.")
 	}
 
@@ -125,7 +125,7 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Error("Decoding produced invalid topic 0 partition 0 isr length.")
 	}
 
-	if response.Topics[1].Err != NoError {
+	if response.Topics[1].Err != ErrNoError {
 		t.Error("Decoding produced invalid topic 1 error.")
 	}
 

+ 1 - 1
offset_commit_response_test.go

@@ -45,7 +45,7 @@ func TestNormalOffsetCommitResponse(t *testing.T) {
 		t.Fatal("Decoding produced wrong number of errors for topic 't'.")
 	}
 
-	if response.Errors["t"][0] != NotLeaderForPartition {
+	if response.Errors["t"][0] != ErrNotLeaderForPartition {
 		t.Error("Decoding produced wrong error for topic 't' partition 0.")
 	}
 

+ 1 - 1
offset_fetch_response_test.go

@@ -55,7 +55,7 @@ func TestNormalOffsetFetchResponse(t *testing.T) {
 		t.Error("Decoding produced wrong metadata for topic 't' partition 0.")
 	}
 
-	if response.Blocks["t"][0].Err != RequestTimedOut {
+	if response.Blocks["t"][0].Err != ErrRequestTimedOut {
 		t.Error("Decoding produced wrong error for topic 't' partition 0.")
 	}
 }

+ 1 - 1
offset_response_test.go

@@ -47,7 +47,7 @@ func TestNormalOffsetResponse(t *testing.T) {
 		t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
 	}
 
-	if response.Blocks["z"][2].Err != NoError {
+	if response.Blocks["z"][2].Err != ErrNoError {
 		t.Fatal("Decoding produced invalid error for topic z partition 2.")
 	}
 

+ 2 - 2
produce_response_test.go

@@ -46,7 +46,7 @@ func TestProduceResponse(t *testing.T) {
 	if block == nil {
 		t.Error("Decoding did not produce a block for bar/1")
 	} else {
-		if block.Err != NoError {
+		if block.Err != ErrNoError {
 			t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err))
 		}
 		if block.Offset != 0xFF {
@@ -57,7 +57,7 @@ func TestProduceResponse(t *testing.T) {
 	if block == nil {
 		t.Error("Decoding did not produce a block for bar/2")
 	} else {
-		if block.Err != InvalidMessage {
+		if block.Err != ErrInvalidMessage {
 			t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err))
 		}
 		if block.Offset != 0 {

+ 4 - 4
producer.go

@@ -295,7 +295,7 @@ func (p *Producer) topicDispatcher() {
 		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
 			(msg.byteSize() > p.config.MaxMessageBytes) {
 
-			p.returnError(msg, MessageSizeTooLarge)
+			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
 		}
 
@@ -617,7 +617,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 				}
 
 				switch block.Err {
-				case NoError:
+				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
 					if p.config.AckSuccesses {
 						for i := range msgs {
@@ -625,7 +625,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 						}
 						p.returnSuccesses(msgs)
 					}
-				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable, RequestTimedOut:
+				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut:
 					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
 						broker.ID(), topic, partition, block.Err)
 					if currentRetries[topic] == nil {
@@ -710,7 +710,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend)
 	numPartitions := int32(len(partitions))
 
 	if numPartitions == 0 {
-		return LeaderNotAvailable
+		return ErrLeaderNotAvailable
 	}
 
 	choice, err := partitioner.Partition(msg.Key, numPartitions)

+ 25 - 25
producer_test.go

@@ -41,11 +41,11 @@ func TestSimpleProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	for i := 0; i < 10; i++ {
 		leader.Returns(prodSuccess)
 	}
@@ -79,11 +79,11 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -124,11 +124,11 @@ func TestProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -176,11 +176,11 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
@@ -231,16 +231,16 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
 	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodResponse0 := new(ProduceResponse)
-	prodResponse0.AddTopicPartition("my_topic", 0, NoError)
+	prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader0.Returns(prodResponse0)
 
 	prodResponse1 := new(ProduceResponse)
-	prodResponse1.AddTopicPartition("my_topic", 1, NoError)
+	prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
 	leader1.Returns(prodResponse1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -288,7 +288,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -310,16 +310,16 @@ func TestProducerFailureRetry(t *testing.T) {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	prodNotLeader := new(ProduceResponse)
-	prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
 	leader1.Returns(prodNotLeader)
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
 	leader1.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
@@ -366,7 +366,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -391,7 +391,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
@@ -420,7 +420,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -448,11 +448,11 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	// ok fine, tell it to go to leader2 finally
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
@@ -481,7 +481,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -503,12 +503,12 @@ func TestProducerMultipleRetries(t *testing.T) {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	prodNotLeader := new(ProduceResponse)
-	prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
 	leader1.Returns(prodNotLeader)
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader2)
 	leader2.Returns(prodNotLeader)
 	seedBroker.Returns(metadataLeader1)
@@ -518,7 +518,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 	seedBroker.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {