Browse Source

Merge remote-tracking branch 'upstream/master' into v2

Varun Kumar 6 years ago
parent
commit
8fa09a5d48
21 changed files with 535 additions and 113 deletions
  1. 2 1
      .travis.yml
  2. 36 0
      CHANGELOG.md
  3. 1 1
      README.md
  4. 34 2
      admin.go
  5. 31 1
      admin_test.go
  6. 43 28
      broker.go
  7. 62 47
      broker_test.go
  8. 4 0
      config.go
  9. 13 0
      consumer.go
  10. 47 0
      consumer_test.go
  11. 17 0
      crc32_field.go
  12. 15 0
      errors.go
  13. 27 14
      examples/consumergroup/main.go
  14. 65 13
      examples/sasl_scram_client/main.go
  15. 70 0
      fetch_response_test.go
  16. 18 1
      length_field.go
  17. 4 1
      message.go
  18. 4 1
      message_set.go
  19. 35 1
      mockresponses.go
  20. 4 1
      record_batch.go
  21. 3 1
      utils.go

+ 2 - 1
.travis.yml

@@ -13,7 +13,8 @@ env:
   - DEBUG=true
   matrix:
   - KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
-  - KAFKA_VERSION=2.2.0 KAFKA_SCALA_VERSION=2.12
+  - KAFKA_VERSION=2.2.1 KAFKA_SCALA_VERSION=2.12
+  - KAFKA_VERSION=2.3.0 KAFKA_SCALA_VERSION=2.12
 
 before_install:
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}

+ 36 - 0
CHANGELOG.md

@@ -1,5 +1,41 @@
 # Changelog
 
