Bläddra i källkod

Add requests-in-flight metric

- add requests-in-flight and requests-in-flight-for-broker-<brokerID>
  metrics to the broker
- move some b.updateOutgoingCommunicationMetrics(...) just after
  b.write(...) for consistency
- add -max-open-requests flag to kafka-producer-performance
- update kafka-producer-performance to monitor total requests in flight
- update unit tests
- validate new metric in TestFuncProducing*
- document new metrics
Sebastien Launay 4 år sedan
förälder
incheckning
2db8d8aea2
6 ändrade filer med 91 tillägg och 8 borttagningar
  1. 48 6
      broker.go
  2. 8 0
      broker_test.go
  3. 3 0
      functional_producer_test.go
  4. 16 0
      metrics_test.go
  5. 4 0
      sarama.go
  6. 12 2
      tools/kafka-producer-performance/main.go

+ 48 - 6
broker.go

@@ -40,6 +40,7 @@ type Broker struct {
 	outgoingByteRate       metrics.Meter
 	responseRate           metrics.Meter
 	responseSize           metrics.Histogram
+	requestsInFlight       metrics.Counter
 	brokerIncomingByteRate metrics.Meter
 	brokerRequestRate      metrics.Meter
 	brokerRequestSize      metrics.Histogram
@@ -47,6 +48,7 @@ type Broker struct {
 	brokerOutgoingByteRate metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseSize     metrics.Histogram
+	brokerRequestsInFlight metrics.Counter
 
 	kerberosAuthenticator GSSAPIKerberosAuth
 }
@@ -182,6 +184,7 @@ func (b *Broker) Open(conf *Config) error {
 		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
 		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
 		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+		b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// the same id (-1) and are already exposed through the global metrics above
 		if b.id >= 0 {
@@ -712,16 +715,19 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 	}
 
 	requestTime := time.Now()
+	// Will be decremented in responseReceiver (except error or request with NoResponse)
+	b.addRequestInFlightMetrics(1)
 	bytes, err := b.write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return nil, err
 	}
 	b.correlationID++
 
 	if !promiseResponse {
 		// Record request latency without the response
-		b.updateRequestLatencyMetrics(time.Since(requestTime))
+		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
 		return nil, nil
 	}
 
@@ -816,6 +822,9 @@ func (b *Broker) responseReceiver() {
 
 	for response := range b.responses {
 		if dead != nil {
+			// This was previously incremented in send() and
+			// we are not calling updateIncomingCommunicationMetrics()
+			b.addRequestInFlightMetrics(-1)
 			response.errors <- dead
 			continue
 		}
@@ -891,9 +900,12 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 	}
 
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytes, err := b.write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
 		return err
 	}
@@ -902,6 +914,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 	header := make([]byte, 8) // response header
 	_, err = b.readFull(header)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		return err
 	}
@@ -910,6 +923,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
 	payload := make([]byte, length-4)
 	n, err := b.readFull(payload)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		return err
 	}
@@ -981,9 +995,12 @@ func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
 	copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
 
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytesWritten, err := b.write(authBytes)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 	}
@@ -1008,11 +1025,13 @@ func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
 
 	requestTime := time.Now()
 
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 	}
@@ -1066,14 +1085,17 @@ func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
 // rejected.
 func (b *Broker) sendClientMessage(message []byte) (bool, error) {
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	correlationID := b.correlationID
 
 	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return false, err
 	}
 
-	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.correlationID++
 
 	res := &SaslAuthenticateResponse{}
@@ -1108,17 +1130,21 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
 
 	for !scramClient.Done() {
 		requestTime := time.Now()
+		// Will be decremented in updateIncomingCommunicationMetrics (except error)
+		b.addRequestInFlightMetrics(1)
 		correlationID := b.correlationID
 		bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 			return err
 		}
 
-		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		b.correlationID++
 		challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
 			return err
 		}
