123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- // Copyright 2014 The Cockroach Authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- // implied. See the License for the specific language governing
- // permissions and limitations under the License. See the AUTHORS file
- // for names of contributors.
- //
- // Author: Tyler Neely (t@jujit.su)
// IMPORTANT: only subscribe to the metric streams using buffered channels
// that are drained regularly. The reaper will NOT block while trying to send
// metrics to a subscriber; it drops those metrics instead, and it removes a
// subscriber (closing its channel) if it fails to drain the channel 3 times
// in a row!
- package loghisto
- import (
- "errors"
- "fmt"
- "math"
- "runtime"
- "sort"
- "sync"
- "sync/atomic"
- "time"
- "github.com/golang/glog"
- )
// precision affects the bucketing used during histogram value compression:
// compress maps a value v to round(precision * ln(1 + |v|)), so a larger
// precision yields finer (and more numerous) buckets.
const precision = 100
// ProcessedMetricSet contains human-readable metrics that may also be
// suitable for storage in time-series databases. It is derived from a
// RawMetricSet by MetricSystem.processMetrics.
type ProcessedMetricSet struct {
	// Time is copied from the RawMetricSet it was derived from: the
	// wall-clock collection time truncated down to a multiple of the
	// MetricSystem's interval.
	Time time.Time
	// Metrics maps a metric name to its value. It holds counter totals,
	// "<name>_rate" counter deltas, histogram-derived values (e.g.
	// "<name>_count", "<name>_avg", "<name>_99"), and gauge values.
	Metrics map[string]float64
}
// RawMetricSet contains metrics in a form that supports generation of
// percentiles and other rich statistics. One is produced per interval by
// MetricSystem.collectRawMetrics.
type RawMetricSet struct {
	// Time is the wall-clock collection time truncated down to a multiple of
	// the MetricSystem's interval.
	Time time.Time
	// Counters holds each counter's running total across all intervals.
	Counters map[string]uint64
	// Rates holds how much each counter increased during this interval;
	// counters that were not incremented during the interval are absent.
	Rates map[string]uint64
	// Histograms maps a histogram name to its compressed value buckets (see
	// compress/decompress) and the number of observations in each bucket.
	Histograms map[string]map[int16]*uint64
	// Gauges holds the value each registered gauge function returned when
	// this set was collected.
	Gauges map[string]float64
}
// TimerToken facilitates concurrent timings of durations of the same label.
// It is returned by MetricSystem.StartTimer and finished with Stop.
type TimerToken struct {
	// Name is the histogram the elapsed duration is recorded under.
	Name string
	// Start is the time at which the timer was started.
	Start time.Time
	// MetricSystem receives the recorded duration when Stop is called.
	MetricSystem *MetricSystem
}
// proportion is a compact value with a corresponding count of
// occurrences in this interval.
type proportion struct {
	// Value is a decompressed histogram bucket value.
	Value float64
	// Count is the number of observations that fell into that bucket.
	Count uint64
}
// proportionArray is a sortable collection of proportion types. Its
// sort.Interface methods (Len, Less, Swap) order elements by ascending Value.
type proportionArray []proportion
// MetricSystem facilitates the collection and distribution of metrics.
//
// Submitters record values through Counter, Histogram, StartTimer and
// RegisterGaugeFunc. A background reaper goroutine (launched by Start)
// periodically swaps out the per-interval caches and broadcasts the results
// to subscribers as RawMetricSet and ProcessedMetricSet values.
type MetricSystem struct {
	// percentiles is a mapping from labels to desired percentiles to be
	// calculated by the MetricSystem. Each label is a fmt format string that
	// receives the histogram name (e.g. "%s_99"); each value should be
	// between 0 and 1.
	percentiles map[string]float64

	// interval is the duration between collections and broadcasts of metrics
	// to subscribers.
	interval time.Duration

	// subscribeToRawMetrics queues channels that want a RawMetricSet at the
	// end of each interval; the reaper drains it via updateSubscribers.
	subscribeToRawMetrics chan chan *RawMetricSet

	// unsubscribeFromRawMetrics queues channels to be removed from
	// rawSubscribers; drained by updateSubscribers.
	unsubscribeFromRawMetrics chan chan *RawMetricSet

	// subscribeToProcessedMetrics queues channels that want a
	// ProcessedMetricSet at the end of each interval; drained by
	// updateSubscribers.
	subscribeToProcessedMetrics chan chan *ProcessedMetricSet

	// unsubscribeFromProcessedMetrics queues channels to be removed from
	// processedSubscribers; drained by updateSubscribers.
	unsubscribeFromProcessedMetrics chan chan *ProcessedMetricSet

	// rawSubscribers stores current subscribers to RawMetrics.
	rawSubscribers map[chan *RawMetricSet]struct{}

	// rawBadSubscribers counts, per raw subscriber, how many consecutive
	// broadcasts were dropped because its channel was full.
	rawBadSubscribers map[chan *RawMetricSet]int

	// processedSubscribers stores current subscribers to ProcessedMetrics.
	processedSubscribers map[chan *ProcessedMetricSet]struct{}

	// processedBadSubscribers counts, per processed subscriber, how many
	// consecutive broadcasts were dropped because its channel was full.
	processedBadSubscribers map[chan *ProcessedMetricSet]int

	// subscribersMu controls access to subscription structures.
	subscribersMu sync.RWMutex

	// counterStore maintains the running total of each counter; the
	// pointed-to values are updated with sync/atomic.
	counterStore map[string]*uint64

	// counterStoreMu guards the counterStore map itself.
	counterStoreMu sync.RWMutex

	// counterCache aggregates new Counters until they are collected by
	// reaper(), which replaces it with an empty map each interval.
	counterCache map[string]*uint64

	// counterMu controls access to counterCache.
	counterMu sync.RWMutex

	// histogramCache aggregates Histograms until they are collected by
	// reaper(). It maps a histogram name to compressed value buckets (see
	// compress) and the number of observations in each bucket.
	histogramCache map[string]map[int16]*uint64

	// histogramMu controls access to histogramCache.
	histogramMu sync.RWMutex

	// histogramCountStore keeps all-interval sums and counts, keyed by
	// "<name>_sum" and "<name>_count", for aggregate mean calculation.
	histogramCountStore map[string]*uint64

	// histogramCountMu controls access to the histogramCountStore.
	histogramCountMu sync.RWMutex

	// gaugeFuncs maps metric names to functions that are called once per
	// interval to compute their value.
	gaugeFuncs map[string]func() float64

	// gaugeFuncsMu controls access to the gaugeFuncs map.
	gaugeFuncsMu sync.Mutex

	// reaping reports whether reaper() is running. NOTE(review): it is read
	// and written without synchronization.
	reaping bool

	// shutdownChan is closed (by Stop) to bring down this MetricSystem.
	shutdownChan chan struct{}
}
- // Metrics is the default metric system, which collects and broadcasts metrics
- // to subscribers once every 60 seconds. Also includes default system stats.
- var Metrics = NewMetricSystem(60*time.Second, true)
- // NewMetricSystem returns a new metric system that collects and broadcasts
- // metrics after each interval.
- func NewMetricSystem(interval time.Duration, sysStats bool) *MetricSystem {
- ms := &MetricSystem{
- percentiles: map[string]float64{
- "%s_min": 0,
- "%s_50": .5,
- "%s_75": .75,
- "%s_90": .9,
- "%s_95": .95,
- "%s_99": .99,
- "%s_99.9": .999,
- "%s_99.99": .9999,
- "%s_max": 1,
- },
- interval: interval,
- subscribeToRawMetrics: make(chan chan *RawMetricSet, 64),
- unsubscribeFromRawMetrics: make(chan chan *RawMetricSet, 64),
- subscribeToProcessedMetrics: make(chan chan *ProcessedMetricSet, 64),
- unsubscribeFromProcessedMetrics: make(chan chan *ProcessedMetricSet, 64),
- rawSubscribers: make(map[chan *RawMetricSet]struct{}),
- rawBadSubscribers: make(map[chan *RawMetricSet]int),
- processedSubscribers: make(map[chan *ProcessedMetricSet]struct{}),
- processedBadSubscribers: make(map[chan *ProcessedMetricSet]int),
- counterStore: make(map[string]*uint64),
- counterCache: make(map[string]*uint64),
- histogramCache: make(map[string]map[int16]*uint64),
- histogramCountStore: make(map[string]*uint64),
- gaugeFuncs: make(map[string]func() float64),
- shutdownChan: make(chan struct{}),
- }
- if sysStats {
- ms.gaugeFuncsMu.Lock()
- ms.gaugeFuncs["sys.Alloc"] = func() float64 {
- memStats := new(runtime.MemStats)
- runtime.ReadMemStats(memStats)
- return float64(memStats.Alloc)
- }
- ms.gaugeFuncs["sys.NumGC"] = func() float64 {
- memStats := new(runtime.MemStats)
- runtime.ReadMemStats(memStats)
- return float64(memStats.NumGC)
- }
- ms.gaugeFuncs["sys.PauseTotalNs"] = func() float64 {
- memStats := new(runtime.MemStats)
- runtime.ReadMemStats(memStats)
- return float64(memStats.PauseTotalNs)
- }
- ms.gaugeFuncs["sys.NumGoroutine"] = func() float64 {
- return float64(runtime.NumGoroutine())
- }
- ms.gaugeFuncsMu.Unlock()
- }
- return ms
- }
- // SpecifyPercentiles allows users to override the default collected
- // and reported percentiles.
- func (ms *MetricSystem) SpecifyPercentiles(percentiles map[string]float64) {
- ms.percentiles = percentiles
- }
- // SubscribeToRawMetrics registers a channel to receive RawMetricSets
- // periodically generated by reaper at each interval.
- func (ms *MetricSystem) SubscribeToRawMetrics(metricStream chan *RawMetricSet) {
- ms.subscribeToRawMetrics <- metricStream
- }
- // UnsubscribeFromRawMetrics registers a channel to receive RawMetricSets
- // periodically generated by reaper at each interval.
- func (ms *MetricSystem) UnsubscribeFromRawMetrics(
- metricStream chan *RawMetricSet) {
- ms.unsubscribeFromRawMetrics <- metricStream
- }
- // SubscribeToProcessedMetrics registers a channel to receive
- // ProcessedMetricSets periodically generated by reaper at each interval.
- func (ms *MetricSystem) SubscribeToProcessedMetrics(
- metricStream chan *ProcessedMetricSet) {
- ms.subscribeToProcessedMetrics <- metricStream
- }
- // UnsubscribeFromProcessedMetrics registers a channel to receive
- // ProcessedMetricSets periodically generated by reaper at each interval.
- func (ms *MetricSystem) UnsubscribeFromProcessedMetrics(
- metricStream chan *ProcessedMetricSet) {
- ms.unsubscribeFromProcessedMetrics <- metricStream
- }
- // StartTimer begins a timer and returns a token which is required for halting
- // the timer. This allows for concurrent timings under the same name.
- func (ms *MetricSystem) StartTimer(name string) TimerToken {
- return TimerToken{
- Name: name,
- Start: time.Now(),
- MetricSystem: ms,
- }
- }
- // Stop stops a timer given by StartTimer, submits a Histogram of its duration
- // in nanoseconds, and returns its duration in nanoseconds.
- func (tt *TimerToken) Stop() time.Duration {
- duration := time.Since(tt.Start)
- tt.MetricSystem.Histogram(tt.Name, float64(duration.Nanoseconds()))
- return duration
- }
- // Counter is used for recording a running count of the total occurrences of
- // a particular event. A rate is also exported for the amount that a counter
- // has increased during an interval of this MetricSystem.
- func (ms *MetricSystem) Counter(name string, amount uint64) {
- ms.counterMu.RLock()
- _, exists := ms.counterCache[name]
- // perform lock promotion when we need more control
- if exists {
- atomic.AddUint64(ms.counterCache[name], amount)
- ms.counterMu.RUnlock()
- } else {
- ms.counterMu.RUnlock()
- ms.counterMu.Lock()
- _, syncExists := ms.counterCache[name]
- if !syncExists {
- var z uint64
- ms.counterCache[name] = &z
- }
- atomic.AddUint64(ms.counterCache[name], amount)
- ms.counterMu.Unlock()
- }
- }
- // Histogram is used for generating rich metrics, such as percentiles, from
- // periodically occurring continuous values.
- func (ms *MetricSystem) Histogram(name string, value float64) {
- compressedValue := compress(value)
- ms.histogramMu.RLock()
- _, present := ms.histogramCache[name][compressedValue]
- if present {
- atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
- ms.histogramMu.RUnlock()
- } else {
- ms.histogramMu.RUnlock()
- ms.histogramMu.Lock()
- _, syncPresent := ms.histogramCache[name][compressedValue]
- if !syncPresent {
- var z uint64
- _, mapPresent := ms.histogramCache[name]
- if !mapPresent {
- ms.histogramCache[name] = make(map[int16]*uint64)
- }
- ms.histogramCache[name][compressedValue] = &z
- }
- atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
- ms.histogramMu.Unlock()
- }
- }
- // RegisterGaugeFunc registers a function to be called at each interval
- // whose return value will be used to populate the <name> metric.
- func (ms *MetricSystem) RegisterGaugeFunc(name string, f func() float64) {
- ms.gaugeFuncsMu.Lock()
- ms.gaugeFuncs[name] = f
- ms.gaugeFuncsMu.Unlock()
- }
- // DeregisterGaugeFunc deregisters a function for the <name> metric.
- func (ms *MetricSystem) DeregisterGaugeFunc(name string) {
- ms.gaugeFuncsMu.Lock()
- delete(ms.gaugeFuncs, name)
- ms.gaugeFuncsMu.Unlock()
- }
- // compress takes a float64 and lossily shrinks it to an int16 to facilitate
- // bucketing of histogram values, staying within 1% of the true value. This
- // fails for large values of 1e142 and above, and is inaccurate for values
- // closer to 0 than +/- 0.51 or +/- math.Inf.
- func compress(value float64) int16 {
- i := int16(precision*math.Log(1.0+math.Abs(value)) + 0.5)
- if value < 0 {
- return -1 * i
- }
- return i
- }
- // decompress takes a lossily shrunk int16 and returns a float64 within 1% of
- // the original float64 passed to compress.
- func decompress(compressedValue int16) float64 {
- f := math.Exp(math.Abs(float64(compressedValue))/precision) - 1.0
- if compressedValue < 0 {
- return -1.0 * f
- }
- return f
- }
- // processHistograms derives rich metrics from histograms, currently
- // percentiles, sum, count, and mean.
- func (ms *MetricSystem) processHistograms(name string,
- valuesToCounts map[int16]*uint64) map[string]float64 {
- output := make(map[string]float64)
- totalSum := float64(0)
- totalCount := uint64(0)
- proportions := make([]proportion, 0, len(valuesToCounts))
- for compressedValue, count := range valuesToCounts {
- value := decompress(compressedValue)
- totalSum += value * float64(*count)
- totalCount += *count
- proportions = append(proportions, proportion{Value: value, Count: *count})
- }
- sumName := fmt.Sprintf("%s_sum", name)
- countName := fmt.Sprintf("%s_count", name)
- avgName := fmt.Sprintf("%s_avg", name)
- // increment interval sum and count
- output[countName] = float64(totalCount)
- output[sumName] = totalSum
- output[avgName] = totalSum / float64(totalCount)
- // increment aggregate sum and count
- ms.histogramCountMu.RLock()
- _, present := ms.histogramCountStore[sumName]
- if !present {
- ms.histogramCountMu.RUnlock()
- ms.histogramCountMu.Lock()
- _, syncPresent := ms.histogramCountStore[sumName]
- if !syncPresent {
- var x uint64
- ms.histogramCountStore[sumName] = &x
- var z uint64
- ms.histogramCountStore[countName] = &z
- }
- ms.histogramCountMu.Unlock()
- ms.histogramCountMu.RLock()
- }
- atomic.AddUint64(ms.histogramCountStore[sumName], uint64(totalSum))
- atomic.AddUint64(ms.histogramCountStore[countName], totalCount)
- ms.histogramCountMu.RUnlock()
- for label, p := range ms.percentiles {
- value, err := percentile(totalCount, proportions, p)
- if err != nil {
- glog.Errorf("unable to calculate percentile: %s", err)
- } else {
- output[fmt.Sprintf(label, name)] = value
- }
- }
- return output
- }
- // These next 3 methods are for the implementation of sort.Interface
- func (s proportionArray) Len() int {
- return len(s)
- }
- func (s proportionArray) Less(i, j int) bool {
- return s[i].Value < s[j].Value
- }
- func (s proportionArray) Swap(i, j int) {
- s[i], s[j] = s[j], s[i]
- }
- // percentile calculates a percentile represented as a float64 between 0 and 1
- // inclusive from a proportionArray. totalCount is the sum of all counts of
- // elements in the proportionArray.
- func percentile(totalCount uint64, proportions proportionArray,
- percentile float64) (float64, error) {
- //TODO(tyler) handle multiple percentiles at once for efficiency
- sort.Sort(proportions)
- sofar := uint64(0)
- for _, proportion := range proportions {
- sofar += proportion.Count
- if float64(sofar)/float64(totalCount) >= percentile {
- return proportion.Value, nil
- }
- }
- return 0, errors.New("Invalid percentile. Should be between 0 and 1.")
- }
- func (ms *MetricSystem) collectRawMetrics() *RawMetricSet {
- normalizedInterval := time.Unix(0, time.Now().UnixNano()/
- ms.interval.Nanoseconds()*
- ms.interval.Nanoseconds())
- ms.counterMu.Lock()
- freshCounters := ms.counterCache
- ms.counterCache = make(map[string]*uint64)
- ms.counterMu.Unlock()
- rates := make(map[string]uint64)
- for name, count := range freshCounters {
- rates[name] = *count
- }
- counters := make(map[string]uint64)
- ms.counterStoreMu.RLock()
- // update counters
- for name, count := range freshCounters {
- _, exists := ms.counterStore[name]
- // only take a write lock when it's a totally new counter
- if !exists {
- ms.counterStoreMu.RUnlock()
- ms.counterStoreMu.Lock()
- _, syncExists := ms.counterStore[name]
- if !syncExists {
- var z uint64
- ms.counterStore[name] = &z
- }
- ms.counterStoreMu.Unlock()
- ms.counterStoreMu.RLock()
- }
- atomic.AddUint64(ms.counterStore[name], *count)
- }
- // copy counters for export
- for name, count := range ms.counterStore {
- counters[name] = *count
- }
- ms.counterStoreMu.RUnlock()
- ms.histogramMu.Lock()
- histograms := ms.histogramCache
- ms.histogramCache = make(map[string]map[int16]*uint64)
- ms.histogramMu.Unlock()
- ms.gaugeFuncsMu.Lock()
- gauges := make(map[string]float64)
- for name, f := range ms.gaugeFuncs {
- gauges[name] = f()
- }
- ms.gaugeFuncsMu.Unlock()
- return &RawMetricSet{
- Time: normalizedInterval,
- Counters: counters,
- Rates: rates,
- Histograms: histograms,
- Gauges: gauges,
- }
- }
- // processMetrics (potentially slowly) creates human consumable metrics from a
- // RawMetricSet, deriving rich statistics from histograms such as percentiles.
- func (ms *MetricSystem) processMetrics(
- rawMetrics *RawMetricSet) *ProcessedMetricSet {
- metrics := make(map[string]float64)
- for name, count := range rawMetrics.Counters {
- metrics[name] = float64(count)
- }
- for name, count := range rawMetrics.Rates {
- metrics[fmt.Sprintf("%s_rate", name)] = float64(count)
- }
- for name, valuesToCounts := range rawMetrics.Histograms {
- for histoName, histoValue := range ms.processHistograms(name, valuesToCounts) {
- metrics[histoName] = histoValue
- }
- }
- for name, value := range rawMetrics.Gauges {
- metrics[name] = value
- }
- return &ProcessedMetricSet{Time: rawMetrics.Time, Metrics: metrics}
- }
- func (ms *MetricSystem) updateSubscribers() {
- ms.subscribersMu.Lock()
- defer ms.subscribersMu.Unlock()
- for {
- select {
- case subscriber := <-ms.subscribeToRawMetrics:
- ms.rawSubscribers[subscriber] = struct{}{}
- case unsubscriber := <-ms.unsubscribeFromRawMetrics:
- delete(ms.rawSubscribers, unsubscriber)
- case subscriber := <-ms.subscribeToProcessedMetrics:
- ms.processedSubscribers[subscriber] = struct{}{}
- case unsubscriber := <-ms.unsubscribeFromProcessedMetrics:
- delete(ms.processedSubscribers, unsubscriber)
- default: // no changes in subscribers
- return
- }
- }
- }
- // reaper wakes up every <interval> seconds,
- // collects and processes metrics, and pushes
- // them to the corresponding subscribing channels.
- func (ms *MetricSystem) reaper() {
- ms.reaping = true
- // create goroutine pool to handle multiple processing tasks at once
- processChan := make(chan func(), 16)
- for i := 0; i < int(math.Max(float64(runtime.NumCPU()/4), 4)); i++ {
- go func() {
- for {
- c, ok := <-processChan
- if !ok {
- return
- }
- c()
- }
- }()
- }
- // begin reaper main loop
- for {
- // sleep until the next interval, or die if shutdownChan is closed
- tts := ms.interval.Nanoseconds() -
- (time.Now().UnixNano() % ms.interval.Nanoseconds())
- select {
- case <-time.After(time.Duration(tts)):
- case <-ms.shutdownChan:
- ms.reaping = false
- close(processChan)
- return
- }
- rawMetrics := ms.collectRawMetrics()
- ms.updateSubscribers()
- // broadcast raw metrics
- for subscriber := range ms.rawSubscribers {
- // new subscribers get all counters, otherwise just the new diffs
- select {
- case subscriber <- rawMetrics:
- delete(ms.rawBadSubscribers, subscriber)
- default:
- ms.rawBadSubscribers[subscriber]++
- glog.Error("a raw subscriber has allowed their channel to fill up. ",
- "dropping their metrics on the floor rather than blocking.")
- if ms.rawBadSubscribers[subscriber] >= 2 {
- glog.Error("this raw subscriber has caused dropped metrics at ",
- "least 3 times in a row. closing the channel.")
- delete(ms.rawSubscribers, subscriber)
- close(subscriber)
- }
- }
- }
- // Perform the rest in another goroutine since processing is not
- // gauranteed to complete before the interval is up.
- sendProcessed := func() {
- // this is potentially expensive if there is a massive number of metrics
- processedMetrics := ms.processMetrics(rawMetrics)
- // add aggregate mean
- for name := range rawMetrics.Histograms {
- ms.histogramCountMu.RLock()
- aggCountPtr, countPresent :=
- ms.histogramCountStore[fmt.Sprintf("%s_count", name)]
- aggCount := atomic.LoadUint64(aggCountPtr)
- aggSumPtr, sumPresent :=
- ms.histogramCountStore[fmt.Sprintf("%s_sum", name)]
- aggSum := atomic.LoadUint64(aggSumPtr)
- ms.histogramCountMu.RUnlock()
- if countPresent && sumPresent && aggCount > 0 {
- processedMetrics.Metrics[fmt.Sprintf("%s_agg_avg", name)] =
- float64(aggSum / aggCount)
- processedMetrics.Metrics[fmt.Sprintf("%s_agg_count", name)] =
- float64(aggCount)
- processedMetrics.Metrics[fmt.Sprintf("%s_agg_sum", name)] =
- float64(aggSum)
- }
- }
- // broadcast processed metrics
- ms.subscribersMu.Lock()
- for subscriber := range ms.processedSubscribers {
- select {
- case subscriber <- processedMetrics:
- delete(ms.processedBadSubscribers, subscriber)
- default:
- ms.processedBadSubscribers[subscriber]++
- glog.Error("a subscriber has allowed their channel to fill up. ",
- "dropping their metrics on the floor rather than blocking.")
- if ms.processedBadSubscribers[subscriber] >= 2 {
- glog.Error("this subscriber has caused dropped metrics at ",
- "least 3 times in a row. closing the channel.")
- delete(ms.processedSubscribers, subscriber)
- close(subscriber)
- }
- }
- }
- ms.subscribersMu.Unlock()
- }
- select {
- case processChan <- sendProcessed:
- default:
- // processChan has filled up, this metric load is not sustainable
- glog.Errorf("processing of metrics is taking longer than this node can "+
- "handle. dropping this entire interval of %s metrics on the "+
- "floor rather than blocking the reaper.", rawMetrics.Time)
- }
- } // end main reaper loop
- }
- // Start spawns a goroutine for merging metrics into caches from
- // metric submitters, and a reaper goroutine that harvests metrics at the
- // default interval of every 60 seconds.
- func (ms *MetricSystem) Start() {
- if !ms.reaping {
- go ms.reaper()
- }
- }
- // Stop shuts down a MetricSystem
- func (ms *MetricSystem) Stop() {
- close(ms.shutdownChan)
- }
|