Selaa lähdekoodia

Minor changes

- update*CommunicationMetrics even when a Read/Write fails
- use MockBroker notifier for waiting for both expectations and metrics
- add documentation about disabling metrics gathering
- use METRICS_DISABLE env variable for disabling metrics in benchmarks
- use constants for exponentially decaying reservoir for histograms
- fix typo in main documentation
Sebastien Launay 9 vuotta sitten
vanhempi
commit
3ea3cb2edb
7 muutettua tiedostoa jossa 96 lisäystä ja 76 poistoa
  1. 11 12
      broker.go
  2. 21 14
      broker_test.go
  3. 2 0
      config.go
  4. 12 0
      functional_producer_test.go
  5. 10 1
      metrics.go
  6. 39 48
      mockbroker.go
  7. 1 1
      sarama.go

+ 11 - 12
broker.go

@@ -371,14 +371,13 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 		return nil, err
 	}
 
-	b.updateOutgoingCommunicationMetrics(len(buf))
-
 	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
 	if err != nil {
 		return nil, err
 	}
 
-	_, err = b.conn.Write(buf)
+	bytes, err := b.conn.Write(buf)
+	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
 		return nil, err
 	}
@@ -476,8 +475,9 @@ func (b *Broker) responseReceiver() {
 			continue
 		}
 
-		_, err = io.ReadFull(b.conn, header)
+		bytesReadHeader, err := io.ReadFull(b.conn, header)
 		if err != nil {
+			b.updateIncomingCommunicationMetrics(bytesReadHeader)
 			dead = err
 			response.errors <- err
 			continue
@@ -486,11 +486,13 @@ func (b *Broker) responseReceiver() {
 		decodedHeader := responseHeader{}
 		err = decode(header, &decodedHeader)
 		if err != nil {
+			b.updateIncomingCommunicationMetrics(bytesReadHeader)
 			dead = err
 			response.errors <- err
 			continue
 		}
 		if decodedHeader.correlationID != response.correlationID {
+			b.updateIncomingCommunicationMetrics(bytesReadHeader)
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
 			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
@@ -499,15 +501,14 @@ func (b *Broker) responseReceiver() {
 		}
 
 		buf := make([]byte, decodedHeader.length-4)
-		_, err = io.ReadFull(b.conn, buf)
+		bytesReadBody, err := io.ReadFull(b.conn, buf)
+		b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
 		if err != nil {
 			dead = err
 			response.errors <- err
 			continue
 		}
 
-		b.updateIncomingCommunicationMetrics(len(header) + len(buf))
-
 		response.packets <- buf
 	}
 	close(b.done)
@@ -537,15 +538,14 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	binary.BigEndian.PutUint32(authBytes, uint32(length))
 	copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
 
-	b.updateOutgoingCommunicationMetrics(len(authBytes))
-
 	err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
 	if err != nil {
 		Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
 		return err
 	}
 
-	_, err = b.conn.Write(authBytes)
+	bytesWritten, err := b.conn.Write(authBytes)
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
@@ -553,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 
 	header := make([]byte, 4)
 	n, err := io.ReadFull(b.conn, header)
+	b.updateIncomingCommunicationMetrics(n)
 	// If the credentials are valid, we would get a 4 byte response filled with null characters.
 	// Otherwise, the broker closes the connection and we get an EOF
 	if err != nil {
@@ -560,8 +561,6 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 		return err
 	}
 
-	b.updateIncomingCommunicationMetrics(n)
-
 	Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
 	return nil
 }

+ 21 - 14
broker_test.go

@@ -37,6 +37,11 @@ func (m mockEncoder) encode(pe packetEncoder) error {
 	return pe.putRawBytes(m.bytes)
 }
 
+type brokerMetrics struct {
+	bytesRead    int
+	bytesWritten int
+}
+
 func TestBrokerAccessors(t *testing.T) {
 	broker := NewBroker("abc:123")
 
@@ -59,6 +64,11 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 		Logger.Printf("Testing broker communication for %s", tt.name)
 		mb := NewMockBroker(t, 0)
 		mb.Returns(&mockEncoder{tt.response})
+		pendingNotify := make(chan brokerMetrics)
+		// Register a callback to be notified about successful requests
+		mb.SetNotifier(func(bytesRead, bytesWritten int) {
+			pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
+		})
 		broker := NewBroker(mb.Addr())
 		// Set the broker id in order to validate local broker metrics
 		broker.id = 0
@@ -75,13 +85,16 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		// Wait up to 500 ms for the remote broker to process requests
-		// in order to have consistent metrics
-		if err := mb.WaitForExpectations(500 * time.Millisecond); err != nil {
-			t.Error(err)
+		// Wait up to 500 ms for the remote broker to process the request and
+		// notify us about the metrics
+		timeout := 500 * time.Millisecond
+		select {
+		case mockBrokerMetrics := <-pendingNotify:
+			validateBrokerMetrics(t, broker, mockBrokerMetrics)
+		case <-time.After(timeout):
+			t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
 		}
 		mb.Close()
-		validateBrokerMetrics(t, broker, mb)
 	}
 
 }
@@ -277,16 +290,10 @@ var brokerTestTable = []struct {
 		}},
 }
 
