ctl_v3_snapshot_test.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "os"
  21. "path/filepath"
  22. "strings"
  23. "testing"
  24. "time"
  25. "github.com/coreos/etcd/pkg/expect"
  26. "github.com/coreos/etcd/pkg/testutil"
  27. )
  28. func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }
  29. func snapshotTest(cx ctlCtx) {
  30. var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
  31. for i := range kvs {
  32. if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
  33. cx.t.Fatal(err)
  34. }
  35. }
  36. fpath := "test.snapshot"
  37. defer os.RemoveAll(fpath)
  38. if err := ctlV3SnapshotSave(cx, fpath); err != nil {
  39. cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
  40. }
  41. st, err := getSnapshotStatus(cx, fpath)
  42. if err != nil {
  43. cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
  44. }
  45. if st.Revision != 4 {
  46. cx.t.Fatalf("expected 4, got %d", st.Revision)
  47. }
  48. if st.TotalKey < 3 {
  49. cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
  50. }
  51. }
  52. func TestCtlV3SnapshotCorrupt(t *testing.T) { testCtl(t, snapshotCorruptTest) }
  53. func snapshotCorruptTest(cx ctlCtx) {
  54. fpath := "test.snapshot"
  55. defer os.RemoveAll(fpath)
  56. if err := ctlV3SnapshotSave(cx, fpath); err != nil {
  57. cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
  58. }
  59. // corrupt file
  60. f, oerr := os.OpenFile(fpath, os.O_WRONLY, 0)
  61. if oerr != nil {
  62. cx.t.Fatal(oerr)
  63. }
  64. if _, err := f.Write(make([]byte, 512)); err != nil {
  65. cx.t.Fatal(err)
  66. }
  67. f.Close()
  68. defer os.RemoveAll("snap.etcd")
  69. serr := spawnWithExpect(
  70. append(cx.PrefixArgs(), "snapshot", "restore",
  71. "--data-dir", "snap.etcd",
  72. fpath),
  73. "expected sha256")
  74. if serr != nil {
  75. cx.t.Fatal(serr)
  76. }
  77. }
  78. func ctlV3SnapshotSave(cx ctlCtx, fpath string) error {
  79. cmdArgs := append(cx.PrefixArgs(), "snapshot", "save", fpath)
  80. return spawnWithExpect(cmdArgs, fmt.Sprintf("Snapshot saved at %s", fpath))
  81. }
  82. type snapshotStatus struct {
  83. Hash uint32 `json:"hash"`
  84. Revision int64 `json:"revision"`
  85. TotalKey int `json:"totalKey"`
  86. TotalSize int64 `json:"totalSize"`
  87. }
  88. func getSnapshotStatus(cx ctlCtx, fpath string) (snapshotStatus, error) {
  89. cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "snapshot", "status", fpath)
  90. proc, err := spawnCmd(cmdArgs)
  91. if err != nil {
  92. return snapshotStatus{}, err
  93. }
  94. var txt string
  95. txt, err = proc.Expect("totalKey")
  96. if err != nil {
  97. return snapshotStatus{}, err
  98. }
  99. if err = proc.Close(); err != nil {
  100. return snapshotStatus{}, err
  101. }
  102. resp := snapshotStatus{}
  103. dec := json.NewDecoder(strings.NewReader(txt))
  104. if err := dec.Decode(&resp); err == io.EOF {
  105. return snapshotStatus{}, err
  106. }
  107. return resp, nil
  108. }
  109. // TestIssue6361 ensures new member that starts with snapshot correctly
  110. // syncs up with other members and serve correct data.
  111. func TestIssue6361(t *testing.T) {
  112. defer testutil.AfterTest(t)
  113. mustEtcdctl(t)
  114. os.Setenv("ETCDCTL_API", "3")
  115. defer os.Unsetenv("ETCDCTL_API")
  116. epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
  117. clusterSize: 1,
  118. initialToken: "new",
  119. keepDataDir: true,
  120. })
  121. if err != nil {
  122. t.Fatalf("could not start etcd process cluster (%v)", err)
  123. }
  124. defer func() {
  125. if errC := epc.Close(); errC != nil {
  126. t.Fatalf("error closing etcd processes (%v)", errC)
  127. }
  128. }()
  129. dialTimeout := 7 * time.Second
  130. prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.grpcEndpoints(), ","), "--dial-timeout", dialTimeout.String()}
  131. // write some keys
  132. kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}}
  133. for i := range kvs {
  134. if err = spawnWithExpect(append(prefixArgs, "put", kvs[i].key, kvs[i].val), "OK"); err != nil {
  135. t.Fatal(err)
  136. }
  137. }
  138. fpath := filepath.Join(os.TempDir(), "test.snapshot")
  139. defer os.RemoveAll(fpath)
  140. // etcdctl save snapshot
  141. if err = spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)); err != nil {
  142. t.Fatal(err)
  143. }
  144. if err = epc.processes()[0].Stop(); err != nil {
  145. t.Fatal(err)
  146. }
  147. newDataDir := filepath.Join(os.TempDir(), "test.data")
  148. defer os.RemoveAll(newDataDir)
  149. // etcdctl restore the snapshot
  150. err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].cfg.name, "--initial-cluster", epc.procs[0].cfg.initialCluster, "--initial-cluster-token", epc.procs[0].cfg.initialToken, "--initial-advertise-peer-urls", epc.procs[0].cfg.purl.String(), "--data-dir", newDataDir}, "membership: added member")
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. // start the etcd member using the restored snapshot
  155. epc.procs[0].cfg.dataDirPath = newDataDir
  156. for i := range epc.procs[0].cfg.args {
  157. if epc.procs[0].cfg.args[i] == "--data-dir" {
  158. epc.procs[0].cfg.args[i+1] = newDataDir
  159. }
  160. }
  161. if err = epc.processes()[0].Restart(); err != nil {
  162. t.Fatal(err)
  163. }
  164. // ensure the restored member has the correct data
  165. for i := range kvs {
  166. if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil {
  167. t.Fatal(err)
  168. }
  169. }
  170. // add a new member into the cluster
  171. clientURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+30)
  172. peerURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+31)
  173. err = spawnWithExpect(append(prefixArgs, "member", "add", "newmember", fmt.Sprintf("--peer-urls=%s", peerURL)), " added to cluster ")
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. var newDataDir2 string
  178. newDataDir2, err = ioutil.TempDir("", "newdata2")
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. defer os.RemoveAll(newDataDir2)
  183. name2 := "infra2"
  184. initialCluster2 := epc.procs[0].cfg.initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL)
  185. // start the new member
  186. var nepc *expect.ExpectProcess
  187. nepc, err = spawnCmd([]string{epc.procs[0].cfg.execPath, "--name", name2,
  188. "--listen-client-urls", clientURL, "--advertise-client-urls", clientURL,
  189. "--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL,
  190. "--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2})
  191. if err != nil {
  192. t.Fatal(err)
  193. }
  194. if _, err = nepc.Expect("enabled capabilities for version"); err != nil {
  195. t.Fatal(err)
  196. }
  197. prefixArgs = []string{ctlBinPath, "--endpoints", clientURL, "--dial-timeout", dialTimeout.String()}
  198. // ensure added member has data from incoming snapshot
  199. for i := range kvs {
  200. if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil {
  201. t.Fatal(err)
  202. }
  203. }
  204. if err = nepc.Stop(); err != nil {
  205. t.Fatal(err)
  206. }
  207. }