Browse Source

Expose broker metrics with go-metrics

- add MetricRegistry configuration parameter that defaults to
  metrics.DefaultRegistry
- provide the following metrics:
  - incoming-byte-rate meter (global and per registered broker)
  - request-rate meter (global and per registered broker)
  - request-size histogram (global and per registered broker)
  - outgoing-byte-rate meter (global and per registered broker)
  - response-rate meter (global and per registered broker)
  - response-size histogram (global and per registered broker)
- add metrics flag to kafka-console-producer to output metrics
- add bytes read to decodeRequest
- store request and response size in MockBroker history
- add unit tests and example
- functional tests in functional_producer_test
- documentation of metrics in main package
Sebastien Launay 9 năm trước cách đây
mục cha
commit
f8d8342ace
12 tập tin đã thay đổi với 515 bổ sung50 xóa
  1. 73 0
      broker.go
  2. 88 31
      broker_test.go
  3. 7 0
      config.go
  4. 33 1
      config_test.go
  5. 77 5
      functional_producer_test.go
  6. 27 0
      metrics.go
  7. 158 0
      metrics_test.go
  8. 12 6
      mockbroker.go
  9. 8 6
      request.go
  10. 3 1
      request_test.go
  11. 24 0
      sarama.go
  12. 5 0
      tools/kafka-console-producer/kafka-console-producer.go

+ 73 - 0
broker.go

@@ -10,6 +10,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -26,6 +28,19 @@ type Broker struct {
 
 	responses chan responsePromise
 	done      chan bool
+
+	incomingByteRate       metrics.Meter
+	requestRate            metrics.Meter
+	requestSize            metrics.Histogram
+	outgoingByteRate       metrics.Meter
+	responseRate           metrics.Meter
+	responseSize           metrics.Histogram
+	brokerIncomingByteRate metrics.Meter
+	brokerRequestRate      metrics.Meter
+	brokerRequestSize      metrics.Histogram
+	brokerOutgoingByteRate metrics.Meter
+	brokerResponseRate     metrics.Meter
+	brokerResponseSize     metrics.Histogram
 }
 
 type responsePromise struct {
@@ -84,6 +99,24 @@ func (b *Broker) Open(conf *Config) error {
 
 		b.conf = conf
 
+		// Create or reuse the global metrics shared between brokers
+		b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
+		b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
+		b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
+		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
+		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
+		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
+		// the same id (-1) and are already exposed through the global metrics above
+		if b.id >= 0 {
+			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
+			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
+			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
+			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
+			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
+			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
+		}
+
 		if conf.Net.SASL.Enable {
 			b.connErr = b.sendAndReceiveSASLPlainAuth()
 			if b.connErr != nil {
@@ -338,6 +371,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 		return nil, err
 	}
 
+	b.updateOutgoingCommunicationMetrics(len(buf))
+
 	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
 	if err != nil {
 		return nil, err
@@ -471,6 +506,8 @@ func (b *Broker) responseReceiver() {
 			continue
 		}
 
+		b.updateIncomingCommunicationMetrics(len(header) + len(buf))
+
 		response.packets <- buf
 	}
 	close(b.done)
@@ -500,6 +537,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	binary.BigEndian.PutUint32(authBytes, uint32(length))
 	copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
 
+	b.updateOutgoingCommunicationMetrics(len(authBytes))
+
 	err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
 	if err != nil {
 		Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
@@ -521,6 +560,40 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 		return err
 	}
 
+	b.updateIncomingCommunicationMetrics(n)
+
 	Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
 	return nil
 }
+
+func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
+	b.responseRate.Mark(1)
+	if b.brokerResponseRate != nil {
+		b.brokerResponseRate.Mark(1)
+	}
+	responseSize := int64(bytes)
+	b.incomingByteRate.Mark(responseSize)
+	if b.brokerIncomingByteRate != nil {
+		b.brokerIncomingByteRate.Mark(responseSize)
+	}
+	b.responseSize.Update(responseSize)
+	if b.brokerResponseSize != nil {
+		b.brokerResponseSize.Update(responseSize)
+	}
+}
+
+func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
+	b.requestRate.Mark(1)
+	if b.brokerRequestRate != nil {
+		b.brokerRequestRate.Mark(1)
+	}
+	requestSize := int64(bytes)
+	b.outgoingByteRate.Mark(requestSize)
+	if b.brokerOutgoingByteRate != nil {
+		b.brokerOutgoingByteRate.Mark(requestSize)
+	}
+	b.requestSize.Update(requestSize)
+	if b.brokerRequestSize != nil {
+		b.brokerRequestSize.Update(requestSize)
+	}
+}

