Browse Source

Merge branch 'master' into read-committed

francois 6 years ago
parent
commit
98b2c4a922

+ 1 - 1
.github/CONTRIBUTING.md

@@ -24,7 +24,7 @@ We will gladly accept bug fixes, or additions to this library. Please fork this
 - If you plan to work on something major, please open an issue to discuss the design first.
 - If you plan to work on something major, please open an issue to discuss the design first.
 - Don't break backwards compatibility. If you really have to, open an issue to discuss this first.
 - Don't break backwards compatibility. If you really have to, open an issue to discuss this first.
 - Make sure to use the `go fmt` command to format your code according to the standards. Even better, set up your editor to do this for you when saving.
 - Make sure to use the `go fmt` command to format your code according to the standards. Even better, set up your editor to do this for you when saving.
-- Run [go vet](https://godoc.org/golang.org/x/tools/cmd/vet) to detect any suspicious constructs in your code that could be bugs.
+- Run [go vet](https://golang.org/cmd/vet/) to detect any suspicious constructs in your code that could be bugs.
 - Explicitly handle all error return values. If you really want to ignore an error value, you can assign it to `_`.You can use [errcheck](https://github.com/kisielk/errcheck) to verify whether you have handled all errors.
 - Explicitly handle all error return values. If you really want to ignore an error value, you can assign it to `_`.You can use [errcheck](https://github.com/kisielk/errcheck) to verify whether you have handled all errors.
 - You may also want to run [golint](https://github.com/golang/lint) as well to detect style problems.
 - You may also want to run [golint](https://github.com/golang/lint) as well to detect style problems.
 - Add tests that cover the changes you made. Make sure to run `go test` with the `-race` argument to test for race conditions.
 - Add tests that cover the changes you made. Make sure to run `go test` with the `-race` argument to test for race conditions.

+ 1 - 2
.travis.yml

@@ -1,6 +1,5 @@
 language: go
 language: go
 go:
 go:
-- 1.10.x
 - 1.11.x
 - 1.11.x
 - 1.12.x
 - 1.12.x
 
 
@@ -12,8 +11,8 @@ env:
   - KAFKA_HOSTNAME=localhost
   - KAFKA_HOSTNAME=localhost
   - DEBUG=true
   - DEBUG=true
   matrix:
   matrix:
-  - KAFKA_VERSION=2.0.1 KAFKA_SCALA_VERSION=2.12
   - KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
   - KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
+  - KAFKA_VERSION=2.2.0 KAFKA_SCALA_VERSION=2.12
 
 
 before_install:
 before_install:
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}

+ 1 - 1
Makefile

@@ -29,4 +29,4 @@ install_errcheck:
 	go get github.com/kisielk/errcheck
 	go get github.com/kisielk/errcheck
 
 
 get:
 get:
-	go get -t
+	go get -t -v ./...

+ 32 - 31
acl_types.go

@@ -1,50 +1,51 @@
 package sarama
 package sarama
 
 
-type AclOperation int
+type (
+	AclOperation int
+
+	AclPermissionType int
+
+	AclResourceType int
+
+	AclResourcePatternType int
+)
 
 
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
 const (
 const (
-	AclOperationUnknown         AclOperation = 0
-	AclOperationAny             AclOperation = 1
-	AclOperationAll             AclOperation = 2
-	AclOperationRead            AclOperation = 3
-	AclOperationWrite           AclOperation = 4
-	AclOperationCreate          AclOperation = 5
-	AclOperationDelete          AclOperation = 6
-	AclOperationAlter           AclOperation = 7
-	AclOperationDescribe        AclOperation = 8
-	AclOperationClusterAction   AclOperation = 9
-	AclOperationDescribeConfigs AclOperation = 10
-	AclOperationAlterConfigs    AclOperation = 11
-	AclOperationIdempotentWrite AclOperation = 12
+	AclOperationUnknown AclOperation = iota
+	AclOperationAny
+	AclOperationAll
+	AclOperationRead
+	AclOperationWrite
+	AclOperationCreate
+	AclOperationDelete
+	AclOperationAlter
+	AclOperationDescribe
+	AclOperationClusterAction
+	AclOperationDescribeConfigs
+	AclOperationAlterConfigs
+	AclOperationIdempotentWrite
 )
 )
 
 
-type AclPermissionType int
-
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
 const (
 const (
-	AclPermissionUnknown AclPermissionType = 0
-	AclPermissionAny     AclPermissionType = 1
-	AclPermissionDeny    AclPermissionType = 2
-	AclPermissionAllow   AclPermissionType = 3
+	AclPermissionUnknown AclPermissionType = iota
+	AclPermissionAny
+	AclPermissionDeny
+	AclPermissionAllow
 )
 )
 
 
-type AclResourceType int
-
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
 const (
 const (
-	AclResourceUnknown         AclResourceType = 0
-	AclResourceAny             AclResourceType = 1
-	AclResourceTopic           AclResourceType = 2
-	AclResourceGroup           AclResourceType = 3
-	AclResourceCluster         AclResourceType = 4
-	AclResourceTransactionalID AclResourceType = 5
+	AclResourceUnknown AclResourceType = iota
+	AclResourceAny
+	AclResourceTopic
+	AclResourceGroup
+	AclResourceCluster
+	AclResourceTransactionalID
 )
 )
 
 
-type AclResourcePatternType int
-
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
-
 const (
 const (
 	AclPatternUnknown AclResourcePatternType = iota
 	AclPatternUnknown AclResourcePatternType = iota
 	AclPatternAny
 	AclPatternAny

+ 2 - 2
balance_strategy.go

@@ -24,7 +24,7 @@ func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
 // --------------------------------------------------------------------
 // --------------------------------------------------------------------
 
 
 // BalanceStrategy is used to balance topics and partitions
 // BalanceStrategy is used to balance topics and partitions
-// across memebers of a consumer group
+// across members of a consumer group
 type BalanceStrategy interface {
 type BalanceStrategy interface {
 	// Name uniquely identifies the strategy.
 	// Name uniquely identifies the strategy.
 	Name() string
 	Name() string
@@ -78,7 +78,7 @@ type balanceStrategy struct {
 // Name implements BalanceStrategy.
 // Name implements BalanceStrategy.
 func (s *balanceStrategy) Name() string { return s.name }
 func (s *balanceStrategy) Name() string { return s.name }
 
 
-// Balance implements BalanceStrategy.
+// Plan implements BalanceStrategy.
 func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
 func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
 	// Build members by topic map
 	// Build members by topic map
 	mbt := make(map[string][]string)
 	mbt := make(map[string][]string)

+ 105 - 54
broker.go

@@ -18,19 +18,20 @@ import (
 
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 type Broker struct {
-	id   int32
-	addr string
+	conf *Config
 	rack *string
 	rack *string
 
 
-	conf          *Config
+	id            int32
+	addr          string
 	correlationID int32
 	correlationID int32
 	conn          net.Conn
 	conn          net.Conn
 	connErr       error
 	connErr       error
 	lock          sync.Mutex
 	lock          sync.Mutex
 	opened        int32
 	opened        int32
+	responses     chan responsePromise
+	done          chan bool
 
 
-	responses chan responsePromise
-	done      chan bool
+	registeredMetrics []string
 
 
 	incomingByteRate       metrics.Meter
 	incomingByteRate       metrics.Meter
 	requestRate            metrics.Meter
 	requestRate            metrics.Meter
@@ -179,13 +180,7 @@ func (b *Broker) Open(conf *Config) error {
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// the same id (-1) and are already exposed through the global metrics above
 		// the same id (-1) and are already exposed through the global metrics above
 		if b.id >= 0 {
 		if b.id >= 0 {
-			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
-			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
-			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
-			b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
-			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
-			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
-			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
+			b.registerMetrics()
 		}
 		}
 
 
 		if conf.Net.SASL.Enable {
 		if conf.Net.SASL.Enable {
@@ -228,6 +223,7 @@ func (b *Broker) Connected() (bool, error) {
 	return b.conn != nil, b.connErr
 	return b.conn != nil, b.connErr
 }
 }
 
 
+//Close closes the broker resources
 func (b *Broker) Close() error {
 func (b *Broker) Close() error {
 	b.lock.Lock()
 	b.lock.Lock()
 	defer b.lock.Unlock()
 	defer b.lock.Unlock()
@@ -246,12 +242,7 @@ func (b *Broker) Close() error {
 	b.done = nil
 	b.done = nil
 	b.responses = nil
 	b.responses = nil
 
 
-	if b.id >= 0 {
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
-	}
+	b.unregisterMetrics()
 
 
 	if err == nil {
 	if err == nil {
 		Logger.Printf("Closed connection to broker %s\n", b.addr)
 		Logger.Printf("Closed connection to broker %s\n", b.addr)
@@ -285,6 +276,7 @@ func (b *Broker) Rack() string {
 	return *b.rack
 	return *b.rack
 }
 }
 
 
+//GetMetadata sends a metadata request and returns a metadata response or error
 func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
 func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 	response := new(MetadataResponse)
 
 
@@ -297,6 +289,7 @@ func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error
 	return response, nil
 	return response, nil
 }
 }
 
 
+//GetConsumerMetadata sends a consumer metadata request and returns a consumer metadata response or error
 func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
 func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
 	response := new(ConsumerMetadataResponse)
 	response := new(ConsumerMetadataResponse)
 
 
@@ -309,6 +302,7 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume
 	return response, nil
 	return response, nil
 }
 }
 
 
+//FindCoordinator sends a find coordinator request and returns a response or error
 func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
 func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
 	response := new(FindCoordinatorResponse)
 	response := new(FindCoordinatorResponse)
 
 
@@ -321,6 +315,7 @@ func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordina
 	return response, nil
 	return response, nil
 }
 }
 
 
+//GetAvailableOffsets returns an offset response or error
 func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
 func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 	response := new(OffsetResponse)
 
 
@@ -333,9 +328,12 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e
 	return response, nil
 	return response, nil
 }
 }
 
 
+//Produce returns a produce response or error
 func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
 func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
-	var response *ProduceResponse
-	var err error
+	var (
+		response *ProduceResponse
+		err      error
+	)
 
 
 	if request.RequiredAcks == NoResponse {
 	if request.RequiredAcks == NoResponse {
 		err = b.sendAndReceive(request, nil)
 		err = b.sendAndReceive(request, nil)
@@ -351,11 +349,11 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
 	return response, nil
 	return response, nil
 }
 }
 
 
+//Fetch returns a FetchResponse or error
 func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 	response := new(FetchResponse)
 	response := new(FetchResponse)
 
 
 	err := b.sendAndReceive(request, response)
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -363,11 +361,11 @@ func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 	return response, nil
 	return response, nil
 }
 }
 
 
+//CommitOffset returns an offset commit response or error
 func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
 func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
 	response := new(OffsetCommitResponse)
 	response := new(OffsetCommitResponse)
 
 
 	err := b.sendAndReceive(request, response)
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -375,11 +373,11 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon
 	return response, nil
 	return response, nil
 }
 }
 
 
+//FetchOffset returns an offset fetch response or error
 func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
 func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
 	response := new(OffsetFetchResponse)
 	response := new(OffsetFetchResponse)
 
 
 	err := b.sendAndReceive(request, response)
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -387,6 +385,7 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
 	return response, nil
 	return response, nil
 }
 }
 
 
+//JoinGroup returns a join group response or error
 func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
 func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
 	response := new(JoinGroupResponse)
 	response := new(JoinGroupResponse)
 
 
@@ -398,6 +397,7 @@ func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error
 	return response, nil
 	return response, nil
 }
 }
 
 
+//SyncGroup returns a sync group response or error
 func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
 func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
 	response := new(SyncGroupResponse)
 	response := new(SyncGroupResponse)
 
 
@@ -409,6 +409,7 @@ func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error
 	return response, nil
 	return response, nil
 }
 }
 
 
