Browse Source

Merge pull request #5553 from swingbach/master

raft: implemented read-only query when quorum check is on
Xiang Li 9 years ago
parent
commit
5f1c763993
6 changed files with 186 additions and 1 deletions
  1. 21 1
      raft/node.go
  2. 52 0
      raft/node_test.go
  3. 36 0
      raft/raft.go
  4. 69 0
      raft/raft_test.go
  5. 6 0
      raft/raftpb/raft.pb.go
  6. 2 0
      raft/raftpb/raft.proto

+ 21 - 1
raft/node.go

@@ -60,6 +60,12 @@ type Ready struct {
 	// HardState will be equal to empty state if there is no update.
 	// HardState will be equal to empty state if there is no update.
 	pb.HardState
 	pb.HardState
 
 
+	// ReadState can be used for node to serve linearizable read requests locally
+	// when its applied index is greater than the index in ReadState.
+	// Note that the readState will be returned when raft receives msgReadIndex.
+	// The returned is only valid for the request that requested to read.
+	ReadState
+
 	// Entries specifies entries to be saved to stable storage BEFORE
 	// Entries specifies entries to be saved to stable storage BEFORE
 	// Messages are sent.
 	// Messages are sent.
 	Entries []pb.Entry
 	Entries []pb.Entry
@@ -96,7 +102,7 @@ func IsEmptySnap(sp pb.Snapshot) bool {
 func (rd Ready) containsUpdates() bool {
 func (rd Ready) containsUpdates() bool {
 	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
 	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
 		!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
 		!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
-		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
+		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || rd.Index != None
 }
 }
 
 
 // Node represents a node in a raft cluster.
 // Node represents a node in a raft cluster.
@@ -361,7 +367,10 @@ func (n *node) run(r *raft) {
 			if !IsEmptySnap(rd.Snapshot) {
 			if !IsEmptySnap(rd.Snapshot) {
 				prevSnapi = rd.Snapshot.Metadata.Index
 				prevSnapi = rd.Snapshot.Metadata.Index
 			}
 			}
+
 			r.msgs = nil
 			r.msgs = nil
+			r.readState.Index = None
+			r.readState.RequestCtx = nil
 			advancec = n.advancec
 			advancec = n.advancec
 		case <-advancec:
 		case <-advancec:
 			if prevHardSt.Commit != 0 {
 			if prevHardSt.Commit != 0 {
@@ -478,6 +487,10 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
 	}
 	}
 }
 }
 
 
+func (n *node) ReadIndex(ctx context.Context, id uint64, rctx []byte) error {
+	return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, From: id, Entries: []pb.Entry{{Data: rctx}}})
+}
+
 func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	rd := Ready{
 	rd := Ready{
 		Entries:          r.raftLog.unstableEntries(),
 		Entries:          r.raftLog.unstableEntries(),
@@ -493,5 +506,12 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if r.raftLog.unstable.snapshot != nil {
 	if r.raftLog.unstable.snapshot != nil {
 		rd.Snapshot = *r.raftLog.unstable.snapshot
 		rd.Snapshot = *r.raftLog.unstable.snapshot
 	}
 	}
+	if r.readState.Index != None {
+		c := make([]byte, len(r.readState.RequestCtx))
+		copy(c, r.readState.RequestCtx)
+
+		rd.Index = r.readState.Index
+		rd.RequestCtx = c
+	}
 	return rd
 	return rd
 }
 }

+ 52 - 0
raft/node_test.go

