Browse Source

Merge pull request #766 from slaunay/feature/request-latency-metric

Expose request latency metric
Evan Huus 9 years ago
parent
commit
15bc16649d
3 changed files with 61 additions and 23 deletions
  1. 26 7
      broker.go
  2. 17 0
      functional_producer_test.go
  3. 18 16
      sarama.go

+ 26 - 7
broker.go

@@ -32,18 +32,21 @@ type Broker struct {
 	incomingByteRate       metrics.Meter
 	incomingByteRate       metrics.Meter
 	requestRate            metrics.Meter
 	requestRate            metrics.Meter
 	requestSize            metrics.Histogram
 	requestSize            metrics.Histogram
+	requestLatency         metrics.Histogram
 	outgoingByteRate       metrics.Meter
 	outgoingByteRate       metrics.Meter
 	responseRate           metrics.Meter
 	responseRate           metrics.Meter
 	responseSize           metrics.Histogram
 	responseSize           metrics.Histogram
 	brokerIncomingByteRate metrics.Meter
 	brokerIncomingByteRate metrics.Meter
 	brokerRequestRate      metrics.Meter
 	brokerRequestRate      metrics.Meter
 	brokerRequestSize      metrics.Histogram
 	brokerRequestSize      metrics.Histogram
+	brokerRequestLatency   metrics.Histogram
 	brokerOutgoingByteRate metrics.Meter
 	brokerOutgoingByteRate metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseSize     metrics.Histogram
 	brokerResponseSize     metrics.Histogram
 }
 }
 
 
 type responsePromise struct {
 type responsePromise struct {
+	requestTime   time.Time
 	correlationID int32
 	correlationID int32
 	packets       chan []byte
 	packets       chan []byte
 	errors        chan error
 	errors        chan error
@@ -103,6 +106,7 @@ func (b *Broker) Open(conf *Config) error {
 		b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
 		b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
 		b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
 		b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
 		b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
 		b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
+		b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
 		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
 		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
 		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
 		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
 		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
 		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
@@ -112,6 +116,7 @@ func (b *Broker) Open(conf *Config) error {
 			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
 			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
 			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
 			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
 			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
 			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
+			b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
 			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
 			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
 			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
 			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
 			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
 			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
@@ -376,6 +381,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 		return nil, err
 		return nil, err
 	}
 	}
 
 
+	requestTime := time.Now()
 	bytes, err := b.conn.Write(buf)
 	bytes, err := b.conn.Write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
 	if err != nil {
@@ -384,10 +390,12 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 	b.correlationID++
 	b.correlationID++
 
 
 	if !promiseResponse {
 	if !promiseResponse {
+		// Record request latency without the response
+		b.updateRequestLatencyMetrics(time.Since(requestTime))
 		return nil, nil
 		return nil, nil
 	}
 	}
 
 
-	promise := responsePromise{req.correlationID, make(chan []byte), make(chan error)}
+	promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
 	b.responses <- promise
 	b.responses <- promise
 
 
 	return &promise, nil
 	return &promise, nil
@@ -476,8 +484,9 @@ func (b *Broker) responseReceiver() {
 		}
 		}
 
 
 		bytesReadHeader, err := io.ReadFull(b.conn, header)
 		bytesReadHeader, err := io.ReadFull(b.conn, header)
+		requestLatency := time.Since(response.requestTime)
 		if err != nil {
 		if err != nil {
-			b.updateIncomingCommunicationMetrics(bytesReadHeader)
+			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
 			dead = err
 			dead = err
 			response.errors <- err
 			response.errors <- err
 			continue
 			continue
@@ -486,13 +495,13 @@ func (b *Broker) responseReceiver() {
 		decodedHeader := responseHeader{}
 		decodedHeader := responseHeader{}
 		err = decode(header, &decodedHeader)
 		err = decode(header, &decodedHeader)
 		if err != nil {
 		if err != nil {
-			b.updateIncomingCommunicationMetrics(bytesReadHeader)
+			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
 			dead = err
 			dead = err
 			response.errors <- err
 			response.errors <- err
 			continue
 			continue
 		}
 		}
 		if decodedHeader.correlationID != response.correlationID {
 		if decodedHeader.correlationID != response.correlationID {
-			b.updateIncomingCommunicationMetrics(bytesReadHeader)
+			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
 			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
 			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
@@ -502,7 +511,7 @@ func (b *Broker) responseReceiver() {
 
 
 		buf := make([]byte, decodedHeader.length-4)
 		buf := make([]byte, decodedHeader.length-4)
 		bytesReadBody, err := io.ReadFull(b.conn, buf)
 		bytesReadBody, err := io.ReadFull(b.conn, buf)
-		b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
+		b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
 		if err != nil {
 		if err != nil {
 			dead = err
 			dead = err
 			response.errors <- err
 			response.errors <- err
@@ -544,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 		return err
 		return err
 	}
 	}
 
 
+	requestTime := time.Now()
 	bytesWritten, err := b.conn.Write(authBytes)
 	bytesWritten, err := b.conn.Write(authBytes)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
 	if err != nil {
@@ -553,7 +563,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 
 
 	header := make([]byte, 4)
 	header := make([]byte, 4)
 	n, err := io.ReadFull(b.conn, header)
 	n, err := io.ReadFull(b.conn, header)
-	b.updateIncomingCommunicationMetrics(n)
+	b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
 	// If the credentials are valid, we would get a 4 byte response filled with null characters.
 	// If the credentials are valid, we would get a 4 byte response filled with null characters.
 	// Otherwise, the broker closes the connection and we get an EOF
 	// Otherwise, the broker closes the connection and we get an EOF
 	if err != nil {
 	if err != nil {
@@ -565,7 +575,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	return nil
 	return nil
 }
 }
 
 
-func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
+func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
+	b.updateRequestLatencyMetrics(requestLatency)
 	b.responseRate.Mark(1)
 	b.responseRate.Mark(1)
 	if b.brokerResponseRate != nil {
 	if b.brokerResponseRate != nil {
 		b.brokerResponseRate.Mark(1)
 		b.brokerResponseRate.Mark(1)
@@ -581,6 +592,14 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
 	}
 	}
 }
 }
 
 
+func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+	requestLatencyInMs := int64(requestLatency / time.Millisecond)
+	b.requestLatency.Update(requestLatencyInMs)
+	if b.brokerRequestLatency != nil {
+		b.brokerRequestLatency.Update(requestLatencyInMs)
+	}
+}
+
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
 	b.requestRate.Mark(1)
 	b.requestRate.Mark(1)
 	if b.brokerRequestRate != nil {
 	if b.brokerRequestRate != nil {

+ 17 - 0
functional_producer_test.go

@@ -7,6 +7,7 @@ import (
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	toxiproxy "github.com/Shopify/toxiproxy/client"
 	"github.com/rcrowley/go-metrics"
 	"github.com/rcrowley/go-metrics"
 )
 )
 
 
@@ -99,6 +100,13 @@ func testProducingMessages(t *testing.T, config *Config) {
 	setupFunctionalTest(t)
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
 
+	// Configure some latency in order to properly validate the request latency metric
+	for _, proxy := range Proxies {
+		if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
+			t.Fatal("Unable to configure latency toxicity", err)
+		}
+	}
+
 	config.Producer.Return.Successes = true
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 	config.Consumer.Return.Errors = true
 
 
@@ -193,16 +201,25 @@ func validateMetrics(t *testing.T, client Client) {
 	noResponse := client.Config().Producer.RequiredAcks == NoResponse
 	noResponse := client.Config().Producer.RequiredAcks == NoResponse
 	compressionEnabled := client.Config().Producer.Compression != CompressionNone
 	compressionEnabled := client.Config().Producer.Compression != CompressionNone
 
 
+	// We are adding 10ms of latency to all requests with toxiproxy
+	minRequestLatencyInMs := 10
+	if noResponse {
+		// but when we do not wait for a response it can be less than 1ms
+		minRequestLatencyInMs = 0
+	}
+
 	// We read at least 1 byte from the broker
 	// We read at least 1 byte from the broker
 	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
 	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
 	// in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
 	// in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
 	metricValidators.register(minCountMeterValidator("request-rate", 3))
 	metricValidators.register(minCountMeterValidator("request-rate", 3))
 	metricValidators.register(minCountHistogramValidator("request-size", 3))
 	metricValidators.register(minCountHistogramValidator("request-size", 3))
 	metricValidators.register(minValHistogramValidator("request-size", 1))
 	metricValidators.register(minValHistogramValidator("request-size", 1))
+	metricValidators.register(minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
 	// and at least 2 requests to the registered broker (offset + produces)
 	// and at least 2 requests to the registered broker (offset + produces)
 	metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
 	metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
 	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
 	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
 	metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
 	metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
+	metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
 
 
 	// We send at least 1 batch
 	// We send at least 1 batch
 	metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
 	metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))

+ 18 - 16
sarama.go

@@ -25,22 +25,24 @@ Metrics are exposed through https://github.com/rcrowley/go-metrics library in a
 
 
 Broker related metrics:
 Broker related metrics:
 
 
-	+-------------------------------------------+------------+---------------------------------------------------------------+
-	| Name                                      | Type       | Description                                                   |
-	+-------------------------------------------+------------+---------------------------------------------------------------+
-	| incoming-byte-rate                        | meter      | Bytes/second read off all brokers                             |
-	| incoming-byte-rate-for-broker-<broker-id> | meter      | Bytes/second read off a given broker                          |
-	| outgoing-byte-rate                        | meter      | Bytes/second written off all brokers                          |
-	| outgoing-byte-rate-for-broker-<broker-id> | meter      | Bytes/second written off a given broker                       |
-	| request-rate                              | meter      | Requests/second sent to all brokers                           |
-	| request-rate-for-broker-<broker-id>       | meter      | Requests/second sent to a given broker                        |
-	| request-size                              | histogram  | Distribution of the request size in bytes for all brokers     |
-	| request-size-for-broker-<broker-id>       | histogram  | Distribution of the request size in bytes for a given broker  |
-	| response-rate                             | meter      | Responses/second received from all brokers                    |
-	| response-rate-for-broker-<broker-id>      | meter      | Responses/second received from a given broker                 |
-	| response-size                             | histogram  | Distribution of the response size in bytes for all brokers    |
-	| response-size-for-broker-<broker-id>      | histogram  | Distribution of the response size in bytes for a given broker |
-	+-------------------------------------------+------------+---------------------------------------------------------------+
+	+----------------------------------------------+------------+---------------------------------------------------------------+
+	| Name                                         | Type       | Description                                                   |
+	+----------------------------------------------+------------+---------------------------------------------------------------+
+	| incoming-byte-rate                           | meter      | Bytes/second read off all brokers                             |
+	| incoming-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second read off a given broker                          |
+	| outgoing-byte-rate                           | meter      | Bytes/second written off all brokers                          |
+	| outgoing-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second written off a given broker                       |
+	| request-rate                                 | meter      | Requests/second sent to all brokers                           |
+	| request-rate-for-broker-<broker-id>          | meter      | Requests/second sent to a given broker                        |
+	| request-size                                 | histogram  | Distribution of the request size in bytes for all brokers     |
+	| request-size-for-broker-<broker-id>          | histogram  | Distribution of the request size in bytes for a given broker  |
+	| request-latency-in-ms                        | histogram  | Distribution of the request latency in ms for all brokers     |
+	| request-latency-in-ms-for-broker-<broker-id> | histogram  | Distribution of the request latency in ms for a given broker  |
+	| response-rate                                | meter      | Responses/second received from all brokers                    |
+	| response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
+	| response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
+	| response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
+	+----------------------------------------------+------------+---------------------------------------------------------------+
 
 
 Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
 Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.