+ 88 - 31
broker_test.go

@@ -3,6 +3,9 @@ package sarama
 import (
 	"fmt"
 	"testing"
+	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 func ExampleBroker() {
@@ -52,36 +55,40 @@ func TestBrokerAccessors(t *testing.T) {
 }
 
 func TestSimpleBrokerCommunication(t *testing.T) {
-	mb := NewMockBroker(t, 0)
-	defer mb.Close()
-
-	broker := NewBroker(mb.Addr())
-	conf := NewConfig()
-	conf.Version = V0_10_0_0
-	err := broker.Open(conf)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	for _, tt := range brokerTestTable {
+		t.Log("Testing broker communication for", tt.name)
+		mb := NewMockBroker(t, 0)
 		mb.Returns(&mockEncoder{tt.response})
-	}
-	for _, tt := range brokerTestTable {
+		defer mb.Close()
+		broker := NewBroker(mb.Addr())
+		// Set the broker id in order to validate local broker metrics
+		broker.id = 0
+		conf := NewConfig()
+		conf.Version = V0_10_0_0
+		// Use a new registry every time to prevent side effect caused by the global one
+		conf.MetricRegistry = metrics.NewRegistry()
+		err := broker.Open(conf)
+		if err != nil {
+			t.Fatal(err)
+		}
 		tt.runner(t, broker)
+		validateBrokerMetrics(t, broker, mb)
+		err = broker.Close()
+		if err != nil {
+			t.Error(err)
+		}
 	}
 
-	err = broker.Close()
-	if err != nil {
-		t.Error(err)
-	}
 }
 
 // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
 var brokerTestTable = []struct {
+	name     string
 	response []byte
 	runner   func(*testing.T, *Broker)
 }{
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"MetadataRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := MetadataRequest{}
 			response, err := broker.GetMetadata(&request)
@@ -93,7 +100,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
+	{"ConsumerMetadataRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ConsumerMetadataRequest{}
 			response, err := broker.GetConsumerMetadata(&request)
@@ -105,7 +113,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{},
+	{"ProduceRequest (NoResponse)",
+		[]byte{},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = NoResponse
@@ -116,9 +125,13 @@ var brokerTestTable = []struct {
 			if response != nil {
 				t.Error("Produce request with NoResponse got a response!")
 			}
+			// Wait for the request to be processed by the broker so
+			// we do not get 0 bytes written from the broker when validating metrics
+			time.Sleep(100 * time.Millisecond)
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"ProduceRequest (WaitForLocal)",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = WaitForLocal
@@ -131,7 +144,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"FetchRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := FetchRequest{}
 			response, err := broker.Fetch(&request)
@@ -143,7 +157,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"OffsetFetchRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetFetchRequest{}
 			response, err := broker.FetchOffset(&request)
@@ -155,7 +170,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"OffsetCommitRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetCommitRequest{}
 			response, err := broker.CommitOffset(&request)
@@ -167,7 +183,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"OffsetRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetRequest{}
 			response, err := broker.GetAvailableOffsets(&request)
@@ -179,7 +196,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"JoinGroupRequest",
+		[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := JoinGroupRequest{}
 			response, err := broker.JoinGroup(&request)
@@ -191,7 +209,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"SyncGroupRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := SyncGroupRequest{}
 			response, err := broker.SyncGroup(&request)
@@ -203,7 +222,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00},
+	{"LeaveGroupRequest",
+		[]byte{0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := LeaveGroupRequest{}
 			response, err := broker.LeaveGroup(&request)
@@ -215,7 +235,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00},
+	{"HeartbeatRequest",
+		[]byte{0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := HeartbeatRequest{}
 			response, err := broker.Heartbeat(&request)
@@ -227,7 +248,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"ListGroupsRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ListGroupsRequest{}
 			response, err := broker.ListGroups(&request)
@@ -239,7 +261,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"DescribeGroupsRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := DescribeGroupsRequest{}
 			response, err := broker.DescribeGroups(&request)
@@ -251,3 +274,37 @@ var brokerTestTable = []struct {
 			}
 		}},
 }
+
+func validateBrokerMetrics(t *testing.T, broker *Broker, mockBroker *MockBroker) {
+	metricValidators := newMetricValidators()
+	mockBrokerBytesRead := 0
+	mockBrokerBytesWritten := 0
+
+	// Compute socket bytes
+	for _, requestResponse := range mockBroker.History() {
+		mockBrokerBytesRead += requestResponse.RequestSize
+		mockBrokerBytesWritten += requestResponse.ResponseSize
+	}
+
+	// Check that the number of bytes sent corresponds to what the mock broker received
+	metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
+	if mockBrokerBytesWritten == 0 {
+		// This a ProduceRequest with NoResponse
+		metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
+		metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
+		metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
+	} else {
+		metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
+		metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
+		metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
+	}
+
+	// Check that the number of bytes received corresponds to what the mock broker sent
+	metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
+	metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
+	metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
+	metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
+
+	// Run the validators
+	metricValidators.run(t, broker.conf.MetricRegistry)
+}

+ 7 - 0
config.go

@@ -4,6 +4,8 @@ import (
 	"crypto/tls"
 	"regexp"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 const defaultClientID = "sarama"
@@ -233,6 +235,10 @@ type Config struct {
 	// latest features. Setting it to a version greater than you are actually
 	// running may lead to random breakage.
 	Version KafkaVersion
+	// The registry to define metrics into.
+	// Defaults to metrics.DefaultRegistry.
+	// See Examples on how to use the metrics registry
+	MetricRegistry metrics.Registry
 }
 
 // NewConfig returns a new configuration instance with sane defaults.
@@ -268,6 +274,7 @@ func NewConfig() *Config {
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
 	c.Version = minVersion
+	c.MetricRegistry = metrics.DefaultRegistry
 
 	return c
 }

+ 33 - 1
config_test.go

@@ -1,12 +1,20 @@
 package sarama
 
-import "testing"
+import (
+	"os"
+	"testing"
+
+	"github.com/rcrowley/go-metrics"
+)
 
 func TestDefaultConfigValidates(t *testing.T) {
 	config := NewConfig()
 	if err := config.Validate(); err != nil {
 		t.Error(err)
 	}
+	if config.MetricRegistry != metrics.DefaultRegistry {
+		t.Error("Expected metrics.DefaultRegistry, got ", config.MetricRegistry)
+	}
 }
 
 func TestInvalidClientIDConfigValidates(t *testing.T) {
@@ -24,3 +32,27 @@ func TestEmptyClientIDConfigValidates(t *testing.T) {
 		t.Error("Expected invalid ClientID, got ", err)
 	}
 }
+
+// This example shows how to integrate with an existing registry as well as publishing metrics
+// on the standard output
+func ExampleConfig_metrics() {
+	// Our application registry
+	appMetricRegistry := metrics.NewRegistry()
+	appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
+	appGauge.Update(1)
+
+	config := NewConfig()
+	// Use a prefix registry instead of the default global one
+	config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
+
+	// Simulate a metric created by sarama without starting a broker
+	saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
+	saramaGauge.Update(2)
+
+	metrics.WriteOnce(appMetricRegistry, os.Stdout)
+	// Output:
+	// gauge m1
+	//   value:               1
+	// gauge sarama.m2
+	//   value:               2
+}

+ 77 - 5
functional_producer_test.go

@@ -5,6 +5,8 @@ import (
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 const TestBatchSize = 1000
@@ -96,6 +98,9 @@ func testProducingMessages(t *testing.T, config *Config) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
+	// Use a dedicated registry to prevent side effect caused by the global one
+	config.MetricRegistry = metrics.NewRegistry()
+
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 
@@ -104,11 +109,8 @@ func testProducingMessages(t *testing.T, config *Config) {
 		t.Fatal(err)
 	}
 
-	master, err := NewConsumerFromClient(client)
-	if err != nil {
-		t.Fatal(err)
-	}
-	consumer, err := master.ConsumePartition("test.1", 0, OffsetNewest)
+	// Keep in mind the current offset
+	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -140,6 +142,18 @@ func testProducingMessages(t *testing.T, config *Config) {
 	}
 	safeClose(t, producer)
 
+	// Validate producer metrics before using the consumer minus the offset request
+	validateMetrics(t, client)
+
+	master, err := NewConsumerFromClient(client)
+	if err != nil {
+		t.Fatal(err)
+	}
+	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	for i := 1; i <= TestBatchSize; i++ {
 		select {
 		case <-time.After(10 * time.Second):
@@ -159,6 +173,64 @@ func testProducingMessages(t *testing.T, config *Config) {
 	safeClose(t, client)
 }
 
+func validateMetrics(t *testing.T, client Client) {
+	// Get the broker used by test1 topic
+	var broker *Broker
+	if partitions, err := client.Partitions("test.1"); err != nil {
+		t.Error(err)
+	} else {
+		for _, partition := range partitions {
+			if b, err := client.Leader("test.1", partition); err != nil {
+				t.Error(err)
+			} else {
+				if broker != nil && b != broker {
+					t.Fatal("Expected only one broker, got at least 2")
+				}
+				broker = b
+			}
+		}
+	}
+
+	metricValidators := newMetricValidators()
+	noResponse := client.Config().Producer.RequiredAcks == NoResponse
+
+	// We read at least 1 byte from the broker
+	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
+	// in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
+	metricValidators.register(minCountMeterValidator("request-rate", 3))
+	metricValidators.register(minCountHistogramValidator("request-size", 3))
+	metricValidators.register(minValHistogramValidator("request-size", 1))
+	// and at least 2 requests to the registered broker (offset + produces)
+	metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
+	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
+	metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
+
+	// We receive at least 1 byte from the broker
+	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
+	if noResponse {
+		// in exactly 2 global responses (metadata + offset)
+		metricValidators.register(countMeterValidator("response-rate", 2))
+		metricValidators.register(minCountHistogramValidator("response-size", 2))
+		metricValidators.register(minValHistogramValidator("response-size", 1))
+		// and exactly 1 offset response for the registered broker
+		metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
+		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
+		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
+	} else {
+		// in at least 3 global responses (metadata + offset + produces)
+		metricValidators.register(minCountMeterValidator("response-rate", 3))
+		metricValidators.register(minCountHistogramValidator("response-size", 3))
+		metricValidators.register(minValHistogramValidator("response-size", 1))
+		// and at least 2 for the registered broker
+		metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
+		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
+		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
+	}
+
+	// Run the validators
+	metricValidators.run(t, client.Config().MetricRegistry)
+}
+
 // Benchmarks
 
 func BenchmarkProducerSmall(b *testing.B) {

+ 27 - 0
metrics.go

@@ -0,0 +1,27 @@
+package sarama
+
+import (
+	"fmt"
+
+	"github.com/rcrowley/go-metrics"
+)
+
+func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram {
+	return r.GetOrRegister(name, func() metrics.Histogram {
+		return metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015))
+	}).(metrics.Histogram)
+}
+
+func getMetricNameForBroker(name string, broker *Broker) string {
+	// Use broker id like the Java client as it does not contain '.' or ':' characters that
+	// can be interpreted as special character by monitoring tool (e.g. Graphite)
+	return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
+}
+
+func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter {
+	return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r)
+}
+
+func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
+	return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
+}

+ 158 - 0
metrics_test.go

@@ -0,0 +1,158 @@
+package sarama
+
+import (
+	"testing"
+
+	"github.com/rcrowley/go-metrics"
+)
+
+func TestGetOrRegisterHistogram(t *testing.T) {
+	metricRegistry := metrics.NewRegistry()
+	histogram := getOrRegisterHistogram("name", metricRegistry)
+
+	if histogram == nil {
+		t.Error("Unexpected nil histogram")
+	}
+
+	// Fetch the metric
+	foundHistogram := metricRegistry.Get("name")
+
+	if foundHistogram != histogram {
+		t.Error("Unexpected different histogram", foundHistogram, histogram)
+	}
+
+	// Try to register the metric again
+	sameHistogram := getOrRegisterHistogram("name", metricRegistry)
+
+	if sameHistogram != histogram {
+		t.Error("Unexpected different histogram", sameHistogram, histogram)
+	}
+}
+
+func TestGetMetricNameForBroker(t *testing.T) {
+	metricName := getMetricNameForBroker("name", &Broker{id: 1})
+
+	if metricName != "name-for-broker-1" {
+		t.Error("Unexpected metric name", metricName)
+	}
+}
+
+// Common type and functions for metric validation
+type metricValidator struct {
+	name      string
+	validator func(*testing.T, interface{})
+}
+
+type metricValidators []*metricValidator
+
+func newMetricValidators() metricValidators {
+	return make([]*metricValidator, 0, 32)
+}
+
+func (m *metricValidators) register(validator *metricValidator) {
+	*m = append(*m, validator)
+}
+
+func (m *metricValidators) registerForBroker(broker *Broker, validator *metricValidator) {
+	m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator})
+}
+
+func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) {
+	m.register(validator)
+	m.registerForBroker(broker, validator)
+}
+
+func (m metricValidators) run(t *testing.T, r metrics.Registry) {
+	for _, metricValidator := range m {
+		metric := r.Get(metricValidator.name)
+		if metric == nil {
+			t.Error("No metric named", metricValidator.name)
+		} else {
+			metricValidator.validator(t, metric)
+		}
+	}
+}
+
+func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) *metricValidator {
+	return &metricValidator{
+		name: name,
+		validator: func(t *testing.T, metric interface{}) {
+			if meter, ok := metric.(metrics.Meter); !ok {
+				t.Errorf("Expected meter metric for '%s', got %T", name, metric)
+			} else {
+				extraValidator(t, meter)
+			}
+		},
+	}
+}
+
+func countMeterValidator(name string, expectedCount int) *metricValidator {
+	return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
+		count := meter.Count()
+		if count != int64(expectedCount) {
+			t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count)
+		}
+	})
+}
+
+func minCountMeterValidator(name string, minCount int) *metricValidator {
+	return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
+		count := meter.Count()
+		if count < int64(minCount) {
+			t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count)
+		}
+	})
+}
+
+func histogramValidator(name string, extraValidator func(*testing.T, metrics.Histogram)) *metricValidator {
+	return &metricValidator{
+		name: name,
+		validator: func(t *testing.T, metric interface{}) {
+			if histogram, ok := metric.(metrics.Histogram); !ok {
+				t.Errorf("Expected histogram metric for '%s', got %T", name, metric)
+			} else {
+				extraValidator(t, histogram)
+			}
+		},
+	}
+}
+
+func countHistogramValidator(name string, expectedCount int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		count := histogram.Count()
+		if count != int64(expectedCount) {
+			t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count)
+		}
+	})
+}
+
+func minCountHistogramValidator(name string, minCount int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		count := histogram.Count()
+		if count < int64(minCount) {
+			t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count)
+		}
+	})
+}
+
+func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		min := int(histogram.Min())
+		if min != expectedMin {
+			t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min)
+		}
+		max := int(histogram.Max())
+		if max != expectedMax {
+			t.Errorf("Expected histogram metric '%s' max = %d, got %d", name, expectedMax, max)
+		}
+	})
+}
+
+func minValHistogramValidator(name string, minMin int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		min := int(histogram.Min())
+		if min < minMin {
+			t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min)
+		}
+	})
+}

