Browse Source

Merge pull request #6401 from hhkbp2/add-read-index-for-raft-rawnode

raft: add read index for RawNode
Xiang Li 9 years ago
parent
commit
8eac9fb93d
2 changed files with 74 additions and 0 deletions
  1. 14 0
      raft/rawnode.go
  2. 60 0
      raft/rawnode_test.go

+ 14 - 0
raft/rawnode.go

@@ -66,6 +66,9 @@ func (rn *RawNode) commitReady(rd Ready) {
 	if !IsEmptySnap(rd.Snapshot) {
 		rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
 	}
+	if len(rd.ReadStates) != 0 {
+		rn.raft.readStates = nil
+	}
 }
 
 // NewRawNode returns a new RawNode given configuration and a list of raft peers.
@@ -205,6 +208,9 @@ func (rn *RawNode) HasReady() bool {
 	if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
 		return true
 	}
+	if len(r.readStates) != 0 {
+		return true
+	}
 	return false
 }
 
@@ -236,3 +242,11 @@ func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
 func (rn *RawNode) TransferLeader(transferee uint64) {
 	_ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee})
 }
+
+// ReadIndex requests a read state. The read state will be set in ready.
+// Read State has a read index. Once the application advances further than the read
+// index, any linearizable read requests issued before the read request can be
+// processed safely. The read state will have the same rctx attached.
+func (rn *RawNode) ReadIndex(rctx []byte) {
+	_ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
+}

+ 60 - 0
raft/rawnode_test.go

@@ -110,6 +110,66 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
 	}
 }
 
+// TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message
+// to the underlying raft. It also ensures that ReadState can be read out.
+func TestRawNodeReadIndex(t *testing.T) {
+	msgs := []raftpb.Message{}
+	appendStep := func(r *raft, m raftpb.Message) {
+		msgs = append(msgs, m)
+	}
+	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
+
+	s := NewMemoryStorage()
+	c := newTestConfig(1, nil, 10, 1, s)
+	rawNode, err := NewRawNode(c, []Peer{{ID: 1}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	rawNode.raft.readStates = wrs
+	// ensure the ReadStates can be read out
+	hasReady := rawNode.HasReady()
+	if hasReady != true {
+		t.Errorf("HasReady() returns %t, want %t", hasReady, true)
+	}
+	rd := rawNode.Ready()
+	if !reflect.DeepEqual(rd.ReadStates, wrs) {
+		t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs)
+	}
+	s.Append(rd.Entries)
+	rawNode.Advance(rd)
+	// ensure raft.readStates is reset after advance
+	if rawNode.raft.readStates != nil {
+		t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil)
+	}
+
+	wrequestCtx := []byte("somedata2")
+	rawNode.Campaign()
+	for {
+		rd = rawNode.Ready()
+		s.Append(rd.Entries)
+
+		if rd.SoftState.Lead == rawNode.raft.id {
+			rawNode.Advance(rd)
+
+			// Once we are the leader, issue a ReadIndex request
+			rawNode.raft.step = appendStep
+			rawNode.ReadIndex(wrequestCtx)
+			break
+		}
+		rawNode.Advance(rd)
+	}
+	// ensure that MsgReadIndex message is sent to the underlying raft
+	if len(msgs) != 1 {
+		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
+	}
+	if msgs[0].Type != raftpb.MsgReadIndex {
+		t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
+	}
+	if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
+		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
+	}
+}
+
 // TestBlockProposal from node_test.go has no equivalent in rawNode because there is
 // no leader check in RawNode.