cron.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. package cron
  2. import (
  3. "context"
  4. "sort"
  5. "sync"
  6. "time"
  7. )
  8. // Cron keeps track of any number of entries, invoking the associated func as
  9. // specified by the schedule. It may be started, stopped, and the entries may
  10. // be inspected while running.
  11. type Cron struct {
  12. entries []*Entry
  13. chain Chain
  14. stop chan struct{}
  15. add chan *Entry
  16. remove chan EntryID
  17. snapshot chan chan []Entry
  18. running bool
  19. logger Logger
  20. runningMu sync.Mutex
  21. location *time.Location
  22. parser Parser
  23. nextID EntryID
  24. jobWaiter sync.WaitGroup
  25. }
  26. // Job is an interface for submitted cron jobs.
  27. type Job interface {
  28. Run()
  29. }
  30. // Schedule describes a job's duty cycle.
  31. type Schedule interface {
  32. // Next returns the next activation time, later than the given time.
  33. // Next is invoked initially, and then each time the job is run.
  34. Next(time.Time) time.Time
  35. }
  36. // EntryID identifies an entry within a Cron instance
  37. type EntryID int
  38. // Entry consists of a schedule and the func to execute on that schedule.
  39. type Entry struct {
  40. // ID is the cron-assigned ID of this entry, which may be used to look up a
  41. // snapshot or remove it.
  42. ID EntryID
  43. // Schedule on which this job should be run.
  44. Schedule Schedule
  45. // Next time the job will run, or the zero time if Cron has not been
  46. // started or this entry's schedule is unsatisfiable
  47. Next time.Time
  48. // Prev is the last time this job was run, or the zero time if never.
  49. Prev time.Time
  50. // WrappedJob is the thing to run when the Schedule is activated.
  51. WrappedJob Job
  52. // Job is the thing that was submitted to cron.
  53. // It is kept around so that user code that needs to get at the job later,
  54. // e.g. via Entries() can do so.
  55. Job Job
  56. }
  57. // Valid returns true if this is not the zero entry.
  58. func (e Entry) Valid() bool { return e.ID != 0 }
  59. // byTime is a wrapper for sorting the entry array by time
  60. // (with zero time at the end).
  61. type byTime []*Entry
  62. func (s byTime) Len() int { return len(s) }
  63. func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  64. func (s byTime) Less(i, j int) bool {
  65. // Two zero times should return false.
  66. // Otherwise, zero is "greater" than any other time.
  67. // (To sort it at the end of the list.)
  68. if s[i].Next.IsZero() {
  69. return false
  70. }
  71. if s[j].Next.IsZero() {
  72. return true
  73. }
  74. return s[i].Next.Before(s[j].Next)
  75. }
  76. // New returns a new Cron job runner, modified by the given options.
  77. //
  78. // Available Settings
  79. //
  80. // Time Zone
  81. // Description: The time zone in which schedules are interpreted
  82. // Default: time.Local
  83. //
  84. // Parser
  85. // Description: Parser converts cron spec strings into cron.Schedules.
  86. // Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
  87. //
  88. // Chain
  89. // Description: Wrap submitted jobs to customize behavior.
  90. // Default: A chain that recovers panics and logs them to stderr.
  91. //
  92. // See "cron.With*" to modify the default behavior.
  93. func New(opts ...Option) *Cron {
  94. c := &Cron{
  95. entries: nil,
  96. chain: NewChain(),
  97. add: make(chan *Entry),
  98. stop: make(chan struct{}),
  99. snapshot: make(chan chan []Entry),
  100. remove: make(chan EntryID),
  101. running: false,
  102. runningMu: sync.Mutex{},
  103. logger: DefaultLogger,
  104. location: time.Local,
  105. parser: standardParser,
  106. }
  107. for _, opt := range opts {
  108. opt(c)
  109. }
  110. return c
  111. }
  112. // FuncJob is a wrapper that turns a func() into a cron.Job
  113. type FuncJob func()
  114. func (f FuncJob) Run() { f() }
  115. // AddFunc adds a func to the Cron to be run on the given schedule.
  116. // The spec is parsed using the time zone of this Cron instance as the default.
  117. // An opaque ID is returned that can be used to later remove it.
  118. func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
  119. return c.AddJob(spec, FuncJob(cmd))
  120. }
  121. // AddJob adds a Job to the Cron to be run on the given schedule.
  122. // The spec is parsed using the time zone of this Cron instance as the default.
  123. // An opaque ID is returned that can be used to later remove it.
  124. func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
  125. schedule, err := c.parser.Parse(spec)
  126. if err != nil {
  127. return 0, err
  128. }
  129. return c.Schedule(schedule, cmd), nil
  130. }
  131. // Schedule adds a Job to the Cron to be run on the given schedule.
  132. // The job is wrapped with the configured Chain.
  133. func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
  134. c.runningMu.Lock()
  135. defer c.runningMu.Unlock()
  136. c.nextID++
  137. entry := &Entry{
  138. ID: c.nextID,
  139. Schedule: schedule,
  140. WrappedJob: c.chain.Then(cmd),
  141. Job: cmd,
  142. }
  143. if !c.running {
  144. c.entries = append(c.entries, entry)
  145. } else {
  146. c.add <- entry
  147. }
  148. return entry.ID
  149. }
  150. // Entries returns a snapshot of the cron entries.
  151. func (c *Cron) Entries() []Entry {
  152. c.runningMu.Lock()
  153. defer c.runningMu.Unlock()
  154. if c.running {
  155. replyChan := make(chan []Entry, 1)
  156. c.snapshot <- replyChan
  157. return <-replyChan
  158. }
  159. return c.entrySnapshot()
  160. }
  161. // Location gets the time zone location
  162. func (c *Cron) Location() *time.Location {
  163. return c.location
  164. }
  165. // Entry returns a snapshot of the given entry, or nil if it couldn't be found.
  166. func (c *Cron) Entry(id EntryID) Entry {
  167. for _, entry := range c.Entries() {
  168. if id == entry.ID {
  169. return entry
  170. }
  171. }
  172. return Entry{}
  173. }
  174. // Remove an entry from being run in the future.
  175. func (c *Cron) Remove(id EntryID) {
  176. c.runningMu.Lock()
  177. defer c.runningMu.Unlock()
  178. if c.running {
  179. c.remove <- id
  180. } else {
  181. c.removeEntry(id)
  182. }
  183. }
  184. // Start the cron scheduler in its own goroutine, or no-op if already started.
  185. func (c *Cron) Start() {
  186. c.runningMu.Lock()
  187. defer c.runningMu.Unlock()
  188. if c.running {
  189. return
  190. }
  191. c.running = true
  192. go c.run()
  193. }
  194. // Run the cron scheduler, or no-op if already running.
  195. func (c *Cron) Run() {
  196. c.runningMu.Lock()
  197. if c.running {
  198. c.runningMu.Unlock()
  199. return
  200. }
  201. c.running = true
  202. c.runningMu.Unlock()
  203. c.run()
  204. }
  205. // run the scheduler.. this is private just due to the need to synchronize
  206. // access to the 'running' state variable.
  207. func (c *Cron) run() {
  208. c.logger.Info("start")
  209. // Figure out the next activation times for each entry.
  210. now := c.now()
  211. for _, entry := range c.entries {
  212. entry.Next = entry.Schedule.Next(now)
  213. c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
  214. }
  215. for {
  216. // Determine the next entry to run.
  217. sort.Sort(byTime(c.entries))
  218. var timer *time.Timer
  219. if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
  220. // If there are no entries yet, just sleep - it still handles new entries
  221. // and stop requests.
  222. timer = time.NewTimer(100000 * time.Hour)
  223. } else {
  224. timer = time.NewTimer(c.entries[0].Next.Sub(now))
  225. }
  226. for {
  227. select {
  228. case now = <-timer.C:
  229. now = now.In(c.location)
  230. c.logger.Info("wake", "now", now)
  231. // Run every entry whose next time was less than now
  232. for _, e := range c.entries {
  233. if e.Next.After(now) || e.Next.IsZero() {
  234. break
  235. }
  236. c.startJob(e.WrappedJob)
  237. e.Prev = e.Next
  238. e.Next = e.Schedule.Next(now)
  239. c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
  240. }
  241. case newEntry := <-c.add:
  242. timer.Stop()
  243. now = c.now()
  244. newEntry.Next = newEntry.Schedule.Next(now)
  245. c.entries = append(c.entries, newEntry)
  246. c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
  247. case replyChan := <-c.snapshot:
  248. replyChan <- c.entrySnapshot()
  249. continue
  250. case <-c.stop:
  251. timer.Stop()
  252. c.logger.Info("stop")
  253. return
  254. case id := <-c.remove:
  255. timer.Stop()
  256. now = c.now()
  257. c.removeEntry(id)
  258. c.logger.Info("removed", "entry", id)
  259. }
  260. break
  261. }
  262. }
  263. }
  264. // startJob runs the given job in a new goroutine.
  265. func (c *Cron) startJob(j Job) {
  266. c.jobWaiter.Add(1)
  267. go func() {
  268. defer c.jobWaiter.Done()
  269. j.Run()
  270. }()
  271. }
  272. // now returns current time in c location
  273. func (c *Cron) now() time.Time {
  274. return time.Now().In(c.location)
  275. }
  276. // Stop stops the cron scheduler if it is running; otherwise it does nothing.
  277. // A context is returned so the caller can wait for running jobs to complete.
  278. func (c *Cron) Stop() context.Context {
  279. c.runningMu.Lock()
  280. defer c.runningMu.Unlock()
  281. if c.running {
  282. c.stop <- struct{}{}
  283. c.running = false
  284. }
  285. ctx, cancel := context.WithCancel(context.Background())
  286. go func() {
  287. c.jobWaiter.Wait()
  288. cancel()
  289. }()
  290. return ctx
  291. }
  292. // entrySnapshot returns a copy of the current cron entry list.
  293. func (c *Cron) entrySnapshot() []Entry {
  294. var entries = make([]Entry, len(c.entries))
  295. for i, e := range c.entries {
  296. entries[i] = *e
  297. }
  298. return entries
  299. }
  300. func (c *Cron) removeEntry(id EntryID) {
  301. var entries []*Entry
  302. for _, e := range c.entries {
  303. if e.ID != id {
  304. entries = append(entries, e)
  305. }
  306. }
  307. c.entries = entries
  308. }