Pārlūkot izejas kodu

Merge remote-tracking branch 'upstream/master'

Varun 6 gadi atpakaļ
vecāks
revīzija
bb3bd3c9f0
5 mainītis faili ar 41 papildinājumiem un 33 dzēšanām
  1. 1 1
      .github/CONTRIBUTING.md
  2. 1 2
      .travis.yml
  3. 2 2
      balance_strategy.go
  4. 37 20
      broker.go
  5. 0 8
      metrics.go

+ 1 - 1
.github/CONTRIBUTING.md

@@ -24,7 +24,7 @@ We will gladly accept bug fixes, or additions to this library. Please fork this
 - If you plan to work on something major, please open an issue to discuss the design first.
 - If you plan to work on something major, please open an issue to discuss the design first.
 - Don't break backwards compatibility. If you really have to, open an issue to discuss this first.
 - Don't break backwards compatibility. If you really have to, open an issue to discuss this first.
 - Make sure to use the `go fmt` command to format your code according to the standards. Even better, set up your editor to do this for you when saving.
 - Make sure to use the `go fmt` command to format your code according to the standards. Even better, set up your editor to do this for you when saving.
-- Run [go vet](https://godoc.org/golang.org/x/tools/cmd/vet) to detect any suspicious constructs in your code that could be bugs.
+- Run [go vet](https://golang.org/cmd/vet/) to detect any suspicious constructs in your code that could be bugs.
 - Explicitly handle all error return values. If you really want to ignore an error value, you can assign it to `_`.You can use [errcheck](https://github.com/kisielk/errcheck) to verify whether you have handled all errors.
 - Explicitly handle all error return values. If you really want to ignore an error value, you can assign it to `_`.You can use [errcheck](https://github.com/kisielk/errcheck) to verify whether you have handled all errors.
 - You may also want to run [golint](https://github.com/golang/lint) as well to detect style problems.
 - You may also want to run [golint](https://github.com/golang/lint) as well to detect style problems.
 - Add tests that cover the changes you made. Make sure to run `go test` with the `-race` argument to test for race conditions.
 - Add tests that cover the changes you made. Make sure to run `go test` with the `-race` argument to test for race conditions.

+ 1 - 2
.travis.yml

@@ -1,6 +1,5 @@
 language: go
 language: go
 go:
 go:
-- 1.10.x
 - 1.11.x
 - 1.11.x
 - 1.12.x
 - 1.12.x
 
 
@@ -12,8 +11,8 @@ env:
   - KAFKA_HOSTNAME=localhost
   - KAFKA_HOSTNAME=localhost
   - DEBUG=true
   - DEBUG=true
   matrix:
   matrix:
-  - KAFKA_VERSION=2.0.1 KAFKA_SCALA_VERSION=2.12
   - KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
   - KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
+  - KAFKA_VERSION=2.2.0 KAFKA_SCALA_VERSION=2.12
 
 
 before_install:
 before_install:
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}

+ 2 - 2
balance_strategy.go

@@ -24,7 +24,7 @@ func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
 // --------------------------------------------------------------------
 // --------------------------------------------------------------------
 
 
 // BalanceStrategy is used to balance topics and partitions
 // BalanceStrategy is used to balance topics and partitions
-// across memebers of a consumer group
+// across members of a consumer group
 type BalanceStrategy interface {
 type BalanceStrategy interface {
 	// Name uniquely identifies the strategy.
 	// Name uniquely identifies the strategy.
 	Name() string
 	Name() string
@@ -78,7 +78,7 @@ type balanceStrategy struct {
 // Name implements BalanceStrategy.
 // Name implements BalanceStrategy.
 func (s *balanceStrategy) Name() string { return s.name }
 func (s *balanceStrategy) Name() string { return s.name }
 
 
-// Balance implements BalanceStrategy.
+// Plan implements BalanceStrategy.
 func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
 func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
 	// Build members by topic map
 	// Build members by topic map
 	mbt := make(map[string][]string)
 	mbt := make(map[string][]string)

+ 37 - 20
broker.go

@@ -31,6 +31,8 @@ type Broker struct {
 	responses     chan responsePromise
 	responses     chan responsePromise
 	done          chan bool
 	done          chan bool
 
 
+	registeredMetrics []string
+
 	incomingByteRate       metrics.Meter
 	incomingByteRate       metrics.Meter
 	requestRate            metrics.Meter
 	requestRate            metrics.Meter
 	requestSize            metrics.Histogram
 	requestSize            metrics.Histogram
@@ -178,13 +180,7 @@ func (b *Broker) Open(conf *Config) error {
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// the same id (-1) and are already exposed through the global metrics above
 		// the same id (-1) and are already exposed through the global metrics above
 		if b.id >= 0 {
 		if b.id >= 0 {
-			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
-			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
-			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
-			b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
-			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
-			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
-			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
+			b.registerMetrics()
 		}
 		}
 
 
 		if conf.Net.SASL.Enable {
 		if conf.Net.SASL.Enable {
@@ -246,12 +242,7 @@ func (b *Broker) Close() error {
 	b.done = nil
 	b.done = nil
 	b.responses = nil
 	b.responses = nil
 
 
-	if b.id >= 0 {
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
-		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
-	}
+	b.unregisterMetrics()
 
 
 	if err == nil {
 	if err == nil {
 		Logger.Printf("Closed connection to broker %s\n", b.addr)
 		Logger.Printf("Closed connection to broker %s\n", b.addr)
@@ -1068,7 +1059,7 @@ func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (i
 
 
 func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
 func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
 	buf := make([]byte, responseLengthSize+correlationIDSize)
 	buf := make([]byte, responseLengthSize+correlationIDSize)
-	bytesRead, err := io.ReadFull(b.conn, buf)
+	_, err := io.ReadFull(b.conn, buf)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -1084,8 +1075,7 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
 	}
 	}
 
 
 	buf = make([]byte, header.length-correlationIDSize)
 	buf = make([]byte, header.length-correlationIDSize)
-	c, err := io.ReadFull(b.conn, buf)
-	bytesRead += c
+	_, err = io.ReadFull(b.conn, buf)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -1094,11 +1084,9 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
 	if err := versionedDecode(buf, res, 0); err != nil {
 	if err := versionedDecode(buf, res, 0); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-
 	if res.Err != ErrNoError {
 	if res.Err != ErrNoError {
 		return nil, res.Err
 		return nil, res.Err
 	}
 	}
-
 	return res.SaslAuthBytes, nil
 	return res.SaslAuthBytes, nil
 }
 }
 
 
@@ -1156,7 +1144,8 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlati
 }
 }
 
 
 func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
 func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
-	buf := make([]byte, 8)
+
+	buf := make([]byte, responseLengthSize+correlationIDSize)
 
 
 	bytesRead, err := io.ReadFull(b.conn, buf)
 	bytesRead, err := io.ReadFull(b.conn, buf)
 	if err != nil {
 	if err != nil {
@@ -1174,7 +1163,7 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
 		return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 		return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
 	}
 	}
 
 
-	buf = make([]byte, header.length-4)
+	buf = make([]byte, header.length-correlationIDSize)
 
 
 	c, err := io.ReadFull(b.conn, buf)
 	c, err := io.ReadFull(b.conn, buf)
 	bytesRead += c
 	bytesRead += c
@@ -1247,3 +1236,31 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
 	}
 	}
 
 
 }
 }
+
+func (b *Broker) registerMetrics() {
+	b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
+	b.brokerRequestRate = b.registerMeter("request-rate")
+	b.brokerRequestSize = b.registerHistogram("request-size")
+	b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
+	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
+	b.brokerResponseRate = b.registerMeter("response-rate")
+	b.brokerResponseSize = b.registerHistogram("response-size")
+}
+
+func (b *Broker) unregisterMetrics() {
+	for _, name := range b.registeredMetrics {
+		b.conf.MetricRegistry.Unregister(name)
+	}
+}
+
+func (b *Broker) registerMeter(name string) metrics.Meter {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
+}
+
+func (b *Broker) registerHistogram(name string) metrics.Histogram {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
+}

+ 0 - 8
metrics.go

@@ -28,14 +28,6 @@ func getMetricNameForBroker(name string, broker *Broker) string {
 	return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
 	return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
 }
 }
 
 
-func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter {
-	return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r)
-}
-
-func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
-	return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
-}
-
 func getMetricNameForTopic(name string, topic string) string {
 func getMetricNameForTopic(name string, topic string) string {
 	// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
 	// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
 	// cf. KAFKA-1902 and KAFKA-2337
 	// cf. KAFKA-1902 and KAFKA-2337