batcher_test.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package batcher
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. )
  9. var errSomeError = errors.New("errSomeError")
  10. func returnsError(params []interface{}) error {
  11. return errSomeError
  12. }
  13. func returnsSuccess(params []interface{}) error {
  14. return nil
  15. }
  16. func TestBatcherSuccess(t *testing.T) {
  17. b := New(10*time.Millisecond, returnsSuccess)
  18. wg := &sync.WaitGroup{}
  19. for i := 0; i < 10; i++ {
  20. wg.Add(1)
  21. go func() {
  22. if err := b.Run(nil); err != nil {
  23. t.Error(err)
  24. }
  25. wg.Done()
  26. }()
  27. }
  28. wg.Wait()
  29. b = New(0, returnsSuccess)
  30. for i := 0; i < 10; i++ {
  31. if err := b.Run(nil); err != nil {
  32. t.Error(err)
  33. }
  34. }
  35. }
  36. func TestBatcherError(t *testing.T) {
  37. b := New(10*time.Millisecond, returnsError)
  38. wg := &sync.WaitGroup{}
  39. for i := 0; i < 10; i++ {
  40. wg.Add(1)
  41. go func() {
  42. if err := b.Run(nil); err != errSomeError {
  43. t.Error(err)
  44. }
  45. wg.Done()
  46. }()
  47. }
  48. wg.Wait()
  49. }
  50. func TestBatcherPrefilter(t *testing.T) {
  51. b := New(1*time.Millisecond, returnsSuccess)
  52. b.Prefilter(func(param interface{}) error {
  53. if param == nil {
  54. return errSomeError
  55. }
  56. return nil
  57. })
  58. if err := b.Run(nil); err != errSomeError {
  59. t.Error(err)
  60. }
  61. if err := b.Run(1); err != nil {
  62. t.Error(err)
  63. }
  64. }
  65. func TestBatcherMultipleBatches(t *testing.T) {
  66. var iters uint32
  67. b := New(10*time.Millisecond, func(params []interface{}) error {
  68. atomic.AddUint32(&iters, 1)
  69. return nil
  70. })
  71. wg := &sync.WaitGroup{}
  72. for group := 0; group < 5; group++ {
  73. for i := 0; i < 10; i++ {
  74. wg.Add(1)
  75. go func() {
  76. if err := b.Run(nil); err != nil {
  77. t.Error(err)
  78. }
  79. wg.Done()
  80. }()
  81. }
  82. time.Sleep(15 * time.Millisecond)
  83. }
  84. wg.Wait()
  85. if iters != 5 {
  86. t.Error("Wrong number of iters:", iters)
  87. }
  88. }
  89. func ExampleBatcher() {
  90. b := New(10*time.Millisecond, func(params []interface{}) error {
  91. // do something with the batch of parameters
  92. return nil
  93. })
  94. b.Prefilter(func(param interface{}) error {
  95. // do some sort of sanity check on the parameter, and return an error if it fails
  96. return nil
  97. })
  98. for i := 0; i < 10; i++ {
  99. go b.Run(i)
  100. }
  101. }