Browse Source

raft: add reportUnreachable

Xiang Li 11 years ago
parent
commit
2af33fd494
8 changed files with 101 additions and 19 deletions
  1. 2 0
      etcdserver/server_test.go
  2. 10 1
      raft/node.go
  3. 12 6
      raft/raft.go
  4. 61 0
      raft/raft_test.go
  5. 13 10
      raft/raftpb/raft.pb.go
  6. 1 0
      raft/raftpb/raft.proto
  7. 1 1
      raft/status.go
  8. 1 1
      raft/util.go

+ 2 - 0
etcdserver/server_test.go

@@ -1310,6 +1310,8 @@ func (n *nodeRecorder) Stop() {
 	n.Record(testutil.Action{Name: "Stop"})
 }
 
+func (n *nodeRecorder) ReportUnreachable(id uint64) {}
+
 func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
 	n.Record(testutil.Action{Name: "Compact"})
 }

+ 10 - 1
raft/node.go

@@ -119,6 +119,8 @@ type Node interface {
 	ApplyConfChange(cc pb.ConfChange) *pb.ConfState
 	// Status returns the current status of the raft state machine.
 	Status() Status
+	// Report reports the given node is not reachable for the last send.
+	ReportUnreachable(id uint64)
 	// Stop performs any necessary termination of the Node
 	Stop()
 }
@@ -270,7 +272,7 @@ func (n *node) run(r *raft) {
 			m.From = r.id
 			r.Step(m)
 		case m := <-n.recvc:
-			// filter out response message from unknow From.
+			// filter out response message from unknown From.
 			if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m) {
 				r.Step(m) // raft never returns an error
 			}
@@ -418,6 +420,13 @@ func (n *node) Status() Status {
 	return <-c
 }
 
