Browse Source

Expose producer metrics with go-metrics

- provide the following metrics:
  - batch-size histogram (global and per topic)
  - record-send-rate meter (global and per topic)
  - records-per-request histogram (global and per topic)
  - compression-ratio histogram (global and per topic)
- add metrics.Registry parameter to the encode function
- provide underlying MessageSet when encoding fake compressed "message"
- use len(msg.Set.Messages) for counting records
- use compressedSize property in Message for knowing the size of the
  compressed payload
- expose the configured metric registry in ProduceRequest
- expose current offset in packetEncoder for batch size metric
- expose real encoder flag in packetEncoder for recording metrics only
  once
- record metrics in produce_request.go
- add unit tests and functional tests
- use Spew library for building better DeepEqual error message when
  comparing raw bodies
- add documentation for new metrics
Sebastien Launay 9 years ago
parent
commit
124e7c647e

+ 1 - 1
broker.go

@@ -366,7 +366,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 	}
 
 	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
-	buf, err := encode(req)
+	buf, err := encode(req, b.conf.MetricRegistry)
 	if err != nil {
 		return nil, err
 	}

+ 2 - 2
consumer_group_members_test.go

@@ -31,7 +31,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
 		UserData: []byte{0x01, 0x02, 0x03},
 	}
 
-	buf, err := encode(meta)
+	buf, err := encode(meta, nil)
 	if err != nil {
 		t.Error("Failed to encode data", err)
 	} else if !bytes.Equal(groupMemberMetadata, buf) {
@@ -56,7 +56,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
 		UserData: []byte{0x01, 0x02, 0x03},
 	}
 
-	buf, err := encode(amt)
+	buf, err := encode(amt, nil)
 	if err != nil {
 		t.Error("Failed to encode data", err)
 	} else if !bytes.Equal(groupMemberAssignment, buf) {

+ 8 - 3
encoder_decoder.go

@@ -1,6 +1,10 @@
 package sarama
 
-import "fmt"
+import (
+	"fmt"
+
+	"github.com/rcrowley/go-metrics"
+)
 
 // Encoder is the interface that wraps the basic Encode method.
 // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
@@ -8,8 +12,8 @@ type encoder interface {
 	encode(pe packetEncoder) error
 }
 
-// Encode takes an Encoder and turns it into bytes.
-func encode(e encoder) ([]byte, error) {
+// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
+func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
 	if e == nil {
 		return nil, nil
 	}
@@ -27,6 +31,7 @@ func encode(e encoder) ([]byte, error) {
 	}
 
 	realEnc.raw = make([]byte, prepEnc.length)
+	realEnc.registry = metricRegistry
 	err = e.encode(&realEnc)
 	if err != nil {
 		return nil, err

+ 22 - 0
functional_producer_test.go

@@ -191,6 +191,7 @@ func validateMetrics(t *testing.T, client Client) {
 
 	metricValidators := newMetricValidators()
 	noResponse := client.Config().Producer.RequiredAcks == NoResponse
+	compressionEnabled := client.Config().Producer.Compression != CompressionNone
 
 	// We read at least 1 byte from the broker
 	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
@@ -203,6 +204,27 @@ func validateMetrics(t *testing.T, client Client) {
 	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
 	metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
 
+	// We send at least 1 batch
+	metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
+	metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
+	if compressionEnabled {
+		// We record compression ratios between 0.50 and 10.00 (50 to 1000 once scaled by 100 for the histogram) for at least one "fake" record
+		metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
+		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
+		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
+	} else {
+		// We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
+		metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
+		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
+		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
+	}
+
+	// We send exactly TestBatchSize messages
+	metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
+	// We send at least one record per request
+	metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
+	metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))
+
 	// We send at least 1 byte to the broker
 	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
 	if noResponse {

+ 1 - 1
join_group_request.go

@@ -98,7 +98,7 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
 }
 
 func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
-	bin, err := encode(metadata)
+	bin, err := encode(metadata, nil)
 	if err != nil {
 		return err
 	}

+ 7 - 0
message.go

@@ -31,6 +31,7 @@ type Message struct {
 	Timestamp time.Time        // the timestamp of the message (version 1+ only)
 
 	compressedCache []byte
+	compressedSize  int // used for computing the compression ratio metrics
 }
 
 func (m *Message) encode(pe packetEncoder) error {
@@ -77,6 +78,8 @@ func (m *Message) encode(pe packetEncoder) error {
 		default:
 			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
 		}
+		// Keep in mind the compressed payload size for metric gathering
+		m.compressedSize = len(payload)
 	}
 
 	if err = pe.putBytes(payload); err != nil {
@@ -121,6 +124,10 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	// Required for deep equal assertion during tests but might be useful
+	// for future metrics about the compression ratio in fetch requests
+	m.compressedSize = len(m.Value)
+
 	switch m.Codec {
 	case CompressionNone:
 		// nothing to do

+ 15 - 0
metrics.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"fmt"
+	"strings"
 
 	"github.com/rcrowley/go-metrics"
 )
@@ -34,3 +35,17 @@ func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) m
 func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
 	return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
 }
+
+func getMetricNameForTopic(name string, topic string) string {
+	// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
+	// cf. KAFKA-1902 and KAFKA-2337
+	return fmt.Sprintf(name+"-for-topic-%s", strings.Replace(topic, ".", "_", -1))
+}
+
+func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter {
+	return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r)
+}
+
+func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram {
+	return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r)
+}

