|
|
@@ -0,0 +1,653 @@
|
|
|
+// Copyright 2014 The Cockroach Authors.
|
|
|
+//
|
|
|
+// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
+// you may not use this file except in compliance with the License.
|
|
|
+// You may obtain a copy of the License at
|
|
|
+//
|
|
|
+// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+//
|
|
|
+// Unless required by applicable law or agreed to in writing, software
|
|
|
+// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
|
+// implied. See the License for the specific language governing
|
|
|
+// permissions and limitations under the License. See the AUTHORS file
|
|
|
+// for names of contributors.
|
|
|
+//
|
|
|
+// Author: Tyler Neely (t@jujit.su)
|
|
|
+
|
|
|
+// IMPORTANT: only subscribe to the metric stream
|
|
|
+// using buffered channels that are regularly
|
|
|
+// flushed, as reaper will NOT block while trying
|
|
|
+// to send metrics to a subscriber, and will ignore
|
|
|
+// a subscriber if they fail to clear their channel
|
|
|
+// 3 times in a row!
|
|
|
+
|
|
|
+package loghisto
|
|
|
+
|
|
|
+import (
|
|
|
+ "errors"
|
|
|
+ "fmt"
|
|
|
+ "math"
|
|
|
+ "runtime"
|
|
|
+ "sort"
|
|
|
+ "sync"
|
|
|
+ "sync/atomic"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/coreos/etcd/Godeps/_workspace/src/github.com/golang/glog"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ // precision effects the bucketing used during histogram value compression.
|
|
|
+ precision = 100
|
|
|
+)
|
|
|
+
|
|
|
+// ProcessedMetricSet contains human-readable metrics that may also be
|
|
|
+// suitable for storage in time-series databases.
|
|
|
+type ProcessedMetricSet struct {
|
|
|
+ Time time.Time
|
|
|
+ Metrics map[string]float64
|
|
|
+}
|
|
|
+
|
|
|
+// RawMetricSet contains metrics in a form that supports generation of
|
|
|
+// percentiles and other rich statistics.
|
|
|
+type RawMetricSet struct {
|
|
|
+ Time time.Time
|
|
|
+ Counters map[string]uint64
|
|
|
+ Rates map[string]uint64
|
|
|
+ Histograms map[string]map[int16]*uint64
|
|
|
+ Gauges map[string]float64
|
|
|
+}
|
|
|
+
|
|
|
+// TimerToken facilitates concurrent timings of durations of the same label.
|
|
|
+type TimerToken struct {
|
|
|
+ Name string
|
|
|
+ Start time.Time
|
|
|
+ MetricSystem *MetricSystem
|
|
|
+}
|
|
|
+
|
|
|
+// proportion is a compact value with a corresponding count of
|
|
|
+// occurrences in this interval.
|
|
|
+type proportion struct {
|
|
|
+ Value float64
|
|
|
+ Count uint64
|
|
|
+}
|
|
|
+
|
|
|
+// proportionArray is a sortable collection of proportion types.
|
|
|
+type proportionArray []proportion
|
|
|
+
|
|
|
+// MetricSystem facilitates the collection and distribution of metrics.
|
|
|
+type MetricSystem struct {
|
|
|
+ // percentiles is a mapping from labels to desired percentiles to be
|
|
|
+ // calculated by the MetricSystem
|
|
|
+ percentiles map[string]float64
|
|
|
+ // interval is the duration between collections and broadcasts of metrics
|
|
|
+ // to subscribers.
|
|
|
+ interval time.Duration
|
|
|
+ // subscribeToRawMetrics allows subscription to a RawMetricSet generated
|
|
|
+ // by reaper at the end of each interval on a sent channel.
|
|
|
+ subscribeToRawMetrics chan chan *RawMetricSet
|
|
|
+ // unsubscribeFromRawMetrics allows subscribers to unsubscribe from
|
|
|
+ // receiving a RawMetricSet on the sent channel.
|
|
|
+ unsubscribeFromRawMetrics chan chan *RawMetricSet
|
|
|
+ // subscribeToProcessedMetrics allows subscription to a ProcessedMetricSet
|
|
|
+ // generated by reaper at the end of each interval on a sent channel.
|
|
|
+ subscribeToProcessedMetrics chan chan *ProcessedMetricSet
|
|
|
+ // unsubscribeFromProcessedMetrics allows subscribers to unsubscribe from
|
|
|
+ // receiving a ProcessedMetricSet on the sent channel.
|
|
|
+ unsubscribeFromProcessedMetrics chan chan *ProcessedMetricSet
|
|
|
+ // rawSubscribers stores current subscribers to RawMetrics
|
|
|
+ rawSubscribers map[chan *RawMetricSet]struct{}
|
|
|
+ // rawBadSubscribers tracks misbehaving subscribers who do not clear their
|
|
|
+ // subscription channels regularly.
|
|
|
+ rawBadSubscribers map[chan *RawMetricSet]int
|
|
|
+ // processedSubscribers stores current subscribers to ProcessedMetrics
|
|
|
+ processedSubscribers map[chan *ProcessedMetricSet]struct{}
|
|
|
+ // processedBadSubscribers tracks misbehaving subscribers who do not clear
|
|
|
+ // their subscription channels regularly.
|
|
|
+ processedBadSubscribers map[chan *ProcessedMetricSet]int
|
|
|
+ // subscribersMu controls access to subscription structures
|
|
|
+ subscribersMu sync.RWMutex
|
|
|
+ // counterStore maintains the total counts of counters.
|
|
|
+ counterStore map[string]*uint64
|
|
|
+ counterStoreMu sync.RWMutex
|
|
|
+ // counterCache aggregates new Counters until they are collected by reaper().
|
|
|
+ counterCache map[string]*uint64
|
|
|
+ // counterMu controls access to counterCache.
|
|
|
+ counterMu sync.RWMutex
|
|
|
+ // histogramCache aggregates Histograms until they are collected by reaper().
|
|
|
+ histogramCache map[string]map[int16]*uint64
|
|
|
+ // histogramMu controls access to histogramCache.
|
|
|
+ histogramMu sync.RWMutex
|
|
|
+ // histogramCountStore keeps track of aggregate counts and sums for aggregate
|
|
|
+ // mean calculation.
|
|
|
+ histogramCountStore map[string]*uint64
|
|
|
+ // histogramCountMu controls access to the histogramCountStore.
|
|
|
+ histogramCountMu sync.RWMutex
|
|
|
+ // gaugeFuncs maps metrics to functions used for calculating their value
|
|
|
+ gaugeFuncs map[string]func() float64
|
|
|
+ // gaugeFuncsMu controls access to the gaugeFuncs map.
|
|
|
+ gaugeFuncsMu sync.Mutex
|
|
|
+ // Has reaper() been started?
|
|
|
+ reaping bool
|
|
|
+ // Close this to bring down this MetricSystem
|
|
|
+ shutdownChan chan struct{}
|
|
|
+}
|
|
|
+
|
|
|
+// Metrics is the default metric system, which collects and broadcasts metrics
|
|
|
+// to subscribers once every 60 seconds. Also includes default system stats.
|
|
|
+var Metrics = NewMetricSystem(60*time.Second, true)
|
|
|
+
|
|
|
+// NewMetricSystem returns a new metric system that collects and broadcasts
|
|
|
+// metrics after each interval.
|
|
|
+func NewMetricSystem(interval time.Duration, sysStats bool) *MetricSystem {
|
|
|
+ ms := &MetricSystem{
|
|
|
+ percentiles: map[string]float64{
|
|
|
+ "%s_min": 0,
|
|
|
+ "%s_50": .5,
|
|
|
+ "%s_75": .75,
|
|
|
+ "%s_90": .9,
|
|
|
+ "%s_95": .95,
|
|
|
+ "%s_99": .99,
|
|
|
+ "%s_99.9": .999,
|
|
|
+ "%s_99.99": .9999,
|
|
|
+ "%s_max": 1,
|
|
|
+ },
|
|
|
+ interval: interval,
|
|
|
+ subscribeToRawMetrics: make(chan chan *RawMetricSet, 64),
|
|
|
+ unsubscribeFromRawMetrics: make(chan chan *RawMetricSet, 64),
|
|
|
+ subscribeToProcessedMetrics: make(chan chan *ProcessedMetricSet, 64),
|
|
|
+ unsubscribeFromProcessedMetrics: make(chan chan *ProcessedMetricSet, 64),
|
|
|
+ rawSubscribers: make(map[chan *RawMetricSet]struct{}),
|
|
|
+ rawBadSubscribers: make(map[chan *RawMetricSet]int),
|
|
|
+ processedSubscribers: make(map[chan *ProcessedMetricSet]struct{}),
|
|
|
+ processedBadSubscribers: make(map[chan *ProcessedMetricSet]int),
|
|
|
+ counterStore: make(map[string]*uint64),
|
|
|
+ counterCache: make(map[string]*uint64),
|
|
|
+ histogramCache: make(map[string]map[int16]*uint64),
|
|
|
+ histogramCountStore: make(map[string]*uint64),
|
|
|
+ gaugeFuncs: make(map[string]func() float64),
|
|
|
+ shutdownChan: make(chan struct{}),
|
|
|
+ }
|
|
|
+ if sysStats {
|
|
|
+ ms.gaugeFuncsMu.Lock()
|
|
|
+ ms.gaugeFuncs["sys.Alloc"] = func() float64 {
|
|
|
+ memStats := new(runtime.MemStats)
|
|
|
+ runtime.ReadMemStats(memStats)
|
|
|
+ return float64(memStats.Alloc)
|
|
|
+ }
|
|
|
+ ms.gaugeFuncs["sys.NumGC"] = func() float64 {
|
|
|
+ memStats := new(runtime.MemStats)
|
|
|
+ runtime.ReadMemStats(memStats)
|
|
|
+ return float64(memStats.NumGC)
|
|
|
+ }
|
|
|
+ ms.gaugeFuncs["sys.PauseTotalNs"] = func() float64 {
|
|
|
+ memStats := new(runtime.MemStats)
|
|
|
+ runtime.ReadMemStats(memStats)
|
|
|
+ return float64(memStats.PauseTotalNs)
|
|
|
+ }
|
|
|
+ ms.gaugeFuncs["sys.NumGoroutine"] = func() float64 {
|
|
|
+ return float64(runtime.NumGoroutine())
|
|
|
+ }
|
|
|
+ ms.gaugeFuncsMu.Unlock()
|
|
|
+ }
|
|
|
+ return ms
|
|
|
+}
|
|
|
+
|
|
|
+// SpecifyPercentiles allows users to override the default collected
|
|
|
+// and reported percentiles.
|
|
|
+func (ms *MetricSystem) SpecifyPercentiles(percentiles map[string]float64) {
|
|
|
+ ms.percentiles = percentiles
|
|
|
+}
|
|
|
+
|
|
|
+// SubscribeToRawMetrics registers a channel to receive RawMetricSets
|
|
|
+// periodically generated by reaper at each interval.
|
|
|
+func (ms *MetricSystem) SubscribeToRawMetrics(metricStream chan *RawMetricSet) {
|
|
|
+ ms.subscribeToRawMetrics <- metricStream
|
|
|
+}
|
|
|
+
|
|
|
+// UnsubscribeFromRawMetrics registers a channel to receive RawMetricSets
|
|
|
+// periodically generated by reaper at each interval.
|
|
|
+func (ms *MetricSystem) UnsubscribeFromRawMetrics(
|
|
|
+ metricStream chan *RawMetricSet) {
|
|
|
+ ms.unsubscribeFromRawMetrics <- metricStream
|
|
|
+}
|
|
|
+
|
|
|
+// SubscribeToProcessedMetrics registers a channel to receive
|
|
|
+// ProcessedMetricSets periodically generated by reaper at each interval.
|
|
|
+func (ms *MetricSystem) SubscribeToProcessedMetrics(
|
|
|
+ metricStream chan *ProcessedMetricSet) {
|
|
|
+ ms.subscribeToProcessedMetrics <- metricStream
|
|
|
+}
|
|
|
+
|
|
|
+// UnsubscribeFromProcessedMetrics registers a channel to receive
|
|
|
+// ProcessedMetricSets periodically generated by reaper at each interval.
|
|
|
+func (ms *MetricSystem) UnsubscribeFromProcessedMetrics(
|
|
|
+ metricStream chan *ProcessedMetricSet) {
|
|
|
+ ms.unsubscribeFromProcessedMetrics <- metricStream
|
|
|
+}
|
|
|
+
|
|
|
+// StartTimer begins a timer and returns a token which is required for halting
|
|
|
+// the timer. This allows for concurrent timings under the same name.
|
|
|
+func (ms *MetricSystem) StartTimer(name string) TimerToken {
|
|
|
+ return TimerToken{
|
|
|
+ Name: name,
|
|
|
+ Start: time.Now(),
|
|
|
+ MetricSystem: ms,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Stop stops a timer given by StartTimer, submits a Histogram of its duration
|
|
|
+// in nanoseconds, and returns its duration in nanoseconds.
|
|
|
+func (tt *TimerToken) Stop() time.Duration {
|
|
|
+ duration := time.Since(tt.Start)
|
|
|
+ tt.MetricSystem.Histogram(tt.Name, float64(duration.Nanoseconds()))
|
|
|
+ return duration
|
|
|
+}
|
|
|
+
|
|
|
+// Counter is used for recording a running count of the total occurrences of
|
|
|
+// a particular event. A rate is also exported for the amount that a counter
|
|
|
+// has increased during an interval of this MetricSystem.
|
|
|
+func (ms *MetricSystem) Counter(name string, amount uint64) {
|
|
|
+ ms.counterMu.RLock()
|
|
|
+ _, exists := ms.counterCache[name]
|
|
|
+ // perform lock promotion when we need more control
|
|
|
+ if exists {
|
|
|
+ atomic.AddUint64(ms.counterCache[name], amount)
|
|
|
+ ms.counterMu.RUnlock()
|
|
|
+ } else {
|
|
|
+ ms.counterMu.RUnlock()
|
|
|
+ ms.counterMu.Lock()
|
|
|
+ _, syncExists := ms.counterCache[name]
|
|
|
+ if !syncExists {
|
|
|
+ var z uint64
|
|
|
+ ms.counterCache[name] = &z
|
|
|
+ }
|
|
|
+ atomic.AddUint64(ms.counterCache[name], amount)
|
|
|
+ ms.counterMu.Unlock()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Histogram is used for generating rich metrics, such as percentiles, from
|
|
|
+// periodically occurring continuous values.
|
|
|
+func (ms *MetricSystem) Histogram(name string, value float64) {
|
|
|
+ compressedValue := compress(value)
|
|
|
+ ms.histogramMu.RLock()
|
|
|
+ _, present := ms.histogramCache[name][compressedValue]
|
|
|
+ if present {
|
|
|
+ atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
|
|
|
+ ms.histogramMu.RUnlock()
|
|
|
+ } else {
|
|
|
+ ms.histogramMu.RUnlock()
|
|
|
+ ms.histogramMu.Lock()
|
|
|
+ _, syncPresent := ms.histogramCache[name][compressedValue]
|
|
|
+ if !syncPresent {
|
|
|
+ var z uint64
|
|
|
+ _, mapPresent := ms.histogramCache[name]
|
|
|
+ if !mapPresent {
|
|
|
+ ms.histogramCache[name] = make(map[int16]*uint64)
|
|
|
+ }
|
|
|
+ ms.histogramCache[name][compressedValue] = &z
|
|
|
+ }
|
|
|
+ atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
|
|
|
+ ms.histogramMu.Unlock()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// RegisterGaugeFunc registers a function to be called at each interval
|
|
|
+// whose return value will be used to populate the <name> metric.
|
|
|
+func (ms *MetricSystem) RegisterGaugeFunc(name string, f func() float64) {
|
|
|
+ ms.gaugeFuncsMu.Lock()
|
|
|
+ ms.gaugeFuncs[name] = f
|
|
|
+ ms.gaugeFuncsMu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+// DeregisterGaugeFunc deregisters a function for the <name> metric.
|
|
|
+func (ms *MetricSystem) DeregisterGaugeFunc(name string) {
|
|
|
+ ms.gaugeFuncsMu.Lock()
|
|
|
+ delete(ms.gaugeFuncs, name)
|
|
|
+ ms.gaugeFuncsMu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+// compress takes a float64 and lossily shrinks it to an int16 to facilitate
|
|
|
+// bucketing of histogram values, staying within 1% of the true value. This
|
|
|
+// fails for large values of 1e142 and above, and is inaccurate for values
|
|
|
+// closer to 0 than +/- 0.51 or +/- math.Inf.
|
|
|
+func compress(value float64) int16 {
|
|
|
+ i := int16(precision*math.Log(1.0+math.Abs(value)) + 0.5)
|
|
|
+ if value < 0 {
|
|
|
+ return -1 * i
|
|
|
+ }
|
|
|
+ return i
|
|
|
+}
|
|
|
+
|
|
|
+// decompress takes a lossily shrunk int16 and returns a float64 within 1% of
|
|
|
+// the original float64 passed to compress.
|
|
|
+func decompress(compressedValue int16) float64 {
|
|
|
+ f := math.Exp(math.Abs(float64(compressedValue))/precision) - 1.0
|
|
|
+ if compressedValue < 0 {
|
|
|
+ return -1.0 * f
|
|
|
+ }
|
|
|
+ return f
|
|
|
+}
|
|
|
+
|
|
|
+// processHistograms derives rich metrics from histograms, currently
|
|
|
+// percentiles, sum, count, and mean.
|
|
|
+func (ms *MetricSystem) processHistograms(name string,
|
|
|
+ valuesToCounts map[int16]*uint64) map[string]float64 {
|
|
|
+ output := make(map[string]float64)
|
|
|
+ totalSum := float64(0)
|
|
|
+ totalCount := uint64(0)
|
|
|
+ proportions := make([]proportion, 0, len(valuesToCounts))
|
|
|
+ for compressedValue, count := range valuesToCounts {
|
|
|
+ value := decompress(compressedValue)
|
|
|
+ totalSum += value * float64(*count)
|
|
|
+ totalCount += *count
|
|
|
+ proportions = append(proportions, proportion{Value: value, Count: *count})
|
|
|
+ }
|
|
|
+
|
|
|
+ sumName := fmt.Sprintf("%s_sum", name)
|
|
|
+ countName := fmt.Sprintf("%s_count", name)
|
|
|
+ avgName := fmt.Sprintf("%s_avg", name)
|
|
|
+
|
|
|
+ // increment interval sum and count
|
|
|
+ output[countName] = float64(totalCount)
|
|
|
+ output[sumName] = totalSum
|
|
|
+ output[avgName] = totalSum / float64(totalCount)
|
|
|
+
|
|
|
+ // increment aggregate sum and count
|
|
|
+ ms.histogramCountMu.RLock()
|
|
|
+ _, present := ms.histogramCountStore[sumName]
|
|
|
+ if !present {
|
|
|
+ ms.histogramCountMu.RUnlock()
|
|
|
+ ms.histogramCountMu.Lock()
|
|
|
+ _, syncPresent := ms.histogramCountStore[sumName]
|
|
|
+ if !syncPresent {
|
|
|
+ var x uint64
|
|
|
+ ms.histogramCountStore[sumName] = &x
|
|
|
+ var z uint64
|
|
|
+ ms.histogramCountStore[countName] = &z
|
|
|
+ }
|
|
|
+ ms.histogramCountMu.Unlock()
|
|
|
+ ms.histogramCountMu.RLock()
|
|
|
+ }
|
|
|
+ atomic.AddUint64(ms.histogramCountStore[sumName], uint64(totalSum))
|
|
|
+ atomic.AddUint64(ms.histogramCountStore[countName], totalCount)
|
|
|
+ ms.histogramCountMu.RUnlock()
|
|
|
+
|
|
|
+ for label, p := range ms.percentiles {
|
|
|
+ value, err := percentile(totalCount, proportions, p)
|
|
|
+ if err != nil {
|
|
|
+ glog.Errorf("unable to calculate percentile: %s", err)
|
|
|
+ } else {
|
|
|
+ output[fmt.Sprintf(label, name)] = value
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return output
|
|
|
+}
|
|
|
+
|
|
|
+// These next 3 methods are for the implementation of sort.Interface
|
|
|
+
|
|
|
+func (s proportionArray) Len() int {
|
|
|
+ return len(s)
|
|
|
+}
|
|
|
+
|
|
|
+func (s proportionArray) Less(i, j int) bool {
|
|
|
+ return s[i].Value < s[j].Value
|
|
|
+}
|
|
|
+
|
|
|
+func (s proportionArray) Swap(i, j int) {
|
|
|
+ s[i], s[j] = s[j], s[i]
|
|
|
+}
|
|
|
+
|
|
|
+// percentile calculates a percentile represented as a float64 between 0 and 1
|
|
|
+// inclusive from a proportionArray. totalCount is the sum of all counts of
|
|
|
+// elements in the proportionArray.
|
|
|
+func percentile(totalCount uint64, proportions proportionArray,
|
|
|
+ percentile float64) (float64, error) {
|
|
|
+ //TODO(tyler) handle multiple percentiles at once for efficiency
|
|
|
+ sort.Sort(proportions)
|
|
|
+ sofar := uint64(0)
|
|
|
+ for _, proportion := range proportions {
|
|
|
+ sofar += proportion.Count
|
|
|
+ if float64(sofar)/float64(totalCount) >= percentile {
|
|
|
+ return proportion.Value, nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return 0, errors.New("Invalid percentile. Should be between 0 and 1.")
|
|
|
+}
|
|
|
+
|
|
|
+func (ms *MetricSystem) collectRawMetrics() *RawMetricSet {
|
|
|
+ normalizedInterval := time.Unix(0, time.Now().UnixNano()/
|
|
|
+ ms.interval.Nanoseconds()*
|
|
|
+ ms.interval.Nanoseconds())
|
|
|
+
|
|
|
+ ms.counterMu.Lock()
|
|
|
+ freshCounters := ms.counterCache
|
|
|
+ ms.counterCache = make(map[string]*uint64)
|
|
|
+ ms.counterMu.Unlock()
|
|
|
+
|
|
|
+ rates := make(map[string]uint64)
|
|
|
+ for name, count := range freshCounters {
|
|
|
+ rates[name] = *count
|
|
|
+ }
|
|
|
+
|
|
|
+ counters := make(map[string]uint64)
|
|
|
+ ms.counterStoreMu.RLock()
|
|
|
+ // update counters
|
|
|
+ for name, count := range freshCounters {
|
|
|
+ _, exists := ms.counterStore[name]
|
|
|
+ // only take a write lock when it's a totally new counter
|
|
|
+ if !exists {
|
|
|
+ ms.counterStoreMu.RUnlock()
|
|
|
+ ms.counterStoreMu.Lock()
|
|
|
+ _, syncExists := ms.counterStore[name]
|
|
|
+ if !syncExists {
|
|
|
+ var z uint64
|
|
|
+ ms.counterStore[name] = &z
|
|
|
+ }
|
|
|
+ ms.counterStoreMu.Unlock()
|
|
|
+ ms.counterStoreMu.RLock()
|
|
|
+ }
|
|
|
+ atomic.AddUint64(ms.counterStore[name], *count)
|
|
|
+ }
|
|
|
+ // copy counters for export
|
|
|
+ for name, count := range ms.counterStore {
|
|
|
+ counters[name] = *count
|
|
|
+ }
|
|
|
+ ms.counterStoreMu.RUnlock()
|
|
|
+
|
|
|
+ ms.histogramMu.Lock()
|
|
|
+ histograms := ms.histogramCache
|
|
|
+ ms.histogramCache = make(map[string]map[int16]*uint64)
|
|
|
+ ms.histogramMu.Unlock()
|
|
|
+
|
|
|
+ ms.gaugeFuncsMu.Lock()
|
|
|
+ gauges := make(map[string]float64)
|
|
|
+ for name, f := range ms.gaugeFuncs {
|
|
|
+ gauges[name] = f()
|
|
|
+ }
|
|
|
+ ms.gaugeFuncsMu.Unlock()
|
|
|
+
|
|
|
+ return &RawMetricSet{
|
|
|
+ Time: normalizedInterval,
|
|
|
+ Counters: counters,
|
|
|
+ Rates: rates,
|
|
|
+ Histograms: histograms,
|
|
|
+ Gauges: gauges,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// processMetrics (potentially slowly) creates human consumable metrics from a
|
|
|
+// RawMetricSet, deriving rich statistics from histograms such as percentiles.
|
|
|
+func (ms *MetricSystem) processMetrics(
|
|
|
+ rawMetrics *RawMetricSet) *ProcessedMetricSet {
|
|
|
+ metrics := make(map[string]float64)
|
|
|
+
|
|
|
+ for name, count := range rawMetrics.Counters {
|
|
|
+ metrics[name] = float64(count)
|
|
|
+ }
|
|
|
+
|
|
|
+ for name, count := range rawMetrics.Rates {
|
|
|
+ metrics[fmt.Sprintf("%s_rate", name)] = float64(count)
|
|
|
+ }
|
|
|
+
|
|
|
+ for name, valuesToCounts := range rawMetrics.Histograms {
|
|
|
+ for histoName, histoValue := range ms.processHistograms(name, valuesToCounts) {
|
|
|
+ metrics[histoName] = histoValue
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for name, value := range rawMetrics.Gauges {
|
|
|
+ metrics[name] = value
|
|
|
+ }
|
|
|
+
|
|
|
+ return &ProcessedMetricSet{Time: rawMetrics.Time, Metrics: metrics}
|
|
|
+}
|
|
|
+
|
|
|
+func (ms *MetricSystem) updateSubscribers() {
|
|
|
+ ms.subscribersMu.Lock()
|
|
|
+ defer ms.subscribersMu.Unlock()
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case subscriber := <-ms.subscribeToRawMetrics:
|
|
|
+ ms.rawSubscribers[subscriber] = struct{}{}
|
|
|
+ case unsubscriber := <-ms.unsubscribeFromRawMetrics:
|
|
|
+ delete(ms.rawSubscribers, unsubscriber)
|
|
|
+ case subscriber := <-ms.subscribeToProcessedMetrics:
|
|
|
+ ms.processedSubscribers[subscriber] = struct{}{}
|
|
|
+ case unsubscriber := <-ms.unsubscribeFromProcessedMetrics:
|
|
|
+ delete(ms.processedSubscribers, unsubscriber)
|
|
|
+ default: // no changes in subscribers
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// reaper wakes up every <interval> seconds,
|
|
|
+// collects and processes metrics, and pushes
|
|
|
+// them to the corresponding subscribing channels.
|
|
|
+func (ms *MetricSystem) reaper() {
|
|
|
+ ms.reaping = true
|
|
|
+
|
|
|
+ // create goroutine pool to handle multiple processing tasks at once
|
|
|
+ processChan := make(chan func(), 16)
|
|
|
+ for i := 0; i < int(math.Max(float64(runtime.NumCPU()/4), 4)); i++ {
|
|
|
+ go func() {
|
|
|
+ for {
|
|
|
+ c, ok := <-processChan
|
|
|
+ if !ok {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ c()
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ // begin reaper main loop
|
|
|
+ for {
|
|
|
+ // sleep until the next interval, or die if shutdownChan is closed
|
|
|
+ tts := ms.interval.Nanoseconds() -
|
|
|
+ (time.Now().UnixNano() % ms.interval.Nanoseconds())
|
|
|
+ select {
|
|
|
+ case <-time.After(time.Duration(tts)):
|
|
|
+ case <-ms.shutdownChan:
|
|
|
+ ms.reaping = false
|
|
|
+ close(processChan)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ rawMetrics := ms.collectRawMetrics()
|
|
|
+
|
|
|
+ ms.updateSubscribers()
|
|
|
+
|
|
|
+ // broadcast raw metrics
|
|
|
+ for subscriber := range ms.rawSubscribers {
|
|
|
+ // new subscribers get all counters, otherwise just the new diffs
|
|
|
+ select {
|
|
|
+ case subscriber <- rawMetrics:
|
|
|
+ delete(ms.rawBadSubscribers, subscriber)
|
|
|
+ default:
|
|
|
+ ms.rawBadSubscribers[subscriber]++
|
|
|
+ glog.Error("a raw subscriber has allowed their channel to fill up. ",
|
|
|
+ "dropping their metrics on the floor rather than blocking.")
|
|
|
+ if ms.rawBadSubscribers[subscriber] >= 2 {
|
|
|
+ glog.Error("this raw subscriber has caused dropped metrics at ",
|
|
|
+ "least 3 times in a row. closing the channel.")
|
|
|
+ delete(ms.rawSubscribers, subscriber)
|
|
|
+ close(subscriber)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Perform the rest in another goroutine since processing is not
|
|
|
+ // gauranteed to complete before the interval is up.
|
|
|
+ sendProcessed := func() {
|
|
|
+ // this is potentially expensive if there is a massive number of metrics
|
|
|
+ processedMetrics := ms.processMetrics(rawMetrics)
|
|
|
+
|
|
|
+ // add aggregate mean
|
|
|
+ for name := range rawMetrics.Histograms {
|
|
|
+ ms.histogramCountMu.RLock()
|
|
|
+ aggCountPtr, countPresent :=
|
|
|
+ ms.histogramCountStore[fmt.Sprintf("%s_count", name)]
|
|
|
+ aggCount := atomic.LoadUint64(aggCountPtr)
|
|
|
+ aggSumPtr, sumPresent :=
|
|
|
+ ms.histogramCountStore[fmt.Sprintf("%s_sum", name)]
|
|
|
+ aggSum := atomic.LoadUint64(aggSumPtr)
|
|
|
+ ms.histogramCountMu.RUnlock()
|
|
|
+
|
|
|
+ if countPresent && sumPresent && aggCount > 0 {
|
|
|
+ processedMetrics.Metrics[fmt.Sprintf("%s_agg_avg", name)] =
|
|
|
+ float64(aggSum / aggCount)
|
|
|
+ processedMetrics.Metrics[fmt.Sprintf("%s_agg_count", name)] =
|
|
|
+ float64(aggCount)
|
|
|
+ processedMetrics.Metrics[fmt.Sprintf("%s_agg_sum", name)] =
|
|
|
+ float64(aggSum)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // broadcast processed metrics
|
|
|
+ ms.subscribersMu.Lock()
|
|
|
+ for subscriber := range ms.processedSubscribers {
|
|
|
+ select {
|
|
|
+ case subscriber <- processedMetrics:
|
|
|
+ delete(ms.processedBadSubscribers, subscriber)
|
|
|
+ default:
|
|
|
+ ms.processedBadSubscribers[subscriber]++
|
|
|
+ glog.Error("a subscriber has allowed their channel to fill up. ",
|
|
|
+ "dropping their metrics on the floor rather than blocking.")
|
|
|
+ if ms.processedBadSubscribers[subscriber] >= 2 {
|
|
|
+ glog.Error("this subscriber has caused dropped metrics at ",
|
|
|
+ "least 3 times in a row. closing the channel.")
|
|
|
+ delete(ms.processedSubscribers, subscriber)
|
|
|
+ close(subscriber)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ms.subscribersMu.Unlock()
|
|
|
+ }
|
|
|
+ select {
|
|
|
+ case processChan <- sendProcessed:
|
|
|
+ default:
|
|
|
+ // processChan has filled up, this metric load is not sustainable
|
|
|
+ glog.Errorf("processing of metrics is taking longer than this node can "+
|
|
|
+ "handle. dropping this entire interval of %s metrics on the "+
|
|
|
+ "floor rather than blocking the reaper.", rawMetrics.Time)
|
|
|
+ }
|
|
|
+ } // end main reaper loop
|
|
|
+}
|
|
|
+
|
|
|
+// Start spawns a goroutine for merging metrics into caches from
|
|
|
+// metric submitters, and a reaper goroutine that harvests metrics at the
|
|
|
+// default interval of every 60 seconds.
|
|
|
+func (ms *MetricSystem) Start() {
|
|
|
+ if !ms.reaping {
|
|
|
+ go ms.reaper()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Stop shuts down a MetricSystem
|
|
|
+func (ms *MetricSystem) Stop() {
|
|
|
+ close(ms.shutdownChan)
|
|
|
+}
|