snapshot_test.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package raft
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/stretchr/testify/assert"
  6. "github.com/stretchr/testify/mock"
  7. )
  8. // Ensure that a snapshot occurs when there are existing logs.
  9. func TestSnapshot(t *testing.T) {
  10. runServerWithMockStateMachine(Leader, func(s Server, m *mock.Mock) {
  11. m.On("Save").Return([]byte("foo"), nil)
  12. m.On("Recovery", []byte("foo")).Return(nil)
  13. s.Do(&testCommand1{})
  14. err := s.TakeSnapshot()
  15. assert.NoError(t, err)
  16. assert.Equal(t, s.(*server).snapshot.LastIndex, uint64(2))
  17. // Repeat to make sure new snapshot gets created.
  18. s.Do(&testCommand1{})
  19. err = s.TakeSnapshot()
  20. assert.NoError(t, err)
  21. assert.Equal(t, s.(*server).snapshot.LastIndex, uint64(4))
  22. // Restart server.
  23. s.Stop()
  24. // Recover from snapshot.
  25. err = s.LoadSnapshot()
  26. assert.NoError(t, err)
  27. s.Start()
  28. })
  29. }
  30. // Ensure that a new server can recover from previous snapshot with log
  31. func TestSnapshotRecovery(t *testing.T) {
  32. runServerWithMockStateMachine(Leader, func(s Server, m *mock.Mock) {
  33. m.On("Save").Return([]byte("foo"), nil)
  34. m.On("Recovery", []byte("foo")).Return(nil)
  35. s.Do(&testCommand1{})
  36. err := s.TakeSnapshot()
  37. assert.NoError(t, err)
  38. assert.Equal(t, s.(*server).snapshot.LastIndex, uint64(2))
  39. // Repeat to make sure new snapshot gets created.
  40. s.Do(&testCommand1{})
  41. // Stop the old server
  42. s.Stop()
  43. // create a new server with previous log and snapshot
  44. newS, err := NewServer("1", s.Path(), &testTransporter{}, s.StateMachine(), nil, "")
  45. // Recover from snapshot.
  46. err = newS.LoadSnapshot()
  47. assert.NoError(t, err)
  48. newS.Start()
  49. defer newS.Stop()
  50. // wait for it to become leader
  51. time.Sleep(time.Second)
  52. // ensure server load the previous log
  53. assert.Equal(t, len(newS.LogEntries()), 3, "")
  54. })
  55. }
  56. // Ensure that a snapshot request can be sent and received.
  57. func TestSnapshotRequest(t *testing.T) {
  58. runServerWithMockStateMachine(Follower, func(s Server, m *mock.Mock) {
  59. m.On("Recovery", []byte("bar")).Return(nil)
  60. // Send snapshot request.
  61. resp := s.RequestSnapshot(&SnapshotRequest{LastIndex: 5, LastTerm: 1})
  62. assert.Equal(t, resp.Success, true)
  63. assert.Equal(t, s.State(), Snapshotting)
  64. // Send recovery request.
  65. resp2 := s.SnapshotRecoveryRequest(&SnapshotRecoveryRequest{
  66. LeaderName: "1",
  67. LastIndex: 5,
  68. LastTerm: 2,
  69. Peers: make([]*Peer, 0),
  70. State: []byte("bar"),
  71. })
  72. assert.Equal(t, resp2.Success, true)
  73. })
  74. }
  75. func runServerWithMockStateMachine(state string, fn func(s Server, m *mock.Mock)) {
  76. var m mockStateMachine
  77. s := newTestServer("1", &testTransporter{})
  78. s.(*server).stateMachine = &m
  79. if err := s.Start(); err != nil {
  80. panic("server start error: " + err.Error())
  81. }
  82. if state == Leader {
  83. if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
  84. panic("unable to join server to self: " + err.Error())
  85. }
  86. }
  87. defer s.Stop()
  88. fn(s, &m.Mock)
  89. }