123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package executors
- import "time"
// defaultBulkTasks is the task-count limit that newBulkOptions installs
// when the caller does not override it with WithBulkTasks.
const defaultBulkTasks = 1000
- type (
- // BulkOption defines the method to customize a BulkExecutor.
- BulkOption func(options *bulkOptions)
- // A BulkExecutor is an executor that can execute tasks on either requirement meets:
- // 1. up to given size of tasks
- // 2. flush interval time elapsed
- BulkExecutor struct {
- executor *PeriodicalExecutor
- container *bulkContainer
- }
- bulkOptions struct {
- cachedTasks int
- flushInterval time.Duration
- }
- )
- // NewBulkExecutor returns a BulkExecutor.
- func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
- options := newBulkOptions()
- for _, opt := range opts {
- opt(&options)
- }
- container := &bulkContainer{
- execute: execute,
- maxTasks: options.cachedTasks,
- }
- executor := &BulkExecutor{
- executor: NewPeriodicalExecutor(options.flushInterval, container),
- container: container,
- }
- return executor
- }
- // Add adds task into be.
- func (be *BulkExecutor) Add(task interface{}) error {
- be.executor.Add(task)
- return nil
- }
- // Flush forces be to flush and execute tasks.
- func (be *BulkExecutor) Flush() {
- be.executor.Flush()
- }
- // Wait waits be to done with the task execution.
- func (be *BulkExecutor) Wait() {
- be.executor.Wait()
- }
- // WithBulkTasks customizes a BulkExecutor with given tasks limit.
- func WithBulkTasks(tasks int) BulkOption {
- return func(options *bulkOptions) {
- options.cachedTasks = tasks
- }
- }
- // WithBulkInterval customizes a BulkExecutor with given flush interval.
- func WithBulkInterval(duration time.Duration) BulkOption {
- return func(options *bulkOptions) {
- options.flushInterval = duration
- }
- }
- func newBulkOptions() bulkOptions {
- return bulkOptions{
- cachedTasks: defaultBulkTasks,
- flushInterval: defaultFlushInterval,
- }
- }
- type bulkContainer struct {
- tasks []interface{}
- execute Execute
- maxTasks int
- }
- func (bc *bulkContainer) AddTask(task interface{}) bool {
- bc.tasks = append(bc.tasks, task)
- return len(bc.tasks) >= bc.maxTasks
- }
- func (bc *bulkContainer) Execute(tasks interface{}) {
- vals := tasks.([]interface{})
- bc.execute(vals)
- }
- func (bc *bulkContainer) RemoveAll() interface{} {
- tasks := bc.tasks
- bc.tasks = nil
- return tasks
- }
|