12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- package executors
- import "time"
- const defaultBulkTasks = 1000
- type (
- BulkOption func(options *bulkOptions)
- BulkExecutor struct {
- executor *PeriodicalExecutor
- container *bulkContainer
- }
- bulkOptions struct {
- cachedTasks int
- flushInterval time.Duration
- }
- )
- func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
- options := newBulkOptions()
- for _, opt := range opts {
- opt(&options)
- }
- container := &bulkContainer{
- execute: execute,
- maxTasks: options.cachedTasks,
- }
- executor := &BulkExecutor{
- executor: NewPeriodicalExecutor(options.flushInterval, container),
- container: container,
- }
- return executor
- }
- func (be *BulkExecutor) Add(task interface{}) error {
- be.executor.Add(task)
- return nil
- }
- func (be *BulkExecutor) Flush() {
- be.executor.Flush()
- }
- func (be *BulkExecutor) Wait() {
- be.executor.Wait()
- }
- func WithBulkTasks(tasks int) BulkOption {
- return func(options *bulkOptions) {
- options.cachedTasks = tasks
- }
- }
- func WithBulkInterval(duration time.Duration) BulkOption {
- return func(options *bulkOptions) {
- options.flushInterval = duration
- }
- }
- func newBulkOptions() bulkOptions {
- return bulkOptions{
- cachedTasks: defaultBulkTasks,
- flushInterval: defaultFlushInterval,
- }
- }
- type bulkContainer struct {
- tasks []interface{}
- execute Execute
- maxTasks int
- }
- func (bc *bulkContainer) AddTask(task interface{}) bool {
- bc.tasks = append(bc.tasks, task)
- return len(bc.tasks) >= bc.maxTasks
- }
- func (bc *bulkContainer) Execute(tasks interface{}) {
- vals := tasks.([]interface{})
- bc.execute(vals)
- }
- func (bc *bulkContainer) RemoveAll() interface{} {
- tasks := bc.tasks
- bc.tasks = nil
- return tasks
- }
|