Ver código fonte

Merge branch 'master' of github.com:Shopify/sarama into 1311_offline_replicas

drsoares 6 anos atrás
pai
commit
36113c9237
17 arquivos alterados com 241 adições e 169 exclusões
  1. 1 1
      .github/CONTRIBUTING.md
  2. 1 2
      .travis.yml
  3. 32 31
      acl_types.go
  4. 3 3
      admin.go
  5. 2 2
      balance_strategy.go
  6. 105 54
      broker.go
  7. 2 1
      client.go
  8. 6 6
      config_resource_type.go
  9. 6 6
      describe_configs_response.go
  10. 2 2
      fetch_request.go
  11. 2 2
      find_coordinator_request.go
  12. 25 18
      message.go
  13. 0 8
      metrics.go
  14. 1 1
      offset_manager.go
  15. 29 11
      request.go
  16. 23 21
      sarama.go
  17. 1 0
      utils.go

+ 1 - 1
.github/CONTRIBUTING.md

@@ -24,7 +24,7 @@ We will gladly accept bug fixes, or additions to this library. Please fork this
 - If you plan to work on something major, please open an issue to discuss the design first.
 - Don't break backwards compatibility. If you really have to, open an issue to discuss this first.
 - Make sure to use the `go fmt` command to format your code according to the standards. Even better, set up your editor to do this for you when saving.