+ 14 - 0
metrics_test.go

@@ -57,6 +57,11 @@ func (m *metricValidators) registerForBroker(broker *Broker, validator *metricVa
 	m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator})
 }
 
+func (m *metricValidators) registerForGlobalAndTopic(topic string, validator *metricValidator) {
+	m.register(&metricValidator{validator.name, validator.validator})
+	m.register(&metricValidator{getMetricNameForTopic(validator.name, topic), validator.validator})
+}
+
 func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) {
 	m.register(validator)
 	m.registerForBroker(broker, validator)
@@ -156,3 +161,12 @@ func minValHistogramValidator(name string, minMin int) *metricValidator {
 		}
 	})
 }
+
+func maxValHistogramValidator(name string, maxMax int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		max := int(histogram.Max())
+		if max > maxMax {
+			t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max)
+		}
+	})
+}

+ 1 - 1
mockbroker.go

@@ -215,7 +215,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 		}
 		Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
 
-		encodedRes, err := encode(res)
+		encodedRes, err := encode(res, nil)
 		if err != nil {
 			b.serverError(err)
 			break

+ 8 - 0
packet_encoder.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "github.com/rcrowley/go-metrics"
+
 // PacketEncoder is the interface providing helpers for writing with Kafka's encoding rules.
 // Types implementing Encoder only need to worry about calling methods like PutString,
 // not about how a string is represented in Kafka.
@@ -19,9 +21,15 @@ type packetEncoder interface {
 	putInt32Array(in []int32) error
 	putInt64Array(in []int64) error
 
+	// Provide the current offset to record the batch size metric
+	offset() int
+
 	// Stacks, see PushEncoder
 	push(in pushEncoder)
 	pop() error
+
+	// To record metrics when provided
+	metricRegistry() metrics.Registry
 }
 
 // PushEncoder is the interface for encoding fields like CRCs and lengths where the value

+ 11 - 0
prep_encoder.go

@@ -3,6 +3,8 @@ package sarama
 import (
 	"fmt"
 	"math"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 type prepEncoder struct {
@@ -99,6 +101,10 @@ func (pe *prepEncoder) putInt64Array(in []int64) error {
 	return nil
 }
 
+func (pe *prepEncoder) offset() int {
+	return pe.length
+}
+
 // stackable
 
 func (pe *prepEncoder) push(in pushEncoder) {
@@ -108,3 +114,8 @@ func (pe *prepEncoder) push(in pushEncoder) {
 func (pe *prepEncoder) pop() error {
 	return nil
 }
+
+// we do not record metrics during the prep encoder pass
+func (pe *prepEncoder) metricRegistry() metrics.Registry {
+	return nil
+}

+ 50 - 0
produce_request.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "github.com/rcrowley/go-metrics"
+
 // RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
 // it must see before responding. Any of the constants defined here are valid. On broker versions
 // prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
@@ -30,6 +32,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 	if err != nil {
 		return err
 	}
+	metricRegistry := pe.metricRegistry()
+	var batchSizeMetric metrics.Histogram
+	var compressionRatioMetric metrics.Histogram
+	if metricRegistry != nil {
+		batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
+		compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
+	}
+
+	totalRecordCount := int64(0)
 	for topic, partitions := range r.msgSets {
 		err = pe.putString(topic)
 		if err != nil {
@@ -39,7 +50,13 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 		if err != nil {
 			return err
 		}
+		topicRecordCount := int64(0)
+		var topicCompressionRatioMetric metrics.Histogram
+		if metricRegistry != nil {
+			topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
+		}
 		for id, msgSet := range partitions {
+			startOffset := pe.offset()
 			pe.putInt32(id)
 			pe.push(&lengthField{})
 			err = msgSet.encode(pe)
@@ -50,8 +67,41 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 			if err != nil {
 				return err
 			}
+			if metricRegistry != nil {
+				for _, messageBlock := range msgSet.Messages {
+					// Is this a fake "message" wrapping real messages?
+					if messageBlock.Msg.Set != nil {
+						topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
+					} else {
+						// A single uncompressed message
+						topicRecordCount++
+					}
+					// Better be safe than sorry when computing the compression ratio
+					if messageBlock.Msg.compressedSize != 0 {
+						compressionRatio := float64(len(messageBlock.Msg.Value)) /
+							float64(messageBlock.Msg.compressedSize)
+						// Histograms do not support decimal values, so multiply the ratio by 100 for better precision
+						intCompressionRatio := int64(100 * compressionRatio)
+						compressionRatioMetric.Update(intCompressionRatio)
+						topicCompressionRatioMetric.Update(intCompressionRatio)
+					}
+				}
+				batchSize := int64(pe.offset() - startOffset)
+				batchSizeMetric.Update(batchSize)
+				getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
+			}
+		}
+		if topicRecordCount > 0 {
+			getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
+			getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
+			totalRecordCount += topicRecordCount
 		}
 	}
+	if totalRecordCount > 0 {
+		metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
+		getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
+	}
+
 	return nil
 }
 

+ 2 - 1
produce_set.go

@@ -89,7 +89,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 				// and sent as the payload of a single fake "message" with the appropriate codec
 				// set and no key. When the server sees a message with a compression codec, it
 				// decompresses the payload and treats the result as its message set.
-				payload, err := encode(set.setToSend)
+				payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry)
 				if err != nil {
 					Logger.Println(err) // if this happens, it's basically our fault.
 					panic(err)
@@ -98,6 +98,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 					Codec: ps.parent.conf.Producer.Compression,
 					Key:   nil,
 					Value: payload,
+					Set:   set.setToSend, // Provide the underlying message set for accurate metrics
 				}
 				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 					compMsg.Version = 1

+ 18 - 4
real_encoder.go

@@ -1,11 +1,16 @@
 package sarama
 
-import "encoding/binary"
+import (
+	"encoding/binary"
+
+	"github.com/rcrowley/go-metrics"
+)
 
 type realEncoder struct {
-	raw   []byte
-	off   int
-	stack []pushEncoder
+	raw      []byte
+	off      int
+	stack    []pushEncoder
+	registry metrics.Registry
 }
 
 // primitives
@@ -98,6 +103,10 @@ func (re *realEncoder) putInt64Array(in []int64) error {
 	return nil
 }
 
+func (re *realEncoder) offset() int {
+	return re.off
+}
+
 // stacks
 
 func (re *realEncoder) push(in pushEncoder) {
@@ -113,3 +122,8 @@ func (re *realEncoder) pop() error {
 
 	return in.run(re.off, re.raw)
 }
+
+// we do record metrics during the real encoder pass
+func (re *realEncoder) metricRegistry() metrics.Registry {
+	return re.registry
+}

+ 8 - 6
request_test.go

@@ -4,6 +4,8 @@ import (
 	"bytes"
 	"reflect"
 	"testing"
+
+	"github.com/davecgh/go-spew/spew"
 )
 
 type testRequestBody struct {
@@ -25,7 +27,7 @@ func (s *testRequestBody) encode(pe packetEncoder) error {
 // implement the encoder or decoder interfaces that needed somewhere to live
 
 func testEncodable(t *testing.T, name string, in encoder, expect []byte) {
-	packet, err := encode(in)
+	packet, err := encode(in, nil)
 	if err != nil {
 		t.Error(err)
 	} else if !bytes.Equal(packet, expect) {
@@ -50,7 +52,7 @@ func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []
 func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
 	// Encoder request
 	req := &request{correlationID: 123, clientID: "foo", body: rb}
-	packet, err := encode(req)
+	packet, err := encode(req, nil)
 	headerSize := 14 + len("foo")
 	if err != nil {
 		t.Error(err)
@@ -62,16 +64,16 @@ func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
 	if err != nil {
 		t.Error("Failed to decode request", err)
 	} else if decoded.correlationID != 123 || decoded.clientID != "foo" {
-		t.Errorf("Decoded header is not valid: %v", decoded)
+		t.Errorf("Decoded header %q is not valid: %+v", name, decoded)
 	} else if !reflect.DeepEqual(rb, decoded.body) {
-		t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded.body)
+		t.Error(spew.Sprintf("Decoded request %q does not match the encoded one\nencoded: %+v\ndecoded: %+v", name, rb, decoded.body))
 	} else if n != len(packet) {
-		t.Errorf("Decoded request bytes: %d does not match the encoded one: %d\n", n, len(packet))
+		t.Errorf("Decoded request %q bytes: %d does not match the encoded one: %d\n", name, n, len(packet))
 	}
 }
 
 func testResponse(t *testing.T, name string, res protocolBody, expected []byte) {
-	encoded, err := encode(res)
+	encoded, err := encode(res, nil)
 	if err != nil {
 		t.Error(err)
 	} else if expected != nil && !bytes.Equal(encoded, expected) {

+ 31 - 16
sarama.go

@@ -25,25 +25,40 @@ Metrics are exposed through https://github.com/rcrowley/go-metrics library in a
 
 Broker related metrics:
 
-	+------------------------------------------------+------------+---------------------------------------------------------------+
-	| Name                                           | Type       | Description                                                   |
-	+------------------------------------------------+------------+---------------------------------------------------------------+
-	| incoming-byte-rate                             | meter      | Bytes/second read off all brokers                             |
-	| incoming-byte-rate-for-broker-<broker-id>      | meter      | Bytes/second read off a given broker                          |
-	| outgoing-byte-rate                             | meter      | Bytes/second written off all brokers                          |
-	| outgoing-byte-rate-for-broker-<broker-id>      | meter      | Bytes/second written off a given broker                       |
-	| request-rate                                   | meter      | Requests/second sent to all brokers                           |
-	| request-rate-for-broker-<broker-id>            | meter      | Requests/second sent to a given broker                        |
-	| histogram request-size                         | histogram  | Distribution of the request size in bytes for all brokers     |
-	| histogram request-size-for-broker-<broker-id>  | histogram  | Distribution of the request size in bytes for a given broker  |
-	| response-rate                                  | meter      | Responses/second received from all brokers                    |
-	| response-rate-for-broker-<broker-id>           | meter      | Responses/second received from a given broker                 |
-	| histogram response-size                        | histogram  | Distribution of the response size in bytes for all brokers    |
-	| histogram response-size-for-broker-<broker-id> | histogram  | Distribution of the response size in bytes for a given broker |
-	+------------------------------------------------+------------+---------------------------------------------------------------+
+	+-------------------------------------------+------------+---------------------------------------------------------------+
+	| Name                                      | Type       | Description                                                   |
+	+-------------------------------------------+------------+---------------------------------------------------------------+
+	| incoming-byte-rate                        | meter      | Bytes/second read off all brokers                             |
+	| incoming-byte-rate-for-broker-<broker-id> | meter      | Bytes/second read off a given broker                          |
+	| outgoing-byte-rate                        | meter      | Bytes/second written off all brokers                          |
+	| outgoing-byte-rate-for-broker-<broker-id> | meter      | Bytes/second written off a given broker                       |
+	| request-rate                              | meter      | Requests/second sent to all brokers                           |
+	| request-rate-for-broker-<broker-id>       | meter      | Requests/second sent to a given broker                        |
+	| request-size                              | histogram  | Distribution of the request size in bytes for all brokers     |
+	| request-size-for-broker-<broker-id>       | histogram  | Distribution of the request size in bytes for a given broker  |
+	| response-rate                             | meter      | Responses/second received from all brokers                    |
+	| response-rate-for-broker-<broker-id>      | meter      | Responses/second received from a given broker                 |
+	| response-size                             | histogram  | Distribution of the response size in bytes for all brokers    |
+	| response-size-for-broker-<broker-id>      | histogram  | Distribution of the response size in bytes for a given broker |
+	+-------------------------------------------+------------+---------------------------------------------------------------+
 
 Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
 
+Producer related metrics:
+
+	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+	| Name                                      | Type       | Description                                                                          |
+	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+	| batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics    |
+	| batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic |
+	| record-send-rate                          | meter      | Records/second sent to all topics                                                    |
+	| record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                 |
+	| records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                |
+	| records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic             |
+	| compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics     |
+	| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
+	+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+
 */
 package sarama
 

+ 1 - 1
sync_group_request.go

@@ -90,7 +90,7 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment
 }
 
 func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
-	bin, err := encode(memberAssignment)
+	bin, err := encode(memberAssignment, nil)
 	if err != nil {
 		return err
 	}