periodicalexecutor.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package executors
  2. import (
  3. "reflect"
  4. "sync"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/proc"
  8. "github.com/tal-tech/go-zero/core/syncx"
  9. "github.com/tal-tech/go-zero/core/threading"
  10. "github.com/tal-tech/go-zero/core/timex"
  11. )
  12. const idleRound = 10
  13. type (
  14. // A type that satisfies executors.TaskContainer can be used as the underlying
  15. // container that used to do periodical executions.
  16. TaskContainer interface {
  17. // AddTask adds the task into the container.
  18. // Returns true if the container needs to be flushed after the addition.
  19. AddTask(task interface{}) bool
  20. // Execute handles the collected tasks by the container when flushing.
  21. Execute(tasks interface{})
  22. // RemoveAll removes the contained tasks, and return them.
  23. RemoveAll() interface{}
  24. }
  25. PeriodicalExecutor struct {
  26. commander chan interface{}
  27. interval time.Duration
  28. container TaskContainer
  29. waitGroup sync.WaitGroup
  30. // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
  31. wgBarrier syncx.Barrier
  32. confirmChan chan lang.PlaceholderType
  33. guarded bool
  34. newTicker func(duration time.Duration) timex.Ticker
  35. currTask int
  36. lock sync.Mutex
  37. }
  38. )
  39. func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
  40. executor := &PeriodicalExecutor{
  41. // buffer 1 to let the caller go quickly
  42. commander: make(chan interface{}, 1),
  43. interval: interval,
  44. container: container,
  45. confirmChan: make(chan lang.PlaceholderType),
  46. newTicker: func(d time.Duration) timex.Ticker {
  47. return timex.NewTicker(interval)
  48. },
  49. }
  50. proc.AddShutdownListener(func() {
  51. executor.Flush()
  52. })
  53. return executor
  54. }
  55. func (pe *PeriodicalExecutor) Add(task interface{}) {
  56. if vals, ok := pe.addAndCheck(task); ok {
  57. pe.commander <- vals
  58. <-pe.confirmChan
  59. }
  60. }
  61. func (pe *PeriodicalExecutor) Flush() bool {
  62. pe.enterExecution()
  63. return pe.executeTasks(func() interface{} {
  64. pe.lock.Lock()
  65. defer pe.lock.Unlock()
  66. return pe.container.RemoveAll()
  67. }())
  68. }
  69. func (pe *PeriodicalExecutor) Sync(fn func()) {
  70. pe.lock.Lock()
  71. defer pe.lock.Unlock()
  72. fn()
  73. }
  74. func (pe *PeriodicalExecutor) Wait() {
  75. pe.Flush()
  76. pe.wgBarrier.Guard(func() {
  77. pe.waitGroup.Wait()
  78. })
  79. }
  80. func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
  81. pe.lock.Lock()
  82. defer func() {
  83. var start bool
  84. if !pe.guarded {
  85. pe.guarded = true
  86. start = true
  87. }
  88. pe.lock.Unlock()
  89. if start {
  90. pe.backgroundFlush()
  91. }
  92. }()
  93. if pe.container.AddTask(task) {
  94. vals := pe.container.RemoveAll()
  95. pe.currTask++
  96. return vals, true
  97. }
  98. return nil, false
  99. }
  100. func (pe *PeriodicalExecutor) backgroundFlush() {
  101. threading.GoSafe(func() {
  102. ticker := pe.newTicker(pe.interval)
  103. defer ticker.Stop()
  104. var commanded bool
  105. last := timex.Now()
  106. for {
  107. select {
  108. case vals := <-pe.commander:
  109. commanded = true
  110. pe.enterExecution()
  111. pe.confirmChan <- lang.Placeholder
  112. pe.lock.Lock()
  113. pe.currTask--
  114. pe.lock.Unlock()
  115. pe.executeTasks(vals)
  116. last = timex.Now()
  117. case <-ticker.Chan():
  118. if commanded {
  119. commanded = false
  120. } else if pe.Flush() {
  121. last = timex.Now()
  122. } else if timex.Since(last) > pe.interval*idleRound {
  123. var exit bool = true
  124. pe.lock.Lock()
  125. if pe.currTask > 0 {
  126. exit = false
  127. } else {
  128. pe.guarded = false
  129. }
  130. pe.lock.Unlock()
  131. if exit {
  132. // flush again to avoid missing tasks
  133. pe.Flush()
  134. return
  135. }
  136. }
  137. }
  138. }
  139. })
  140. }
  141. func (pe *PeriodicalExecutor) doneExecution() {
  142. pe.waitGroup.Done()
  143. }
  144. func (pe *PeriodicalExecutor) enterExecution() {
  145. pe.wgBarrier.Guard(func() {
  146. pe.waitGroup.Add(1)
  147. })
  148. }
  149. func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
  150. defer pe.doneExecution()
  151. ok := pe.hasTasks(tasks)
  152. if ok {
  153. pe.container.Execute(tasks)
  154. }
  155. return ok
  156. }
  157. func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool {
  158. if tasks == nil {
  159. return false
  160. }
  161. val := reflect.ValueOf(tasks)
  162. switch val.Kind() {
  163. case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice:
  164. return val.Len() > 0
  165. default:
  166. // unknown type, let caller execute it
  167. return true
  168. }
  169. }