snapshot_store_test.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "io"
  17. "reflect"
  18. "sync"
  19. "testing"
  20. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  21. "github.com/coreos/etcd/pkg/testutil"
  22. "github.com/coreos/etcd/raft"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. dstorage "github.com/coreos/etcd/storage"
  25. "github.com/coreos/etcd/storage/storagepb"
  26. )
  27. func TestSnapshotStoreCreateSnap(t *testing.T) {
  28. snap := raftpb.Snapshot{
  29. Metadata: raftpb.SnapshotMetadata{Index: 1},
  30. }
  31. ss := newSnapshotStore("", &nopKV{})
  32. fakeClock := clockwork.NewFakeClock()
  33. ss.clock = fakeClock
  34. go func() {
  35. <-ss.reqsnapc
  36. ss.raftsnapc <- snap
  37. }()
  38. // create snapshot
  39. ss.createSnap()
  40. if !reflect.DeepEqual(ss.snap.raft(), snap) {
  41. t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap)
  42. }
  43. // unused snapshot is cleared after clearUnusedSnapshotInterval
  44. fakeClock.BlockUntil(1)
  45. fakeClock.Advance(clearUnusedSnapshotInterval)
  46. testutil.WaitSchedule()
  47. ss.mu.Lock()
  48. if ss.snap != nil {
  49. t.Errorf("snap = %+v, want %+v", ss.snap, nil)
  50. }
  51. ss.mu.Unlock()
  52. }
  53. func TestSnapshotStoreGetSnap(t *testing.T) {
  54. snap := raftpb.Snapshot{
  55. Metadata: raftpb.SnapshotMetadata{Index: 1},
  56. }
  57. ss := newSnapshotStore("", &nopKV{})
  58. fakeClock := clockwork.NewFakeClock()
  59. ss.clock = fakeClock
  60. ss.tr = &nopTransporter{}
  61. go func() {
  62. <-ss.reqsnapc
  63. ss.raftsnapc <- snap
  64. }()
  65. // get snap when no snapshot stored
  66. _, err := ss.getSnap()
  67. if err != raft.ErrSnapshotTemporarilyUnavailable {
  68. t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
  69. }
  70. // wait for asynchronous snapshot creation to finish
  71. testutil.WaitSchedule()
  72. // get the created snapshot
  73. s, err := ss.getSnap()
  74. if err != nil {
  75. t.Fatalf("getSnap error = %v, want nil", err)
  76. }
  77. if !reflect.DeepEqual(s.raft(), snap) {
  78. t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap)
  79. }
  80. if !ss.inUse {
  81. t.Errorf("inUse = %v, want true", ss.inUse)
  82. }
  83. // get snap when snapshot stored has been in use
  84. _, err = ss.getSnap()
  85. if err != raft.ErrSnapshotTemporarilyUnavailable {
  86. t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
  87. }
  88. // clean up
  89. fakeClock.Advance(clearUnusedSnapshotInterval)
  90. }
  91. func TestSnapshotStoreClearUsedSnap(t *testing.T) {
  92. s := &fakeSnapshot{}
  93. var once sync.Once
  94. once.Do(func() {})
  95. ss := &snapshotStore{
  96. snap: newSnapshot(raftpb.Snapshot{}, s),
  97. inUse: true,
  98. createOnce: once,
  99. }
  100. ss.clearUsedSnap()
  101. // wait for underlying KV snapshot closed
  102. testutil.WaitSchedule()
  103. s.mu.Lock()
  104. if !s.closed {
  105. t.Errorf("snapshot closed = %v, want true", s.closed)
  106. }
  107. s.mu.Unlock()
  108. if ss.snap != nil {
  109. t.Errorf("snapshot = %v, want nil", ss.snap)
  110. }
  111. if ss.inUse {
  112. t.Errorf("isUse = %v, want false", ss.inUse)
  113. }
  114. // test createOnce is reset
  115. if ss.createOnce == once {
  116. t.Errorf("createOnce fails to reset")
  117. }
  118. }
  119. func TestSnapshotStoreCloseSnapBefore(t *testing.T) {
  120. snapIndex := uint64(5)
  121. tests := []struct {
  122. index uint64
  123. wok bool
  124. }{
  125. {snapIndex - 2, false},
  126. {snapIndex - 1, false},
  127. {snapIndex, true},
  128. }
  129. for i, tt := range tests {
  130. rs := raftpb.Snapshot{
  131. Metadata: raftpb.SnapshotMetadata{Index: 5},
  132. }
  133. s := &fakeSnapshot{}
  134. ss := &snapshotStore{
  135. snap: newSnapshot(rs, s),
  136. }
  137. ok := ss.closeSnapBefore(tt.index)
  138. if ok != tt.wok {
  139. t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok)
  140. }
  141. if ok {
  142. // wait for underlying KV snapshot closed
  143. testutil.WaitSchedule()
  144. s.mu.Lock()
  145. if !s.closed {
  146. t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed)
  147. }
  148. s.mu.Unlock()
  149. }
  150. }
  151. }
  152. type nopKV struct{}
  153. func (kv *nopKV) Rev() int64 { return 0 }
  154. func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  155. return nil, 0, nil
  156. }
  157. func (kv *nopKV) Put(key, value []byte) (rev int64) { return 0 }
  158. func (kv *nopKV) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 }
  159. func (kv *nopKV) TxnBegin() int64 { return 0 }
  160. func (kv *nopKV) TxnEnd(txnID int64) error { return nil }
  161. func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
  162. return nil, 0, nil
  163. }
  164. func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil }
  165. func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
  166. return 0, 0, nil
  167. }
  168. func (kv *nopKV) Compact(rev int64) error { return nil }
  169. func (kv *nopKV) Hash() (uint32, error) { return 0, nil }
  170. func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} }
  171. func (kv *nopKV) Restore() error { return nil }
  172. func (kv *nopKV) Close() error { return nil }
  173. type fakeSnapshot struct {
  174. mu sync.Mutex
  175. closed bool
  176. }
  177. func (s *fakeSnapshot) Size() int64 { return 0 }
  178. func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) { return 0, nil }
  179. func (s *fakeSnapshot) Close() error {
  180. s.mu.Lock()
  181. s.closed = true
  182. s.mu.Unlock()
  183. return nil
  184. }