chunkexecutor.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package executors
  2. import "time"
  3. const defaultChunkSize = 1024 * 1024 // 1M
  4. type (
  5. // ChunkOption defines the method to customize a ChunkExecutor.
  6. ChunkOption func(options *chunkOptions)
  7. // A ChunkExecutor is an executor to execute tasks when either requirement meets:
  8. // 1. up to given chunk size
  9. // 2. flush interval elapsed
  10. ChunkExecutor struct {
  11. executor *PeriodicalExecutor
  12. container *chunkContainer
  13. }
  14. chunkOptions struct {
  15. chunkSize int
  16. flushInterval time.Duration
  17. }
  18. )
  19. // NewChunkExecutor returns a ChunkExecutor.
  20. func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
  21. options := newChunkOptions()
  22. for _, opt := range opts {
  23. opt(&options)
  24. }
  25. container := &chunkContainer{
  26. execute: execute,
  27. maxChunkSize: options.chunkSize,
  28. }
  29. executor := &ChunkExecutor{
  30. executor: NewPeriodicalExecutor(options.flushInterval, container),
  31. container: container,
  32. }
  33. return executor
  34. }
  35. // Add adds task with given chunk size into ce.
  36. func (ce *ChunkExecutor) Add(task interface{}, size int) error {
  37. ce.executor.Add(chunk{
  38. val: task,
  39. size: size,
  40. })
  41. return nil
  42. }
  43. // Flush forces ce to flush and execute tasks.
  44. func (ce *ChunkExecutor) Flush() {
  45. ce.executor.Flush()
  46. }
  47. // Wait waits the execution to be done.
  48. func (ce *ChunkExecutor) Wait() {
  49. ce.executor.Wait()
  50. }
  51. // WithChunkBytes customizes a ChunkExecutor with the given chunk size.
  52. func WithChunkBytes(size int) ChunkOption {
  53. return func(options *chunkOptions) {
  54. options.chunkSize = size
  55. }
  56. }
  57. // WithFlushInterval customizes a ChunkExecutor with the given flush interval.
  58. func WithFlushInterval(duration time.Duration) ChunkOption {
  59. return func(options *chunkOptions) {
  60. options.flushInterval = duration
  61. }
  62. }
  63. func newChunkOptions() chunkOptions {
  64. return chunkOptions{
  65. chunkSize: defaultChunkSize,
  66. flushInterval: defaultFlushInterval,
  67. }
  68. }
  69. type chunkContainer struct {
  70. tasks []interface{}
  71. execute Execute
  72. size int
  73. maxChunkSize int
  74. }
  75. func (bc *chunkContainer) AddTask(task interface{}) bool {
  76. ck := task.(chunk)
  77. bc.tasks = append(bc.tasks, ck.val)
  78. bc.size += ck.size
  79. return bc.size >= bc.maxChunkSize
  80. }
  81. func (bc *chunkContainer) Execute(tasks interface{}) {
  82. vals := tasks.([]interface{})
  83. bc.execute(vals)
  84. }
  85. func (bc *chunkContainer) RemoveAll() interface{} {
  86. tasks := bc.tasks
  87. bc.tasks = nil
  88. bc.size = 0
  89. return tasks
  90. }
  91. type chunk struct {
  92. val interface{}
  93. size int
  94. }