rollingwindow_test.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package collection
  2. import (
  3. "math/rand"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/tal-tech/go-zero/core/stringx"
  8. )
  9. const duration = time.Millisecond * 50
  10. func TestNewRollingWindow(t *testing.T) {
  11. assert.NotNil(t, NewRollingWindow(10, time.Second))
  12. assert.Panics(t, func() {
  13. NewRollingWindow(0, time.Second)
  14. })
  15. }
  16. func TestRollingWindowAdd(t *testing.T) {
  17. const size = 3
  18. r := NewRollingWindow(size, duration)
  19. listBuckets := func() []float64 {
  20. var buckets []float64
  21. r.Reduce(func(b *Bucket) {
  22. buckets = append(buckets, b.Sum)
  23. })
  24. return buckets
  25. }
  26. assert.Equal(t, []float64{0, 0, 0}, listBuckets())
  27. r.Add(1)
  28. assert.Equal(t, []float64{0, 0, 1}, listBuckets())
  29. elapse()
  30. r.Add(2)
  31. r.Add(3)
  32. assert.Equal(t, []float64{0, 1, 5}, listBuckets())
  33. elapse()
  34. r.Add(4)
  35. r.Add(5)
  36. r.Add(6)
  37. assert.Equal(t, []float64{1, 5, 15}, listBuckets())
  38. elapse()
  39. r.Add(7)
  40. assert.Equal(t, []float64{5, 15, 7}, listBuckets())
  41. }
  42. func TestRollingWindowAdd2(t *testing.T) {
  43. const size = 3
  44. interval := time.Millisecond * 50
  45. r := NewRollingWindow(size, interval)
  46. listBuckets := func() []float64 {
  47. var buckets []float64
  48. r.Reduce(func(b *Bucket) {
  49. buckets = append(buckets, b.Sum)
  50. })
  51. return buckets
  52. }
  53. assert.Equal(t, []float64{0, 0, 0}, listBuckets())
  54. r.Add(1)
  55. assert.Equal(t, []float64{0, 0, 1}, listBuckets())
  56. time.Sleep(time.Millisecond * 90)
  57. r.Add(2)
  58. r.Add(3)
  59. assert.Equal(t, []float64{0, 1, 5}, listBuckets())
  60. time.Sleep(time.Millisecond * 20)
  61. r.Add(4)
  62. r.Add(5)
  63. r.Add(6)
  64. assert.Equal(t, []float64{1, 5, 15}, listBuckets())
  65. }
  66. func TestRollingWindowReset(t *testing.T) {
  67. const size = 3
  68. r := NewRollingWindow(size, duration, IgnoreCurrentBucket())
  69. listBuckets := func() []float64 {
  70. var buckets []float64
  71. r.Reduce(func(b *Bucket) {
  72. buckets = append(buckets, b.Sum)
  73. })
  74. return buckets
  75. }
  76. r.Add(1)
  77. elapse()
  78. assert.Equal(t, []float64{0, 1}, listBuckets())
  79. elapse()
  80. assert.Equal(t, []float64{1}, listBuckets())
  81. elapse()
  82. assert.Nil(t, listBuckets())
  83. // cross window
  84. r.Add(1)
  85. time.Sleep(duration * 10)
  86. assert.Nil(t, listBuckets())
  87. }
  88. func TestRollingWindowReduce(t *testing.T) {
  89. const size = 4
  90. tests := []struct {
  91. win *RollingWindow
  92. expect float64
  93. }{
  94. {
  95. win: NewRollingWindow(size, duration),
  96. expect: 10,
  97. },
  98. {
  99. win: NewRollingWindow(size, duration, IgnoreCurrentBucket()),
  100. expect: 4,
  101. },
  102. }
  103. for _, test := range tests {
  104. t.Run(stringx.Rand(), func(t *testing.T) {
  105. r := test.win
  106. for x := 0; x < size; x++ {
  107. for i := 0; i <= x; i++ {
  108. r.Add(float64(i))
  109. }
  110. if x < size-1 {
  111. elapse()
  112. }
  113. }
  114. var result float64
  115. r.Reduce(func(b *Bucket) {
  116. result += b.Sum
  117. })
  118. assert.Equal(t, test.expect, result)
  119. })
  120. }
  121. }
  122. func TestRollingWindowDataRace(t *testing.T) {
  123. const size = 3
  124. r := NewRollingWindow(size, duration)
  125. var stop = make(chan bool)
  126. go func() {
  127. for {
  128. select {
  129. case <-stop:
  130. return
  131. default:
  132. r.Add(float64(rand.Int63()))
  133. time.Sleep(duration / 2)
  134. }
  135. }
  136. }()
  137. go func() {
  138. for {
  139. select {
  140. case <-stop:
  141. return
  142. default:
  143. r.Reduce(func(b *Bucket) {})
  144. }
  145. }
  146. }()
  147. time.Sleep(duration * 5)
  148. close(stop)
  149. }
  150. func elapse() {
  151. time.Sleep(duration)
  152. }