progress.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. // Copyright 2019 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tracker
  15. import (
  16. "fmt"
  17. "sort"
  18. "strings"
  19. )
  20. // Progress represents a follower’s progress in the view of the leader. Leader
  21. // maintains progresses of all followers, and sends entries to the follower
  22. // based on its progress.
  23. //
  24. // NB(tbg): Progress is basically a state machine whose transitions are mostly
  25. // strewn around `*raft.raft`. Additionally, some fields are only used when in a
  26. // certain State. All of this isn't ideal.
  27. type Progress struct {
  28. Match, Next uint64
  29. // State defines how the leader should interact with the follower.
  30. //
  31. // When in StateProbe, leader sends at most one replication message
  32. // per heartbeat interval. It also probes actual progress of the follower.
  33. //
  34. // When in StateReplicate, leader optimistically increases next
  35. // to the latest entry sent after sending replication message. This is
  36. // an optimized state for fast replicating log entries to the follower.
  37. //
  38. // When in StateSnapshot, leader should have sent out snapshot
  39. // before and stops sending any replication message.
  40. State StateType
  41. // PendingSnapshot is used in StateSnapshot.
  42. // If there is a pending snapshot, the pendingSnapshot will be set to the
  43. // index of the snapshot. If pendingSnapshot is set, the replication process of
  44. // this Progress will be paused. raft will not resend snapshot until the pending one
  45. // is reported to be failed.
  46. PendingSnapshot uint64
  47. // RecentActive is true if the progress is recently active. Receiving any messages
  48. // from the corresponding follower indicates the progress is active.
  49. // RecentActive can be reset to false after an election timeout.
  50. //
  51. // TODO(tbg): the leader should always have this set to true.
  52. RecentActive bool
  53. // ProbeSent is used while this follower is in StateProbe. When ProbeSent is
  54. // true, raft should pause sending replication message to this peer until
  55. // ProbeSent is reset. See ProbeAcked() and IsPaused().
  56. ProbeSent bool
  57. // Inflights is a sliding window for the inflight messages.
  58. // Each inflight message contains one or more log entries.
  59. // The max number of entries per message is defined in raft config as MaxSizePerMsg.
  60. // Thus inflight effectively limits both the number of inflight messages
  61. // and the bandwidth each Progress can use.
  62. // When inflights is Full, no more message should be sent.
  63. // When a leader sends out a message, the index of the last
  64. // entry should be added to inflights. The index MUST be added
  65. // into inflights in order.
  66. // When a leader receives a reply, the previous inflights should
  67. // be freed by calling inflights.FreeLE with the index of the last
  68. // received entry.
  69. Inflights *Inflights
  70. // IsLearner is true if this progress is tracked for a learner.
  71. IsLearner bool
  72. }
  73. // ResetState moves the Progress into the specified State, resetting ProbeSent,
  74. // PendingSnapshot, and Inflights.
  75. func (pr *Progress) ResetState(state StateType) {
  76. pr.ProbeSent = false
  77. pr.PendingSnapshot = 0
  78. pr.State = state
  79. pr.Inflights.reset()
  80. }
  // max returns the larger of a and b.
  func max(a, b uint64) uint64 {
  	if b > a {
  		return b
  	}
  	return a
  }
  // min returns the smaller of a and b.
  func min(a, b uint64) uint64 {
  	if b < a {
  		return b
  	}
  	return a
  }
  93. // ProbeAcked is called when this peer has accepted an append. It resets
  94. // ProbeSent to signal that additional append messages should be sent without
  95. // further delay.
  96. func (pr *Progress) ProbeAcked() {
  97. pr.ProbeSent = false
  98. }
  99. // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
  100. // optionally and if larger, the index of the pending snapshot.
  101. func (pr *Progress) BecomeProbe() {
  102. // If the original state is StateSnapshot, progress knows that
  103. // the pending snapshot has been sent to this peer successfully, then
  104. // probes from pendingSnapshot + 1.
  105. if pr.State == StateSnapshot {
  106. pendingSnapshot := pr.PendingSnapshot
  107. pr.ResetState(StateProbe)
  108. pr.Next = max(pr.Match+1, pendingSnapshot+1)
  109. } else {
  110. pr.ResetState(StateProbe)
  111. pr.Next = pr.Match + 1
  112. }
  113. }
  114. // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
  115. func (pr *Progress) BecomeReplicate() {
  116. pr.ResetState(StateReplicate)
  117. pr.Next = pr.Match + 1
  118. }
  119. // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
  120. // snapshot index.
  121. func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
  122. pr.ResetState(StateSnapshot)
  123. pr.PendingSnapshot = snapshoti
  124. }
  125. // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
  126. // index acked by it. The method returns false if the given n index comes from
  127. // an outdated message. Otherwise it updates the progress and returns true.
  128. func (pr *Progress) MaybeUpdate(n uint64) bool {
  129. var updated bool
  130. if pr.Match < n {
  131. pr.Match = n
  132. updated = true
  133. pr.ProbeAcked()
  134. }
  135. if pr.Next < n+1 {
  136. pr.Next = n + 1
  137. }
  138. return updated
  139. }
  140. // OptimisticUpdate signals that appends all the way up to and including index n
  141. // are in-flight. As a result, Next is increased to n+1.
  142. func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
  143. // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
  144. // arguments are the index the follower rejected to append to its log, and its
  145. // last index.
  146. //
  147. // Rejections can happen spuriously as messages are sent out of order or
  148. // duplicated. In such cases, the rejection pertains to an index that the
  149. // Progress already knows were previously acknowledged, and false is returned
  150. // without changing the Progress.
  151. //
  152. // If the rejection is genuine, Next is lowered sensibly, and the Progress is
  153. // cleared for sending log entries.
  154. func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
  155. if pr.State == StateReplicate {
  156. // The rejection must be stale if the progress has matched and "rejected"
  157. // is smaller than "match".
  158. if rejected <= pr.Match {
  159. return false
  160. }
  161. // Directly decrease next to match + 1.
  162. //
  163. // TODO(tbg): why not use last if it's larger?
  164. pr.Next = pr.Match + 1
  165. return true
  166. }
  167. // The rejection must be stale if "rejected" does not match next - 1. This
  168. // is because non-replicating followers are probed one entry at a time.
  169. if pr.Next-1 != rejected {
  170. return false
  171. }
  172. if pr.Next = min(rejected, last+1); pr.Next < 1 {
  173. pr.Next = 1
  174. }
  175. pr.ProbeSent = false
  176. return true
  177. }
  178. // IsPaused returns whether sending log entries to this node has been throttled.
  179. // This is done when a node has rejected recent MsgApps, is currently waiting
  180. // for a snapshot, or has reached the MaxInflightMsgs limit. In normal
  181. // operation, this is false. A throttled node will be contacted less frequently
  182. // until it has reached a state in which it's able to accept a steady stream of
  183. // log entries again.
  184. func (pr *Progress) IsPaused() bool {
  185. switch pr.State {
  186. case StateProbe:
  187. return pr.ProbeSent
  188. case StateReplicate:
  189. return pr.Inflights.Full()
  190. case StateSnapshot:
  191. return true
  192. default:
  193. panic("unexpected state")
  194. }
  195. }
  196. func (pr *Progress) String() string {
  197. var buf strings.Builder
  198. fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
  199. if pr.IsLearner {
  200. fmt.Fprint(&buf, " learner")
  201. }
  202. if pr.IsPaused() {
  203. fmt.Fprint(&buf, " paused")
  204. }
  205. if pr.PendingSnapshot > 0 {
  206. fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
  207. }
  208. if !pr.RecentActive {
  209. fmt.Fprintf(&buf, " inactive")
  210. }
  211. if n := pr.Inflights.Count(); n > 0 {
  212. fmt.Fprintf(&buf, " inflight=%d", n)
  213. if pr.Inflights.Full() {
  214. fmt.Fprint(&buf, "[full]")
  215. }
  216. }
  217. return buf.String()
  218. }
  219. // ProgressMap is a map of *Progress.
  220. type ProgressMap map[uint64]*Progress
  221. // String prints the ProgressMap in sorted key order, one Progress per line.
  222. func (m ProgressMap) String() string {
  223. ids := make([]uint64, 0, len(m))
  224. for k := range m {
  225. ids = append(ids, k)
  226. }
  227. sort.Slice(ids, func(i, j int) bool {
  228. return ids[i] < ids[j]
  229. })
  230. var buf strings.Builder
  231. for _, id := range ids {
  232. fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
  233. }
  234. return buf.String()
  235. }