cron.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. // Package cron implements a cron spec parser and runner.
  2. package cron
  3. import (
  4. "sort"
  5. "time"
  6. )
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
	// entries holds every scheduled entry. Schedule appends to it directly
	// while the scheduler is not running; once running, the run loop appends
	// to it and re-sorts it by next activation time on every iteration.
	entries []*Entry

	// stop tells the run loop to return.
	stop chan struct{}

	// add hands a new entry to the run loop while the scheduler is running.
	add chan *Entry

	// snapshot carries both the request for a copy of entries (a nil value
	// sent by Entries) and the run loop's reply.
	snapshot chan []*Entry

	// running is set to true by Start and back to false by Stop.
	running bool
}
// Job is an interface for submitted cron jobs.
type Job interface {
	// Run performs the job's work. The scheduler loop calls it in a new
	// goroutine each time the owning entry's activation time is reached.
	Run()
}
// Schedule describes a job's duty cycle.
type Schedule interface {
	// Next returns the next activation time, later than the given time.
	// Next is invoked initially, and then each time the job is run.
	// The scheduler treats a zero time.Time result as "no activation":
	// such entries are sorted last and are not run.
	Next(time.Time) time.Time
}
// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
	// Schedule is the schedule on which this job should be run.
	Schedule Schedule

	// Next is the next time the job will run. This is the zero time if Cron
	// has not been started or this entry's schedule is unsatisfiable.
	Next time.Time

	// Prev is the last time this job was run.
	// This is the zero time if the job has never been run.
	Prev time.Time

	// Job is the thing to run when the Schedule is activated.
	Job Job
}
  40. // byTime is a wrapper for sorting the entry array by time
  41. // (with zero time at the end).
  42. type byTime []*Entry
  43. func (s byTime) Len() int { return len(s) }
  44. func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  45. func (s byTime) Less(i, j int) bool {
  46. // Two zero times should return false.
  47. // Otherwise, zero is "greater" than any other time.
  48. // (To sort it at the end of the list.)
  49. if s[i].Next.IsZero() {
  50. return false
  51. }
  52. if s[j].Next.IsZero() {
  53. return true
  54. }
  55. return s[i].Next.Before(s[j].Next)
  56. }
  57. // New returns a new Cron job runner.
  58. func New() *Cron {
  59. return &Cron{
  60. entries: nil,
  61. add: make(chan *Entry),
  62. stop: make(chan struct{}),
  63. snapshot: make(chan []*Entry),
  64. running: false,
  65. }
  66. }
  67. // FuncJob is a wrapper that turns a func() into a cron.Job
  68. type FuncJob func()
  69. func (f FuncJob) Run() { f() }
  70. // AddFunc adds a func to the Cron to be run on the given schedule.
  71. func (c *Cron) AddFunc(spec string, cmd func()) error {
  72. return c.AddJob(spec, FuncJob(cmd))
  73. }
  74. // AddJob adds a Job to the Cron to be run on the given schedule.
  75. func (c *Cron) AddJob(spec string, cmd Job) error {
  76. schedule, err := Parse(spec)
  77. if err != nil {
  78. return err
  79. }
  80. c.Schedule(schedule, cmd)
  81. return nil
  82. }
  83. // Schedule adds a Job to the Cron to be run on the given schedule.
  84. func (c *Cron) Schedule(schedule Schedule, cmd Job) {
  85. entry := &Entry{
  86. Schedule: schedule,
  87. Job: cmd,
  88. }
  89. if !c.running {
  90. c.entries = append(c.entries, entry)
  91. return
  92. }
  93. c.add <- entry
  94. }
  95. // Entries returns a snapshot of the cron entries.
  96. func (c *Cron) Entries() []*Entry {
  97. if c.running {
  98. c.snapshot <- nil
  99. x := <-c.snapshot
  100. return x
  101. }
  102. return c.entrySnapshot()
  103. }
  104. // Start the cron scheduler in its own go-routine.
  105. func (c *Cron) Start() {
  106. c.running = true
  107. go c.run()
  108. }
  109. // run the scheduler.. this is private just due to the need to synchronize
  110. // access to the 'running' state variable.
  111. func (c *Cron) run() {
  112. // Figure out the next activation times for each entry.
  113. now := time.Now().Local()
  114. for _, entry := range c.entries {
  115. entry.Next = entry.Schedule.Next(now)
  116. }
  117. for {
  118. // Determine the next entry to run.
  119. sort.Sort(byTime(c.entries))
  120. var effective time.Time
  121. if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
  122. // If there are no entries yet, just sleep - it still handles new entries
  123. // and stop requests.
  124. effective = now.AddDate(10, 0, 0)
  125. } else {
  126. effective = c.entries[0].Next
  127. }
  128. select {
  129. case now = <-time.After(effective.Sub(now)):
  130. // Run every entry whose next time was this effective time.
  131. for _, e := range c.entries {
  132. if e.Next != effective {
  133. break
  134. }
  135. go e.Job.Run()
  136. e.Prev = e.Next
  137. e.Next = e.Schedule.Next(effective)
  138. }
  139. continue
  140. case newEntry := <-c.add:
  141. c.entries = append(c.entries, newEntry)
  142. newEntry.Next = newEntry.Schedule.Next(now)
  143. case <-c.snapshot:
  144. c.snapshot <- c.entrySnapshot()
  145. case <-c.stop:
  146. return
  147. }
  148. // 'now' should be updated after newEntry and snapshot cases.
  149. now = time.Now().Local()
  150. }
  151. }
  152. // Stop the cron scheduler.
  153. func (c *Cron) Stop() {
  154. c.stop <- struct{}{}
  155. c.running = false
  156. }
  157. // entrySnapshot returns a copy of the current cron entry list.
  158. func (c *Cron) entrySnapshot() []*Entry {
  159. entries := []*Entry{}
  160. for _, e := range c.entries {
  161. entries = append(entries, &Entry{
  162. Schedule: e.Schedule,
  163. Next: e.Next,
  164. Prev: e.Prev,
  165. Job: e.Job,
  166. })
  167. }
  168. return entries
  169. }