v3_snapshot_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package snapshot
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "net/url"
  20. "os"
  21. "path/filepath"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/embed"
  26. "github.com/coreos/etcd/pkg/testutil"
  27. "go.uber.org/zap"
  28. )
  29. // TestSnapshotV3RestoreSingle tests single node cluster restoring
  30. // from a snapshot file.
  31. func TestSnapshotV3RestoreSingle(t *testing.T) {
  32. kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
  33. dbPath := createSnapshotFile(t, kvs)
  34. defer os.RemoveAll(dbPath)
  35. clusterN := 1
  36. urls := newEmbedURLs(clusterN * 2)
  37. cURLs, pURLs := urls[:clusterN], urls[clusterN:]
  38. cfg := embed.NewConfig()
  39. cfg.Logger = "zap"
  40. cfg.LogOutputs = []string{"/dev/null"}
  41. cfg.Debug = false
  42. cfg.Name = "s1"
  43. cfg.InitialClusterToken = testClusterTkn
  44. cfg.ClusterState = "existing"
  45. cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
  46. cfg.LPUrls, cfg.APUrls = pURLs, pURLs
  47. cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
  48. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
  49. sp := NewV3(zap.NewExample())
  50. pss := make([]string, 0, len(pURLs))
  51. for _, p := range pURLs {
  52. pss = append(pss, p.String())
  53. }
  54. if err := sp.Restore(RestoreConfig{
  55. SnapshotPath: dbPath,
  56. Name: cfg.Name,
  57. OutputDataDir: cfg.Dir,
  58. InitialCluster: cfg.InitialCluster,
  59. InitialClusterToken: cfg.InitialClusterToken,
  60. PeerURLs: pss,
  61. }); err != nil {
  62. t.Fatal(err)
  63. }
  64. srv, err := embed.StartEtcd(cfg)
  65. if err != nil {
  66. t.Fatal(err)
  67. }
  68. defer func() {
  69. os.RemoveAll(cfg.Dir)
  70. srv.Close()
  71. }()
  72. select {
  73. case <-srv.Server.ReadyNotify():
  74. case <-time.After(3 * time.Second):
  75. t.Fatalf("failed to start restored etcd member")
  76. }
  77. var cli *clientv3.Client
  78. cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. defer cli.Close()
  83. for i := range kvs {
  84. var gresp *clientv3.GetResponse
  85. gresp, err = cli.Get(context.Background(), kvs[i].k)
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. if string(gresp.Kvs[0].Value) != kvs[i].v {
  90. t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
  91. }
  92. }
  93. }
  94. // TestSnapshotV3RestoreMulti ensures that multiple members
  95. // can boot into the same cluster after being restored from a same
  96. // snapshot file.
  97. func TestSnapshotV3RestoreMulti(t *testing.T) {
  98. kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
  99. dbPath := createSnapshotFile(t, kvs)
  100. defer os.RemoveAll(dbPath)
  101. clusterN := 3
  102. cURLs, _, srvs := restoreCluster(t, clusterN, dbPath)
  103. defer func() {
  104. for i := 0; i < clusterN; i++ {
  105. os.RemoveAll(srvs[i].Config().Dir)
  106. srvs[i].Close()
  107. }
  108. }()
  109. // wait for leader election
  110. time.Sleep(time.Second)
  111. for i := 0; i < clusterN; i++ {
  112. cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}})
  113. if err != nil {
  114. t.Fatal(err)
  115. }
  116. defer cli.Close()
  117. for i := range kvs {
  118. var gresp *clientv3.GetResponse
  119. gresp, err = cli.Get(context.Background(), kvs[i].k)
  120. if err != nil {
  121. t.Fatal(err)
  122. }
  123. if string(gresp.Kvs[0].Value) != kvs[i].v {
  124. t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
  125. }
  126. }
  127. }
  128. }
  129. type kv struct {
  130. k, v string
  131. }
  132. // creates a snapshot file and returns the file path.
  133. func createSnapshotFile(t *testing.T, kvs []kv) string {
  134. clusterN := 1
  135. urls := newEmbedURLs(clusterN * 2)
  136. cURLs, pURLs := urls[:clusterN], urls[clusterN:]
  137. cfg := embed.NewConfig()
  138. cfg.Logger = "zap"
  139. cfg.LogOutputs = []string{"/dev/null"}
  140. cfg.Debug = false
  141. cfg.Name = "default"
  142. cfg.ClusterState = "new"
  143. cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
  144. cfg.LPUrls, cfg.APUrls = pURLs, pURLs
  145. cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
  146. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
  147. srv, err := embed.StartEtcd(cfg)
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. defer func() {
  152. os.RemoveAll(cfg.Dir)
  153. srv.Close()
  154. }()
  155. select {
  156. case <-srv.Server.ReadyNotify():
  157. case <-time.After(3 * time.Second):
  158. t.Fatalf("failed to start embed.Etcd for creating snapshots")
  159. }
  160. ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
  161. cli, err := clientv3.New(ccfg)
  162. if err != nil {
  163. t.Fatal(err)
  164. }
  165. defer cli.Close()
  166. for i := range kvs {
  167. ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout)
  168. _, err = cli.Put(ctx, kvs[i].k, kvs[i].v)
  169. cancel()
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. }
  174. sp := NewV3(zap.NewExample())
  175. dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond()))
  176. if err = sp.Save(context.Background(), ccfg, dpPath); err != nil {
  177. t.Fatal(err)
  178. }
  179. os.RemoveAll(cfg.Dir)
  180. srv.Close()
  181. return dpPath
  182. }
  183. const testClusterTkn = "tkn"
  184. func restoreCluster(t *testing.T, clusterN int, dbPath string) (
  185. cURLs []url.URL,
  186. pURLs []url.URL,
  187. srvs []*embed.Etcd) {
  188. urls := newEmbedURLs(clusterN * 2)
  189. cURLs, pURLs = urls[:clusterN], urls[clusterN:]
  190. ics := ""
  191. for i := 0; i < clusterN; i++ {
  192. ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String())
  193. }
  194. ics = ics[1:]
  195. cfgs := make([]*embed.Config, clusterN)
  196. for i := 0; i < clusterN; i++ {
  197. cfg := embed.NewConfig()
  198. cfg.Logger = "zap"
  199. cfg.LogOutputs = []string{"/dev/null"}
  200. cfg.Debug = false
  201. cfg.Name = fmt.Sprintf("%d", i)
  202. cfg.InitialClusterToken = testClusterTkn
  203. cfg.ClusterState = "existing"
  204. cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]}
  205. cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
  206. cfg.InitialCluster = ics
  207. cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i))
  208. sp := NewV3(zap.NewExample())
  209. if err := sp.Restore(RestoreConfig{
  210. SnapshotPath: dbPath,
  211. Name: cfg.Name,
  212. OutputDataDir: cfg.Dir,
  213. PeerURLs: []string{pURLs[i].String()},
  214. InitialCluster: ics,
  215. InitialClusterToken: cfg.InitialClusterToken,
  216. }); err != nil {
  217. t.Fatal(err)
  218. }
  219. cfgs[i] = cfg
  220. }
  221. sch := make(chan *embed.Etcd)
  222. for i := range cfgs {
  223. go func(idx int) {
  224. srv, err := embed.StartEtcd(cfgs[idx])
  225. if err != nil {
  226. t.Fatal(err)
  227. }
  228. <-srv.Server.ReadyNotify()
  229. sch <- srv
  230. }(i)
  231. }
  232. srvs = make([]*embed.Etcd, clusterN)
  233. for i := 0; i < clusterN; i++ {
  234. select {
  235. case srv := <-sch:
  236. srvs[i] = srv
  237. case <-time.After(5 * time.Second):
  238. t.Fatalf("#%d: failed to start embed.Etcd", i)
  239. }
  240. }
  241. return cURLs, pURLs, srvs
  242. }
  243. // TODO: TLS
  244. func newEmbedURLs(n int) (urls []url.URL) {
  245. urls = make([]url.URL, n)
  246. for i := 0; i < n; i++ {
  247. rand.Seed(int64(time.Now().Nanosecond()))
  248. u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d", rand.Intn(45000)))
  249. urls[i] = *u
  250. }
  251. return urls
  252. }