raft_snap_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "testing"
  17. pb "go.etcd.io/etcd/v3/raft/raftpb"
  18. )
  19. var (
  20. testingSnap = pb.Snapshot{
  21. Metadata: pb.SnapshotMetadata{
  22. Index: 11, // magic number
  23. Term: 11, // magic number
  24. ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
  25. },
  26. }
  27. )
  28. func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
  29. storage := NewMemoryStorage()
  30. sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
  31. sm.restore(testingSnap)
  32. sm.becomeCandidate()
  33. sm.becomeLeader()
  34. // force set the next of node 2, so that
  35. // node 2 needs a snapshot
  36. sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
  37. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
  38. if sm.prs.nodes[2].PendingSnapshot != 11 {
  39. t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot)
  40. }
  41. }
  42. func TestPendingSnapshotPauseReplication(t *testing.T) {
  43. storage := NewMemoryStorage()
  44. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  45. sm.restore(testingSnap)
  46. sm.becomeCandidate()
  47. sm.becomeLeader()
  48. sm.prs.nodes[2].becomeSnapshot(11)
  49. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  50. msgs := sm.readMessages()
  51. if len(msgs) != 0 {
  52. t.Fatalf("len(msgs) = %d, want 0", len(msgs))
  53. }
  54. }
  55. func TestSnapshotFailure(t *testing.T) {
  56. storage := NewMemoryStorage()
  57. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  58. sm.restore(testingSnap)
  59. sm.becomeCandidate()
  60. sm.becomeLeader()
  61. sm.prs.nodes[2].Next = 1
  62. sm.prs.nodes[2].becomeSnapshot(11)
  63. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
  64. if sm.prs.nodes[2].PendingSnapshot != 0 {
  65. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
  66. }
  67. if sm.prs.nodes[2].Next != 1 {
  68. t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next)
  69. }
  70. if !sm.prs.nodes[2].Paused {
  71. t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
  72. }
  73. }
  74. func TestSnapshotSucceed(t *testing.T) {
  75. storage := NewMemoryStorage()
  76. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  77. sm.restore(testingSnap)
  78. sm.becomeCandidate()
  79. sm.becomeLeader()
  80. sm.prs.nodes[2].Next = 1
  81. sm.prs.nodes[2].becomeSnapshot(11)
  82. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
  83. if sm.prs.nodes[2].PendingSnapshot != 0 {
  84. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
  85. }
  86. if sm.prs.nodes[2].Next != 12 {
  87. t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next)
  88. }
  89. if !sm.prs.nodes[2].Paused {
  90. t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
  91. }
  92. }
  93. // TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
  94. // shot is sent to a follower at the most recent index (i.e. the snapshot index
  95. // is the leader's last index is the committed index). In that situation, a bug
  96. // in the past left the follower in probing status until the next log entry was
  97. // committed.
  98. func TestSnapshotSucceedViaAppResp(t *testing.T) {
  99. snap := pb.Snapshot{
  100. Metadata: pb.SnapshotMetadata{
  101. Index: 11, // magic number
  102. Term: 11, // magic number
  103. ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
  104. },
  105. }
  106. s1 := NewMemoryStorage()
  107. n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1)
  108. // Become follower because otherwise the way this test sets things up the
  109. // leadership term will be 1 (which is stale). We want it to match the snap-
  110. // shot term in this test.
  111. n1.becomeFollower(snap.Metadata.Term-1, 2)
  112. n1.becomeCandidate()
  113. n1.becomeLeader()
  114. // Apply a snapshot on the leader. This is a workaround against the fact that
  115. // the leader will always append an empty entry, but that empty entry works
  116. // against what we're trying to assert in this test, namely that a snapshot
  117. // at the latest committed index leaves the follower in probing state.
  118. // With the snapshot, the empty entry is fully committed.
  119. n1.restore(snap)
  120. noMessage := pb.MessageType(-1)
  121. mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
  122. t.Helper()
  123. for i, msg := range from.msgs {
  124. if msg.From != from.id || msg.To != to.id || msg.Type != typ {
  125. continue
  126. }
  127. t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
  128. if err := to.Step(msg); err != nil {
  129. t.Fatalf("%v: %s", msg, err)
  130. }
  131. from.msgs = append(from.msgs[:i], from.msgs[i+1:]...)
  132. return msg
  133. }
  134. if typ == noMessage {
  135. if len(from.msgs) == 0 {
  136. return pb.Message{}
  137. }
  138. t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs)
  139. }
  140. t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs)
  141. return pb.Message{} // unreachable
  142. }
  143. // Create the follower that will receive the snapshot.
  144. s2 := NewMemoryStorage()
  145. n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2)
  146. // Let the leader probe the follower.
  147. if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
  148. t.Fatalf("expected message to be sent")
  149. }
  150. if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 {
  151. // For this test to work, the leader must not have anything to append
  152. // to the follower right now.
  153. t.Fatalf("unexpectedly appending entries %v", msg.Entries)
  154. }
  155. // Follower rejects the append (because it doesn't have any log entries)
  156. if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject {
  157. t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
  158. }
  159. expIdx := snap.Metadata.Index
  160. // Leader sends snapshot due to RejectHint of zero (the storage we use here
  161. // has index zero compacted).
  162. if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
  163. t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
  164. }
  165. // n2 reacts to snapshot with MsgAppResp.
  166. if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx {
  167. t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index)
  168. }
  169. // Leader sends MsgApp to communicate commit index.
  170. if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx {
  171. t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit)
  172. }
  173. // Follower responds.
  174. mustSend(n2, n1, pb.MsgAppResp)
  175. // Leader has correct state for follower.
  176. pr := n1.prs.nodes[2]
  177. if pr.State != ProgressStateReplicate {
  178. t.Fatalf("unexpected state %v", pr)
  179. }
  180. if pr.Match != expIdx || pr.Next != expIdx+1 {
  181. t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next)
  182. }
  183. // Leader and follower are done.
  184. mustSend(n1, n2, noMessage)
  185. mustSend(n2, n1, noMessage)
  186. }
  187. func TestSnapshotAbort(t *testing.T) {
  188. storage := NewMemoryStorage()
  189. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  190. sm.restore(testingSnap)
  191. sm.becomeCandidate()
  192. sm.becomeLeader()
  193. sm.prs.nodes[2].Next = 1
  194. sm.prs.nodes[2].becomeSnapshot(11)
  195. // A successful msgAppResp that has a higher/equal index than the
  196. // pending snapshot should abort the pending snapshot.
  197. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
  198. if sm.prs.nodes[2].PendingSnapshot != 0 {
  199. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
  200. }
  201. // The follower entered ProgressStateReplicate and the leader send an append
  202. // and optimistically updated the progress (so we see 13 instead of 12).
  203. // There is something to append because the leader appended an empty entry
  204. // to the log at index 12 when it assumed leadership.
  205. if sm.prs.nodes[2].Next != 13 {
  206. t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next)
  207. }
  208. if n := sm.prs.nodes[2].ins.count; n != 1 {
  209. t.Fatalf("expected an inflight message, got %d", n)
  210. }
  211. }