+ 12 - 6
mockbroker.go

@@ -54,14 +54,16 @@ type MockBroker struct {
 	t            TestReporter
 	latency      time.Duration
 	handler      requestHandlerFunc
-	history      []RequestResponse
+	history      []*RequestResponse
 	lock         sync.Mutex
 }
 
 // RequestResponse represents a Request/Response pair processed by MockBroker.
 type RequestResponse struct {
-	Request  protocolBody
-	Response encoder
+	Request      protocolBody
+	Response     encoder
+	RequestSize  int
+	ResponseSize int
 }
 
 // SetLatency makes broker pause for the specified period every time before
@@ -97,7 +99,9 @@ func (b *MockBroker) BrokerID() int32 {
 func (b *MockBroker) History() []RequestResponse {
 	b.lock.Lock()
 	history := make([]RequestResponse, len(b.history))
-	copy(history, b.history)
+	for i, rr := range b.history {
+		history[i] = *rr
+	}
 	b.lock.Unlock()
 	return history
 }
@@ -180,7 +184,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 
 	resHeader := make([]byte, 8)
 	for {
-		req, err := decodeRequest(conn)
+		req, bytesRead, err := decodeRequest(conn)
 		if err != nil {
 			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
 			b.serverError(err)
@@ -193,7 +197,8 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 
 		b.lock.Lock()
 		res := b.handler(req)
-		b.history = append(b.history, RequestResponse{req.body, res})
+		requestResponse := RequestResponse{req.body, res, bytesRead, 0}
+		b.history = append(b.history, &requestResponse)
 		b.lock.Unlock()
 
 		if res == nil {
@@ -221,6 +226,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 			b.serverError(err)
 			break
 		}
+		requestResponse.ResponseSize = len(resHeader) + len(encodedRes)
 	}
 	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
 }

+ 8 - 6
request.go

@@ -57,27 +57,29 @@ func (r *request) decode(pd packetDecoder) (err error) {
 	return r.body.decode(pd, version)
 }
 
-func decodeRequest(r io.Reader) (req *request, err error) {
+func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
 	lengthBytes := make([]byte, 4)
 	if _, err := io.ReadFull(r, lengthBytes); err != nil {
-		return nil, err
+		return nil, bytesRead, err
 	}
+	bytesRead += len(lengthBytes)
 
 	length := int32(binary.BigEndian.Uint32(lengthBytes))
 	if length <= 4 || length > MaxRequestSize {
-		return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
+		return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
 	}
 
 	encodedReq := make([]byte, length)
 	if _, err := io.ReadFull(r, encodedReq); err != nil {
-		return nil, err
+		return nil, bytesRead, err
 	}
+	bytesRead += len(encodedReq)
 
 	req = &request{}
 	if err := decode(encodedReq, req); err != nil {
-		return nil, err
+		return nil, bytesRead, err
 	}
-	return req, nil
+	return req, bytesRead, nil
 }
 
 func allocateBody(key, version int16) protocolBody {

+ 3 - 1
request_test.go

@@ -58,13 +58,15 @@ func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
 		t.Error("Encoding", name, "failed\ngot ", packet[headerSize:], "\nwant", expected)
 	}
 	// Decoder request
-	decoded, err := decodeRequest(bytes.NewReader(packet))
+	decoded, n, err := decodeRequest(bytes.NewReader(packet))
 	if err != nil {
 		t.Error("Failed to decode request", err)
 	} else if decoded.correlationID != 123 || decoded.clientID != "foo" {
 		t.Errorf("Decoded header is not valid: %v", decoded)
 	} else if !reflect.DeepEqual(rb, decoded.body) {
 		t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded.body)
+	} else if n != len(packet) {
+		t.Errorf("Decoded request bytes: %d does not match the encoded one: %d\n", n, len(packet))
 	}
 }
 

