Browse Source

added a metric to expose consumer batch size

Varun 6 years ago
parent
commit
4ecd1e57c5
1 changed files with 15 additions and 0 deletions
  1. 15 0
      consumer.go

+ 15 - 0
consumer.go

@@ -6,6 +6,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/rcrowley/go-metrics"
 )
 
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
@@ -518,6 +520,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 
 func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
 	var messages []*ConsumerMessage
+
 	for _, rec := range batch.Records {
 		offset := batch.FirstOffset + rec.OffsetDelta
 		if offset < child.offset {
@@ -545,6 +548,15 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 }
 
 func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
+	var (
+		metricRegistry          = child.conf.MetricRegistry
+		consumerBatchSizeMetric metrics.Histogram
+	)
+
+	if metricRegistry != nil {
+		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
+	}
+
 	block := response.GetBlock(child.topic, child.partition)
 	if block == nil {
 		return nil, ErrIncompleteResponse
@@ -558,6 +570,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 	if err != nil {
 		return nil, err
 	}
+
+	consumerBatchSizeMetric.Update(int64(nRecs))
+
 	if nRecs == 0 {
 		partialTrailingMessage, err := block.isPartial()
 		if err != nil {