Browse Source

Merge pull request #6749 from gyuho/raft-prevote

raft: do not attach term to MsgReadIndex
Gyu-Ho Lee 9 years ago
parent
commit
4a08678ce1
2 changed files with 61 additions and 2 deletions
  1. 58 0
      raft/node_test.go
  2. 3 2
      raft/raft.go

+ 58 - 0
raft/node_test.go

@@ -190,6 +190,64 @@ func TestNodeReadIndex(t *testing.T) {
 	}
 }
 
+// TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader
+// gets forwarded to the new leader and 'send' method does not attach its term.
+func TestNodeReadIndexToOldLeader(t *testing.T) {
+	r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	r3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+
+	nt := newNetwork(r1, r2, r3)
+
+	// elect r1 as leader
+	nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
+
+	var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
+
+	// send readindex request to r2(follower)
+	r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries})
+
+	// verify r2(follower) forwards this message to r1(leader) with term not set
+	if len(r2.msgs) != 1 {
+		t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
+	}
+	readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
+	if !reflect.DeepEqual(r2.msgs[0], readIndxMsg1) {
+		t.Fatalf("r2.msgs[0] expected %+v, got %+v", readIndxMsg1, r2.msgs[0])
+	}
+
+	// send readindex request to r3(follower)
+	r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries})
+
+	// verify r3(follower) forwards this message to r1(leader) with term not set as well.
+	if len(r3.msgs) != 1 {
+		t.Fatalf("len(r3.msgs) expected 1, got %d", len(r3.msgs))
+	}
+	readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
+	if !reflect.DeepEqual(r3.msgs[0], readIndxMsg2) {
+		t.Fatalf("r3.msgs[0] expected %+v, got %+v", readIndxMsg2, r3.msgs[0])
+	}
+
+	// now elect r3 as leader
+	nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup})
+
+	// let r1 steps the two messages previously we got from r2, r3
+	r1.Step(readIndxMsg1)
+	r1.Step(readIndxMsg2)
+
+	// verify r1(follower) forwards these messages again to r3(new leader)
+	if len(r1.msgs) != 2 {
+		t.Fatalf("len(r1.msgs) expected 1, got %d", len(r1.msgs))
+	}
+	readIndxMsg3 := raftpb.Message{From: 1, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
+	if !reflect.DeepEqual(r1.msgs[0], readIndxMsg3) {
+		t.Fatalf("r1.msgs[0] expected %+v, got %+v", readIndxMsg3, r1.msgs[0])
+	}
+	if !reflect.DeepEqual(r1.msgs[1], readIndxMsg3) {
+		t.Fatalf("r1.msgs[1] expected %+v, got %+v", readIndxMsg3, r1.msgs[1])
+	}
+}
+
 // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
 // to the underlying raft.
 func TestNodeProposeConfig(t *testing.T) {

+ 3 - 2
raft/raft.go

@@ -353,10 +353,11 @@ func (r *raft) send(m pb.Message) {
 		if m.Term != 0 {
 			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
 		}
-		// do not attach term to MsgProp
+		// do not attach term to MsgProp, MsgReadIndex
 		// proposals are a way to forward to the leader and
 		// should be treated as local message.
-		if m.Type != pb.MsgProp {
+		// MsgReadIndex is also forwarded to leader.
+		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
 			m.Term = r.Term
 		}
 	}