stream.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Package quantile computes approximate quantiles over an unbounded data
  2. // stream within low memory and CPU bounds.
  3. //
  4. // A small amount of accuracy is traded to achieve the above properties.
  5. //
  6. // Multiple streams can be merged before calling Query to generate a single set
  7. // of results. This is meaningful when the streams represent the same type of
  8. // data. See Merge and Samples.
  9. //
  10. // For more detailed information about the algorithm used, see:
  11. //
  12. // Effective Computation of Biased Quantiles over Data Streams
  13. //
  14. // http://www.cs.rutgers.edu/~muthu/bquant.pdf
  15. package quantile
  16. import (
  17. "math"
  18. "sort"
  19. )
  // Sample is a single entry of a quantile summary: an observed value together
  // with the bookkeeping used while merging and compressing. All three fields
  // are encoded as JSON strings (see the struct tag).
  type Sample struct {
  	// Value is the observed value. Width is the weight the sample adds to the
  	// stream's running total (Insert uses 1). Delta is added to Width when a
  	// sample is compared against the rank target or the invariant bound.
  	Value, Width, Delta float64 `json:",string"`
  }
  27. // Samples represents a slice of samples. It implements sort.Interface.
  28. type Samples []Sample
  29. func (a Samples) Len() int { return len(a) }
  30. func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
  31. func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  32. type invariant func(s *stream, r float64) float64
  33. // NewLowBiased returns an initialized Stream for low-biased quantiles
  34. // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
  35. // error guarantees can still be given even for the lower ranks of the data
  36. // distribution.
  37. //
  38. // The provided epsilon is a relative error, i.e. the true quantile of a value
  39. // returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
  40. //
  41. // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
  42. // properties.
  43. func NewLowBiased(epsilon float64) *Stream {
  44. ƒ := func(s *stream, r float64) float64 {
  45. return 2 * epsilon * r
  46. }
  47. return newStream(ƒ)
  48. }
  49. // NewHighBiased returns an initialized Stream for high-biased quantiles
  50. // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
  51. // error guarantees can still be given even for the higher ranks of the data
  52. // distribution.
  53. //
  54. // The provided epsilon is a relative error, i.e. the true quantile of a value
  55. // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
  56. //
  57. // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
  58. // properties.
  59. func NewHighBiased(epsilon float64) *Stream {
  60. ƒ := func(s *stream, r float64) float64 {
  61. return 2 * epsilon * (s.n - r)
  62. }
  63. return newStream(ƒ)
  64. }
  65. // NewTargeted returns an initialized Stream concerned with a particular set of
  66. // quantile values that are supplied a priori. Knowing these a priori reduces
  67. // space and computation time. The targets map maps the desired quantiles to
  68. // their absolute errors, i.e. the true quantile of a value returned by a query
  69. // is guaranteed to be within (Quantile±Epsilon).
  70. //
  71. // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
  72. func NewTargeted(targets map[float64]float64) *Stream {
  73. ƒ := func(s *stream, r float64) float64 {
  74. var m = math.MaxFloat64
  75. var f float64
  76. for quantile, epsilon := range targets {
  77. if quantile*s.n <= r {
  78. f = (2 * epsilon * r) / quantile
  79. } else {
  80. f = (2 * epsilon * (s.n - r)) / (1 - quantile)
  81. }
  82. if f < m {
  83. m = f
  84. }
  85. }
  86. return m
  87. }
  88. return newStream(ƒ)
  89. }
  90. // Stream computes quantiles for a stream of float64s. It is not thread-safe by
  91. // design. Take care when using across multiple goroutines.
  92. type Stream struct {
  93. *stream
  94. b Samples
  95. sorted bool
  96. }
  97. func newStream(ƒ invariant) *Stream {
  98. x := &stream{ƒ: ƒ}
  99. return &Stream{x, make(Samples, 0, 500), true}
  100. }
  101. // Insert inserts v into the stream.
  102. func (s *Stream) Insert(v float64) {
  103. s.insert(Sample{Value: v, Width: 1})
  104. }
  105. func (s *Stream) insert(sample Sample) {
  106. s.b = append(s.b, sample)
  107. s.sorted = false
  108. if len(s.b) == cap(s.b) {
  109. s.flush()
  110. }
  111. }
  112. // Query returns the computed qth percentiles value. If s was created with
  113. // NewTargeted, and q is not in the set of quantiles provided a priori, Query
  114. // will return an unspecified result.
  115. func (s *Stream) Query(q float64) float64 {
  116. if !s.flushed() {
  117. // Fast path when there hasn't been enough data for a flush;
  118. // this also yields better accuracy for small sets of data.
  119. l := len(s.b)
  120. if l == 0 {
  121. return 0
  122. }
  123. i := int(float64(l) * q)
  124. if i > 0 {
  125. i -= 1
  126. }
  127. s.maybeSort()
  128. return s.b[i].Value
  129. }
  130. s.flush()
  131. return s.stream.query(q)
  132. }
  133. // Merge merges samples into the underlying streams samples. This is handy when
  134. // merging multiple streams from separate threads, database shards, etc.
  135. //
  136. // ATTENTION: This method is broken and does not yield correct results. The
  137. // underlying algorithm is not capable of merging streams correctly.
  138. func (s *Stream) Merge(samples Samples) {
  139. sort.Sort(samples)
  140. s.stream.merge(samples)
  141. }
  142. // Reset reinitializes and clears the list reusing the samples buffer memory.
  143. func (s *Stream) Reset() {
  144. s.stream.reset()
  145. s.b = s.b[:0]
  146. }
  147. // Samples returns stream samples held by s.
  148. func (s *Stream) Samples() Samples {
  149. if !s.flushed() {
  150. return s.b
  151. }
  152. s.flush()
  153. return s.stream.samples()
  154. }
  155. // Count returns the total number of samples observed in the stream
  156. // since initialization.
  157. func (s *Stream) Count() int {
  158. return len(s.b) + s.stream.count()
  159. }
  160. func (s *Stream) flush() {
  161. s.maybeSort()
  162. s.stream.merge(s.b)
  163. s.b = s.b[:0]
  164. }
  165. func (s *Stream) maybeSort() {
  166. if !s.sorted {
  167. s.sorted = true
  168. sort.Sort(s.b)
  169. }
  170. }
  171. func (s *Stream) flushed() bool {
  172. return len(s.stream.l) > 0
  173. }
  174. type stream struct {
  175. n float64
  176. l []Sample
  177. ƒ invariant
  178. }
  179. func (s *stream) reset() {
  180. s.l = s.l[:0]
  181. s.n = 0
  182. }
  183. func (s *stream) insert(v float64) {
  184. s.merge(Samples{{v, 1, 0}})
  185. }
  186. func (s *stream) merge(samples Samples) {
  187. // TODO(beorn7): This tries to merge not only individual samples, but
  188. // whole summaries. The paper doesn't mention merging summaries at
  189. // all. Unittests show that the merging is inaccurate. Find out how to
  190. // do merges properly.
  191. var r float64
  192. i := 0
  193. for _, sample := range samples {
  194. for ; i < len(s.l); i++ {
  195. c := s.l[i]
  196. if c.Value > sample.Value {
  197. // Insert at position i.
  198. s.l = append(s.l, Sample{})
  199. copy(s.l[i+1:], s.l[i:])
  200. s.l[i] = Sample{
  201. sample.Value,
  202. sample.Width,
  203. math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
  204. // TODO(beorn7): How to calculate delta correctly?
  205. }
  206. i++
  207. goto inserted
  208. }
  209. r += c.Width
  210. }
  211. s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
  212. i++
  213. inserted:
  214. s.n += sample.Width
  215. r += sample.Width
  216. }
  217. s.compress()
  218. }
  219. func (s *stream) count() int {
  220. return int(s.n)
  221. }
  222. func (s *stream) query(q float64) float64 {
  223. t := math.Ceil(q * s.n)
  224. t += math.Ceil(s.ƒ(s, t) / 2)
  225. p := s.l[0]
  226. var r float64
  227. for _, c := range s.l[1:] {
  228. r += p.Width
  229. if r+c.Width+c.Delta > t {
  230. return p.Value
  231. }
  232. p = c
  233. }
  234. return p.Value
  235. }
  236. func (s *stream) compress() {
  237. if len(s.l) < 2 {
  238. return
  239. }
  240. x := s.l[len(s.l)-1]
  241. xi := len(s.l) - 1
  242. r := s.n - 1 - x.Width
  243. for i := len(s.l) - 2; i >= 0; i-- {
  244. c := s.l[i]
  245. if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
  246. x.Width += c.Width
  247. s.l[xi] = x
  248. // Remove element at i.
  249. copy(s.l[i:], s.l[i+1:])
  250. s.l = s.l[:len(s.l)-1]
  251. xi -= 1
  252. } else {
  253. x = c
  254. xi = i
  255. }
  256. r -= c.Width
  257. }
  258. }
  259. func (s *stream) samples() Samples {
  260. samples := make(Samples, len(s.l))
  261. copy(samples, s.l)
  262. return samples
  263. }