v3_snapshot_test.go 7.8 KB


  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package snapshot
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "net/url"
  20. "os"
  21. "path/filepath"
  22. "testing"
  23. "time"
  24. "go.etcd.io/etcd/clientv3"
  25. "go.etcd.io/etcd/embed"
  26. "go.etcd.io/etcd/pkg/fileutil"
  27. "go.etcd.io/etcd/pkg/testutil"
  28. "go.uber.org/zap"
  29. )
  30. // TestSnapshotV3RestoreSingle tests single node cluster restoring
  31. // from a snapshot file.
  32. func TestSnapshotV3RestoreSingle(t *testing.T) {
  33. kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
  34. dbPath := createSnapshotFile(t, kvs)
  35. defer os.RemoveAll(dbPath)
  36. clusterN := 1
  37. urls := newEmbedURLs(clusterN * 2)
  38. cURLs, pURLs := urls[:clusterN], urls[clusterN:]
  39. cfg := embed.NewConfig()
  40. cfg.Logger = "zap"
  41. cfg.LogOutputs = []string{"/dev/null"}
  42. cfg.Debug = false
  43. cfg.Name = "s1"
  44. cfg.InitialClusterToken = testClusterTkn
  45. cfg.ClusterState = "existing"
  46. cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
  47. cfg.LPUrls, cfg.APUrls = pURLs, pURLs
  48. cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
  49. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
  50. sp := NewV3(zap.NewExample())
  51. pss := make([]string, 0, len(pURLs))
  52. for _, p := range pURLs {
  53. pss = append(pss, p.String())
  54. }
  55. if err := sp.Restore(RestoreConfig{
  56. SnapshotPath: dbPath,
  57. Name: cfg.Name,
  58. OutputDataDir: cfg.Dir,
  59. InitialCluster: cfg.InitialCluster,
  60. InitialClusterToken: cfg.InitialClusterToken,
  61. PeerURLs: pss,
  62. }); err != nil {
  63. t.Fatal(err)
  64. }
  65. srv, err := embed.StartEtcd(cfg)
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. defer func() {
  70. os.RemoveAll(cfg.Dir)
  71. srv.Close()
  72. }()
  73. select {
  74. case <-srv.Server.ReadyNotify():
  75. case <-time.After(3 * time.Second):
  76. t.Fatalf("failed to start restored etcd member")
  77. }
  78. var cli *clientv3.Client
  79. cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. defer cli.Close()
  84. for i := range kvs {
  85. var gresp *clientv3.GetResponse
  86. gresp, err = cli.Get(context.Background(), kvs[i].k)
  87. if err != nil {
  88. t.Fatal(err)
  89. }
  90. if string(gresp.Kvs[0].Value) != kvs[i].v {
  91. t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
  92. }
  93. }
  94. }
  95. // TestSnapshotV3RestoreMulti ensures that multiple members
  96. // can boot into the same cluster after being restored from a same
  97. // snapshot file.
  98. func TestSnapshotV3RestoreMulti(t *testing.T) {
  99. kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
  100. dbPath := createSnapshotFile(t, kvs)
  101. defer os.RemoveAll(dbPath)
  102. clusterN := 3
  103. cURLs, _, srvs := restoreCluster(t, clusterN, dbPath)
  104. defer func() {
  105. for i := 0; i < clusterN; i++ {
  106. os.RemoveAll(srvs[i].Config().Dir)
  107. srvs[i].Close()
  108. }
  109. }()
  110. // wait for leader election
  111. time.Sleep(time.Second)
  112. for i := 0; i < clusterN; i++ {
  113. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}})
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. defer cli.Close()
  118. for i := range kvs {
  119. var gresp *clientv3.GetResponse
  120. gresp, err = cli.Get(context.Background(), kvs[i].k)
  121. if err != nil {
  122. t.Fatal(err)
  123. }
  124. if string(gresp.Kvs[0].Value) != kvs[i].v {
  125. t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
  126. }
  127. }
  128. }
  129. }
  130. // TestSnapshotFilePermissions ensures that the snapshot is saved with
  131. // the correct file permissions.
  132. func TestSnapshotFilePermissions(t *testing.T) {
  133. expectedFileMode := os.FileMode(fileutil.PrivateFileMode)
  134. kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
  135. dbPath := createSnapshotFile(t, kvs)
  136. defer os.RemoveAll(dbPath)
  137. dbInfo, err := os.Stat(dbPath)
  138. if err != nil {
  139. t.Fatalf("failed to get test snapshot file status: %v", err)
  140. }
  141. actualFileMode := dbInfo.Mode()
  142. if expectedFileMode != actualFileMode {
  143. t.Fatalf("expected test snapshot file mode %s, got %s:", expectedFileMode, actualFileMode)
  144. }
  145. }
  146. type kv struct {
  147. k, v string
  148. }
  149. // creates a snapshot file and returns the file path.
  150. func createSnapshotFile(t *testing.T, kvs []kv) string {
  151. clusterN := 1
  152. urls := newEmbedURLs(clusterN * 2)
  153. cURLs, pURLs := urls[:clusterN], urls[clusterN:]
  154. cfg := embed.NewConfig()
  155. cfg.Logger = "zap"
  156. cfg.LogOutputs = []string{"/dev/null"}
  157. cfg.Debug = false
  158. cfg.Name = "default"
  159. cfg.ClusterState = "new"
  160. cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
  161. cfg.LPUrls, cfg.APUrls = pURLs, pURLs
  162. cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
  163. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
  164. srv, err := embed.StartEtcd(cfg)
  165. if err != nil {
  166. t.Fatal(err)
  167. }
  168. defer func() {
  169. os.RemoveAll(cfg.Dir)
  170. srv.Close()
  171. }()
  172. select {
  173. case <-srv.Server.ReadyNotify():
  174. case <-time.After(3 * time.Second):
  175. t.Fatalf("failed to start embed.Etcd for creating snapshots")
  176. }
  177. ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
  178. cli, err := clientv3.New(ccfg)
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. defer cli.Close()
  183. for i := range kvs {
  184. ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout)
  185. _, err = cli.Put(ctx, kvs[i].k, kvs[i].v)
  186. cancel()
  187. if err != nil {
  188. t.Fatal(err)
  189. }
  190. }
  191. sp := NewV3(zap.NewExample())
  192. dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond()))
  193. if err = sp.Save(context.Background(), ccfg, dpPath); err != nil {
  194. t.Fatal(err)
  195. }
  196. os.RemoveAll(cfg.Dir)
  197. srv.Close()
  198. return dpPath
  199. }
  200. const testClusterTkn = "tkn"
  201. func restoreCluster(t *testing.T, clusterN int, dbPath string) (
  202. cURLs []url.URL,
  203. pURLs []url.URL,
  204. srvs []*embed.Etcd) {
  205. urls := newEmbedURLs(clusterN * 2)
  206. cURLs, pURLs = urls[:clusterN], urls[clusterN:]
  207. ics := ""
  208. for i := 0; i < clusterN; i++ {
  209. ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String())
  210. }
  211. ics = ics[1:]
  212. cfgs := make([]*embed.Config, clusterN)
  213. for i := 0; i < clusterN; i++ {
  214. cfg := embed.NewConfig()
  215. cfg.Logger = "zap"
  216. cfg.LogOutputs = []string{"/dev/null"}
  217. cfg.Debug = false
  218. cfg.Name = fmt.Sprintf("%d", i)
  219. cfg.InitialClusterToken = testClusterTkn
  220. cfg.ClusterState = "existing"
  221. cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]}
  222. cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
  223. cfg.InitialCluster = ics
  224. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i))
  225. sp := NewV3(zap.NewExample())
  226. if err := sp.Restore(RestoreConfig{
  227. SnapshotPath: dbPath,
  228. Name: cfg.Name,
  229. OutputDataDir: cfg.Dir,
  230. PeerURLs: []string{pURLs[i].String()},
  231. InitialCluster: ics,
  232. InitialClusterToken: cfg.InitialClusterToken,
  233. }); err != nil {
  234. t.Fatal(err)
  235. }
  236. cfgs[i] = cfg
  237. }
  238. sch := make(chan *embed.Etcd)
  239. for i := range cfgs {
  240. go func(idx int) {
  241. srv, err := embed.StartEtcd(cfgs[idx])
  242. if err != nil {
  243. t.Fatal(err)
  244. }
  245. <-srv.Server.ReadyNotify()
  246. sch <- srv
  247. }(i)
  248. }
  249. srvs = make([]*embed.Etcd, clusterN)
  250. for i := 0; i < clusterN; i++ {
  251. select {
  252. case srv := <-sch:
  253. srvs[i] = srv
  254. case <-time.After(5 * time.Second):
  255. t.Fatalf("#%d: failed to start embed.Etcd", i)
  256. }
  257. }
  258. return cURLs, pURLs, srvs
  259. }
  260. // TODO: TLS
  261. func newEmbedURLs(n int) (urls []url.URL) {
  262. urls = make([]url.URL, n)
  263. for i := 0; i < n; i++ {
  264. rand.Seed(int64(time.Now().Nanosecond()))
  265. u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d", rand.Intn(45000)))
  266. urls[i] = *u
  267. }
  268. return urls
  269. }