Browse Source

Merge pull request #5997 from xiang90/l_r

raft: fix readindex
Xiang Li 9 years ago
parent
commit
80c2e4098d
4 changed files with 25 additions and 8 deletions
  1. 4 3
      etcdserver/server_test.go
  2. 14 2
      raft/node.go
  3. 1 1
      raft/node_test.go
  4. 6 2
      raft/raft.go

+ 4 - 3
etcdserver/server_test.go

@@ -1357,9 +1357,10 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
 	n.Record(testutil.Action{Name: "Step"})
 	return nil
 }
-func (n *nodeRecorder) Status() raft.Status      { return raft.Status{} }
-func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
-func (n *nodeRecorder) Advance()                 {}
+func (n *nodeRecorder) Status() raft.Status                              { return raft.Status{} }
+func (n *nodeRecorder) Ready() <-chan raft.Ready                         { return nil }
+func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
+func (n *nodeRecorder) Advance()                                         {}
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
 	n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
 	return &raftpb.ConfState{}

+ 14 - 2
raft/node.go

@@ -157,6 +157,18 @@ type Node interface {
 	// in snapshots. Will never return nil; it returns a pointer only
 	// to match MemoryStorage.Compact.
 	ApplyConfChange(cc pb.ConfChange) *pb.ConfState
+
+	// ReadIndex requests a read state. The read state will be set in the Ready.
+	// Read state has a read index. Once the application advances further than the read
+	// index, any linearizable read requests issued before the read request can be
+	// processed safely. The read state will have the same rctx attached.
+	//
+	// Note: the current implementation depends on the leader lease. If the clock drift is unbounded,
+	// the leader might keep the lease longer than it should (the clock can move backward or pause without any bound).
+	// ReadIndex is not safe in that case.
+	// TODO: add clock drift bound into raft configuration.
+	ReadIndex(ctx context.Context, rctx []byte) error
+
 	// Status returns the current status of the raft state machine.
 	Status() Status
 	// ReportUnreachable reports the given node is not reachable for the last send.
@@ -487,8 +499,8 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
 	}
 }
 
-func (n *node) ReadIndex(ctx context.Context, id uint64, rctx []byte) error {
-	return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, From: id, Entries: []pb.Entry{{Data: rctx}}})
+func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
+	return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
 }
 
 func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {

+ 1 - 1
raft/node_test.go

@@ -180,7 +180,7 @@ func TestNodeReadIndex(t *testing.T) {
 
 	r.step = appendStep
 	wrequestCtx = []byte("somedata2")
-	n.ReadIndex(context.TODO(), r.id, wrequestCtx)
+	n.ReadIndex(context.TODO(), wrequestCtx)
 	n.Stop()
 
 	if len(msgs) != 1 {

+ 6 - 2
raft/raft.go

@@ -681,8 +681,12 @@ func stepLeader(r *raft, m pb.Message) {
 		if r.checkQuorum {
 			ri = r.raftLog.committed
 		}
-
-		r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+		if m.From == None || m.From == r.id { // from local member
+			r.readState.Index = ri
+			r.readState.RequestCtx = m.Entries[0].Data
+		} else {
+			r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+		}
 		return
 	}