|
|
@@ -32,18 +32,21 @@ type Broker struct {
|
|
|
incomingByteRate metrics.Meter
|
|
|
requestRate metrics.Meter
|
|
|
requestSize metrics.Histogram
|
|
|
+ requestLatency metrics.Histogram
|
|
|
outgoingByteRate metrics.Meter
|
|
|
responseRate metrics.Meter
|
|
|
responseSize metrics.Histogram
|
|
|
brokerIncomingByteRate metrics.Meter
|
|
|
brokerRequestRate metrics.Meter
|
|
|
brokerRequestSize metrics.Histogram
|
|
|
+ brokerRequestLatency metrics.Histogram
|
|
|
brokerOutgoingByteRate metrics.Meter
|
|
|
brokerResponseRate metrics.Meter
|
|
|
brokerResponseSize metrics.Histogram
|
|
|
}
|
|
|
|
|
|
type responsePromise struct {
|
|
|
+ requestTime time.Time
|
|
|
correlationID int32
|
|
|
packets chan []byte
|
|
|
errors chan error
|
|
|
@@ -103,6 +106,7 @@ func (b *Broker) Open(conf *Config) error {
|
|
|
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
|
|
|
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
|
|
|
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
|
|
|
+ b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
|
|
|
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
|
|
|
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
|
|
|
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
|
|
|
@@ -112,6 +116,7 @@ func (b *Broker) Open(conf *Config) error {
|
|
|
b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
|
|
|
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
|
|
|
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
|
|
|
+ b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
|
|
|
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
|
|
|
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
|
|
|
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
|
|
|
@@ -376,6 +381,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
+ requestTime := time.Now()
|
|
|
bytes, err := b.conn.Write(buf)
|
|
|
b.updateOutgoingCommunicationMetrics(bytes)
|
|
|
if err != nil {
|
|
|
@@ -384,10 +390,12 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
|
|
|
b.correlationID++
|
|
|
|
|
|
if !promiseResponse {
|
|
|
+ // Record request latency without the response
|
|
|
+ b.updateRequestLatencyMetrics(time.Since(requestTime))
|
|
|
return nil, nil
|
|
|
}
|
|
|
|
|
|
- promise := responsePromise{req.correlationID, make(chan []byte), make(chan error)}
|
|
|
+ promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
|
|
|
b.responses <- promise
|
|
|
|
|
|
return &promise, nil
|
|
|
@@ -476,8 +484,9 @@ func (b *Broker) responseReceiver() {
|
|
|
}
|
|
|
|
|
|
bytesReadHeader, err := io.ReadFull(b.conn, header)
|
|
|
+ requestLatency := time.Since(response.requestTime)
|
|
|
if err != nil {
|
|
|
- b.updateIncomingCommunicationMetrics(bytesReadHeader)
|
|
|
+ b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
|
|
|
dead = err
|
|
|
response.errors <- err
|
|
|
continue
|
|
|
@@ -486,13 +495,13 @@ func (b *Broker) responseReceiver() {
|
|
|
decodedHeader := responseHeader{}
|
|
|
err = decode(header, &decodedHeader)
|
|
|
if err != nil {
|
|
|
- b.updateIncomingCommunicationMetrics(bytesReadHeader)
|
|
|
+ b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
|
|
|
dead = err
|
|
|
response.errors <- err
|
|
|
continue
|
|
|
}
|
|
|
if decodedHeader.correlationID != response.correlationID {
|
|
|
- b.updateIncomingCommunicationMetrics(bytesReadHeader)
|
|
|
+ b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
|
|
|
// TODO if decoded ID < cur ID, discard until we catch up
|
|
|
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
|
|
|
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
|
|
|
@@ -502,7 +511,7 @@ func (b *Broker) responseReceiver() {
|
|
|
|
|
|
buf := make([]byte, decodedHeader.length-4)
|
|
|
bytesReadBody, err := io.ReadFull(b.conn, buf)
|
|
|
- b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
|
|
|
+ b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
|
|
|
if err != nil {
|
|
|
dead = err
|
|
|
response.errors <- err
|
|
|
@@ -544,6 +553,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ requestTime := time.Now()
|
|
|
bytesWritten, err := b.conn.Write(authBytes)
|
|
|
b.updateOutgoingCommunicationMetrics(bytesWritten)
|
|
|
if err != nil {
|
|
|
@@ -553,7 +563,7 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
|
|
|
header := make([]byte, 4)
|
|
|
n, err := io.ReadFull(b.conn, header)
|
|
|
- b.updateIncomingCommunicationMetrics(n)
|
|
|
+ b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
|
|
|
// If the credentials are valid, we would get a 4 byte response filled with null characters.
|
|
|
// Otherwise, the broker closes the connection and we get an EOF
|
|
|
if err != nil {
|
|
|
@@ -565,7 +575,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
|
|
|
+func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
|
|
|
+ b.updateRequestLatencyMetrics(requestLatency)
|
|
|
b.responseRate.Mark(1)
|
|
|
if b.brokerResponseRate != nil {
|
|
|
b.brokerResponseRate.Mark(1)
|
|
|
@@ -581,6 +592,14 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
|
|
|
+ requestLatencyInMs := int64(requestLatency / time.Millisecond)
|
|
|
+ b.requestLatency.Update(requestLatencyInMs)
|
|
|
+ if b.brokerRequestLatency != nil {
|
|
|
+ b.brokerRequestLatency.Update(requestLatencyInMs)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
|
|
|
b.requestRate.Mark(1)
|
|
|
if b.brokerRequestRate != nil {
|