+ 24 - 0
sarama.go

@@ -20,6 +20,30 @@ and message sent on the wire; the Client provides higher-level metadata manageme
 the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up
 exactly with the protocol fields documented by Kafka at
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
+
+Metrics are exposed through https://github.com/rcrowley/go-metrics library.
+
+Broker related metrics:
+
+	+------------------------------------------------+------------+---------------------------------------------------------------+
+	| Name                                           | Type       | Description                                                   |
+	+------------------------------------------------+------------+---------------------------------------------------------------+
+	| incoming-byte-rate                             | meter      | Bytes/second read off all brokers                             |
+	| incoming-byte-rate-for-broker-<broker-id>      | meter      | Bytes/second read off a given broker                          |
+	| outgoing-byte-rate                             | meter      | Bytes/second written off all brokers                          |
+	| outgoing-byte-rate-for-broker-<broker-id>      | meter      | Bytes/second written off a given broker                       |
+	| request-rate                                   | meter      | Requests/second sent to all brokers                           |
+	| request-rate-for-broker-<broker-id>            | meter      | Requests/second sent to a given broker                        |
+	| histogram request-size                         | histogram  | Distribution of the request size in bytes for all brokers     |
+	| histogram request-size-for-broker-<broker-id>  | histogram  | Distribution of the request size in bytes for a given broker  |
+	| response-rate                                  | meter      | Responses/second received from all brokers                    |
+	| response-rate-for-broker-<broker-id>           | meter      | Responses/second received from a given broker                 |
+	| histogram response-size                        | histogram  | Distribution of the response size in bytes for all brokers    |
+	| histogram response-size-for-broker-<broker-id> | histogram  | Distribution of the response size in bytes for a given broker |
+	+------------------------------------------------+------------+---------------------------------------------------------------+
+
+Note that we do not gather specific metrics for seed broker but they are part of the "all brokers" metrics.
+
 */
 package sarama
 

+ 5 - 0
tools/kafka-console-producer/kafka-console-producer.go

@@ -9,6 +9,7 @@ import (
 	"strings"
 
 	"github.com/Shopify/sarama"
+	"github.com/rcrowley/go-metrics"
 )
 
 var (
@@ -19,6 +20,7 @@ var (
 	partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
 	partition   = flag.Int("partition", -1, "The partition to produce to.")
 	verbose     = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
+	showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
 	silent      = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
 
 	logger = log.New(os.Stderr, "", log.LstdFlags)
@@ -96,6 +98,9 @@ func main() {
 	} else if !*silent {
 		fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
 	}
+	if *showMetrics {
+		metrics.WriteOnce(config.MetricRegistry, os.Stderr)
+	}
 }
 
 func printErrorAndExit(code int, format string, values ...interface{}) {