123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package executors
- import (
- "runtime"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/tal-tech/go-zero/core/timex"
- )
// threshold is the task count that the container's buffer must exceed
// before AddTask returns true.
const threshold = 10
// container is a small task container used by the tests in this file.
// It buffers int tasks and hands flushed batches to an optional callback.
type container struct {
	// interval is how long Execute sleeps when no execute callback is set.
	interval time.Duration
	// tasks holds the tasks added since the last RemoveAll.
	tasks []int
	// execute, when non-nil, is called by Execute with the flushed tasks.
	execute func(tasks interface{})
}
- func newContainer(interval time.Duration, execute func(tasks interface{})) *container {
- return &container{
- interval: interval,
- execute: execute,
- }
- }
- func (c *container) AddTask(task interface{}) bool {
- c.tasks = append(c.tasks, task.(int))
- return len(c.tasks) > threshold
- }
- func (c *container) Execute(tasks interface{}) {
- if c.execute != nil {
- c.execute(tasks)
- } else {
- time.Sleep(c.interval)
- }
- }
- func (c *container) RemoveAll() interface{} {
- tasks := c.tasks
- c.tasks = nil
- return tasks
- }
- func TestPeriodicalExecutor_Sync(t *testing.T) {
- var done int32
- exec := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
- exec.Sync(func() {
- atomic.AddInt32(&done, 1)
- })
- assert.Equal(t, int32(1), atomic.LoadInt32(&done))
- }
- func TestPeriodicalExecutor_QuitGoroutine(t *testing.T) {
- ticker := timex.NewFakeTicker()
- exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, nil))
- exec.newTicker = func(d time.Duration) timex.Ticker {
- return ticker
- }
- routines := runtime.NumGoroutine()
- exec.Add(1)
- ticker.Tick()
- ticker.Wait(time.Millisecond * idleRound * 2)
- ticker.Tick()
- ticker.Wait(time.Millisecond * idleRound)
- assert.Equal(t, routines, runtime.NumGoroutine())
- }
- func TestPeriodicalExecutor_Bulk(t *testing.T) {
- ticker := timex.NewFakeTicker()
- var vals []int
- // avoid data race
- var lock sync.Mutex
- exec := NewPeriodicalExecutor(time.Millisecond, newContainer(time.Millisecond, func(tasks interface{}) {
- t := tasks.([]int)
- for _, each := range t {
- lock.Lock()
- vals = append(vals, each)
- lock.Unlock()
- }
- }))
- exec.newTicker = func(d time.Duration) timex.Ticker {
- return ticker
- }
- for i := 0; i < threshold*10; i++ {
- if i%threshold == 5 {
- time.Sleep(time.Millisecond * idleRound * 2)
- }
- exec.Add(i)
- }
- ticker.Tick()
- ticker.Wait(time.Millisecond * idleRound * 2)
- ticker.Tick()
- ticker.Tick()
- ticker.Wait(time.Millisecond * idleRound)
- var expect []int
- for i := 0; i < threshold*10; i++ {
- expect = append(expect, i)
- }
- lock.Lock()
- assert.EqualValues(t, expect, vals)
- lock.Unlock()
- }
- func TestPeriodicalExecutor_Wait(t *testing.T) {
- var lock sync.Mutex
- executer := NewBulkExecutor(func(tasks []interface{}) {
- lock.Lock()
- defer lock.Unlock()
- time.Sleep(10 * time.Millisecond)
- }, WithBulkTasks(1), WithBulkInterval(time.Second))
- for i := 0; i < 10; i++ {
- executer.Add(1)
- }
- executer.Flush()
- executer.Wait()
- }
- func TestPeriodicalExecutor_WaitFast(t *testing.T) {
- const total = 3
- var cnt int
- var lock sync.Mutex
- executer := NewBulkExecutor(func(tasks []interface{}) {
- defer func() {
- cnt++
- }()
- lock.Lock()
- defer lock.Unlock()
- time.Sleep(10 * time.Millisecond)
- }, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
- for i := 0; i < total; i++ {
- executer.Add(2)
- }
- executer.Flush()
- executer.Wait()
- assert.Equal(t, total, cnt)
- }
- // go test -benchtime 10s -bench .
- func BenchmarkExecutor(b *testing.B) {
- b.ReportAllocs()
- executor := NewPeriodicalExecutor(time.Second, newContainer(time.Millisecond*500, nil))
- for i := 0; i < b.N; i++ {
- executor.Add(1)
- }
- }
|