Просмотр исходного кода

Add additional error handling based on spec

Gwen posted to the spec a list of expected error codes for each
request/response. This adds handling for the listed codes for consumer, client,
and offset manager. Some errors (e.g. around group membership) have been ignored
because we don't implement the relevant feature yet anyways.
Possible errors for the producer are not yet listed in the spec.
Evan Huus 9 лет назад
Родитель
Сommit
bed560f780
3 измененных файлов с 12 добавлено и 4 удалено
  1. 1 1
      client.go
  2. 1 1
      consumer.go
  3. 10 2
      offset_manager.go

+ 1 - 1
client.go

@@ -632,7 +632,7 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er
 		switch topic.Err {
 		switch topic.Err {
 		case ErrNoError:
 		case ErrNoError:
 			break
 			break
-		case ErrInvalidTopic: // don't retry, don't store partial results
+		case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
 			err = topic.Err
 			err = topic.Err
 			continue
 			continue
 		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
 		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results

+ 1 - 1
consumer.go

@@ -642,7 +642,7 @@ func (bc *brokerConsumer) handleResponses() {
 			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
 			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
 			close(child.trigger)
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 			delete(bc.subscriptions, child)
-		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
 			// not an error, but does need redispatching
 			// not an error, but does need redispatching
 			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
 			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
 				bc.broker.ID(), child.topic, child.partition, result)
 				bc.broker.ID(), child.topic, child.partition, result)

+ 10 - 2
offset_manager.go

@@ -462,11 +462,19 @@ func (bom *brokerOffsetManager) flushToBroker() {
 		case ErrNoError:
 		case ErrNoError:
 			block := request.blocks[s.topic][s.partition]
 			block := request.blocks[s.topic][s.partition]
 			s.updateCommitted(block.offset, block.metadata)
 			s.updateCommitted(block.offset, block.metadata)
-			break
-		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+			ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
+			// not a critical error, we just need to redispatch
 			delete(bom.subscriptions, s)
 			delete(bom.subscriptions, s)
 			s.rebalance <- none{}
 			s.rebalance <- none{}
+		case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
+			// nothing we can do about this, just tell the user and carry on
+			s.handleError(err)
+		case ErrOffsetsLoadInProgress:
+			// nothing wrong but we didn't commit, we'll get it next time round
+			break
 		default:
 		default:
+			// dunno, tell the user and try redispatching
 			s.handleError(err)
 			s.handleError(err)
 			delete(bom.subscriptions, s)
 			delete(bom.subscriptions, s)
 			s.rebalance <- none{}
 			s.rebalance <- none{}