Browse Source

Merge pull request #7332 from hhkbp2/fix-read-index

raft: fix read index request for #7331
Xiang Li 8 years ago
parent
commit
78fbe669ad
2 changed files with 76 additions and 0 deletions
  1. 5 0
      raft/raft.go
  2. 71 0
      raft/raft_test.go

+ 5 - 0
raft/raft.go

@@ -823,6 +823,11 @@ func stepLeader(r *raft, m pb.Message) {
 		return
 	case pb.MsgReadIndex:
 		if r.quorum() > 1 {
+			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
+				// Reject read only request when this leader has not committed any log entry at its term.
+				return
+			}
+
 			// thinking: use an interally defined context instead of the user given context.
 			// We can express this in terms of the term and index instead of a user-supplied value.
 			// This would allow multiple reads to piggyback on the same message.

+ 71 - 0
raft/raft_test.go

@@ -1856,6 +1856,77 @@ func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) {
 	}
 }
 
+// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message
+// when it commits at least one log entry at it term.
+func TestReadOnlyForNewLeader(t *testing.T) {
+	cfg := newTestConfig(1, []uint64{1, 2, 3}, 10, 1,
+		&MemoryStorage{
+			ents:      []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}},
+			hardState: pb.HardState{Commit: 1, Term: 1},
+		})
+	cfg.Applied = 1
+	a := newRaft(cfg)
+	cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
+		&MemoryStorage{
+			ents:      []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
+			hardState: pb.HardState{Commit: 2, Term: 1},
+		})
+	cfg.Applied = 2
+	b := newRaft(cfg)
+	cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
+		&MemoryStorage{
+			ents:      []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
+			hardState: pb.HardState{Commit: 2, Term: 1},
+		})
+	cfg.Applied = 2
+	c := newRaft(cfg)
+	nt := newNetwork(a, b, c)
+
+	// Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader.
+	nt.ignore(pb.MsgApp)
+	// Force peer a to become leader.
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+	if a.state != StateLeader {
+		t.Fatalf("state = %s, want %s", a.state, StateLeader)
+	}
+
+	// Ensure peer a drops read only request.
+	var windex uint64 = 4
+	wctx := []byte("ctx")
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
+	if len(a.readStates) != 0 {
+		t.Fatalf("len(readStates) = %d, want zero", len(a.readStates))
+	}
+
+	nt.recover()
+
+	// Force peer a to commit a log entry at its term
+	for i := 0; i < a.heartbeatTimeout; i++ {
+		a.tick()
+	}
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+	if a.raftLog.committed != 4 {
+		t.Fatalf("committed = %d, want 4", a.raftLog.committed)
+	}
+	lastLogTerm := a.raftLog.zeroTermOnErrCompacted(a.raftLog.term(a.raftLog.committed))
+	if lastLogTerm != a.Term {
+		t.Fatalf("last log term = %d, want %d", lastLogTerm, a.Term)
+	}
+
+	// Ensure peer a accepts read only request after it commits a entry at its term.
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
+	if len(a.readStates) != 1 {
+		t.Fatalf("len(readStates) = %d, want 1", len(a.readStates))
+	}
+	rs := a.readStates[0]
+	if rs.Index != windex {
+		t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
+	}
+	if !bytes.Equal(rs.RequestCtx, wctx) {
+		t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
+	}
+}
+
 func TestLeaderAppResp(t *testing.T) {
 	// initial progress: match = 0; next = 3
 	tests := []struct {