schedule.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package schedule
  15. import (
  16. "sync"
  17. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  18. )
// Job is a unit of work submitted to a Scheduler. It is invoked with a
// context; the FIFO scheduler in this file passes its own context, which is
// cancelled when the scheduler is stopped.
type Job func(context.Context)
// Scheduler can schedule jobs.
type Scheduler interface {
	// Schedule asks the scheduler to schedule a job defined by the given func.
	// Scheduling a job on a stopped scheduler might panic.
	Schedule(j Job)
	// Pending returns the number of pending jobs.
	Pending() int
	// Scheduled returns the number of scheduled jobs (excluding pending jobs).
	Scheduled() int
	// Finished returns the number of finished jobs.
	Finished() int
	// WaitFinish waits for all pending jobs to finish.
	WaitFinish()
	// Stop stops the scheduler.
	Stop()
}
// fifo is the Scheduler implementation returned by NewFIFOScheduler. A single
// goroutine (run) executes the queued jobs one at a time in submission order.
type fifo struct {
	// mu guards scheduled, finished, pendings and cancel. It is also the
	// Locker underlying finishCond.
	mu sync.Mutex
	// resume is a 1-buffered wake-up signal: Schedule sends on it (without
	// blocking) when the queue goes from empty to non-empty, and run closes
	// it on exit.
	resume chan struct{}
	// scheduled counts how many times run has picked a job off the queue.
	scheduled int
	// finished counts the jobs that returned in run's main loop.
	finished int
	// pendings is the FIFO queue. The head stays in the slice while it is
	// executing and is removed only after it returns.
	pendings []Job
	// ctx is passed to every job. cancel cancels ctx; Stop sets it to nil,
	// which Schedule uses to detect a stopped scheduler.
	ctx    context.Context
	cancel context.CancelFunc
	// finishCond is broadcast each time a job finishes.
	finishCond *sync.Cond
	// donec is closed when run exits; Stop blocks on it.
	donec chan struct{}
}
  47. // NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
  48. // order sequentially
  49. func NewFIFOScheduler() Scheduler {
  50. f := &fifo{
  51. resume: make(chan struct{}, 1),
  52. donec: make(chan struct{}, 1),
  53. }
  54. f.finishCond = sync.NewCond(&f.mu)
  55. f.ctx, f.cancel = context.WithCancel(context.Background())
  56. go f.run()
  57. return f
  58. }
  59. // Schedule schedules a job that will be ran in FIFO order sequentially.
  60. func (f *fifo) Schedule(j Job) {
  61. f.mu.Lock()
  62. defer f.mu.Unlock()
  63. if f.cancel == nil {
  64. panic("schedule: schedule to stopped scheduler")
  65. }
  66. if len(f.pendings) == 0 {
  67. select {
  68. case f.resume <- struct{}{}:
  69. default:
  70. }
  71. }
  72. f.pendings = append(f.pendings, j)
  73. return
  74. }
  75. func (f *fifo) Pending() int {
  76. f.mu.Lock()
  77. defer f.mu.Unlock()
  78. return len(f.pendings)
  79. }
  80. func (f *fifo) Scheduled() int {
  81. f.mu.Lock()
  82. defer f.mu.Unlock()
  83. return f.scheduled
  84. }
  85. func (f *fifo) Finished() int {
  86. f.finishCond.L.Lock()
  87. defer f.finishCond.L.Unlock()
  88. return f.finished
  89. }
  90. func (f *fifo) WaitFinish() {
  91. f.finishCond.L.Lock()
  92. finish := f.finished
  93. f.finishCond.L.Unlock()
  94. f.finishCond.L.Lock()
  95. for f.finished == finish || len(f.pendings) != 0 {
  96. f.finishCond.Wait()
  97. }
  98. f.finishCond.L.Unlock()
  99. }
  100. // Stop stops the scheduler and cancels all pending jobs.
  101. func (f *fifo) Stop() {
  102. f.mu.Lock()
  103. f.cancel()
  104. f.cancel = nil
  105. f.mu.Unlock()
  106. <-f.donec
  107. }
  108. func (f *fifo) run() {
  109. // TODO: recover from job panic?
  110. defer func() {
  111. close(f.donec)
  112. close(f.resume)
  113. }()
  114. for {
  115. var todo Job
  116. f.mu.Lock()
  117. if len(f.pendings) != 0 {
  118. f.scheduled++
  119. todo = f.pendings[0]
  120. }
  121. f.mu.Unlock()
  122. if todo == nil {
  123. select {
  124. case <-f.resume:
  125. case <-f.ctx.Done():
  126. f.mu.Lock()
  127. pendings := f.pendings
  128. f.pendings = nil
  129. f.mu.Unlock()
  130. // clean up pending jobs
  131. for _, todo := range pendings {
  132. todo(f.ctx)
  133. }
  134. return
  135. }
  136. } else {
  137. todo(f.ctx)
  138. f.finishCond.L.Lock()
  139. f.finished++
  140. f.pendings = f.pendings[1:]
  141. f.finishCond.Broadcast()
  142. f.finishCond.L.Unlock()
  143. }
  144. }
  145. }