chain.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package cron
  2. import (
  3. "log"
  4. "os"
  5. "runtime"
  6. "sync"
  7. "time"
  8. )
// JobWrapper decorates the given Job with some behavior, returning the
// wrapped Job. Wrappers compose: each one receives the Job produced by the
// next wrapper in the chain (see Chain.Then).
type JobWrapper func(Job) Job
// Chain is a sequence of JobWrappers that decorates submitted jobs with
// cross-cutting behaviors like logging or synchronization.
type Chain struct {
	// wrappers are applied outermost-first: wrappers[0] wraps the result
	// of wrappers[1], and so on (see Then).
	wrappers []JobWrapper
}
  16. // NewChain returns a Chain consisting of the given JobWrappers.
  17. func NewChain(c ...JobWrapper) Chain {
  18. return Chain{c}
  19. }
  20. // Then decorates the given job with all JobWrappers in the chain.
  21. //
  22. // This:
  23. // NewChain(m1, m2, m3).Then(job)
  24. // is equivalent to:
  25. // m1(m2(m3(job)))
  26. func (c Chain) Then(j Job) Job {
  27. for i := range c.wrappers {
  28. j = c.wrappers[len(c.wrappers)-i-1](j)
  29. }
  30. return j
  31. }
  32. // RecoverWithLogger recovers panics in wrapped jobs and logs them.
  33. func RecoverWithLogger(logger *log.Logger) JobWrapper {
  34. return func(j Job) Job {
  35. return FuncJob(func() {
  36. defer func() {
  37. if r := recover(); r != nil {
  38. const size = 64 << 10
  39. buf := make([]byte, size)
  40. buf = buf[:runtime.Stack(buf, false)]
  41. logger.Printf("cron: panic running job: %v\n%s", r, buf)
  42. }
  43. }()
  44. j.Run()
  45. })
  46. }
  47. }
  48. // Recover panics in wrapped jobs and logs them to os.Stderr using
  49. // the standard logger / flags.
  50. func Recover() JobWrapper {
  51. return RecoverWithLogger(
  52. log.New(os.Stderr, "", log.LstdFlags),
  53. )
  54. }
  55. // DelayIfStillRunning serializes jobs, delaying subsequent runs until the
  56. // previous one is complete. If more than 10 runs of a job are queued up, it
  57. // begins skipping jobs instead, to avoid unbounded queue growth.
  58. func DelayIfStillRunning() JobWrapper {
  59. // This is implemented by assigning each invocation a unique id and
  60. // inserting that into a queue. On each completion, a condition variable is
  61. // signalled to cause all waiting invocations to wake up and see if they are
  62. // next in line.
  63. // TODO: Could do this much more simply if we didn't care about keeping them in order..
  64. const queueSize = 10
  65. return func(j Job) Job {
  66. var jobQueue []int64
  67. var cond = sync.NewCond(&sync.Mutex{})
  68. return FuncJob(func() {
  69. id := time.Now().UnixNano()
  70. cond.L.Lock()
  71. if len(jobQueue) >= queueSize {
  72. // log skip
  73. cond.L.Unlock()
  74. return
  75. }
  76. jobQueue = append(jobQueue, id)
  77. for jobQueue[0] != id {
  78. cond.Wait()
  79. }
  80. cond.L.Unlock()
  81. defer func() {
  82. cond.L.Lock()
  83. jobQueue = jobQueue[1:]
  84. cond.L.Unlock()
  85. cond.Broadcast()
  86. }()
  87. j.Run()
  88. })
  89. }
  90. }
  91. // SkipIfStillRunning skips an invocation of the Job if a previous invocation is
  92. // still running.
  93. func SkipIfStillRunning() JobWrapper {
  94. var ch = make(chan struct{}, 1)
  95. ch <- struct{}{}
  96. return func(j Job) Job {
  97. return FuncJob(func() {
  98. select {
  99. case v := <-ch:
  100. j.Run()
  101. ch <- v
  102. default:
  103. // skip
  104. }
  105. })
  106. }
  107. }