+//LeaveGroup returns a leave group response or error
 func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
 func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
 	response := new(LeaveGroupResponse)
 	response := new(LeaveGroupResponse)
 
 
@@ -420,6 +421,7 @@ func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, er
 	return response, nil
 	return response, nil
 }
 }
 
 
+//Heartbeat returns a heartbeat response or error
 func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
 func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
 	response := new(HeartbeatResponse)
 	response := new(HeartbeatResponse)
 
 
@@ -431,6 +433,7 @@ func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error
 	return response, nil
 	return response, nil
 }
 }
 
 
+//ListGroups returns a list groups response or error
 func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
 func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
 	response := new(ListGroupsResponse)
 	response := new(ListGroupsResponse)
 
 
@@ -442,6 +445,7 @@ func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, er
 	return response, nil
 	return response, nil
 }
 }
 
 
+//DescribeGroups returns a describe groups response or error
 func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
 func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
 	response := new(DescribeGroupsResponse)
 	response := new(DescribeGroupsResponse)
 
 
@@ -453,6 +457,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups
 	return response, nil
 	return response, nil
 }
 }
 
 
+//ApiVersions returns an API versions response or error
 func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
 func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
 	response := new(ApiVersionsResponse)
 	response := new(ApiVersionsResponse)
 
 
@@ -464,6 +469,7 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse,
 	return response, nil
 	return response, nil
 }
 }
 
 
+//CreateTopics sends a create topics request and returns a create topics response or error
 func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
 func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
 	response := new(CreateTopicsResponse)
 	response := new(CreateTopicsResponse)
 
 
@@ -475,6 +481,7 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon
 	return response, nil
 	return response, nil
 }
 }
 
 
+//DeleteTopics sends a delete topic request and returns delete topic response
 func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
 func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
 	response := new(DeleteTopicsResponse)
 	response := new(DeleteTopicsResponse)
 
 
