Browse Source

Merge pull request #9352 from gyuho/raft-pre-vote

*: configure Raft Pre-Vote to reduce disruptive rejoining servers
Gyuho Lee 7 years ago
parent
commit
89292affaa
9 changed files with 233 additions and 1 deletions
  1. 8 0
      embed/config.go
  2. 1 0
      embed/etcd.go
  3. 1 0
      etcdmain/config.go
  4. 2 0
      etcdmain/help.go
  5. 3 0
      etcdserver/config.go
  6. 3 0
      etcdserver/raft.go
  7. 6 0
      etcdserver/server.go
  8. 8 1
      raft/raft.go
  9. 201 0
      raft/raft_test.go

+ 8 - 0
embed/config.go

@@ -234,6 +234,13 @@ type Config struct {
 	ExperimentalInitialCorruptCheck bool          `json:"experimental-initial-corrupt-check"`
 	ExperimentalCorruptCheckTime    time.Duration `json:"experimental-corrupt-check-time"`
 	ExperimentalEnableV2V3          string        `json:"experimental-enable-v2v3"`
+
+	// ExperimentalPreVote is true to enable Raft Pre-Vote.
+	// If enabled, Raft runs an additional election phase
+	// to check whether it would get enough votes to win
+	// an election, thus minimizing disruptions.
+	// TODO: change to "pre-vote" and enable by default in 3.5.
+	ExperimentalPreVote bool `json:"experimental-pre-vote"`
 }
 
 // configYAML holds the config suitable for yaml parsing
@@ -293,6 +300,7 @@ func NewConfig() *Config {
 		EnableV2:              DefaultEnableV2,
 		HostWhitelist:         defaultHostWhitelist,
 		AuthToken:             "simple",
+		ExperimentalPreVote:   false, // TODO: enable by default in v3.5
 	}
 	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	return cfg

+ 1 - 0
embed/etcd.go

@@ -171,6 +171,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		AuthToken:               cfg.AuthToken,
 		InitialCorruptCheck:     cfg.ExperimentalInitialCorruptCheck,
 		CorruptCheckTime:        cfg.ExperimentalCorruptCheckTime,
+		PreVote:                 cfg.ExperimentalPreVote,
 		Debug:                   cfg.Debug,
 	}
 

+ 1 - 0
etcdmain/config.go

