
Merge pull request #701 from slaunay/enhancement/broker-metrics

Expose broker metrics with go-metrics
Evan Huus committed 9 years ago
commit e03d23b5a4
12 changed files with 563 additions and 49 deletions
  1. broker.go (+76 -4)
  2. broker_test.go (+97 -31)
  3. config.go (+9 -0)
  4. config_test.go (+33 -1)
  5. functional_producer_test.go (+89 -5)
  6. metrics.go (+36 -0)
  7. metrics_test.go (+158 -0)
  8. mockbroker.go (+25 -1)
  9. request.go (+8 -6)
  10. request_test.go (+3 -1)
  11. sarama.go (+24 -0)
  12. tools/kafka-console-producer/kafka-console-producer.go (+5 -0)

+ 76 - 4
broker.go

@@ -10,6 +10,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -26,6 +28,19 @@ type Broker struct {
 
 	responses chan responsePromise
 	done      chan bool
+
+	incomingByteRate       metrics.Meter
+	requestRate            metrics.Meter
+	requestSize            metrics.Histogram
+	outgoingByteRate       metrics.Meter
+	responseRate           metrics.Meter
+	responseSize           metrics.Histogram
+	brokerIncomingByteRate metrics.Meter
+	brokerRequestRate      metrics.Meter
+	brokerRequestSize      metrics.Histogram
+	brokerOutgoingByteRate metrics.Meter
+	brokerResponseRate     metrics.Meter
+	brokerResponseSize     metrics.Histogram
 }
 
 type responsePromise struct {
@@ -84,6 +99,24 @@ func (b *Broker) Open(conf *Config) error {
 
 		b.conf = conf
 
+		// Create or reuse the global metrics shared between brokers
+		b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
+		b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
+		b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
+		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
+		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
+		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+		// Do not gather metrics for seed brokers (only used during bootstrap) because they share
+		// the same id (-1) and are already exposed through the global metrics above
+		if b.id >= 0 {
+			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
+			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
+			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
+			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
+			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
+			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
+		}
+
 		if conf.Net.SASL.Enable {
 			b.connErr = b.sendAndReceiveSASLPlainAuth()
 			if b.connErr != nil {
@@ -343,7 +376,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 		return nil, err
 	}
 
-	_, err = b.conn.Write(buf)
+	bytes, err := b.conn.Write(buf)
+	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
 		return nil, err
 	}
@@ -441,8 +475,9 @@ func (b *Broker) responseReceiver() {
 			continue
 		}
 
-		_, err = io.ReadFull(b.conn, header)
+		bytesReadHeader, err := io.ReadFull(b.conn, header)
 		if err != nil {
+			b.updateIncomingCommunicationMetrics(bytesReadHeader)
 			dead = err
 			response.errors <- err
 			continue
@@ -451,11 +486,13 @@ func (b *Broker) responseReceiver() {
 		decodedHeader := responseHeader{}
 		err = decode(header, &decodedHeader)
 		if err != nil {
+			b.updateIncomingCommunicationMetrics(bytesReadHeader)
 			dead = err
 			response.errors <- err
 			continue
 		}
 		if decodedHeader.correlationID != response.correlationID {
+			b.updateIncomingCommunicationMetrics(bytesReadHeader)
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
 			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
@@ -464,7 +501,8 @@ func (b *Broker) responseReceiver() {
 		}
 
 		buf := make([]byte, decodedHeader.length-4)
-		_, err = io.ReadFull(b.conn, buf)
+		bytesReadBody, err := io.ReadFull(b.conn, buf)
+		b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
 		if err != nil {
 			dead = err
 			response.errors <- err
@@ -506,7 +544,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 		return err
 	}
 
-	_, err = b.conn.Write(authBytes)
+	bytesWritten, err := b.conn.Write(authBytes)
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
@@ -514,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 
 	header := make([]byte, 4)
 	n, err := io.ReadFull(b.conn, header)
+	b.updateIncomingCommunicationMetrics(n)
 	// If the credentials are valid, we would get a 4 byte response filled with null characters.
 	// Otherwise, the broker closes the connection and we get an EOF
 	if err != nil {
@@ -524,3 +564,35 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
 	Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
 	return nil
 }
+
+func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
+	b.responseRate.Mark(1)
+	if b.brokerResponseRate != nil {
+		b.brokerResponseRate.Mark(1)
+	}
+	responseSize := int64(bytes)
+	b.incomingByteRate.Mark(responseSize)
+	if b.brokerIncomingByteRate != nil {
+		b.brokerIncomingByteRate.Mark(responseSize)
+	}
+	b.responseSize.Update(responseSize)
+	if b.brokerResponseSize != nil {
+		b.brokerResponseSize.Update(responseSize)
+	}
+}
+
+func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
+	b.requestRate.Mark(1)
+	if b.brokerRequestRate != nil {
+		b.brokerRequestRate.Mark(1)
+	}
+	requestSize := int64(bytes)
+	b.outgoingByteRate.Mark(requestSize)
+	if b.brokerOutgoingByteRate != nil {
+		b.brokerOutgoingByteRate.Mark(requestSize)
+	}
+	b.requestSize.Update(requestSize)
+	if b.brokerRequestSize != nil {
+		b.brokerRequestSize.Update(requestSize)
+	}
+}
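
For orientation, here is a minimal sketch (not part of the PR) of how an application could read these meters and histograms back out of the registry after a client built from this config has been doing work. The metric names and the MetricRegistry field come from the diff above; the broker id 1, the type assertions, and the printing are purely illustrative.

	package main

	import (
		"fmt"

		"github.com/Shopify/sarama"
		"github.com/rcrowley/go-metrics"
	)

	func main() {
		config := sarama.NewConfig()
		// ... start producers/consumers with this config, let them run, then later:

		registry := config.MetricRegistry
		if m, ok := registry.Get("request-rate").(metrics.Meter); ok {
			fmt.Printf("requests to all brokers: %d (mean rate %.2f/s)\n", m.Count(), m.RateMean())
		}
		// Per-broker metrics only exist for brokers with id >= 0; "1" is a hypothetical id.
		if h, ok := registry.Get("request-size-for-broker-1").(metrics.Histogram); ok {
			fmt.Printf("request size for broker 1: mean=%.0f bytes, p95=%.0f bytes\n", h.Mean(), h.Percentile(0.95))
		}
	}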

+ 97 - 31
broker_test.go

@@ -3,6 +3,9 @@ package sarama
 import (
 	"fmt"
 	"testing"
+	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 func ExampleBroker() {
@@ -34,6 +37,11 @@ func (m mockEncoder) encode(pe packetEncoder) error {
 	return pe.putRawBytes(m.bytes)
 }
 
+type brokerMetrics struct {
+	bytesRead    int
+	bytesWritten int
+}
+
 func TestBrokerAccessors(t *testing.T) {
 	broker := NewBroker("abc:123")
 
@@ -52,36 +60,53 @@ func TestBrokerAccessors(t *testing.T) {
 }
 
 func TestSimpleBrokerCommunication(t *testing.T) {
-	mb := NewMockBroker(t, 0)
-	defer mb.Close()
-
-	broker := NewBroker(mb.Addr())
-	conf := NewConfig()
-	conf.Version = V0_10_0_0
-	err := broker.Open(conf)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	for _, tt := range brokerTestTable {
+		Logger.Printf("Testing broker communication for %s", tt.name)
+		mb := NewMockBroker(t, 0)
 		mb.Returns(&mockEncoder{tt.response})
-	}
-	for _, tt := range brokerTestTable {
+		pendingNotify := make(chan brokerMetrics)
+		// Register a callback to be notified about successful requests
+		mb.SetNotifier(func(bytesRead, bytesWritten int) {
+			pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
+		})
+		broker := NewBroker(mb.Addr())
+		// Set the broker id in order to validate local broker metrics
+		broker.id = 0
+		conf := NewConfig()
+		conf.Version = V0_10_0_0
+		// Use a new registry every time to prevent side effects caused by the global one
+		conf.MetricRegistry = metrics.NewRegistry()
+		err := broker.Open(conf)
+		if err != nil {
+			t.Fatal(err)
+		}
 		tt.runner(t, broker)
+		err = broker.Close()
+		if err != nil {
+			t.Error(err)
+		}
+		// Wait up to 500 ms for the remote broker to process the request and
+		// notify us about the metrics
+		timeout := 500 * time.Millisecond
+		select {
+		case mockBrokerMetrics := <-pendingNotify:
+			validateBrokerMetrics(t, broker, mockBrokerMetrics)
+		case <-time.After(timeout):
+			t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
+		}
+		mb.Close()
 	}
 
-	err = broker.Close()
-	if err != nil {
-		t.Error(err)
-	}
 }
 
 // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
 var brokerTestTable = []struct {
+	name     string
 	response []byte
 	runner   func(*testing.T, *Broker)
 }{
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"MetadataRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := MetadataRequest{}
 			response, err := broker.GetMetadata(&request)
@@ -93,7 +118,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
+	{"ConsumerMetadataRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ConsumerMetadataRequest{}
 			response, err := broker.GetConsumerMetadata(&request)
@@ -105,7 +131,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{},
+	{"ProduceRequest (NoResponse)",
+		[]byte{},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = NoResponse
@@ -118,7 +145,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"ProduceRequest (WaitForLocal)",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = WaitForLocal
@@ -131,7 +159,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"FetchRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := FetchRequest{}
 			response, err := broker.Fetch(&request)
@@ -143,7 +172,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"OffsetFetchRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetFetchRequest{}
 			response, err := broker.FetchOffset(&request)
@@ -155,7 +185,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"OffsetCommitRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetCommitRequest{}
 			response, err := broker.CommitOffset(&request)
@@ -167,7 +198,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"OffsetRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetRequest{}
 			response, err := broker.GetAvailableOffsets(&request)
@@ -179,7 +211,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"JoinGroupRequest",
+		[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := JoinGroupRequest{}
 			response, err := broker.JoinGroup(&request)
@@ -191,7 +224,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"SyncGroupRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := SyncGroupRequest{}
 			response, err := broker.SyncGroup(&request)
@@ -203,7 +237,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00},
+	{"LeaveGroupRequest",
+		[]byte{0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := LeaveGroupRequest{}
 			response, err := broker.LeaveGroup(&request)
@@ -215,7 +250,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00},
+	{"HeartbeatRequest",
+		[]byte{0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := HeartbeatRequest{}
 			response, err := broker.Heartbeat(&request)
@@ -227,7 +263,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{"ListGroupsRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ListGroupsRequest{}
 			response, err := broker.ListGroups(&request)
@@ -239,7 +276,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00},
+	{"DescribeGroupsRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := DescribeGroupsRequest{}
 			response, err := broker.DescribeGroups(&request)
@@ -251,3 +289,31 @@ var brokerTestTable = []struct {
 			}
 		}},
 }
+
+func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
+	metricValidators := newMetricValidators()
+	mockBrokerBytesRead := mockBrokerMetrics.bytesRead
+	mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
+
+	// Check that the number of bytes received corresponds to what the mock broker sent
+	metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
+	if mockBrokerBytesWritten == 0 {
+		// This is a ProduceRequest with NoResponse
+		metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
+		metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
+		metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
+	} else {
+		metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
+		metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
+		metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
+	}
+
+	// Check that the number of bytes sent corresponds to what the mock broker received
+	metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
+	metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
+	metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
+	metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
+
+	// Run the validators
+	metricValidators.run(t, broker.conf.MetricRegistry)
+}

+ 9 - 0
config.go

@@ -4,6 +4,8 @@ import (
 	"crypto/tls"
 	"regexp"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 const defaultClientID = "sarama"
@@ -233,6 +235,12 @@ type Config struct {
 	// latest features. Setting it to a version greater than you are actually
 	// running may lead to random breakage.
 	Version KafkaVersion
+	// The registry to define metrics into.
+	// Defaults to metrics.DefaultRegistry.
+	// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to true
+	// prior to starting Sarama.
+	// See Examples on how to use the metrics registry.
+	MetricRegistry metrics.Registry
 }
 
 // NewConfig returns a new configuration instance with sane defaults.
@@ -268,6 +276,7 @@ func NewConfig() *Config {
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
 	c.Version = minVersion
+	c.MetricRegistry = metrics.DefaultRegistry
 
 	return c
 }
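
As a quick sketch of the two options mentioned in the new doc comment (disable metrics entirely, or keep them out of the shared default registry), assuming only the MetricRegistry field added above and the standard go-metrics API:

	package main

	import (
		"github.com/Shopify/sarama"
		"github.com/rcrowley/go-metrics"
	)

	func main() {
		// Option 1: disable metrics gathering with go-metrics' no-op switch.
		// This must be set before any broker is opened, because the meters and
		// histograms are created in Broker.Open().
		metrics.UseNilMetrics = true

		// Option 2: keep Sarama's metrics out of metrics.DefaultRegistry by
		// giving the client its own registry.
		config := sarama.NewConfig()
		config.MetricRegistry = metrics.NewRegistry()

		_ = config
	}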

+ 33 - 1
config_test.go

@@ -1,12 +1,20 @@
 package sarama
 
-import "testing"
+import (
+	"os"
+	"testing"
+
+	"github.com/rcrowley/go-metrics"
+)
 
 func TestDefaultConfigValidates(t *testing.T) {
 	config := NewConfig()
 	if err := config.Validate(); err != nil {
 		t.Error(err)
 	}
+	if config.MetricRegistry != metrics.DefaultRegistry {
+		t.Error("Expected metrics.DefaultRegistry, got ", config.MetricRegistry)
+	}
 }
 
 func TestInvalidClientIDConfigValidates(t *testing.T) {
@@ -24,3 +32,27 @@ func TestEmptyClientIDConfigValidates(t *testing.T) {
 		t.Error("Expected invalid ClientID, got ", err)
 	}
 }
+
+// This example shows how to integrate with an existing registry and how to publish
+// metrics on standard output
+func ExampleConfig_metrics() {
+	// Our application registry
+	appMetricRegistry := metrics.NewRegistry()
+	appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
+	appGauge.Update(1)
+
+	config := NewConfig()
+	// Use a prefix registry instead of the default global one
+	config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
+
+	// Simulate a metric created by sarama without starting a broker
+	saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
+	saramaGauge.Update(2)
+
+	metrics.WriteOnce(appMetricRegistry, os.Stdout)
+	// Output:
+	// gauge m1
+	//   value:               1
+	// gauge sarama.m2
+	//   value:               2
+}

+ 89 - 5
functional_producer_test.go

@@ -2,9 +2,12 @@ package sarama
 
 import (
 	"fmt"
+	"os"
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 const TestBatchSize = 1000
@@ -96,6 +99,9 @@ func testProducingMessages(t *testing.T, config *Config) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
+	// Use a dedicated registry to prevent side effects caused by the global one
+	config.MetricRegistry = metrics.NewRegistry()
+
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 
@@ -104,11 +110,8 @@ func testProducingMessages(t *testing.T, config *Config) {
 		t.Fatal(err)
 	}
 
-	master, err := NewConsumerFromClient(client)
-	if err != nil {
-		t.Fatal(err)
-	}
-	consumer, err := master.ConsumePartition("test.1", 0, OffsetNewest)
+	// Remember the current offset so the consumer can start from it later
+	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -140,6 +143,18 @@ func testProducingMessages(t *testing.T, config *Config) {
 	}
 	safeClose(t, producer)
 
+	// Validate producer metrics before using the consumer minus the offset request
+	validateMetrics(t, client)
+
+	master, err := NewConsumerFromClient(client)
+	if err != nil {
+		t.Fatal(err)
+	}
+	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	for i := 1; i <= TestBatchSize; i++ {
 		select {
 		case <-time.After(10 * time.Second):
@@ -159,6 +174,64 @@ func testProducingMessages(t *testing.T, config *Config) {
 	safeClose(t, client)
 }
 
+func validateMetrics(t *testing.T, client Client) {
+	// Get the broker used by test1 topic
+	// Get the broker used by the test.1 topic
+	if partitions, err := client.Partitions("test.1"); err != nil {
+		t.Error(err)
+	} else {
+		for _, partition := range partitions {
+			if b, err := client.Leader("test.1", partition); err != nil {
+				t.Error(err)
+			} else {
+				if broker != nil && b != broker {
+					t.Fatal("Expected only one broker, got at least 2")
+				}
+				broker = b
+			}
+		}
+	}
+
+	metricValidators := newMetricValidators()
+	noResponse := client.Config().Producer.RequiredAcks == NoResponse
+
+	// We read at least 1 byte from the broker
+	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
+	// and send at least 3 global requests (1 metadata request, 1 offset request and N produce requests)
+	metricValidators.register(minCountMeterValidator("request-rate", 3))
+	metricValidators.register(minCountHistogramValidator("request-size", 3))
+	metricValidators.register(minValHistogramValidator("request-size", 1))
+	// and at least 2 requests to the registered broker (offset + produces)
+	metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
+	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
+	metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
+
+	// We write at least 1 byte to the broker
+	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
+	if noResponse {
+		// and receive exactly 2 global responses (metadata + offset)
+		metricValidators.register(countMeterValidator("response-rate", 2))
+		metricValidators.register(minCountHistogramValidator("response-size", 2))
+		metricValidators.register(minValHistogramValidator("response-size", 1))
+		// and exactly 1 offset response for the registered broker
+		metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
+		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
+		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
+	} else {
+		// and receive at least 3 global responses (metadata + offset + produces)
+		metricValidators.register(minCountMeterValidator("response-rate", 3))
+		metricValidators.register(minCountHistogramValidator("response-size", 3))
+		metricValidators.register(minValHistogramValidator("response-size", 1))
+		// and at least 2 for the registered broker
+		metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
+		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
+		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
+	}
+
+	// Run the validators
+	metricValidators.run(t, client.Config().MetricRegistry)
+}
+
 // Benchmarks
 
 func BenchmarkProducerSmall(b *testing.B) {
@@ -183,6 +256,17 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder)
 	setupFunctionalTest(b)
 	defer teardownFunctionalTest(b)
 
+	metricsDisable := os.Getenv("METRICS_DISABLE")
+	if metricsDisable != "" {
+		previousUseNilMetrics := metrics.UseNilMetrics
+		Logger.Println("Disabling metrics using no-op implementation")
+		metrics.UseNilMetrics = true
+		// Restore previous setting
+		defer func() {
+			metrics.UseNilMetrics = previousUseNilMetrics
+		}()
+	}
+
 	producer, err := NewAsyncProducer(kafkaBrokers, conf)
 	if err != nil {
 		b.Fatal(err)

+ 36 - 0
metrics.go

@@ -0,0 +1,36 @@
+package sarama
+
+import (
+	"fmt"
+
+	"github.com/rcrowley/go-metrics"
+)
+
+// Use exponentially decaying reservoir for sampling histograms with the same defaults as the Java library:
+// 1028 elements, which offers a 99.9% confidence level with a 5% margin of error assuming a normal distribution,
+// and an alpha factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements.
+// See https://github.com/dropwizard/metrics/blob/v3.1.0/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java#L38
+const (
+	metricsReservoirSize = 1028
+	metricsAlphaFactor   = 0.015
+)
+
+func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram {
+	return r.GetOrRegister(name, func() metrics.Histogram {
+		return metrics.NewHistogram(metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor))
+	}).(metrics.Histogram)
+}
+
+func getMetricNameForBroker(name string, broker *Broker) string {
+	// Use the broker id, like the Java client does, as it does not contain '.' or ':' characters that
+	// could be interpreted as special characters by a monitoring tool (e.g. Graphite)
+	return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
+}
+
+func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter {
+	return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r)
+}
+
+func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
+	return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
+}
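
As an illustration (not part of the PR), the same exponentially decaying histogram can be registered directly through the public go-metrics API; the broker id 1 and the sample value are made up:

	package main

	import (
		"fmt"

		"github.com/rcrowley/go-metrics"
	)

	func main() {
		// Equivalent of getOrRegisterBrokerHistogram("request-size", ...) for a broker with id 1;
		// the sample parameters mirror metricsReservoirSize and metricsAlphaFactor above.
		r := metrics.NewRegistry()
		h := metrics.GetOrRegisterHistogram("request-size-for-broker-1", r, metrics.NewExpDecaySample(1028, 0.015))
		h.Update(512)
		fmt.Println(h.Count(), h.Max()) // 1 512
	}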

+ 158 - 0
metrics_test.go

@@ -0,0 +1,158 @@
+package sarama
+
+import (
+	"testing"
+
+	"github.com/rcrowley/go-metrics"
+)
+
+func TestGetOrRegisterHistogram(t *testing.T) {
+	metricRegistry := metrics.NewRegistry()
+	histogram := getOrRegisterHistogram("name", metricRegistry)
+
+	if histogram == nil {
+		t.Error("Unexpected nil histogram")
+	}
+
+	// Fetch the metric
+	foundHistogram := metricRegistry.Get("name")
+
+	if foundHistogram != histogram {
+		t.Error("Unexpected different histogram", foundHistogram, histogram)
+	}
+
+	// Try to register the metric again
+	sameHistogram := getOrRegisterHistogram("name", metricRegistry)
+
+	if sameHistogram != histogram {
+		t.Error("Unexpected different histogram", sameHistogram, histogram)
+	}
+}
+
+func TestGetMetricNameForBroker(t *testing.T) {
+	metricName := getMetricNameForBroker("name", &Broker{id: 1})
+
+	if metricName != "name-for-broker-1" {
+		t.Error("Unexpected metric name", metricName)
+	}
+}
+
+// Common types and functions for metric validation
+type metricValidator struct {
+	name      string
+	validator func(*testing.T, interface{})
+}
+
+type metricValidators []*metricValidator
+
+func newMetricValidators() metricValidators {
+	return make([]*metricValidator, 0, 32)
+}
+
+func (m *metricValidators) register(validator *metricValidator) {
+	*m = append(*m, validator)
+}
+
+func (m *metricValidators) registerForBroker(broker *Broker, validator *metricValidator) {
+	m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator})
+}
+
+func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) {
+	m.register(validator)
+	m.registerForBroker(broker, validator)
+}
+
+func (m metricValidators) run(t *testing.T, r metrics.Registry) {
+	for _, metricValidator := range m {
+		metric := r.Get(metricValidator.name)
+		if metric == nil {
+			t.Error("No metric named", metricValidator.name)
+		} else {
+			metricValidator.validator(t, metric)
+		}
+	}
+}
+
+func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) *metricValidator {
+	return &metricValidator{
+		name: name,
+		validator: func(t *testing.T, metric interface{}) {
+			if meter, ok := metric.(metrics.Meter); !ok {
+				t.Errorf("Expected meter metric for '%s', got %T", name, metric)
+			} else {
+				extraValidator(t, meter)
+			}
+		},
+	}
+}
+
+func countMeterValidator(name string, expectedCount int) *metricValidator {
+	return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
+		count := meter.Count()
+		if count != int64(expectedCount) {
+			t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count)
+		}
+	})
+}
+
+func minCountMeterValidator(name string, minCount int) *metricValidator {
+	return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
+		count := meter.Count()
+		if count < int64(minCount) {
+			t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count)
+		}
+	})
+}
+
+func histogramValidator(name string, extraValidator func(*testing.T, metrics.Histogram)) *metricValidator {
+	return &metricValidator{
+		name: name,
+		validator: func(t *testing.T, metric interface{}) {
+			if histogram, ok := metric.(metrics.Histogram); !ok {
+				t.Errorf("Expected histogram metric for '%s', got %T", name, metric)
+			} else {
+				extraValidator(t, histogram)
+			}
+		},
+	}
+}
+
+func countHistogramValidator(name string, expectedCount int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		count := histogram.Count()
+		if count != int64(expectedCount) {
+			t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count)
+		}
+	})
+}
+
+func minCountHistogramValidator(name string, minCount int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		count := histogram.Count()
+		if count < int64(minCount) {
+			t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count)
+		}
+	})
+}
+
+func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		min := int(histogram.Min())
+		if min != expectedMin {
+			t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min)
+		}
+		max := int(histogram.Max())
+		if max != expectedMax {
+			t.Errorf("Expected histogram metric '%s' max = %d, got %d", name, expectedMax, max)
+		}
+	})
+}
+
+func minValHistogramValidator(name string, minMin int) *metricValidator {
+	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		min := int(histogram.Min())
+		if min < minMin {
+			t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min)
+		}
+	})
+}

+ 25 - 1
mockbroker.go

@@ -20,6 +20,10 @@ const (
 
 type requestHandlerFunc func(req *request) (res encoder)
 
+// RequestNotifierFunc is invoked when a mock broker processes a request successfully
+// and provides the number of bytes read and written.
+type RequestNotifierFunc func(bytesRead, bytesWritten int)
+
 // MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
 // to facilitate testing of higher level or specialized consumers and producers
 // built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
@@ -54,6 +58,7 @@ type MockBroker struct {
 	t            TestReporter
 	latency      time.Duration
 	handler      requestHandlerFunc
+	notifier     RequestNotifierFunc
 	history      []RequestResponse
 	lock         sync.Mutex
 }
@@ -85,6 +90,14 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
 	})
 }
 
+// SetNotifier sets a function that will get invoked whenever a request has been
+// processed successfully and will provide the number of bytes read and written.
+func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
+	b.lock.Lock()
+	b.notifier = notifier
+	b.lock.Unlock()
+}
+
 // BrokerID returns broker ID assigned to the broker.
 func (b *MockBroker) BrokerID() int32 {
 	return b.brokerID
@@ -180,7 +193,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 
 	resHeader := make([]byte, 8)
 	for {
-		req, err := decodeRequest(conn)
+		req, bytesRead, err := decodeRequest(conn)
 		if err != nil {
 			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
 			b.serverError(err)
@@ -208,6 +221,11 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 			break
 		}
 		if len(encodedRes) == 0 {
+			b.lock.Lock()
+			if b.notifier != nil {
+				b.notifier(bytesRead, 0)
+			}
+			b.lock.Unlock()
 			continue
 		}
 
@@ -221,6 +239,12 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 			b.serverError(err)
 			break
 		}
+
+		b.lock.Lock()
+		if b.notifier != nil {
+			b.notifier(bytesRead, len(resHeader)+len(encodedRes))
+		}
+		b.lock.Unlock()
 	}
 	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
 }

+ 8 - 6
request.go

@@ -57,27 +57,29 @@ func (r *request) decode(pd packetDecoder) (err error) {
 	return r.body.decode(pd, version)
 }
 
-func decodeRequest(r io.Reader) (req *request, err error) {
+func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
 	lengthBytes := make([]byte, 4)
 	if _, err := io.ReadFull(r, lengthBytes); err != nil {
-		return nil, err
+		return nil, bytesRead, err
 	}
+	bytesRead += len(lengthBytes)
 
 	length := int32(binary.BigEndian.Uint32(lengthBytes))
 	if length <= 4 || length > MaxRequestSize {
-		return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
+		return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
 	}
 
 	encodedReq := make([]byte, length)
 	if _, err := io.ReadFull(r, encodedReq); err != nil {
-		return nil, err
+		return nil, bytesRead, err
 	}
+	bytesRead += len(encodedReq)
 
 	req = &request{}
 	if err := decode(encodedReq, req); err != nil {
-		return nil, err
+		return nil, bytesRead, err
 	}
-	return req, nil
+	return req, bytesRead, nil
 }
 
 func allocateBody(key, version int16) protocolBody {

+ 3 - 1
request_test.go

@@ -58,13 +58,15 @@ func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
 		t.Error("Encoding", name, "failed\ngot ", packet[headerSize:], "\nwant", expected)
 	}
 	// Decoder request
-	decoded, err := decodeRequest(bytes.NewReader(packet))
+	decoded, n, err := decodeRequest(bytes.NewReader(packet))
 	if err != nil {
 		t.Error("Failed to decode request", err)
 	} else if decoded.correlationID != 123 || decoded.clientID != "foo" {
 		t.Errorf("Decoded header is not valid: %v", decoded)
 	} else if !reflect.DeepEqual(rb, decoded.body) {
 		t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded.body)
+	} else if n != len(packet) {
+		t.Errorf("Decoded request bytes: %d does not match the encoded one: %d\n", n, len(packet))
 	}
 }
 

+ 24 - 0
sarama.go

@@ -20,6 +20,30 @@ and message sent on the wire; the Client provides higher-level metadata manageme
 the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up
 exactly with the protocol fields documented by Kafka at
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
+
+Metrics are exposed through the https://github.com/rcrowley/go-metrics library.
+
+Broker related metrics:
+
+	+-------------------------------------------+------------+---------------------------------------------------------------+
+	| Name                                      | Type       | Description                                                   |
+	+-------------------------------------------+------------+---------------------------------------------------------------+
+	| incoming-byte-rate                        | meter      | Bytes/second read from all brokers                            |
+	| incoming-byte-rate-for-broker-<broker-id> | meter      | Bytes/second read from a given broker                         |
+	| outgoing-byte-rate                        | meter      | Bytes/second written to all brokers                           |
+	| outgoing-byte-rate-for-broker-<broker-id> | meter      | Bytes/second written to a given broker                        |
+	| request-rate                              | meter      | Requests/second sent to all brokers                           |
+	| request-rate-for-broker-<broker-id>       | meter      | Requests/second sent to a given broker                        |
+	| request-size                              | histogram  | Distribution of the request size in bytes for all brokers     |
+	| request-size-for-broker-<broker-id>       | histogram  | Distribution of the request size in bytes for a given broker  |
+	| response-rate                             | meter      | Responses/second received from all brokers                    |
+	| response-rate-for-broker-<broker-id>      | meter      | Responses/second received from a given broker                 |
+	| response-size                             | histogram  | Distribution of the response size in bytes for all brokers    |
+	| response-size-for-broker-<broker-id>      | histogram  | Distribution of the response size in bytes for a given broker |
+	+-------------------------------------------+------------+---------------------------------------------------------------+
+
+Note that we do not gather specific metrics for seed brokers, but they are part of the "all brokers" metrics.
+
 */
 package sarama
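
To make the table above concrete, one hypothetical way to export these metrics is to dump the configured registry periodically with the same metrics.WriteOnce call the console producer below uses; the destination and the one-minute interval are arbitrary choices, not part of this change:

	package main

	import (
		"os"
		"time"

		"github.com/Shopify/sarama"
		"github.com/rcrowley/go-metrics"
	)

	func main() {
		config := sarama.NewConfig()

		// Periodically dump all registered broker metrics to stderr.
		go func() {
			for range time.Tick(time.Minute) {
				metrics.WriteOnce(config.MetricRegistry, os.Stderr)
			}
		}()

		// ... build producers/consumers from config and run the application ...
	}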
 

+ 5 - 0
tools/kafka-console-producer/kafka-console-producer.go

@@ -9,6 +9,7 @@ import (
 	"strings"
 
 	"github.com/Shopify/sarama"
+	"github.com/rcrowley/go-metrics"
 )
 
 var (
@@ -19,6 +20,7 @@ var (
 	partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
 	partition   = flag.Int("partition", -1, "The partition to produce to.")
 	verbose     = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
+	showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
 	silent      = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
 
 	logger = log.New(os.Stderr, "", log.LstdFlags)
@@ -96,6 +98,9 @@ func main() {
 	} else if !*silent {
 		fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
 	}
+	if *showMetrics {
+		metrics.WriteOnce(config.MetricRegistry, os.Stderr)
+	}
 }
 
 func printErrorAndExit(code int, format string, values ...interface{}) {