Kaynağa Gözat

raft: leader response to learner MsgReadIndex

Leader should check message sender after receiving MsgReadIndex, even
when raft quorum is 1. The message could be sent from learner node, and
leader should respond.
Jingyi Hu 6 yıl önce
ebeveyn
işleme
5088d70d69
2 değiştirilmiş dosya ile 57 ekleme ve 2 silme
  1. 6 2
      raft/raft.go
  2. 51 0
      raft/raft_test.go

+ 6 - 2
raft/raft.go

@@ -1044,8 +1044,12 @@ func stepLeader(r *raft, m pb.Message) error {
 					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
 				}
 			}
-		} else {
-			r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+		} else { // there is only one voting member (the leader) in the cluster
+			if m.From == None || m.From == r.id { // from leader itself
+				r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+			} else { // from learner member
+				r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
+			}
 		}
 
 		return nil

+ 51 - 0
raft/raft_test.go

@@ -2416,6 +2416,57 @@ func TestReadOnlyOptionSafe(t *testing.T) {
 	}
 }
 
+func TestReadOnlyWithLearner(t *testing.T) {
+	a := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
+	b := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
+
+	nt := newNetwork(a, b)
+	setRandomizedElectionTimeout(b, b.electionTimeout+1)
+
+	for i := 0; i < b.electionTimeout; i++ {
+		b.tick()
+	}
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	if a.state != StateLeader {
+		t.Fatalf("state = %s, want %s", a.state, StateLeader)
+	}
+
+	tests := []struct {
+		sm        *raft
+		proposals int
+		wri       uint64
+		wctx      []byte
+	}{
+		{a, 10, 11, []byte("ctx1")},
+		{b, 10, 21, []byte("ctx2")},
+		{a, 10, 31, []byte("ctx3")},
+		{b, 10, 41, []byte("ctx4")},
+	}
+
+	for i, tt := range tests {
+		for j := 0; j < tt.proposals; j++ {
+			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+		}
+
+		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
+
+		r := tt.sm
+		if len(r.readStates) == 0 {
+			t.Fatalf("#%d: len(readStates) = 0, want non-zero", i)
+		}
+		rs := r.readStates[0]
+		if rs.Index != tt.wri {
+			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
+		}
+
+		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
+			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
+		}
+		r.readStates = nil
+	}
+}
+
 func TestReadOnlyOptionLease(t *testing.T) {
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())