|
@@ -32,6 +32,8 @@ type Broker struct {
|
|
|
responses chan responsePromise
|
|
responses chan responsePromise
|
|
|
done chan bool
|
|
done chan bool
|
|
|
|
|
|
|
|
|
|
+ registeredMetrics []string
|
|
|
|
|
+
|
|
|
incomingByteRate metrics.Meter
|
|
incomingByteRate metrics.Meter
|
|
|
requestRate metrics.Meter
|
|
requestRate metrics.Meter
|
|
|
requestSize metrics.Histogram
|
|
requestSize metrics.Histogram
|
|
@@ -179,13 +181,7 @@ func (b *Broker) Open(conf *Config) error {
|
|
|
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
|
|
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
|
|
|
// the same id (-1) and are already exposed through the global metrics above
|
|
// the same id (-1) and are already exposed through the global metrics above
|
|
|
if b.id >= 0 {
|
|
if b.id >= 0 {
|
|
|
- b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
|
|
|
|
|
- b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
|
|
|
|
|
- b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
|
|
|
|
|
- b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
|
|
|
|
|
- b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
|
|
|
|
|
- b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
|
|
|
|
|
- b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
|
|
|
|
|
|
|
+ b.registerMetrics()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if conf.Net.SASL.Enable {
|
|
if conf.Net.SASL.Enable {
|
|
@@ -246,12 +242,7 @@ func (b *Broker) Close() error {
|
|
|
b.done = nil
|
|
b.done = nil
|
|
|
b.responses = nil
|
|
b.responses = nil
|
|
|
|
|
|
|
|
- if b.id >= 0 {
|
|
|
|
|
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
|
|
|
|
|
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
|
|
|
|
|
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
|
|
|
|
|
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ b.unregisterMetrics()
|
|
|
|
|
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
Logger.Printf("Closed connection to broker %s\n", b.addr)
|
|
Logger.Printf("Closed connection to broker %s\n", b.addr)
|
|
@@ -1209,3 +1200,31 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
|
|
|
b.brokerRequestSize.Update(requestSize)
|
|
b.brokerRequestSize.Update(requestSize)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+func (b *Broker) registerMetrics() {
|
|
|
|
|
+ b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
|
|
|
|
|
+ b.brokerRequestRate = b.registerMeter("request-rate")
|
|
|
|
|
+ b.brokerRequestSize = b.registerHistogram("request-size")
|
|
|
|
|
+ b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
|
|
|
|
|
+ b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
|
|
|
|
|
+ b.brokerResponseRate = b.registerMeter("response-rate")
|
|
|
|
|
+ b.brokerResponseSize = b.registerHistogram("response-size")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (b *Broker) unregisterMetrics() {
|
|
|
|
|
+ for _, name := range b.registeredMetrics {
|
|
|
|
|
+ b.conf.MetricRegistry.Unregister(name)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (b *Broker) registerMeter(name string) metrics.Meter {
|
|
|
|
|
+ nameForBroker := getMetricNameForBroker(name, b)
|
|
|
|
|
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
|
|
|
|
|
+ return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (b *Broker) registerHistogram(name string) metrics.Histogram {
|
|
|
|
|
+ nameForBroker := getMetricNameForBroker(name, b)
|
|
|
|
|
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
|
|
|
|
|
+ return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
|
|
|
|
|
+}
|