bulkexecutor.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package executors
  2. import "time"
  // defaultBulkTasks is the task-count limit that newBulkOptions assigns to
  // bulkOptions.cachedTasks when no other value is configured.
  const defaultBulkTasks = 1000
  4. type (
  5. // BulkOption defines the method to customize a BulkExecutor.
  6. BulkOption func(options *bulkOptions)
  7. // A BulkExecutor is an executor that can execute tasks on either requirement meets:
  8. // 1. up to given size of tasks
  9. // 2. flush interval time elapsed
  10. BulkExecutor struct {
  11. executor *PeriodicalExecutor
  12. container *bulkContainer
  13. }
  14. bulkOptions struct {
  15. cachedTasks int
  16. flushInterval time.Duration
  17. }
  18. )
  19. // NewBulkExecutor returns a BulkExecutor.
  20. func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
  21. options := newBulkOptions()
  22. for _, opt := range opts {
  23. opt(&options)
  24. }
  25. container := &bulkContainer{
  26. execute: execute,
  27. maxTasks: options.cachedTasks,
  28. }
  29. executor := &BulkExecutor{
  30. executor: NewPeriodicalExecutor(options.flushInterval, container),
  31. container: container,
  32. }
  33. return executor
  34. }
  35. // Add adds task into be.
  36. func (be *BulkExecutor) Add(task interface{}) error {
  37. be.executor.Add(task)
  38. return nil
  39. }
  40. // Flush forces be to flush and execute tasks.
  41. func (be *BulkExecutor) Flush() {
  42. be.executor.Flush()
  43. }
  44. // Wait waits be to done with the task execution.
  45. func (be *BulkExecutor) Wait() {
  46. be.executor.Wait()
  47. }
  48. // WithBulkTasks customizes a BulkExecutor with given tasks limit.
  49. func WithBulkTasks(tasks int) BulkOption {
  50. return func(options *bulkOptions) {
  51. options.cachedTasks = tasks
  52. }
  53. }
  54. // WithBulkInterval customizes a BulkExecutor with given flush interval.
  55. func WithBulkInterval(duration time.Duration) BulkOption {
  56. return func(options *bulkOptions) {
  57. options.flushInterval = duration
  58. }
  59. }
  60. func newBulkOptions() bulkOptions {
  61. return bulkOptions{
  62. cachedTasks: defaultBulkTasks,
  63. flushInterval: defaultFlushInterval,
  64. }
  65. }
  66. type bulkContainer struct {
  67. tasks []interface{}
  68. execute Execute
  69. maxTasks int
  70. }
  71. func (bc *bulkContainer) AddTask(task interface{}) bool {
  72. bc.tasks = append(bc.tasks, task)
  73. return len(bc.tasks) >= bc.maxTasks
  74. }
  75. func (bc *bulkContainer) Execute(tasks interface{}) {
  76. vals := tasks.([]interface{})
  77. bc.execute(vals)
  78. }
  79. func (bc *bulkContainer) RemoveAll() interface{} {
  80. tasks := bc.tasks
  81. bc.tasks = nil
  82. return tasks
  83. }