bulkexecutor.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package executors
  2. import "time"
  3. const defaultBulkTasks = 1000
  4. type (
  5. BulkOption func(options *bulkOptions)
  6. BulkExecutor struct {
  7. executor *PeriodicalExecutor
  8. container *bulkContainer
  9. }
  10. bulkOptions struct {
  11. cachedTasks int
  12. flushInterval time.Duration
  13. }
  14. )
  15. func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
  16. options := newBulkOptions()
  17. for _, opt := range opts {
  18. opt(&options)
  19. }
  20. container := &bulkContainer{
  21. execute: execute,
  22. maxTasks: options.cachedTasks,
  23. }
  24. executor := &BulkExecutor{
  25. executor: NewPeriodicalExecutor(options.flushInterval, container),
  26. container: container,
  27. }
  28. return executor
  29. }
  30. func (be *BulkExecutor) Add(task interface{}) error {
  31. be.executor.Add(task)
  32. return nil
  33. }
  34. func (be *BulkExecutor) Flush() {
  35. be.executor.Flush()
  36. }
  37. func (be *BulkExecutor) Wait() {
  38. be.executor.Wait()
  39. }
  40. func WithBulkTasks(tasks int) BulkOption {
  41. return func(options *bulkOptions) {
  42. options.cachedTasks = tasks
  43. }
  44. }
  45. func WithBulkInterval(duration time.Duration) BulkOption {
  46. return func(options *bulkOptions) {
  47. options.flushInterval = duration
  48. }
  49. }
  50. func newBulkOptions() bulkOptions {
  51. return bulkOptions{
  52. cachedTasks: defaultBulkTasks,
  53. flushInterval: defaultFlushInterval,
  54. }
  55. }
  56. type bulkContainer struct {
  57. tasks []interface{}
  58. execute Execute
  59. maxTasks int
  60. }
  61. func (bc *bulkContainer) AddTask(task interface{}) bool {
  62. bc.tasks = append(bc.tasks, task)
  63. return len(bc.tasks) >= bc.maxTasks
  64. }
  65. func (bc *bulkContainer) Execute(tasks interface{}) {
  66. vals := tasks.([]interface{})
  67. bc.execute(vals)
  68. }
  69. func (bc *bulkContainer) RemoveAll() interface{} {
  70. tasks := bc.tasks
  71. bc.tasks = nil
  72. return tasks
  73. }