-- Run [go vet](https://godoc.org/golang.org/x/tools/cmd/vet) to detect any suspicious constructs in your code that could be bugs.
+- Run [go vet](https://golang.org/cmd/vet/) to detect any suspicious constructs in your code that could be bugs.
 - Explicitly handle all error return values. If you really want to ignore an error value, you can assign it to `_`.You can use [errcheck](https://github.com/kisielk/errcheck) to verify whether you have handled all errors.
 - You may also want to run [golint](https://github.com/golang/lint) as well to detect style problems.
 - Add tests that cover the changes you made. Make sure to run `go test` with the `-race` argument to test for race conditions.

+ 1 - 2
.travis.yml

@@ -1,6 +1,5 @@
 language: go
 go:
-- 1.10.x
 - 1.11.x
 - 1.12.x
 
@@ -12,8 +11,8 @@ env:
   - KAFKA_HOSTNAME=localhost
   - DEBUG=true
   matrix:
-  - KAFKA_VERSION=2.0.1 KAFKA_SCALA_VERSION=2.12
   - KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
+  - KAFKA_VERSION=2.2.0 KAFKA_SCALA_VERSION=2.12
 
 before_install:
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}

+ 32 - 31
acl_types.go

@@ -1,50 +1,51 @@
 package sarama
 
-type AclOperation int
+type (
+	AclOperation int
+
+	AclPermissionType int
+
+	AclResourceType int
+
+	AclResourcePatternType int
+)
 
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
 const (
-	AclOperationUnknown         AclOperation = 0
-	AclOperationAny             AclOperation = 1
-	AclOperationAll             AclOperation = 2
-	AclOperationRead            AclOperation = 3
-	AclOperationWrite           AclOperation = 4
-	AclOperationCreate          AclOperation = 5
-	AclOperationDelete          AclOperation = 6
-	AclOperationAlter           AclOperation = 7
-	AclOperationDescribe        AclOperation = 8
-	AclOperationClusterAction   AclOperation = 9
-	AclOperationDescribeConfigs AclOperation = 10
-	AclOperationAlterConfigs    AclOperation = 11
-	AclOperationIdempotentWrite AclOperation = 12
+	AclOperationUnknown AclOperation = iota
+	AclOperationAny
+	AclOperationAll
+	AclOperationRead
+	AclOperationWrite
+	AclOperationCreate
+	AclOperationDelete
+	AclOperationAlter
+	AclOperationDescribe
+	AclOperationClusterAction
+	AclOperationDescribeConfigs
+	AclOperationAlterConfigs
+	AclOperationIdempotentWrite
 )
 
-type AclPermissionType int
-
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
 const (
-	AclPermissionUnknown AclPermissionType = 0
-	AclPermissionAny     AclPermissionType = 1
-	AclPermissionDeny    AclPermissionType = 2
-	AclPermissionAllow   AclPermissionType = 3
+	AclPermissionUnknown AclPermissionType = iota
+	AclPermissionAny
+	AclPermissionDeny
+	AclPermissionAllow
 )
 
-type AclResourceType int
-
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
 const (
-	AclResourceUnknown         AclResourceType = 0
-	AclResourceAny             AclResourceType = 1
-	AclResourceTopic           AclResourceType = 2
-	AclResourceGroup           AclResourceType = 3
-	AclResourceCluster         AclResourceType = 4
-	AclResourceTransactionalID AclResourceType = 5
+	AclResourceUnknown AclResourceType = iota
+	AclResourceAny
+	AclResourceTopic
+	AclResourceGroup
+	AclResourceCluster
+	AclResourceTransactionalID
 )
 
-type AclResourcePatternType int
-
 // ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
-
 const (
 	AclPatternUnknown AclResourcePatternType = iota
 	AclPatternAny

+ 3 - 3
admin.go

@@ -20,7 +20,7 @@ type ClusterAdmin interface {
 	// List the topics available in the cluster with the default options.
 	ListTopics() (map[string]TopicDetail, error)
 
-	// Describe some topics in the cluster
+	// Describe some topics in the cluster.
 	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
 
 	// Delete a topic. It may take several seconds after the DeleteTopic to returns success
@@ -78,13 +78,13 @@ type ClusterAdmin interface {
 	// List the consumer groups available in the cluster.
 	ListConsumerGroups() (map[string]string, error)
 
-	// Describe the given consumer group
+	// Describe the given consumer groups.
 	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
 
 	// List the consumer group offsets available in the cluster.
 	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
 
-	// Get information about the nodes in the cluster
+	// Get information about the nodes in the cluster.
 	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
 
 	// Close shuts down the admin and closes underlying client.

+ 2 - 2
balance_strategy.go

@@ -24,7 +24,7 @@ func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
 // --------------------------------------------------------------------
 
 // BalanceStrategy is used to balance topics and partitions
-// across memebers of a consumer group
+// across members of a consumer group
 type BalanceStrategy interface {
 	// Name uniquely identifies the strategy.
 	Name() string
@@ -78,7 +78,7 @@ type balanceStrategy struct {
 // Name implements BalanceStrategy.
 func (s *balanceStrategy) Name() string { return s.name }
 
-// Balance implements BalanceStrategy.
+// Plan implements BalanceStrategy.
 func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
 	// Build members by topic map
 	mbt := make(map[string][]string)

+ 105 - 54
broker.go

@@ -18,19 +18,20 @@ import (
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
-	id   int32
-	addr string
+	conf *Config
 	rack *string
 
-	conf          *Config
+	id            int32
+	addr          string
 	correlationID int32
 	conn          net.Conn
 	connErr       error
 	lock          sync.Mutex
 	opened        int32
+	responses     chan responsePromise
+	done          chan bool
 
-	responses chan responsePromise
-	done      chan bool
+	registeredMetrics []string
 
 	incomingByteRate       metrics.Meter
 	requestRate            metrics.Meter
@@ -179,13 +180,7 @@ func (b *Broker) Open(conf *Config) error {
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// the same id (-1) and are already exposed through the global metrics above
 		if b.id >= 0 {
-			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
-			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
-			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
-			b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
-			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
-			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
-			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
+			b.registerMetrics()
 		}
 
 		if conf.Net.SASL.Enable {
@@ -228,6 +223,7 @@ func (b *Broker) Connected() (bool, error) {
 	return b.conn != nil, b.connErr
 }
 
+//Close closes the broker resources
 func (b *Broker) Close() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
@@ -246,12 +242,7 @@ func (b *Broker) Close() error {
 	b.done = nil
 	b.responses = nil
 
-	if b.id >= 0 {
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
-	}
+	b.unregisterMetrics()
 
 	if err == nil {
 		Logger.Printf("Closed connection to broker %s\n", b.addr)
@@ -285,6 +276,7 @@ func (b *Broker) Rack() string {
 	return *b.rack
 }
 
+//GetMetadata send a metadata request and returns a metadata response or error
 func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 
@@ -297,6 +289,7 @@ func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error
 	return response, nil
 }
 
+//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
 func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
 	response := new(ConsumerMetadataResponse)
 
@@ -309,6 +302,7 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume
 	return response, nil
 }
 
+//FindCoordinator sends a find coordinate request and returns a response or error
 func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
 	response := new(FindCoordinatorResponse)
 
@@ -321,6 +315,7 @@ func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordina
 	return response, nil
 }
 
+//GetAvailableOffsets return an offset response or error
 func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 
@@ -333,9 +328,12 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e
 	return response, nil
 }
 
+//Produce returns a produce response or error
 func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
-	var response *ProduceResponse
-	var err error
+	var (
+		response *ProduceResponse
+		err      error
+	)
 
 	if request.RequiredAcks == NoResponse {
 		err = b.sendAndReceive(request, nil)
@@ -351,11 +349,11 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
 	return response, nil
 }
 
+//Fetch returns a FetchResponse or error
 func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 	response := new(FetchResponse)
 
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 		return nil, err
 	}
@@ -363,11 +361,11 @@ func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 	return response, nil
 }
 
+//CommitOffset return an Offset commit reponse or error
 func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
 	response := new(OffsetCommitResponse)
 
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 		return nil, err
 	}
@@ -375,11 +373,11 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon
 	return response, nil
 }
 
+//FetchOffset returns an offset fetch response or error
 func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
 	response := new(OffsetFetchResponse)
 
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 		return nil, err
 	}
@@ -387,6 +385,7 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
 	return response, nil
 }
 
+//JoinGroup returns a join group response or error
 func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
 	response := new(JoinGroupResponse)
 
@@ -398,6 +397,7 @@ func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error
 	return response, nil
 }
 
+//SyncGroup returns a sync group response or error
 func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
 	response := new(SyncGroupResponse)
 
@@ -409,6 +409,7 @@ func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error
 	return response, nil
 }
 
+//LeaveGroup return a leave group response or error
 func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
 	response := new(LeaveGroupResponse)
 
@@ -420,6 +421,7 @@ func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, er
 	return response, nil
 }
 