+func (n *node) ReportUnreachable(id uint64) {
+	select {
+	case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
+	case <-n.done:
+	}
+}
+
 func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	rd := Ready{
 		Entries:          r.raftLog.unstableEntries(),

+ 12 - 6
raft/raft.go

@@ -53,10 +53,13 @@ func (st StateType) String() string {
 type Progress struct {
 	Match, Next uint64
 	Wait        int
+	Unreachable bool
 }
 
 func (pr *Progress) update(n uint64) {
 	pr.waitReset()
+	pr.reachable()
+
 	if pr.Match < n {
 		pr.Match = n
 	}
@@ -71,6 +74,8 @@ func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
 // Otherwise it decreases the progress next index to min(rejected, last) and returns true.
 func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
 	pr.waitReset()
+	pr.reachable()
+
 	if pr.Match != 0 {
 		// the rejection must be stale if the progress has matched and "rejected"
 		// is smaller than "match".
@@ -101,7 +106,9 @@ func (pr *Progress) waitDecr(i int) {
 }
 func (pr *Progress) waitSet(w int)    { pr.Wait = w }
 func (pr *Progress) waitReset()       { pr.Wait = 0 }
-func (pr *Progress) shouldWait() bool { return pr.Match == 0 && pr.Wait > 0 }
+func (pr *Progress) reachable()       { pr.Unreachable = false }
+func (pr *Progress) unreachable()     { pr.Unreachable = true }
+func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 }
 
 func (pr *Progress) String() string {
 	return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.Next, pr.Match, pr.Wait)
@@ -243,12 +250,9 @@ func (r *raft) sendAppend(to uint64) {
 		m.Commit = r.raftLog.committed
 		// optimistically increase the next if the follower
 		// has been matched.
-		if n := len(m.Entries); pr.Match != 0 && n != 0 {
+		if n := len(m.Entries); pr.Match != 0 && !pr.Unreachable && n != 0 {
 			pr.optimisticUpdate(m.Entries[n-1].Index)
-		} else if pr.Match == 0 {
-			// TODO (xiangli): better way to find out if the follower is in good path or not
-			// a follower might be in bad path even if match != 0, since we optimistically
-			// increase the next.
+		} else if pr.Match == 0 || pr.Unreachable {
 			pr.waitSet(r.heartbeatTimeout)
 		}
 	}
@@ -508,6 +512,8 @@ func stepLeader(r *raft, m pb.Message) {
 		log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
 			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
+	case pb.MsgUnreachable:
+		r.prs[m.From].unreachable()
 	}
 }
 

+ 61 - 0
raft/raft_test.go

@@ -1289,6 +1289,67 @@ func TestLeaderIncreaseNext(t *testing.T) {
 	}
 }
 
+func TestUnreachable(t *testing.T) {
+	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
+	s := NewMemoryStorage()
+	s.Append(previousEnts)
+	r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0)
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.readMessages()
+
+	// set node 2 to unreachable
+	r.prs[2].Match = 3
+	r.prs[2].Next = 4
+	r.prs[2].Wait = 0
+	r.prs[2].unreachable()
+
+	for i := 0; i < 3; i++ {
+		// node 2 is unreachable, we expect that raft will only send out one msgAPP per heartbeat timeout
+		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+		msg := r.readMessages()
+		if len(msg) != 1 {
+			t.Errorf("len(msg) = %d, want %d", len(msg), 1)
+		}
+		if msg[0].Index != 3 {
+			t.Errorf("index = %d, want %d", msg[0].Index, 3)
+		}
+
+		if r.prs[2].Wait != r.heartbeatTimeout {
+			t.Errorf("wait = %d, want %d", r.prs[1].Wait, r.heartbeatTimeout)
+		}
+		for i := 0; i < 10; i++ {
+			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+			if l := len(r.readMessages()); l != 0 {
+				t.Errorf("len(msg) = %d, want %d", l, 0)
+			}
+		}
+
+		// do a heartbeat
+		for i := 0; i < r.heartbeatTimeout; i++ {
+			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
+		}
+		// consume the heartbeat
+		msg = r.readMessages()
+		if len(msg) != 1 {
+			t.Errorf("len(msg) = %d, want %d", len(msg), 1)
+		}
+		if msg[0].Type != pb.MsgHeartbeat {
+			t.Errorf("type = %s, want %s", msg[0].Type, pb.MsgHeartbeat)
+		}
+	}
+
+	// recover node 2
+	r.prs[2].reachable()
+	for i := 0; i < 10; i++ {
+		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+		msgs := r.readMessages()
+		if len(msgs) != 1 {
+			t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
+		}
+	}
+}
+
 func TestRestore(t *testing.T) {
 	s := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{

+ 13 - 10
raft/raftpb/raft.pb.go

@@ -79,19 +79,21 @@ const (
 	MsgSnap          MessageType = 7
 	MsgHeartbeat     MessageType = 8
 	MsgHeartbeatResp MessageType = 9
+	MsgUnreachable   MessageType = 10
 )
 
 var MessageType_name = map[int32]string{
-	0: "MsgHup",
-	1: "MsgBeat",
-	2: "MsgProp",
-	3: "MsgApp",
-	4: "MsgAppResp",
-	5: "MsgVote",
-	6: "MsgVoteResp",
-	7: "MsgSnap",
-	8: "MsgHeartbeat",
-	9: "MsgHeartbeatResp",
+	0:  "MsgHup",
+	1:  "MsgBeat",
+	2:  "MsgProp",
+	3:  "MsgApp",
+	4:  "MsgAppResp",
+	5:  "MsgVote",
+	6:  "MsgVoteResp",
+	7:  "MsgSnap",
+	8:  "MsgHeartbeat",
+	9:  "MsgHeartbeatResp",
+	10: "MsgUnreachable",
 }
 var MessageType_value = map[string]int32{
 	"MsgHup":           0,
@@ -104,6 +106,7 @@ var MessageType_value = map[string]int32{
 	"MsgSnap":          7,
 	"MsgHeartbeat":     8,
 	"MsgHeartbeatResp": 9,
+	"MsgUnreachable":   10,
 }
 
 func (x MessageType) Enum() *MessageType {

+ 1 - 0
raft/raftpb/raft.proto

@@ -42,6 +42,7 @@ enum MessageType {
 	MsgSnap          = 7;
 	MsgHeartbeat     = 8;
 	MsgHeartbeatResp = 9;
+	MsgUnreachable   = 10;
 }
 
 message Message {

+ 1 - 1
raft/status.go

@@ -58,7 +58,7 @@ func (s Status) MarshalJSON() ([]byte, error) {
 		j += "}}"
 	} else {
 		for k, v := range s.Progress {
-			subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d},`, k, v.Match, v.Next)
+			subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"unreachable":%t},`, k, v.Match, v.Next, v.Unreachable)
 			j += subj
 		}
 		// remove the trailing ","

+ 1 - 1
raft/util.go

@@ -49,7 +49,7 @@ func max(a, b uint64) uint64 {
 func IsLocalMsg(m pb.Message) bool { return m.Type == pb.MsgHup || m.Type == pb.MsgBeat }
 
 func IsResponseMsg(m pb.Message) bool {
-	return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgHeartbeatResp
+	return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgHeartbeatResp || m.Type == pb.MsgUnreachable
 }
 
 // EntryFormatter can be implemented by the application to provide human-readable formatting