@@ -218,6 +218,7 @@ func newConfig() *config {
 	// experimental
 	fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
 	fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
+	fs.BoolVar(&cfg.ec.ExperimentalPreVote, "experimental-pre-vote", cfg.ec.ExperimentalPreVote, "Enable to run an additional Raft election phase.")
 
 	// ignored
 	for _, f := range cfg.ignored {

+ 2 - 0
etcdmain/help.go

@@ -197,5 +197,7 @@ experimental flags:
 		duration of time between cluster corruption check passes.
 	--experimental-enable-v2v3 ''
 		serve v2 requests through the v3 backend under a given prefix.
+	--experimental-pre-vote 'false'
+		enable to run an additional Raft election phase.
 `
 )

+ 3 - 0
etcdserver/config.go

@@ -76,6 +76,9 @@ type ServerConfig struct {
 	InitialCorruptCheck bool
 	CorruptCheckTime    time.Duration
 
+	// PreVote is true to enable Raft Pre-Vote.
+	PreVote bool
+
 	Debug bool
 }
 

+ 3 - 0
etcdserver/raft.go

@@ -411,6 +411,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
 		MaxSizePerMsg:   maxSizePerMsg,
 		MaxInflightMsgs: maxInflightMsgs,
 		CheckQuorum:     true,
+		PreVote:         cfg.PreVote,
 	}
 
 	n = raft.StartNode(c, peers)
@@ -445,6 +446,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
 		MaxSizePerMsg:   maxSizePerMsg,
 		MaxInflightMsgs: maxInflightMsgs,
 		CheckQuorum:     true,
+		PreVote:         cfg.PreVote,
 	}
 
 	n := raft.RestartNode(c)
@@ -501,6 +503,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
 		MaxSizePerMsg:   maxSizePerMsg,
 		MaxInflightMsgs: maxInflightMsgs,
 		CheckQuorum:     true,
+		PreVote:         cfg.PreVote,
 	}
 	n := raft.RestartNode(c)
 	raftStatus = n.Status

+ 6 - 0
etcdserver/server.go

@@ -258,6 +258,12 @@ type EtcdServer struct {
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // configuration is considered static for the lifetime of the EtcdServer.
 func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
+	if cfg.PreVote {
+		plog.Info("Raft Pre-Vote is enabled")
+	} else {
+		plog.Info("Raft Pre-Vote is disabled")
+	}
+
 	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
 
 	var (

+ 8 - 1
raft/raft.go

@@ -817,8 +817,15 @@ func (r *raft) Step(m pb.Message) error {
 			// nodes that have been removed from the cluster's configuration: a
 			// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
 			// but it will not receive MsgApp or MsgHeartbeat, so it will not create
-			// disruptive term increases
+			// disruptive term increases, by notifying leader of this node's activeness.
 			// The above comments also true for Pre-Vote
+			//
+			// When follower gets isolated, it soon starts an election ending
+			// up with a higher term than leader, although it won't receive enough
+			// votes to win the election. When it regains connectivity, this response
+			// with "pb.MsgAppResp" of higher term would force leader to step down.
+			// However, this disruption is inevitable to free this stuck node with
+			// fresh election. This can be prevented with Pre-Vote phase.
 			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
 		} else if m.Type == pb.MsgPreVote {
 			// Before Pre-Vote enable, there may have candidate with higher term,

+ 201 - 0
raft/raft_test.go

@@ -1993,6 +1993,207 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
 	}
 }
 
+// TestDisruptiveFollower tests isolated follower,
+// with slow network incoming from leader, election times out
+// to become a candidate with an increased term. Then, the
+// candiate's response to late leader heartbeat forces the leader
+// to step down.
+func TestDisruptiveFollower(t *testing.T) {
+	n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+
+	n1.checkQuorum = true
+	n2.checkQuorum = true
+	n3.checkQuorum = true
+
+	n1.becomeFollower(1, None)
+	n2.becomeFollower(1, None)
+	n3.becomeFollower(1, None)
+
+	nt := newNetwork(n1, n2, n3)
+
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	// check state
+	// n1.state == StateLeader
+	// n2.state == StateFollower
+	// n3.state == StateFollower
+	if n1.state != StateLeader {
+		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
+	}
+	if n2.state != StateFollower {
+		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
+	}
+	if n3.state != StateFollower {
+		t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower)
+	}
+
+	// etcd server "advanceTicksForElection" on restart;
+	// this is to expedite campaign trigger when given larger
+	// election timeouts (e.g. multi-datacenter deploy)
+	// Or leader messages are being delayed while ticks elapse
+	setRandomizedElectionTimeout(n3, n3.electionTimeout+2)
+	for i := 0; i < n3.randomizedElectionTimeout-1; i++ {
+		n3.tick()
+	}
+
+	// ideally, before last election tick elapses,
+	// the follower n3 receives "pb.MsgApp" or "pb.MsgHeartbeat"
+	// from leader n1, and then resets its "electionElapsed"
+	// however, last tick may elapse before receiving any
+	// messages from leader, thus triggering campaign
+	n3.tick()
+
+	// n1 is still leader yet
+	// while its heartbeat to candidate n3 is being delayed
+
+	// check state
+	// n1.state == StateLeader
+	// n2.state == StateFollower
+	// n3.state == StateCandidate
+	if n1.state != StateLeader {
+		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
+	}
+	if n2.state != StateFollower {
+		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
+	}
+	if n3.state != StateCandidate {
+		t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
+	}
+	// check term
+	// n1.Term == 2
+	// n2.Term == 2
+	// n3.Term == 3
+	if n1.Term != 2 {
+		t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
+	}
+	if n2.Term != 2 {
+		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
+	}
+	if n3.Term != 3 {
+		t.Fatalf("node 3 term: %d, want %d", n3.Term, 3)
+	}
+
+	// while outgoing vote requests are still queued in n3,
+	// leader heartbeat finally arrives at candidate n3
+	// however, due to delayed network from leader, leader
+	// heartbeat was sent with lower term than candidate's
+	nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat})
+
+	// then candidate n3 responds with "pb.MsgAppResp" of higher term
+	// and leader steps down from a message with higher term
+	// this is to disrupt the current leader, so that candidate
+	// with higher term can be freed with following election
+
+	// check state
+	// n1.state == StateFollower
+	// n2.state == StateFollower
+	// n3.state == StateCandidate
+	if n1.state != StateFollower {
+		t.Fatalf("node 1 state: %s, want %s", n1.state, StateFollower)
+	}
+	if n2.state != StateFollower {
+		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
+	}
+	if n3.state != StateCandidate {
+		t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
+	}
+	// check term
+	// n1.Term == 3
+	// n2.Term == 2
+	// n3.Term == 3
+	if n1.Term != 3 {
+		t.Fatalf("node 1 term: %d, want %d", n1.Term, 3)
+	}
+	if n2.Term != 2 {
+		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
+	}
+	if n3.Term != 3 {
+		t.Fatalf("node 3 term: %d, want %d", n3.Term, 3)
+	}
+}
+
+// TestDisruptiveFollowerPreVote tests isolated follower,
+// with slow network incoming from leader, election times out
+// to become a pre-candidate with less log than current leader.
+// Then pre-vote phase prevents this isolated node from forcing
+// current leader to step down, thus less disruptions.
+func TestDisruptiveFollowerPreVote(t *testing.T) {
+	n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+
+	n1.checkQuorum = true
+	n2.checkQuorum = true
+	n3.checkQuorum = true
+
+	n1.becomeFollower(1, None)
+	n2.becomeFollower(1, None)
+	n3.becomeFollower(1, None)
+
+	nt := newNetwork(n1, n2, n3)
+
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	// check state
+	// n1.state == StateLeader
+	// n2.state == StateFollower
+	// n3.state == StateFollower
+	if n1.state != StateLeader {
+		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
+	}
+	if n2.state != StateFollower {
+		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
+	}
+	if n3.state != StateFollower {
+		t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower)
+	}
+
+	nt.isolate(3)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+	n1.preVote = true
+	n2.preVote = true
+	n3.preVote = true
+	nt.recover()
+	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
+
+	// check state
+	// n1.state == StateLeader
+	// n2.state == StateFollower
+	// n3.state == StatePreCandidate
+	if n1.state != StateLeader {
+		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
+	}
+	if n2.state != StateFollower {
+		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
+	}
+	if n3.state != StatePreCandidate {
+		t.Fatalf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
+	}
+	// check term
+	// n1.Term == 2
+	// n2.Term == 2
+	// n3.Term == 2
+	if n1.Term != 2 {
+		t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
+	}
+	if n2.Term != 2 {
+		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
+	}
+	if n3.Term != 2 {
+		t.Fatalf("node 2 term: %d, want %d", n3.Term, 2)
+	}
+
+	// delayed leader heartbeat does not force current leader to step down
+	nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat})
+	if n1.state != StateLeader {
+		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
+	}
+}
+
 func TestReadOnlyOptionSafe(t *testing.T) {
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())