periodicalexecutor.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package executors
  2. import (
  3. "reflect"
  4. "sync"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/proc"
  8. "github.com/tal-tech/go-zero/core/syncx"
  9. "github.com/tal-tech/go-zero/core/threading"
  10. "github.com/tal-tech/go-zero/core/timex"
  11. )
  12. const idleRound = 10
  13. type (
  14. // A type that satisfies executors.TaskContainer can be used as the underlying
  15. // container that used to do periodical executions.
  16. TaskContainer interface {
  17. // AddTask adds the task into the container.
  18. // Returns true if the container needs to be flushed after the addition.
  19. AddTask(task interface{}) bool
  20. // Execute handles the collected tasks by the container when flushing.
  21. Execute(tasks interface{})
  22. // RemoveAll removes the contained tasks, and return them.
  23. RemoveAll() interface{}
  24. }
  25. PeriodicalExecutor struct {
  26. commander chan interface{}
  27. interval time.Duration
  28. container TaskContainer
  29. waitGroup sync.WaitGroup
  30. // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
  31. wgBarrier syncx.Barrier
  32. confirmChan chan lang.PlaceholderType
  33. guarded bool
  34. newTicker func(duration time.Duration) timex.Ticker
  35. lock sync.Mutex
  36. }
  37. )
  38. func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
  39. executor := &PeriodicalExecutor{
  40. // buffer 1 to let the caller go quickly
  41. commander: make(chan interface{}, 1),
  42. interval: interval,
  43. container: container,
  44. confirmChan: make(chan lang.PlaceholderType),
  45. newTicker: func(d time.Duration) timex.Ticker {
  46. return timex.NewTicker(interval)
  47. },
  48. }
  49. proc.AddShutdownListener(func() {
  50. executor.Flush()
  51. })
  52. return executor
  53. }
  54. func (pe *PeriodicalExecutor) Add(task interface{}) {
  55. if vals, ok := pe.addAndCheck(task); ok {
  56. pe.commander <- vals
  57. <-pe.confirmChan
  58. }
  59. }
  60. func (pe *PeriodicalExecutor) Flush() bool {
  61. pe.enterExecution()
  62. return pe.executeTasks(func() interface{} {
  63. pe.lock.Lock()
  64. defer pe.lock.Unlock()
  65. return pe.container.RemoveAll()
  66. }())
  67. }
// Sync runs fn while holding the executor's lock, the same lock that is
// taken around container access in addAndCheck and Flush.
func (pe *PeriodicalExecutor) Sync(fn func()) {
	pe.lock.Lock()
	// deferred so the lock is released even if fn panics
	defer pe.lock.Unlock()
	fn()
}
  73. func (pe *PeriodicalExecutor) Wait() {
  74. pe.wgBarrier.Guard(func() {
  75. pe.waitGroup.Wait()
  76. })
  77. }
  78. func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
  79. pe.lock.Lock()
  80. defer func() {
  81. var start bool
  82. if !pe.guarded {
  83. pe.guarded = true
  84. start = true
  85. }
  86. pe.lock.Unlock()
  87. if start {
  88. pe.backgroundFlush()
  89. }
  90. }()
  91. if pe.container.AddTask(task) {
  92. return pe.container.RemoveAll(), true
  93. }
  94. return nil, false
  95. }
  96. func (pe *PeriodicalExecutor) backgroundFlush() {
  97. threading.GoSafe(func() {
  98. ticker := pe.newTicker(pe.interval)
  99. defer ticker.Stop()
  100. var commanded bool
  101. last := timex.Now()
  102. for {
  103. select {
  104. case vals := <-pe.commander:
  105. commanded = true
  106. pe.enterExecution()
  107. pe.confirmChan <- lang.Placeholder
  108. pe.executeTasks(vals)
  109. last = timex.Now()
  110. case <-ticker.Chan():
  111. if commanded {
  112. commanded = false
  113. } else if pe.Flush() {
  114. last = timex.Now()
  115. } else if timex.Since(last) > pe.interval*idleRound {
  116. pe.lock.Lock()
  117. pe.guarded = false
  118. pe.lock.Unlock()
  119. // flush again to avoid missing tasks
  120. pe.Flush()
  121. return
  122. }
  123. }
  124. }
  125. })
  126. }
// doneExecution marks one execution registered by enterExecution as finished.
// NOTE(review): unlike enterExecution and Wait, this call is not wrapped in
// wgBarrier.Guard, presumably so it can proceed while Wait is blocking inside
// the guard. Confirm against syncx.Barrier.
func (pe *PeriodicalExecutor) doneExecution() {
	pe.waitGroup.Done()
}
  130. func (pe *PeriodicalExecutor) enterExecution() {
  131. pe.wgBarrier.Guard(func() {
  132. pe.waitGroup.Add(1)
  133. })
  134. }
  135. func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
  136. defer pe.doneExecution()
  137. ok := pe.hasTasks(tasks)
  138. if ok {
  139. pe.container.Execute(tasks)
  140. }
  141. return ok
  142. }
  143. func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool {
  144. if tasks == nil {
  145. return false
  146. }
  147. val := reflect.ValueOf(tasks)
  148. switch val.Kind() {
  149. case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice:
  150. return val.Len() > 0
  151. default:
  152. // unknown type, let caller execute it
  153. return true
  154. }
  155. }