123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- // Copyright 2019 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package tracker
- import (
- "fmt"
- "sort"
- "strings"
- )
- // Progress represents a follower’s progress in the view of the leader. Leader
- // maintains progresses of all followers, and sends entries to the follower
- // based on its progress.
- //
- // NB(tbg): Progress is basically a state machine whose transitions are mostly
- // strewn around `*raft.raft`. Additionally, some fields are only used when in a
- // certain State. All of this isn't ideal.
- type Progress struct {
- Match, Next uint64
- // State defines how the leader should interact with the follower.
- //
- // When in StateProbe, leader sends at most one replication message
- // per heartbeat interval. It also probes actual progress of the follower.
- //
- // When in StateReplicate, leader optimistically increases next
- // to the latest entry sent after sending replication message. This is
- // an optimized state for fast replicating log entries to the follower.
- //
- // When in StateSnapshot, leader should have sent out snapshot
- // before and stops sending any replication message.
- State StateType
- // PendingSnapshot is used in StateSnapshot.
- // If there is a pending snapshot, the pendingSnapshot will be set to the
- // index of the snapshot. If pendingSnapshot is set, the replication process of
- // this Progress will be paused. raft will not resend snapshot until the pending one
- // is reported to be failed.
- PendingSnapshot uint64
- // RecentActive is true if the progress is recently active. Receiving any messages
- // from the corresponding follower indicates the progress is active.
- // RecentActive can be reset to false after an election timeout.
- //
- // TODO(tbg): the leader should always have this set to true.
- RecentActive bool
- // ProbeSent is used while this follower is in StateProbe. When ProbeSent is
- // true, raft should pause sending replication message to this peer until
- // ProbeSent is reset. See ProbeAcked() and IsPaused().
- ProbeSent bool
- // Inflights is a sliding window for the inflight messages.
- // Each inflight message contains one or more log entries.
- // The max number of entries per message is defined in raft config as MaxSizePerMsg.
- // Thus inflight effectively limits both the number of inflight messages
- // and the bandwidth each Progress can use.
- // When inflights is Full, no more message should be sent.
- // When a leader sends out a message, the index of the last
- // entry should be added to inflights. The index MUST be added
- // into inflights in order.
- // When a leader receives a reply, the previous inflights should
- // be freed by calling inflights.FreeLE with the index of the last
- // received entry.
- Inflights *Inflights
- // IsLearner is true if this progress is tracked for a learner.
- IsLearner bool
- }
- // ResetState moves the Progress into the specified State, resetting ProbeSent,
- // PendingSnapshot, and Inflights.
- func (pr *Progress) ResetState(state StateType) {
- pr.ProbeSent = false
- pr.PendingSnapshot = 0
- pr.State = state
- pr.Inflights.reset()
- }
// max returns the larger of a and b.
func max(a, b uint64) uint64 {
	if b >= a {
		return b
	}
	return a
}
// min returns the smaller of a and b.
func min(a, b uint64) uint64 {
	if b < a {
		return b
	}
	return a
}
- // ProbeAcked is called when this peer has accepted an append. It resets
- // ProbeSent to signal that additional append messages should be sent without
- // further delay.
- func (pr *Progress) ProbeAcked() {
- pr.ProbeSent = false
- }
- // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
- // optionally and if larger, the index of the pending snapshot.
- func (pr *Progress) BecomeProbe() {
- // If the original state is StateSnapshot, progress knows that
- // the pending snapshot has been sent to this peer successfully, then
- // probes from pendingSnapshot + 1.
- if pr.State == StateSnapshot {
- pendingSnapshot := pr.PendingSnapshot
- pr.ResetState(StateProbe)
- pr.Next = max(pr.Match+1, pendingSnapshot+1)
- } else {
- pr.ResetState(StateProbe)
- pr.Next = pr.Match + 1
- }
- }
- // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
- func (pr *Progress) BecomeReplicate() {
- pr.ResetState(StateReplicate)
- pr.Next = pr.Match + 1
- }
- // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
- // snapshot index.
- func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
- pr.ResetState(StateSnapshot)
- pr.PendingSnapshot = snapshoti
- }
- // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
- // index acked by it. The method returns false if the given n index comes from
- // an outdated message. Otherwise it updates the progress and returns true.
- func (pr *Progress) MaybeUpdate(n uint64) bool {
- var updated bool
- if pr.Match < n {
- pr.Match = n
- updated = true
- pr.ProbeAcked()
- }
- if pr.Next < n+1 {
- pr.Next = n + 1
- }
- return updated
- }
- // OptimisticUpdate signals that appends all the way up to and including index n
- // are in-flight. As a result, Next is increased to n+1.
- func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
- // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
- // arguments are the index the follower rejected to append to its log, and its
- // last index.
- //
- // Rejections can happen spuriously as messages are sent out of order or
- // duplicated. In such cases, the rejection pertains to an index that the
- // Progress already knows were previously acknowledged, and false is returned
- // without changing the Progress.
- //
- // If the rejection is genuine, Next is lowered sensibly, and the Progress is
- // cleared for sending log entries.
- func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
- if pr.State == StateReplicate {
- // The rejection must be stale if the progress has matched and "rejected"
- // is smaller than "match".
- if rejected <= pr.Match {
- return false
- }
- // Directly decrease next to match + 1.
- //
- // TODO(tbg): why not use last if it's larger?
- pr.Next = pr.Match + 1
- return true
- }
- // The rejection must be stale if "rejected" does not match next - 1. This
- // is because non-replicating followers are probed one entry at a time.
- if pr.Next-1 != rejected {
- return false
- }
- if pr.Next = min(rejected, last+1); pr.Next < 1 {
- pr.Next = 1
- }
- pr.ProbeSent = false
- return true
- }
- // IsPaused returns whether sending log entries to this node has been throttled.
- // This is done when a node has rejected recent MsgApps, is currently waiting
- // for a snapshot, or has reached the MaxInflightMsgs limit. In normal
- // operation, this is false. A throttled node will be contacted less frequently
- // until it has reached a state in which it's able to accept a steady stream of
- // log entries again.
- func (pr *Progress) IsPaused() bool {
- switch pr.State {
- case StateProbe:
- return pr.ProbeSent
- case StateReplicate:
- return pr.Inflights.Full()
- case StateSnapshot:
- return true
- default:
- panic("unexpected state")
- }
- }
- func (pr *Progress) String() string {
- var buf strings.Builder
- fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
- if pr.IsLearner {
- fmt.Fprint(&buf, " learner")
- }
- if pr.IsPaused() {
- fmt.Fprint(&buf, " paused")
- }
- if pr.PendingSnapshot > 0 {
- fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
- }
- if !pr.RecentActive {
- fmt.Fprintf(&buf, " inactive")
- }
- if n := pr.Inflights.Count(); n > 0 {
- fmt.Fprintf(&buf, " inflight=%d", n)
- if pr.Inflights.Full() {
- fmt.Fprint(&buf, "[full]")
- }
- }
- return buf.String()
- }
- // ProgressMap is a map of *Progress.
- type ProgressMap map[uint64]*Progress
- // String prints the ProgressMap in sorted key order, one Progress per line.
- func (m ProgressMap) String() string {
- ids := make([]uint64, 0, len(m))
- for k := range m {
- ids = append(ids, k)
- }
- sort.Slice(ids, func(i, j int) bool {
- return ids[i] < ids[j]
- })
- var buf strings.Builder
- for _, id := range ids {
- fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
- }
- return buf.String()
- }
|