Browse Source

Merge pull request #1539 from slaunay/feature/requests-in-flight-metric

Add requests-in-flight metric
Vlad Gorodetsky 5 years ago
parent
commit
f4f562ee27
6 changed files with 91 additions and 8 deletions
  1. 48 6
      broker.go
  2. 8 0
      broker_test.go
  3. 3 0
      functional_producer_test.go
  4. 16 0
      metrics_test.go
  5. 4 0
      sarama.go
  6. 12 2
      tools/kafka-producer-performance/main.go

+ 48 - 6
broker.go

@@ -40,6 +40,7 @@ type Broker struct {
 	outgoingByteRate       metrics.Meter
 	outgoingByteRate       metrics.Meter
 	responseRate           metrics.Meter
 	responseRate           metrics.Meter
 	responseSize           metrics.Histogram
 	responseSize           metrics.Histogram
+	requestsInFlight       metrics.Counter
 	brokerIncomingByteRate metrics.Meter
 	brokerIncomingByteRate metrics.Meter
 	brokerRequestRate      metrics.Meter
 	brokerRequestRate      metrics.Meter
 	brokerRequestSize      metrics.Histogram
 	brokerRequestSize      metrics.Histogram
@@ -47,6 +48,7 @@ type Broker struct {
 	brokerOutgoingByteRate metrics.Meter
 	brokerOutgoingByteRate metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseSize     metrics.Histogram
 	brokerResponseSize     metrics.Histogram
+	brokerRequestsInFlight metrics.Counter
 
 
 	kerberosAuthenticator GSSAPIKerberosAuth
 	kerberosAuthenticator GSSAPIKerberosAuth
 }
 }
@@ -182,6 +184,7 @@ func (b *Broker) Open(conf *Config) error {
 		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
 		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
 		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
 		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
 		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
 		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+		b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// the same id (-1) and are already exposed through the global metrics above
 		// the same id (-1) and are already exposed through the global metrics above
 		if b.id >= 0 {
 		if b.id >= 0 {
@@ -712,16 +715,19 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 	}
 	}
 
 
 	requestTime := time.Now()
 	requestTime := time.Now()
+	// Will be decremented in responseReceiver (except error or request with NoResponse)
+	b.addRequestInFlightMetrics(1)
 	bytes, err := b.write(buf)
 	bytes, err := b.write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return nil, err
 		return nil, err
 	}
 	}
 	b.correlationID++
 	b.correlationID++
 
 
 	if !promiseResponse {
 	if !promiseResponse {
 		// Record request latency without the response
 		// Record request latency without the response
-		b.updateRequestLatencyMetrics(time.Since(requestTime))
+		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
 		return nil, nil
 		return nil, nil
 	}
 	}
 
 
@@ -816,6 +822,9 @@ func (b *Broker) responseReceiver() {
 
 
 	for response := range b.responses {
 	for response := range b.responses {
 		if dead != nil {
 		if dead != nil {
+			// This was previously incremented in send() and
+			// we are not calling updateIncomingCommunicationMetrics()
+			b.addRequestInFlightMetrics(-1)
 			response.errors <- dead
 			response.errors <- dead
 			continue
 			continue
 		}
 		}
@@ -891,9 +900,12 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 	}
 	}
 
 
 	requestTime := time.Now()
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytes, err := b.write(buf)
 	bytes, err := b.write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
 		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
 		return err
 		return err
 	}
 	}
@@ -902,6 +914,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 	header := make([]byte, 8) // response header
 	header := make([]byte, 8) // response header
 	_, err = b.readFull(header)
 	_, err = b.readFull(header)
 	if err != nil {
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		return err
 		return err
 	}
 	}
@@ -910,6 +923,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 	payload := make([]byte, length-4)
 	payload := make([]byte, length-4)
 	n, err := b.readFull(payload)
 	n, err := b.readFull(payload)
 	if err != nil {
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		return err
 		return err
 	}
 	}
@@ -981,9 +995,12 @@ func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
 	copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
 	copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
 
 
 	requestTime := time.Now()
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytesWritten, err := b.write(authBytes)
 	bytesWritten, err := b.write(authBytes)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 		return err
 	}
 	}
@@ -1008,11 +1025,13 @@ func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
 
 
 	requestTime := time.Now()
 	requestTime := time.Now()
 
 
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
 	bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 
 
 	if err != nil {
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 		return err
 	}
 	}
