Browse Source

Merge remote-tracking branch 'shopify_sarama/master'

antshbean 5 years ago
parent
commit
0cce202975
22 changed files with 604 additions and 89 deletions
  1. 0 1
      .travis.yml
  2. 38 0
      CHANGELOG.md
  3. 1 1
      README.md
  4. 54 4
      admin.go
  5. 146 8
      admin_test.go
  6. 13 1
      client.go
  7. 12 4
      config.go
  8. 11 15
      config_resource_type.go
  9. 18 12
      consumer_group.go
  10. 1 1
      dev.yml
  11. 0 3
      functional_producer_test.go
  12. 1 1
      go.mod
  13. 2 0
      go.sum
  14. 68 19
      mockresponses.go
  15. 4 4
      mocks/consumer.go
  16. 6 1
      offset_manager.go
  17. 80 5
      offset_manager_test.go
  18. 2 1
      produce_set.go
  19. 59 2
      produce_set_test.go
  20. 74 4
      tools/kafka-producer-performance/main.go
  21. 3 1
      utils.go
  22. 11 1
      vagrant/halt_cluster.sh

+ 0 - 1
.travis.yml

@@ -1,7 +1,6 @@
 dist: xenial
 language: go
 go:
-- 1.11.x
 - 1.12.x
 - 1.13.x
 

+ 38 - 0
CHANGELOG.md

@@ -1,5 +1,43 @@
 # Changelog
 
