Browse Source

Merge pull request #10590 from jingyih/fix_readIndex_for_learner

raft: leader respond to learner read index message
Xiang Li 6 years ago
parent
commit
952b9e75c6
2 changed files with 57 additions and 2 deletions
  1. 6 2
      raft/raft.go
  2. 51 0
      raft/raft_test.go

+ 6 - 2
raft/raft.go

@@ -1044,8 +1044,12 @@ func stepLeader(r *raft, m pb.Message) error {
 					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
 					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
 				}
 				}
 			}
 			}
-		} else {
-			r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+		} else { // there is only one voting member (the leader) in the cluster
+			if m.From == None || m.From == r.id { // from leader itself
+				r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+			} else { // from learner member
+				r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
+			}
 		}
 		}
 
 
 		return nil
 		return nil

+ 51 - 0
raft/raft_test.go

@@ -2416,6 +2416,57 @@ func TestReadOnlyOptionSafe(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestReadOnlyWithLearner(t *testing.T) {
+	a := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
+	b := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
+
+	nt := newNetwork(a, b)
+	setRandomizedElectionTimeout(b, b.electionTimeout+1)
+
+	for i := 0; i < b.electionTimeout; i++ {
+		b.tick()
+	}
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	if a.state != StateLeader {
+		t.Fatalf("state = %s, want %s", a.state, StateLeader)
+	}
+
+	tests := []struct {
+		sm        *raft
+		proposals int
+		wri       uint64
+		wctx      []byte
+	}{
+		{a, 10, 11, []byte("ctx1")},
+		{b, 10, 21, []byte("ctx2")},
+		{a, 10, 31, []byte("ctx3")},
+		{b, 10, 41, []byte("ctx4")},
+	}
+
+	for i, tt := range tests {
+		for j := 0; j < tt.proposals; j++ {
+			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+		}
+
+		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
+
+		r := tt.sm
+		if len(r.readStates) == 0 {
+			t.Fatalf("#%d: len(readStates) = 0, want non-zero", i)
+		}
+		rs := r.readStates[0]
+		if rs.Index != tt.wri {
+			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
+		}
+
+		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
+			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
+		}
+		r.readStates = nil
+	}
+}
+
 func TestReadOnlyOptionLease(t *testing.T) {
 func TestReadOnlyOptionLease(t *testing.T) {
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())