Browse Source

Merge pull request #6275 from xiang90/raft_l

raft: support safe readonly request
Xiang Li 9 years ago
parent
commit
cfe717e926
6 changed files with 297 additions and 68 deletions
  1. 7 1
      raft/README.md
  2. 6 16
      raft/node.go
  3. 6 11
      raft/node_test.go
  4. 89 29
      raft/raft.go
  5. 71 11
      raft/raft_test.go
  6. 118 0
      raft/read_only.go

+ 7 - 1
raft/README.md

@@ -26,7 +26,13 @@ This raft implementation is a full feature implementation of Raft protocol. Feat
 - Log compaction 
 - Membership changes
 - Leadership transfer extension
-- Lease-based linearizable read-only queries served by both the leader and followers
+- Efficient linearizable read-only queries served by both the leader and followers
+ - leader checks with quorum and bypasses Raft log before processing read-only queries
+ - followers asks leader to get a safe read index before processing read-only queries
+- More efficient lease-based linearizable read-only queries served by both the leader and followers
+ - leader bypasses Raft log and processing read-only queries locally
+ - followers asks leader to get a safe read index before processing read-only queries
+ - this approach relies on the clock of the all the machines in raft group
 
 This raft implementation also includes a few optional enhancements:
 

+ 6 - 16
raft/node.go

