batcher.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. // Package batcher implements the batching resiliency pattern for Go.
  2. package batcher
  3. import (
  4. "sync"
  5. "time"
  6. )
  7. type work struct {
  8. param interface{}
  9. future chan error
  10. }
  11. // Batcher implements the batching resiliency pattern
  12. type Batcher struct {
  13. timeout time.Duration
  14. prefilter func(interface{}) error
  15. lock sync.Mutex
  16. submit chan *work
  17. doWork func([]interface{}) error
  18. }
  19. // New constructs a new batcher that will batch all calls to Run that occur within
  20. // `timeout` time before calling doWork just once for the entire batch.
  21. func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
  22. return &Batcher{
  23. timeout: timeout,
  24. doWork: doWork,
  25. }
  26. }
  27. // Run runs the work function with the given parameter, possibly
  28. // including it in a batch with other calls to Run that occur within the
  29. // specified timeout. It is safe to call Run concurrently on the same batcher.
  30. func (b *Batcher) Run(param interface{}) error {
  31. if b.prefilter != nil {
  32. if err := b.prefilter(param); err != nil {
  33. return err
  34. }
  35. }
  36. w := &work{
  37. param: param,
  38. future: make(chan error, 1),
  39. }
  40. b.submitWork(w)
  41. return <-w.future
  42. }
  43. // Prefilter specifies an optional function that can be used to run initial checks on parameters
  44. // passed to Run before being added to the batch. If the prefilter returns a non-nil error,
  45. // that error is returned immediately from Run and the batcher is not invoked. A prefilter
  46. // cannot safely be specified for a batcher if Run has already been invoked. The filter function
  47. // specified must be concurrency-safe.
  48. func (b *Batcher) Prefilter(filter func(interface{}) error) {
  49. b.prefilter = filter
  50. }
  51. func (b *Batcher) submitWork(w *work) {
  52. b.lock.Lock()
  53. defer b.lock.Unlock()
  54. if b.submit == nil {
  55. b.submit = make(chan *work, 4)
  56. go b.batch()
  57. }
  58. b.submit <- w
  59. }
  60. func (b *Batcher) batch() {
  61. var params []interface{}
  62. var futures []chan error
  63. input := b.submit
  64. go b.timer()
  65. for work := range input {
  66. params = append(params, work.param)
  67. futures = append(futures, work.future)
  68. }
  69. ret := b.doWork(params)
  70. for _, future := range futures {
  71. future <- ret
  72. close(future)
  73. }
  74. }
  75. func (b *Batcher) timer() {
  76. time.Sleep(b.timeout)
  77. b.lock.Lock()
  78. defer b.lock.Unlock()
  79. close(b.submit)
  80. b.submit = nil
  81. }