@@ -486,6 +493,8 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon
 	return response, nil
 	return response, nil
 }
 }
 
 
+//CreatePartitions sends a create partition request and returns create
+//partitions response or error
 func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
 func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
 	response := new(CreatePartitionsResponse)
 	response := new(CreatePartitionsResponse)
 
 
@@ -497,6 +506,8 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart
 	return response, nil
 	return response, nil
 }
 }
 
 
+//DeleteRecords sends a request to delete records and returns a delete records
+//response or error
 func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
 func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
 	response := new(DeleteRecordsResponse)
 	response := new(DeleteRecordsResponse)
 
 
@@ -508,6 +519,7 @@ func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsRes
 	return response, nil
 	return response, nil
 }
 }
 
 
+//DescribeAcls sends a describe acl request and returns a response or error
 func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
 func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
 	response := new(DescribeAclsResponse)
 	response := new(DescribeAclsResponse)
 
 
@@ -519,6 +531,7 @@ func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsRespon
 	return response, nil
 	return response, nil
 }
 }
 
 
+//CreateAcls sends a create acl request and returns a response or error
 func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
 func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
 	response := new(CreateAclsResponse)
 	response := new(CreateAclsResponse)
 
 
@@ -530,6 +543,7 @@ func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, er
 	return response, nil
 	return response, nil
 }
 }
 
 
+//DeleteAcls sends a delete acl request and returns a response or error
 func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
 func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
 	response := new(DeleteAclsResponse)
 	response := new(DeleteAclsResponse)
 
 
@@ -541,6 +555,7 @@ func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, er
 	return response, nil
 	return response, nil
 }
 }
 
 
+//InitProducerID sends an init producer request and returns a response or error
 func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
 func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
 	response := new(InitProducerIDResponse)
 	response := new(InitProducerIDResponse)
 
 
@@ -552,6 +567,8 @@ func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerID
 	return response, nil
 	return response, nil
 }
 }
 
 
+//AddPartitionsToTxn sends a request to add partitions to a txn and returns
+//a response or error
 func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
 func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
 	response := new(AddPartitionsToTxnResponse)
 	response := new(AddPartitionsToTxnResponse)
 
 
@@ -563,6 +580,8 @@ func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPar
 	return response, nil
 	return response, nil
 }
 }
 
 
+//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
+//or error
 func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
 func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
 	response := new(AddOffsetsToTxnResponse)
 	response := new(AddOffsetsToTxnResponse)
 
 
@@ -574,6 +593,7 @@ func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsTo
 	return response, nil
 	return response, nil
 }
 }
 
 
+//EndTxn sends a request to end txn and returns a response or error
 func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
 func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
 	response := new(EndTxnResponse)
 	response := new(EndTxnResponse)
 
 
@@ -585,6 +605,8 @@ func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
 	return response, nil
 	return response, nil
 }
 }
 
 
+//TxnOffsetCommit sends a request to commit transaction offsets and returns
+//a response or error
 func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
 func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
 	response := new(TxnOffsetCommitResponse)
 	response := new(TxnOffsetCommitResponse)
 
 
@@ -596,6 +618,8 @@ func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCom
 	return response, nil
 	return response, nil
 }
 }
 
 
+//DescribeConfigs sends a request to describe config and returns a response or
+//error
 func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
 func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
 	response := new(DescribeConfigsResponse)
 	response := new(DescribeConfigsResponse)
 
 
@@ -607,6 +631,7 @@ func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConf
 	return response, nil
 	return response, nil
 }
 }
 
 
+//AlterConfigs sends a request to alter config and returns a response or error
 func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
 func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
 	response := new(AlterConfigsResponse)
 	response := new(AlterConfigsResponse)
 
 
@@ -618,6 +643,7 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon
 	return response, nil
 	return response, nil
 }
 }
 
 
+//DeleteGroups sends a request to delete groups and returns a response or error
 func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
 func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
 	response := new(DeleteGroupsResponse)
 	response := new(DeleteGroupsResponse)
 
 
@@ -656,7 +682,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 
 
 	requestTime := time.Now()
 	requestTime := time.Now()
 	bytes, err := b.conn.Write(buf)
 	bytes, err := b.conn.Write(buf)
-	b.updateOutgoingCommunicationMetrics(bytes)
+	b.updateOutgoingCommunicationMetrics(bytes) //TODO: should this be moved after the error check?
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -676,7 +702,6 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 
 
 func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
 func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
 	promise, err := b.send(req, res != nil)
 	promise, err := b.send(req, res != nil)
-
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -725,11 +750,11 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
 }
 }
 
 
 func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
 func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
-
 	host, portstr, err := net.SplitHostPort(b.addr)
 	host, portstr, err := net.SplitHostPort(b.addr)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+
 	port, err := strconv.Atoi(portstr)
 	port, err := strconv.Atoi(portstr)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -757,6 +782,7 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
 func (b *Broker) responseReceiver() {
 func (b *Broker) responseReceiver() {
 	var dead error
 	var dead error
 	header := make([]byte, 8)
 	header := make([]byte, 8)
+
 	for response := range b.responses {
 	for response := range b.responses {
 		if dead != nil {
 		if dead != nil {
 			response.errors <- dead
 			response.errors <- dead
@@ -819,7 +845,6 @@ func (b *Broker) authenticateViaSASL() error {
 	default:
 	default:
 		return b.sendAndReceiveSASLPlainAuth()
 		return b.sendAndReceiveSASLPlainAuth()
 	}
 	}
-
 }
 }
 
 
 func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
 func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
@@ -851,6 +876,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		return err
 		return err
 	}
 	}
+
 	length := binary.BigEndian.Uint32(header[:4])
 	length := binary.BigEndian.Uint32(header[:4])
 	payload := make([]byte, length-4)
 	payload := make([]byte, length-4)
 	n, err := io.ReadFull(b.conn, payload)
 	n, err := io.ReadFull(b.conn, payload)
@@ -858,17 +884,21 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		return err
 		return err
 	}
 	}
+
 	b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
 	b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
 	res := &SaslHandshakeResponse{}
 	res := &SaslHandshakeResponse{}
+
 	err = versionedDecode(payload, res, 0)
 	err = versionedDecode(payload, res, 0)
 	if err != nil {
 	if err != nil {
 		Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
 		Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
 		return err
 		return err
 	}
 	}
+
 	if res.Err != ErrNoError {
 	if res.Err != ErrNoError {
 		Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
 		Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
 		return res.Err
 		return res.Err
 	}
 	}
+
 	Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
 	Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
 	return nil
 	return nil
 }
 }
@@ -899,6 +929,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 			return handshakeErr
 			return handshakeErr
 		}
 		}
 	}
 	}
+
 	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
 	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
 	authBytes := make([]byte, length+4) //4 byte length header + auth data
 	authBytes := make([]byte, length+4) //4 byte length header + auth data
 	binary.BigEndian.PutUint32(authBytes, uint32(length))
 	binary.BigEndian.PutUint32(authBytes, uint32(length))
