schedule.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package schedule
  15. import (
  16. "context"
  17. "sync"
  18. )
  // Job is a unit of work that can be handed to a Scheduler. It is called
  // with a context.Context; the FIFO scheduler in this package passes its own
  // context, which is canceled when the scheduler is stopped.
  type Job func(ctx context.Context)
  20. // Scheduler can schedule jobs.
  21. type Scheduler interface {
  22. // Schedule asks the scheduler to schedule a job defined by the given func.
  23. // Schedule to a stopped scheduler might panic.
  24. Schedule(j Job)
  25. // Pending returns number of pending jobs
  26. Pending() int
  27. // Scheduled returns the number of scheduled jobs (excluding pending jobs)
  28. Scheduled() int
  29. // Finished returns the number of finished jobs
  30. Finished() int
  31. // WaitFinish waits until at least n job are finished and all pending jobs are finished.
  32. WaitFinish(n int)
  33. // Stop stops the scheduler.
  34. Stop()
  35. }
  36. type fifo struct {
  37. mu sync.Mutex
  38. resume chan struct{}
  39. scheduled int
  40. finished int
  41. pendings []Job
  42. ctx context.Context
  43. cancel context.CancelFunc
  44. finishCond *sync.Cond
  45. donec chan struct{}
  46. }
  47. // NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
  48. // order sequentially
  49. func NewFIFOScheduler() Scheduler {
  50. f := &fifo{
  51. resume: make(chan struct{}, 1),
  52. donec: make(chan struct{}, 1),
  53. }
  54. f.finishCond = sync.NewCond(&f.mu)
  55. f.ctx, f.cancel = context.WithCancel(context.Background())
  56. go f.run()
  57. return f
  58. }
  59. // Schedule schedules a job that will be ran in FIFO order sequentially.
  60. func (f *fifo) Schedule(j Job) {
  61. f.mu.Lock()
  62. defer f.mu.Unlock()
  63. if f.cancel == nil {
  64. panic("schedule: schedule to stopped scheduler")
  65. }
  66. if len(f.pendings) == 0 {
  67. select {
  68. case f.resume <- struct{}{}:
  69. default:
  70. }
  71. }
  72. f.pendings = append(f.pendings, j)
  73. }
  74. func (f *fifo) Pending() int {
  75. f.mu.Lock()
  76. defer f.mu.Unlock()
  77. return len(f.pendings)
  78. }
  79. func (f *fifo) Scheduled() int {
  80. f.mu.Lock()
  81. defer f.mu.Unlock()
  82. return f.scheduled
  83. }
  84. func (f *fifo) Finished() int {
  85. f.finishCond.L.Lock()
  86. defer f.finishCond.L.Unlock()
  87. return f.finished
  88. }
  89. func (f *fifo) WaitFinish(n int) {
  90. f.finishCond.L.Lock()
  91. for f.finished < n || len(f.pendings) != 0 {
  92. f.finishCond.Wait()
  93. }
  94. f.finishCond.L.Unlock()
  95. }
  96. // Stop stops the scheduler and cancels all pending jobs.
  97. func (f *fifo) Stop() {
  98. f.mu.Lock()
  99. f.cancel()
  100. f.cancel = nil
  101. f.mu.Unlock()
  102. <-f.donec
  103. }
  104. func (f *fifo) run() {
  105. // TODO: recover from job panic?
  106. defer func() {
  107. close(f.donec)
  108. close(f.resume)
  109. }()
  110. for {
  111. var todo Job
  112. f.mu.Lock()
  113. if len(f.pendings) != 0 {
  114. f.scheduled++
  115. todo = f.pendings[0]
  116. }
  117. f.mu.Unlock()
  118. if todo == nil {
  119. select {
  120. case <-f.resume:
  121. case <-f.ctx.Done():
  122. f.mu.Lock()
  123. pendings := f.pendings
  124. f.pendings = nil
  125. f.mu.Unlock()
  126. // clean up pending jobs
  127. for _, todo := range pendings {
  128. todo(f.ctx)
  129. }
  130. return
  131. }
  132. } else {
  133. todo(f.ctx)
  134. f.finishCond.L.Lock()
  135. f.finished++
  136. f.pendings = f.pendings[1:]
  137. f.finishCond.Broadcast()
  138. f.finishCond.L.Unlock()
  139. }
  140. }
  141. }