chain_test.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package cron
  2. import (
  3. "io/ioutil"
  4. "log"
  5. "reflect"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. func appendingJob(slice *[]int, value int) Job {
  11. var m sync.Mutex
  12. return FuncJob(func() {
  13. m.Lock()
  14. *slice = append(*slice, value)
  15. m.Unlock()
  16. })
  17. }
  18. func appendingWrapper(slice *[]int, value int) JobWrapper {
  19. return func(j Job) Job {
  20. return FuncJob(func() {
  21. appendingJob(slice, value).Run()
  22. j.Run()
  23. })
  24. }
  25. }
  26. func TestChain(t *testing.T) {
  27. var nums []int
  28. var (
  29. append1 = appendingWrapper(&nums, 1)
  30. append2 = appendingWrapper(&nums, 2)
  31. append3 = appendingWrapper(&nums, 3)
  32. append4 = appendingJob(&nums, 4)
  33. )
  34. NewChain(append1, append2, append3).Then(append4).Run()
  35. if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
  36. t.Error("unexpected order of calls:", nums)
  37. }
  38. }
  39. func TestChainRecover(t *testing.T) {
  40. panickingJob := FuncJob(func() {
  41. panic("panickingJob panics")
  42. })
  43. t.Run("panic exits job by default", func(t *testing.T) {
  44. defer func() {
  45. if err := recover(); err == nil {
  46. t.Errorf("panic expected, but none received")
  47. }
  48. }()
  49. NewChain().Then(panickingJob).
  50. Run()
  51. })
  52. t.Run("Recovering JobWrapper recovers", func(t *testing.T) {
  53. NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
  54. Then(panickingJob).
  55. Run()
  56. })
  57. t.Run("composed with the *IfStillRunning wrappers", func(t *testing.T) {
  58. NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
  59. Then(panickingJob).
  60. Run()
  61. })
  62. }
  63. type countJob struct {
  64. m sync.Mutex
  65. started int
  66. done int
  67. delay time.Duration
  68. }
  69. func (j *countJob) Run() {
  70. j.m.Lock()
  71. j.started++
  72. j.m.Unlock()
  73. time.Sleep(j.delay)
  74. j.m.Lock()
  75. j.done++
  76. j.m.Unlock()
  77. }
  78. func (j *countJob) Started() int {
  79. defer j.m.Unlock()
  80. j.m.Lock()
  81. return j.started
  82. }
  83. func (j *countJob) Done() int {
  84. defer j.m.Unlock()
  85. j.m.Lock()
  86. return j.done
  87. }
  88. func TestChainDelayIfStillRunning(t *testing.T) {
  89. t.Run("runs immediately", func(t *testing.T) {
  90. var j countJob
  91. wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
  92. go wrappedJob.Run()
  93. time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
  94. if c := j.Done(); c != 1 {
  95. t.Errorf("expected job run once, immediately, got %d", c)
  96. }
  97. })
  98. t.Run("second run immediate if first done", func(t *testing.T) {
  99. var j countJob
  100. wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
  101. go func() {
  102. go wrappedJob.Run()
  103. time.Sleep(time.Millisecond)
  104. go wrappedJob.Run()
  105. }()
  106. time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
  107. if c := j.Done(); c != 2 {
  108. t.Errorf("expected job run twice, immediately, got %d", c)
  109. }
  110. })
  111. t.Run("second run delayed if first not done", func(t *testing.T) {
  112. var j countJob
  113. j.delay = 10 * time.Millisecond
  114. wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
  115. go func() {
  116. go wrappedJob.Run()
  117. time.Sleep(time.Millisecond)
  118. go wrappedJob.Run()
  119. }()
  120. // After 5ms, the first job is still in progress, and the second job was
  121. // run but should be waiting for it to finish.
  122. time.Sleep(5 * time.Millisecond)
  123. started, done := j.Started(), j.Done()
  124. if started != 1 || done != 0 {
  125. t.Error("expected first job started, but not finished, got", started, done)
  126. }
  127. // Verify that the second job completes.
  128. time.Sleep(25 * time.Millisecond)
  129. started, done = j.Started(), j.Done()
  130. if started != 2 || done != 2 {
  131. t.Error("expected both jobs done, got", started, done)
  132. }
  133. })
  134. }
  135. func TestChainSkipIfStillRunning(t *testing.T) {
  136. t.Run("runs immediately", func(t *testing.T) {
  137. var j countJob
  138. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  139. go wrappedJob.Run()
  140. time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete.
  141. if c := j.Done(); c != 1 {
  142. t.Errorf("expected job run once, immediately, got %d", c)
  143. }
  144. })
  145. t.Run("second run immediate if first done", func(t *testing.T) {
  146. var j countJob
  147. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  148. go func() {
  149. go wrappedJob.Run()
  150. time.Sleep(time.Millisecond)
  151. go wrappedJob.Run()
  152. }()
  153. time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
  154. if c := j.Done(); c != 2 {
  155. t.Errorf("expected job run twice, immediately, got %d", c)
  156. }
  157. })
  158. t.Run("second run skipped if first not done", func(t *testing.T) {
  159. var j countJob
  160. j.delay = 10 * time.Millisecond
  161. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  162. go func() {
  163. go wrappedJob.Run()
  164. time.Sleep(time.Millisecond)
  165. go wrappedJob.Run()
  166. }()
  167. // After 5ms, the first job is still in progress, and the second job was
  168. // aleady skipped.
  169. time.Sleep(5 * time.Millisecond)
  170. started, done := j.Started(), j.Done()
  171. if started != 1 || done != 0 {
  172. t.Error("expected first job started, but not finished, got", started, done)
  173. }
  174. // Verify that the first job completes and second does not run.
  175. time.Sleep(25 * time.Millisecond)
  176. started, done = j.Started(), j.Done()
  177. if started != 1 || done != 1 {
  178. t.Error("expected second job skipped, got", started, done)
  179. }
  180. })
  181. t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
  182. var j countJob
  183. j.delay = 10 * time.Millisecond
  184. wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
  185. for i := 0; i < 11; i++ {
  186. go wrappedJob.Run()
  187. }
  188. time.Sleep(200 * time.Millisecond)
  189. done := j.Done()
  190. if done != 1 {
  191. t.Error("expected 1 jobs executed, 10 jobs dropped, got", done)
  192. }
  193. })
  194. t.Run("different jobs independent", func(t *testing.T) {
  195. var j1, j2 countJob
  196. j1.delay = 10 * time.Millisecond
  197. j2.delay = 10 * time.Millisecond
  198. chain := NewChain(SkipIfStillRunning(DiscardLogger))
  199. wrappedJob1 := chain.Then(&j1)
  200. wrappedJob2 := chain.Then(&j2)
  201. for i := 0; i < 11; i++ {
  202. go wrappedJob1.Run()
  203. go wrappedJob2.Run()
  204. }
  205. time.Sleep(100 * time.Millisecond)
  206. var (
  207. done1 = j1.Done()
  208. done2 = j2.Done()
  209. )
  210. if done1 != 1 || done2 != 1 {
  211. t.Error("expected both jobs executed once, got", done1, "and", done2)
  212. }
  213. })
  214. }