raft_snap_test.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "testing"
  17. pb "go.etcd.io/etcd/raft/raftpb"
  18. )
  19. var (
  20. testingSnap = pb.Snapshot{
  21. Metadata: pb.SnapshotMetadata{
  22. Index: 11, // magic number
  23. Term: 11, // magic number
  24. ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  25. },
  26. }
  27. )
  28. func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
  29. storage := NewMemoryStorage()
  30. sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
  31. sm.restore(testingSnap)
  32. sm.becomeCandidate()
  33. sm.becomeLeader()
  34. // force set the next of node 2, so that
  35. // node 2 needs a snapshot
  36. sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
  37. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
  38. if sm.prs.Progress[2].PendingSnapshot != 11 {
  39. t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.Progress[2].PendingSnapshot)
  40. }
  41. }
  42. func TestPendingSnapshotPauseReplication(t *testing.T) {
  43. storage := NewMemoryStorage()
  44. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  45. sm.restore(testingSnap)
  46. sm.becomeCandidate()
  47. sm.becomeLeader()
  48. sm.prs.Progress[2].BecomeSnapshot(11)
  49. sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  50. msgs := sm.readMessages()
  51. if len(msgs) != 0 {
  52. t.Fatalf("len(msgs) = %d, want 0", len(msgs))
  53. }
  54. }
  55. func TestSnapshotFailure(t *testing.T) {
  56. storage := NewMemoryStorage()
  57. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  58. sm.restore(testingSnap)
  59. sm.becomeCandidate()
  60. sm.becomeLeader()
  61. sm.prs.Progress[2].Next = 1
  62. sm.prs.Progress[2].BecomeSnapshot(11)
  63. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
  64. if sm.prs.Progress[2].PendingSnapshot != 0 {
  65. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
  66. }
  67. if sm.prs.Progress[2].Next != 1 {
  68. t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
  69. }
  70. if !sm.prs.Progress[2].ProbeSent {
  71. t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
  72. }
  73. }
  74. func TestSnapshotSucceed(t *testing.T) {
  75. storage := NewMemoryStorage()
  76. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  77. sm.restore(testingSnap)
  78. sm.becomeCandidate()
  79. sm.becomeLeader()
  80. sm.prs.Progress[2].Next = 1
  81. sm.prs.Progress[2].BecomeSnapshot(11)
  82. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
  83. if sm.prs.Progress[2].PendingSnapshot != 0 {
  84. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
  85. }
  86. if sm.prs.Progress[2].Next != 12 {
  87. t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
  88. }
  89. if !sm.prs.Progress[2].ProbeSent {
  90. t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
  91. }
  92. }
  93. func TestSnapshotAbort(t *testing.T) {
  94. storage := NewMemoryStorage()
  95. sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
  96. sm.restore(testingSnap)
  97. sm.becomeCandidate()
  98. sm.becomeLeader()
  99. sm.prs.Progress[2].Next = 1
  100. sm.prs.Progress[2].BecomeSnapshot(11)
  101. // A successful msgAppResp that has a higher/equal index than the
  102. // pending snapshot should abort the pending snapshot.
  103. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
  104. if sm.prs.Progress[2].PendingSnapshot != 0 {
  105. t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
  106. }
  107. // The follower entered StateReplicate and the leader send an append
  108. // and optimistically updated the progress (so we see 13 instead of 12).
  109. // There is something to append because the leader appended an empty entry
  110. // to the log at index 12 when it assumed leadership.
  111. if sm.prs.Progress[2].Next != 13 {
  112. t.Fatalf("Next = %d, want 13", sm.prs.Progress[2].Next)
  113. }
  114. if n := sm.prs.Progress[2].Inflights.Count(); n != 1 {
  115. t.Fatalf("expected an inflight message, got %d", n)
  116. }
  117. }