+#### Version 1.23.0 (2019-07-02)
+
+New Features:
+- Add support for Kafka 2.3.0
+  ([1418](https://github.com/Shopify/sarama/pull/1418)).
+- Add support for ListConsumerGroupOffsets v2
+  ([1374](https://github.com/Shopify/sarama/pull/1374)).
+- Add support for DeleteConsumerGroup
+  ([1417](https://github.com/Shopify/sarama/pull/1417)).
+- Add support for SASLVersion configuration
+  ([1410](https://github.com/Shopify/sarama/pull/1410)).
+- Add kerberos support
+  ([1366](https://github.com/Shopify/sarama/pull/1366)).
+
+Improvements:
+- Improve sasl_scram_client example
+  ([1406](https://github.com/Shopify/sarama/pull/1406)).
+- Fix shutdown and race-condition in consumer-group example
+  ([1404](https://github.com/Shopify/sarama/pull/1404)).
+- Add support for error codes 77—81
+  ([1397](https://github.com/Shopify/sarama/pull/1397)).
+- Pool internal objects allocated per message
+  ([1385](https://github.com/Shopify/sarama/pull/1385)).
+- Reduce packet decoder allocations
+  ([1373](https://github.com/Shopify/sarama/pull/1373)).
+- Support timeout when fetching metadata
+  ([1359](https://github.com/Shopify/sarama/pull/1359)).
+
+Bug Fixes:
+- Fix fetch size integer overflow
+  ([1376](https://github.com/Shopify/sarama/pull/1376)).
+- Handle and log throttled FetchResponses
+  ([1383](https://github.com/Shopify/sarama/pull/1383)).
+- Refactor misspelled word Resouce to Resource
+  ([1368](https://github.com/Shopify/sarama/pull/1368)).
+
 #### Version 1.22.1 (2019-04-29)
 
 Improvements:

+ 1 - 1
README.md

@@ -21,7 +21,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c
 Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
 the two latest stable releases of Kafka and Go, and we provide a two month
 grace period for older releases. This means we currently officially support
-Go 1.10 through 1.12, and Kafka 2.0 through 2.2, although older releases are
+Go 1.11 through 1.12, and Kafka 2.0 through 2.3, although older releases are
 still likely to work.
 
 Sarama follows semantic versioning and provides API stability via the gopkg.in service.

+ 34 - 2
admin.go

@@ -84,7 +84,10 @@ type ClusterAdmin interface {
 	// List the consumer group offsets available in the cluster.
 	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
 
-	// Get information about the nodes in the cluster.
+	// Delete a consumer group.
+	DeleteConsumerGroup(group string) error
+
+	// Get information about the nodes in the cluster
 	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
 
 	// Close shuts down the admin and closes underlying client.
@@ -608,9 +611,38 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
 		partitions:    topicPartitions,
 	}
 
-	if ca.conf.Version.IsAtLeast(V0_8_2_2) {
+	if ca.conf.Version.IsAtLeast(V0_10_2_0) {
+		request.Version = 2
+	} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
 		request.Version = 1
 	}
 
 	return coordinator.FetchOffset(request)
 }
+
+func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
+	coordinator, err := ca.client.Coordinator(group)
+	if err != nil {
+		return err
+	}
+
+	request := &DeleteGroupsRequest{
+		Groups: []string{group},
+	}
+
+	resp, err := coordinator.DeleteGroups(request)
+	if err != nil {
+		return err
+	}
+
+	groupErr, ok := resp.GroupErrorCodes[group]
+	if !ok {
+		return ErrIncompleteResponse
+	}
+
+	if groupErr != ErrNoError {
+		return groupErr
+	}
+
+	return nil
+}

+ 31 - 1
admin_test.go

@@ -822,7 +822,7 @@ func TestListConsumerGroupOffsets(t *testing.T) {
 	expectedOffset := int64(0)
 
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
-		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
+		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
 		"MetadataRequest": NewMockMetadataResponse(t).
 			SetController(seedBroker.BrokerID()).
 			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
@@ -859,3 +859,33 @@ func TestListConsumerGroupOffsets(t *testing.T) {
 	}
 
 }
+
+func TestDeleteConsumerGroup(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	group := "my-group"
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		// "OffsetFetchRequest":  NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
+		"DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
+	})
+
+	config := NewConfig()
+	config.Version = V1_1_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.DeleteConsumerGroup(group)
+	if err != nil {
+		t.Fatalf("DeleteConsumerGroup failed with error %v", err)
+	}
+
+}

+ 43 - 28
broker.go

@@ -4,7 +4,6 @@ import (
 	"crypto/tls"
 	"encoding/binary"
 	"fmt"
-	metrics "github.com/rcrowley/go-metrics"
 	"io"
 	"net"
 	"sort"
@@ -13,6 +12,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	metrics "github.com/rcrowley/go-metrics"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -944,19 +945,16 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	// default to V0 to allow for backward compatability when SASL is enabled
 	// but not the handshake
-	saslHandshake := SASLHandshakeV0
 	if b.conf.Net.SASL.Handshake {
-		if b.conf.Version.IsAtLeast(V1_0_0_0) {
-			saslHandshake = SASLHandshakeV1
-		}
-		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, saslHandshake)
+
+		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
 		if handshakeErr != nil {
 			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
 			return handshakeErr
 		}
 	}
 
-	if saslHandshake == SASLHandshakeV1 {
+	if b.conf.Net.SASL.Version == SASLHandshakeV1 {
 		return b.sendAndReceiveV1SASLPlainAuth()
 	}
 	return b.sendAndReceiveV0SASLPlainAuth()
@@ -1015,7 +1013,7 @@ func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
 
 	b.correlationID++
 
-	bytesRead, err := b.receiveSASLServerResponse(correlationID)
+	bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
 	b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
 
 	// With v1 sasl we get an error message set in the response we can return
@@ -1039,26 +1037,53 @@ func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
 		return err
 	}
 
+	message, err := buildClientFirstMessage(token)
+	if err != nil {
+		return err
+	}
+
+	challenged, err := b.sendClientMessage(message)
+	if err != nil {
+		return err
+	}
+
+	if challenged {
+		// Abort the token exchange. The broker returns the failure code.
+		_, err = b.sendClientMessage([]byte(`\x01`))
+	}
+
+	return err
+}
+
+// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
+// if the broker responds with a challenge, in which case the token is
+// rejected.
+func (b *Broker) sendClientMessage(message []byte) (bool, error) {
+
 	requestTime := time.Now()
 	correlationID := b.correlationID
 
-	bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
+	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.correlationID++
 
-	bytesRead, err := b.receiveSASLServerResponse(correlationID)
-	if err != nil {
-		return err
-	}
+	res := &SaslAuthenticateResponse{}
+	bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
 
 	requestLatency := time.Since(requestTime)
 	b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
 
-	return nil
+	isChallenge := len(res.SaslAuthBytes) > 0
+
+	if isChallenge && err != nil {
+		Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
+	}
+
+	return isChallenge, err
 }
 
 func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
@@ -1156,7 +1181,7 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
 
 // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
 // https://tools.ietf.org/html/rfc7628
-func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
+func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
 	var ext string
 
 	if token.Extensions != nil && len(token.Extensions) > 0 {
@@ -1202,11 +1227,7 @@ func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, erro
 	return b.conn.Write(buf)
 }
 
-func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
-	initialResp, err := buildClientInitialResponse(token)
-	if err != nil {
-		return 0, err
-	}
+func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
 
 	rb := &SaslAuthenticateRequest{initialResp}
 
@@ -1224,7 +1245,7 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlati
 	return b.conn.Write(buf)
 }
 
-func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
+func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
 
 	buf := make([]byte, responseLengthSize+correlationIDSize)
 
@@ -1252,8 +1273,6 @@ func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
 		return bytesRead, err
 	}
 
-	res := &SaslAuthenticateResponse{}
-
 	if err := versionedDecode(buf, res, 0); err != nil {
 		return bytesRead, err
 	}
@@ -1262,10 +1281,6 @@ func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
 		return bytesRead, res.Err
 	}
 
-	if len(res.SaslAuthBytes) > 0 {
-		Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
-	}
-
 	return bytesRead, nil
 }
 

+ 62 - 47
broker_test.go

@@ -3,13 +3,13 @@ package sarama
 import (
 	"errors"
 	"fmt"
-	"gopkg.in/jcmturner/gokrb5.v7/krberror"
 	"net"
 	"reflect"
 	"testing"
 	"time"
 
 	"github.com/rcrowley/go-metrics"
+	"gopkg.in/jcmturner/gokrb5.v7/krberror"
 )
 
 func ExampleBroker() {
@@ -132,42 +132,66 @@ func newTokenProvider(token *AccessToken, err error) *TokenProvider {
 func TestSASLOAuthBearer(t *testing.T) {
 
 	testTable := []struct {
-		name             string
-		mockAuthErr      KError // Mock and expect error returned from SaslAuthenticateRequest
-		mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
-		expectClientErr  bool   // Expect an internal client-side error
-		tokProvider      *TokenProvider
+		name                      string
+		mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
+		mockSASLAuthResponse      MockResponse // Mock SaslAuthenticateRequest response from broker
+		expectClientErr           bool         // Expect an internal client-side error
+		expectedBrokerError       KError       // Expected Kafka error returned by client
+		tokProvider               *TokenProvider
 	}{
 		{
-			name:             "SASL/OAUTHBEARER OK server response",
-			mockAuthErr:      ErrNoError,
-			mockHandshakeErr: ErrNoError,
-			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
+			name: "SASL/OAUTHBEARER OK server response",
+			mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
+				SetEnabledMechanisms([]string{SASLTypeOAuth}),
+			mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
+			expectClientErr:      false,
+			expectedBrokerError:  ErrNoError,
+			tokProvider:          newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
 		},
 		{
-			name:             "SASL/OAUTHBEARER authentication failure response",
-			mockAuthErr:      ErrSASLAuthenticationFailed,
-			mockHandshakeErr: ErrNoError,
-			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
+			name: "SASL/OAUTHBEARER authentication failure response",
+			mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
+				SetEnabledMechanisms([]string{SASLTypeOAuth}),
+			mockSASLAuthResponse: NewMockSequence(
+				// First, the broker response with a challenge
+				NewMockSaslAuthenticateResponse(t).
+					SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
+				// Next, the client terminates the token exchange. Finally, the
+				// broker responds with an error message.
+				NewMockSaslAuthenticateResponse(t).
+					SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
+					SetError(ErrSASLAuthenticationFailed),
+			),
+			expectClientErr:     true,
+			expectedBrokerError: ErrSASLAuthenticationFailed,
+			tokProvider:         newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
 		},
 		{
-			name:             "SASL/OAUTHBEARER handshake failure response",
-			mockAuthErr:      ErrNoError,
-			mockHandshakeErr: ErrSASLAuthenticationFailed,
-			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
+			name: "SASL/OAUTHBEARER handshake failure response",
+			mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
+				SetEnabledMechanisms([]string{SASLTypeOAuth}).
+				SetError(ErrSASLAuthenticationFailed),
+			mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
+			expectClientErr:      true,
+			expectedBrokerError:  ErrSASLAuthenticationFailed,
+			tokProvider:          newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
 		},
 		{
-			name:             "SASL/OAUTHBEARER token generation error",
-			mockAuthErr:      ErrNoError,
-			mockHandshakeErr: ErrNoError,
-			expectClientErr:  true,
-			tokProvider:      newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
+			name: "SASL/OAUTHBEARER token generation error",
+			mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
+				SetEnabledMechanisms([]string{SASLTypeOAuth}),
+			mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
+			expectClientErr:      true,
+			expectedBrokerError:  ErrNoError,
+			tokProvider:          newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
 		},
 		{
-			name:             "SASL/OAUTHBEARER invalid extension",
-			mockAuthErr:      ErrNoError,
-			mockHandshakeErr: ErrNoError,
-			expectClientErr:  true,
+			name: "SASL/OAUTHBEARER invalid extension",
+			mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
+				SetEnabledMechanisms([]string{SASLTypeOAuth}),
+			mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
+			expectClientErr:      true,
+			expectedBrokerError:  ErrNoError,
 			tokProvider: newTokenProvider(&AccessToken{
 				Token:      "access-token-123",
 				Extensions: map[string]string{"auth": "auth-value"},
@@ -180,19 +204,9 @@ func TestSASLOAuthBearer(t *testing.T) {
 		// mockBroker mocks underlying network logic and broker responses
 		mockBroker := NewMockBroker(t, 0)
 
-		mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte("response_payload"))
-		if test.mockAuthErr != ErrNoError {
-			mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
-		}
-
-		mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeOAuth})
-		if test.mockHandshakeErr != ErrNoError {
-			mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
-		}
-
 		mockBroker.SetHandlerByMap(map[string]MockResponse{
-			"SaslAuthenticateRequest": mockSASLAuthResponse,
-			"SaslHandshakeRequest":    mockSASLHandshakeResponse,
+			"SaslAuthenticateRequest": test.mockSASLAuthResponse,
+			"SaslHandshakeRequest":    test.mockSASLHandshakeResponse,
 		})
 
 		// broker executes SASL requests against mockBroker
@@ -227,13 +241,13 @@ func TestSASLOAuthBearer(t *testing.T) {
 
 		err = broker.authenticateViaSASL()
 
-		if test.mockAuthErr != ErrNoError {
-			if test.mockAuthErr != err {
-				t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
+		if test.expectedBrokerError != ErrNoError {
+			if test.expectedBrokerError != err {
+				t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
 			}
-		} else if test.mockHandshakeErr != ErrNoError {
-			if test.mockHandshakeErr != err {
-				t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
+		} else if test.expectedBrokerError != ErrNoError {
+			if test.expectedBrokerError != err {
+				t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.expectedBrokerError, err)
 			}
 		} else if test.expectClientErr && err == nil {
 			t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
@@ -441,6 +455,7 @@ func TestSASLPlainAuth(t *testing.T) {
 		conf.Net.SASL.Mechanism = SASLTypePlaintext
 		conf.Net.SASL.User = "token"
 		conf.Net.SASL.Password = "password"
+		conf.Net.SASL.Version = SASLHandshakeV1
 
 		broker.conf = conf
 		broker.conf.Version = V1_0_0_0
@@ -598,7 +613,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
 
 }
 
-func TestBuildClientInitialResponse(t *testing.T) {
+func TestBuildClientFirstMessage(t *testing.T) {
 
 	testTable := []struct {
 		name        string
@@ -637,7 +652,7 @@ func TestBuildClientInitialResponse(t *testing.T) {
 
 	for i, test := range testTable {
 
-		actual, err := buildClientInitialResponse(test.token)
+		actual, err := buildClientFirstMessage(test.token)
 
 		if !reflect.DeepEqual(test.expected, actual) {
 			t.Errorf("Expected %s, got %s\n", test.expected, actual)

+ 4 - 0
config.go

@@ -58,6 +58,9 @@ type Config struct {
 			// SASLMechanism is the name of the enabled SASL mechanism.
 			// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
 			Mechanism SASLMechanism
+			// Version is the SASL Protocol Version to use
+			// Kafka > 1.x should use V1, except on Azure EventHub which use V0
+			Version int16
 			// Whether or not to send the Kafka SASL handshake first if enabled
 			// (defaults to true). You should only set this to false if you're using
 			// a non-Kafka SASL proxy.
@@ -398,6 +401,7 @@ func NewConfig() *Config {
 	c.Net.ReadTimeout = 30 * time.Second
 	c.Net.WriteTimeout = 30 * time.Second
 	c.Net.SASL.Handshake = true
+	c.Net.SASL.Version = SASLHandshakeV0
 
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond

+ 13 - 0
consumer.go

@@ -3,6 +3,7 @@ package sarama
 import (
 	"errors"
 	"fmt"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -561,6 +562,14 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
 	}
 
+	// If request was throttled and empty we log and return without error
+	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
+		Logger.Printf(
+			"consumer/broker/%d FetchResponse throttled %v\n",
+			child.broker.broker.ID(), response.ThrottleTime)
+		return nil, nil
+	}
+
 	block := response.GetBlock(child.topic, child.partition)
 	if block == nil {
 		return nil, ErrIncompleteResponse
@@ -591,6 +600,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 				child.offset++ // skip this one so we can keep processing future messages
 			} else {
 				child.fetchSize *= 2
+				// check int32 overflow
+				if child.fetchSize < 0 {
+					child.fetchSize = math.MaxInt32
+				}
 				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
 					child.fetchSize = child.conf.Consumer.Fetch.Max
 				}

+ 47 - 0
consumer_test.go

@@ -4,6 +4,7 @@ import (
 	"log"
 	"os"
 	"os/signal"
+	"reflect"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -1240,3 +1241,49 @@ ConsumerLoop:
 
 	log.Printf("Consumed: %d\n", consumed)
 }
+
+func Test_partitionConsumer_parseResponse(t *testing.T) {
+	type args struct {
+		response *FetchResponse
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    []*ConsumerMessage
+		wantErr bool
+	}{
+		{
+			name: "empty but throttled FetchResponse is not considered an error",
+			args: args{
+				response: &FetchResponse{
+					ThrottleTime: time.Millisecond,
+				},
+			},
+		},
+		{
+			name: "empty FetchResponse is considered an incomplete response by default",
+			args: args{
+				response: &FetchResponse{},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			child := &partitionConsumer{
+				broker: &brokerConsumer{
+					broker: &Broker{},
+				},
+				conf: &Config{},
+			}
+			got, err := child.parseResponse(tt.args.response)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 17 - 0
crc32_field.go

@@ -4,6 +4,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"hash/crc32"
+	"sync"
 )
 
 type crcPolynomial int8
@@ -13,6 +14,22 @@ const (
 	crcCastagnoli
 )
 
+var crc32FieldPool = sync.Pool{}
+
+func acquireCrc32Field(polynomial crcPolynomial) *crc32Field {
+	val := crc32FieldPool.Get()
+	if val != nil {
+		c := val.(*crc32Field)
+		c.polynomial = polynomial
+		return c
+	}
+	return newCRC32Field(polynomial)
+}
+
+func releaseCrc32Field(c *crc32Field) {
+	crc32FieldPool.Put(c)
+}
+
 var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
 
 // crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.

+ 15 - 0
errors.go

@@ -161,6 +161,11 @@ const (
 	ErrFencedLeaderEpoch                  KError = 74
 	ErrUnknownLeaderEpoch                 KError = 75
 	ErrUnsupportedCompressionType         KError = 76
+	ErrStaleBrokerEpoch                   KError = 77
+	ErrOffsetNotAvailable                 KError = 78
+	ErrMemberIdRequired                   KError = 79
+	ErrPreferredLeaderNotAvailable        KError = 80
+	ErrGroupMaxSizeReached                KError = 81
 )
 
 func (err KError) Error() string {
@@ -323,6 +328,16 @@ func (err KError) Error() string {
 		return "kafka server: The leader epoch in the request is newer than the epoch on the broker."
 	case ErrUnsupportedCompressionType:
 		return "kafka server: The requesting client does not support the compression type of given partition."
+	case ErrStaleBrokerEpoch:
+		return "kafka server: Broker epoch has changed"
+	case ErrOffsetNotAvailable:
+		return "kafka server: The leader high watermark has not caught up from a recent leader election so the offsets cannot be guaranteed to be monotonically increasing"
+	case ErrMemberIdRequired:
+		return "kafka server: The group member needs to have a valid member id before actually entering a consumer group"
+	case ErrPreferredLeaderNotAvailable:
+		return "kafka server: The preferred leader was not available"
+	case ErrGroupMaxSizeReached:
+		return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
 	}
 
 	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)

+ 27 - 14
examples/consumergroup/main.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 
 	"github.com/Shopify/sarama"
@@ -53,7 +54,7 @@ func main() {
 
 	version, err := sarama.ParseKafkaVersion(version)
 	if err != nil {
-		panic(err)
+		log.Panicf("Error parsing Kafka version: %v", err)
 	}
 
 	/**
@@ -70,21 +71,29 @@ func main() {
 	/**
 	 * Setup a new Sarama consumer group
 	 */
-	consumer := Consumer{}
+	consumer := Consumer{
+		ready: make(chan bool, 0),
+	}
 
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
 	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
 	if err != nil {
-		panic(err)
+		log.Panicf("Error creating consumer group client: %v", err)
 	}
 
+	wg := &sync.WaitGroup{}
 	go func() {
+		wg.Add(1)
+		defer wg.Done()
 		for {
-			consumer.ready = make(chan bool, 0)
-			err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
-			if err != nil {
-				panic(err)
+			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
+				log.Panicf("Error from consumer: %v", err)
 			}
+			// check if context was cancelled, signaling that the consumer should stop
+			if ctx.Err() != nil {
+				return
+			}
+			consumer.ready = make(chan bool, 0)
 		}
 	}()
 
@@ -93,12 +102,16 @@ func main() {
 
 	sigterm := make(chan os.Signal, 1)
 	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
-
-	<-sigterm // Await a sigterm signal before safely closing the consumer
-
-	err = client.Close()
-	if err != nil {
-		panic(err)
+	select {
+	case <-ctx.Done():
+		log.Println("terminating: context cancelled")
+	case <-sigterm:
+		log.Println("terminating: via signal")
+	}
+	cancel()
+	wg.Wait()
+	if err = client.Close(); err != nil {
+		log.Panicf("Error closing client: %v", err)
 	}
 }
 

+ 65 - 13
examples/sasl_scram_client/main.go

@@ -7,6 +7,7 @@ import (
 	"io/ioutil"
 	"log"
 	"os"
+	"os/signal"
 	"strings"
 
 	"github.com/Shopify/sarama"
@@ -27,6 +28,8 @@ var (
 	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
 	verifySSL = flag.Bool("verify", false, "Optional verify ssl certificates chain")
 	useTLS    = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
+	mode      = flag.String("mode", "produce", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
+	logMsg    = flag.Bool("logmsg", false, "True to log consumed messages to console")
 
 	logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
 )
@@ -62,8 +65,9 @@ func main() {
 	flag.Parse()
 
 	if *brokers == "" {
-		log.Fatalln("at least one brocker is required")
+		log.Fatalln("at least one broker is required")
 	}
+	splitBrokers := strings.Split(*brokers, ",")
 
 	if *userName == "" {
 		log.Fatalln("SASL username is required")
@@ -101,18 +105,66 @@ func main() {
 		conf.Net.TLS.Config = createTLSConfiguration()
 	}
 
-	syncProcuder, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), conf)
-	if err != nil {
-		logger.Fatalln("failed to create producer: ", err)
-	}
-	partition, offset, err := syncProcuder.SendMessage(&sarama.ProducerMessage{
-		Topic: *topic,
-		Value: sarama.StringEncoder("test_message"),
-	})
-	if err != nil {
-		logger.Fatalln("failed to send message to ", *topic, err)
+	if *mode == "consume" {
+		consumer, err := sarama.NewConsumer(splitBrokers, conf)
+		if err != nil {
+			panic(err)
+		}
+		log.Println("consumer created")
+		defer func() {
+			if err := consumer.Close(); err != nil {
+				log.Fatalln(err)
+			}
+		}()
+		log.Println("commence consuming")
+		partitionConsumer, err := consumer.ConsumePartition(*topic, 0, sarama.OffsetOldest)
+		if err != nil {
+			panic(err)
+		}
+
+		defer func() {
+			if err := partitionConsumer.Close(); err != nil {
+				log.Fatalln(err)
+			}
+		}()
+
+		// Trap SIGINT to trigger a shutdown.
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Interrupt)
+
+		consumed := 0
+	ConsumerLoop:
+		for {
+			log.Println("in the for")
+			select {
+			case msg := <-partitionConsumer.Messages():
+				log.Printf("Consumed message offset %d\n", msg.Offset)
+				if *logMsg {
+					log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
+				}
+				consumed++
+			case <-signals:
+				break ConsumerLoop
+			}
+		}
+
+		log.Printf("Consumed: %d\n", consumed)
+
+	} else {
+		syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
+		if err != nil {
+			logger.Fatalln("failed to create producer: ", err)
+		}
+		partition, offset, err := syncProducer.SendMessage(&sarama.ProducerMessage{
+			Topic: *topic,
+			Value: sarama.StringEncoder("test_message"),
+		})
+		if err != nil {
+			logger.Fatalln("failed to send message to ", *topic, err)
+		}
+		logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
+		_ = syncProducer.Close()
 	}
-	logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
-	_ = syncProcuder.Close()
 	logger.Println("Bye now !")
+
 }

+ 70 - 0
fetch_response_test.go

@@ -86,6 +86,37 @@ var (
 		0x06, 0x08, 0x09, 0x0A,
 		0x04, 0x0B, 0x0C}
 
+	partialFetchResponse = []byte{
+		0x00, 0x00, 0x00, 0x00, // ThrottleTime
+		0x00, 0x00, 0x00, 0x01, // Number of Topics
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
+		0x00, 0x00, 0x00, 0x01, // Number of Partitions
+		0x00, 0x00, 0x00, 0x05, // Partition
+		0x00, 0x00, // Error
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
+		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
+		0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
+		0x00, 0x00, 0x00, 0x40, // Records length
+
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x46,
+		0x00, 0x00, 0x00, 0x00,
+		0x02,
+		0xDB, 0x47, 0x14, 0xC9,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		// record
+		0x28,
+		0x00,
+		0x00,
+	}
+
 	oneMessageFetchResponseV4 = []byte{
 		0x00, 0x00, 0x00, 0x00, // ThrottleTime
 		0x00, 0x00, 0x00, 0x01, // Number of Topics
@@ -277,6 +308,45 @@ func TestOneRecordFetchResponse(t *testing.T) {
 	}
 }
 
+func TestPartailFetchResponse(t *testing.T) {
+	response := FetchResponse{}
+	testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4)
+
+	if len(response.Blocks) != 1 {
+		t.Fatal("Decoding produced incorrect number of topic blocks.")
+	}
+
+	if len(response.Blocks["topic"]) != 1 {
+		t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
+	}
+
+	block := response.GetBlock("topic", 5)
+	if block == nil {
+		t.Fatal("GetBlock didn't return block.")
+	}
+	if block.Err != ErrNoError {
+		t.Error("Decoding didn't produce correct error code.")
+	}
+	if block.HighWaterMarkOffset != 0x10101010 {
+		t.Error("Decoding didn't produce correct high water mark offset.")
+	}
+	partial, err := block.isPartial()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if !partial {
+		t.Error("Decoding not a partial trailing record")
+	}
+
+	n, err := block.numRecords()
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if n != 0 {
+		t.Fatal("Decoding produced incorrect number of records.")
+	}
+}
+
 func TestOneMessageFetchResponseV4(t *testing.T) {
 	response := FetchResponse{}
 	testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)

+ 18 - 1
length_field.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "encoding/binary"
+import (
+	"encoding/binary"
+	"sync"
+)
 
 // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
 type lengthField struct {
@@ -8,6 +11,20 @@ type lengthField struct {
 	length      int32
 }
 
+var lengthFieldPool = sync.Pool{}
+
+func acquireLengthField() *lengthField {
+	val := lengthFieldPool.Get()
+	if val != nil {
+		return val.(*lengthField)
+	}
+	return &lengthField{}
+}
+
+func releaseLengthField(m *lengthField) {
+	lengthFieldPool.Put(m)
+}
+
 func (l *lengthField) decode(pd packetDecoder) error {
 	var err error
 	l.length, err = pd.getInt32()

+ 4 - 1
message.go

@@ -103,7 +103,10 @@ func (m *Message) encode(pe packetEncoder) error {
 }
 
 func (m *Message) decode(pd packetDecoder) (err error) {
-	err = pd.push(newCRC32Field(crcIEEE))
+	crc32Decoder := acquireCrc32Field(crcIEEE)
+	defer releaseCrc32Field(crc32Decoder)
+
+	err = pd.push(crc32Decoder)
 	if err != nil {
 		return err
 	}

+ 4 - 1
message_set.go

@@ -29,7 +29,10 @@ func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	if err = pd.push(&lengthField{}); err != nil {
+	lengthDecoder := acquireLengthField()
+	defer releaseLengthField(lengthDecoder)
+
+	if err = pd.push(lengthDecoder); err != nil {
 		return err
 	}
 

+ 35 - 1
mockresponses.go

@@ -574,6 +574,7 @@ func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
 // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
 type MockOffsetFetchResponse struct {
 	offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
+	error   KError
 	t       TestReporter
 }
 
@@ -599,15 +600,25 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
 	return mr
 }
 
+func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
+	mr.error = kerror
+	return mr
+}
+
 func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*OffsetFetchRequest)
 	group := req.ConsumerGroup
-	res := &OffsetFetchResponse{}
+	res := &OffsetFetchResponse{Version: req.Version}
+
 	for topic, partitions := range mr.offsets[group] {
 		for partition, block := range partitions {
 			res.AddBlock(topic, partition, block)
 		}
 	}
+
+	if res.Version >= 2 {
+		res.Err = mr.error
+	}
 	return res
 }
 
@@ -885,3 +896,26 @@ func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
 	}
 	return res
 }
+
+type MockDeleteGroupsResponse struct {
+	deletedGroups []string
+}
+
+func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
+	return &MockDeleteGroupsResponse{}
+}
+
+func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
+	m.deletedGroups = groups
+	return m
+}
+
+func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
+	resp := &DeleteGroupsResponse{
+		GroupErrorCodes: map[string]KError{},
+	}
+	for _, group := range m.deletedGroups {
+		resp.GroupErrorCodes[group] = ErrNoError
+	}
+	return resp
+}

+ 4 - 1
record_batch.go

@@ -116,7 +116,10 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
+	crc32Decoder := acquireCrc32Field(crcCastagnoli)
+	defer releaseCrc32Field(crc32Decoder)
+
+	if err = pd.push(crc32Decoder); err != nil {
 		return err
 	}
 

+ 3 - 1
utils.go

@@ -160,6 +160,7 @@ var (
 	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
 	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
 	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
+	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
 
 	SupportedVersions = []KafkaVersion{
 		V0_8_2_0,
@@ -183,9 +184,10 @@ var (
 		V2_0_1_0,
 		V2_1_0_0,
 		V2_2_0_0,
+		V2_3_0_0,
 	}
 	MinVersion = V0_8_2_0
-	MaxVersion = V2_2_0_0
+	MaxVersion = V2_3_0_0
 )
 
 //ParseKafkaVersion parses and returns kafka version or error from a string