Merge pull request #299 from Shopify/error_variables

Error variables
Willem van Bergen, 10 years ago
commit 1b6daea541

+ 5 - 7
broker.go

@@ -97,8 +97,8 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 
 	if b.conn != nil {
 		b.lock.Unlock()
-		Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, AlreadyConnected)
-		return AlreadyConnected
+		Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, ErrAlreadyConnected)
+		return ErrAlreadyConnected
 	}
 
 	go withRecover(func() {
@@ -143,7 +143,7 @@ func (b *Broker) Close() (err error) {
 	}()
 
 	if b.conn == nil {
-		return NotConnected
+		return ErrNotConnected
 	}
 
 	close(b.responses)
@@ -267,7 +267,7 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 		if b.connErr != nil {
 			return nil, b.connErr
 		}
-		return nil, NotConnected
+		return nil, ErrNotConnected
 	}
 
 	fullRequest := request{b.correlationID, clientID, req}
@@ -384,9 +384,7 @@ func (b *Broker) responseReceiver() {
 		if decodedHeader.correlationID != response.correlationID {
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
-			response.errors <- DecodingError{
-				Info: fmt.Sprintf("CorrelationID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID),
-			}
+			response.errors <- PacketDecodingError{fmt.Sprintf("CorrelationID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
 			continue
 		}
 

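Reviewer note: after this rename the two error styles in broker.go diverge cleanly. The sentinels (ErrAlreadyConnected, ErrNotConnected) remain plain errors.New values compared by identity, while PacketDecodingError is a struct carrying context. A minimal caller-side sketch, not part of this change; the handling logic is illustrative only:

    package main

    import (
    	"fmt"

    	"github.com/Shopify/sarama"
    )

    func main() {
    	// Sentinel errors are compared by identity.
    	var err error = sarama.ErrNotConnected
    	if err == sarama.ErrNotConnected {
    		fmt.Println("broker not connected; call Open() first")
    	}

    	// PacketDecodingError is a struct type, so it is matched with a
    	// type assertion rather than an equality check.
    	err = sarama.PacketDecodingError{Info: "CRC didn't match"}
    	if decErr, ok := err.(sarama.PacketDecodingError); ok {
    		fmt.Println("bad packet:", decErr.Info)
    	}
    }
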
+ 30 - 30
client.go

@@ -108,7 +108,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 	switch err {
 	case nil:
 		break
-	case LeaderNotAvailable, ReplicaNotAvailable:
+	case ErrLeaderNotAvailable, ErrReplicaNotAvailable:
 		// indicates that maybe part of the cluster is down, but is not fatal to creating the client
 		Logger.Println(err)
 	default:
@@ -131,7 +131,7 @@ func (client *Client) Close() error {
 		// Chances are this is being called from a defer() and the error will go unobserved
 		// so we go ahead and log the event in this case.
 		Logger.Printf("Close() called on already closed client")
-		return ClosedClient
+		return ErrClosedClient
 	}
 
 	client.lock.Lock()
@@ -162,7 +162,7 @@ func (client *Client) Closed() bool {
 func (client *Client) Topics() ([]string, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	client.lock.RLock()
@@ -180,7 +180,7 @@ func (client *Client) Topics() ([]string, error) {
 func (client *Client) Partitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	partitions := client.cachedPartitions(topic, allPartitions)
@@ -194,7 +194,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 	}
 
 	if partitions == nil {
-		return nil, UnknownTopicOrPartition
+		return nil, ErrUnknownTopicOrPartition
 	}
 
 	return partitions, nil
@@ -205,7 +205,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	partitions := client.cachedPartitions(topic, writablePartitions)
@@ -213,7 +213,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
 	// partition is undergoing leader election simultaneously. Callers have to be able to handle
 	// this function returning an empty slice (which is a valid return value) but catching it
-	// here the first time (note we *don't* catch it below where we return UnknownTopicOrPartition) triggers
+	// here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
 	// a metadata refresh as a nicety so callers can just try again and don't have to manually
 	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
 	if len(partitions) == 0 {
@@ -225,7 +225,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 	}
 
 	if partitions == nil {
-		return nil, UnknownTopicOrPartition
+		return nil, ErrUnknownTopicOrPartition
 	}
 
 	return partitions, nil
@@ -234,7 +234,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 // Replicas returns the set of all replica IDs for the given partition.
 func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	metadata, err := client.getMetadata(topic, partitionID)
@@ -243,7 +243,7 @@ func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
 		return nil, err
 	}
 
-	if metadata.Err == ReplicaNotAvailable {
+	if metadata.Err == ErrReplicaNotAvailable {
 		return nil, metadata.Err
 	}
 	return dupeAndSort(metadata.Replicas), nil
@@ -254,7 +254,7 @@ func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
 // This method should be considered effectively deprecated.
 func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	metadata, err := client.getMetadata(topic, partitionID)
@@ -263,7 +263,7 @@ func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32,
 		return nil, err
 	}
 
-	if metadata.Err == ReplicaNotAvailable {
+	if metadata.Err == ErrReplicaNotAvailable {
 		return nil, metadata.Err
 	}
 	return dupeAndSort(metadata.Isr), nil
@@ -315,13 +315,13 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim
 
 	block := response.GetBlock(topic, partitionID)
 	if block == nil {
-		return -1, IncompleteResponse
+		return -1, ErrIncompleteResponse
 	}
-	if block.Err != NoError {
+	if block.Err != ErrNoError {
 		return -1, block.Err
 	}
 	if len(block.Offsets) != 1 {
-		return -1, OffsetOutOfRange
+		return -1, ErrOffsetOutOfRange
 	}
 
 	return block.Offsets[0], nil
@@ -415,7 +415,7 @@ func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMe
 	}
 
 	if metadata == nil {
-		return nil, UnknownTopicOrPartition
+		return nil, ErrUnknownTopicOrPartition
 	}
 
 	return metadata, nil
@@ -454,7 +454,7 @@ func (client *Client) setPartitionCache(topic string, partitionSet partitionType
 
 	ret := make([]int32, 0, len(partitions))
 	for _, partition := range partitions {
-		if partitionSet == writablePartitions && partition.Err == LeaderNotAvailable {
+		if partitionSet == writablePartitions && partition.Err == ErrLeaderNotAvailable {
 			continue
 		}
 		ret = append(ret, partition.ID)
@@ -472,18 +472,18 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er
 	if partitions != nil {
 		metadata, ok := partitions[partitionID]
 		if ok {
-			if metadata.Err == LeaderNotAvailable {
-				return nil, LeaderNotAvailable
+			if metadata.Err == ErrLeaderNotAvailable {
+				return nil, ErrLeaderNotAvailable
 			}
 			b := client.brokers[metadata.Leader]
 			if b == nil {
-				return nil, LeaderNotAvailable
+				return nil, ErrLeaderNotAvailable
 			}
 			return b, nil
 		}
 	}
 
-	return nil, UnknownTopicOrPartition
+	return nil, ErrUnknownTopicOrPartition
 }
 
 // core metadata update logic
@@ -512,7 +512,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	// resources.  Check to see if we're dealing with a closed Client and error
 	// out immediately if so.
 	if client.Closed() {
-		return ClosedClient
+		return ErrClosedClient
 	}
 
 	// Kafka will throw exceptions on an empty topic and not return a proper
@@ -520,7 +520,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	for _, topic := range topics {
 		if len(topic) == 0 {
-			return UnknownTopicOrPartition
+			return ErrUnknownTopicOrPartition
 		}
 	}
 
@@ -532,7 +532,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 		}
 		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
 
-		switch err {
+		switch err.(type) {
 		case nil:
 			// valid response, use it
 			retry, err := client.update(response)
@@ -548,7 +548,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 			}
 
 			return err
-		case EncodingError:
+		case PacketEncodingError:
 			// didn't even send, return the error
 			return err
 		default:
@@ -567,10 +567,10 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 		return client.refreshMetadata(topics, retriesRemaining-1)
 	}
 
-	return OutOfBrokers
+	return ErrOutOfBrokers
 }
 
-// if no fatal error, returns a list of topics that need retrying due to LeaderNotAvailable
+// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
 func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
@@ -600,9 +600,9 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	var err error
 	for _, topic := range data.Topics {
 		switch topic.Err {
-		case NoError:
+		case ErrNoError:
 			break
-		case LeaderNotAvailable:
+		case ErrLeaderNotAvailable:
 			toRetry[topic.Name] = true
 		default:
 			err = topic.Err
@@ -612,7 +612,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		delete(client.cachedPartitionsResults, topic.Name)
 		for _, partition := range topic.Partitions {
 			client.metadata[topic.Name][partition.ID] = partition
-			if partition.Err == LeaderNotAvailable {
+			if partition.Err == ErrLeaderNotAvailable {
 				toRetry[topic.Name] = true
 			}
 		}

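The switch err { → switch err.(type) { change above follows directly from the error redesign: PacketEncodingError is now a struct type rather than a sentinel value, so it can only be matched as a case in a type switch. A hedged sketch of the resulting pattern; doRequest is a hypothetical stand-in for a call like broker.GetMetadata:

    package main

    import (
    	"fmt"

    	"github.com/Shopify/sarama"
    )

    // doRequest is a hypothetical stand-in for a broker call that may fail
    // before anything is sent (an encoding problem) or after (anything else).
    func doRequest() error {
    	return sarama.PacketEncodingError{Info: "Invalid request size: -1"}
    }

    func main() {
    	err := doRequest()
    	switch err.(type) {
    	case nil:
    		fmt.Println("request succeeded")
    	case sarama.PacketEncodingError:
    		// The request never hit the wire; trying another broker won't help.
    		fmt.Println("fatal:", err)
    	default:
    		// Anything else may be transient: disconnect and try the next broker.
    		fmt.Println("retrying elsewhere:", err)
    	}
    }
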
+ 5 - 5
client_test.go

@@ -42,8 +42,8 @@ func TestCachedPartitions(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewClientConfig()
@@ -100,8 +100,8 @@ func TestClientMetadata(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewClientConfig()
@@ -173,7 +173,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	seedBroker.Returns(metadataResponse1)
 
 	metadataResponse2 := new(MetadataResponse)
-	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse2)
 
 	client, err := NewClient("clientID", []string{seedBroker.Addr()}, nil)

+ 7 - 7
consumer.go

@@ -62,7 +62,7 @@ func (config *ConsumerConfig) Validate() error {
 type PartitionConsumerConfig struct {
 	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
 	DefaultFetchSize int32
-	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
+	// The maximum permittable message size - messages larger than this will return ErrMessageTooLarge. The default of 0 is
 	// treated as no limit.
 	MaxMessageSize int32
 	// The method used to determine at which offset to begin consuming messages. The default is to start at the most recent message.
@@ -134,7 +134,7 @@ type Consumer struct {
 func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
 	// Check that we are not dealing with a closed Client before processing any other arguments
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	if config == nil {
@@ -466,7 +466,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		for child := range w.subscriptions {
 			block := response.GetBlock(child.topic, child.partition)
 			if block == nil {
-				child.sendError(IncompleteResponse)
+				child.sendError(ErrIncompleteResponse)
 				child.trigger <- none{}
 				delete(w.subscriptions, child)
 				continue
@@ -525,12 +525,12 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 
 func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {
 	switch block.Err {
-	case NoError:
+	case ErrNoError:
 		break
 	default:
 		child.sendError(block.Err)
 		fallthrough
-	case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+	case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
 		// doesn't belong to us, redispatch it
 		child.trigger <- none{}
 		delete(w.subscriptions, child)
@@ -543,7 +543,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 		if block.MsgSet.PartialTrailingMessage {
 			if child.config.MaxMessageSize > 0 && child.fetchSize == child.config.MaxMessageSize {
 				// we can't ask for more data, we've hit the configured limit
-				child.sendError(MessageTooLarge)
+				child.sendError(ErrMessageTooLarge)
 				child.offset++ // skip this one so we can keep processing future messages
 			} else {
 				child.fetchSize *= 2
@@ -588,7 +588,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 	}
 
 	if incomplete || !atLeastOne {
-		child.sendError(IncompleteResponse)
+		child.sendError(ErrIncompleteResponse)
 		child.trigger <- none{}
 		delete(w.subscriptions, child)
 	}

+ 2 - 2
consumer_metadata_response_test.go

@@ -21,7 +21,7 @@ func TestConsumerMetadataResponseError(t *testing.T) {
 
 	testDecodable(t, "error", &response, consumerMetadataResponseError)
 
-	if response.Err != OffsetsLoadInProgress {
+	if response.Err != ErrOffsetsLoadInProgress {
 		t.Error("Decoding produced incorrect error value.")
 	}
 
@@ -43,7 +43,7 @@ func TestConsumerMetadataResponseSuccess(t *testing.T) {
 
 	testDecodable(t, "success", &response, consumerMetadataResponseSuccess)
 
-	if response.Err != NoError {
+	if response.Err != ErrNoError {
 		t.Error("Decoding produced error value where there was none.")
 	}
 

+ 11 - 11
consumer_test.go

@@ -27,7 +27,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	for i := 0; i <= 10; i++ {
@@ -77,7 +77,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	offsetResponse := new(OffsetResponse)
@@ -125,7 +125,7 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	fetchResponse := new(FetchResponse)
@@ -175,8 +175,8 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
 	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	// launch test goroutines
@@ -229,13 +229,13 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 
 	// leader0 says no longer leader of partition 0
 	fetchResponse = new(FetchResponse)
-	fetchResponse.AddError("my_topic", 0, NotLeaderForPartition)
+	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
 	leader0.Returns(fetchResponse)
 
 	// metadata assigns both partitions to leader1
 	metadataResponse = new(MetadataResponse)
-	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
 
@@ -259,13 +259,13 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
 	}
-	fetchResponse.AddError("my_topic", 1, NotLeaderForPartition)
+	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
 	leader1.Returns(fetchResponse)
 
 	// metadata assigns 0 to leader1 and 1 to leader0
 	metadataResponse = new(MetadataResponse)
-	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
 

+ 1 - 1
crc32_field.go

@@ -28,7 +28,7 @@ func (c *crc32Field) check(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
-		return DecodingError{Info: "CRC didn't match"}
+		return PacketDecodingError{"CRC didn't match"}
 	}
 
 	return nil

+ 4 - 2
encoder_decoder.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "fmt"
+
 // Encoder is the interface that wraps the basic Encode method.
 // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
 type encoder interface {
@@ -21,7 +23,7 @@ func encode(in encoder) ([]byte, error) {
 	}
 
 	if prepEnc.length < 0 || uint32(prepEnc.length) > MaxRequestSize {
-		return nil, EncodingError
+		return nil, PacketEncodingError{fmt.Sprintf("Invalid request size: %d", prepEnc.length)}
 	}
 
 	realEnc.raw = make([]byte, prepEnc.length)
@@ -53,7 +55,7 @@ func decode(buf []byte, in decoder) error {
 	}
 
 	if helper.off != len(buf) {
-		return DecodingError{Info: "Length was invalid"}
+		return PacketDecodingError{"Length was invalid"}
 	}
 
 	return nil

+ 67 - 60
errors.go

@@ -5,52 +5,59 @@ import (
 	"fmt"
 )
 
-// OutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
+// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
 // or otherwise failed to respond.
-var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
+var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
 
-// ClosedClient is the error returned when a method is called on a client that has been closed.
-var ClosedClient = errors.New("kafka: Tried to use a client that was closed")
+// ErrClosedClient is the error returned when a method is called on a client that has been closed.
+var ErrClosedClient = errors.New("kafka: Tried to use a client that was closed")
 
-// IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
+// ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
 // not contain the expected information.
-var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks")
+var ErrIncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks")
 
-// InvalidPartition is the error returned when a partitioner returns an invalid partition index
+// ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index
 // (meaning one outside of the range [0...numPartitions-1]).
-var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")
+var ErrInvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")
 
-// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
-var AlreadyConnected = errors.New("kafka: broker: already connected")
+// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
+var ErrAlreadyConnected = errors.New("kafka: broker already connected")
 
-// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
-var NotConnected = errors.New("kafka: broker: not connected")
+// ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
+var ErrNotConnected = errors.New("kafka: broker not connected")
 
-// EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
-// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
-var EncodingError = errors.New("kafka: Error while encoding packet")
-
-// InsufficientData is returned when decoding and the packet is truncated. This can be expected
+// ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
 // when requesting messages, since as an optimization the server is allowed to return a partial message at the end
 // of the message set.
-var InsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected")
 
-// ShuttingDown is returned when a producer receives a message during shutdown.
-var ShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")
+var ErrInsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected")
+
+// ErrShuttingDown is returned when a producer receives a message during shutdown.
+var ErrShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")
 
-// DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
+// ErrMessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
+var ErrMessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
+
+// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
+// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
+type PacketEncodingError struct {
+	Info string
+}
+
+func (err PacketEncodingError) Error() string {
+	return fmt.Sprintf("kafka: Error while encoding packet: %s", err.Info)
+}
+
+// PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // This can be a bad CRC or length field, or any other invalid value.
-type DecodingError struct {
+type PacketDecodingError struct {
 	Info string
 }
 
-func (err DecodingError) Error() string {
+func (err PacketDecodingError) Error() string {
 	return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info)
 }
 
-// MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
-var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
-
 // ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified
 // configuration is invalid.
 type ConfigurationError string
@@ -65,62 +72,62 @@ type KError int16
 
 // Numeric error codes returned by the Kafka server.
 const (
-	NoError                         KError = 0
-	Unknown                         KError = -1
-	OffsetOutOfRange                KError = 1
-	InvalidMessage                  KError = 2
-	UnknownTopicOrPartition         KError = 3
-	InvalidMessageSize              KError = 4
-	LeaderNotAvailable              KError = 5
-	NotLeaderForPartition           KError = 6
-	RequestTimedOut                 KError = 7
-	BrokerNotAvailable              KError = 8
-	ReplicaNotAvailable             KError = 9
-	MessageSizeTooLarge             KError = 10
-	StaleControllerEpochCode        KError = 11
-	OffsetMetadataTooLarge          KError = 12
-	OffsetsLoadInProgress           KError = 14
-	ConsumerCoordinatorNotAvailable KError = 15
-	NotCoordinatorForConsumer       KError = 16
+	ErrNoError                         KError = 0
+	ErrUnknown                         KError = -1
+	ErrOffsetOutOfRange                KError = 1
+	ErrInvalidMessage                  KError = 2
+	ErrUnknownTopicOrPartition         KError = 3
+	ErrInvalidMessageSize              KError = 4
+	ErrLeaderNotAvailable              KError = 5
+	ErrNotLeaderForPartition           KError = 6
+	ErrRequestTimedOut                 KError = 7
+	ErrBrokerNotAvailable              KError = 8
+	ErrReplicaNotAvailable             KError = 9
+	ErrMessageSizeTooLarge             KError = 10
+	ErrStaleControllerEpochCode        KError = 11
+	ErrOffsetMetadataTooLarge          KError = 12
+	ErrOffsetsLoadInProgress           KError = 14
+	ErrConsumerCoordinatorNotAvailable KError = 15
+	ErrNotCoordinatorForConsumer       KError = 16
 )
 
 func (err KError) Error() string {
 	// Error messages stolen/adapted from
 	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 	switch err {
-	case NoError:
+	case ErrNoError:
 		return "kafka server: Not an error, why are you printing me?"
-	case Unknown:
+	case ErrUnknown:
 		return "kafka server: Unexpected (unknown?) server error."
-	case OffsetOutOfRange:
+	case ErrOffsetOutOfRange:
 		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
-	case InvalidMessage:
+	case ErrInvalidMessage:
 		return "kafka server: Message contents does not match its CRC."
-	case UnknownTopicOrPartition:
+	case ErrUnknownTopicOrPartition:
 		return "kafka server: Request was for a topic or partition that does not exist on this broker."
-	case InvalidMessageSize:
+	case ErrInvalidMessageSize:
 		return "kafka server: The message has a negative size."
-	case LeaderNotAvailable:
+	case ErrLeaderNotAvailable:
 		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
-	case NotLeaderForPartition:
+	case ErrNotLeaderForPartition:
 		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
-	case RequestTimedOut:
+	case ErrRequestTimedOut:
 		return "kafka server: Request exceeded the user-specified time limit in the request."
-	case BrokerNotAvailable:
+	case ErrBrokerNotAvailable:
 		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
-	case ReplicaNotAvailable:
+	case ErrReplicaNotAvailable:
 		return "kafka server: Replica infomation not available, one or more brokers are down."
-	case MessageSizeTooLarge:
+	case ErrMessageSizeTooLarge:
 		return "kafka server: Message was too large, server rejected it to avoid allocation error."
-	case StaleControllerEpochCode:
+	case ErrStaleControllerEpochCode:
 		return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
-	case OffsetMetadataTooLarge:
+	case ErrOffsetMetadataTooLarge:
 		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
-	case OffsetsLoadInProgress:
+	case ErrOffsetsLoadInProgress:
 		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
-	case ConsumerCoordinatorNotAvailable:
+	case ErrConsumerCoordinatorNotAvailable:
 		return "kafka server: Offset's topic has not yet been created."
-	case NotCoordinatorForConsumer:
+	case ErrNotCoordinatorForConsumer:
 		return "kafka server: Request was for a consumer group that is not coordinated by this broker."
 	}
 

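KError keeps its shape through the rename: it is an int16 that satisfies the error interface, so codes decoded off the wire convert directly and compare by value against the Err* constants. A small sketch; the code value here is illustrative:

    package main

    import (
    	"fmt"

    	"github.com/Shopify/sarama"
    )

    func main() {
    	code := int16(1) // as decoded from a response; 1 is ErrOffsetOutOfRange
    	kerr := sarama.KError(code)
    	if kerr != sarama.ErrNoError {
    		// fmt sees the error interface and prints the human-readable message.
    		fmt.Println("server error:", kerr)
    	}
    }
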
+ 1 - 1
fetch_response_test.go

@@ -54,7 +54,7 @@ func TestOneMessageFetchResponse(t *testing.T) {
 	if block == nil {
 		t.Fatal("GetBlock didn't return block.")
 	}
-	if block.Err != OffsetOutOfRange {
+	if block.Err != ErrOffsetOutOfRange {
 		t.Error("Decoding didn't produce correct error code.")
 	}
 	if block.HighWaterMarkOffset != 0x10101010 {

+ 2 - 2
functional_test.go

@@ -48,8 +48,8 @@ func TestFuncConnectionFailure(t *testing.T) {
 	config.MetadataRetries = 1
 
 	_, err := NewClient("test", []string{"localhost:9000"}, config)
-	if err != OutOfBrokers {
-		t.Fatal("Expected returned error to be OutOfBrokers, but was: ", err)
+	if err != ErrOutOfBrokers {
+		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
 }
 

+ 1 - 1
length_field.go

@@ -22,7 +22,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
 
 func (l *lengthField) check(curOffset int, buf []byte) error {
 	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
-		return DecodingError{Info: "Lengthfield check failed"}
+		return PacketDecodingError{"Lengthfield check failed"}
 	}
 
 	return nil

+ 6 - 5
message.go

@@ -3,6 +3,7 @@ package sarama
 import (
 	"bytes"
 	"compress/gzip"
+	"fmt"
 	"io/ioutil"
 )
 
@@ -72,7 +73,7 @@ func (m *Message) encode(pe packetEncoder) error {
 			m.compressedCache = tmp
 			payload = m.compressedCache
 		default:
-			return EncodingError
+			return PacketEncodingError{fmt.Sprintf("Unsupported compression codec: %d", m.Codec)}
 		}
 	}
 
@@ -94,7 +95,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 	}
 	if format != messageFormat {
-		return DecodingError{Info: "Unexpected messageFormat"}
+		return PacketDecodingError{"Unexpected messageFormat"}
 	}
 
 	attribute, err := pd.getInt8()
@@ -118,7 +119,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		// nothing to do
 	case CompressionGZIP:
 		if m.Value == nil {
-			return DecodingError{Info: "GZIP compression specified, but no data to uncompress"}
+			return PacketDecodingError{"GZIP compression specified, but no data to uncompress"}
 		}
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		if err != nil {
@@ -130,14 +131,14 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return m.decodeSet()
 	case CompressionSnappy:
 		if m.Value == nil {
-			return DecodingError{Info: "Snappy compression specified, but no data to uncompress"}
+			return PacketDecodingError{"Snappy compression specified, but no data to uncompress"}
 		}
 		if m.Value, err = snappyDecode(m.Value); err != nil {
 			return err
 		}
 		return m.decodeSet()
 	default:
-		return DecodingError{Info: "Invalid compression specified"}
+		return PacketDecodingError{fmt.Sprintf("Invalid compression specified: %d", m.Codec)}
 	}
 
 	err = pd.pop()

+ 1 - 1
message_set.go

@@ -69,7 +69,7 @@ func (ms *MessageSet) decode(pd packetDecoder) (err error) {
 		switch err {
 		case nil:
 			ms.Messages = append(ms.Messages, msb)
-		case InsufficientData:
+		case ErrInsufficientData:
 			// As an optimization the server is allowed to return a partial message at the
 			// end of the message set. Clients should handle this case. So we just ignore such things.
 			ms.PartialTrailingMessage = true

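The rename preserves the partial-trailing-message contract described in the comment above: decoders signal truncation with the ErrInsufficientData sentinel, and MessageSet.decode swallows exactly that value. A sketch of the same convention; decodeRecord is hypothetical:

    package main

    import (
    	"fmt"

    	"github.com/Shopify/sarama"
    )

    // decodeRecord is a hypothetical decoder that reports truncation the way
    // realDecoder does: by returning the ErrInsufficientData sentinel.
    func decodeRecord(buf []byte) error {
    	if len(buf) < 8 {
    		return sarama.ErrInsufficientData
    	}
    	return nil
    }

    func main() {
    	switch err := decodeRecord([]byte{0x00}); err {
    	case nil:
    		fmt.Println("complete record")
    	case sarama.ErrInsufficientData:
    		// Mirrors MessageSet.decode: a truncated trailing message is
    		// expected, so stop without reporting an error.
    		fmt.Println("partial trailing record; ignoring")
    	default:
    		fmt.Println("decode failed:", err)
    	}
    }
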
+ 3 - 3
metadata_response_test.go

@@ -88,7 +88,7 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Fatal("Decoding produced", len(response.Topics), "topics where there were two!")
 	}
 
-	if response.Topics[0].Err != NoError {
+	if response.Topics[0].Err != ErrNoError {
 		t.Error("Decoding produced invalid topic 0 error.")
 	}
 
@@ -100,7 +100,7 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Fatal("Decoding produced invalid partition count for topic 0.")
 	}
 
-	if response.Topics[0].Partitions[0].Err != InvalidMessageSize {
+	if response.Topics[0].Partitions[0].Err != ErrInvalidMessageSize {
 		t.Error("Decoding produced invalid topic 0 partition 0 error.")
 	}
 
@@ -125,7 +125,7 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Error("Decoding produced invalid topic 0 partition 0 isr length.")
 	}
 
-	if response.Topics[1].Err != NoError {
+	if response.Topics[1].Err != ErrNoError {
 		t.Error("Decoding produced invalid topic 1 error.")
 	}
 

+ 1 - 1
offset_commit_response_test.go

@@ -45,7 +45,7 @@ func TestNormalOffsetCommitResponse(t *testing.T) {
 		t.Fatal("Decoding produced wrong number of errors for topic 't'.")
 	}
 
-	if response.Errors["t"][0] != NotLeaderForPartition {
+	if response.Errors["t"][0] != ErrNotLeaderForPartition {
 		t.Error("Decoding produced wrong error for topic 't' partition 0.")
 	}
 

+ 1 - 1
offset_fetch_response_test.go

@@ -55,7 +55,7 @@ func TestNormalOffsetFetchResponse(t *testing.T) {
 		t.Error("Decoding produced wrong metadata for topic 't' partition 0.")
 	}
 
-	if response.Blocks["t"][0].Err != RequestTimedOut {
+	if response.Blocks["t"][0].Err != ErrRequestTimedOut {
 		t.Error("Decoding produced wrong error for topic 't' partition 0.")
 	}
 }

+ 1 - 1
offset_response_test.go

@@ -47,7 +47,7 @@ func TestNormalOffsetResponse(t *testing.T) {
 		t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
 	}
 
-	if response.Blocks["z"][2].Err != NoError {
+	if response.Blocks["z"][2].Err != ErrNoError {
 		t.Fatal("Decoding produced invalid error for topic z partition 2.")
 	}
 

+ 5 - 4
prep_encoder.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"encoding/binary"
+	"fmt"
 	"math"
 )
 
@@ -29,7 +30,7 @@ func (pe *prepEncoder) putInt64(in int64) {
 
 func (pe *prepEncoder) putArrayLength(in int) error {
 	if in > math.MaxInt32 {
-		return EncodingError
+		return PacketEncodingError{fmt.Sprintf("Array too long: %d", in)}
 	}
 	pe.length += 4
 	return nil
@@ -43,7 +44,7 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 		return nil
 	}
 	if len(in) > math.MaxInt32 {
-		return EncodingError
+		return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
 	}
 	pe.length += len(in)
 	return nil
@@ -51,7 +52,7 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 
 func (pe *prepEncoder) putRawBytes(in []byte) error {
 	if len(in) > math.MaxInt32 {
-		return EncodingError
+		return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
 	}
 	pe.length += len(in)
 	return nil
@@ -60,7 +61,7 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
 func (pe *prepEncoder) putString(in string) error {
 	pe.length += 2
 	if len(in) > math.MaxInt16 {
-		return EncodingError
+		return PacketEncodingError{fmt.Sprintf("String too long: %d", len(in))}
 	}
 	pe.length += len(in)
 	return nil

+ 2 - 2
produce_response_test.go

@@ -46,7 +46,7 @@ func TestProduceResponse(t *testing.T) {
 	if block == nil {
 		t.Error("Decoding did not produce a block for bar/1")
 	} else {
-		if block.Err != NoError {
+		if block.Err != ErrNoError {
 			t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err))
 		}
 		if block.Offset != 0xFF {
@@ -57,7 +57,7 @@ func TestProduceResponse(t *testing.T) {
 	if block == nil {
 		t.Error("Decoding did not produce a block for bar/2")
 	} else {
-		if block.Err != InvalidMessage {
+		if block.Err != ErrInvalidMessage {
 			t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err))
 		}
 		if block.Offset != 0 {

+ 10 - 10
producer.go

@@ -126,7 +126,7 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 	// Check that we are not dealing with a closed Client before processing
 	// any other arguments
 	if client.Closed() {
-		return nil, ClosedClient
+		return nil, ErrClosedClient
 	}
 
 	if config == nil {
@@ -295,7 +295,7 @@ func (p *Producer) topicDispatcher() {
 		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
 			(msg.byteSize() > p.config.MaxMessageBytes) {
 
-			p.returnError(msg, MessageSizeTooLarge)
+			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
 		}
 
@@ -319,7 +319,7 @@ func (p *Producer) topicDispatcher() {
 	p.retries <- &MessageToSend{flags: shutdown}
 
 	for msg := range p.input {
-		p.returnError(msg, ShuttingDown)
+		p.returnError(msg, ErrShuttingDown)
 	}
 
 	close(p.errors)
@@ -583,10 +583,10 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 
 		response, err := broker.Produce(p.client.id, request)
 
-		switch err {
+		switch err.(type) {
 		case nil:
 			break
-		case EncodingError:
+		case PacketEncodingError:
 			p.returnErrors(batch, err)
 			continue
 		default:
@@ -612,12 +612,12 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 
 				block := response.GetBlock(topic, partition)
 				if block == nil {
-					p.returnErrors(msgs, IncompleteResponse)
+					p.returnErrors(msgs, ErrIncompleteResponse)
 					continue
 				}
 
 				switch block.Err {
-				case NoError:
+				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
 					if p.config.AckSuccesses {
 						for i := range msgs {
@@ -625,7 +625,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 						}
 						p.returnSuccesses(msgs)
 					}
-				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable, RequestTimedOut:
+				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut:
 					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
 						broker.ID(), topic, partition, block.Err)
 					if currentRetries[topic] == nil {
@@ -710,7 +710,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend)
 	numPartitions := int32(len(partitions))
 
 	if numPartitions == 0 {
-		return LeaderNotAvailable
+		return ErrLeaderNotAvailable
 	}
 
 	choice, err := partitioner.Partition(msg.Key, numPartitions)
@@ -718,7 +718,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend)
 	if err != nil {
 		return err
 	} else if choice < 0 || choice >= numPartitions {
-		return InvalidPartition
+		return ErrInvalidPartition
 	}
 
 	msg.partition = partitions[choice]

+ 25 - 25
producer_test.go

@@ -41,11 +41,11 @@ func TestSimpleProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	for i := 0; i < 10; i++ {
 		leader.Returns(prodSuccess)
 	}
@@ -79,11 +79,11 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -124,11 +124,11 @@ func TestProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -176,11 +176,11 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
@@ -231,16 +231,16 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
 	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodResponse0 := new(ProduceResponse)
-	prodResponse0.AddTopicPartition("my_topic", 0, NoError)
+	prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader0.Returns(prodResponse0)
 
 	prodResponse1 := new(ProduceResponse)
-	prodResponse1.AddTopicPartition("my_topic", 1, NoError)
+	prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
 	leader1.Returns(prodResponse1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -288,7 +288,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -310,16 +310,16 @@ func TestProducerFailureRetry(t *testing.T) {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	prodNotLeader := new(ProduceResponse)
-	prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
 	leader1.Returns(prodNotLeader)
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
 	leader1.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
@@ -366,7 +366,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -391,7 +391,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
@@ -420,7 +420,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -448,11 +448,11 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	// ok fine, tell it to go to leader2 finally
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
@@ -481,7 +481,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
@@ -503,12 +503,12 @@ func TestProducerMultipleRetries(t *testing.T) {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	prodNotLeader := new(ProduceResponse)
-	prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
 	leader1.Returns(prodNotLeader)
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader2)
 	leader2.Returns(prodNotLeader)
 	seedBroker.Returns(metadataLeader1)
@@ -518,7 +518,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 	seedBroker.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {

+ 19 - 19
real_decoder.go

@@ -16,7 +16,7 @@ type realDecoder struct {
 func (rd *realDecoder) getInt8() (int8, error) {
 	if rd.remaining() < 1 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int8(rd.raw[rd.off])
 	rd.off += binary.Size(tmp)
@@ -26,7 +26,7 @@ func (rd *realDecoder) getInt8() (int8, error) {
 func (rd *realDecoder) getInt16() (int16, error) {
 	if rd.remaining() < 2 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
 	rd.off += binary.Size(tmp)
@@ -36,7 +36,7 @@ func (rd *realDecoder) getInt16() (int16, error) {
 func (rd *realDecoder) getInt32() (int32, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += binary.Size(tmp)
@@ -46,7 +46,7 @@ func (rd *realDecoder) getInt32() (int32, error) {
 func (rd *realDecoder) getInt64() (int64, error) {
 	if rd.remaining() < 8 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
 	rd.off += binary.Size(tmp)
@@ -56,15 +56,15 @@ func (rd *realDecoder) getInt64() (int64, error) {
 func (rd *realDecoder) getArrayLength() (int, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	}
 	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 	if tmp > rd.remaining() {
 		rd.off = len(rd.raw)
-		return -1, InsufficientData
+		return -1, ErrInsufficientData
 	} else if tmp > 2*math.MaxUint16 {
-		return -1, DecodingError{Info: "getArrayLength failed: Invalid array length"}
+		return -1, PacketDecodingError{"getArrayLength failed: Invalid array length"}
 	}
 	return tmp, nil
 }
@@ -82,14 +82,14 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 
 	switch {
 	case n < -1:
-		return nil, DecodingError{Info: "getBytes failed"}
+		return nil, PacketDecodingError{"getBytes failed: Invalid length"}
 	case n == -1:
 		return nil, nil
 	case n == 0:
 		return make([]byte, 0), nil
 	case n > rd.remaining():
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	tmpStr := rd.raw[rd.off : rd.off+n]
@@ -108,14 +108,14 @@ func (rd *realDecoder) getString() (string, error) {
 
 	switch {
 	case n < -1:
-		return "", DecodingError{Info: "getString failed"}
+		return "", PacketDecodingError{"getString failed: invalid length"}
 	case n == -1:
 		return "", nil
 	case n == 0:
 		return "", nil
 	case n > rd.remaining():
 		rd.off = len(rd.raw)
-		return "", InsufficientData
+		return "", ErrInsufficientData
 	}
 
 	tmpStr := string(rd.raw[rd.off : rd.off+n])
@@ -126,14 +126,14 @@ func (rd *realDecoder) getString() (string, error) {
 func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	if rd.remaining() < 4*n {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	if n == 0 {
@@ -141,7 +141,7 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	}
 
 	if n < 0 {
-		return nil, DecodingError{Info: "getInt32Array failed"}
+		return nil, PacketDecodingError{"getInt32Array failed: invalid length"}
 	}
 
 	ret := make([]int32, n)
@@ -155,14 +155,14 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	if rd.remaining() < 8*n {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	if n == 0 {
@@ -170,7 +170,7 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	}
 
 	if n < 0 {
-		return nil, DecodingError{Info: "getInt64Array failed"}
+		return nil, PacketDecodingError{"getInt64Array failed: invalid length"}
 	}
 
 	ret := make([]int64, n)
@@ -190,7 +190,7 @@ func (rd *realDecoder) remaining() int {
 func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
 	if length > rd.remaining() {
 		rd.off = len(rd.raw)
-		return nil, InsufficientData
+		return nil, ErrInsufficientData
 	}
 
 	start := rd.off
@@ -206,7 +206,7 @@ func (rd *realDecoder) push(in pushDecoder) error {
 	reserve := in.reserveLength()
 	if rd.remaining() < reserve {
 		rd.off = len(rd.raw)
-		return InsufficientData
+		return ErrInsufficientData
 	}
 
 	rd.stack = append(rd.stack, in)

+ 1 - 1
response_header.go

@@ -13,7 +13,7 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) {
 		return err
 	}
 	if r.length <= 4 || r.length > MaxResponseSize {
-		return DecodingError{Info: fmt.Sprintf("Message too large or too small. Got %d", r.length)}
+		return PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", r.length)}
 	}
 
 	r.correlationID, err = pd.getInt32()

+ 2 - 2
sarama.go

@@ -28,13 +28,13 @@ type StdLogger interface {
 var PanicHandler func(interface{})
 
 // MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
-// to send a request larger than this will result in an EncodingError. The default of 100 MiB is aligned
+// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
 // with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
 // to process.
 var MaxRequestSize uint32 = 100 * 1024 * 1024
 
 // MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
-// a broker returns a response message larger than this value, Sarama will return a DecodingError. The
+// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError. The
 // default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest
 // request the broker will attempt to process.
 var MaxResponseSize int32 = 100 * 1024 * 1024