batcher.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // Package batcher implements the batching resiliency pattern for Go.
  2. package batcher
  3. import (
  4. "sync"
  5. "time"
  6. )
  // work is a single queued call to Run: the caller's argument together with
  // the channel on which that caller waits for the outcome of its batch.
  type work struct {
  	// future receives the error returned by doWork for the batch that
  	// contained this item, after which it is closed.
  	future chan error
  	// param is the value the caller passed to Run.
  	param interface{}
  }
  11. // Batcher implements the batching resiliency pattern
  12. type Batcher struct {
  13. timeout time.Duration
  14. prefilter func(interface{}) error
  15. lock sync.Mutex
  16. submit chan *work
  17. doWork func([]interface{}) error
  18. }
  19. // New constructs a new batcher that will batch all calls to Run that occur within
  20. // `timeout` time before calling doWork just once for the entire batch. The doWork
  21. // function must be safe to run concurrently with itself as this may occur, especially
  22. // when the timeout is small.
  23. func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
  24. return &Batcher{
  25. timeout: timeout,
  26. doWork: doWork,
  27. }
  28. }
  29. // Run runs the work function with the given parameter, possibly
  30. // including it in a batch with other calls to Run that occur within the
  31. // specified timeout. It is safe to call Run concurrently on the same batcher.
  32. func (b *Batcher) Run(param interface{}) error {
  33. if b.prefilter != nil {
  34. if err := b.prefilter(param); err != nil {
  35. return err
  36. }
  37. }
  38. if b.timeout == 0 {
  39. return b.doWork([]interface{}{param})
  40. }
  41. w := &work{
  42. param: param,
  43. future: make(chan error, 1),
  44. }
  45. b.submitWork(w)
  46. return <-w.future
  47. }
  48. // Prefilter specifies an optional function that can be used to run initial checks on parameters
  49. // passed to Run before being added to the batch. If the prefilter returns a non-nil error,
  50. // that error is returned immediately from Run and the batcher is not invoked. A prefilter
  51. // cannot safely be specified for a batcher if Run has already been invoked. The filter function
  52. // specified must be concurrency-safe.
  53. func (b *Batcher) Prefilter(filter func(interface{}) error) {
  54. b.prefilter = filter
  55. }
  56. func (b *Batcher) submitWork(w *work) {
  57. b.lock.Lock()
  58. defer b.lock.Unlock()
  59. if b.submit == nil {
  60. b.submit = make(chan *work, 4)
  61. go b.batch()
  62. }
  63. b.submit <- w
  64. }
  65. func (b *Batcher) batch() {
  66. var params []interface{}
  67. var futures []chan error
  68. input := b.submit
  69. go b.timer()
  70. for work := range input {
  71. params = append(params, work.param)
  72. futures = append(futures, work.future)
  73. }
  74. ret := b.doWork(params)
  75. for _, future := range futures {
  76. future <- ret
  77. close(future)
  78. }
  79. }
  80. func (b *Batcher) timer() {
  81. time.Sleep(b.timeout)
  82. b.lock.Lock()
  83. defer b.lock.Unlock()
  84. close(b.submit)
  85. b.submit = nil
  86. }