+//Heartbeat returns a heartbeat response or error
 func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
 	response := new(HeartbeatResponse)
 
@@ -431,6 +433,7 @@ func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error
 	return response, nil
 }
 
+//ListGroups return a list group response or error
 func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
 	response := new(ListGroupsResponse)
 
@@ -442,6 +445,7 @@ func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, er
 	return response, nil
 }
 
+//DescribeGroups return describe group response or error
 func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
 	response := new(DescribeGroupsResponse)
 
@@ -453,6 +457,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups
 	return response, nil
 }
 
+//ApiVersions return api version response or error
 func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
 	response := new(ApiVersionsResponse)
 
@@ -464,6 +469,7 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse,
 	return response, nil
 }
 
+//CreateTopics send a create topic request and returns create topic response
 func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
 	response := new(CreateTopicsResponse)
 
@@ -475,6 +481,7 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon
 	return response, nil
 }
 
+//DeleteTopics sends a delete topic request and returns delete topic response
 func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
 	response := new(DeleteTopicsResponse)
 
@@ -486,6 +493,8 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon
 	return response, nil
 }
 
+//CreatePartitions sends a create partition request and returns create
+//partitions response or error
 func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
 	response := new(CreatePartitionsResponse)
 
@@ -497,6 +506,8 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart
 	return response, nil
 }
 
+//DeleteRecords send a request to delete records and return delete record
+//response or error
 func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
 	response := new(DeleteRecordsResponse)
 
@@ -508,6 +519,7 @@ func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsRes
 	return response, nil
 }
 
