timingwheel.go 6.3 KB


  1. package collection
  2. import (
  3. "container/list"
  4. "fmt"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/lang"
  7. "github.com/tal-tech/go-zero/core/threading"
  8. "github.com/tal-tech/go-zero/core/timex"
  9. )
  10. const drainWorkers = 8
  11. type (
  12. Execute func(key, value interface{})
  13. TimingWheel struct {
  14. interval time.Duration
  15. ticker timex.Ticker
  16. slots []*list.List
  17. timers *SafeMap
  18. tickedPos int
  19. numSlots int
  20. execute Execute
  21. setChannel chan timingEntry
  22. moveChannel chan baseEntry
  23. removeChannel chan interface{}
  24. drainChannel chan func(key, value interface{})
  25. stopChannel chan lang.PlaceholderType
  26. }
  27. timingEntry struct {
  28. baseEntry
  29. value interface{}
  30. circle int
  31. diff int
  32. removed bool
  33. }
  34. baseEntry struct {
  35. delay time.Duration
  36. key interface{}
  37. }
  38. positionEntry struct {
  39. pos int
  40. item *timingEntry
  41. }
  42. timingTask struct {
  43. key interface{}
  44. value interface{}
  45. }
  46. )
  47. func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
  48. if interval <= 0 || numSlots <= 0 || execute == nil {
  49. return nil, fmt.Errorf("interval: %v, slots: %d, execute: %p", interval, numSlots, execute)
  50. }
  51. return newTimingWheelWithClock(interval, numSlots, execute, timex.NewTicker(interval))
  52. }
  53. func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
  54. *TimingWheel, error) {
  55. tw := &TimingWheel{
  56. interval: interval,
  57. ticker: ticker,
  58. slots: make([]*list.List, numSlots),
  59. timers: NewSafeMap(),
  60. tickedPos: numSlots - 1, // at previous virtual circle
  61. execute: execute,
  62. numSlots: numSlots,
  63. setChannel: make(chan timingEntry),
  64. moveChannel: make(chan baseEntry),
  65. removeChannel: make(chan interface{}),
  66. drainChannel: make(chan func(key, value interface{})),
  67. stopChannel: make(chan lang.PlaceholderType),
  68. }
  69. tw.initSlots()
  70. go tw.run()
  71. return tw, nil
  72. }
  73. func (tw *TimingWheel) Drain(fn func(key, value interface{})) {
  74. tw.drainChannel <- fn
  75. }
  76. func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) {
  77. if delay <= 0 || key == nil {
  78. return
  79. }
  80. tw.moveChannel <- baseEntry{
  81. delay: delay,
  82. key: key,
  83. }
  84. }
  85. func (tw *TimingWheel) RemoveTimer(key interface{}) {
  86. if key == nil {
  87. return
  88. }
  89. tw.removeChannel <- key
  90. }
  91. func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) {
  92. if delay <= 0 || key == nil {
  93. return
  94. }
  95. tw.setChannel <- timingEntry{
  96. baseEntry: baseEntry{
  97. delay: delay,
  98. key: key,
  99. },
  100. value: value,
  101. }
  102. }
  103. func (tw *TimingWheel) Stop() {
  104. close(tw.stopChannel)
  105. }
  106. func (tw *TimingWheel) drainAll(fn func(key, value interface{})) {
  107. runner := threading.NewTaskRunner(drainWorkers)
  108. for _, slot := range tw.slots {
  109. for e := slot.Front(); e != nil; {
  110. task := e.Value.(*timingEntry)
  111. next := e.Next()
  112. slot.Remove(e)
  113. e = next
  114. if !task.removed {
  115. runner.Schedule(func() {
  116. fn(task.key, task.value)
  117. })
  118. }
  119. }
  120. }
  121. }
  122. func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
  123. steps := int(d / tw.interval)
  124. pos = (tw.tickedPos + steps) % tw.numSlots
  125. circle = (steps - 1) / tw.numSlots
  126. return
  127. }
  128. func (tw *TimingWheel) initSlots() {
  129. for i := 0; i < tw.numSlots; i++ {
  130. tw.slots[i] = list.New()
  131. }
  132. }
  133. func (tw *TimingWheel) moveTask(task baseEntry) {
  134. val, ok := tw.timers.Get(task.key)
  135. if !ok {
  136. return
  137. }
  138. timer := val.(*positionEntry)
  139. if task.delay < tw.interval {
  140. threading.GoSafe(func() {
  141. tw.execute(timer.item.key, timer.item.value)
  142. })
  143. return
  144. }
  145. pos, circle := tw.getPositionAndCircle(task.delay)
  146. if pos >= timer.pos {
  147. timer.item.circle = circle
  148. timer.item.diff = pos - timer.pos
  149. } else if circle > 0 {
  150. circle--
  151. timer.item.circle = circle
  152. timer.item.diff = tw.numSlots + pos - timer.pos
  153. } else {
  154. timer.item.removed = true
  155. newItem := &timingEntry{
  156. baseEntry: task,
  157. value: timer.item.value,
  158. }
  159. tw.slots[pos].PushBack(newItem)
  160. tw.setTimerPosition(pos, newItem)
  161. }
  162. }
  163. func (tw *TimingWheel) onTick() {
  164. tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
  165. l := tw.slots[tw.tickedPos]
  166. tw.scanAndRunTasks(l)
  167. }
  168. func (tw *TimingWheel) removeTask(key interface{}) {
  169. val, ok := tw.timers.Get(key)
  170. if !ok {
  171. return
  172. }
  173. timer := val.(*positionEntry)
  174. timer.item.removed = true
  175. tw.timers.Del(key)
  176. }
  177. func (tw *TimingWheel) run() {
  178. for {
  179. select {
  180. case <-tw.ticker.Chan():
  181. tw.onTick()
  182. case task := <-tw.setChannel:
  183. tw.setTask(&task)
  184. case key := <-tw.removeChannel:
  185. tw.removeTask(key)
  186. case task := <-tw.moveChannel:
  187. tw.moveTask(task)
  188. case fn := <-tw.drainChannel:
  189. tw.drainAll(fn)
  190. case <-tw.stopChannel:
  191. tw.ticker.Stop()
  192. return
  193. }
  194. }
  195. }
  196. func (tw *TimingWheel) runTasks(tasks []timingTask) {
  197. if len(tasks) == 0 {
  198. return
  199. }
  200. go func() {
  201. for i := range tasks {
  202. threading.RunSafe(func() {
  203. tw.execute(tasks[i].key, tasks[i].value)
  204. })
  205. }
  206. }()
  207. }
  208. func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
  209. var tasks []timingTask
  210. for e := l.Front(); e != nil; {
  211. task := e.Value.(*timingEntry)
  212. if task.removed {
  213. next := e.Next()
  214. l.Remove(e)
  215. e = next
  216. continue
  217. } else if task.circle > 0 {
  218. task.circle--
  219. e = e.Next()
  220. continue
  221. } else if task.diff > 0 {
  222. next := e.Next()
  223. l.Remove(e)
  224. // (tw.tickedPos+task.diff)%tw.numSlots
  225. // cannot be the same value of tw.tickedPos
  226. pos := (tw.tickedPos + task.diff) % tw.numSlots
  227. tw.slots[pos].PushBack(task)
  228. tw.setTimerPosition(pos, task)
  229. task.diff = 0
  230. e = next
  231. continue
  232. }
  233. tasks = append(tasks, timingTask{
  234. key: task.key,
  235. value: task.value,
  236. })
  237. next := e.Next()
  238. l.Remove(e)
  239. tw.timers.Del(task.key)
  240. e = next
  241. }
  242. tw.runTasks(tasks)
  243. }
  244. func (tw *TimingWheel) setTask(task *timingEntry) {
  245. if task.delay < tw.interval {
  246. task.delay = tw.interval
  247. }
  248. if val, ok := tw.timers.Get(task.key); ok {
  249. entry := val.(*positionEntry)
  250. entry.item.value = task.value
  251. tw.moveTask(task.baseEntry)
  252. } else {
  253. pos, circle := tw.getPositionAndCircle(task.delay)
  254. task.circle = circle
  255. tw.slots[pos].PushBack(task)
  256. tw.setTimerPosition(pos, task)
  257. }
  258. }
  259. func (tw *TimingWheel) setTimerPosition(pos int, task *timingEntry) {
  260. if val, ok := tw.timers.Get(task.key); ok {
  261. timer := val.(*positionEntry)
  262. timer.item = task
  263. timer.pos = pos
  264. } else {
  265. tw.timers.Set(task.key, &positionEntry{
  266. pos: pos,
  267. item: task,
  268. })
  269. }
  270. }