summary.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. // Copyright 2014 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package prometheus
  14. import (
  15. "fmt"
  16. "hash/fnv"
  17. "math"
  18. "sort"
  19. "sync"
  20. "time"
  21. "github.com/beorn7/perks/quantile"
  22. "github.com/golang/protobuf/proto"
  23. dto "github.com/prometheus/client_model/go"
  24. )
  // quantileLabel is the label name under which a summary exposes the rank of
  // each of its quantiles. It is reserved: summaries refuse it as the name of a
  // variable or constant label.
  const quantileLabel = "quantile"
  28. // A Summary captures individual observations from an event or sample stream and
  29. // summarizes them in a manner similar to traditional summary statistics: 1. sum
  30. // of observations, 2. observation count, 3. rank estimations.
  31. //
  32. // A typical use-case is the observation of request latencies. By default, a
  33. // Summary provides the median, the 90th and the 99th percentile of the latency
  34. // as rank estimations.
  35. //
  36. // Note that the rank estimations cannot be aggregated in a meaningful way with
  37. // the Prometheus query language (i.e. you cannot average or add them). If you
  38. // need aggregatable quantiles (e.g. you want the 99th percentile latency of all
  39. // queries served across all instances of a service), consider the Histogram
  40. // metric type. See the Prometheus documentation for more details.
  41. //
  42. // To create Summary instances, use NewSummary.
  43. type Summary interface {
  44. Metric
  45. Collector
  46. // Observe adds a single observation to the summary.
  47. Observe(float64)
  48. }
  49. var (
  50. // DefObjectives are the default Summary quantile values.
  51. DefObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
  52. errQuantileLabelNotAllowed = fmt.Errorf(
  53. "%q is not allowed as label name in summaries", quantileLabel,
  54. )
  55. )
  // Default values for SummaryOpts.
  const (
  	// DefMaxAge is how long an observation stays relevant for a summary
  	// if SummaryOpts.MaxAge is left at its zero value.
  	DefMaxAge = 10 * time.Minute

  	// DefAgeBuckets is the number of buckets used to calculate the age of
  	// observations if SummaryOpts.AgeBuckets is left at its zero value.
  	DefAgeBuckets = 5

  	// DefBufCap is the standard buffer size for collecting Summary
  	// observations.
  	DefBufCap = 500
  )
  67. // SummaryOpts bundles the options for creating a Summary metric. It is
  68. // mandatory to set Name and Help to a non-empty string. All other fields are
  69. // optional and can safely be left at their zero value.
  70. type SummaryOpts struct {
  71. // Namespace, Subsystem, and Name are components of the fully-qualified
  72. // name of the Summary (created by joining these components with
  73. // "_"). Only Name is mandatory, the others merely help structuring the
  74. // name. Note that the fully-qualified name of the Summary must be a
  75. // valid Prometheus metric name.
  76. Namespace string
  77. Subsystem string
  78. Name string
  79. // Help provides information about this Summary. Mandatory!
  80. //
  81. // Metrics with the same fully-qualified name must have the same Help
  82. // string.
  83. Help string
  84. // ConstLabels are used to attach fixed labels to this
  85. // Summary. Summaries with the same fully-qualified name must have the
  86. // same label names in their ConstLabels.
  87. //
  88. // Note that in most cases, labels have a value that varies during the
  89. // lifetime of a process. Those labels are usually managed with a
  90. // SummaryVec. ConstLabels serve only special purposes. One is for the
  91. // special case where the value of a label does not change during the
  92. // lifetime of a process, e.g. if the revision of the running binary is
  93. // put into a label. Another, more advanced purpose is if more than one
  94. // Collector needs to collect Summaries with the same fully-qualified
  95. // name. In that case, those Summaries must differ in the values of
  96. // their ConstLabels. See the Collector examples.
  97. //
  98. // If the value of a label never changes (not even between binaries),
  99. // that label most likely should not be a label at all (but part of the
  100. // metric name).
  101. ConstLabels Labels
  102. // Objectives defines the quantile rank estimates with their respective
  103. // absolute error. If Objectives[q] = e, then the value reported
  104. // for q will be the φ-quantile value for some φ between q-e and q+e.
  105. // The default value is DefObjectives.
  106. Objectives map[float64]float64
  107. // MaxAge defines the duration for which an observation stays relevant
  108. // for the summary. Must be positive. The default value is DefMaxAge.
  109. MaxAge time.Duration
  110. // AgeBuckets is the number of buckets used to exclude observations that
  111. // are older than MaxAge from the summary. A higher number has a
  112. // resource penalty, so only increase it if the higher resolution is
  113. // really required. For very high observation rates, you might want to
  114. // reduce the number of age buckets. With only one age bucket, you will
  115. // effectively see a complete reset of the summary each time MaxAge has
  116. // passed. The default value is DefAgeBuckets.
  117. AgeBuckets uint32
  118. // BufCap defines the default sample stream buffer size. The default
  119. // value of DefBufCap should suffice for most uses. If there is a need
  120. // to increase the value, a multiple of 500 is recommended (because that
  121. // is the internal buffer size of the underlying package
  122. // "github.com/bmizerany/perks/quantile").
  123. BufCap uint32
  124. }
  // TODO: The sliding-window decay algorithm turned out to be seriously
  // flawed. The Merge method of perks/quantile does not work as advertised,
  // and it might be unfixable, as the underlying algorithm is apparently not
  // capable of merging summaries in the first place. To avoid using Merge, we
  // currently add observations to _each_ age bucket, i.e. the effort to add a
  // sample is essentially multiplied by the number of age buckets. When
  // rotating age buckets, we empty the previous head stream. At scrape time,
  // we simply take the quantiles from the head stream (no merging required).
  // Result: more effort at observation time, less effort at scrape time, which
  // is exactly the opposite of what we try to accomplish, but at least the
  // results are correct.
  //
  // The quite elegant previous contraption to merge the age buckets
  // efficiently at scrape time (see the code up to commit
  // 6b9530d72ea715f0ba612c0120e6e09fbf1d49d0) can't be used anymore.
  139. // NewSummary creates a new Summary based on the provided SummaryOpts.
  140. func NewSummary(opts SummaryOpts) Summary {
  141. return newSummary(
  142. NewDesc(
  143. BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
  144. opts.Help,
  145. nil,
  146. opts.ConstLabels,
  147. ),
  148. opts,
  149. )
  150. }
  151. func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
  152. if len(desc.variableLabels) != len(labelValues) {
  153. panic(errInconsistentCardinality)
  154. }
  155. for _, n := range desc.variableLabels {
  156. if n == quantileLabel {
  157. panic(errQuantileLabelNotAllowed)
  158. }
  159. }
  160. for _, lp := range desc.constLabelPairs {
  161. if lp.GetName() == quantileLabel {
  162. panic(errQuantileLabelNotAllowed)
  163. }
  164. }
  165. if len(opts.Objectives) == 0 {
  166. opts.Objectives = DefObjectives
  167. }
  168. if opts.MaxAge < 0 {
  169. panic(fmt.Errorf("illegal max age MaxAge=%v", opts.MaxAge))
  170. }
  171. if opts.MaxAge == 0 {
  172. opts.MaxAge = DefMaxAge
  173. }
  174. if opts.AgeBuckets == 0 {
  175. opts.AgeBuckets = DefAgeBuckets
  176. }
  177. if opts.BufCap == 0 {
  178. opts.BufCap = DefBufCap
  179. }
  180. s := &summary{
  181. desc: desc,
  182. objectives: opts.Objectives,
  183. sortedObjectives: make([]float64, 0, len(opts.Objectives)),
  184. labelPairs: makeLabelPairs(desc, labelValues),
  185. hotBuf: make([]float64, 0, opts.BufCap),
  186. coldBuf: make([]float64, 0, opts.BufCap),
  187. streamDuration: opts.MaxAge / time.Duration(opts.AgeBuckets),
  188. }
  189. s.headStreamExpTime = time.Now().Add(s.streamDuration)
  190. s.hotBufExpTime = s.headStreamExpTime
  191. for i := uint32(0); i < opts.AgeBuckets; i++ {
  192. s.streams = append(s.streams, s.newStream())
  193. }
  194. s.headStream = s.streams[0]
  195. for qu := range s.objectives {
  196. s.sortedObjectives = append(s.sortedObjectives, qu)
  197. }
  198. sort.Float64s(s.sortedObjectives)
  199. s.Init(s) // Init self-collection.
  200. return s
  201. }
  202. type summary struct {
  203. SelfCollector
  204. bufMtx sync.Mutex // Protects hotBuf and hotBufExpTime.
  205. mtx sync.Mutex // Protects every other moving part.
  206. // Lock bufMtx before mtx if both are needed.
  207. desc *Desc
  208. objectives map[float64]float64
  209. sortedObjectives []float64
  210. labelPairs []*dto.LabelPair
  211. sum float64
  212. cnt uint64
  213. hotBuf, coldBuf []float64
  214. streams []*quantile.Stream
  215. streamDuration time.Duration
  216. headStream *quantile.Stream
  217. headStreamIdx int
  218. headStreamExpTime, hotBufExpTime time.Time
  219. }
  220. func (s *summary) Desc() *Desc {
  221. return s.desc
  222. }
  223. func (s *summary) Observe(v float64) {
  224. s.bufMtx.Lock()
  225. defer s.bufMtx.Unlock()
  226. now := time.Now()
  227. if now.After(s.hotBufExpTime) {
  228. s.asyncFlush(now)
  229. }
  230. s.hotBuf = append(s.hotBuf, v)
  231. if len(s.hotBuf) == cap(s.hotBuf) {
  232. s.asyncFlush(now)
  233. }
  234. }
  235. func (s *summary) Write(out *dto.Metric) error {
  236. sum := &dto.Summary{}
  237. qs := make([]*dto.Quantile, 0, len(s.objectives))
  238. s.bufMtx.Lock()
  239. s.mtx.Lock()
  240. // Swap bufs even if hotBuf is empty to set new hotBufExpTime.
  241. s.swapBufs(time.Now())
  242. s.bufMtx.Unlock()
  243. s.flushColdBuf()
  244. sum.SampleCount = proto.Uint64(s.cnt)
  245. sum.SampleSum = proto.Float64(s.sum)
  246. for _, rank := range s.sortedObjectives {
  247. var q float64
  248. if s.headStream.Count() == 0 {
  249. q = math.NaN()
  250. } else {
  251. q = s.headStream.Query(rank)
  252. }
  253. qs = append(qs, &dto.Quantile{
  254. Quantile: proto.Float64(rank),
  255. Value: proto.Float64(q),
  256. })
  257. }
  258. s.mtx.Unlock()
  259. if len(qs) > 0 {
  260. sort.Sort(quantSort(qs))
  261. }
  262. sum.Quantile = qs
  263. out.Summary = sum
  264. out.Label = s.labelPairs
  265. return nil
  266. }
  267. func (s *summary) newStream() *quantile.Stream {
  268. return quantile.NewTargeted(s.objectives)
  269. }
  270. // asyncFlush needs bufMtx locked.
  271. func (s *summary) asyncFlush(now time.Time) {
  272. s.mtx.Lock()
  273. s.swapBufs(now)
  274. // Unblock the original goroutine that was responsible for the mutation
  275. // that triggered the compaction. But hold onto the global non-buffer
  276. // state mutex until the operation finishes.
  277. go func() {
  278. s.flushColdBuf()
  279. s.mtx.Unlock()
  280. }()
  281. }
  282. // rotateStreams needs mtx AND bufMtx locked.
  283. func (s *summary) maybeRotateStreams() {
  284. for !s.hotBufExpTime.Equal(s.headStreamExpTime) {
  285. s.headStream.Reset()
  286. s.headStreamIdx++
  287. if s.headStreamIdx >= len(s.streams) {
  288. s.headStreamIdx = 0
  289. }
  290. s.headStream = s.streams[s.headStreamIdx]
  291. s.headStreamExpTime = s.headStreamExpTime.Add(s.streamDuration)
  292. }
  293. }
  294. // flushColdBuf needs mtx locked.
  295. func (s *summary) flushColdBuf() {
  296. for _, v := range s.coldBuf {
  297. for _, stream := range s.streams {
  298. stream.Insert(v)
  299. }
  300. s.cnt++
  301. s.sum += v
  302. }
  303. s.coldBuf = s.coldBuf[0:0]
  304. s.maybeRotateStreams()
  305. }
  306. // swapBufs needs mtx AND bufMtx locked, coldBuf must be empty.
  307. func (s *summary) swapBufs(now time.Time) {
  308. if len(s.coldBuf) != 0 {
  309. panic("coldBuf is not empty")
  310. }
  311. s.hotBuf, s.coldBuf = s.coldBuf, s.hotBuf
  312. // hotBuf is now empty and gets new expiration set.
  313. for now.After(s.hotBufExpTime) {
  314. s.hotBufExpTime = s.hotBufExpTime.Add(s.streamDuration)
  315. }
  316. }
  317. type quantSort []*dto.Quantile
  318. func (s quantSort) Len() int {
  319. return len(s)
  320. }
  321. func (s quantSort) Swap(i, j int) {
  322. s[i], s[j] = s[j], s[i]
  323. }
  324. func (s quantSort) Less(i, j int) bool {
  325. return s[i].GetQuantile() < s[j].GetQuantile()
  326. }
  327. // SummaryVec is a Collector that bundles a set of Summaries that all share the
  328. // same Desc, but have different values for their variable labels. This is used
  329. // if you want to count the same thing partitioned by various dimensions
  330. // (e.g. HTTP request latencies, partitioned by status code and method). Create
  331. // instances with NewSummaryVec.
  332. type SummaryVec struct {
  333. MetricVec
  334. }
  335. // NewSummaryVec creates a new SummaryVec based on the provided SummaryOpts and
  336. // partitioned by the given label names. At least one label name must be
  337. // provided.
  338. func NewSummaryVec(opts SummaryOpts, labelNames []string) *SummaryVec {
  339. desc := NewDesc(
  340. BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
  341. opts.Help,
  342. labelNames,
  343. opts.ConstLabels,
  344. )
  345. return &SummaryVec{
  346. MetricVec: MetricVec{
  347. children: map[uint64]Metric{},
  348. desc: desc,
  349. hash: fnv.New64a(),
  350. newMetric: func(lvs ...string) Metric {
  351. return newSummary(desc, opts, lvs...)
  352. },
  353. },
  354. }
  355. }
  356. // GetMetricWithLabelValues replaces the method of the same name in
  357. // MetricVec. The difference is that this method returns a Summary and not a
  358. // Metric so that no type conversion is required.
  359. func (m *SummaryVec) GetMetricWithLabelValues(lvs ...string) (Summary, error) {
  360. metric, err := m.MetricVec.GetMetricWithLabelValues(lvs...)
  361. if metric != nil {
  362. return metric.(Summary), err
  363. }
  364. return nil, err
  365. }
  366. // GetMetricWith replaces the method of the same name in MetricVec. The
  367. // difference is that this method returns a Summary and not a Metric so that no
  368. // type conversion is required.
  369. func (m *SummaryVec) GetMetricWith(labels Labels) (Summary, error) {
  370. metric, err := m.MetricVec.GetMetricWith(labels)
  371. if metric != nil {
  372. return metric.(Summary), err
  373. }
  374. return nil, err
  375. }
  376. // WithLabelValues works as GetMetricWithLabelValues, but panics where
  377. // GetMetricWithLabelValues would have returned an error. By not returning an
  378. // error, WithLabelValues allows shortcuts like
  379. // myVec.WithLabelValues("404", "GET").Observe(42.21)
  380. func (m *SummaryVec) WithLabelValues(lvs ...string) Summary {
  381. return m.MetricVec.WithLabelValues(lvs...).(Summary)
  382. }
  383. // With works as GetMetricWith, but panics where GetMetricWithLabels would have
  384. // returned an error. By not returning an error, With allows shortcuts like
  385. // myVec.With(Labels{"code": "404", "method": "GET"}).Observe(42.21)
  386. func (m *SummaryVec) With(labels Labels) Summary {
  387. return m.MetricVec.With(labels).(Summary)
  388. }
  389. type constSummary struct {
  390. desc *Desc
  391. count uint64
  392. sum float64
  393. quantiles map[float64]float64
  394. labelPairs []*dto.LabelPair
  395. }
  396. func (s *constSummary) Desc() *Desc {
  397. return s.desc
  398. }
  399. func (s *constSummary) Write(out *dto.Metric) error {
  400. sum := &dto.Summary{}
  401. qs := make([]*dto.Quantile, 0, len(s.quantiles))
  402. sum.SampleCount = proto.Uint64(s.count)
  403. sum.SampleSum = proto.Float64(s.sum)
  404. for rank, q := range s.quantiles {
  405. qs = append(qs, &dto.Quantile{
  406. Quantile: proto.Float64(rank),
  407. Value: proto.Float64(q),
  408. })
  409. }
  410. if len(qs) > 0 {
  411. sort.Sort(quantSort(qs))
  412. }
  413. sum.Quantile = qs
  414. out.Summary = sum
  415. out.Label = s.labelPairs
  416. return nil
  417. }
  418. // NewConstSummary returns a metric representing a Prometheus summary with fixed
  419. // values for the count, sum, and quantiles. As those parameters cannot be
  420. // changed, the returned value does not implement the Summary interface (but
  421. // only the Metric interface). Users of this package will not have much use for
  422. // it in regular operations. However, when implementing custom Collectors, it is
  423. // useful as a throw-away metric that is generated on the fly to send it to
  424. // Prometheus in the Collect method.
  425. //
  426. // quantiles maps ranks to quantile values. For example, a median latency of
  427. // 0.23s and a 99th percentile latency of 0.56s would be expressed as:
  428. // map[float64]float64{0.5: 0.23, 0.99: 0.56}
  429. //
  430. // NewConstSummary returns an error if the length of labelValues is not
  431. // consistent with the variable labels in Desc.
  432. func NewConstSummary(
  433. desc *Desc,
  434. count uint64,
  435. sum float64,
  436. quantiles map[float64]float64,
  437. labelValues ...string,
  438. ) (Metric, error) {
  439. if len(desc.variableLabels) != len(labelValues) {
  440. return nil, errInconsistentCardinality
  441. }
  442. return &constSummary{
  443. desc: desc,
  444. count: count,
  445. sum: sum,
  446. quantiles: quantiles,
  447. labelPairs: makeLabelPairs(desc, labelValues),
  448. }, nil
  449. }
  450. // MustNewConstSummary is a version of NewConstSummary that panics where
  451. // NewConstMetric would have returned an error.
  452. func MustNewConstSummary(
  453. desc *Desc,
  454. count uint64,
  455. sum float64,
  456. quantiles map[float64]float64,
  457. labelValues ...string,
  458. ) Metric {
  459. m, err := NewConstSummary(desc, count, sum, quantiles, labelValues...)
  460. if err != nil {
  461. panic(err)
  462. }
  463. return m
  464. }