@@ -60,11 +60,11 @@ type Ready struct {
 	// HardState will be equal to empty state if there is no update.
 	pb.HardState
 
-	// ReadState can be used for node to serve linearizable read requests locally
+	// ReadStates can be used for node to serve linearizable read requests locally
 	// when its applied index is greater than the index in ReadState.
 	// Note that the readState will be returned when raft receives msgReadIndex.
 	// The returned is only valid for the request that requested to read.
-	ReadState
+	ReadStates []ReadState
 
 	// Entries specifies entries to be saved to stable storage BEFORE
 	// Messages are sent.
@@ -102,7 +102,7 @@ func IsEmptySnap(sp pb.Snapshot) bool {
 func (rd Ready) containsUpdates() bool {
 	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
 		!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
-		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || rd.Index != None
+		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
 }
 
 // Node represents a node in a raft cluster.
@@ -151,11 +151,6 @@ type Node interface {
 	// Read state has a read index. Once the application advances further than the read
 	// index, any linearizable read requests issued before the read request can be
 	// processed safely. The read state will have the same rctx attached.
-	//
-	// Note: the current implementation depends on the leader lease. If the clock drift is unbounded,
-	// leader might keep the lease longer than it should (clock can move backward/pause without any bound).
-	// ReadIndex is not safe in that case.
-	// TODO: add clock drift bound into raft configuration.
 	ReadIndex(ctx context.Context, rctx []byte) error
 
 	// Status returns the current status of the raft state machine.
@@ -370,8 +365,7 @@ func (n *node) run(r *raft) {
 			}
 
 			r.msgs = nil
-			r.readState.Index = None
-			r.readState.RequestCtx = nil
+			r.readStates = nil
 			advancec = n.advancec
 		case <-advancec:
 			if prevHardSt.Commit != 0 {
@@ -516,12 +510,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if r.raftLog.unstable.snapshot != nil {
 		rd.Snapshot = *r.raftLog.unstable.snapshot
 	}
-	if r.readState.Index != None {
-		c := make([]byte, len(r.readState.RequestCtx))
-		copy(c, r.readState.RequestCtx)
-
-		rd.Index = r.readState.Index
-		rd.RequestCtx = c
+	if len(r.readStates) != 0 {
+		rd.ReadStates = r.readStates
 	}
 	return rd
 }

+ 6 - 11
raft/node_test.go

@@ -150,24 +150,19 @@ func TestNodeReadIndex(t *testing.T) {
 	appendStep := func(r *raft, m raftpb.Message) {
 		msgs = append(msgs, m)
 	}
-	wreadIndex := uint64(1)
-	wrequestCtx := []byte("somedata")
+	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
 
 	n := newNode()
 	s := NewMemoryStorage()
 	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	r.readState.Index = wreadIndex
-	r.readState.RequestCtx = wrequestCtx
+	r.readStates = wrs
+
 	go n.run(r)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
-		if rd.Index != wreadIndex {
-			t.Errorf("ReadIndex = %d, want %d", rd.Index, wreadIndex)
-		}
-
-		if !bytes.Equal(rd.RequestCtx, wrequestCtx) {
-			t.Errorf("RequestCtx = %v, want %v", rd.RequestCtx, wrequestCtx)
+		if !reflect.DeepEqual(rd.ReadStates, wrs) {
+			t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
 		}
 
 		s.Append(rd.Entries)
@@ -180,7 +175,7 @@ func TestNodeReadIndex(t *testing.T) {
 	}
 
 	r.step = appendStep
-	wrequestCtx = []byte("somedata2")
+	wrequestCtx := []byte("somedata2")
 	n.ReadIndex(context.TODO(), wrequestCtx)
 	n.Stop()
 

+ 89 - 29
raft/raft.go

@@ -39,6 +39,20 @@ const (
 	StateLeader
 )
 
+type ReadOnlyOption int
+
+const (
+	// ReadOnlySafe guarantees the linearizability of the read only request by
+	// communicating with the quorum. It is the default and suggested option.
+	ReadOnlySafe ReadOnlyOption = iota
+	// ReadOnlyLeaseBased ensures linearizability of the read only request by
+	// relying on the leader lease. It can be affected by clock drift.
+	// If the clock drift is unbounded, leader might keep the lease longer than it
+	// should (clock can move backward/pause without any bound). ReadIndex is not safe
+	// in that case.
+	ReadOnlyLeaseBased
+)
+
 // Possible values for CampaignType
 const (
 	// campaignElection represents the type of normal election
@@ -135,6 +149,18 @@ type Config struct {
 	// steps down when quorum is not active for an electionTimeout.
 	CheckQuorum bool
 
+	// ReadOnlyOption specifies how the read only request is processed.
+	//
+	// ReadOnlySafe guarantees the linearizability of the read only request by
+	// communicating with the quorum. It is the default and suggested option.
+	//
+	// ReadOnlyLeaseBased ensures linearizability of the read only request by
+	// relying on the leader lease. It can be affected by clock drift.
+	// If the clock drift is unbounded, leader might keep the lease longer than it
+	// should (clock can move backward/pause without any bound). ReadIndex is not safe
+	// in that case.
+	ReadOnlyOption ReadOnlyOption
+
 	// Logger is the logger used for raft log. For multinode which can host
 	// multiple raft group, each raft group can have its own logger
 	Logger Logger
@@ -168,23 +194,13 @@ func (c *Config) validate() error {
 	return nil
 }
 
-// ReadState provides state for read only query.
-// It's caller's responsibility to send MsgReadIndex first before getting
-// this state from ready, It's also caller's duty to differentiate if this
-// state is what it requests through RequestCtx, eg. given a unique id as
-// RequestCtx
-type ReadState struct {
-	Index      uint64
-	RequestCtx []byte
-}
-
 type raft struct {
 	id uint64
 
 	Term uint64
 	Vote uint64
 
-	readState ReadState
+	readStates []ReadState
 
 	// the log
 	raftLog *raftLog
@@ -207,6 +223,8 @@ type raft struct {
 	// New configuration is ignored if there exists unapplied configuration.
 	pendingConf bool
 
+	readOnly *readOnly
+
 	// number of ticks since it reached last electionTimeout when it is leader
 	// or candidate.
 	// number of ticks since it reached last electionTimeout or received a
@@ -254,7 +272,6 @@ func newRaft(c *Config) *raft {
 	r := &raft{
 		id:               c.ID,
 		lead:             None,
-		readState:        ReadState{Index: None, RequestCtx: nil},
 		raftLog:          raftlog,
 		maxMsgSize:       c.MaxSizePerMsg,
 		maxInflight:      c.MaxInflightMsgs,
@@ -263,6 +280,7 @@ func newRaft(c *Config) *raft {
 		heartbeatTimeout: c.HeartbeatTick,
 		logger:           c.Logger,
 		checkQuorum:      c.CheckQuorum,
+		readOnly:         newReadOnly(c.ReadOnlyOption),
 	}
 	for _, p := range peers {
 		r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
@@ -380,7 +398,7 @@ func (r *raft) sendAppend(to uint64) {
 }
 
 // sendHeartbeat sends an empty MsgApp
-func (r *raft) sendHeartbeat(to uint64) {
+func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
 	// Attach the commit as min(to.matched, r.committed).
 	// When the leader sends out heartbeat message,
 	// the receiver(follower) might not be matched with the leader
@@ -389,10 +407,12 @@ func (r *raft) sendHeartbeat(to uint64) {
 	// an unmatched index.
 	commit := min(r.prs[to].Match, r.raftLog.committed)
 	m := pb.Message{
-		To:     to,
-		Type:   pb.MsgHeartbeat,
-		Commit: commit,
+		To:      to,
+		Type:    pb.MsgHeartbeat,
+		Commit:  commit,
+		Context: ctx,
 	}
+
 	r.send(m)
 }
 
@@ -409,11 +429,20 @@ func (r *raft) bcastAppend() {
 
 // bcastHeartbeat sends RPC, without entries to all the peers.
 func (r *raft) bcastHeartbeat() {
+	lastCtx := r.readOnly.lastPendingRequestCtx()
+	if len(lastCtx) == 0 {
+		r.bcastHeartbeatWithCtx(nil)
+	} else {
+		r.bcastHeartbeatWithCtx([]byte(lastCtx))
+	}
+}
+
+func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
 	for id := range r.prs {
 		if id == r.id {
 			continue
 		}
-		r.sendHeartbeat(id)
+		r.sendHeartbeat(id, ctx)
 		r.prs[id].resume()
 	}
 }
@@ -453,6 +482,7 @@ func (r *raft) reset(term uint64) {
 		}
 	}
 	r.pendingConf = false
+	r.readOnly = newReadOnly(r.readOnly.option)
 }
 
 func (r *raft) appendEntry(es ...pb.Entry) {
@@ -699,16 +729,29 @@ func stepLeader(r *raft, m pb.Message) {
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		return
 	case pb.MsgReadIndex:
-		ri := None
-		if r.checkQuorum {
-			ri = r.raftLog.committed
-		}
-		if m.From == None || m.From == r.id { // from local member
-			r.readState.Index = ri
-			r.readState.RequestCtx = m.Entries[0].Data
+		if r.quorum() > 1 {
+			// thinking: use an interally defined context instead of the user given context.
+			// We can express this in terms of the term and index instead of a user-supplied value.
+			// This would allow multiple reads to piggyback on the same message.
+			switch r.readOnly.option {
+			case ReadOnlySafe:
+				r.readOnly.addRequest(r.raftLog.committed, m)
+				r.bcastHeartbeatWithCtx(m.Entries[0].Data)
+			case ReadOnlyLeaseBased:
+				var ri uint64
+				if r.checkQuorum {
+					ri = r.raftLog.committed
+				}
+				if m.From == None || m.From == r.id { // from local member
+					r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+				} else {
+					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+				}
+			}
 		} else {
-			r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+			r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
 		}
+
 		return
 	}
 
@@ -769,6 +812,25 @@ func stepLeader(r *raft, m pb.Message) {
 		if pr.Match < r.raftLog.lastIndex() {
 			r.sendAppend(m.From)
 		}
+
+		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
+			return
+		}
+
+		ackCount := r.readOnly.recvAck(m)
+		if ackCount < r.quorum() {
+			return
+		}
+
+		rss := r.readOnly.advance(m)
+		for _, rs := range rss {
+			req := rs.req
+			if req.From == None || req.From == r.id { // from local member
+				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
+			} else {
+				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
+			}
+		}
 	case pb.MsgSnapStatus:
 		if pr.State != ProgressStateSnapshot {
 			return
@@ -910,9 +972,7 @@ func stepFollower(r *raft, m pb.Message) {
 			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
 			return
 		}
-
-		r.readState.Index = m.Index
-		r.readState.RequestCtx = m.Entries[0].Data
+		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
 	}
 }
 
@@ -933,7 +993,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 
 func (r *raft) handleHeartbeat(m pb.Message) {
 	r.raftLog.commitTo(m.Commit)
-	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
+	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
 }
 
 func (r *raft) handleSnapshot(m pb.Message) {

+ 71 - 11
raft/raft_test.go

@@ -1411,11 +1411,65 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
 	}
 }
 
-func TestReadIndexWithCheckQuorum(t *testing.T) {
+func TestReadOnlyOptionSafe(t *testing.T) {
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 
+	nt := newNetwork(a, b, c)
+	setRandomizedElectionTimeout(b, b.electionTimeout+1)
+
+	for i := 0; i < b.electionTimeout; i++ {
+		b.tick()
+	}
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	if a.state != StateLeader {
+		t.Fatalf("state = %s, want %s", a.state, StateLeader)
+	}
+
+	tests := []struct {
+		sm        *raft
+		proposals int
+		wri       uint64
+		wctx      []byte
+	}{
+		{b, 10, 11, []byte("ctx1")},
+		{c, 10, 21, []byte("ctx2")},
+		{b, 10, 31, []byte("ctx3")},
+		{c, 10, 41, []byte("ctx4")},
+	}
+
+	for i, tt := range tests {
+		for j := 0; j < tt.proposals; j++ {
+			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+		}
+
+		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
+
+		r := tt.sm
+		if len(r.readStates) == 0 {
+			t.Errorf("#%d: len(readStates) = 0, want non-zero", i)
+		}
+		rs := r.readStates[0]
+		if rs.Index != tt.wri {
+			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
+		}
+
+		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
+			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
+		}
+		r.readStates = nil
+	}
+}
+
+func TestReadOnlyOptionLease(t *testing.T) {
+	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	a.readOnly.option = ReadOnlyLeaseBased
+	b.readOnly.option = ReadOnlyLeaseBased
+	c.readOnly.option = ReadOnlyLeaseBased
 	a.checkQuorum = true
 	b.checkQuorum = true
 	c.checkQuorum = true
@@ -1444,7 +1498,7 @@ func TestReadIndexWithCheckQuorum(t *testing.T) {
 		{c, 10, 41, []byte("ctx4")},
 	}
 
-	for _, tt := range tests {
+	for i, tt := range tests {
 		for j := 0; j < tt.proposals; j++ {
 			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
 		}
@@ -1452,20 +1506,25 @@ func TestReadIndexWithCheckQuorum(t *testing.T) {
 		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
 
 		r := tt.sm
-		if r.readState.Index != tt.wri {
-			t.Errorf("readIndex = %d, want %d", r.readState.Index, tt.wri)
+		rs := r.readStates[0]
+		if rs.Index != tt.wri {
+			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
 		}
 
-		if !bytes.Equal(r.readState.RequestCtx, tt.wctx) {
-			t.Errorf("requestCtx = %v, want %v", r.readState.RequestCtx, tt.wctx)
+		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
+			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
 		}
+		r.readStates = nil
 	}
 }
 
-func TestReadIndexWithoutCheckQuorum(t *testing.T) {
+func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) {
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	a.readOnly.option = ReadOnlyLeaseBased
+	b.readOnly.option = ReadOnlyLeaseBased
+	c.readOnly.option = ReadOnlyLeaseBased
 
 	nt := newNetwork(a, b, c)
 	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
@@ -1473,12 +1532,13 @@ func TestReadIndexWithoutCheckQuorum(t *testing.T) {
 	ctx := []byte("ctx1")
 	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
 
-	if b.readState.Index != None {
-		t.Errorf("readIndex = %d, want %d", b.readState.Index, None)
+	rs := b.readStates[0]
+	if rs.Index != None {
+		t.Errorf("readIndex = %d, want %d", rs.Index, None)
 	}
 
-	if !bytes.Equal(b.readState.RequestCtx, ctx) {
-		t.Errorf("requestCtx = %v, want %v", b.readState.RequestCtx, ctx)
+	if !bytes.Equal(rs.RequestCtx, ctx) {
+		t.Errorf("requestCtx = %v, want %v", rs.RequestCtx, ctx)
 	}
 }
 

+ 118 - 0
raft/read_only.go

@@ -0,0 +1,118 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import pb "github.com/coreos/etcd/raft/raftpb"
+
+// ReadState provides state for read only query.
+// It's caller's responsibility to call ReadIndex first before getting
+// this state from ready, It's also caller's duty to differentiate if this
+// state is what it requests through RequestCtx, eg. given a unique id as
+// RequestCtx
+type ReadState struct {
+	Index      uint64
+	RequestCtx []byte
+}
+
+type readIndexStatus struct {
+	req   pb.Message
+	index uint64
+	acks  map[uint64]struct{}
+}
+
+type readOnly struct {
+	option           ReadOnlyOption
+	pendingReadIndex map[string]*readIndexStatus
+	readIndexQueue   []string
+}
+
+func newReadOnly(option ReadOnlyOption) *readOnly {
+	return &readOnly{
+		option:           option,
+		pendingReadIndex: make(map[string]*readIndexStatus),
+	}
+}
+
+// addRequest adds a read only reuqest into readonly struct.
+// `index` is the commit index of the raft state machine when it received
+// the read only request.
+// `m` is the original read only request message from the local or remote node.
+func (ro *readOnly) addRequest(index uint64, m pb.Message) {
+	ctx := string(m.Entries[0].Data)
+	if _, ok := ro.pendingReadIndex[ctx]; ok {
+		return
+	}
+	ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
+	ro.readIndexQueue = append(ro.readIndexQueue, ctx)
+}
+
+// recvAck notifies the readonly struct that the raft state machine received
+// an acknowledgment of the heartbeat that attached with the read only request
+// context.
+func (ro *readOnly) recvAck(m pb.Message) int {
+	rs, ok := ro.pendingReadIndex[string(m.Context)]
+	if !ok {
+		return 0
+	}
+
+	rs.acks[m.From] = struct{}{}
+	// add one to include an ack from local node
+	return len(rs.acks) + 1
+}
+
+// advance advances the read only request queue kept by the readonly struct.
+// It dequeues the requests until it finds the read only request that has
+// the same context as the given `m`.
+func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
+	var (
+		i     int
+		found bool
+	)
+
+	ctx := string(m.Context)
+	rss := []*readIndexStatus{}
+
+	for _, okctx := range ro.readIndexQueue {
+		i++
+		rs, ok := ro.pendingReadIndex[okctx]
+		if !ok {
+			panic("cannot find corresponding read state from pending map")
+		}
+		rss = append(rss, rs)
+		if okctx == ctx {
+			found = true
+			break
+		}
+	}
+
+	if found {
+		ro.readIndexQueue = ro.readIndexQueue[i:]
+		for _, rs := range rss {
+			delete(ro.pendingReadIndex, string(rs.req.Context))
+		}
+		return rss
+	}
+
+	return nil
+}
+
+// lastPendingRequestCtx returns the context of the last pending read only
+// request in readonly struct.
+func (ro *readOnly) lastPendingRequestCtx() string {
+	if len(ro.readIndexQueue) == 0 {
+		return ""
+	}
+	return ro.readIndexQueue[len(ro.readIndexQueue)-1]
+}