decode.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. // Copyright 2015 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package expfmt
  14. import (
  15. "fmt"
  16. "io"
  17. "math"
  18. "mime"
  19. "net/http"
  20. dto "github.com/prometheus/client_model/go"
  21. "github.com/matttproud/golang_protobuf_extensions/pbutil"
  22. "github.com/prometheus/common/model"
  23. )
  24. // Decoder types decode an input stream into metric families.
  25. type Decoder interface {
  26. Decode(*dto.MetricFamily) error
  27. }
  28. type DecodeOptions struct {
  29. // Timestamp is added to each value from the stream that has no explicit timestamp set.
  30. Timestamp model.Time
  31. }
  32. // ResponseFormat extracts the correct format from a HTTP response header.
  33. // If no matching format can be found FormatUnknown is returned.
  34. func ResponseFormat(h http.Header) Format {
  35. ct := h.Get(hdrContentType)
  36. mediatype, params, err := mime.ParseMediaType(ct)
  37. if err != nil {
  38. return FmtUnknown
  39. }
  40. const (
  41. textType = "text/plain"
  42. jsonType = "application/json"
  43. )
  44. switch mediatype {
  45. case ProtoType:
  46. if p, ok := params["proto"]; ok && p != ProtoProtocol {
  47. return FmtUnknown
  48. }
  49. if e, ok := params["encoding"]; ok && e != "delimited" {
  50. return FmtUnknown
  51. }
  52. return FmtProtoDelim
  53. case textType:
  54. if v, ok := params["version"]; ok && v != TextVersion {
  55. return FmtUnknown
  56. }
  57. return FmtText
  58. case jsonType:
  59. var prometheusAPIVersion string
  60. if params["schema"] == "prometheus/telemetry" && params["version"] != "" {
  61. prometheusAPIVersion = params["version"]
  62. } else {
  63. prometheusAPIVersion = h.Get("X-Prometheus-API-Version")
  64. }
  65. switch prometheusAPIVersion {
  66. case "0.0.2", "":
  67. return fmtJSON2
  68. default:
  69. return FmtUnknown
  70. }
  71. }
  72. return FmtUnknown
  73. }
  74. // NewDecoder returns a new decoder based on the given input format.
  75. // If the input format does not imply otherwise, a text format decoder is returned.
  76. func NewDecoder(r io.Reader, format Format) Decoder {
  77. switch format {
  78. case FmtProtoDelim:
  79. return &protoDecoder{r: r}
  80. case fmtJSON2:
  81. return newJSON2Decoder(r)
  82. }
  83. return &textDecoder{r: r}
  84. }
  85. // protoDecoder implements the Decoder interface for protocol buffers.
  86. type protoDecoder struct {
  87. r io.Reader
  88. }
  89. // Decode implements the Decoder interface.
  90. func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
  91. _, err := pbutil.ReadDelimited(d.r, v)
  92. return err
  93. }
  94. // textDecoder implements the Decoder interface for the text protcol.
  95. type textDecoder struct {
  96. r io.Reader
  97. p TextParser
  98. fams []*dto.MetricFamily
  99. }
  100. // Decode implements the Decoder interface.
  101. func (d *textDecoder) Decode(v *dto.MetricFamily) error {
  102. // TODO(fabxc): Wrap this as a line reader to make streaming safer.
  103. if len(d.fams) == 0 {
  104. // No cached metric families, read everything and parse metrics.
  105. fams, err := d.p.TextToMetricFamilies(d.r)
  106. if err != nil {
  107. return err
  108. }
  109. if len(fams) == 0 {
  110. return io.EOF
  111. }
  112. d.fams = make([]*dto.MetricFamily, 0, len(fams))
  113. for _, f := range fams {
  114. d.fams = append(d.fams, f)
  115. }
  116. }
  117. *v = *d.fams[0]
  118. d.fams = d.fams[1:]
  119. return nil
  120. }
  121. type SampleDecoder struct {
  122. Dec Decoder
  123. Opts *DecodeOptions
  124. f dto.MetricFamily
  125. }
  126. func (sd *SampleDecoder) Decode(s *model.Vector) error {
  127. if err := sd.Dec.Decode(&sd.f); err != nil {
  128. return err
  129. }
  130. *s = extractSamples(&sd.f, sd.Opts)
  131. return nil
  132. }
  133. // Extract samples builds a slice of samples from the provided metric families.
  134. func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) model.Vector {
  135. var all model.Vector
  136. for _, f := range fams {
  137. all = append(all, extractSamples(f, o)...)
  138. }
  139. return all
  140. }
  141. func extractSamples(f *dto.MetricFamily, o *DecodeOptions) model.Vector {
  142. switch f.GetType() {
  143. case dto.MetricType_COUNTER:
  144. return extractCounter(o, f)
  145. case dto.MetricType_GAUGE:
  146. return extractGauge(o, f)
  147. case dto.MetricType_SUMMARY:
  148. return extractSummary(o, f)
  149. case dto.MetricType_UNTYPED:
  150. return extractUntyped(o, f)
  151. case dto.MetricType_HISTOGRAM:
  152. return extractHistogram(o, f)
  153. }
  154. panic("expfmt.extractSamples: unknown metric family type")
  155. }
  156. func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  157. samples := make(model.Vector, 0, len(f.Metric))
  158. for _, m := range f.Metric {
  159. if m.Counter == nil {
  160. continue
  161. }
  162. lset := make(model.LabelSet, len(m.Label)+1)
  163. for _, p := range m.Label {
  164. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  165. }
  166. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  167. smpl := &model.Sample{
  168. Metric: model.Metric(lset),
  169. Value: model.SampleValue(m.Counter.GetValue()),
  170. }
  171. if m.TimestampMs != nil {
  172. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  173. } else {
  174. smpl.Timestamp = o.Timestamp
  175. }
  176. samples = append(samples, smpl)
  177. }
  178. return samples
  179. }
  180. func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  181. samples := make(model.Vector, 0, len(f.Metric))
  182. for _, m := range f.Metric {
  183. if m.Gauge == nil {
  184. continue
  185. }
  186. lset := make(model.LabelSet, len(m.Label)+1)
  187. for _, p := range m.Label {
  188. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  189. }
  190. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  191. smpl := &model.Sample{
  192. Metric: model.Metric(lset),
  193. Value: model.SampleValue(m.Gauge.GetValue()),
  194. }
  195. if m.TimestampMs != nil {
  196. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  197. } else {
  198. smpl.Timestamp = o.Timestamp
  199. }
  200. samples = append(samples, smpl)
  201. }
  202. return samples
  203. }
  204. func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  205. samples := make(model.Vector, 0, len(f.Metric))
  206. for _, m := range f.Metric {
  207. if m.Untyped == nil {
  208. continue
  209. }
  210. lset := make(model.LabelSet, len(m.Label)+1)
  211. for _, p := range m.Label {
  212. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  213. }
  214. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  215. smpl := &model.Sample{
  216. Metric: model.Metric(lset),
  217. Value: model.SampleValue(m.Untyped.GetValue()),
  218. }
  219. if m.TimestampMs != nil {
  220. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  221. } else {
  222. smpl.Timestamp = o.Timestamp
  223. }
  224. samples = append(samples, smpl)
  225. }
  226. return samples
  227. }
  228. func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  229. samples := make(model.Vector, 0, len(f.Metric))
  230. for _, m := range f.Metric {
  231. if m.Summary == nil {
  232. continue
  233. }
  234. timestamp := o.Timestamp
  235. if m.TimestampMs != nil {
  236. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  237. }
  238. for _, q := range m.Summary.Quantile {
  239. lset := make(model.LabelSet, len(m.Label)+2)
  240. for _, p := range m.Label {
  241. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  242. }
  243. // BUG(matt): Update other names to "quantile".
  244. lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
  245. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  246. samples = append(samples, &model.Sample{
  247. Metric: model.Metric(lset),
  248. Value: model.SampleValue(q.GetValue()),
  249. Timestamp: timestamp,
  250. })
  251. }
  252. lset := make(model.LabelSet, len(m.Label)+1)
  253. for _, p := range m.Label {
  254. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  255. }
  256. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  257. samples = append(samples, &model.Sample{
  258. Metric: model.Metric(lset),
  259. Value: model.SampleValue(m.Summary.GetSampleSum()),
  260. Timestamp: timestamp,
  261. })
  262. lset = make(model.LabelSet, len(m.Label)+1)
  263. for _, p := range m.Label {
  264. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  265. }
  266. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  267. samples = append(samples, &model.Sample{
  268. Metric: model.Metric(lset),
  269. Value: model.SampleValue(m.Summary.GetSampleCount()),
  270. Timestamp: timestamp,
  271. })
  272. }
  273. return samples
  274. }
  275. func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  276. samples := make(model.Vector, 0, len(f.Metric))
  277. for _, m := range f.Metric {
  278. if m.Histogram == nil {
  279. continue
  280. }
  281. timestamp := o.Timestamp
  282. if m.TimestampMs != nil {
  283. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  284. }
  285. infSeen := false
  286. for _, q := range m.Histogram.Bucket {
  287. lset := make(model.LabelSet, len(m.Label)+2)
  288. for _, p := range m.Label {
  289. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  290. }
  291. lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
  292. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  293. if math.IsInf(q.GetUpperBound(), +1) {
  294. infSeen = true
  295. }
  296. samples = append(samples, &model.Sample{
  297. Metric: model.Metric(lset),
  298. Value: model.SampleValue(q.GetCumulativeCount()),
  299. Timestamp: timestamp,
  300. })
  301. }
  302. lset := make(model.LabelSet, len(m.Label)+1)
  303. for _, p := range m.Label {
  304. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  305. }
  306. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  307. samples = append(samples, &model.Sample{
  308. Metric: model.Metric(lset),
  309. Value: model.SampleValue(m.Histogram.GetSampleSum()),
  310. Timestamp: timestamp,
  311. })
  312. lset = make(model.LabelSet, len(m.Label)+1)
  313. for _, p := range m.Label {
  314. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  315. }
  316. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  317. count := &model.Sample{
  318. Metric: model.Metric(lset),
  319. Value: model.SampleValue(m.Histogram.GetSampleCount()),
  320. Timestamp: timestamp,
  321. }
  322. samples = append(samples, count)
  323. if !infSeen {
  324. // Append an infinity bucket sample.
  325. lset := make(model.LabelSet, len(m.Label)+2)
  326. for _, p := range m.Label {
  327. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  328. }
  329. lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
  330. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  331. samples = append(samples, &model.Sample{
  332. Metric: model.Metric(lset),
  333. Value: count.Value,
  334. Timestamp: timestamp,
  335. })
  336. }
  337. }
  338. return samples
  339. }