-func validateBrokerMetrics(t *testing.T, broker *Broker, mockBroker *MockBroker) {
+func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
 	metricValidators := newMetricValidators()
-	mockBrokerBytesRead := 0
-	mockBrokerBytesWritten := 0
-
-	// Compute socket bytes
-	for _, requestResponse := range mockBroker.History() {
-		mockBrokerBytesRead += requestResponse.RequestSize
-		mockBrokerBytesWritten += requestResponse.ResponseSize
-	}
+	mockBrokerBytesRead := mockBrokerMetrics.bytesRead
+	mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
 
 	// Check that the number of bytes sent corresponds to what the mock broker received
 	metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))

+ 2 - 0
config.go

@@ -237,6 +237,8 @@ type Config struct {
 	Version KafkaVersion
 	// The registry to define metrics into.
 	// Defaults to metrics.DefaultRegistry.
+	// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
+	// prior to starting Sarama.
 	// See Examples on how to use the metrics registry
 	MetricRegistry metrics.Registry
 }

+ 12 - 0
functional_producer_test.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"fmt"
+	"os"
 	"sync"
 	"testing"
 	"time"
@@ -255,6 +256,17 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder)
 	setupFunctionalTest(b)
 	defer teardownFunctionalTest(b)
 
+	metricsDisable := os.Getenv("METRICS_DISABLE")
+	if metricsDisable != "" {
+		previousUseNilMetrics := metrics.UseNilMetrics
+		Logger.Println("Disabling metrics using no-op implementation")
+		metrics.UseNilMetrics = true
+		// Restore previous setting
+		defer func() {
+			metrics.UseNilMetrics = previousUseNilMetrics
+		}()
+	}
+
 	producer, err := NewAsyncProducer(kafkaBrokers, conf)
 	if err != nil {
 		b.Fatal(err)

+ 10 - 1
metrics.go

@@ -6,9 +6,18 @@ import (
 	"github.com/rcrowley/go-metrics"
 )
 
+// Use exponentially decaying reservoir for sampling histograms with the same defaults as the Java library:
+// 1028 elements, which offers a 99.9% confidence level with a 5% margin of error assuming a normal distribution,
+// and an alpha factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements.
+// See https://github.com/dropwizard/metrics/blob/v3.1.0/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java#L38
+const (
+	metricsReservoirSize = 1028
+	metricsAlphaFactor   = 0.015
+)
+
 func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram {
 	return r.GetOrRegister(name, func() metrics.Histogram {
-		return metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015))
+		return metrics.NewHistogram(metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor))
 	}).(metrics.Histogram)
 }
 

+ 39 - 48
mockbroker.go

@@ -3,7 +3,6 @@ package sarama
 import (
 	"bytes"
 	"encoding/binary"
-	"errors"
 	"fmt"
 	"io"
 	"net"
@@ -21,6 +20,10 @@ const (
 
 type requestHandlerFunc func(req *request) (res encoder)
 
+// RequestNotifierFunc is invoked when a mock broker processes a request successfully
+// and will provides the number of bytes read and written.
+type RequestNotifierFunc func(bytesRead, bytesWritten int)
+
 // MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
 // to facilitate testing of higher level or specialized consumers and producers
 // built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
@@ -51,22 +54,19 @@ type MockBroker struct {
 	closing      chan none
 	stopper      chan none
 	expectations chan encoder
-	done         sync.WaitGroup
 	listener     net.Listener
 	t            TestReporter
 	latency      time.Duration
 	handler      requestHandlerFunc
-	origHandler  bool
-	history      []*RequestResponse
+	notifier     RequestNotifierFunc
+	history      []RequestResponse
 	lock         sync.Mutex
 }
 
 // RequestResponse represents a Request/Response pair processed by MockBroker.
 type RequestResponse struct {
-	Request      protocolBody
-	Response     encoder
-	RequestSize  int
-	ResponseSize int
+	Request  protocolBody
+	Response encoder
 }
 
 // SetLatency makes broker pause for the specified period every time before
@@ -90,6 +90,14 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
 	})
 }
 
