chain_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package cron
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "reflect"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. func appendingJob(slice *[]int, value int) Job {
  11. var m sync.Mutex
  12. return FuncJob(func() {
  13. m.Lock()
  14. *slice = append(*slice, value)
  15. m.Unlock()
  16. })
  17. }
  18. func appendingWrapper(slice *[]int, value int) JobWrapper {
  19. return func(j Job) Job {
  20. return FuncJob(func() {
  21. appendingJob(slice, value).Run()
  22. j.Run()
  23. })
  24. }
  25. }
  26. func TestChain(t *testing.T) {
  27. var nums []int
  28. var (
  29. append1 = appendingWrapper(&nums, 1)
  30. append2 = appendingWrapper(&nums, 2)
  31. append3 = appendingWrapper(&nums, 3)
  32. append4 = appendingJob(&nums, 4)
  33. )
  34. NewChain(append1, append2, append3).Then(append4).Run()
  35. if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
  36. t.Error("unexpected order of calls:", nums)
  37. }
  38. }
  39. func TestChainRecover(t *testing.T) {
  40. panickingJob := FuncJob(func() {
  41. panic("panickingJob panics")
  42. })
  43. t.Run("panic exits job by default", func(t *testing.T) {
  44. defer func() {
  45. if err := recover(); err == nil {
  46. t.Errorf("panic expected, but none received")
  47. }
  48. }()
  49. NewChain().Then(panickingJob).
  50. Run()
  51. })
  52. t.Run("Recovering JobWrapper recovers", func(t *testing.T) {
  53. NewChain(RecoverWithLogger(log.New(ioutil.Discard, "", 0))).
  54. Then(panickingJob).
  55. Run()
  56. })
  57. t.Run("composed with the *IfStillRunning wrappers", func(t *testing.T) {
  58. NewChain(RecoverWithLogger(log.New(ioutil.Discard, "", 0))).
  59. Then(panickingJob).
  60. Run()
  61. })
  62. }
  63. type countJob struct {
  64. m sync.Mutex
  65. started int
  66. done int
  67. delay time.Duration
  68. }
  69. func (j *countJob) Run() {
  70. j.m.Lock()
  71. j.started++
  72. j.m.Unlock()
  73. time.Sleep(j.delay)
  74. j.m.Lock()
  75. j.done++
  76. j.m.Unlock()
  77. }
  78. func (j *countJob) Started() int {
  79. defer j.m.Unlock()
  80. j.m.Lock()
  81. return j.started
  82. }
  83. func (j *countJob) Done() int {
  84. defer j.m.Unlock()
  85. j.m.Lock()
  86. return j.done
  87. }
  88. func TestChainDelayIfStillRunning(t *testing.T) {
  89. t.Run("runs immediately", func(t *testing.T) {
  90. var j countJob
  91. wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
  92. go wrappedJob.Run()
  93. time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
  94. if c := j.Done(); c != 1 {
  95. t.Errorf("expected job run once, immediately, got %d", c)
  96. }
  97. })
  98. t.Run("second run immediate if first done", func(t *testing.T) {
  99. var j countJob
  100. wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
  101. go func() {
  102. go wrappedJob.Run()
  103. time.Sleep(time.Millisecond)
  104. go wrappedJob.Run()
  105. }()
  106. time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
  107. if c := j.Done(); c != 2 {
  108. t.Errorf("expected job run twice, immediately, got %d", c)
  109. }
  110. })
  111. t.Run("second run delayed if first not done", func(t *testing.T) {
  112. var j countJob
  113. j.delay = 10 * time.Millisecond
  114. wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
  115. go func() {
  116. go wrappedJob.Run()
  117. time.Sleep(time.Millisecond)
  118. go wrappedJob.Run()
  119. }()
  120. // After 5ms, the first job is still in progress, and the second job was
  121. // run but should be waiting for it to finish.
  122. time.Sleep(5 * time.Millisecond)
  123. started, done := j.Started(), j.Done()
  124. if started != 1 || done != 0 {
  125. t.Error("expected first job started, but not finished, got", started, done)
  126. }
  127. // Verify that the second job completes.
  128. time.Sleep(25 * time.Millisecond)
  129. started, done = j.Started(), j.Done()
  130. if started != 2 || done != 2 {
  131. t.Error("expected both jobs done, got", started, done)
  132. }
  133. })
  134. t.Run("11th run skipped on long queue", func(t *testing.T) {
  135. var j countJob
  136. j.delay = 10 * time.Millisecond
  137. wrappedJob := NewChain(DelayIfStillRunning()).Then(&j)
  138. for i := 0; i < 11; i++ {
  139. go wrappedJob.Run()
  140. }
  141. time.Sleep(200 * time.Millisecond)
  142. done := j.Done()
  143. if done != 10 {
  144. t.Error("expected 10 jobs executed, 1 job dropped, got", done)
  145. }
  146. })
  147. }
  148. func TestChainSkipIfStillRunning(t *testing.T) {
  149. t.Run("runs immediately", func(t *testing.T) {
  150. var j countJob
  151. wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
  152. go wrappedJob.Run()
  153. time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
  154. if c := j.Done(); c != 1 {
  155. t.Errorf("expected job run once, immediately, got %d", c)
  156. }
  157. })
  158. t.Run("second run immediate if first done", func(t *testing.T) {
  159. var j countJob
  160. wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
  161. go func() {
  162. go wrappedJob.Run()
  163. time.Sleep(time.Millisecond)
  164. go wrappedJob.Run()
  165. }()
  166. time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
  167. if c := j.Done(); c != 2 {
  168. t.Errorf("expected job run twice, immediately, got %d", c)
  169. }
  170. })
  171. t.Run("second run skipped if first not done", func(t *testing.T) {
  172. var j countJob
  173. j.delay = 10 * time.Millisecond
  174. wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
  175. go func() {
  176. go wrappedJob.Run()
  177. time.Sleep(time.Millisecond)
  178. go wrappedJob.Run()
  179. }()
  180. // After 5ms, the first job is still in progress, and the second job was
  181. // aleady skipped.
  182. time.Sleep(5 * time.Millisecond)
  183. started, done := j.Started(), j.Done()
  184. if started != 1 || done != 0 {
  185. t.Error("expected first job started, but not finished, got", started, done)
  186. }
  187. // Verify that the first job completes and second does not run.
  188. time.Sleep(25 * time.Millisecond)
  189. started, done = j.Started(), j.Done()
  190. if started != 1 || done != 1 {
  191. t.Error("expected second job skipped, got", started, done)
  192. }
  193. })
  194. t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
  195. var j countJob
  196. j.delay = 10 * time.Millisecond
  197. wrappedJob := NewChain(SkipIfStillRunning()).Then(&j)
  198. for i := 0; i < 11; i++ {
  199. go wrappedJob.Run()
  200. }
  201. time.Sleep(200 * time.Millisecond)
  202. done := j.Done()
  203. if done != 1 {
  204. t.Error("expected 1 jobs executed, 10 jobs dropped, got", done)
  205. }
  206. })
  207. }