Browse Source

Merge pull request #1142 from andy2046/add_Timeout_for_ClusterAdmin

add Timeout config for ClusterAdmin related Requests
Evan Huus 7 years ago
parent
commit
e7238b119b
3 changed files with 46 additions and 2 deletions
  1. 9 2
      admin.go
  2. 15 0
      config.go
  3. 22 0
      config_test.go

+ 9 - 2
admin.go

@@ -118,6 +118,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 	request := &CreateTopicsRequest{
 		TopicDetails: topicDetails,
 		ValidateOnly: validateOnly,
+		Timeout:      ca.conf.Admin.Timeout,
 	}
 
 	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
@@ -155,7 +156,10 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
 		return ErrInvalidTopic
 	}
 
-	request := &DeleteTopicsRequest{Topics: []string{topic}}
+	request := &DeleteTopicsRequest{
+		Topics:  []string{topic},
+		Timeout: ca.conf.Admin.Timeout,
+	}
 
 	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
 		request.Version = 1
@@ -192,6 +196,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
 
 	request := &CreatePartitionsRequest{
 		TopicPartitions: topicPartitions,
+		Timeout:         ca.conf.Admin.Timeout,
 	}
 
 	b, err := ca.Controller()
@@ -225,7 +230,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
 	topics := make(map[string]*DeleteRecordsRequestTopic)
 	topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
 	request := &DeleteRecordsRequest{
-		Topics: topics}
+		Topics:  topics,
+		Timeout: ca.conf.Admin.Timeout,
+	}
 
 	b, err := ca.Controller()
 	if err != nil {

+ 15 - 0
config.go

@@ -18,6 +18,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
 
 // Config is used to pass multiple configuration options to Sarama's constructors.
 type Config struct {
+	// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
+	Admin struct {
+		// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
+		// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
+		Timeout time.Duration
+	}
+
 	// Net is the namespace for network-level properties used by the Broker, and
 	// shared by the Client/Producer/Consumer.
 	Net struct {
@@ -292,6 +299,8 @@ type Config struct {
 func NewConfig() *Config {
 	c := &Config{}
 
+	c.Admin.Timeout = 3 * time.Second
+
 	c.Net.MaxOpenRequests = 5
 	c.Net.DialTimeout = 30 * time.Second
 	c.Net.ReadTimeout = 30 * time.Second
@@ -391,6 +400,12 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
 	}
 
+	// validate the Admin values
+	switch {
+	case c.Admin.Timeout <= 0:
+		return ConfigurationError("Admin.Timeout must be > 0")
+	}
+
 	// validate the Metadata values
 	switch {
 	case c.Metadata.Retry.Max < 0:

+ 22 - 0
config_test.go

@@ -122,6 +122,28 @@ func TestMetadataConfigValidates(t *testing.T) {
 	}
 }
 
+func TestAdminConfigValidates(t *testing.T) {
+	tests := []struct {
+		name string
+		cfg  func(*Config) // resorting to using a function as a param because of internal composite structs
+		err  string
+	}{
+		{"Timeout",
+			func(cfg *Config) {
+				cfg.Admin.Timeout = 0
+			},
+			"Admin.Timeout must be > 0"},
+	}
+
+	for i, test := range tests {
+		c := NewConfig()
+		test.cfg(c)
+		if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
+			t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
+		}
+	}
+}
+
 func TestProducerConfigValidates(t *testing.T) {
 	tests := []struct {
 		name string