raft_snap_test.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "testing"
  17. pb "go.etcd.io/etcd/raft/raftpb"
  18. "go.etcd.io/etcd/raft/tracker"
  19. )
  20. var (
  21. testingSnap = pb.Snapshot{
  22. Metadata: pb.SnapshotMetadata{
  23. Index: 11, // magic number
  24. Term: 11, // magic number
  25. ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  26. },
  27. }
  28. )
  29. func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
  30. storage := NewMemoryStorage()
  31. sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
  32. sm.restore(testingSnap)
  33. sm.becomeCandidate()
  34. sm.becomeLeader()
  35. // force set the next of node 2, so that
  36. // node 2 needs a snapshot
  37. sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
  38. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
  39. if sm.prs.Progress[2].PendingSnapshot != 11 {
  40. t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.Progress[2].PendingSnapshot)
  41. }
  42. }
  43. func TestPendingSnapshotPauseReplication(t *testing.T) {
  44. storage := NewMemoryStorage()
  45. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  46. sm.restore(testingSnap)
  47. sm.becomeCandidate()
  48. sm.becomeLeader()
  49. sm.prs.Progress[2].BecomeSnapshot(11)
  50. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  51. msgs := sm.readMessages()
  52. if len(msgs) != 0 {
  53. t.Fatalf("len(msgs) = %d, want 0", len(msgs))
  54. }
  55. }
  56. func TestSnapshotFailure(t *testing.T) {
  57. storage := NewMemoryStorage()
  58. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  59. sm.restore(testingSnap)
  60. sm.becomeCandidate()
  61. sm.becomeLeader()
  62. sm.prs.Progress[2].Next = 1
  63. sm.prs.Progress[2].BecomeSnapshot(11)
  64. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
  65. if sm.prs.Progress[2].PendingSnapshot != 0 {
  66. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
  67. }
  68. if sm.prs.Progress[2].Next != 1 {
  69. t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
  70. }
  71. if !sm.prs.Progress[2].ProbeSent {
  72. t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
  73. }
  74. }
  75. func TestSnapshotSucceed(t *testing.T) {
  76. storage := NewMemoryStorage()
  77. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  78. sm.restore(testingSnap)
  79. sm.becomeCandidate()
  80. sm.becomeLeader()
  81. sm.prs.Progress[2].Next = 1
  82. sm.prs.Progress[2].BecomeSnapshot(11)
  83. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
  84. if sm.prs.Progress[2].PendingSnapshot != 0 {
  85. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
  86. }
  87. if sm.prs.Progress[2].Next != 12 {
  88. t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
  89. }
  90. if !sm.prs.Progress[2].ProbeSent {
  91. t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
  92. }
  93. }
  94. // TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
  95. // shot is sent to a follower at the most recent index (i.e. the snapshot index
  96. // is the leader's last index is the committed index). In that situation, a bug
  97. // in the past left the follower in probing status until the next log entry was
  98. // committed.
  99. func TestSnapshotSucceedViaAppResp(t *testing.T) {
  100. s1 := NewMemoryStorage()
  101. // Create a single-node leader.
  102. n1 := newTestRaft(1, []uint64{1}, 10, 1, s1)
  103. n1.becomeCandidate()
  104. n1.becomeLeader()
  105. // We need to add a second empty entry so that we can truncate the first
  106. // one away.
  107. n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  108. rd := newReady(n1, &SoftState{}, pb.HardState{})
  109. s1.Append(rd.Entries)
  110. s1.SetHardState(rd.HardState)
  111. if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp {
  112. t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1)
  113. }
  114. // Force a log truncation.
  115. if err := s1.Compact(1); err != nil {
  116. t.Fatal(err)
  117. }
  118. // Add a follower to the group. Do this in a clandestine way for simplicity.
  119. // Also set up a snapshot that will be sent to the follower.
  120. n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
  121. s1.snapshot = pb.Snapshot{
  122. Metadata: pb.SnapshotMetadata{
  123. ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  124. Index: s1.lastIndex(),
  125. Term: s1.ents[len(s1.ents)-1].Term,
  126. },
  127. }
  128. noMessage := pb.MessageType(-1)
  129. mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
  130. t.Helper()
  131. for i, msg := range from.msgs {
  132. if msg.From != from.id || msg.To != to.id || msg.Type != typ {
  133. continue
  134. }
  135. t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
  136. if len(msg.Entries) > 0 {
  137. t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) }))
  138. }
  139. if err := to.Step(msg); err != nil {
  140. t.Fatalf("%v: %s", msg, err)
  141. }
  142. from.msgs = append(from.msgs[:i], from.msgs[i+1:]...)
  143. return msg
  144. }
  145. if typ == noMessage {
  146. if len(from.msgs) == 0 {
  147. return pb.Message{}
  148. }
  149. t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs)
  150. }
  151. t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs)
  152. return pb.Message{} // unreachable
  153. }
  154. // Create the follower that will receive the snapshot.
  155. s2 := NewMemoryStorage()
  156. n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2)
  157. // Let the leader probe the follower.
  158. if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
  159. t.Fatalf("expected message to be sent")
  160. }
  161. if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 {
  162. // For this test to work, the leader must not have anything to append
  163. // to the follower right now.
  164. t.Fatalf("unexpectedly appending entries %v", msg.Entries)
  165. }
  166. // Follower rejects the append (because it doesn't have any log entries)
  167. if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject {
  168. t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
  169. }
  170. const expIdx = 2
  171. // Leader sends snapshot due to RejectHint of zero (we set up the raft log
  172. // to start at index 2).
  173. if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
  174. t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
  175. }
  176. // n2 reacts to snapshot with MsgAppResp.
  177. if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx {
  178. t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index)
  179. }
  180. // Leader sends MsgApp to communicate commit index.
  181. if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx {
  182. t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit)
  183. }
  184. // Follower responds.
  185. mustSend(n2, n1, pb.MsgAppResp)
  186. // Leader has correct state for follower.
  187. pr := n1.prs.Progress[2]
  188. if pr.State != tracker.StateReplicate {
  189. t.Fatalf("unexpected state %v", pr)
  190. }
  191. if pr.Match != expIdx || pr.Next != expIdx+1 {
  192. t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next)
  193. }
  194. // Leader and follower are done.
  195. mustSend(n1, n2, noMessage)
  196. mustSend(n2, n1, noMessage)
  197. }
  198. func TestSnapshotAbort(t *testing.T) {
  199. storage := NewMemoryStorage()
  200. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  201. sm.restore(testingSnap)
  202. sm.becomeCandidate()
  203. sm.becomeLeader()
  204. sm.prs.Progress[2].Next = 1
  205. sm.prs.Progress[2].BecomeSnapshot(11)
  206. // A successful msgAppResp that has a higher/equal index than the
  207. // pending snapshot should abort the pending snapshot.
  208. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
  209. if sm.prs.Progress[2].PendingSnapshot != 0 {
  210. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
  211. }
  212. // The follower entered StateReplicate and the leader send an append
  213. // and optimistically updated the progress (so we see 13 instead of 12).
  214. // There is something to append because the leader appended an empty entry
  215. // to the log at index 12 when it assumed leadership.
  216. if sm.prs.Progress[2].Next != 13 {
  217. t.Fatalf("Next = %d, want 13", sm.prs.Progress[2].Next)
  218. }
  219. if n := sm.prs.Progress[2].Inflights.Count(); n != 1 {
  220. t.Fatalf("expected an inflight message, got %d", n)
  221. }
  222. }