Browse Source

Merge pull request #1464 from MikeSchlosser16/add-kafka-errors

Handle missed error codes on TopicMetaDataRequest and GroupCoordinatorRequest
Vlad Gorodetsky 5 years ago
parent
commit
4b0a0e5b4b
2 changed files with 12 additions and 0 deletions
  1. 9 0
      client.go
  2. 3 0
      errors.go

+ 9 - 0
client.go

@@ -803,6 +803,11 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
 				Logger.Println("client/metadata failed SASL authentication")
 				return err
 			}
+
+			if err.(KError) == ErrTopicAuthorizationFailed {
+				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
+				return err
+			}
 			// else remove that broker and try again
 			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
 			_ = broker.Close()
@@ -966,6 +971,10 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
 			}
 
 			return retry(ErrConsumerCoordinatorNotAvailable)
+		case ErrGroupAuthorizationFailed:
+			Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", consumerGroup)
+			return retry(ErrGroupAuthorizationFailed)
+
 		default:
 			return nil, response.Err
 		}

+ 3 - 0
errors.go

@@ -188,6 +188,7 @@ const (
 	ErrMemberIdRequired                   KError = 79
 	ErrPreferredLeaderNotAvailable        KError = 80
 	ErrGroupMaxSizeReached                KError = 81
+	ErrFencedInstancedId                  KError = 82
 )
 
 func (err KError) Error() string {
@@ -360,6 +361,8 @@ func (err KError) Error() string {
 		return "kafka server: The preferred leader was not available"
 	case ErrGroupMaxSizeReached:
 		return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
+	case ErrFencedInstancedId:
+		return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id."
 	}
 
 	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)