+//DescribeAcls sends a describe acl request and returns a response or error
 func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
 	response := new(DescribeAclsResponse)
 
@@ -519,6 +531,7 @@ func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsRespon
 	return response, nil
 }
 
+//CreateAcls sends a create acl request and returns a response or error
 func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
 	response := new(CreateAclsResponse)
 
@@ -530,6 +543,7 @@ func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, er
 	return response, nil
 }
 
+//DeleteAcls sends a delete acl request and returns a response or error
 func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
 	response := new(DeleteAclsResponse)
 
@@ -541,6 +555,7 @@ func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, er
 	return response, nil
 }
 
+//InitProducerID sends an init producer request and returns a response or error
 func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
 	response := new(InitProducerIDResponse)
 
@@ -552,6 +567,8 @@ func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerID
 	return response, nil
 }
 
+//AddPartitionsToTxn send a request to add partition to txn and returns
+//a response or error
 func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
 	response := new(AddPartitionsToTxnResponse)
 
@@ -563,6 +580,8 @@ func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPar
 	return response, nil
 }
 
+//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
+//or error
 func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
 	response := new(AddOffsetsToTxnResponse)
 
@@ -574,6 +593,7 @@ func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsTo
 	return response, nil
 }
 
+//EndTxn sends a request to end txn and returns a response or error
 func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
 	response := new(EndTxnResponse)
 
@@ -585,6 +605,8 @@ func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
 	return response, nil
 }
 
+//TxnOffsetCommit sends a request to commit transaction offsets and returns
+//a response or error
 func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
 	response := new(TxnOffsetCommitResponse)
 
@@ -596,6 +618,8 @@ func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCom
 	return response, nil
 }
 
+//DescribeConfigs sends a request to describe config and returns a response or
+//error
 func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
 	response := new(DescribeConfigsResponse)
 
@@ -607,6 +631,7 @@ func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConf
 	return response, nil
 }
 
+//AlterConfigs sends a request to alter config and return a response or error
 func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
 	response := new(AlterConfigsResponse)
 
@@ -618,6 +643,7 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon
 	return response, nil
 }
 
+//DeleteGroups sends a request to delete groups and returns a response or error
 func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
 	response := new(DeleteGroupsResponse)
 
@@ -656,7 +682,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 
 	requestTime := time.Now()
 	bytes, err := b.conn.Write(buf)
-	b.updateOutgoingCommunicationMetrics(bytes)
+	b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
 	if err != nil {
 		return nil, err
 	}
@@ -676,7 +702,6 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 
 func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
 	promise, err := b.send(req, res != nil)
-
 	if err != nil {
 		return err
 	}
@@ -725,11 +750,11 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
 }
 
 func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
-
 	host, portstr, err := net.SplitHostPort(b.addr)
 	if err != nil {
 		return err
 	}
+
 	port, err := strconv.Atoi(portstr)
 	if err != nil {
 		return err
@@ -757,6 +782,7 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
 func (b *Broker) responseReceiver() {
 	var dead error
 	header := make([]byte, 8)
+
 	for response := range b.responses {
 		if dead != nil {
 			response.errors <- dead
@@ -819,7 +845,6 @@ func (b *Broker) authenticateViaSASL() error {
 	default:
 		return b.sendAndReceiveSASLPlainAuth()
 	}
-
 }
 
 func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
@@ -851,6 +876,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		return err
 	}
+
 	length := binary.BigEndian.Uint32(header[:4])
 	payload := make([]byte, length-4)
 	n, err := io.ReadFull(b.conn, payload)
@@ -858,17 +884,21 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		return err
 	}
+
 	b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
 	res := &SaslHandshakeResponse{}
+
 	err = versionedDecode(payload, res, 0)
 	if err != nil {
 		Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
 		return err
 	}
+
 	if res.Err != ErrNoError {
 		Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
 		return res.Err
 	}
+
 	Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
 	return nil
 }
@@ -899,6 +929,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 			return handshakeErr
 		}
 	}
