chunkexecutor_test.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package executors
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. )
  8. func TestChunkExecutor(t *testing.T) {
  9. var values []int
  10. var lock sync.Mutex
  11. executor := NewChunkExecutor(func(items []interface{}) {
  12. lock.Lock()
  13. values = append(values, len(items))
  14. lock.Unlock()
  15. }, WithChunkBytes(10), WithFlushInterval(time.Minute))
  16. for i := 0; i < 50; i++ {
  17. executor.Add(1, 1)
  18. time.Sleep(time.Millisecond)
  19. }
  20. lock.Lock()
  21. assert.True(t, len(values) > 0)
  22. // ignore last value
  23. for i := 0; i < len(values); i++ {
  24. assert.Equal(t, 10, values[i])
  25. }
  26. lock.Unlock()
  27. }
  28. func TestChunkExecutorFlushInterval(t *testing.T) {
  29. const (
  30. caches = 10
  31. size = 5
  32. )
  33. var wait sync.WaitGroup
  34. wait.Add(1)
  35. executor := NewChunkExecutor(func(items []interface{}) {
  36. assert.Equal(t, size, len(items))
  37. wait.Done()
  38. }, WithChunkBytes(caches), WithFlushInterval(time.Millisecond*100))
  39. for i := 0; i < size; i++ {
  40. executor.Add(1, 1)
  41. }
  42. wait.Wait()
  43. }
  44. func TestChunkExecutorEmpty(t *testing.T) {
  45. NewChunkExecutor(func(items []interface{}) {
  46. assert.Fail(t, "should not called")
  47. }, WithChunkBytes(10), WithFlushInterval(time.Millisecond))
  48. time.Sleep(time.Millisecond * 100)
  49. }
  50. func TestChunkExecutorFlush(t *testing.T) {
  51. const (
  52. caches = 10
  53. tasks = 5
  54. )
  55. var wait sync.WaitGroup
  56. wait.Add(1)
  57. be := NewChunkExecutor(func(items []interface{}) {
  58. assert.Equal(t, tasks, len(items))
  59. wait.Done()
  60. }, WithChunkBytes(caches), WithFlushInterval(time.Minute))
  61. for i := 0; i < tasks; i++ {
  62. be.Add(1, 1)
  63. }
  64. be.Flush()
  65. wait.Wait()
  66. }
  67. func BenchmarkChunkExecutor(b *testing.B) {
  68. b.ReportAllocs()
  69. be := NewChunkExecutor(func(tasks []interface{}) {
  70. time.Sleep(time.Millisecond * time.Duration(len(tasks)))
  71. })
  72. for i := 0; i < b.N; i++ {
  73. time.Sleep(time.Microsecond * 200)
  74. be.Add(1, 1)
  75. }
  76. be.Flush()
  77. }