// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
- package tester
- import (
- "context"
- "fmt"
- "strings"
- "time"
- "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/functional/rpcpb"
- "go.uber.org/zap"
- )
- type fetchSnapshotCaseQuorum struct {
- desc string
- rpcpbCase rpcpb.Case
- injected map[int]struct{}
- snapshotted int
- }
- func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
- // 1. Assume node C is the current leader with most up-to-date data.
- lead, err := clus.GetLeader()
- if err != nil {
- return err
- }
- c.snapshotted = lead
- // 2. Download snapshot from node C, before destroying node A and B.
- clus.lg.Info(
- "save snapshot on leader node START",
- zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
- )
- var resp *rpcpb.Response
- resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_SAVE_SNAPSHOT)
- if resp == nil || (resp != nil && !resp.Success) || err != nil {
- clus.lg.Info(
- "save snapshot on leader node FAIL",
- zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
- zap.Error(err),
- )
- return err
- }
- clus.lg.Info(
- "save snapshot on leader node SUCCESS",
- zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
- zap.String("member-name", resp.SnapshotInfo.MemberName),
- zap.Strings("member-client-urls", resp.SnapshotInfo.MemberClientURLs),
- zap.String("snapshot-path", resp.SnapshotInfo.SnapshotPath),
- zap.String("snapshot-file-size", resp.SnapshotInfo.SnapshotFileSize),
- zap.String("snapshot-total-size", resp.SnapshotInfo.SnapshotTotalSize),
- zap.Int64("snapshot-total-key", resp.SnapshotInfo.SnapshotTotalKey),
- zap.Int64("snapshot-hash", resp.SnapshotInfo.SnapshotHash),
- zap.Int64("snapshot-revision", resp.SnapshotInfo.SnapshotRevision),
- zap.String("took", resp.SnapshotInfo.Took),
- zap.Error(err),
- )
- if err != nil {
- return err
- }
- clus.Members[lead].SnapshotInfo = resp.SnapshotInfo
- leaderc, err := clus.Members[lead].CreateEtcdClient()
- if err != nil {
- return err
- }
- defer leaderc.Close()
- var mresp *clientv3.MemberListResponse
- mresp, err = leaderc.MemberList(context.Background())
- mss := []string{}
- if err == nil && mresp != nil {
- mss = describeMembers(mresp)
- }
- clus.lg.Info(
- "member list before disastrous machine failure",
- zap.String("request-to", clus.Members[lead].EtcdClientEndpoint),
- zap.Strings("members", mss),
- zap.Error(err),
- )
- if err != nil {
- return err
- }
- // simulate real life; machine failures may happen
- // after some time since last snapshot save
- time.Sleep(time.Second)
- // 3. Destroy node A and B, and make the whole cluster inoperable.
- for {
- c.injected = pickQuorum(len(clus.Members))
- if _, ok := c.injected[lead]; !ok {
- break
- }
- }
- for idx := range c.injected {
- clus.lg.Info(
- "disastrous machine failure to quorum START",
- zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
- )
- err = clus.sendOp(idx, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
- clus.lg.Info(
- "disastrous machine failure to quorum END",
- zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
- zap.Error(err),
- )
- if err != nil {
- return err
- }
- }
- // 4. Now node C cannot operate either.
- // 5. SIGTERM node C and remove its data directories.
- clus.lg.Info(
- "disastrous machine failure to old leader START",
- zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
- )
- err = clus.sendOp(lead, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
- clus.lg.Info(
- "disastrous machine failure to old leader END",
- zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
- zap.Error(err),
- )
- return err
- }
- func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
- // 6. Restore a new seed member from node C's latest snapshot file.
- oldlead := c.snapshotted
- // configuration on restart from recovered snapshot
- // seed member's configuration is all the same as previous one
- // except initial cluster string is now a single-node cluster
- clus.Members[oldlead].EtcdOnSnapshotRestore = clus.Members[oldlead].Etcd
- clus.Members[oldlead].EtcdOnSnapshotRestore.InitialClusterState = "existing"
- name := clus.Members[oldlead].Etcd.Name
- initClus := []string{}
- for _, u := range clus.Members[oldlead].Etcd.AdvertisePeerURLs {
- initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
- }
- clus.Members[oldlead].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
- clus.lg.Info(
- "restore snapshot and restart from snapshot request START",
- zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
- zap.Strings("initial-cluster", initClus),
- )
- err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT)
- clus.lg.Info(
- "restore snapshot and restart from snapshot request END",
- zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
- zap.Strings("initial-cluster", initClus),
- zap.Error(err),
- )
- if err != nil {
- return err
- }
- leaderc, err := clus.Members[oldlead].CreateEtcdClient()
- if err != nil {
- return err
- }
- defer leaderc.Close()
- // 7. Add another member to establish 2-node cluster.
- // 8. Add another member to establish 3-node cluster.
- // 9. Add more if any.
- idxs := make([]int, 0, len(c.injected))
- for idx := range c.injected {
- idxs = append(idxs, idx)
- }
- clus.lg.Info("member add START", zap.Int("members-to-add", len(idxs)))
- for i, idx := range idxs {
- clus.lg.Info(
- "member add request SENT",
- zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
- zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
- )
- ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
- _, err := leaderc.MemberAdd(ctx, clus.Members[idx].Etcd.AdvertisePeerURLs)
- cancel()
- clus.lg.Info(
- "member add request DONE",
- zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
- zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
- zap.Error(err),
- )
- if err != nil {
- return err
- }
- // start the added(new) member with fresh data
- clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd
- clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing"
- name := clus.Members[idx].Etcd.Name
- for _, u := range clus.Members[idx].Etcd.AdvertisePeerURLs {
- initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
- }
- clus.Members[idx].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
- clus.lg.Info(
- "restart from snapshot request SENT",
- zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
- zap.Strings("initial-cluster", initClus),
- )
- err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT)
- clus.lg.Info(
- "restart from snapshot request DONE",
- zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
- zap.Strings("initial-cluster", initClus),
- zap.Error(err),
- )
- if err != nil {
- return err
- }
- if i != len(c.injected)-1 {
- // wait until membership reconfiguration entry gets applied
- // TODO: test concurrent member add
- dur := 5 * clus.Members[idx].ElectionTimeout()
- clus.lg.Info(
- "waiting after restart from snapshot request",
- zap.Int("i", i),
- zap.Int("idx", idx),
- zap.Duration("sleep", dur),
- )
- time.Sleep(dur)
- } else {
- clus.lg.Info(
- "restart from snapshot request ALL END",
- zap.Int("i", i),
- zap.Int("idx", idx),
- )
- }
- }
- return nil
- }
- func (c *fetchSnapshotCaseQuorum) Desc() string {
- if c.desc != "" {
- return c.desc
- }
- return c.rpcpbCase.String()
- }
- func (c *fetchSnapshotCaseQuorum) TestCase() rpcpb.Case {
- return c.rpcpbCase
- }
- func new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus *Cluster) Case {
- c := &fetchSnapshotCaseQuorum{
- rpcpbCase: rpcpb.Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH,
- injected: make(map[int]struct{}),
- snapshotted: -1,
- }
- // simulate real life; machine replacements may happen
- // after some time since disaster
- return &caseDelay{
- Case: c,
- delayDuration: clus.GetCaseDelayDuration(),
- }
- }
|