@@ -142,6 +142,58 @@ func TestNodePropose(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft.
+// It also ensures that ReadState can be read out through ready chan.
+func TestNodeReadIndex(t *testing.T) {
+	msgs := []raftpb.Message{}
+	appendStep := func(r *raft, m raftpb.Message) {
+		msgs = append(msgs, m)
+	}
+	wreadIndex := uint64(1)
+	wrequestCtx := []byte("somedata")
+
+	n := newNode()
+	s := NewMemoryStorage()
+	r := newTestRaft(1, []uint64{1}, 10, 1, s)
+	r.readState.Index = wreadIndex
+	r.readState.RequestCtx = wrequestCtx
+	go n.run(r)
+	n.Campaign(context.TODO())
+	for {
+		rd := <-n.Ready()
+		if rd.Index != wreadIndex {
+			t.Errorf("ReadIndex = %d, want %d", rd.Index, wreadIndex)
+		}
+
+		if !reflect.DeepEqual(rd.RequestCtx, wrequestCtx) {
+			t.Errorf("RequestCtx = %v, want %v", rd.RequestCtx, wrequestCtx)
+		}
+
+		s.Append(rd.Entries)
+
+		if rd.SoftState.Lead == r.id {
+			n.Advance()
+			break
+		}
+		n.Advance()
+	}
+
+	r.step = appendStep
+	wrequestCtx = []byte("somedata2")
+	n.ReadIndex(context.TODO(), r.id, wrequestCtx)
+	n.Stop()
+
+	if len(msgs) != 1 {
+		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
+	}
+	if msgs[0].Type != raftpb.MsgReadIndex {
+		t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
+	}
+	if !reflect.DeepEqual(msgs[0].Entries[0].Data, wrequestCtx) {
+		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
+	}
+}
+
 // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
 // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
 // to the underlying raft.
 // to the underlying raft.
 func TestNodeProposeConfig(t *testing.T) {
 func TestNodeProposeConfig(t *testing.T) {

+ 36 - 0
raft/raft.go

@@ -133,12 +133,24 @@ func (c *Config) validate() error {
 	return nil
 	return nil
 }
 }
 
 
+// ReadState provides state for read only query.
+// It's caller's responsibility to send MsgReadIndex first before getting
+// this state from ready, It's also caller's duty to differentiate if this
+// state is what it requests through RequestCtx, eg. given a unique id as
+// RequestCtx
+type ReadState struct {
+	Index      uint64
+	RequestCtx []byte
+}
+
 type raft struct {
 type raft struct {
 	id uint64
 	id uint64
 
 
 	Term uint64
 	Term uint64
 	Vote uint64
 	Vote uint64
 
 
+	readState ReadState
+
 	// the log
 	// the log
 	raftLog *raftLog
 	raftLog *raftLog
 
 
@@ -208,6 +220,7 @@ func newRaft(c *Config) *raft {
 	r := &raft{
 	r := &raft{
 		id:               c.ID,
 		id:               c.ID,
 		lead:             None,
 		lead:             None,
+		readState:        ReadState{Index: None, RequestCtx: nil},
 		raftLog:          raftlog,
 		raftLog:          raftlog,
 		maxMsgSize:       c.MaxSizePerMsg,
 		maxMsgSize:       c.MaxSizePerMsg,
 		maxInflight:      c.MaxInflightMsgs,
 		maxInflight:      c.MaxInflightMsgs,
@@ -642,6 +655,14 @@ func stepLeader(r *raft, m pb.Message) {
 			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		return
 		return
+	case pb.MsgReadIndex:
+		ri := None
+		if r.checkQuorum {
+			ri = r.raftLog.committed
+		}
+
+		r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+		return
 	}
 	}
 
 
 	// All other message types require a progress for m.From (pr).
 	// All other message types require a progress for m.From (pr).
@@ -822,6 +843,21 @@ func stepFollower(r *raft, m pb.Message) {
 	case pb.MsgTimeoutNow:
 	case pb.MsgTimeoutNow:
 		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
 		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
 		r.campaign()
 		r.campaign()
+	case pb.MsgReadIndex:
+		if r.lead == None {
+			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
+			return
+		}
+		m.To = r.lead
+		r.send(m)
+	case pb.MsgReadIndexResp:
+		if len(m.Entries) != 1 {
+			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
+			return
+		}
+
+		r.readState.Index = m.Index
+		r.readState.RequestCtx = m.Entries[0].Data
 	}
 	}
 }
 }
 
 

+ 69 - 0
raft/raft_test.go

@@ -1405,6 +1405,75 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestReadIndexWithCheckQuorum(t *testing.T) {
+	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+
+	a.checkQuorum = true
+	b.checkQuorum = true
+	c.checkQuorum = true
+
+	nt := newNetwork(a, b, c)
+	for i := 0; i < b.electionTimeout; i++ {
+		b.tick()
+	}
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	if a.state != StateLeader {
+		t.Fatalf("state = %s, want %s", a.state, StateLeader)
+	}
+
+	tests := []struct {
+		sm        *raft
+		proposals int
+		wri       uint64
+		wctx      []byte
+	}{
+		{b, 10, 11, []byte("ctx1")},
+		{c, 10, 21, []byte("ctx2")},
+		{b, 10, 31, []byte("ctx3")},
+		{c, 10, 41, []byte("ctx4")},
+	}
+
+	for _, tt := range tests {
+		for j := 0; j < tt.proposals; j++ {
+			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+		}
+
+		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
+
+		r := tt.sm
+		if r.readState.Index != tt.wri {
+			t.Errorf("readIndex = %d, want %d", r.readState.Index, tt.wri)
+		}
+
+		if !bytes.Equal(r.readState.RequestCtx, tt.wctx) {
+			t.Errorf("requestCtx = %v, want %v", r.readState.RequestCtx, tt.wctx)
+		}
+	}
+}
+
+func TestReadIndexWithoutCheckQuorum(t *testing.T) {
+	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+
+	nt := newNetwork(a, b, c)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	ctx := []byte("ctx1")
+	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
+
+	if b.readState.Index != None {
+		t.Errorf("readIndex = %d, want %d", b.readState.Index, None)
+	}
+
+	if !bytes.Equal(b.readState.RequestCtx, ctx) {
+		t.Errorf("requestCtx = %v, want %v", b.readState.RequestCtx, ctx)
+	}
+}
+
 func TestLeaderAppResp(t *testing.T) {
 func TestLeaderAppResp(t *testing.T) {
 	// initial progress: match = 0; next = 3
 	// initial progress: match = 0; next = 3
 	tests := []struct {
 	tests := []struct {

+ 6 - 0
raft/raftpb/raft.pb.go

@@ -90,6 +90,8 @@ const (
 	MsgCheckQuorum    MessageType = 12
 	MsgCheckQuorum    MessageType = 12
 	MsgTransferLeader MessageType = 13
 	MsgTransferLeader MessageType = 13
 	MsgTimeoutNow     MessageType = 14
 	MsgTimeoutNow     MessageType = 14
+	MsgReadIndex      MessageType = 15
+	MsgReadIndexResp  MessageType = 16
 )
 )
 
 
 var MessageType_name = map[int32]string{
 var MessageType_name = map[int32]string{
@@ -108,6 +110,8 @@ var MessageType_name = map[int32]string{
 	12: "MsgCheckQuorum",
 	12: "MsgCheckQuorum",
 	13: "MsgTransferLeader",
 	13: "MsgTransferLeader",
 	14: "MsgTimeoutNow",
 	14: "MsgTimeoutNow",
+	15: "MsgReadIndex",
+	16: "MsgReadIndexResp",
 }
 }
 var MessageType_value = map[string]int32{
 var MessageType_value = map[string]int32{
 	"MsgHup":            0,
 	"MsgHup":            0,
@@ -125,6 +129,8 @@ var MessageType_value = map[string]int32{
 	"MsgCheckQuorum":    12,
 	"MsgCheckQuorum":    12,
 	"MsgTransferLeader": 13,
 	"MsgTransferLeader": 13,
 	"MsgTimeoutNow":     14,
 	"MsgTimeoutNow":     14,
+	"MsgReadIndex":      15,
+	"MsgReadIndexResp":  16,
 }
 }
 
 
 func (x MessageType) Enum() *MessageType {
 func (x MessageType) Enum() *MessageType {

+ 2 - 0
raft/raftpb/raft.proto

@@ -48,6 +48,8 @@ enum MessageType {
 	MsgCheckQuorum     = 12;
 	MsgCheckQuorum     = 12;
 	MsgTransferLeader  = 13;
 	MsgTransferLeader  = 13;
 	MsgTimeoutNow      = 14;
 	MsgTimeoutNow      = 14;
+	MsgReadIndex       = 15;
+	MsgReadIndexResp   = 16;
 }
 }
 
 
 message Message {
 message Message {