@@ -935,33 +966,27 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
 // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
 // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
 // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
 func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
 func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
-
 	if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
 	if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	token, err := provider.Token()
 	token, err := provider.Token()
-
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	requestTime := time.Now()
 	requestTime := time.Now()
-
 	correlationID := b.correlationID
 	correlationID := b.correlationID
 
 
 	bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
 	bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
-
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
-
 	b.correlationID++
 	b.correlationID++
 
 
 	bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
 	bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
-
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -1012,6 +1037,7 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
 			return err
 			return err
 		}
 		}
 	}
 	}
+
 	Logger.Println("SASL authentication succeeded")
 	Logger.Println("SASL authentication succeeded")
 	return nil
 	return nil
 }
 }
@@ -1023,39 +1049,41 @@ func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (i
 	if err != nil {
 	if err != nil {
 		return 0, err
 		return 0, err
 	}
 	}
+
 	if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
 	if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
 		return 0, err
 		return 0, err
 	}
 	}
+
 	return b.conn.Write(buf)
 	return b.conn.Write(buf)
 }
 }
 
 
 func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
 func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
 	buf := make([]byte, responseLengthSize+correlationIDSize)
 	buf := make([]byte, responseLengthSize+correlationIDSize)
-	bytesRead, err := io.ReadFull(b.conn, buf)
+	_, err := io.ReadFull(b.conn, buf)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
 	header := responseHeader{}
 	header := responseHeader{}
 	err = decode(buf, &header)
 	err = decode(buf, &header)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
 	if header.correlationID != correlationID {
 	if header.correlationID != correlationID {
 		return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 		return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 	}
 	}
+
 	buf = make([]byte, header.length-correlationIDSize)
 	buf = make([]byte, header.length-correlationIDSize)
-	c, err := io.ReadFull(b.conn, buf)
-	bytesRead += c
+	_, err = io.ReadFull(b.conn, buf)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
 	res := &SaslAuthenticateResponse{}
 	res := &SaslAuthenticateResponse{}
 	if err := versionedDecode(buf, res, 0); err != nil {
 	if err := versionedDecode(buf, res, 0); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if err != nil {
-		return nil, err
-	}
 	if res.Err != ErrNoError {
 	if res.Err != ErrNoError {
 		return nil, res.Err
 		return nil, res.Err
 	}
 	}
@@ -1065,7 +1093,6 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
 // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
 // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
 // https://tools.ietf.org/html/rfc7628
 // https://tools.ietf.org/html/rfc7628
 func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
 func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
-
 	var ext string
 	var ext string
 
 
 	if token.Extensions != nil && len(token.Extensions) > 0 {
 	if token.Extensions != nil && len(token.Extensions) > 0 {
@@ -1083,7 +1110,6 @@ func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
 // mapToString returns a list of key-value pairs ordered by key.
 // mapToString returns a list of key-value pairs ordered by key.
 // keyValSep separates the key from the value. elemSep separates each pair.
 // keyValSep separates the key from the value. elemSep separates each pair.
 func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
 func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
-
 	buf := make([]string, 0, len(extensions))
 	buf := make([]string, 0, len(extensions))
 
 
 	for k, v := range extensions {
 	for k, v := range extensions {
@@ -1096,9 +1122,7 @@ func mapToString(extensions map[string]string, keyValSep string, elemSep string)
 }
 }
 
 
 func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
 func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
-
 	initialResp, err := buildClientInitialResponse(token)
 	initialResp, err := buildClientInitialResponse(token)
-
 	if err != nil {
 	if err != nil {
 		return 0, err
 		return 0, err
 	}
 	}
@@ -1108,7 +1132,6 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlati
 	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
 	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
 
 
 	buf, err := encode(req, b.conf.MetricRegistry)
 	buf, err := encode(req, b.conf.MetricRegistry)
-
 	if err != nil {
 	if err != nil {
 		return 0, err
 		return 0, err
 	}
 	}
@@ -1122,10 +1145,9 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlati
 
 
 func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
 func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
 
 
-	buf := make([]byte, 8)
+	buf := make([]byte, responseLengthSize+correlationIDSize)
 
 
 	bytesRead, err := io.ReadFull(b.conn, buf)
 	bytesRead, err := io.ReadFull(b.conn, buf)
-
 	if err != nil {
 	if err != nil {
 		return bytesRead, err
 		return bytesRead, err
 	}
 	}
@@ -1133,7 +1155,6 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 	header := responseHeader{}
 	header := responseHeader{}
 
 
 	err = decode(buf, &header)
 	err = decode(buf, &header)
-
 	if err != nil {
 	if err != nil {
 		return bytesRead, err
 		return bytesRead, err
 	}
 	}
@@ -1142,12 +1163,10 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 		return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 		return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 	}
 	}
 
 
-	buf = make([]byte, header.length-4)
+	buf = make([]byte, header.length-correlationIDSize)
 
 
 	c, err := io.ReadFull(b.conn, buf)
 	c, err := io.ReadFull(b.conn, buf)
-
 	bytesRead += c
 	bytesRead += c
-
 	if err != nil {
 	if err != nil {
 		return bytesRead, err
 		return bytesRead, err
 	}
 	}
@@ -1158,10 +1177,6 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 		return bytesRead, err
 		return bytesRead, err
 	}
 	}
 
 
-	if err != nil {
-		return bytesRead, err
-	}
-
 	if res.Err != ErrNoError {
 	if res.Err != ErrNoError {
 		return bytesRead, res.Err
 		return bytesRead, res.Err
 	}
 	}
@@ -1176,14 +1191,17 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
 	b.updateRequestLatencyMetrics(requestLatency)
 	b.updateRequestLatencyMetrics(requestLatency)
 	b.responseRate.Mark(1)
 	b.responseRate.Mark(1)
+
 	if b.brokerResponseRate != nil {
 	if b.brokerResponseRate != nil {
 		b.brokerResponseRate.Mark(1)
 		b.brokerResponseRate.Mark(1)
 	}
 	}
+
 	responseSize := int64(bytes)
 	responseSize := int64(bytes)
 	b.incomingByteRate.Mark(responseSize)
 	b.incomingByteRate.Mark(responseSize)
 	if b.brokerIncomingByteRate != nil {
 	if b.brokerIncomingByteRate != nil {
 		b.brokerIncomingByteRate.Mark(responseSize)
 		b.brokerIncomingByteRate.Mark(responseSize)
 	}
 	}
