Browse Source

Merge pull request #10920 from tbg/rawnode-ready

raft: require app to consume result from Ready()
Tobias Grieger 6 years ago
parent
commit
d137fa9d4a
3 changed files with 47 additions and 25 deletions
  1. 2 2
      raft/node.go
  2. 11 23
      raft/rawnode.go
  3. 34 0
      raft/rawnode_test.go

+ 2 - 2
raft/node.go

@@ -316,7 +316,7 @@ func (n *node) run(rn *RawNode) {
 			// handled first, but it's generally good to emit larger Readys plus
 			// it simplifies testing (by emitting less frequently and more
 			// predictably).
-			rd = rn.Ready()
+			rd = rn.readyWithoutAccept()
 			readyc = n.readyc
 		}
 
@@ -387,7 +387,7 @@ func (n *node) run(rn *RawNode) {
 			rn.acceptReady(rd)
 			advancec = n.advancec
 		case <-advancec:
-			rn.commitReady(rd)
+			rn.Advance(rd)
 			rd = Ready{}
 			advancec = nil
 		case c := <-n.status:

+ 11 - 23
raft/rawnode.go

@@ -121,18 +121,17 @@ func (rn *RawNode) Step(m pb.Message) error {
 
 // Ready returns the outstanding work that the application needs to handle. This
 // includes appending and applying entries or a snapshot, updating the HardState,
-// and sending messages. Ready() is a read-only operation, that is, it does not
-// require the caller to actually handle the result. Typically, a caller will
-// want to handle the Ready and must pass the Ready to Advance *after* having
-// done so. While a Ready is being handled, the RawNode must not be used for
-// operations that may alter its state. For example, it is illegal to call
-// Ready, followed by Step, followed by Advance.
+// and sending messages. The returned Ready() *must* be handled and subsequently
+// passed back via Advance().
 func (rn *RawNode) Ready() Ready {
-	rd := rn.newReady()
+	rd := rn.readyWithoutAccept()
+	rn.acceptReady(rd)
 	return rd
 }
 
-func (rn *RawNode) newReady() Ready {
+// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
+// is no obligation that the Ready must be handled.
+func (rn *RawNode) readyWithoutAccept() Ready {
 	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
 }
 
@@ -149,15 +148,6 @@ func (rn *RawNode) acceptReady(rd Ready) {
 	rn.raft.msgs = nil
 }
 
-// commitReady is called when the consumer of the RawNode has successfully
-// handled a Ready (having previously called acceptReady).
-func (rn *RawNode) commitReady(rd Ready) {
-	if !IsEmptyHardState(rd.HardState) {
-		rn.prevHardSt = rd.HardState
-	}
-	rn.raft.advance(rd)
-}
-
 // HasReady called when RawNode user need to check if any Ready pending.
 // Checking logic in this method should be consistent with Ready.containsUpdates().
 func (rn *RawNode) HasReady() bool {
@@ -183,12 +173,10 @@ func (rn *RawNode) HasReady() bool {
 // Advance notifies the RawNode that the application has applied and saved progress in the
 // last Ready results.
 func (rn *RawNode) Advance(rd Ready) {
-	// Advance combines accept and commit. Callers can't mutate the RawNode
-	// between the call to Ready and the matching call to Advance, or the work
-	// done in acceptReady will clobber potentially newer data that has not been
-	// emitted in a Ready yet.
-	rn.acceptReady(rd)
-	rn.commitReady(rd)
+	if !IsEmptyHardState(rd.HardState) {
+		rn.prevHardSt = rd.HardState
+	}
+	rn.raft.advance(rd)
 }
 
 // Status returns the current status of the given group. This allocates, see

+ 34 - 0
raft/rawnode_test.go

@@ -924,3 +924,37 @@ func BenchmarkStatus(b *testing.B) {
 		})
 	}
 }
+
+func TestRawNodeConsumeReady(t *testing.T) {
+	// Check that readyWithoutAccept() does not call acceptReady (which resets
+	// the messages) but Ready() does.
+	s := NewMemoryStorage()
+	rn := newTestRawNode(1, []uint64{1}, 3, 1, s)
+	m1 := pb.Message{Context: []byte("foo")}
+	m2 := pb.Message{Context: []byte("bar")}
+
+	// Inject first message, make sure it's visible via readyWithoutAccept.
+	rn.raft.msgs = append(rn.raft.msgs, m1)
+	rd := rn.readyWithoutAccept()
+	if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
+		t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
+	}
+	if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) {
+		t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs)
+	}
+	// Now call Ready() which should move the message into the Ready (as opposed
+	// to leaving it in both places).
+	rd = rn.Ready()
+	if len(rn.raft.msgs) > 0 {
+		t.Fatalf("messages not reset: %+v", rn.raft.msgs)
+	}
+	if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
+		t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
+	}
+	// Add a message to raft to make sure that Advance() doesn't drop it.
+	rn.raft.msgs = append(rn.raft.msgs, m2)
+	rn.Advance(rd)
+	if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) {
+		t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
+	}
+}