bulkexecutor_test.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package executors
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. )
  8. func TestBulkExecutor(t *testing.T) {
  9. var values []int
  10. var lock sync.Mutex
  11. executor := NewBulkExecutor(func(items []interface{}) {
  12. lock.Lock()
  13. values = append(values, len(items))
  14. lock.Unlock()
  15. }, WithBulkTasks(10), WithBulkInterval(time.Minute))
  16. for i := 0; i < 50; i++ {
  17. executor.Add(1)
  18. time.Sleep(time.Millisecond)
  19. }
  20. lock.Lock()
  21. assert.True(t, len(values) > 0)
  22. // ignore last value
  23. for i := 0; i < len(values); i++ {
  24. assert.Equal(t, 10, values[i])
  25. }
  26. lock.Unlock()
  27. }
  28. func TestBulkExecutorFlushInterval(t *testing.T) {
  29. const (
  30. caches = 10
  31. size = 5
  32. )
  33. var wait sync.WaitGroup
  34. wait.Add(1)
  35. executor := NewBulkExecutor(func(items []interface{}) {
  36. assert.Equal(t, size, len(items))
  37. wait.Done()
  38. }, WithBulkTasks(caches), WithBulkInterval(time.Millisecond*100))
  39. for i := 0; i < size; i++ {
  40. executor.Add(1)
  41. }
  42. wait.Wait()
  43. }
  44. func TestBulkExecutorEmpty(t *testing.T) {
  45. NewBulkExecutor(func(items []interface{}) {
  46. assert.Fail(t, "should not called")
  47. }, WithBulkTasks(10), WithBulkInterval(time.Millisecond))
  48. time.Sleep(time.Millisecond * 100)
  49. }
  50. func TestBulkExecutorFlush(t *testing.T) {
  51. const (
  52. caches = 10
  53. tasks = 5
  54. )
  55. var wait sync.WaitGroup
  56. wait.Add(1)
  57. be := NewBulkExecutor(func(items []interface{}) {
  58. assert.Equal(t, tasks, len(items))
  59. wait.Done()
  60. }, WithBulkTasks(caches), WithBulkInterval(time.Minute))
  61. for i := 0; i < tasks; i++ {
  62. be.Add(1)
  63. }
  64. be.Flush()
  65. wait.Wait()
  66. }
  67. func TestBuldExecutorFlushSlowTasks(t *testing.T) {
  68. const total = 1500
  69. lock := new(sync.Mutex)
  70. result := make([]interface{}, 0, 10000)
  71. exec := NewBulkExecutor(func(tasks []interface{}) {
  72. time.Sleep(time.Millisecond * 100)
  73. lock.Lock()
  74. defer lock.Unlock()
  75. result = append(result, tasks...)
  76. }, WithBulkTasks(1000))
  77. for i := 0; i < total; i++ {
  78. assert.Nil(t, exec.Add(i))
  79. }
  80. exec.Flush()
  81. exec.Wait()
  82. assert.Equal(t, total, len(result))
  83. }
  84. func BenchmarkBulkExecutor(b *testing.B) {
  85. b.ReportAllocs()
  86. be := NewBulkExecutor(func(tasks []interface{}) {
  87. time.Sleep(time.Millisecond * time.Duration(len(tasks)))
  88. })
  89. for i := 0; i < b.N; i++ {
  90. time.Sleep(time.Microsecond * 200)
  91. be.Add(1)
  92. }
  93. be.Flush()
  94. }