@@ -1271,7 +1297,7 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl
 }
 
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
-	b.updateRequestLatencyMetrics(requestLatency)
+	b.updateRequestLatencyAndInFlightMetrics(requestLatency)
 	b.responseRate.Mark(1)
 
 	if b.brokerResponseRate != nil {
@@ -1290,13 +1316,22 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency ti
 	}
 }
 
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	b.requestLatency.Update(requestLatencyInMs)
 
 	if b.brokerRequestLatency != nil {
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 	}
+
+	b.addRequestInFlightMetrics(-1)
+}
+
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+	b.requestsInFlight.Inc(i)
+	if b.brokerRequestsInFlight != nil {
+		b.brokerRequestsInFlight.Inc(i)
+	}
 }
 
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1325,6 +1360,7 @@ func (b *Broker) registerMetrics() {
 	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
 	b.brokerResponseRate = b.registerMeter("response-rate")
 	b.brokerResponseSize = b.registerHistogram("response-size")
+	b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
 }
 
 func (b *Broker) unregisterMetrics() {
@@ -1344,3 +1380,9 @@ func (b *Broker) registerHistogram(name string) metrics.Histogram {
 	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
 	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
 }
+
+func (b *Broker) registerCounter(name string) metrics.Counter {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}

+ 8 - 0
broker_test.go

@@ -217,6 +217,7 @@ func TestSASLOAuthBearer(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 
 		conf := NewConfig()
 		conf.Net.SASL.Mechanism = SASLTypeOAuth
@@ -335,6 +336,7 @@ func TestSASLSCRAMSHAXXX(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 
 		mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
 		mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
@@ -453,6 +455,7 @@ func TestSASLPlainAuth(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 
 		conf := NewConfig()
 		conf.Net.SASL.Mechanism = SASLTypePlaintext
@@ -536,6 +539,7 @@ func TestSASLReadTimeout(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 	}
 
 	conf := NewConfig()
@@ -626,6 +630,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
 		broker.responseSize = metrics.NilHistogram{}
 		broker.responseRate = metrics.NilMeter{}
 		broker.requestLatency = metrics.NilHistogram{}
+		broker.requestsInFlight = metrics.NilCounter{}
 		conf := NewConfig()
 		conf.Net.SASL.Mechanism = SASLTypeGSSAPI
 		conf.Net.SASL.GSSAPI.ServiceName = "kafka"
@@ -990,6 +995,9 @@ func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics broke
 	metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
 	metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
 
+	// Check that there is no more requests in flight
+	metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
+
 	// Run the validators
 	metricValidators.run(t, broker.conf.MetricRegistry)
 }

+ 3 - 0
functional_producer_test.go

@@ -260,6 +260,9 @@ func validateMetrics(t *testing.T, client Client) {
 		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
 	}
 
+	// There should be no requests in flight anymore
+	metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
+
 	// Run the validators
 	metricValidators.run(t, client.Config().MetricRegistry)
 }

+ 16 - 0
metrics_test.go

@@ -170,3 +170,19 @@ func maxValHistogramValidator(name string, maxMax int) *metricValidator {
 		}
 	})
 }
+
+func counterValidator(name string, expectedCount int) *metricValidator {
+	return &metricValidator{
+		name: name,
+		validator: func(t *testing.T, metric interface{}) {
+			if counter, ok := metric.(metrics.Counter); !ok {
+				t.Errorf("Expected counter metric for '%s', got %T", name, metric)
+			} else {
+				count := counter.Count()
+				if count != int64(expectedCount) {
+					t.Errorf("Expected counter metric '%s' count = %d, got %d", name, expectedCount, count)
+				}
+			}
+		},
+	}
+}

+ 4 - 0
sarama.go

@@ -39,6 +39,10 @@ Broker related metrics:
 	| response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
 	| response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
 	| response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
+	| requests-in-flight                           | counter    | The current number of in-flight requests awaiting a response  |
+	|                                              |            | for all brokers                                               |
+	| requests-in-flight-for-broker-<broker-id>    | counter    | The current number of in-flight requests awaiting a response  |
+	|                                              |            | for a given broker                                            |
 	+----------------------------------------------+------------+---------------------------------------------------------------+
 
 Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.

+ 12 - 2
tools/kafka-producer-performance/main.go

@@ -80,6 +80,11 @@ var (
 		0,
 		"The maximum number of messages to send per second (0 for no limit).",
 	)
+	maxOpenRequests = flag.Int(
+		"max-open-requests",
+		5,
+		"The maximum number of unacknowledged requests the client will send on a single connection before blocking (default: 5).",
+	)
 	maxMessageBytes = flag.Int(
 		"max-message-bytes",
 		1000000,
@@ -248,6 +253,7 @@ func main() {
 
 	config := sarama.NewConfig()
 
+	config.Net.MaxOpenRequests = *maxOpenRequests
 	config.Producer.MaxMessageBytes = *maxMessageBytes
 	config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
 	config.Producer.Timeout = *timeout
@@ -430,17 +436,20 @@ func printMetrics(w io.Writer, r metrics.Registry) {
 	recordSendRateMetric := r.Get("record-send-rate")
 	requestLatencyMetric := r.Get("request-latency-in-ms")
 	outgoingByteRateMetric := r.Get("outgoing-byte-rate")
+	requestsInFlightMetric := r.Get("requests-in-flight")
 
-	if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil {
+	if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil ||
+		requestsInFlightMetric == nil {
 		return
 	}
 	recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot()
 	requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot()
 	requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
 	outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot()
+	requestsInFlight := requestsInFlightMetric.(metrics.Counter).Count()
 	fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+
 		"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
-		"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
+		"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th, %d total req. in flight\n",
 		recordSendRate.Count(),
 		recordSendRate.RateMean(),
 		recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
@@ -452,6 +461,7 @@ func printMetrics(w io.Writer, r metrics.Registry) {
 		requestLatencyPercentiles[2],
 		requestLatencyPercentiles[3],
 		requestLatencyPercentiles[4],
+		requestsInFlight,
 	)
 }