cleaner.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package internal
  2. import (
  3. "fmt"
  4. "time"
  5. "zero/core/collection"
  6. "zero/core/lang"
  7. "zero/core/logx"
  8. "zero/core/proc"
  9. "zero/core/stat"
  10. "zero/core/stringx"
  11. "zero/core/threading"
  12. )
  13. const (
  14. timingWheelSlots = 300
  15. cleanWorkers = 5
  16. taskKeyLen = 8
  17. )
  18. var (
  19. timingWheel *collection.TimingWheel
  20. taskRunner = threading.NewTaskRunner(cleanWorkers)
  21. )
  22. type delayTask struct {
  23. delay time.Duration
  24. task func() error
  25. keys []string
  26. }
  27. func init() {
  28. var err error
  29. timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
  30. lang.Must(err)
  31. proc.AddShutdownListener(func() {
  32. timingWheel.Drain(clean)
  33. })
  34. }
  35. func AddCleanTask(task func() error, keys ...string) {
  36. timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{
  37. delay: time.Second,
  38. task: task,
  39. keys: keys,
  40. }, time.Second)
  41. }
  42. func clean(key, value interface{}) {
  43. taskRunner.Schedule(func() {
  44. dt := value.(delayTask)
  45. err := dt.task()
  46. if err == nil {
  47. return
  48. }
  49. next, ok := nextDelay(dt.delay)
  50. if ok {
  51. dt.delay = next
  52. timingWheel.SetTimer(key, dt, next)
  53. } else {
  54. msg := fmt.Sprintf("retried but failed to clear cache with keys: %q, error: %v",
  55. formatKeys(dt.keys), err)
  56. logx.Error(msg)
  57. stat.Report(msg)
  58. }
  59. })
  60. }
  61. func nextDelay(delay time.Duration) (time.Duration, bool) {
  62. switch delay {
  63. case time.Second:
  64. return time.Second * 5, true
  65. case time.Second * 5:
  66. return time.Minute, true
  67. case time.Minute:
  68. return time.Minute * 5, true
  69. case time.Minute * 5:
  70. return time.Hour, true
  71. default:
  72. return 0, false
  73. }
  74. }