v3_snapshot_test.go 7.3 KB


  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package snapshot
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "net/url"
  20. "os"
  21. "path/filepath"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/embed"
  26. "github.com/coreos/etcd/pkg/logger"
  27. "github.com/coreos/etcd/pkg/testutil"
  28. "github.com/coreos/etcd/pkg/types"
  29. )
  30. // TestSnapshotV3RestoreSingle tests single node cluster restoring
  31. // from a snapshot file.
  32. func TestSnapshotV3RestoreSingle(t *testing.T) {
  33. kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
  34. dbPath := createSnapshotFile(t, kvs)
  35. defer os.RemoveAll(dbPath)
  36. clusterN := 1
  37. urls := newEmbedURLs(clusterN * 2)
  38. cURLs, pURLs := urls[:clusterN], urls[clusterN:]
  39. cfg := embed.NewConfig()
  40. cfg.Name = "s1"
  41. cfg.InitialClusterToken = testClusterTkn
  42. cfg.ClusterState = "existing"
  43. cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
  44. cfg.LPUrls, cfg.APUrls = pURLs, pURLs
  45. cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
  46. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
  47. sp := NewV3(nil, logger.NewPackageLogger("github.com/coreos/etcd", "snapshot"))
  48. err := sp.Restore(dbPath, RestoreConfig{})
  49. if err.Error() != `couldn't find local name "" in the initial cluster configuration` {
  50. t.Fatalf("expected restore error, got %v", err)
  51. }
  52. var iURLs types.URLsMap
  53. iURLs, err = types.NewURLsMap(cfg.InitialCluster)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. if err = sp.Restore(dbPath, RestoreConfig{
  58. Name: cfg.Name,
  59. OutputDataDir: cfg.Dir,
  60. InitialCluster: iURLs,
  61. InitialClusterToken: cfg.InitialClusterToken,
  62. PeerURLs: pURLs,
  63. }); err != nil {
  64. t.Fatal(err)
  65. }
  66. var srv *embed.Etcd
  67. srv, err = embed.StartEtcd(cfg)
  68. if err != nil {
  69. t.Fatal(err)
  70. }
  71. defer func() {
  72. os.RemoveAll(cfg.Dir)
  73. srv.Close()
  74. }()
  75. select {
  76. case <-srv.Server.ReadyNotify():
  77. case <-time.After(3 * time.Second):
  78. t.Fatalf("failed to start restored etcd member")
  79. }
  80. var cli *clientv3.Client
  81. cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
  82. if err != nil {
  83. t.Fatal(err)
  84. }
  85. defer cli.Close()
  86. for i := range kvs {
  87. var gresp *clientv3.GetResponse
  88. gresp, err = cli.Get(context.Background(), kvs[i].k)
  89. if err != nil {
  90. t.Fatal(err)
  91. }
  92. if string(gresp.Kvs[0].Value) != kvs[i].v {
  93. t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
  94. }
  95. }
  96. }
  97. // TestSnapshotV3RestoreMulti ensures that multiple members
  98. // can boot into the same cluster after being restored from a same
  99. // snapshot file.
  100. func TestSnapshotV3RestoreMulti(t *testing.T) {
  101. kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
  102. dbPath := createSnapshotFile(t, kvs)
  103. defer os.RemoveAll(dbPath)
  104. clusterN := 3
  105. cURLs, _, srvs := restoreCluster(t, clusterN, dbPath)
  106. defer func() {
  107. for i := 0; i < clusterN; i++ {
  108. os.RemoveAll(srvs[i].Config().Dir)
  109. srvs[i].Close()
  110. }
  111. }()
  112. // wait for leader election
  113. time.Sleep(time.Second)
  114. for i := 0; i < clusterN; i++ {
  115. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}})
  116. if err != nil {
  117. t.Fatal(err)
  118. }
  119. defer cli.Close()
  120. for i := range kvs {
  121. var gresp *clientv3.GetResponse
  122. gresp, err = cli.Get(context.Background(), kvs[i].k)
  123. if err != nil {
  124. t.Fatal(err)
  125. }
  126. if string(gresp.Kvs[0].Value) != kvs[i].v {
  127. t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
  128. }
  129. }
  130. }
  131. }
  132. type kv struct {
  133. k, v string
  134. }
  135. // creates a snapshot file and returns the file path.
  136. func createSnapshotFile(t *testing.T, kvs []kv) string {
  137. clusterN := 1
  138. urls := newEmbedURLs(clusterN * 2)
  139. cURLs, pURLs := urls[:clusterN], urls[clusterN:]
  140. cfg := embed.NewConfig()
  141. cfg.Name = "default"
  142. cfg.ClusterState = "new"
  143. cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
  144. cfg.LPUrls, cfg.APUrls = pURLs, pURLs
  145. cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
  146. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
  147. srv, err := embed.StartEtcd(cfg)
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. defer func() {
  152. os.RemoveAll(cfg.Dir)
  153. srv.Close()
  154. }()
  155. select {
  156. case <-srv.Server.ReadyNotify():
  157. case <-time.After(3 * time.Second):
  158. t.Fatalf("failed to start embed.Etcd for creating snapshots")
  159. }
  160. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
  161. if err != nil {
  162. t.Fatal(err)
  163. }
  164. for i := range kvs {
  165. ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout)
  166. _, err = cli.Put(ctx, kvs[i].k, kvs[i].v)
  167. cancel()
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. }
  172. sp := NewV3(cli, logger.NewPackageLogger("github.com/coreos/etcd", "snapshot"))
  173. dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond()))
  174. if err = sp.Save(context.Background(), dpPath); err != nil {
  175. t.Fatal(err)
  176. }
  177. os.RemoveAll(cfg.Dir)
  178. srv.Close()
  179. return dpPath
  180. }
  181. const testClusterTkn = "tkn"
  182. func restoreCluster(t *testing.T, clusterN int, dbPath string) (
  183. cURLs []url.URL,
  184. pURLs []url.URL,
  185. srvs []*embed.Etcd) {
  186. urls := newEmbedURLs(clusterN * 2)
  187. cURLs, pURLs = urls[:clusterN], urls[clusterN:]
  188. ics := ""
  189. for i := 0; i < clusterN; i++ {
  190. ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String())
  191. }
  192. ics = ics[1:]
  193. iURLs, err := types.NewURLsMap(ics)
  194. if err != nil {
  195. t.Fatal(err)
  196. }
  197. cfgs := make([]*embed.Config, clusterN)
  198. for i := 0; i < clusterN; i++ {
  199. cfg := embed.NewConfig()
  200. cfg.Name = fmt.Sprintf("%d", i)
  201. cfg.InitialClusterToken = testClusterTkn
  202. cfg.ClusterState = "existing"
  203. cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]}
  204. cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
  205. cfg.InitialCluster = ics
  206. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i))
  207. sp := NewV3(nil, logger.NewPackageLogger("github.com/coreos/etcd", "snapshot"))
  208. if err := sp.Restore(dbPath, RestoreConfig{
  209. Name: cfg.Name,
  210. OutputDataDir: cfg.Dir,
  211. InitialCluster: iURLs,
  212. InitialClusterToken: cfg.InitialClusterToken,
  213. PeerURLs: types.URLs{pURLs[i]},
  214. }); err != nil {
  215. t.Fatal(err)
  216. }
  217. cfgs[i] = cfg
  218. }
  219. sch := make(chan *embed.Etcd)
  220. for i := range cfgs {
  221. go func(idx int) {
  222. srv, err := embed.StartEtcd(cfgs[idx])
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. <-srv.Server.ReadyNotify()
  227. sch <- srv
  228. }(i)
  229. }
  230. srvs = make([]*embed.Etcd, clusterN)
  231. for i := 0; i < clusterN; i++ {
  232. select {
  233. case srv := <-sch:
  234. srvs[i] = srv
  235. case <-time.After(5 * time.Second):
  236. t.Fatalf("#%d: failed to start embed.Etcd", i)
  237. }
  238. }
  239. return cURLs, pURLs, srvs
  240. }
  241. // TODO: TLS
  242. func newEmbedURLs(n int) (urls []url.URL) {
  243. urls = make([]url.URL, n)
  244. for i := 0; i < n; i++ {
  245. rand.Seed(int64(time.Now().Nanosecond()))
  246. u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d", rand.Intn(45000)))
  247. urls[i] = *u
  248. }
  249. return urls
  250. }