periodicalexecutor_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package executors
  2. import (
  3. "runtime"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/timex"
  10. )
  11. const threshold = 10
  12. type container struct {
  13. interval time.Duration
  14. tasks []int
  15. execute func(tasks interface{})
  16. }
  17. func newContainer(interval time.Duration, execute func(tasks interface{})) *container {
  18. return &container{
  19. interval: interval,
  20. execute: execute,
  21. }
  22. }
  23. func (c *container) AddTask(task interface{}) bool {
  24. c.tasks = append(c.tasks, task.(int))
  25. return len(c.tasks) > threshold
  26. }
  27. func (c *container) Execute(tasks interface{}) {
  28. if c.execute != nil {
  29. c.execute(tasks)
  30. } else {
  31. time.Sleep(c.interval)
  32. }
  33. }
  34. func (c *container) RemoveAll() interface{} {
  35. tasks := c.tasks
  36. c.tasks = nil
  37. return tasks
  38. }
  39. func TestPeriodicalExecutor_Sync(t *testing.T) {
  40. var done int32
  41. exec := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
  42. exec.Sync(func() {
  43. atomic.AddInt32(&done, 1)
  44. })
  45. assert.Equal(t, int32(1), atomic.LoadInt32(&done))
  46. }
  47. func TestPeriodicalExecutor_QuitGoroutine(t *testing.T) {
  48. ticker := timex.NewFakeTicker()
  49. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
  50. exec.newTicker = func(d time.Duration) timex.Ticker {
  51. return ticker
  52. }
  53. routines := runtime.NumGoroutine()
  54. exec.Add(1)
  55. ticker.Tick()
  56. ticker.Wait(time.Millisecond * idleRound * 2)
  57. ticker.Tick()
  58. ticker.Wait(time.Millisecond * idleRound)
  59. assert.Equal(t, routines, runtime.NumGoroutine())
  60. }
  61. func TestPeriodicalExecutor_Bulk(t *testing.T) {
  62. ticker := timex.NewFakeTicker()
  63. var vals []int
  64. // avoid data race
  65. var lock sync.Mutex
  66. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks interface{}) {
  67. t := tasks.([]int)
  68. for _, each := range t {
  69. lock.Lock()
  70. vals = append(vals, each)
  71. lock.Unlock()
  72. }
  73. }))
  74. exec.newTicker = func(d time.Duration) timex.Ticker {
  75. return ticker
  76. }
  77. for i := 0; i < threshold*10; i++ {
  78. if i%threshold == 5 {
  79. time.Sleep(time.Millisecond * idleRound * 2)
  80. }
  81. exec.Add(i)
  82. }
  83. ticker.Tick()
  84. ticker.Wait(time.Millisecond * idleRound * 2)
  85. ticker.Tick()
  86. ticker.Tick()
  87. ticker.Wait(time.Millisecond * idleRound)
  88. var expect []int
  89. for i := 0; i < threshold*10; i++ {
  90. expect = append(expect, i)
  91. }
  92. lock.Lock()
  93. assert.EqualValues(t, expect, vals)
  94. lock.Unlock()
  95. }
  96. func TestPeriodicalExecutor_Wait(t *testing.T) {
  97. var lock sync.Mutex
  98. executer := NewBulkExecutor(func(tasks []interface{}) {
  99. lock.Lock()
  100. defer lock.Unlock()
  101. time.Sleep(10 * time.Millisecond)
  102. }, WithBulkTasks(1), WithBulkInterval(time.Second))
  103. for i := 0; i < 10; i++ {
  104. executer.Add(1)
  105. }
  106. executer.Flush()
  107. executer.Wait()
  108. }
  109. func TestPeriodicalExecutor_WaitFast(t *testing.T) {
  110. const total = 3
  111. var cnt int
  112. var lock sync.Mutex
  113. executer := NewBulkExecutor(func(tasks []interface{}) {
  114. defer func() {
  115. cnt++
  116. }()
  117. lock.Lock()
  118. defer lock.Unlock()
  119. time.Sleep(10 * time.Millisecond)
  120. }, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
  121. for i := 0; i < total; i++ {
  122. executer.Add(2)
  123. }
  124. executer.Flush()
  125. executer.Wait()
  126. assert.Equal(t, total, cnt)
  127. }
  128. func TestPeriodicalExecutor_Deadlock(t *testing.T) {
  129. executor := NewBulkExecutor(func(tasks []interface{}) {
  130. }, WithBulkTasks(1), WithBulkInterval(time.Millisecond))
  131. for i := 0; i < 1e5; i++ {
  132. executor.Add(1)
  133. }
  134. }
  135. func TestPeriodicalExecutor_hasTasks(t *testing.T) {
  136. ticker := timex.NewFakeTicker()
  137. defer ticker.Stop()
  138. exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
  139. exec.newTicker = func(d time.Duration) timex.Ticker {
  140. return ticker
  141. }
  142. assert.False(t, exec.hasTasks(nil))
  143. assert.True(t, exec.hasTasks(1))
  144. }
  145. // go test -benchtime 10s -bench .
  146. func BenchmarkExecutor(b *testing.B) {
  147. b.ReportAllocs()
  148. executor := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
  149. for i := 0; i < b.N; i++ {
  150. executor.Add(1)
  151. }
  152. }