+
 	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
 	authBytes := make([]byte, length+4) //4 byte length header + auth data
 	binary.BigEndian.PutUint32(authBytes, uint32(length))
@@ -935,33 +966,27 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
 // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
 func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
-
 	if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
 		return err
 	}
 
 	token, err := provider.Token()
-
 	if err != nil {
 		return err
 	}
 
 	requestTime := time.Now()
-
 	correlationID := b.correlationID
 
 	bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
-
 	if err != nil {
 		return err
 	}
 
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
-
 	b.correlationID++
 
 	bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
-
 	if err != nil {
 		return err
 	}
@@ -1012,6 +1037,7 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
 			return err
 		}
 	}
+
 	Logger.Println("SASL authentication succeeded")
 	return nil
 }
@@ -1023,39 +1049,41 @@ func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (i
 	if err != nil {
 		return 0, err
 	}
+
 	if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
 		return 0, err
 	}
+
 	return b.conn.Write(buf)
 }
 
 func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
 	buf := make([]byte, responseLengthSize+correlationIDSize)
-	bytesRead, err := io.ReadFull(b.conn, buf)
+	_, err := io.ReadFull(b.conn, buf)
 	if err != nil {
 		return nil, err
 	}
+
 	header := responseHeader{}
 	err = decode(buf, &header)
 	if err != nil {
 		return nil, err
 	}
+
 	if header.correlationID != correlationID {
 		return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 	}
+
 	buf = make([]byte, header.length-correlationIDSize)
-	c, err := io.ReadFull(b.conn, buf)
-	bytesRead += c
+	_, err = io.ReadFull(b.conn, buf)
 	if err != nil {
 		return nil, err
 	}
+
 	res := &SaslAuthenticateResponse{}
 	if err := versionedDecode(buf, res, 0); err != nil {
 		return nil, err
 	}
-	if err != nil {
-		return nil, err
-	}
 	if res.Err != ErrNoError {
 		return nil, res.Err
 	}
@@ -1065,7 +1093,6 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
 // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
 // https://tools.ietf.org/html/rfc7628
 func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
-
 	var ext string
 
 	if token.Extensions != nil && len(token.Extensions) > 0 {
@@ -1083,7 +1110,6 @@ func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
 // mapToString returns a list of key-value pairs ordered by key.
 // keyValSep separates the key from the value. elemSep separates each pair.
 func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
-
 	buf := make([]string, 0, len(extensions))
 
 	for k, v := range extensions {
@@ -1096,9 +1122,7 @@ func mapToString(extensions map[string]string, keyValSep string, elemSep string)
 }
 
 func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
-
 	initialResp, err := buildClientInitialResponse(token)
-
 	if err != nil {
 		return 0, err
 	}
@@ -1108,7 +1132,6 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlati
 	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
 
 	buf, err := encode(req, b.conf.MetricRegistry)
-
 	if err != nil {
 		return 0, err
 	}
@@ -1122,10 +1145,9 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlati
 
 func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
 
-	buf := make([]byte, 8)
+	buf := make([]byte, responseLengthSize+correlationIDSize)
 
 	bytesRead, err := io.ReadFull(b.conn, buf)
-
 	if err != nil {
 		return bytesRead, err
 	}
@@ -1133,7 +1155,6 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 	header := responseHeader{}
 
 	err = decode(buf, &header)
-
 	if err != nil {
 		return bytesRead, err
 	}
@@ -1142,12 +1163,10 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 		return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 	}
 
-	buf = make([]byte, header.length-4)
+	buf = make([]byte, header.length-correlationIDSize)
 
 	c, err := io.ReadFull(b.conn, buf)
-
 	bytesRead += c
-
 	if err != nil {
 		return bytesRead, err
 	}
@@ -1158,10 +1177,6 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 		return bytesRead, err
 	}
 
-	if err != nil {
-		return bytesRead, err
-	}
-
 	if res.Err != ErrNoError {
 		return bytesRead, res.Err
 	}
