timingwheel.go 6.7 KB


  1. package collection
  2. import (
  3. "container/list"
  4. "fmt"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. "github.com/tal-tech/go-zero/core/timex"
  9. )
  10. const drainWorkers = 8
  11. type (
  12. // Execute defines the method to execute the task.
  13. Execute func(key, value interface{})
  14. // A TimingWheel is a timing wheel object to schedule tasks.
  15. TimingWheel struct {
  16. interval time.Duration
  17. ticker timex.Ticker
  18. slots []*list.List
  19. timers *SafeMap
  20. tickedPos int
  21. numSlots int
  22. execute Execute
  23. setChannel chan timingEntry
  24. moveChannel chan baseEntry
  25. removeChannel chan interface{}
  26. drainChannel chan func(key, value interface{})
  27. stopChannel chan lang.PlaceholderType
  28. }
  29. timingEntry struct {
  30. baseEntry
  31. value interface{}
  32. circle int
  33. diff int
  34. removed bool
  35. }
  36. baseEntry struct {
  37. delay time.Duration
  38. key interface{}
  39. }
  40. positionEntry struct {
  41. pos int
  42. item *timingEntry
  43. }
  44. timingTask struct {
  45. key interface{}
  46. value interface{}
  47. }
  48. )
  49. // NewTimingWheel returns a TimingWheel.
  50. func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
  51. if interval <= 0 || numSlots <= 0 || execute == nil {
  52. return nil, fmt.Errorf("interval: %v, slots: %d, execute: %p", interval, numSlots, execute)
  53. }
  54. return newTimingWheelWithClock(interval, numSlots, execute, timex.NewTicker(interval))
  55. }
  56. func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
  57. *TimingWheel, error) {
  58. tw := &TimingWheel{
  59. interval: interval,
  60. ticker: ticker,
  61. slots: make([]*list.List, numSlots),
  62. timers: NewSafeMap(),
  63. tickedPos: numSlots - 1, // at previous virtual circle
  64. execute: execute,
  65. numSlots: numSlots,
  66. setChannel: make(chan timingEntry),
  67. moveChannel: make(chan baseEntry),
  68. removeChannel: make(chan interface{}),
  69. drainChannel: make(chan func(key, value interface{})),
  70. stopChannel: make(chan lang.PlaceholderType),
  71. }
  72. tw.initSlots()
  73. go tw.run()
  74. return tw, nil
  75. }
  76. // Drain drains all items and executes them.
  77. func (tw *TimingWheel) Drain(fn func(key, value interface{})) {
  78. tw.drainChannel <- fn
  79. }
  80. // MoveTimer moves the task with the given key to the given delay.
  81. func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) {
  82. if delay <= 0 || key == nil {
  83. return
  84. }
  85. tw.moveChannel <- baseEntry{
  86. delay: delay,
  87. key: key,
  88. }
  89. }
  90. // RemoveTimer removes the task with the given key.
  91. func (tw *TimingWheel) RemoveTimer(key interface{}) {
  92. if key == nil {
  93. return
  94. }
  95. tw.removeChannel <- key
  96. }
  97. // SetTimer sets the task value with the given key to the delay.
  98. func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) {
  99. if delay <= 0 || key == nil {
  100. return
  101. }
  102. tw.setChannel <- timingEntry{
  103. baseEntry: baseEntry{
  104. delay: delay,
  105. key: key,
  106. },
  107. value: value,
  108. }
  109. }
  110. // Stop stops tw.
  111. func (tw *TimingWheel) Stop() {
  112. close(tw.stopChannel)
  113. }
  114. func (tw *TimingWheel) drainAll(fn func(key, value interface{})) {
  115. runner := threading.NewTaskRunner(drainWorkers)
  116. for _, slot := range tw.slots {
  117. for e := slot.Front(); e != nil; {
  118. task := e.Value.(*timingEntry)
  119. next := e.Next()
  120. slot.Remove(e)
  121. e = next
  122. if !task.removed {
  123. runner.Schedule(func() {
  124. fn(task.key, task.value)
  125. })
  126. }
  127. }
  128. }
  129. }
  130. func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
  131. steps := int(d / tw.interval)
  132. pos = (tw.tickedPos + steps) % tw.numSlots
  133. circle = (steps - 1) / tw.numSlots
  134. return
  135. }
  136. func (tw *TimingWheel) initSlots() {
  137. for i := 0; i < tw.numSlots; i++ {
  138. tw.slots[i] = list.New()
  139. }
  140. }
  141. func (tw *TimingWheel) moveTask(task baseEntry) {
  142. val, ok := tw.timers.Get(task.key)
  143. if !ok {
  144. return
  145. }
  146. timer := val.(*positionEntry)
  147. if task.delay < tw.interval {
  148. threading.GoSafe(func() {
  149. tw.execute(timer.item.key, timer.item.value)
  150. })
  151. return
  152. }
  153. pos, circle := tw.getPositionAndCircle(task.delay)
  154. if pos >= timer.pos {
  155. timer.item.circle = circle
  156. timer.item.diff = pos - timer.pos
  157. } else if circle > 0 {
  158. circle--
  159. timer.item.circle = circle
  160. timer.item.diff = tw.numSlots + pos - timer.pos
  161. } else {
  162. timer.item.removed = true
  163. newItem := &timingEntry{
  164. baseEntry: task,
  165. value: timer.item.value,
  166. }
  167. tw.slots[pos].PushBack(newItem)
  168. tw.setTimerPosition(pos, newItem)
  169. }
  170. }
  171. func (tw *TimingWheel) onTick() {
  172. tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
  173. l := tw.slots[tw.tickedPos]
  174. tw.scanAndRunTasks(l)
  175. }
  176. func (tw *TimingWheel) removeTask(key interface{}) {
  177. val, ok := tw.timers.Get(key)
  178. if !ok {
  179. return
  180. }
  181. timer := val.(*positionEntry)
  182. timer.item.removed = true
  183. tw.timers.Del(key)
  184. }
  185. func (tw *TimingWheel) run() {
  186. for {
  187. select {
  188. case <-tw.ticker.Chan():
  189. tw.onTick()
  190. case task := <-tw.setChannel:
  191. tw.setTask(&task)
  192. case key := <-tw.removeChannel:
  193. tw.removeTask(key)
  194. case task := <-tw.moveChannel:
  195. tw.moveTask(task)
  196. case fn := <-tw.drainChannel:
  197. tw.drainAll(fn)
  198. case <-tw.stopChannel:
  199. tw.ticker.Stop()
  200. return
  201. }
  202. }
  203. }
  204. func (tw *TimingWheel) runTasks(tasks []timingTask) {
  205. if len(tasks) == 0 {
  206. return
  207. }
  208. go func() {
  209. for i := range tasks {
  210. threading.RunSafe(func() {
  211. tw.execute(tasks[i].key, tasks[i].value)
  212. })
  213. }
  214. }()
  215. }
  216. func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
  217. var tasks []timingTask
  218. for e := l.Front(); e != nil; {
  219. task := e.Value.(*timingEntry)
  220. if task.removed {
  221. next := e.Next()
  222. l.Remove(e)
  223. e = next
  224. continue
  225. } else if task.circle > 0 {
  226. task.circle--
  227. e = e.Next()
  228. continue
  229. } else if task.diff > 0 {
  230. next := e.Next()
  231. l.Remove(e)
  232. // (tw.tickedPos+task.diff)%tw.numSlots
  233. // cannot be the same value of tw.tickedPos
  234. pos := (tw.tickedPos + task.diff) % tw.numSlots
  235. tw.slots[pos].PushBack(task)
  236. tw.setTimerPosition(pos, task)
  237. task.diff = 0
  238. e = next
  239. continue
  240. }
  241. tasks = append(tasks, timingTask{
  242. key: task.key,
  243. value: task.value,
  244. })
  245. next := e.Next()
  246. l.Remove(e)
  247. tw.timers.Del(task.key)
  248. e = next
  249. }
  250. tw.runTasks(tasks)
  251. }
  252. func (tw *TimingWheel) setTask(task *timingEntry) {
  253. if task.delay < tw.interval {
  254. task.delay = tw.interval
  255. }
  256. if val, ok := tw.timers.Get(task.key); ok {
  257. entry := val.(*positionEntry)
  258. entry.item.value = task.value
  259. tw.moveTask(task.baseEntry)
  260. } else {
  261. pos, circle := tw.getPositionAndCircle(task.delay)
  262. task.circle = circle
  263. tw.slots[pos].PushBack(task)
  264. tw.setTimerPosition(pos, task)
  265. }
  266. }
  267. func (tw *TimingWheel) setTimerPosition(pos int, task *timingEntry) {
  268. if val, ok := tw.timers.Get(task.key); ok {
  269. timer := val.(*positionEntry)
  270. timer.item = task
  271. timer.pos = pos
  272. } else {
  273. tw.timers.Set(task.key, &positionEntry{
  274. pos: pos,
  275. item: task,
  276. })
  277. }
  278. }