
Revert "Synced error names and descriptions with the kafka's protocol"

Vlad Gorodetsky 6 years ago
commit e6c029ff1a

+ 4 - 4
admin.go

@@ -105,7 +105,7 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
 func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
 
 	if topic == "" {
-		return ErrInvalidTopicException
+		return ErrInvalidTopic
 	}
 
 	if detail == nil {
@@ -153,7 +153,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 func (ca *clusterAdmin) DeleteTopic(topic string) error {
 
 	if topic == "" {
-		return ErrInvalidTopicException
+		return ErrInvalidTopic
 	}
 
 	request := &DeleteTopicsRequest{
@@ -188,7 +188,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
 
 func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
 	if topic == "" {
-		return ErrInvalidTopicException
+		return ErrInvalidTopic
 	}
 
 	topicPartitions := make(map[string]*TopicPartition)
@@ -224,7 +224,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
 func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
 
 	if topic == "" {
-		return ErrInvalidTopicException
+		return ErrInvalidTopic
	}
 
 	topics := make(map[string]*DeleteRecordsRequestTopic)

+ 1 - 1
admin_test.go

@@ -182,7 +182,7 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
 	}
 
 	err = admin.DeleteTopic("")
-	if err != ErrInvalidTopicException {
+	if err != ErrInvalidTopic {
 		t.Fatal(err)
 	}
 

+ 3 - 3
async_producer.go

@@ -329,7 +329,7 @@ func (p *asyncProducer) dispatcher() {
 			continue
 		}
 		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
-			p.returnError(msg, ErrMessageTooLarge)
+			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
 		}
 
@@ -827,7 +827,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 		case ErrDuplicateSequenceNumber:
 			bp.parent.returnSuccesses(pSet.msgs)
 		// Retriable errors
-		case ErrCorruptMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
+		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 			retryTopics = append(retryTopics, topic)
 		// Other non-retriable errors
@@ -852,7 +852,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 			}
 
 			switch block.Err {
-			case ErrCorruptMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
+			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
 				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
 					bp.broker.ID(), topic, partition, block.Err)

+ 5 - 5
client.go

@@ -401,7 +401,7 @@ func (client *client) RefreshMetadata(topics ...string) error {
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	for _, topic := range topics {
 		if len(topic) == 0 {
-			return ErrInvalidTopicException // this is the error that 0.8.2 and later correctly return
+			return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
 		}
 	}
 
@@ -465,7 +465,7 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
 	}
 
 	if coordinator == nil {
-		return nil, ErrCoordinatorNotAvailable
+		return nil, ErrConsumerCoordinatorNotAvailable
 	}
 
 	_ = coordinator.Open(client.conf)
@@ -790,7 +790,7 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
 		switch topic.Err {
 		case ErrNoError:
 			break
-		case ErrInvalidTopicException, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
+		case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
 			err = topic.Err
 			continue
 		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
@@ -876,7 +876,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
 			Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
 			return response, nil
 
-		case ErrCoordinatorNotAvailable:
+		case ErrConsumerCoordinatorNotAvailable:
 			Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)
 
 			// This is very ugly, but this scenario will only happen once per cluster.
@@ -887,7 +887,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
 				time.Sleep(2 * time.Second)
 			}
 
-			return retry(ErrCoordinatorNotAvailable)
+			return retry(ErrConsumerCoordinatorNotAvailable)
 		default:
 			return nil, response.Err
 		}

+ 2 - 2
client_test.go

@@ -504,7 +504,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 	}
 
 	coordinatorResponse1 := new(ConsumerMetadataResponse)
-	coordinatorResponse1.Err = ErrCoordinatorNotAvailable
+	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
 	seedBroker.Returns(coordinatorResponse1)
 
 	coordinatorResponse2 := new(ConsumerMetadataResponse)
@@ -581,7 +581,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
 	}
 
 	coordinatorResponse1 := new(ConsumerMetadataResponse)
-	coordinatorResponse1.Err = ErrCoordinatorNotAvailable
+	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
 	seedBroker.Returns(coordinatorResponse1)
 
 	metadataResponse2 := new(MetadataResponse)

+ 1 - 1
config.go

@@ -255,7 +255,7 @@ type Config struct {
 			Default int32
 			// The maximum number of message bytes to fetch from the broker in a
 			// single request. Messages larger than this will return
-			// ErrMessageSizeTooLarge and will not be consumable, so you must be sure
+			// ErrMessageTooLarge and will not be consumable, so you must be sure
 			// this is at least as large as your largest message. Defaults to 0
 			// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
 			// global `sarama.MaxResponseSize` still applies.

+ 1 - 1
consumer.go

@@ -560,7 +560,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 		if partialTrailingMessage {
 			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
 				// we can't ask for more data, we've hit the configured limit
-				child.sendError(ErrMessageSizeTooLarge)
+				child.sendError(ErrMessageTooLarge)
 				child.offset++ // skip this one so we can keep processing future messages
 			} else {
 				child.fetchSize *= 2

+ 4 - 4
consumer_group.go

@@ -193,7 +193,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
 	switch join.Err {
 	case ErrNoError:
 		c.memberID = join.MemberId
-	case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
+	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
 		return c.newSession(ctx, coordinator, topics, handler, retries)
 	case ErrRebalanceInProgress: // retry after backoff
@@ -234,7 +234,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
 	}
 	switch sync.Err {
 	case ErrNoError:
-	case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
+	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
 		return c.newSession(ctx, coordinator, topics, handler, retries)
 	case ErrRebalanceInProgress: // retry after backoff
@@ -366,7 +366,7 @@ func (c *consumerGroup) leave() error {
 
 	// Check response
 	switch resp.Err {
-	case ErrRebalanceInProgress, ErrUnknownMemberID, ErrNoError:
+	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
 		return nil
 	default:
 		return resp.Err
@@ -664,7 +664,7 @@ func (s *consumerGroupSession) heartbeatLoop() {
 		switch resp.Err {
 		case ErrNoError:
 			retries = s.parent.config.Metadata.Retry.Max
-		case ErrRebalanceInProgress, ErrUnknownMemberID, ErrIllegalGeneration:
+		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
 			return
 		default:
 			s.parent.handleError(err, "", -1)

+ 3 - 3
consumer_metadata_response_test.go

@@ -17,7 +17,7 @@ var (
 )
 
 func TestConsumerMetadataResponseError(t *testing.T) {
-	response := &ConsumerMetadataResponse{Err: ErrCoordinatorLoadInProgress}
+	response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
 	testEncodable(t, "", response, consumerMetadataResponseError)
 
 	decodedResp := &ConsumerMetadataResponse{}
@@ -25,8 +25,8 @@ func TestConsumerMetadataResponseError(t *testing.T) {
 		t.Error("could not decode: ", err)
 	}
 
-	if decodedResp.Err != ErrCoordinatorLoadInProgress {
-		t.Errorf("got %s, want %s", decodedResp.Err, ErrCoordinatorLoadInProgress)
+	if decodedResp.Err != ErrOffsetsLoadInProgress {
+		t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress)
 	}
 }
 

+ 60 - 60
errors.go

@@ -34,8 +34,8 @@ var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet,
 // ErrShuttingDown is returned when a producer receives a message during shutdown.
 var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
 
-// ErrMessageSizeTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
-var ErrMessageSizeTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
+// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
+var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
 
 // ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
 // a RecordBatch.
@@ -86,30 +86,30 @@ const (
 	ErrNoError                            KError = 0
 	ErrUnknown                            KError = -1
 	ErrOffsetOutOfRange                   KError = 1
-	ErrCorruptMessage                     KError = 2
+	ErrInvalidMessage                     KError = 2
 	ErrUnknownTopicOrPartition            KError = 3
-	ErrInvalidFetchSize                   KError = 4
+	ErrInvalidMessageSize                 KError = 4
 	ErrLeaderNotAvailable                 KError = 5
 	ErrNotLeaderForPartition              KError = 6
 	ErrRequestTimedOut                    KError = 7
 	ErrBrokerNotAvailable                 KError = 8
 	ErrReplicaNotAvailable                KError = 9
-	ErrMessageTooLarge                    KError = 10
-	ErrStaleControllerEpoch               KError = 11
+	ErrMessageSizeTooLarge                KError = 10
+	ErrStaleControllerEpochCode           KError = 11
 	ErrOffsetMetadataTooLarge             KError = 12
 	ErrNetworkException                   KError = 13
-	ErrCoordinatorLoadInProgress          KError = 14
-	ErrCoordinatorNotAvailable            KError = 15
-	ErrNotCoordinator                     KError = 16
-	ErrInvalidTopicException              KError = 17
-	ErrRecordListTooLarge                 KError = 18
+	ErrOffsetsLoadInProgress              KError = 14
+	ErrConsumerCoordinatorNotAvailable    KError = 15
+	ErrNotCoordinatorForConsumer          KError = 16
+	ErrInvalidTopic                       KError = 17
+	ErrMessageSetSizeTooLarge             KError = 18
 	ErrNotEnoughReplicas                  KError = 19
 	ErrNotEnoughReplicasAfterAppend       KError = 20
 	ErrInvalidRequiredAcks                KError = 21
 	ErrIllegalGeneration                  KError = 22
 	ErrInconsistentGroupProtocol          KError = 23
-	ErrInvalidGroupID                     KError = 24
-	ErrUnknownMemberID                    KError = 25
+	ErrInvalidGroupId                     KError = 24
+	ErrUnknownMemberId                    KError = 25
 	ErrInvalidSessionTimeout              KError = 26
 	ErrRebalanceInProgress                KError = 27
 	ErrInvalidCommitOffsetSize            KError = 28
@@ -168,67 +168,67 @@ func (err KError) Error() string {
 	case ErrUnknown:
 		return "kafka server: Unexpected (unknown?) server error."
 	case ErrOffsetOutOfRange:
-		return "kafka server: The requested offset is not within the range of offsets maintained by the server."
-	case ErrCorruptMessage:
-		return "kafka server: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt."
+		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
+	case ErrInvalidMessage:
+		return "kafka server: Message contents does not match its CRC."
 	case ErrUnknownTopicOrPartition:
-		return "kafka server: This server does not host this topic-partition."
-	case ErrInvalidFetchSize:
-		return "kafka server: The requested fetch size is invalid."
+		return "kafka server: Request was for a topic or partition that does not exist on this broker."
+	case ErrInvalidMessageSize:
+		return "kafka server: The message has a negative size."
 	case ErrLeaderNotAvailable:
-		return "kafka server: There is no leader for this topic-partition as we are in the middle of a leadership election."
+		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
 	case ErrNotLeaderForPartition:
-		return "kafka server: This server is not the leader for that topic-partition."
+		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
 	case ErrRequestTimedOut:
-		return "kafka server: The request timed out."
+		return "kafka server: Request exceeded the user-specified time limit in the request."
 	case ErrBrokerNotAvailable:
-		return "kafka server: The broker is not available."
+		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
 	case ErrReplicaNotAvailable:
-		return "kafka server: The replica is not available for the requested topic-partition."
-	case ErrMessageTooLarge:
-		return "kafka server: The request included a message larger than the max message size the server will accept."
-	case ErrStaleControllerEpoch:
-		return "kafka server: The controller moved to another broker."
+		return "kafka server: Replica information not available, one or more brokers are down."
+	case ErrMessageSizeTooLarge:
+		return "kafka server: Message was too large, server rejected it to avoid allocation error."
+	case ErrStaleControllerEpochCode:
+		return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
 	case ErrOffsetMetadataTooLarge:
-		return "kafka server: The metadata field of the offset request was too large."
+		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
 	case ErrNetworkException:
 		return "kafka server: The server disconnected before a response was received."
-	case ErrCoordinatorLoadInProgress:
-		return "kafka server: The coordinator is loading and hence can't process requests."
-	case ErrCoordinatorNotAvailable:
-		return "kafka server: The coordinator is not available."
-	case ErrNotCoordinator:
-		return "kafka server: This is not the correct coordinator."
-	case ErrInvalidTopicException:
+	case ErrOffsetsLoadInProgress:
+		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
+	case ErrConsumerCoordinatorNotAvailable:
+		return "kafka server: Offset's topic has not yet been created."
+	case ErrNotCoordinatorForConsumer:
+		return "kafka server: Request was for a consumer group that is not coordinated by this broker."
+	case ErrInvalidTopic:
 		return "kafka server: The request attempted to perform an operation on an invalid topic."
-	case ErrRecordListTooLarge:
+	case ErrMessageSetSizeTooLarge:
 		return "kafka server: The request included message batch larger than the configured segment size on the server."
 	case ErrNotEnoughReplicas:
 		return "kafka server: Messages are rejected since there are fewer in-sync replicas than required."
 	case ErrNotEnoughReplicasAfterAppend:
 		return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required."
 	case ErrInvalidRequiredAcks:
-		return "kafka server: Produce request specified an invalid value for required acks."
+		return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)."
 	case ErrIllegalGeneration:
-		return "kafka server: Specified group generation id is not valid."
+		return "kafka server: The provided generation id is not the current generation."
 	case ErrInconsistentGroupProtocol:
-		return "kafka server: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list."
-	case ErrInvalidGroupID:
-		return "kafka server: The configured groupId is invalid."
-	case ErrUnknownMemberID:
-		return "kafka server: The coordinator is not aware of this member."
+		return "kafka server: The provider group protocol type is incompatible with the other members."
+	case ErrInvalidGroupId:
+		return "kafka server: The provided group id was empty."
+	case ErrUnknownMemberId:
+		return "kafka server: The provided member is not known in the current generation."
 	case ErrInvalidSessionTimeout:
-		return "kafka server: The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)."
+		return "kafka server: The provided session timeout is outside the allowed range."
 	case ErrRebalanceInProgress:
-		return "kafka server: The group is rebalancing, so a rejoin is needed."
+		return "kafka server: A rebalance for the group is in progress. Please re-join the group."
 	case ErrInvalidCommitOffsetSize:
-		return "kafka server: The committing offset data size is not valid."
+		return "kafka server: The provided commit metadata was too large."
 	case ErrTopicAuthorizationFailed:
-		return "kafka server: Not authorized to access topics: [Topic authorization failed.]"
+		return "kafka server: The client is not authorized to access this topic."
 	case ErrGroupAuthorizationFailed:
-		return "kafka server: Not authorized to access group: Group authorization failed."
+		return "kafka server: The client is not authorized to access this group."
 	case ErrClusterAuthorizationFailed:
-		return "kafka server: Cluster authorization failed."
+		return "kafka server: The client is not authorized to send this request type."
 	case ErrInvalidTimestamp:
 		return "kafka server: The timestamp of the message is out of acceptable range."
 	case ErrUnsupportedSASLMechanism:
@@ -240,9 +240,9 @@ func (err KError) Error() string {
 	case ErrTopicAlreadyExists:
 		return "kafka server: Topic with this name already exists."
 	case ErrInvalidPartitions:
-		return "kafka server: Number of partitions is below 1."
+		return "kafka server: Number of partitions is invalid."
 	case ErrInvalidReplicationFactor:
-		return "kafka server: Replication factor is below 1 or larger than the number of available brokers."
+		return "kafka server: Replication-factor is invalid."
 	case ErrInvalidReplicaAssignment:
 		return "kafka server: Replica assignment is invalid."
 	case ErrInvalidConfig:
@@ -252,7 +252,7 @@ func (err KError) Error() string {
 	case ErrInvalidRequest:
 		return "kafka server: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details."
 	case ErrUnsupportedForMessageFormat:
-		return "kafka server: The message format version on the broker does not support the request."
+		return "kafka server: The requested operation is not supported by the message format version."
 	case ErrPolicyViolation:
 		return "kafka server: Request parameters do not satisfy the configured policy."
 	case ErrOutOfOrderSequenceNumber:
@@ -260,31 +260,31 @@ func (err KError) Error() string {
 	case ErrDuplicateSequenceNumber:
 		return "kafka server: The broker received a duplicate sequence number."
 	case ErrInvalidProducerEpoch:
-		return "kafka server: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."
+		return "kafka server: Producer attempted an operation with an old epoch."
 	case ErrInvalidTxnState:
 		return "kafka server: The producer attempted a transactional operation in an invalid state."
 	case ErrInvalidProducerIDMapping:
 		return "kafka server: The producer attempted to use a producer id which is not currently assigned to its transactional id."
 	case ErrInvalidTransactionTimeout:
-		return "kafka server: The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms)."
+		return "kafka server: The transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms)."
 	case ErrConcurrentTransactions:
 		return "kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing."
 	case ErrTransactionCoordinatorFenced:
-		return "kafka server: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer."
+		return "kafka server: The transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer."
 	case ErrTransactionalIDAuthorizationFailed:
-		return "kafka server: Transactional Id authorization failed."
+		return "kafka server: Transactional ID authorization failed."
 	case ErrSecurityDisabled:
 		return "kafka server: Security features are disabled."
 	case ErrOperationNotAttempted:
-		return "kafka server: The broker did not attempt to execute this operation. This may happen for batched Rpcs where some operations in the batch failed, causing the broker to respond without trying the rest."
+		return "kafka server: The broker did not attempt to execute this operation."
 	case ErrKafkaStorageError:
 		return "kafka server: Disk error when trying to access log file on the disk."
 	case ErrLogDirNotFound:
-		return "kafka server: The user-specified log directory is not found in the broker config."
+		return "kafka server: The specified log directory is not found in the broker config."
 	case ErrSASLAuthenticationFailed:
 		return "kafka server: SASL Authentication failed."
 	case ErrUnknownProducerID:
-		return "kafka server: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception."
+		return "kafka server: The broker could not locate the producer metadata associated with the Producer ID."
 	case ErrReassignmentInProgress:
 		return "kafka server: A partition reassignment is in progress."
 	case ErrDelegationTokenAuthDisabled:

+ 2 - 2
find_coordinator_response_test.go

@@ -51,7 +51,7 @@ func TestFindCoordinatorResponse(t *testing.T) {
 		desc: "version 0 - error",
 		response: &FindCoordinatorResponse{
 			Version:     0,
-			Err:         ErrCoordinatorNotAvailable,
+			Err:         ErrConsumerCoordinatorNotAvailable,
 			Coordinator: NoNode,
 		},
 		encoded: []byte{
@@ -65,7 +65,7 @@ func TestFindCoordinatorResponse(t *testing.T) {
 		response: &FindCoordinatorResponse{
 			Version:      1,
 			ThrottleTime: 100 * time.Millisecond,
-			Err:          ErrCoordinatorNotAvailable,
+			Err:          ErrConsumerCoordinatorNotAvailable,
 			ErrMsg:       &errMsg,
 			Coordinator:  NoNode,
 		},

+ 2 - 2
leave_group_response_test.go

@@ -18,7 +18,7 @@ func TestLeaveGroupResponse(t *testing.T) {
 
 	response = new(LeaveGroupResponse)
 	testVersionDecodable(t, "with error", response, leaveGroupResponseWithError, 0)
-	if response.Err != ErrUnknownMemberID {
-		t.Error("Decoding error failed: ErrUnknownMemberID expected but found", response.Err)
+	if response.Err != ErrUnknownMemberId {
+		t.Error("Decoding error failed: ErrUnknownMemberId expected but found", response.Err)
 	}
 }

+ 1 - 1
metadata_response_test.go

@@ -164,7 +164,7 @@ func TestMetadataResponseWithTopicsV0(t *testing.T) {
 		t.Fatal("Decoding produced invalid partition count for topic 0.")
 	}
 
-	if response.Topics[0].Partitions[0].Err != ErrInvalidFetchSize {
+	if response.Topics[0].Partitions[0].Err != ErrInvalidMessageSize {
 		t.Error("Decoding produced invalid topic 0 partition 0 error.")
 	}
 

+ 2 - 2
mocks/consumer_test.go

@@ -170,8 +170,8 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) {
 	consumer := NewConsumer(trm, nil)
 
 	pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
-	pcmock.YieldError(sarama.ErrCorruptMessage)
-	pcmock.YieldError(sarama.ErrCorruptMessage)
+	pcmock.YieldError(sarama.ErrInvalidMessage)
+	pcmock.YieldError(sarama.ErrInvalidMessage)
 	pcmock.ExpectErrorsDrainedOnClose()
 
 	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)

+ 4 - 4
offset_manager.go

@@ -151,13 +151,13 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
 	switch block.Err {
 	case ErrNoError:
 		return block.Offset, block.Metadata, nil
-	case ErrNotCoordinator:
+	case ErrNotCoordinatorForConsumer:
 		if retries <= 0 {
 			return 0, "", block.Err
 		}
 		om.releaseCoordinator(broker)
 		return om.fetchInitialOffset(topic, partition, retries-1)
-	case ErrCoordinatorLoadInProgress:
+	case ErrOffsetsLoadInProgress:
 		if retries <= 0 {
 			return 0, "", block.Err
 		}
@@ -316,13 +316,13 @@ func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest
 				block := req.blocks[pom.topic][pom.partition]
 				pom.updateCommitted(block.offset, block.metadata)
 			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
-				ErrCoordinatorNotAvailable, ErrNotCoordinator:
+				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
 				// not a critical error, we just need to redispatch
 				om.releaseCoordinator(broker)
 			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
 				// nothing we can do about this, just tell the user and carry on
 				pom.handleError(err)
-			case ErrCoordinatorLoadInProgress:
+			case ErrOffsetsLoadInProgress:
 				// nothing wrong but we didn't commit, we'll get it next time round
 				break
 			case ErrUnknownTopicOrPartition:

+ 4 - 4
offset_manager_test.go

@@ -87,14 +87,14 @@ func TestNewOffsetManager(t *testing.T) {
 	}
 }
 
-// Test recovery from ErrNotCoordinator
+// Test recovery from ErrNotCoordinatorForConsumer
 // on first fetchInitialOffset call
 func TestOffsetManagerFetchInitialFail(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{
-		Err:      ErrNotCoordinator,
+		Err:      ErrNotCoordinatorForConsumer,
 		Offset:   5,
 		Metadata: "test_meta",
 	}
@@ -131,13 +131,13 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
 	safeClose(t, testClient)
 }
 
-// Test fetchInitialOffset retry on ErrCoordinatorLoadInProgress
+// Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
 func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t, 0)
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{
-		Err:      ErrCoordinatorLoadInProgress,
+		Err:      ErrOffsetsLoadInProgress,
 		Offset:   5,
 		Metadata: "test_meta",
 	}

+ 5 - 5
produce_response_test.go

@@ -18,7 +18,7 @@ var (
 			0x00, 0x00, 0x00, 0x01,
 
 			0x00, 0x00, 0x00, 0x01, // Partition 1
-			0x00, 0x02, // ErrCorruptMessage
+			0x00, 0x02, // ErrInvalidMessage
 			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
 		}, {
 			0x00, 0x00, 0x00, 0x01,
@@ -27,7 +27,7 @@ var (
 			0x00, 0x00, 0x00, 0x01,
 
 			0x00, 0x00, 0x00, 0x01, // Partition 1
-			0x00, 0x02, // ErrCorruptMessage
+			0x00, 0x02, // ErrInvalidMessage
 			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
 
 			0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
@@ -38,7 +38,7 @@ var (
 			0x00, 0x00, 0x00, 0x01,
 
 			0x00, 0x00, 0x00, 0x01, // Partition 1
-			0x00, 0x02, // ErrCorruptMessage
+			0x00, 0x02, // ErrInvalidMessage
 			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
 			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
 
@@ -68,7 +68,7 @@ func TestProduceResponseDecode(t *testing.T) {
 		if block == nil {
 			t.Error("Decoding did not produce a block for foo/1")
 		} else {
-			if block.Err != ErrCorruptMessage {
+			if block.Err != ErrInvalidMessage {
 				t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
 			}
 			if block.Offset != 255 {
@@ -95,7 +95,7 @@ func TestProduceResponseEncode(t *testing.T) {
 
 	response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
 	response.Blocks["foo"][1] = &ProduceResponseBlock{
-		Err:       ErrCorruptMessage,
+		Err:       ErrInvalidMessage,
 		Offset:    255,
 		Timestamp: time.Unix(1, 0),
 	}
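
For reference, a minimal sketch (not part of this commit) of how calling code might branch on the error names this revert restores. The describe helper and the main function are illustrative assumptions; only the sarama identifiers and import path come from the diff above.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// describe is a hypothetical helper mapping a few of the restored names to messages.
func describe(err error) string {
	switch err {
	case nil:
		return "ok"
	case sarama.ErrInvalidTopic:
		// Returned by CreateTopic, DeleteTopic, CreatePartitions and DeleteRecords
		// when the topic name is empty (see admin.go above).
		return "invalid (empty) topic name"
	case sarama.ErrConsumerCoordinatorNotAvailable:
		// KError 15 under the restored naming; client.go retries this case.
		return "consumer coordinator not available yet"
	case sarama.ErrMessageTooLarge:
		// Client-side sentinel: the next message exceeds Consumer.Fetch.Max (see consumer.go above).
		return "message larger than Consumer.Fetch.Max"
	default:
		return fmt.Sprintf("unexpected error: %v", err)
	}
}

func main() {
	fmt.Println(describe(sarama.ErrInvalidTopic))
}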