@@ -1176,14 +1191,17 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
 	b.updateRequestLatencyMetrics(requestLatency)
 	b.responseRate.Mark(1)
+
 	if b.brokerResponseRate != nil {
 		b.brokerResponseRate.Mark(1)
 	}
+
 	responseSize := int64(bytes)
 	b.incomingByteRate.Mark(responseSize)
 	if b.brokerIncomingByteRate != nil {
 		b.brokerIncomingByteRate.Mark(responseSize)
 	}
+
 	b.responseSize.Update(responseSize)
 	if b.brokerResponseSize != nil {
 		b.brokerResponseSize.Update(responseSize)
@@ -1193,9 +1211,11 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency ti
 func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	b.requestLatency.Update(requestLatencyInMs)
+
 	if b.brokerRequestLatency != nil {
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 	}
+
 }
 
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1203,13 +1223,44 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
 	if b.brokerRequestRate != nil {
 		b.brokerRequestRate.Mark(1)
 	}
+
 	requestSize := int64(bytes)
 	b.outgoingByteRate.Mark(requestSize)
 	if b.brokerOutgoingByteRate != nil {
 		b.brokerOutgoingByteRate.Mark(requestSize)
 	}
+
 	b.requestSize.Update(requestSize)
 	if b.brokerRequestSize != nil {
 		b.brokerRequestSize.Update(requestSize)
 	}
+
+}
+
+func (b *Broker) registerMetrics() {
+	b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
+	b.brokerRequestRate = b.registerMeter("request-rate")
+	b.brokerRequestSize = b.registerHistogram("request-size")
+	b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
+	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
+	b.brokerResponseRate = b.registerMeter("response-rate")
+	b.brokerResponseSize = b.registerHistogram("response-size")
+}
+
+func (b *Broker) unregisterMetrics() {
+	for _, name := range b.registeredMetrics {
+		b.conf.MetricRegistry.Unregister(name)
+	}
+}
+
+func (b *Broker) registerMeter(name string) metrics.Meter {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
+}
+
+func (b *Broker) registerHistogram(name string) metrics.Histogram {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
 }

+ 2 - 1
client.go

@@ -292,7 +292,8 @@ func (client *client) Partitions(topic string) ([]int32, error) {
 		partitions = client.cachedPartitions(topic, allPartitions)
 	}
 