+
 	b.responseSize.Update(responseSize)
 	b.responseSize.Update(responseSize)
 	if b.brokerResponseSize != nil {
 	if b.brokerResponseSize != nil {
 		b.brokerResponseSize.Update(responseSize)
 		b.brokerResponseSize.Update(responseSize)
@@ -1193,9 +1211,11 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency ti
 func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
 func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	b.requestLatency.Update(requestLatencyInMs)
 	b.requestLatency.Update(requestLatencyInMs)
+
 	if b.brokerRequestLatency != nil {
 	if b.brokerRequestLatency != nil {
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 	}
 	}
+
 }
 }
 
 
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1203,13 +1223,44 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
 	if b.brokerRequestRate != nil {
 	if b.brokerRequestRate != nil {
 		b.brokerRequestRate.Mark(1)
 		b.brokerRequestRate.Mark(1)
 	}
 	}
+
 	requestSize := int64(bytes)
 	requestSize := int64(bytes)
 	b.outgoingByteRate.Mark(requestSize)
 	b.outgoingByteRate.Mark(requestSize)
 	if b.brokerOutgoingByteRate != nil {
 	if b.brokerOutgoingByteRate != nil {
 		b.brokerOutgoingByteRate.Mark(requestSize)
 		b.brokerOutgoingByteRate.Mark(requestSize)
 	}
 	}
+
 	b.requestSize.Update(requestSize)
 	b.requestSize.Update(requestSize)
 	if b.brokerRequestSize != nil {
 	if b.brokerRequestSize != nil {
 		b.brokerRequestSize.Update(requestSize)
 		b.brokerRequestSize.Update(requestSize)
 	}
 	}
+
+}
+
+func (b *Broker) registerMetrics() {
+	b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
+	b.brokerRequestRate = b.registerMeter("request-rate")
+	b.brokerRequestSize = b.registerHistogram("request-size")
+	b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
+	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
+	b.brokerResponseRate = b.registerMeter("response-rate")
+	b.brokerResponseSize = b.registerHistogram("response-size")
+}
+
+func (b *Broker) unregisterMetrics() {
+	for _, name := range b.registeredMetrics {
+		b.conf.MetricRegistry.Unregister(name)
+	}
+}
+
+func (b *Broker) registerMeter(name string) metrics.Meter {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
+}
+
+func (b *Broker) registerHistogram(name string) metrics.Histogram {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
 }
 }

+ 2 - 1
client.go

@@ -288,7 +288,8 @@ func (client *client) Partitions(topic string) ([]int32, error) {
 		partitions = client.cachedPartitions(topic, allPartitions)
 		partitions = client.cachedPartitions(topic, allPartitions)
 	}
 	}
 
 
-	if partitions == nil {
+	// no partitions found after refresh metadata
+	if len(partitions) == 0 {
 		return nil, ErrUnknownTopicOrPartition
 		return nil, ErrUnknownTopicOrPartition
 	}
 	}
 
 

+ 6 - 6
config_resource_type.go

@@ -6,10 +6,10 @@ type ConfigResourceType int8
 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
 
 
 const (
 const (
-	UnknownResource ConfigResourceType = 0
-	AnyResource     ConfigResourceType = 1
-	TopicResource   ConfigResourceType = 2
-	GroupResource   ConfigResourceType = 3
-	ClusterResource ConfigResourceType = 4
-	BrokerResource  ConfigResourceType = 5
+	UnknownResource ConfigResourceType = iota
+	AnyResource
+	TopicResource
+	GroupResource
+	ClusterResource
+	BrokerResource
 )
 )

+ 23 - 12
consumer.go

