fix: retry topic request on ControllerNotAvailable

If an admin request to Create/Delete Topic{Partitions} receives a
ControllerNotAvailable KError back from Kafka, we should expire the
cached controller and refresh it from metadata.

Additionally, add a `Retry` section to the `Admin` config, similar to
the Java KafkaAdminClient, and treat this case as one that is eligible
to be retried.

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Dominic Evans, 5 years ago
commit 5160d7db71
4 changed files with 196 additions and 51 deletions
  1. admin.go (+103 -50)
  2. admin_test.go (+48 -0)
  3. client.go (+36 -1)
  4. config.go (+9 -0)

admin.go (+103 -50)

@@ -6,6 +6,7 @@ import (
 	"math/rand"
 	"strconv"
 	"sync"
+	"time"
 )
 
 // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
@@ -134,8 +135,45 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
 	return ca.client.Controller()
 }
 
-func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+func (ca *clusterAdmin) refreshController() (*Broker, error) {
+	return ca.client.RefreshController()
+}
+
+// isErrNoController returns `true` if the given error type unwraps to an
+// `ErrNotController` response from Kafka
+func isErrNoController(err error) bool {
+	switch e := err.(type) {
+	case *TopicError:
+		return e.Err == ErrNotController
+	case *TopicPartitionError:
+		return e.Err == ErrNotController
+	case KError:
+		return e == ErrNotController
+	}
+	return false
+}
+
+// retryOnError will repeatedly call the given (error-returning) func in the
+// case that its response is non-nil and retriable (as determined by the
+// provided retriable func) up to the maximum number of tries permitted by
+// the admin client configuration
+func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
+	var err error
+	for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
+		err = fn()
+		if err == nil || !retriable(err) {
+			return err
+		}
+		Logger.Printf(
+			"admin/request retrying after %dms... (%d attempts remaining)\n",
+			ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
+		time.Sleep(ca.conf.Admin.Retry.Backoff)
+		continue
+	}
+	return err
+}
 
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
 	if topic == "" {
 		return ErrInvalidTopic
 	}
@@ -160,26 +198,31 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 		request.Version = 2
 	}
 
-	b, err := ca.Controller()
-	if err != nil {
-		return err
-	}
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
 
-	rsp, err := b.CreateTopics(request)
-	if err != nil {
-		return err
-	}
+		rsp, err := b.CreateTopics(request)
+		if err != nil {
+			return err
+		}
 
-	topicErr, ok := rsp.TopicErrors[topic]
-	if !ok {
-		return ErrIncompleteResponse
-	}
+		topicErr, ok := rsp.TopicErrors[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
 
-	if topicErr.Err != ErrNoError {
-		return topicErr
-	}
+		if topicErr.Err != ErrNoError {
+			if topicErr.Err == ErrNotController {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
 
-	return nil
+		return nil
+	})
 }
 
 func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
@@ -320,7 +363,6 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
 }
 
 func (ca *clusterAdmin) DeleteTopic(topic string) error {
-
 	if topic == "" {
 		return ErrInvalidTopic
 	}
@@ -334,25 +376,31 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
 		request.Version = 1
 	}
 
-	b, err := ca.Controller()
-	if err != nil {
-		return err
-	}
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
 
-	rsp, err := b.DeleteTopics(request)
-	if err != nil {
-		return err
-	}
+		rsp, err := b.DeleteTopics(request)
+		if err != nil {
+			return err
+		}
 
-	topicErr, ok := rsp.TopicErrorCodes[topic]
-	if !ok {
-		return ErrIncompleteResponse
-	}
+		topicErr, ok := rsp.TopicErrorCodes[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
 
-	if topicErr != ErrNoError {
-		return topicErr
-	}
-	return nil
+		if topicErr != ErrNoError {
+			if topicErr == ErrNotController {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
+
+		return nil
+	})
 }
 
 func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
@@ -368,26 +416,31 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
 		Timeout:         ca.conf.Admin.Timeout,
 	}
 
-	b, err := ca.Controller()
-	if err != nil {
-		return err
-	}
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
 
-	rsp, err := b.CreatePartitions(request)
-	if err != nil {
-		return err
-	}
+		rsp, err := b.CreatePartitions(request)
+		if err != nil {
+			return err
+		}
 
-	topicErr, ok := rsp.TopicPartitionErrors[topic]
-	if !ok {
-		return ErrIncompleteResponse
-	}
+		topicErr, ok := rsp.TopicPartitionErrors[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
 
-	if topicErr.Err != ErrNoError {
-		return topicErr
-	}
+		if topicErr.Err != ErrNoError {
+			if topicErr.Err == ErrNotController {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
 
-	return nil
+		return nil
+	})
 }
 
 func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
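
To make the control flow of the new helper easier to follow outside the diff, here is a minimal, self-contained sketch of the same retry-on-error pattern. All names here (errNotController, the attempt count, the backoff value) are illustrative only and are not Sarama identifiers:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotController = errors.New("not controller")

// retryOnError mirrors the helper added in admin.go: call fn up to max
// times, sleeping backoff between attempts, but only while retriable(err)
// reports the failure as worth retrying.
func retryOnError(max int, backoff time.Duration, retriable func(error) bool, fn func() error) error {
	var err error
	for attempt := 0; attempt < max; attempt++ {
		err = fn()
		if err == nil || !retriable(err) {
			return err
		}
		time.Sleep(backoff)
	}
	return err
}

func main() {
	attempts := 0
	err := retryOnError(5, 10*time.Millisecond,
		func(err error) bool { return errors.Is(err, errNotController) },
		func() error {
			attempts++
			if attempts < 3 {
				return errNotController // first two calls hit a stale controller
			}
			return nil // third call succeeds after a "refresh"
		})
	fmt.Printf("succeeded after %d attempts, err=%v\n", attempts, err)
}
```

The property mirrored from admin.go is that success or a non-retriable error returns immediately, while a retriable error is attempted at most Max times with a fixed backoff between tries.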

admin_test.go (+48 -0)

@@ -1104,3 +1104,51 @@ func TestDeleteConsumerGroup(t *testing.T) {
 	}
 
 }
+
+// TestRefreshMetaDataWithDifferentController ensures that the cached
+// controller can be forcibly updated from Metadata by the admin client
+func TestRefreshMetaDataWithDifferentController(t *testing.T) {
+	seedBroker1 := NewMockBroker(t, 1)
+	seedBroker2 := NewMockBroker(t, 2)
+	defer seedBroker1.Close()
+	defer seedBroker2.Close()
+
+	seedBroker1.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker1.BrokerID()).
+			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
+			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
+	})
+
+	config := NewConfig()
+	config.Version = V1_1_0_0
+
+	client, err := NewClient([]string{seedBroker1.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ca := clusterAdmin{client: client, conf: config}
+
+	if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() {
+		t.Fatalf("expected cached controller to be %d rather than %d",
+			seedBroker1.BrokerID(), b.ID())
+	}
+
+	seedBroker1.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker2.BrokerID()).
+			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
+			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
+	})
+
+	if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
+		t.Fatalf("expected refreshed controller to be %d rather than %d",
+			seedBroker2.BrokerID(), b.ID())
+	}
+
+	if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() {
+		t.Fatalf("expected cached controller to be %d rather than %d",
+			seedBroker2.BrokerID(), b.ID())
+	}
+}

client.go (+36 -1)

@@ -17,9 +17,15 @@ type Client interface {
 	// altered after it has been created.
 	Config() *Config
 
-	// Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
+	// Controller returns the cluster controller broker. It will return a
+	// locally cached value if it's available. You can call RefreshController
+	// to update the cached value. Requires Kafka 0.10 or higher.
 	Controller() (*Broker, error)
 
+	// RefreshController retrieves the cluster controller from fresh metadata
+	// and stores it in the local cache. Requires Kafka 0.10 or higher.
+	RefreshController() (*Broker, error)
+
 	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
 	Brokers() []*Broker
 
@@ -487,6 +493,35 @@ func (client *client) Controller() (*Broker, error) {
 	return controller, nil
 }
 
+// deregisterController removes the cached controller broker from the broker cache
+func (client *client) deregisterController() {
+	client.lock.Lock()
+	defer client.lock.Unlock()
+	delete(client.brokers, client.controllerID)
+}
+
+// RefreshController retrieves the cluster controller from fresh metadata
+// and stores it in the local cache. Requires Kafka 0.10 or higher.
+func (client *client) RefreshController() (*Broker, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	client.deregisterController()
+
+	if err := client.refreshMetadata(); err != nil {
+		return nil, err
+	}
+
+	controller := client.cachedController()
+	if controller == nil {
+		return nil, ErrControllerNotAvailable
+	}
+
+	_ = controller.Open(client.conf)
+	return controller, nil
+}
+
 func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
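
A hedged usage sketch of the new Client.RefreshController method introduced above. The broker address is a placeholder, and the import path is the module path in use at the time of this commit:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama" // module path at the time of this commit
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_1_0_0 // controller metadata requires Kafka >= 0.10

	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Normally the locally cached controller is fine...
	b, err := client.Controller()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("cached controller: broker %d at %s", b.ID(), b.Addr())

	// ...but after a NOT_CONTROLLER style response the cache is stale, so
	// force a re-read of metadata to pick up the newly elected controller.
	if b, err = client.RefreshController(); err != nil {
		log.Fatal(err)
	}
	log.Printf("refreshed controller: broker %d at %s", b.ID(), b.Addr())
}
```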

config.go (+9 -0)

@@ -21,6 +21,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
 type Config struct {
 	// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
 	Admin struct {
+		Retry struct {
+			// The total number of times to retry sending (retriable) admin requests (default 5).
+			// Similar to the `retries` setting of the JVM AdminClientConfig.
+			Max int
+			// Backoff time between retries of a failed request (default 100ms)
+			Backoff time.Duration
+		}
 		// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
 		// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
 		Timeout time.Duration
@@ -408,6 +415,8 @@ type Config struct {
 func NewConfig() *Config {
 	c := &Config{}
 
+	c.Admin.Retry.Max = 5
+	c.Admin.Retry.Backoff = 100 * time.Millisecond
 	c.Admin.Timeout = 3 * time.Second
 
 	c.Net.MaxOpenRequests = 5
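
Finally, a rough end-to-end sketch of how a caller might tune the new Admin.Retry settings. The broker address, topic name, and retry values are arbitrary placeholders; the defaults match those set in NewConfig above:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama" // module path at the time of this commit
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_1_0_0
	// Override the new admin retry knobs (NewConfig defaults are Max=5,
	// Backoff=100ms); the values here are purely illustrative.
	cfg.Admin.Retry.Max = 3
	cfg.Admin.Retry.Backoff = 250 * time.Millisecond
	cfg.Admin.Timeout = 3 * time.Second

	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	// With this commit, CreateTopic is retried automatically if the cached
	// controller turns out to be stale (ErrNotController), refreshing the
	// controller from metadata between attempts.
	err = admin.CreateTopic("example-topic", &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 1,
	}, false)
	if err != nil {
		log.Fatal(err)
	}
}
```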