+// SetNotifier set a function that will get invoked whenever a request has been
+// processed successfully and will provide the number of bytes read and written
+func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
+	b.lock.Lock()
+	b.notifier = notifier
+	b.lock.Unlock()
+}
+
 // BrokerID returns broker ID assigned to the broker.
 func (b *MockBroker) BrokerID() int32 {
 	return b.brokerID
@@ -102,9 +110,7 @@ func (b *MockBroker) BrokerID() int32 {
 func (b *MockBroker) History() []RequestResponse {
 	b.lock.Lock()
 	history := make([]RequestResponse, len(b.history))
-	for i, rr := range b.history {
-		history[i] = *rr
-	}
+	copy(history, b.history)
 	b.lock.Unlock()
 	return history
 }
@@ -119,21 +125,6 @@ func (b *MockBroker) Addr() string {
 	return b.listener.Addr().String()
 }
 
-// Wait for the remaining expectations to be consumed or that the timeout expires
-func (b *MockBroker) WaitForExpectations(timeout time.Duration) error {
-	c := make(chan none)
-	go func() {
-		b.done.Wait()
-		close(c)
-	}()
-	select {
-	case <-c:
-		return nil
-	case <-time.After(timeout):
-		return errors.New(fmt.Sprintf("Not all expectations have been honoured after %v", timeout))
-	}
-}
-
 // Close terminates the broker blocking until it stops internal goroutines and
 // releases all resources.
 func (b *MockBroker) Close() {
@@ -155,7 +146,6 @@ func (b *MockBroker) Close() {
 func (b *MockBroker) setHandler(handler requestHandlerFunc) {
 	b.lock.Lock()
 	b.handler = handler
-	b.origHandler = false
 	b.lock.Unlock()
 }
 
@@ -215,10 +205,8 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 		}
 
 		b.lock.Lock()
-		originalHandlerUsed := b.origHandler
 		res := b.handler(req)
-		requestResponse := RequestResponse{req.body, res, bytesRead, 0}
-		b.history = append(b.history, &requestResponse)
+		b.history = append(b.history, RequestResponse{req.body, res})
 		b.lock.Unlock()
 
 		if res == nil {
@@ -232,25 +220,31 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 			b.serverError(err)
 			break
 		}
-		if len(encodedRes) != 0 {
-			binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
-			binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
-			if _, err = conn.Write(resHeader); err != nil {
-				b.serverError(err)
-				break
-			}
-			if _, err = conn.Write(encodedRes); err != nil {
-				b.serverError(err)
-				break
-			}
+		if len(encodedRes) == 0 {
 			b.lock.Lock()
-			requestResponse.ResponseSize = len(resHeader) + len(encodedRes)
+			if b.notifier != nil {
+				b.notifier(bytesRead, 0)
+			}
 			b.lock.Unlock()
+			continue
 		}
-		// Prevent negative wait group in case we are using a custom handler
-		if originalHandlerUsed {
-			b.done.Done()
+
+		binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
+		binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
+		if _, err = conn.Write(resHeader); err != nil {
+			b.serverError(err)
+			break
 		}
+		if _, err = conn.Write(encodedRes); err != nil {
+			b.serverError(err)
+			break
+		}
+
+		b.lock.Lock()
+		if b.notifier != nil {
+			b.notifier(bytesRead, len(resHeader)+len(encodedRes))
+		}
+		b.lock.Unlock()
 	}
 	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
 }
@@ -302,10 +296,8 @@ func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker
 		t:            t,
 		brokerID:     brokerID,
 		expectations: make(chan encoder, 512),
-		done:         sync.WaitGroup{},
 	}
 	broker.handler = broker.defaultRequestHandler
-	broker.origHandler = true
 
 	broker.listener, err = net.Listen("tcp", addr)
 	if err != nil {
@@ -328,6 +320,5 @@ func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker
 }
 
 func (b *MockBroker) Returns(e encoder) {
-	b.done.Add(1)
 	b.expectations <- e
 }

+ 1 - 1
sarama.go

@@ -42,7 +42,7 @@ Broker related metrics:
 	| histogram response-size-for-broker-<broker-id> | histogram  | Distribution of the response size in bytes for a given broker |
 	+------------------------------------------------+------------+---------------------------------------------------------------+
 
-Note that we do not gather specific metrics for seed broker but they are part of the "all brokers" metrics.
+Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
 
 */
 package sarama