-	if partitions == nil {
+	// no partitions found after refresh metadata
+	if len(partitions) == 0 {
 		return nil, ErrUnknownTopicOrPartition
 	}
 

+ 6 - 6
config_resource_type.go

@@ -6,10 +6,10 @@ type ConfigResourceType int8
 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
 
 const (
-	UnknownResource ConfigResourceType = 0
-	AnyResource     ConfigResourceType = 1
-	TopicResource   ConfigResourceType = 2
-	GroupResource   ConfigResourceType = 3
-	ClusterResource ConfigResourceType = 4
-	BrokerResource  ConfigResourceType = 5
+	UnknownResource ConfigResourceType = iota
+	AnyResource
+	TopicResource
+	GroupResource
+	ClusterResource
+	BrokerResource
 )

+ 6 - 6
describe_configs_response.go

@@ -26,12 +26,12 @@ func (s ConfigSource) String() string {
 }
 
 const (
-	SourceUnknown              ConfigSource = 0
-	SourceTopic                ConfigSource = 1
-	SourceDynamicBroker        ConfigSource = 2
-	SourceDynamicDefaultBroker ConfigSource = 3
-	SourceStaticBroker         ConfigSource = 4
-	SourceDefault              ConfigSource = 5
+	SourceUnknown ConfigSource = iota
+	SourceTopic
+	SourceDynamicBroker
+	SourceDynamicDefaultBroker
+	SourceStaticBroker
+	SourceDefault
 )
 
 type DescribeConfigsResponse struct {

+ 2 - 2
fetch_request.go

@@ -36,8 +36,8 @@ type FetchRequest struct {
 type IsolationLevel int8
 
 const (
-	ReadUncommitted IsolationLevel = 0
-	ReadCommitted   IsolationLevel = 1
+	ReadUncommitted IsolationLevel = iota
+	ReadCommitted
 )
 
 func (r *FetchRequest) encode(pe packetEncoder) (err error) {

+ 2 - 2
find_coordinator_request.go

@@ -3,8 +3,8 @@ package sarama
 type CoordinatorType int8
 
 const (
-	CoordinatorGroup       CoordinatorType = 0
-	CoordinatorTransaction CoordinatorType = 1
+	CoordinatorGroup CoordinatorType = iota
+	CoordinatorTransaction
 )
 
 type FindCoordinatorRequest struct {

+ 25 - 18
message.go

@@ -5,37 +5,44 @@ import (
 	"time"
 )
 
-// The lowest 3 bits contain the compression codec used for the message
-const compressionCodecMask int8 = 0x07
-
-// Bit 3 set for "LogAppend" timestamps
-const timestampTypeMask = 0x08
+const (
+	//CompressionNone no compression
+	CompressionNone CompressionCodec = iota
+	//CompressionGZIP compression using GZIP
+	CompressionGZIP
+	//CompressionSnappy compression using snappy
+	CompressionSnappy
+	//CompressionLZ4 compression using LZ4
+	CompressionLZ4
+	//CompressionZSTD compression using ZSTD
+	CompressionZSTD
+
+	// The lowest 3 bits contain the compression codec used for the message
+	compressionCodecMask int8 = 0x07
+
+	// Bit 3 set for "LogAppend" timestamps
+	timestampTypeMask = 0x08
+
+	// CompressionLevelDefault is the constant to use in CompressionLevel
+	// to have the default compression level for any codec. The value is picked
+	// that we don't use any existing compression levels.
+	CompressionLevelDefault = -1000
+)
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
 type CompressionCodec int8
 
-const (
-	CompressionNone   CompressionCodec = 0
-	CompressionGZIP   CompressionCodec = 1
-	CompressionSnappy CompressionCodec = 2
-	CompressionLZ4    CompressionCodec = 3
-	CompressionZSTD   CompressionCodec = 4
-)
-
 func (cc CompressionCodec) String() string {
 	return []string{
 		"none",
 		"gzip",
 		"snappy",
 		"lz4",
+		"zstd",
 	}[int(cc)]
 }
 
-// CompressionLevelDefault is the constant to use in CompressionLevel
-// to have the default compression level for any codec. The value is picked
-// that we don't use any existing compression levels.
-const CompressionLevelDefault = -1000
-
+//Message is a kafka message type
 type Message struct {
 	Codec            CompressionCodec // codec used to compress the message contents
 	CompressionLevel int              // compression level

+ 0 - 8
metrics.go

@@ -28,14 +28,6 @@ func getMetricNameForBroker(name string, broker *Broker) string {
 	return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
 }
 
-func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter {
-	return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r)
-}
-
-func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
-	return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
-}
-
 func getMetricNameForTopic(name string, topic string) string {
 	// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
 	// cf. KAFKA-1902 and KAFKA-2337

+ 1 - 1
offset_manager.go

@@ -576,6 +576,6 @@ func (pom *partitionOffsetManager) handleError(err error) {
 
 func (pom *partitionOffsetManager) release() {
 	pom.releaseOnce.Do(func() {
-		go close(pom.errors)
+		close(pom.errors)
 	})
 }

+ 29 - 11
request.go

@@ -20,51 +20,67 @@ type request struct {
 	body          protocolBody
 }
 
-func (r *request) encode(pe packetEncoder) (err error) {
+func (r *request) encode(pe packetEncoder) error {
 	pe.push(&lengthField{})
 	pe.putInt16(r.body.key())
 	pe.putInt16(r.body.version())
 	pe.putInt32(r.correlationID)
-	err = pe.putString(r.clientID)
+
+	err := pe.putString(r.clientID)
 	if err != nil {
 		return err
 	}
+
 	err = r.body.encode(pe)
 	if err != nil {
 		return err
 	}
+
 	return pe.pop()
 }
 
 func (r *request) decode(pd packetDecoder) (err error) {
-	var key int16
-	if key, err = pd.getInt16(); err != nil {
+	key, err := pd.getInt16()
+	if err != nil {
 		return err
 	}
-	var version int16
-	if version, err = pd.getInt16(); err != nil {
+
+	version, err := pd.getInt16()
+	if err != nil {
 		return err
 	}
-	if r.correlationID, err = pd.getInt32(); err != nil {
+
+	r.correlationID, err = pd.getInt32()
+	if err != nil {
 		return err
 	}
+
 	r.clientID, err = pd.getString()
+	if err != nil {
+		return err
+	}
 
 	r.body = allocateBody(key, version)
 	if r.body == nil {
 		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
 	}
+
 	return r.body.decode(pd, version)
 }
 
-func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
-	lengthBytes := make([]byte, 4)
+func decodeRequest(r io.Reader) (*request, int, error) {
+	var (
+		bytesRead   int
+		lengthBytes = make([]byte, 4)
+	)
+
 	if _, err := io.ReadFull(r, lengthBytes); err != nil {
 		return nil, bytesRead, err
 	}
-	bytesRead += len(lengthBytes)
 
+	bytesRead += len(lengthBytes)
 	length := int32(binary.BigEndian.Uint32(lengthBytes))
+
 	if length <= 4 || length > MaxRequestSize {
 		return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
 	}
@@ -73,12 +89,14 @@ func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
 	if _, err := io.ReadFull(r, encodedReq); err != nil {
 		return nil, bytesRead, err
 	}
+
 	bytesRead += len(encodedReq)
 
-	req = &request{}
+	req := &request{}
 	if err := decode(encodedReq, req); err != nil {
 		return nil, bytesRead, err
 	}
+
 	return req, bytesRead, nil
 }
 

+ 23 - 21
sarama.go

@@ -74,10 +74,29 @@ import (
 	"log"
 )
 
-// Logger is the instance of a StdLogger interface that Sarama writes connection
-// management events to. By default it is set to discard all log messages via ioutil.Discard,
-// but you can set it to redirect wherever you want.
-var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
+var (
+	// Logger is the instance of a StdLogger interface that Sarama writes connection
+	// management events to. By default it is set to discard all log messages via ioutil.Discard,
+	// but you can set it to redirect wherever you want.
+	Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
+
+	// PanicHandler is called for recovering from panics spawned internally to the library (and thus
+	// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
+	PanicHandler func(interface{})
+
+	// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
+	// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
+	// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
+	// to process.
+	MaxRequestSize int32 = 100 * 1024 * 1024
+
+	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
+	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
+	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
+	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
+	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
+	MaxResponseSize int32 = 100 * 1024 * 1024
+)
 
 // StdLogger is used to log error messages.
 type StdLogger interface {
@@ -85,20 +104,3 @@ type StdLogger interface {
 	Printf(format string, v ...interface{})
 	Println(v ...interface{})
 }
-
-// PanicHandler is called for recovering from panics spawned internally to the library (and thus
-// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
-var PanicHandler func(interface{})
-
-// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
-// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
-// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
-// to process.
-var MaxRequestSize int32 = 100 * 1024 * 1024
-
-// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
-// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
-// protect the client from running out of memory. Please note that brokers do not have any natural limit on
-// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
-// (see https://issues.apache.org/jira/browse/KAFKA-2063).
-var MaxResponseSize int32 = 100 * 1024 * 1024

+ 1 - 0
utils.go

@@ -188,6 +188,7 @@ var (
 	MaxVersion = V2_2_0_0
 )
 
+//ParseKafkaVersion parses and returns kafka version or error from a string
 func ParseKafkaVersion(s string) (KafkaVersion, error) {
 	if len(s) < 5 {
 		return MinVersion, fmt.Errorf("invalid version `%s`", s)