+#### Version 1.25.0 (2020-01-13)
+
+New Features:
+- Support TLS protocol in kafka-producer-performance
+  ([1538](https://github.com/Shopify/sarama/pull/1538)).
+- Add support for kafka 2.4.0
+  ([1552](https://github.com/Shopify/sarama/pull/1552)).
+
+Improvements:
+- Allow the Consumer to disable auto-commit offsets
+  ([1164](https://github.com/Shopify/sarama/pull/1164)).
+- Produce records with consistent timestamps
+  ([1455](https://github.com/Shopify/sarama/pull/1455)).
+
+Bug Fixes:
+- Fix incorrect SetTopicMetadata name mentions
+  ([1534](https://github.com/Shopify/sarama/pull/1534)).
+- Fix client.tryRefreshMetadata Println
+  ([1535](https://github.com/Shopify/sarama/pull/1535)).
+- Fix panic on calling updateMetadata on closed client
+  ([1531](https://github.com/Shopify/sarama/pull/1531)).
+- Fix possible faulty metrics in TestFuncProducing
+  ([1545](https://github.com/Shopify/sarama/pull/1545)).
+
+#### Version 1.24.1 (2019-10-31)
+
+New Features:
+- Add DescribeLogDirs Request/Response pair
+  ([1520](https://github.com/Shopify/sarama/pull/1520)).
+
+Bug Fixes:
+- Fix ClusterAdmin returning invalid controller ID on DescribeCluster
+  ([1518](https://github.com/Shopify/sarama/pull/1518)).
+- Fix issue with consumergroup not rebalancing when new partition is added
+  ([1525](https://github.com/Shopify/sarama/pull/1525)).
+- Ensure consistent use of read/write deadlines
+  ([1529](https://github.com/Shopify/sarama/pull/1529)).
+
 #### Version 1.24.0 (2019-10-09)
 
 New Features:

+ 1 - 1
README.md

@@ -20,7 +20,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c
 Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
 the two latest stable releases of Kafka and Go, and we provide a two month
 grace period for older releases. This means we currently officially support
-Go 1.11 through 1.13, and Kafka 2.1 through 2.3, although older releases are
+Go 1.12 through 1.13, and Kafka 2.1 through 2.4, although older releases are
 still likely to work.
 
 Sarama follows semantic versioning and provides API stability via the gopkg.in service.

+ 54 - 4
admin.go

@@ -2,7 +2,9 @@ package sarama
 
 import (
 	"errors"
+	"fmt"
 	"math/rand"
+	"strconv"
 	"sync"
 )
 
@@ -214,7 +216,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
 		Topics: []string{},
 	}
 
-	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+	if ca.conf.Version.IsAtLeast(V0_10_0_0) {
 		request.Version = 1
 	}
 
@@ -226,6 +228,16 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
 	return response.Brokers, response.ControllerID, nil
 }
 
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+	brokers := ca.client.Brokers()
+	for _, b := range brokers {
+		if b.ID() == id {
+			return b, nil
+		}
+	}
+	return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
 func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
 	brokers := ca.client.Brokers()
 	if len(brokers) > 0 {
@@ -432,8 +444,14 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
 	return nil
 }
 
-func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+// Returns a bool indicating whether the resource request needs to go to a
+// specific broker
+func dependsOnSpecificNode(resource ConfigResource) bool {
+	return (resource.Type == BrokerResource && resource.Name != "") ||
+		resource.Type == BrokerLoggerResource
+}
 
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
 	var entries []ConfigEntry
 	var resources []*ConfigResource
 	resources = append(resources, &resource)
@@ -442,11 +460,31 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,
 		Resources: resources,
 	}
 
-	b, err := ca.Controller()
+	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+		request.Version = 1
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 2
+	}
+
+	var (
+		b   *Broker
+		err error
+	)
+
+	// DescribeConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(resource) {
+		id, _ := strconv.Atoi(resource.Name)
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
 	if err != nil {
 		return nil, err
 	}
 
+	_ = b.Open(ca.client.Config())
 	rsp, err := b.DescribeConfigs(request)
 	if err != nil {
 		return nil, err
@@ -479,11 +517,23 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string
 		ValidateOnly: validateOnly,
 	}
 
-	b, err := ca.Controller()
+	var (
+		b   *Broker
+		err error
+	)
+
+	// AlterConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+		id, _ := strconv.Atoi(name)
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
 	if err != nil {
 		return err
 	}
 
+	_ = b.Open(ca.client.Config())
 	rsp, err := b.AlterConfigs(request)
 	if err != nil {
 		return err

+ 146 - 8
admin_test.go

@@ -488,21 +488,105 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
 		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
 	})
 
+	var tests = []struct {
+		saramaVersion   KafkaVersion
+		requestVersion  int16
+		includeSynonyms bool
+	}{
+		{V1_0_0_0, 0, false},
+		{V1_1_0_0, 1, true},
+		{V1_1_1_0, 1, true},
+		{V2_0_0_0, 2, true},
+	}
+	for _, tt := range tests {
+		config := NewConfig()
+		config.Version = tt.saramaVersion
+		admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer func() {
+			_ = admin.Close()
+		}()
+
+		resource := ConfigResource{
+			Name:        "r1",
+			Type:        TopicResource,
+			ConfigNames: []string{"my_topic"},
+		}
+
+		entries, err := admin.DescribeConfig(resource)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		history := seedBroker.History()
+		describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
+		if !ok {
+			t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
+		}
+
+		if describeReq.Version != tt.requestVersion {
+			t.Fatalf(
+				"requestVersion %v did not match expected %v",
+				describeReq.Version, tt.requestVersion)
+		}
+
+		if len(entries) <= 0 {
+			t.Fatal(errors.New("no resource present"))
+		}
+		if tt.includeSynonyms {
+			if len(entries[0].Synonyms) == 0 {
+				t.Fatal("expected synonyms to have been included")
+			}
+		}
+	}
+}
+
+// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
+// is sent to the broker in the resource struct, _not_ the controller
+func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
+	controllerBroker := NewMockBroker(t, 1)
+	defer controllerBroker.Close()
+	configBroker := NewMockBroker(t, 2)
+	defer configBroker.Close()
+
+	controllerBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+	})
+
+	configBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
+	})
+
 	config := NewConfig()
 	config.Version = V1_0_0_0
-	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	admin, err := NewClusterAdmin(
+		[]string{
+			controllerBroker.Addr(),
+			configBroker.Addr(),
+		}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
-	entries, err := admin.DescribeConfig(resource)
-	if err != nil {
-		t.Fatal(err)
-	}
+	for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
+		resource := ConfigResource{Name: "2", Type: resourceType}
+		entries, err := admin.DescribeConfig(resource)
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	if len(entries) <= 0 {
-		t.Fatal(errors.New("no resource present"))
+		if len(entries) <= 0 {
+			t.Fatal(errors.New("no resource present"))
+		}
 	}
 
 	err = admin.Close()
@@ -544,6 +628,60 @@ func TestClusterAdminAlterConfig(t *testing.T) {
 	}
 }
 
+func TestClusterAdminAlterBrokerConfig(t *testing.T) {
+	controllerBroker := NewMockBroker(t, 1)
+	defer controllerBroker.Close()
+	configBroker := NewMockBroker(t, 2)
+	defer configBroker.Close()
+
+	controllerBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+	})
+	configBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+		"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin(
+		[]string{
+			controllerBroker.Addr(),
+			configBroker.Addr(),
+		}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var value string
+	entries := make(map[string]*string)
+	value = "3"
+	entries["min.insync.replicas"] = &value
+
+	for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
+		resource := ConfigResource{Name: "2", Type: resourceType}
+		err = admin.AlterConfig(
+			resource.Type,
+			resource.Name,
+			entries,
+			false)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestClusterAdminCreateAcl(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()

+ 13 - 1
client.go

@@ -242,6 +242,9 @@ func (client *client) Close() error {
 }
 
 func (client *client) Closed() bool {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
 	return client.brokers == nil
 }
 
@@ -529,6 +532,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
 // in the brokers map. It returns the broker that is registered, which may be the provided broker,
 // or a previously registered Broker instance. You must hold the write lock before calling this function.
 func (client *client) registerBroker(broker *Broker) {
+	if client.brokers == nil {
+		Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
+		return
+	}
+
 	if client.brokers[broker.ID()] == nil {
 		client.brokers[broker.ID()] = broker
 		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
@@ -822,7 +830,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
 	}
 
 	if broker != nil {
-		Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
+		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
 		return retry(ErrOutOfBrokers)
 	}
 
@@ -833,6 +841,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
 
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
 func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
+	if client.Closed() {
+		return
+	}
+
 	client.lock.Lock()
 	defer client.lock.Unlock()
 

+ 12 - 4
config.go

@@ -338,8 +338,15 @@ type Config struct {
 		// offsets. This currently requires the manual use of an OffsetManager
 		// but will eventually be automated.
 		Offsets struct {
-			// How frequently to commit updated offsets. Defaults to 1s.
-			CommitInterval time.Duration
+			AutoCommit struct {
+				// Whether or not to auto-commit updated offsets back to the broker.
+				// (default enabled).
+				Enable bool
+
+				// How frequently to commit updated offsets. Ineffective unless
+				// auto-commit is enabled (default 1s)
+				Interval time.Duration
+			}
 
 			// The initial offset to use if no offset was previously committed.
 			// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
@@ -423,7 +430,8 @@ func NewConfig() *Config {
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
-	c.Consumer.Offsets.CommitInterval = 1 * time.Second
+	c.Consumer.Offsets.AutoCommit.Enable = true
+	c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
 	c.Consumer.Offsets.Retry.Max = 3
 
@@ -650,7 +658,7 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
-	case c.Consumer.Offsets.CommitInterval <= 0:
+	case c.Consumer.Offsets.AutoCommit.Interval <= 0:
 		return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
 	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
 		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")

+ 11 - 15
config_resource_type.go

@@ -1,22 +1,18 @@
 package sarama
 
-//ConfigResourceType is a type for config resource
+// ConfigResourceType is a type for resources that have configs.
 type ConfigResourceType int8
 
-// Taken from :
-// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
+// Taken from:
+// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55
 
 const (
-	//UnknownResource constant type
-	UnknownResource ConfigResourceType = iota
-	//AnyResource constant type
-	AnyResource
-	//TopicResource constant type
-	TopicResource
-	//GroupResource constant type
-	GroupResource
-	//ClusterResource constant type
-	ClusterResource
-	//BrokerResource constant type
-	BrokerResource
+	// UnknownResource constant type
+	UnknownResource ConfigResourceType = 0
+	// TopicResource constant type
+	TopicResource ConfigResourceType = 2
+	// BrokerResource constant type
+	BrokerResource ConfigResourceType = 4
+	// BrokerLoggerResource constant type
+	BrokerLoggerResource ConfigResourceType = 8
 )

+ 18 - 12
consumer_group.go

@@ -418,12 +418,6 @@ func (c *consumerGroup) leave() error {
 }
 
 func (c *consumerGroup) handleError(err error, topic string, partition int32) {
-	select {
-	case <-c.closed:
-		return
-	default:
-	}
-
 	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
 		err = &ConsumerError{
 			Topic:     topic,
@@ -432,13 +426,25 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 		}
 	}
 
-	if c.config.Consumer.Return.Errors {
-		select {
-		case c.errors <- err:
-		default:
-		}
-	} else {
+	if !c.config.Consumer.Return.Errors {
 		Logger.Println(err)
+		return
+	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	select {
+	case <-c.closed:
+		//consumer is closed
+		return
+	default:
+	}
+
+	select {
+	case c.errors <- err:
+	default:
+		// no error listener
 	}
 }
 

+ 1 - 1
dev.yml

@@ -2,7 +2,7 @@ name: sarama
 
 up:
   - go:
-      version: '1.13.1'
+      version: '1.13.4'
 
 commands:
   test:

+ 0 - 3
functional_producer_test.go

@@ -214,7 +214,6 @@ func validateMetrics(t *testing.T, client Client) {
 	metricValidators.register(minCountMeterValidator("request-rate", 3))
 	metricValidators.register(minCountHistogramValidator("request-size", 3))
 	metricValidators.register(minValHistogramValidator("request-size", 1))
-	metricValidators.register(minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
 	// and at least 2 requests to the registered broker (offset + produces)
 	metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
 	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
@@ -248,7 +247,6 @@ func validateMetrics(t *testing.T, client Client) {
 		// in exactly 2 global responses (metadata + offset)
 		metricValidators.register(countMeterValidator("response-rate", 2))
 		metricValidators.register(minCountHistogramValidator("response-size", 2))
-		metricValidators.register(minValHistogramValidator("response-size", 1))
 		// and exactly 1 offset response for the registered broker
 		metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
 		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
@@ -257,7 +255,6 @@ func validateMetrics(t *testing.T, client Client) {
 		// in at least 3 global responses (metadata + offset + produces)
 		metricValidators.register(minCountMeterValidator("response-rate", 3))
 		metricValidators.register(minCountHistogramValidator("response-size", 3))
-		metricValidators.register(minValHistogramValidator("response-size", 1))
 		// and at least 2 for the registered broker
 		metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
 		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))

+ 1 - 1
go.mod

@@ -13,7 +13,7 @@ require (
 	github.com/golang/snappy v0.0.1 // indirect
 	github.com/hashicorp/go-uuid v1.0.1 // indirect
 	github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect
-	github.com/klauspost/compress v1.8.2
+	github.com/klauspost/compress v1.9.7
 	github.com/pierrec/lz4 v2.2.6+incompatible
 	github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
 	github.com/stretchr/testify v1.3.0

+ 2 - 0
go.sum

@@ -27,6 +27,8 @@ github.com/klauspost/compress v1.8.1 h1:oygt2ychZFHOB6M9gUgajzgKrwRgHbGC77NwA4CO
 github.com/klauspost/compress v1.8.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
 github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA=
+github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=

+ 68 - 19
mockresponses.go

@@ -731,29 +731,78 @@ func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse
 
 func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*DescribeConfigsRequest)
-	res := &DescribeConfigsResponse{}
+	res := &DescribeConfigsResponse{
+		Version: req.Version,
+	}
+
+	includeSynonyms := (req.Version > 0)
 
 	for _, r := range req.Resources {
 		var configEntries []*ConfigEntry
 		switch r.Type {
-		case TopicResource:
+		case BrokerResource:
 			configEntries = append(configEntries,
-				&ConfigEntry{Name: "max.message.bytes",
-					Value:     "1000000",
-					ReadOnly:  false,
-					Default:   true,
-					Sensitive: false,
-				}, &ConfigEntry{Name: "retention.ms",
-					Value:     "5000",
-					ReadOnly:  false,
-					Default:   false,
-					Sensitive: false,
-				}, &ConfigEntry{Name: "password",
-					Value:     "12345",
-					ReadOnly:  false,
-					Default:   false,
-					Sensitive: true,
-				})
+				&ConfigEntry{
+					Name:     "min.insync.replicas",
+					Value:    "2",
+					ReadOnly: false,
+					Default:  false,
+				},
+			)
+			res.Resources = append(res.Resources, &ResourceResponse{
+				Name:    r.Name,
+				Configs: configEntries,
+			})
+		case BrokerLoggerResource:
+			configEntries = append(configEntries,
+				&ConfigEntry{
+					Name:     "kafka.controller.KafkaController",
+					Value:    "DEBUG",
+					ReadOnly: false,
+					Default:  false,
+				},
+			)
+			res.Resources = append(res.Resources, &ResourceResponse{
+				Name:    r.Name,
+				Configs: configEntries,
+			})
+		case TopicResource:
+			maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
+				Value:     "1000000",
+				ReadOnly:  false,
+				Default:   true,
+				Sensitive: false,
+			}
+			if includeSynonyms {
+				maxMessageBytes.Synonyms = []*ConfigSynonym{
+					{
+						ConfigName:  "max.message.bytes",
+						ConfigValue: "500000",
+					},
+				}
+			}
+			retentionMs := &ConfigEntry{Name: "retention.ms",
+				Value:     "5000",
+				ReadOnly:  false,
+				Default:   false,
+				Sensitive: false,
+			}
+			if includeSynonyms {
+				retentionMs.Synonyms = []*ConfigSynonym{
+					{
+						ConfigName:  "log.retention.ms",
+						ConfigValue: "2500",
+					},
+				}
+			}
+			password := &ConfigEntry{Name: "password",
+				Value:     "12345",
+				ReadOnly:  false,
+				Default:   false,
+				Sensitive: true,
+			}
+			configEntries = append(
+				configEntries, maxMessageBytes, retentionMs, password)
 			res.Resources = append(res.Resources, &ResourceResponse{
 				Name:    r.Name,
 				Configs: configEntries,
@@ -777,7 +826,7 @@ func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
 
 	for _, r := range req.Resources {
 		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
-			Type:     TopicResource,
+			Type:     r.Type,
 			ErrorMsg: "",
 		})
 	}

+ 4 - 4
mocks/consumer.go

@@ -63,13 +63,13 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
 	return pc, nil
 }
 
-// Topics returns a list of topics, as registered with SetMetadata
+// Topics returns a list of topics, as registered with SetTopicMetadata
 func (c *Consumer) Topics() ([]string, error) {
 	c.l.Lock()
 	defer c.l.Unlock()
 
 	if c.metadata == nil {
-		c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
+		c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetTopicMetadata.")
 		return nil, sarama.ErrOutOfBrokers
 	}
 
@@ -80,13 +80,13 @@ func (c *Consumer) Topics() ([]string, error) {
 	return result, nil
 }
 
-// Partitions returns the list of parititons for the given topic, as registered with SetMetadata
+// Partitions returns the list of parititons for the given topic, as registered with SetTopicMetadata
 func (c *Consumer) Partitions(topic string) ([]int32, error) {
 	c.l.Lock()
 	defer c.l.Unlock()
 
 	if c.metadata == nil {
-		c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
+		c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetTopicMetadata.")
 		return nil, sarama.ErrOutOfBrokers
 	}
 	if c.metadata[topic] == nil {

+ 6 - 1
offset_manager.go

@@ -58,7 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
 		client: client,
 		conf:   conf,
 		group:  group,
-		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
+		ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
 		poms:   make(map[string]map[int32]*partitionOffsetManager),
 
 		memberID:   memberID,
@@ -233,7 +233,12 @@ func (om *offsetManager) mainLoop() {
 	}
 }
 
+// flushToBroker is ignored if auto-commit offsets is disabled
 func (om *offsetManager) flushToBroker() {
+	if !om.conf.Consumer.Offsets.AutoCommit.Enable {
+		return
+	}
+
 	req := om.constructRequest()
 	if req == nil {
 		return

+ 80 - 5
offset_manager_test.go

@@ -7,15 +7,14 @@ import (
 )
 
 func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
-	backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
+	backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
 	testClient Client, broker, coordinator *MockBroker) {
 
-	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	if backoffFunc != nil {
 		config.Metadata.Retry.BackoffFunc = backoffFunc
 	}
-	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
+	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
 	config.Version = V0_9_0_0
 	if retention > 0 {
 		config.Consumer.Offsets.Retention = retention
@@ -52,7 +51,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
 
 func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
 	testClient Client, broker, coordinator *MockBroker) {
-	return initOffsetManagerWithBackoffFunc(t, retention, nil)
+	return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
 }
 
 func initPartitionOffsetManager(t *testing.T, om OffsetManager,
@@ -97,6 +96,82 @@ func TestNewOffsetManager(t *testing.T) {
 	}
 }
 
+var offsetsautocommitTestTable = []struct {
+	name   string
+	set    bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
+	enable bool
+}{
+	{
+		"AutoCommit (default)",
+		false, // use default
+		true,
+	},
+	{
+		"AutoCommit Enabled",
+		true,
+		true,
+	},
+	{
+		"AutoCommit Disabled",
+		true,
+		false,
+	},
+}
+
+func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
+	// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
+	for _, tt := range offsetsautocommitTestTable {
+		t.Run(tt.name, func(t *testing.T) {
+
+			config := NewConfig()
+			if tt.set {
+				config.Consumer.Offsets.AutoCommit.Enable = tt.enable
+			}
+			om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
+			pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+			// Wait long enough for the test not to fail..
+			timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval
+
+			called := make(chan none)
+
+			ocResponse := new(OffsetCommitResponse)
+			ocResponse.AddError("my_topic", 0, ErrNoError)
+			handler := func(req *request) (res encoder) {
+				close(called)
+				return ocResponse
+			}
+			coordinator.setHandler(handler)
+
+			// Should force an offset commit, if auto-commit is enabled.
+			expected := int64(1)
+			pom.ResetOffset(expected, "modified_meta")
+			_, _ = pom.NextOffset()
+
+			select {
+			case <-called:
+				// OffsetManager called on the wire.
+				if !config.Consumer.Offsets.AutoCommit.Enable {
+					t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
+				}
+			case <-time.After(timeout):
+				// Timeout waiting for OffsetManager to call on the wire.
+				if config.Consumer.Offsets.AutoCommit.Enable {
+					t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
+				}
+			}
+
+			broker.Close()
+			coordinator.Close()
+
+			// !! om must be closed before the pom so pom.release() is called before pom.Close()
+			safeClose(t, om)
+			safeClose(t, pom)
+			safeClose(t, testClient)
+		})
+	}
+}
+
 // Test recovery from ErrNotCoordinatorForConsumer
 // on first fetchInitialOffset call
 func TestOffsetManagerFetchInitialFail(t *testing.T) {
@@ -148,7 +223,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
 		atomic.AddInt32(&retryCount, 1)
 		return 0
 	}
-	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)
+	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{

+ 2 - 1
produce_set.go

@@ -44,9 +44,10 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 	}
 
 	timestamp := msg.Timestamp
-	if msg.Timestamp.IsZero() {
+	if timestamp.IsZero() {
 		timestamp = time.Now()
 	}
+	timestamp = timestamp.Truncate(time.Millisecond)
 
 	partitions := ps.msgs[msg.Topic]
 	if partitions == nil {

+ 59 - 2
produce_set_test.go

@@ -255,7 +255,7 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
 	}
 
 	batch := req.records["t1"][0].RecordBatch
-	if batch.FirstTimestamp != now {
+	if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
 		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
 	}
 	for i := 0; i < 10; i++ {
@@ -334,7 +334,7 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
 	}
 
 	batch := req.records["t1"][0].RecordBatch
-	if batch.FirstTimestamp != now {
+	if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
 		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
 	}
 	if batch.ProducerID != pID {
@@ -368,3 +368,60 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
 		}
 	}
 }
+
+func TestProduceSetConsistentTimestamps(t *testing.T) {
+	parent, ps1 := makeProduceSet()
+	ps2 := newProduceSet(parent)
+	parent.conf.Producer.RequiredAcks = WaitForAll
+	parent.conf.Producer.Timeout = 10 * time.Second
+	parent.conf.Version = V0_11_0_0
+
+	msg1 := &ProducerMessage{
+		Topic:          "t1",
+		Partition:      0,
+		Key:            StringEncoder(TestMessage),
+		Value:          StringEncoder(TestMessage),
+		Timestamp:      time.Unix(1555718400, 500000000),
+		sequenceNumber: 123,
+	}
+	msg2 := &ProducerMessage{
+		Topic:          "t1",
+		Partition:      0,
+		Key:            StringEncoder(TestMessage),
+		Value:          StringEncoder(TestMessage),
+		Timestamp:      time.Unix(1555718400, 500900000),
+		sequenceNumber: 123,
+	}
+	msg3 := &ProducerMessage{
+		Topic:          "t1",
+		Partition:      0,
+		Key:            StringEncoder(TestMessage),
+		Value:          StringEncoder(TestMessage),
+		Timestamp:      time.Unix(1555718400, 600000000),
+		sequenceNumber: 123,
+	}
+
+	safeAddMessage(t, ps1, msg1)
+	safeAddMessage(t, ps1, msg3)
+	req1 := ps1.buildRequest()
+	if req1.Version != 3 {
+		t.Error("Wrong request version")
+	}
+	batch1 := req1.records["t1"][0].RecordBatch
+	ft1 := batch1.FirstTimestamp.Unix()*1000 + int64(batch1.FirstTimestamp.Nanosecond()/1000000)
+	time1 := ft1 + int64(batch1.Records[1].TimestampDelta/time.Millisecond)
+
+	safeAddMessage(t, ps2, msg2)
+	safeAddMessage(t, ps2, msg3)
+	req2 := ps2.buildRequest()
+	if req2.Version != 3 {
+		t.Error("Wrong request version")
+	}
+	batch2 := req2.records["t1"][0].RecordBatch
+	ft2 := batch2.FirstTimestamp.Unix()*1000 + int64(batch2.FirstTimestamp.Nanosecond()/1000000)
+	time2 := ft2 + int64(batch2.Records[1].TimestampDelta/time.Millisecond)
+
+	if time1 != time2 {
+		t.Errorf("Message timestamps do not match: %v, %v", time1, time2)
+	}
+}

+ 74 - 4
tools/kafka-producer-performance/main.go

@@ -3,15 +3,19 @@ package main
 import (
 	"context"
 	"crypto/rand"
+	"crypto/x509"
 	"flag"
 	"fmt"
 	"io"
+	"io/ioutil"
+	"log"
 	"os"
 	"strings"
 	gosync "sync"
 	"time"
 
 	"github.com/Shopify/sarama"
+	"github.com/Shopify/sarama/tools/tls"
 	metrics "github.com/rcrowley/go-metrics"
 )
 
@@ -36,6 +40,31 @@ var (
 		"",
 		"REQUIRED: A comma separated list of broker addresses.",
 	)
+	securityProtocol = flag.String(
+		"security-protocol",
+		"PLAINTEXT",
+		"The name of the security protocol to talk to Kafka (PLAINTEXT, SSL) (default: PLAINTEXT).",
+	)
+	tlsRootCACerts = flag.String(
+		"tls-ca-certs",
+		"",
+		"The path to a file that contains a set of root certificate authorities in PEM format "+
+			"to trust when verifying broker certificates when -security-protocol=SSL "+
+			"(leave empty to use the host's root CA set).",
+	)
+	tlsClientCert = flag.String(
+		"tls-client-cert",
+		"",
+		"The path to a file that contains the client certificate to send to the broker "+
+			"in PEM format if client authentication is required when -security-protocol=SSL "+
+			"(leave empty to disable client authentication).",
+	)
+	tlsClientKey = flag.String(
+		"tls-client-key",
+		"",
+		"The path to a file that contains the client private key linked to the client certificate "+
+			"in PEM format when -security-protocol=SSL (REQUIRED if tls-client-cert is provided).",
+	)
 	topic = flag.String(
 		"topic",
 		"",
@@ -126,6 +155,11 @@ var (
 		"0.8.2.0",
 		"The assumed version of Kafka.",
 	)
+	verbose = flag.Bool(
+		"verbose",
+		false,
+		"Turn on sarama logging to stderr",
+	)
 )
 
 func parseCompression(scheme string) sarama.CompressionCodec {
@@ -205,6 +239,12 @@ func main() {
 	if *routines < 1 || *routines > *messageLoad {
 		printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
 	}
+	if *securityProtocol != "PLAINTEXT" && *securityProtocol != "SSL" {
+		printUsageErrorAndExit(fmt.Sprintf("-security-protocol %q is not supported", *securityProtocol))
+	}
+	if *verbose {
+		sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
+	}
 
 	config := sarama.NewConfig()
 
@@ -222,6 +262,30 @@ func main() {
 	config.ChannelBufferSize = *channelBufferSize
 	config.Version = parseVersion(*version)
 
+	if *securityProtocol == "SSL" {
+		tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
+		if err != nil {
+			printErrorAndExit(69, "failed to load client certificate from: %s and private key from: %s: %v",
+				*tlsClientCert, *tlsClientKey, err)
+		}
+
+		if *tlsRootCACerts != "" {
+			rootCAsBytes, err := ioutil.ReadFile(*tlsRootCACerts)
+			if err != nil {
+				printErrorAndExit(69, "failed to read root CA certificates: %v", err)
+			}
+			certPool := x509.NewCertPool()
+			if !certPool.AppendCertsFromPEM(rootCAsBytes) {
+				printErrorAndExit(69, "failed to load root CA certificates from file: %s", *tlsRootCACerts)
+			}
+			// Use specific root CA set vs the host's set
+			tlsConfig.RootCAs = certPool
+		}
+
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = tlsConfig
+	}
+
 	if err := config.Validate(); err != nil {
 		printErrorAndExit(69, "Invalid configuration: %s", err)
 	}
@@ -363,18 +427,24 @@ func runSyncProducer(topic string, partition, messageLoad, messageSize, routines
 }
 
 func printMetrics(w io.Writer, r metrics.Registry) {
-	if r.Get("record-send-rate") == nil || r.Get("request-latency-in-ms") == nil {
+	recordSendRateMetric := r.Get("record-send-rate")
+	requestLatencyMetric := r.Get("request-latency-in-ms")
+	outgoingByteRateMetric := r.Get("outgoing-byte-rate")
+
+	if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil {
 		return
 	}
-	recordSendRate := r.Get("record-send-rate").(metrics.Meter).Snapshot()
-	requestLatency := r.Get("request-latency-in-ms").(metrics.Histogram).Snapshot()
+	recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot()
+	requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot()
 	requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
-	fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MB/sec), "+
+	outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot()
+	fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+
 		"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
 		"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
 		recordSendRate.Count(),
 		recordSendRate.RateMean(),
 		recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
+		outgoingByteRate.RateMean()/1024/1024,
 		requestLatency.Mean(),
 		requestLatency.StdDev(),
 		requestLatencyPercentiles[0],

+ 3 - 1
utils.go

@@ -161,6 +161,7 @@ var (
 	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
 	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
 	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
+	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
 
 	SupportedVersions = []KafkaVersion{
 		V0_8_2_0,
@@ -185,9 +186,10 @@ var (
 		V2_1_0_0,
 		V2_2_0_0,
 		V2_3_0_0,
+		V2_4_0_0,
 	}
 	MinVersion = V0_8_2_0
-	MaxVersion = V2_3_0_0
+	MaxVersion = V2_4_0_0
 )
 
 //ParseKafkaVersion parses and returns kafka version or error from a string

+ 11 - 1
vagrant/halt_cluster.sh

@@ -1,4 +1,14 @@
-#!/bin/sh
+#!/bin/bash
+
+# If the functional tests failed (or some other task) then
+# we might want to look into the broker logs
+if [ "$TRAVIS_TEST_RESULT" = "1" ]; then
+    echo "Dumping Kafka brokers server.log:"
+    for i in 1 2 3 4 5; do
+        KAFKA_PORT=`expr $i + 9090`
+        sed -e "s/^/kafka-${KAFKA_PORT} /" ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/logs/server.log{.*,}
+    done
+fi
 
 set -ex