bulkexecutor_test.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package executors
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. )
  8. func TestBulkExecutor(t *testing.T) {
  9. var values []int
  10. var lock sync.Mutex
  11. executor := NewBulkExecutor(func(items []interface{}) {
  12. lock.Lock()
  13. values = append(values, len(items))
  14. lock.Unlock()
  15. }, WithBulkTasks(10), WithBulkInterval(time.Minute))
  16. for i := 0; i < 50; i++ {
  17. executor.Add(1)
  18. time.Sleep(time.Millisecond)
  19. }
  20. lock.Lock()
  21. assert.True(t, len(values) > 0)
  22. // ignore last value
  23. for i := 0; i < len(values); i++ {
  24. assert.Equal(t, 10, values[i])
  25. }
  26. lock.Unlock()
  27. }
  28. func TestBulkExecutorFlushInterval(t *testing.T) {
  29. const (
  30. caches = 10
  31. size = 5
  32. )
  33. var wait sync.WaitGroup
  34. wait.Add(1)
  35. executor := NewBulkExecutor(func(items []interface{}) {
  36. assert.Equal(t, size, len(items))
  37. wait.Done()
  38. }, WithBulkTasks(caches), WithBulkInterval(time.Millisecond*100))
  39. for i := 0; i < size; i++ {
  40. executor.Add(1)
  41. }
  42. wait.Wait()
  43. }
  44. func TestBulkExecutorEmpty(t *testing.T) {
  45. NewBulkExecutor(func(items []interface{}) {
  46. assert.Fail(t, "should not called")
  47. }, WithBulkTasks(10), WithBulkInterval(time.Millisecond))
  48. time.Sleep(time.Millisecond * 100)
  49. }
  50. func TestBulkExecutorFlush(t *testing.T) {
  51. const (
  52. caches = 10
  53. tasks = 5
  54. )
  55. var wait sync.WaitGroup
  56. wait.Add(1)
  57. be := NewBulkExecutor(func(items []interface{}) {
  58. assert.Equal(t, tasks, len(items))
  59. wait.Done()
  60. }, WithBulkTasks(caches), WithBulkInterval(time.Minute))
  61. for i := 0; i < tasks; i++ {
  62. be.Add(1)
  63. }
  64. be.Flush()
  65. wait.Wait()
  66. }
  67. func TestBuldExecutorFlushSlowTasks(t *testing.T) {
  68. const total = 1500
  69. lock := new(sync.Mutex)
  70. result := make([]interface{}, 0, 10000)
  71. exec := NewBulkExecutor(func(tasks []interface{}) {
  72. time.Sleep(time.Millisecond * 100)
  73. lock.Lock()
  74. defer lock.Unlock()
  75. for _, i := range tasks {
  76. result = append(result, i)
  77. }
  78. }, WithBulkTasks(1000))
  79. for i := 0; i < total; i++ {
  80. assert.Nil(t, exec.Add(i))
  81. }
  82. exec.Flush()
  83. exec.Wait()
  84. assert.Equal(t, total, len(result))
  85. }
  86. func BenchmarkBulkExecutor(b *testing.B) {
  87. b.ReportAllocs()
  88. be := NewBulkExecutor(func(tasks []interface{}) {
  89. time.Sleep(time.Millisecond * time.Duration(len(tasks)))
  90. })
  91. for i := 0; i < b.N; i++ {
  92. time.Sleep(time.Microsecond * 200)
  93. be.Add(1)
  94. }
  95. be.Flush()
  96. }