@@ -1066,14 +1085,17 @@ func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
 // rejected.
 // rejected.
 func (b *Broker) sendClientMessage(message []byte) (bool, error) {
 func (b *Broker) sendClientMessage(message []byte) (bool, error) {
 	requestTime := time.Now()
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	correlationID := b.correlationID
 	correlationID := b.correlationID
 
 
 	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
 	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return false, err
 		return false, err
 	}
 	}
 
 
-	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.correlationID++
 	b.correlationID++
 
 
 	res := &SaslAuthenticateResponse{}
 	res := &SaslAuthenticateResponse{}
@@ -1108,17 +1130,21 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
 
 
 	for !scramClient.Done() {
 	for !scramClient.Done() {
 		requestTime := time.Now()
 		requestTime := time.Now()
+		// Will be decremented in updateIncomingCommunicationMetrics (except error)
+		b.addRequestInFlightMetrics(1)
 		correlationID := b.correlationID
 		correlationID := b.correlationID
 		bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
 		bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		if err != nil {
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 			Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 			return err
 			return err
 		}
 		}
 
 
-		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		b.correlationID++
 		b.correlationID++
 		challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
 		challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
 		if err != nil {
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
 			Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
 			return err
 			return err
 		}
 		}
@@ -1271,7 +1297,7 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl
 }
 }
 
 
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
-	b.updateRequestLatencyMetrics(requestLatency)
+	b.updateRequestLatencyAndInFlightMetrics(requestLatency)
 	b.responseRate.Mark(1)
 	b.responseRate.Mark(1)
 
 
 	if b.brokerResponseRate != nil {
 	if b.brokerResponseRate != nil {
@@ -1290,13 +1316,22 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency ti
 	}
 	}
 }
 }
 
 
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	b.requestLatency.Update(requestLatencyInMs)
 	b.requestLatency.Update(requestLatencyInMs)
 
 
 	if b.brokerRequestLatency != nil {
 	if b.brokerRequestLatency != nil {
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 	}
 	}
+
+	b.addRequestInFlightMetrics(-1)
+}
+
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+	b.requestsInFlight.Inc(i)
+	if b.brokerRequestsInFlight != nil {
+		b.brokerRequestsInFlight.Inc(i)
+	}
 }
 }
 
 
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1325,6 +1360,7 @@ func (b *Broker) registerMetrics() {
 	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
 	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
 	b.brokerResponseRate = b.registerMeter("response-rate")
 	b.brokerResponseRate = b.registerMeter("response-rate")
 	b.brokerResponseSize = b.registerHistogram("response-size")
 	b.brokerResponseSize = b.registerHistogram("response-size")
+	b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
 }
 }
 
 
 func (b *Broker) unregisterMetrics() {
 func (b *Broker) unregisterMetrics() {
@@ -1344,3 +1380,9 @@ func (b *Broker) registerHistogram(name string) metrics.Histogram {
 	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
 	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
 	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
 	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
 }
 }
+
+func (b *Broker) registerCounter(name string) metrics.Counter {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}

+ 8 - 0
broker_test.go

@@ -217,6 +217,7 @@ func TestSASLOAuthBearer(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 
 
 		conf := NewConfig()
 		conf := NewConfig()
 		conf.Net.SASL.Mechanism = SASLTypeOAuth
 		conf.Net.SASL.Mechanism = SASLTypeOAuth
@@ -335,6 +336,7 @@ func TestSASLSCRAMSHAXXX(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 
 
 		mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
 		mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
 		mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
 		mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
@@ -453,6 +455,7 @@ func TestSASLPlainAuth(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 
 
 		conf := NewConfig()
 		conf := NewConfig()
 		conf.Net.SASL.Mechanism = SASLTypePlaintext
 		conf.Net.SASL.Mechanism = SASLTypePlaintext
@@ -536,6 +539,7 @@ func TestSASLReadTimeout(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 	}
 	}
 
 
 	conf := NewConfig()
 	conf := NewConfig()
@@ -626,6 +630,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 		conf := NewConfig()
 		conf := NewConfig()
 		conf.Net.SASL.Mechanism = SASLTypeGSSAPI
 		conf.Net.SASL.Mechanism = SASLTypeGSSAPI
 		conf.Net.SASL.GSSAPI.ServiceName = "kafka"
 		conf.Net.SASL.GSSAPI.ServiceName = "kafka"
@@ -990,6 +995,9 @@ func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics broke
 	metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
 	metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
 	metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
 	metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
 
 
+	// Check that there is no more requests in flight
+	metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
+
 	// Run the validators
 	// Run the validators
 	metricValidators.run(t, broker.conf.MetricRegistry)
 	metricValidators.run(t, broker.conf.MetricRegistry)
 }
 }

+ 3 - 0
functional_producer_test.go

@@ -260,6 +260,9 @@ func validateMetrics(t *testing.T, client Client) {
 		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
 		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
 	}
 	}
 
 