@@ -6,6 +6,8 @@ import (
 	"sync"
 	"sync"
 	"sync/atomic"
 	"sync/atomic"
 	"time"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 )
 
 
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
@@ -44,11 +46,6 @@ func (ce ConsumerErrors) Error() string {
 // Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
 // Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope.
 // scope.
-//
-// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking.
-// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library
-// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the
-// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
 type Consumer interface {
 type Consumer interface {
 
 
 	// Topics returns the set of available topics as retrieved from the cluster
 	// Topics returns the set of available topics as retrieved from the cluster
@@ -426,12 +423,6 @@ func (child *partitionConsumer) AsyncClose() {
 func (child *partitionConsumer) Close() error {
 func (child *partitionConsumer) Close() error {
 	child.AsyncClose()
 	child.AsyncClose()
 
 
-	go withRecover(func() {
-		for range child.messages {
-			// drain
-		}
-	})
-
 	var errors ConsumerErrors
 	var errors ConsumerErrors
 	for err := range child.errors {
 	for err := range child.errors {
 		errors = append(errors, err)
 		errors = append(errors, err)
@@ -463,14 +454,22 @@ feederLoop:
 		for i, msg := range msgs {
 		for i, msg := range msgs {
 		messageSelect:
 		messageSelect:
 			select {
 			select {
+			case <-child.dying:
+				child.broker.acks.Done()
+				continue feederLoop
 			case child.messages <- msg:
 			case child.messages <- msg:
 				firstAttempt = true
 				firstAttempt = true
 			case <-expiryTicker.C:
 			case <-expiryTicker.C:
 				if !firstAttempt {
 				if !firstAttempt {
 					child.responseResult = errTimedOut
 					child.responseResult = errTimedOut
 					child.broker.acks.Done()
 					child.broker.acks.Done()
+				remainingLoop:
 					for _, msg = range msgs[i:] {
 					for _, msg = range msgs[i:] {
-						child.messages <- msg
+						select {
+						case child.messages <- msg:
+						case <-child.dying:
+							break remainingLoop
+						}
 					}
 					}
 					child.broker.input <- child
 					child.broker.input <- child
 					continue feederLoop
 					continue feederLoop
@@ -555,6 +554,15 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 }
 }
 
 
 func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
 func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
+	var (
+		metricRegistry          = child.conf.MetricRegistry
+		consumerBatchSizeMetric metrics.Histogram
+	)
+
+	if metricRegistry != nil {
+		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
+	}
+
 	block := response.GetBlock(child.topic, child.partition)
 	block := response.GetBlock(child.topic, child.partition)
 	if block == nil {
 	if block == nil {
 		return nil, ErrIncompleteResponse
 		return nil, ErrIncompleteResponse
@@ -568,6 +576,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
+	consumerBatchSizeMetric.Update(int64(nRecs))
+
 	if nRecs == 0 {
 	if nRecs == 0 {
 		partialTrailingMessage, err := block.isPartial()
 		partialTrailingMessage, err := block.isPartial()
 		if err != nil {
 		if err != nil {

+ 6 - 6
describe_configs_response.go

@@ -26,12 +26,12 @@ func (s ConfigSource) String() string {
 }
 }
 
 
 const (
 const (
-	SourceUnknown              ConfigSource = 0
-	SourceTopic                ConfigSource = 1
-	SourceDynamicBroker        ConfigSource = 2
-	SourceDynamicDefaultBroker ConfigSource = 3
-	SourceStaticBroker         ConfigSource = 4
-	SourceDefault              ConfigSource = 5
+	SourceUnknown ConfigSource = iota
+	SourceTopic
+	SourceDynamicBroker
+	SourceDynamicDefaultBroker
+	SourceStaticBroker
+	SourceDefault
 )
 )
 
 
 type DescribeConfigsResponse struct {
 type DescribeConfigsResponse struct {

+ 3 - 0
examples/README.md

@@ -7,3 +7,6 @@ In these examples, we use `github.com/Shopify/sarama` as import path. We do this
 #### HTTP server
 #### HTTP server
 
 
 [http_server](./http_server) is a simple HTTP server uses both the sync producer to produce data as part of the request handling cycle, as well as the async producer to maintain an access log. It also uses the [mocks subpackage](https://godoc.org/github.com/Shopify/sarama/mocks) to test both.
 [http_server](./http_server) is a simple HTTP server uses both the sync producer to produce data as part of the request handling cycle, as well as the async producer to maintain an access log. It also uses the [mocks subpackage](https://godoc.org/github.com/Shopify/sarama/mocks) to test both.
+
+#### SASL SCRAM Authentication
+[sasl_scram_authentication](./sasl_scram_authentication) is an example of how to authenticate to a Kafka cluster using SASL SCRAM-SHA-256 or SCRAM-SHA-512 mechanisms.

+ 7 - 0
examples/consumergroup/README.md

@@ -0,0 +1,7 @@
+# Consumergroup example
+
+This example shows you how to use the Sarama consumer group consumer. The example simply starts consuming the given Kafka topics and logs the consumed messages.
+
+```bash
+$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
+```

+ 5 - 0
examples/consumergroup/go.mod

@@ -0,0 +1,5 @@
+module github.com/Shopify/sarama/examples/consumer
+
+replace github.com/Shopify/sarama => ../../
+
+require github.com/Shopify/sarama v0.0.0-00010101000000-000000000000

+ 17 - 0
examples/consumergroup/go.sum

@@ -0,0 +1,17 @@
+github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
+github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
+github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
+github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
+github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=

+ 131 - 0
examples/consumergroup/main.go

@@ -0,0 +1,131 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+
+	"github.com/Shopify/sarama"
+)
+
+// Sarama configuration options
+var (
+	brokers = ""
+	version = ""
+	group   = ""
+	topics  = ""
+	oldest  = true
+	verbose = false
+)
+
+func init() {
+	flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
+	flag.StringVar(&group, "group", "", "Kafka consumer group definition")
+	flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
+	flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
+	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial ofset from oldest")
+	flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
+	flag.Parse()
+
+	if len(brokers) == 0 {
+		panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
+	}
+
+	if len(topics) == 0 {
+		panic("no topics given to be consumed, please set the -topics flag")
+	}
+
+	if len(group) == 0 {
+		panic("no Kafka consumer group defined, please set the -group flag")
+	}
+}
+
+func main() {
+	log.Println("Starting a new Sarama consumer")
+
+	if verbose {
+		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
+	}
+
+	version, err := sarama.ParseKafkaVersion(version)
+	if err != nil {
+		panic(err)
+	}
+
+	/**
+	 * Construct a new Sarama configuration.
+	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
+	 */
+	config := sarama.NewConfig()
+	config.Version = version
+
+	if oldest {
+		config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	}
+
+	/**
+	 * Setup a new Sarama consumer group
+	 */
+	consumer := Consumer{
+		ready: make(chan bool, 0),
+	}
+
+	ctx := context.Background()
+	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
+	if err != nil {
+		panic(err)
+	}
+
+	go func() {
+		for {
+			err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
+			if err != nil {
+				panic(err)
+			}
+		}
+	}()
+
+	<-consumer.ready // Await till the consumer has been set up
+	log.Println("Sarama consumer up and running!...")
+
+	sigterm := make(chan os.Signal, 1)
+	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
+
+	<-sigterm // Await a sigterm signal before safely closing the consumer
+
+	err = client.Close()
+	if err != nil {
+		panic(err)
+	}
+}
+
+// Consumer represents a Sarama consumer group consumer
+type Consumer struct {
+	ready chan bool
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
+	// Mark the consumer as ready
+	close(consumer.ready)
+	return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
+func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for message := range claim.Messages() {
+		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
+		session.MarkMessage(message, "")
+	}
+
+	return nil
+}

+ 2 - 0
examples/sasl_scram_client/.gitignore

@@ -0,0 +1,2 @@
+sasl_scram_client
+

+ 4 - 0
examples/sasl_scram_client/README.md

@@ -0,0 +1,4 @@
+Example command line:
+
+```./sasl_scram_client -brokers localhost:9094 -username foo -passwd a_password -topic topic_name -tls -algorithm [sha256|sha512]```
+

+ 118 - 0
examples/sasl_scram_client/main.go

@@ -0,0 +1,118 @@
+package main
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"flag"
+	"io/ioutil"
+	"log"
+	"os"
+	"strings"
+
+	"github.com/Shopify/sarama"
+)
+
+func init() {
+	sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
+}
+
+var (
+	brokers   = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
+	userName  = flag.String("username", "", "The SASL username")
+	passwd    = flag.String("passwd", "", "The SASL password")
+	algorithm = flag.String("algorithm", "", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
+	topic     = flag.String("topic", "default_topic", "The Kafka topic to use")
+	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
+	keyFile   = flag.String("key", "", "The optional key file for client authentication")
+	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
+	verifySSL = flag.Bool("verify", false, "Optional verify ssl certificates chain")
+	useTLS    = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
+
+	logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
+)
+
+func createTLSConfiguration() (t *tls.Config) {
+	t = &tls.Config{
+		InsecureSkipVerify: *verifySSL,
+	}
+	if *certFile != "" && *keyFile != "" && *caFile != "" {
+		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		caCert, err := ioutil.ReadFile(*caFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		caCertPool := x509.NewCertPool()
+		caCertPool.AppendCertsFromPEM(caCert)
+
+		t = &tls.Config{
+			Certificates:       []tls.Certificate{cert},
+			RootCAs:            caCertPool,
+			InsecureSkipVerify: *verifySSL,
+		}
+	}
+	return t
+}
+
+func main() {
+	flag.Parse()
+
+	if *brokers == "" {
+		log.Fatalln("at least one brocker is required")
+	}
+
+	if *userName == "" {
+		log.Fatalln("SASL username is required")
+	}
+
+	if *passwd == "" {
+		log.Fatalln("SASL password is required")
+	}
+
+	conf := sarama.NewConfig()
+	conf.Producer.Retry.Max = 1
+	conf.Producer.RequiredAcks = sarama.WaitForAll
+	conf.Producer.Return.Successes = true
+	conf.Metadata.Full = true
+	conf.Version = sarama.V0_10_0_0
+	conf.ClientID = "sasl_scram_client"
+	conf.Metadata.Full = true
+	conf.Net.SASL.Enable = true
+	conf.Net.SASL.User = *userName
+	conf.Net.SASL.Password = *passwd
+	conf.Net.SASL.Handshake = true
+	if *algorithm == "sha512" {
+		conf.Net.SASL.SCRAMClient = &XDGSCRAMClient{HashGeneratorFcn: SHA512}
+		conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
+	} else if *algorithm == "sha256" {
+		conf.Net.SASL.SCRAMClient = &XDGSCRAMClient{HashGeneratorFcn: SHA256}
+		conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
+
+	} else {
+		log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
+	}
+
+	if *useTLS {
+		conf.Net.TLS.Enable = true
+		conf.Net.TLS.Config = createTLSConfiguration()
+	}
+
+	syncProcuder, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), conf)
+	if err != nil {
+		logger.Fatalln("failed to create producer: ", err)
+	}
+	partition, offset, err := syncProcuder.SendMessage(&sarama.ProducerMessage{
+		Topic: *topic,
+		Value: sarama.StringEncoder("test_message"),
+	})
+	if err != nil {
+		logger.Fatalln("failed to send message to ", *topic, err)
+	}
+	logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
+	_ = syncProcuder.Close()
+	logger.Println("Bye now !")
+}

+ 36 - 0
examples/sasl_scram_client/scram_client.go

@@ -0,0 +1,36 @@
+package main
+
+import (
+	"crypto/sha256"
+	"crypto/sha512"
+	"hash"
+
+	"github.com/xdg/scram"
+)
+
+var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
+var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
+
+type XDGSCRAMClient struct {
+	*scram.Client
+	*scram.ClientConversation
+	scram.HashGeneratorFcn
+}
+
+func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
+	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
+	if err != nil {
+		return err
+	}
+	x.ClientConversation = x.Client.NewConversation()
+	return nil
+}
+
+func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
+	response, err = x.ClientConversation.Step(challenge)
+	return
+}
+
+func (x *XDGSCRAMClient) Done() bool {
+	return x.ClientConversation.Done()
+}

+ 2 - 2
fetch_request.go

@@ -36,8 +36,8 @@ type FetchRequest struct {
 type IsolationLevel int8
 type IsolationLevel int8
 
 
 const (
 const (
-	ReadUncommitted IsolationLevel = 0
-	ReadCommitted   IsolationLevel = 1
+	ReadUncommitted IsolationLevel = iota
+	ReadCommitted
 )
 )
 
 
 func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 func (r *FetchRequest) encode(pe packetEncoder) (err error) {

+ 2 - 2
find_coordinator_request.go

@@ -3,8 +3,8 @@ package sarama
 type CoordinatorType int8
 type CoordinatorType int8
 
 
 const (
 const (
-	CoordinatorGroup       CoordinatorType = 0
-	CoordinatorTransaction CoordinatorType = 1
+	CoordinatorGroup CoordinatorType = iota
+	CoordinatorTransaction
 )
 )
 
 
 type FindCoordinatorRequest struct {
 type FindCoordinatorRequest struct {

+ 4 - 0
go.mod

@@ -12,4 +12,8 @@ require (
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
 	github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
 	github.com/stretchr/testify v1.3.0
 	github.com/stretchr/testify v1.3.0
+	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
+	github.com/xdg/stringprep v1.0.0 // indirect
+	golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 // indirect
+	golang.org/x/text v0.3.0 // indirect
 )
 )

+ 9 - 5
go.sum

@@ -21,8 +21,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
-github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
-golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563 h1:NIou6eNFigscvKJmsbyez16S2cIS6idossORlFtSt2E=
-golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
+github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
+github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
+golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU=
+golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

+ 25 - 18
message.go

@@ -5,37 +5,44 @@ import (
 	"time"
 	"time"
 )
 )
 
 
-// The lowest 3 bits contain the compression codec used for the message
-const compressionCodecMask int8 = 0x07
-
-// Bit 3 set for "LogAppend" timestamps
-const timestampTypeMask = 0x08
+const (
+	// CompressionNone means no compression is applied.
+	CompressionNone CompressionCodec = iota
+	// CompressionGZIP selects GZIP compression.
+	CompressionGZIP
+	// CompressionSnappy selects Snappy compression.
+	CompressionSnappy
+	// CompressionLZ4 selects LZ4 compression.
+	CompressionLZ4
+	// CompressionZSTD selects ZSTD compression.
+	CompressionZSTD
+
+	// The lowest 3 bits contain the compression codec used for the message
+	compressionCodecMask int8 = 0x07
+
+	// Bit 3 set for "LogAppend" timestamps
+	timestampTypeMask = 0x08
+
+	// CompressionLevelDefault is the constant to use in CompressionLevel
+	// to select the default compression level for any codec. The value is
+	// chosen so that it does not collide with any existing compression level.
+	CompressionLevelDefault = -1000
+)
 
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
 type CompressionCodec int8
 type CompressionCodec int8
 
 
-const (
-	CompressionNone   CompressionCodec = 0
-	CompressionGZIP   CompressionCodec = 1
-	CompressionSnappy CompressionCodec = 2
-	CompressionLZ4    CompressionCodec = 3
-	CompressionZSTD   CompressionCodec = 4
-)
-
 func (cc CompressionCodec) String() string {
 func (cc CompressionCodec) String() string {
 	return []string{
 	return []string{
 		"none",
 		"none",
 		"gzip",
 		"gzip",
 		"snappy",
 		"snappy",
 		"lz4",
 		"lz4",
+		"zstd",
 	}[int(cc)]
 	}[int(cc)]
 }
 }
 
 
-// CompressionLevelDefault is the constant to use in CompressionLevel
-// to have the default compression level for any codec. The value is picked
-// that we don't use any existing compression levels.
-const CompressionLevelDefault = -1000
-
+// Message is a Kafka message type.
 type Message struct {
 type Message struct {
 	Codec            CompressionCodec // codec used to compress the message contents
 	Codec            CompressionCodec // codec used to compress the message contents
 	CompressionLevel int              // compression level
 	CompressionLevel int              // compression level

+ 0 - 8
metrics.go

@@ -28,14 +28,6 @@ func getMetricNameForBroker(name string, broker *Broker) string {
 	return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
 	return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
 }
 }
 
 
-func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter {
-	return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r)
-}
-
-func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
-	return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
-}
-
 func getMetricNameForTopic(name string, topic string) string {
 func getMetricNameForTopic(name string, topic string) string {
 	// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
 	// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
 	// cf. KAFKA-1902 and KAFKA-2337
 	// cf. KAFKA-1902 and KAFKA-2337

+ 1 - 1
offset_manager.go

@@ -576,6 +576,6 @@ func (pom *partitionOffsetManager) handleError(err error) {
 
 
 func (pom *partitionOffsetManager) release() {
 func (pom *partitionOffsetManager) release() {
 	pom.releaseOnce.Do(func() {
 	pom.releaseOnce.Do(func() {
-		go close(pom.errors)
+		close(pom.errors)
 	})
 	})
 }
 }

+ 29 - 11
request.go

@@ -20,51 +20,67 @@ type request struct {
 	body          protocolBody
 	body          protocolBody
 }
 }
 
 
-func (r *request) encode(pe packetEncoder) (err error) {
+func (r *request) encode(pe packetEncoder) error {
 	pe.push(&lengthField{})
 	pe.push(&lengthField{})
 	pe.putInt16(r.body.key())
 	pe.putInt16(r.body.key())
 	pe.putInt16(r.body.version())
 	pe.putInt16(r.body.version())
 	pe.putInt32(r.correlationID)
 	pe.putInt32(r.correlationID)
-	err = pe.putString(r.clientID)
+
+	err := pe.putString(r.clientID)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+
 	err = r.body.encode(pe)
 	err = r.body.encode(pe)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+
 	return pe.pop()
 	return pe.pop()
 }
 }
 
 
 func (r *request) decode(pd packetDecoder) (err error) {
 func (r *request) decode(pd packetDecoder) (err error) {
-	var key int16
-	if key, err = pd.getInt16(); err != nil {
+	key, err := pd.getInt16()
+	if err != nil {
 		return err
 		return err
 	}
 	}
-	var version int16
-	if version, err = pd.getInt16(); err != nil {
+
+	version, err := pd.getInt16()
+	if err != nil {
 		return err
 		return err
 	}
 	}
-	if r.correlationID, err = pd.getInt32(); err != nil {
+
+	r.correlationID, err = pd.getInt32()
+	if err != nil {
 		return err
 		return err
 	}
 	}
+
 	r.clientID, err = pd.getString()
 	r.clientID, err = pd.getString()
+	if err != nil {
+		return err
+	}
 
 
 	r.body = allocateBody(key, version)
 	r.body = allocateBody(key, version)
 	if r.body == nil {
 	if r.body == nil {
 		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
 		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
 	}
 	}
+
 	return r.body.decode(pd, version)
 	return r.body.decode(pd, version)
 }
 }
 
 
-func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
-	lengthBytes := make([]byte, 4)
+func decodeRequest(r io.Reader) (*request, int, error) {
+	var (
+		bytesRead   int
+		lengthBytes = make([]byte, 4)
+	)
+
 	if _, err := io.ReadFull(r, lengthBytes); err != nil {
 	if _, err := io.ReadFull(r, lengthBytes); err != nil {
 		return nil, bytesRead, err
 		return nil, bytesRead, err
 	}
 	}
-	bytesRead += len(lengthBytes)
 
 
+	bytesRead += len(lengthBytes)
 	length := int32(binary.BigEndian.Uint32(lengthBytes))
 	length := int32(binary.BigEndian.Uint32(lengthBytes))
+
 	if length <= 4 || length > MaxRequestSize {
 	if length <= 4 || length > MaxRequestSize {
 		return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
 		return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
 	}
 	}
@@ -73,12 +89,14 @@ func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
 	if _, err := io.ReadFull(r, encodedReq); err != nil {
 	if _, err := io.ReadFull(r, encodedReq); err != nil {
 		return nil, bytesRead, err
 		return nil, bytesRead, err
 	}
 	}
+
 	bytesRead += len(encodedReq)
 	bytesRead += len(encodedReq)
 
 
-	req = &request{}
+	req := &request{}
 	if err := decode(encodedReq, req); err != nil {
 	if err := decode(encodedReq, req); err != nil {
 		return nil, bytesRead, err
 		return nil, bytesRead, err
 	}
 	}
+
 	return req, bytesRead, nil
 	return req, bytesRead, nil
 }
 }
 
 

+ 32 - 25
sarama.go

@@ -10,10 +10,7 @@ useful but comes with two caveats: it will generally be less efficient, and the
 depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the
 depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the
 SyncProducer can still sometimes be lost.
 SyncProducer can still sometimes be lost.
 
 
-To consume messages, use the Consumer. Note that Sarama's Consumer implementation does not currently support automatic
-consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the
-https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9
-and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
+To consume messages, use Consumer or Consumer-Group API.
 
 
 For lower-level needs, the Broker and Request/Response objects permit precise control over each connection
 For lower-level needs, the Broker and Request/Response objects permit precise control over each connection
 and message sent on the wire; the Client provides higher-level metadata management that is shared between
 and message sent on the wire; the Client provides higher-level metadata management that is shared between
@@ -61,6 +58,14 @@ Producer related metrics:
 	| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
 	| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
 	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
 	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
 
 
+Consumer related metrics:
+
+	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+	| Name                                      | Type       | Description                                                                          |
+	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+	| consumer-batch-size                       | histogram  | Distribution of the number of messages in a batch                                    |
+	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+
 */
 */
 package sarama
 package sarama
 
 
@@ -69,10 +74,29 @@ import (
 	"log"
 	"log"
 )
 )
 
 
-// Logger is the instance of a StdLogger interface that Sarama writes connection
-// management events to. By default it is set to discard all log messages via ioutil.Discard,
-// but you can set it to redirect wherever you want.
-var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
+var (
+	// Logger is the instance of a StdLogger interface that Sarama writes connection
+	// management events to. By default it is set to discard all log messages via ioutil.Discard,
+	// but you can set it to redirect wherever you want.
+	Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
+
+	// PanicHandler is called for recovering from panics spawned internally to the library (and thus
+	// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
+	PanicHandler func(interface{})
+
+	// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
+	// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
+	// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
+	// to process.
+	MaxRequestSize int32 = 100 * 1024 * 1024
+
+	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
+	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
+	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
+	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
+	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
+	MaxResponseSize int32 = 100 * 1024 * 1024
+)
 
 
 // StdLogger is used to log error messages.
 // StdLogger is used to log error messages.
 type StdLogger interface {
 type StdLogger interface {
@@ -80,20 +104,3 @@ type StdLogger interface {
 	Printf(format string, v ...interface{})
 	Printf(format string, v ...interface{})
 	Println(v ...interface{})
 	Println(v ...interface{})
 }
 }
-
-// PanicHandler is called for recovering from panics spawned internally to the library (and thus
-// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
-var PanicHandler func(interface{})
-
-// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
-// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
-// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
-// to process.
-var MaxRequestSize int32 = 100 * 1024 * 1024
-
-// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
-// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
-// protect the client from running out of memory. Please note that brokers do not have any natural limit on
-// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
-// (see https://issues.apache.org/jira/browse/KAFKA-2063).
-var MaxResponseSize int32 = 100 * 1024 * 1024

+ 1 - 0
utils.go

@@ -188,6 +188,7 @@ var (
 	MaxVersion = V2_2_0_0
 	MaxVersion = V2_2_0_0
 )
 )
 
 
+// ParseKafkaVersion parses a Kafka version string and returns the
+// corresponding KafkaVersion, or an error if the string is invalid.
 func ParseKafkaVersion(s string) (KafkaVersion, error) {
 func ParseKafkaVersion(s string) (KafkaVersion, error) {
 	if len(s) < 5 {
 	if len(s) < 5 {
 		return MinVersion, fmt.Errorf("invalid version `%s`", s)
 		return MinVersion, fmt.Errorf("invalid version `%s`", s)