|
|
@@ -6,6 +6,8 @@ import (
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
+
|
|
|
+ "github.com/rcrowley/go-metrics"
|
|
|
)
|
|
|
|
|
|
// ConsumerMessage encapsulates a Kafka message returned by the consumer.
|
|
|
@@ -552,6 +554,15 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
|
|
|
}
|
|
|
|
|
|
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
|
|
|
+ var (
|
|
|
+ metricRegistry = child.conf.MetricRegistry
|
|
|
+ consumerBatchSizeMetric metrics.Histogram
|
|
|
+ )
|
|
|
+
|
|
|
+ if metricRegistry != nil {
|
|
|
+ consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
|
|
|
+ }
|
|
|
+
|
|
|
block := response.GetBlock(child.topic, child.partition)
|
|
|
if block == nil {
|
|
|
return nil, ErrIncompleteResponse
|
|
|
@@ -565,6 +576,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+
|
|
|
+ consumerBatchSizeMetric.Update(int64(nRecs))
|
|
|
+
|
|
|
if nRecs == 0 {
|
|
|
partialTrailingMessage, err := block.isPartial()
|
|
|
if err != nil {
|