+	// There should be no requests in flight anymore
+	metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
+
 	// Run the validators
 	// Run the validators
 	metricValidators.run(t, client.Config().MetricRegistry)
 	metricValidators.run(t, client.Config().MetricRegistry)
 }
 }

+ 16 - 0
metrics_test.go

@@ -170,3 +170,19 @@ func maxValHistogramValidator(name string, maxMax int) *metricValidator {
 		}
 		}
 	})
 	})
 }
 }
+
+func counterValidator(name string, expectedCount int) *metricValidator {
+	return &metricValidator{
+		name: name,
+		validator: func(t *testing.T, metric interface{}) {
+			if counter, ok := metric.(metrics.Counter); !ok {
+				t.Errorf("Expected counter metric for '%s', got %T", name, metric)
+			} else {
+				count := counter.Count()
+				if count != int64(expectedCount) {
+					t.Errorf("Expected counter metric '%s' count = %d, got %d", name, expectedCount, count)
+				}
+			}
+		},
+	}
+}

+ 4 - 0
sarama.go

@@ -39,6 +39,10 @@ Broker related metrics:
 	| response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
 	| response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
 	| response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
 	| response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
 	| response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
 	| response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
+	| requests-in-flight                           | counter    | The current number of in-flight requests awaiting a response  |
+	|                                              |            | for all brokers                                               |
+	| requests-in-flight-for-broker-<broker-id>    | counter    | The current number of in-flight requests awaiting a response  |
+	|                                              |            | for a given broker                                            |
 	+----------------------------------------------+------------+---------------------------------------------------------------+
 	+----------------------------------------------+------------+---------------------------------------------------------------+
 
 
 Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
 Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.

+ 12 - 2
tools/kafka-producer-performance/main.go

@@ -80,6 +80,11 @@ var (
 		0,
 		0,
 		"The maximum number of messages to send per second (0 for no limit).",
 		"The maximum number of messages to send per second (0 for no limit).",
 	)
 	)
+	maxOpenRequests = flag.Int(
+		"max-open-requests",
+		5,
+		"The maximum number of unacknowledged requests the client will send on a single connection before blocking (default: 5).",
+	)
 	maxMessageBytes = flag.Int(
 	maxMessageBytes = flag.Int(
 		"max-message-bytes",
 		"max-message-bytes",
 		1000000,
 		1000000,
@@ -248,6 +253,7 @@ func main() {
 
 
 	config := sarama.NewConfig()
 	config := sarama.NewConfig()
 
 
+	config.Net.MaxOpenRequests = *maxOpenRequests
 	config.Producer.MaxMessageBytes = *maxMessageBytes
 	config.Producer.MaxMessageBytes = *maxMessageBytes
 	config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
 	config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
 	config.Producer.Timeout = *timeout
 	config.Producer.Timeout = *timeout
@@ -430,17 +436,20 @@ func printMetrics(w io.Writer, r metrics.Registry) {
 	recordSendRateMetric := r.Get("record-send-rate")
 	recordSendRateMetric := r.Get("record-send-rate")
 	requestLatencyMetric := r.Get("request-latency-in-ms")
 	requestLatencyMetric := r.Get("request-latency-in-ms")
 	outgoingByteRateMetric := r.Get("outgoing-byte-rate")
 	outgoingByteRateMetric := r.Get("outgoing-byte-rate")
+	requestsInFlightMetric := r.Get("requests-in-flight")
 
 
-	if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil {
+	if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil ||
+		requestsInFlightMetric == nil {
 		return
 		return
 	}
 	}
 	recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot()
 	recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot()
 	requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot()
 	requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot()
 	requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
 	requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
 	outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot()
 	outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot()
+	requestsInFlight := requestsInFlightMetric.(metrics.Counter).Count()
 	fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+
 	fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+
 		"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
 		"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
-		"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
+		"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th, %d total req. in flight\n",
 		recordSendRate.Count(),
 		recordSendRate.Count(),
 		recordSendRate.RateMean(),
 		recordSendRate.RateMean(),
 		recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
 		recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
@@ -452,6 +461,7 @@ func printMetrics(w io.Writer, r metrics.Registry) {
 		requestLatencyPercentiles[2],
 		requestLatencyPercentiles[2],
 		requestLatencyPercentiles[3],
 		requestLatencyPercentiles[3],
 		requestLatencyPercentiles[4],
 		requestLatencyPercentiles[4],
+		requestsInFlight,
 	)
 	)
 }
 }