timer.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package raft
  2. import (
  3. "math/rand"
  4. "sync"
  5. "time"
  6. )
  7. //------------------------------------------------------------------------------
  8. //
  9. // Typedefs
  10. //
  11. //------------------------------------------------------------------------------
  12. type timer struct {
  13. fireChan chan time.Time
  14. stopChan chan bool
  15. state int
  16. rand *rand.Rand
  17. minDuration time.Duration
  18. maxDuration time.Duration
  19. internalTimer *time.Timer
  20. mutex sync.Mutex
  21. }
  22. const (
  23. STOPPED = iota
  24. READY
  25. RUNNING
  26. )
  27. //------------------------------------------------------------------------------
  28. //
  29. // Constructors
  30. //
  31. //------------------------------------------------------------------------------
  32. // Creates a new timer. Panics if a non-positive duration is used.
  33. func newTimer(minDuration time.Duration, maxDuration time.Duration) *timer {
  34. if minDuration <= 0 {
  35. panic("raft: Non-positive minimum duration not allowed")
  36. } else if maxDuration <= 0 {
  37. panic("raft: Non-positive maximum duration not allowed")
  38. } else if minDuration > maxDuration {
  39. panic("raft: Minimum duration cannot be greater than maximum duration")
  40. }
  41. return &timer{
  42. minDuration: minDuration,
  43. maxDuration: maxDuration,
  44. state: READY,
  45. rand: rand.New(rand.NewSource(time.Now().UnixNano())),
  46. stopChan: make(chan bool, 1),
  47. fireChan: make(chan time.Time),
  48. }
  49. }
  50. //------------------------------------------------------------------------------
  51. //
  52. // Accessors
  53. //
  54. //------------------------------------------------------------------------------
  55. // Sets the minimum and maximum duration of the timer.
  56. func (t *timer) setDuration(duration time.Duration) {
  57. t.minDuration = duration
  58. t.maxDuration = duration
  59. }
  60. //------------------------------------------------------------------------------
  61. //
  62. // Methods
  63. //
  64. //------------------------------------------------------------------------------
  65. // Checks if the timer is currently running.
  66. func (t *timer) running() bool {
  67. return t.state == RUNNING
  68. }
  69. // Stops the timer and closes the channel.
  70. func (t *timer) stop() {
  71. t.mutex.Lock()
  72. defer t.mutex.Unlock()
  73. if t.internalTimer != nil {
  74. t.internalTimer.Stop()
  75. }
  76. if t.state != STOPPED {
  77. t.state = STOPPED
  78. // non-blocking buffer
  79. t.stopChan <- true
  80. }
  81. }
  82. // Change the state of timer to ready
  83. func (t *timer) ready() {
  84. t.mutex.Lock()
  85. defer t.mutex.Unlock()
  86. if t.state == RUNNING {
  87. panic("Timer is already running")
  88. }
  89. t.state = READY
  90. t.stopChan = make(chan bool, 1)
  91. t.fireChan = make(chan time.Time)
  92. }
  93. // Fire at the timer
  94. func (t *timer) fire() {
  95. select {
  96. case t.fireChan <- time.Now():
  97. return
  98. default:
  99. return
  100. }
  101. }
  102. // Start the timer, this func will be blocked until the timer:
  103. // (1) times out
  104. // (2) stopped
  105. // (3) fired
  106. // Return false if stopped.
  107. // Make sure the start func will not restart the stopped timer.
  108. func (t *timer) start() bool {
  109. t.mutex.Lock()
  110. if t.state != READY {
  111. t.mutex.Unlock()
  112. return false
  113. }
  114. t.state = RUNNING
  115. d := t.minDuration
  116. if t.maxDuration > t.minDuration {
  117. d += time.Duration(t.rand.Int63n(int64(t.maxDuration - t.minDuration)))
  118. }
  119. t.internalTimer = time.NewTimer(d)
  120. internalTimer := t.internalTimer
  121. t.mutex.Unlock()
  122. // Wait for the timer channel, stop channel or fire channel.
  123. stopped := false
  124. select {
  125. case <-internalTimer.C:
  126. case <-t.fireChan:
  127. case <-t.stopChan:
  128. stopped = true
  129. }
  130. // Clean up timer and state.
  131. t.mutex.Lock()
  132. t.internalTimer.Stop()
  133. t.internalTimer = nil
  134. if stopped {
  135. t.state = STOPPED
  136. } else if t.state == RUNNING {
  137. t.state = READY
  138. }
  139. t.mutex.Unlock()
  140. return !stopped
  141. }