schedule.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package schedule
  15. import (
  16. "sync"
  17. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  18. )
  19. type Job func(context.Context)
  20. // Scheduler can schedule jobs.
  21. type Scheduler interface {
  22. // Schedule asks the scheduler to schedule a job defined by the given func.
  23. // Schedule to a stopped scheduler might panic.
  24. Schedule(j Job)
  25. // Pending returns number of pending jobs
  26. Pending() int
  27. // Scheduled returns the number of scheduled jobs (excluding pending jobs)
  28. Scheduled() int
  29. // Finished returns the number of finished jobs
  30. Finished() int
  31. // WaitFinish waits until at least n job are finished and all pending jobs are finished.
  32. WaitFinish(n int)
  33. // Stop stops the scheduler.
  34. Stop()
  35. }
  36. type fifo struct {
  37. mu sync.Mutex
  38. resume chan struct{}
  39. scheduled int
  40. finished int
  41. pendings []Job
  42. ctx context.Context
  43. cancel context.CancelFunc
  44. finishCond *sync.Cond
  45. donec chan struct{}
  46. }
  47. // NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
  48. // order sequentially
  49. func NewFIFOScheduler() Scheduler {
  50. f := &fifo{
  51. resume: make(chan struct{}, 1),
  52. donec: make(chan struct{}, 1),
  53. }
  54. f.finishCond = sync.NewCond(&f.mu)
  55. f.ctx, f.cancel = context.WithCancel(context.Background())
  56. go f.run()
  57. return f
  58. }
  59. // Schedule schedules a job that will be ran in FIFO order sequentially.
  60. func (f *fifo) Schedule(j Job) {
  61. f.mu.Lock()
  62. defer f.mu.Unlock()
  63. if f.cancel == nil {
  64. panic("schedule: schedule to stopped scheduler")
  65. }
  66. if len(f.pendings) == 0 {
  67. select {
  68. case f.resume <- struct{}{}:
  69. default:
  70. }
  71. }
  72. f.pendings = append(f.pendings, j)
  73. return
  74. }
  75. func (f *fifo) Pending() int {
  76. f.mu.Lock()
  77. defer f.mu.Unlock()
  78. return len(f.pendings)
  79. }
  80. func (f *fifo) Scheduled() int {
  81. f.mu.Lock()
  82. defer f.mu.Unlock()
  83. return f.scheduled
  84. }
  85. func (f *fifo) Finished() int {
  86. f.finishCond.L.Lock()
  87. defer f.finishCond.L.Unlock()
  88. return f.finished
  89. }
  90. func (f *fifo) WaitFinish(n int) {
  91. f.finishCond.L.Lock()
  92. for f.finished < n || len(f.pendings) != 0 {
  93. f.finishCond.Wait()
  94. }
  95. f.finishCond.L.Unlock()
  96. }
  97. // Stop stops the scheduler and cancels all pending jobs.
  98. func (f *fifo) Stop() {
  99. f.mu.Lock()
  100. f.cancel()
  101. f.cancel = nil
  102. f.mu.Unlock()
  103. <-f.donec
  104. }
  105. func (f *fifo) run() {
  106. // TODO: recover from job panic?
  107. defer func() {
  108. close(f.donec)
  109. close(f.resume)
  110. }()
  111. for {
  112. var todo Job
  113. f.mu.Lock()
  114. if len(f.pendings) != 0 {
  115. f.scheduled++
  116. todo = f.pendings[0]
  117. }
  118. f.mu.Unlock()
  119. if todo == nil {
  120. select {
  121. case <-f.resume:
  122. case <-f.ctx.Done():
  123. f.mu.Lock()
  124. pendings := f.pendings
  125. f.pendings = nil
  126. f.mu.Unlock()
  127. // clean up pending jobs
  128. for _, todo := range pendings {
  129. todo(f.ctx)
  130. }
  131. return
  132. }
  133. } else {
  134. todo(f.ctx)
  135. f.finishCond.L.Lock()
  136. f.finished++
  137. f.pendings = f.pendings[1:]
  138. f.finishCond.Broadcast()
  139. f.finishCond.L.Unlock()
  140. }
  141. }
  142. }