case_sigquit_remove_quorum.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "github.com/coreos/etcd/clientv3"
  21. "github.com/coreos/etcd/functional/rpcpb"
  22. "go.uber.org/zap"
  23. )
  24. type fetchSnapshotCaseQuorum struct {
  25. desc string
  26. rpcpbCase rpcpb.Case
  27. injected map[int]struct{}
  28. snapshotted int
  29. }
  30. func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
  31. // 1. Assume node C is the current leader with most up-to-date data.
  32. lead, err := clus.GetLeader()
  33. if err != nil {
  34. return err
  35. }
  36. c.snapshotted = lead
  37. // 2. Download snapshot from node C, before destroying node A and B.
  38. clus.lg.Info(
  39. "save snapshot on leader node START",
  40. zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
  41. )
  42. var resp *rpcpb.Response
  43. resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_SAVE_SNAPSHOT)
  44. if resp == nil || (resp != nil && !resp.Success) || err != nil {
  45. clus.lg.Info(
  46. "save snapshot on leader node FAIL",
  47. zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
  48. zap.Error(err),
  49. )
  50. return err
  51. }
  52. clus.lg.Info(
  53. "save snapshot on leader node SUCCESS",
  54. zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
  55. zap.String("member-name", resp.SnapshotInfo.MemberName),
  56. zap.Strings("member-client-urls", resp.SnapshotInfo.MemberClientURLs),
  57. zap.String("snapshot-path", resp.SnapshotInfo.SnapshotPath),
  58. zap.String("snapshot-file-size", resp.SnapshotInfo.SnapshotFileSize),
  59. zap.String("snapshot-total-size", resp.SnapshotInfo.SnapshotTotalSize),
  60. zap.Int64("snapshot-total-key", resp.SnapshotInfo.SnapshotTotalKey),
  61. zap.Int64("snapshot-hash", resp.SnapshotInfo.SnapshotHash),
  62. zap.Int64("snapshot-revision", resp.SnapshotInfo.SnapshotRevision),
  63. zap.String("took", resp.SnapshotInfo.Took),
  64. zap.Error(err),
  65. )
  66. if err != nil {
  67. return err
  68. }
  69. clus.Members[lead].SnapshotInfo = resp.SnapshotInfo
  70. leaderc, err := clus.Members[lead].CreateEtcdClient()
  71. if err != nil {
  72. return err
  73. }
  74. defer leaderc.Close()
  75. var mresp *clientv3.MemberListResponse
  76. mresp, err = leaderc.MemberList(context.Background())
  77. mss := []string{}
  78. if err == nil && mresp != nil {
  79. mss = describeMembers(mresp)
  80. }
  81. clus.lg.Info(
  82. "member list before disastrous machine failure",
  83. zap.String("request-to", clus.Members[lead].EtcdClientEndpoint),
  84. zap.Strings("members", mss),
  85. zap.Error(err),
  86. )
  87. if err != nil {
  88. return err
  89. }
  90. // simulate real life; machine failures may happen
  91. // after some time since last snapshot save
  92. time.Sleep(time.Second)
  93. // 3. Destroy node A and B, and make the whole cluster inoperable.
  94. for {
  95. c.injected = pickQuorum(len(clus.Members))
  96. if _, ok := c.injected[lead]; !ok {
  97. break
  98. }
  99. }
  100. for idx := range c.injected {
  101. clus.lg.Info(
  102. "disastrous machine failure to quorum START",
  103. zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
  104. )
  105. err = clus.sendOp(idx, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
  106. clus.lg.Info(
  107. "disastrous machine failure to quorum END",
  108. zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
  109. zap.Error(err),
  110. )
  111. if err != nil {
  112. return err
  113. }
  114. }
  115. // 4. Now node C cannot operate either.
  116. // 5. SIGTERM node C and remove its data directories.
  117. clus.lg.Info(
  118. "disastrous machine failure to old leader START",
  119. zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
  120. )
  121. err = clus.sendOp(lead, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
  122. clus.lg.Info(
  123. "disastrous machine failure to old leader END",
  124. zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
  125. zap.Error(err),
  126. )
  127. return err
  128. }
  129. func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
  130. // 6. Restore a new seed member from node C's latest snapshot file.
  131. oldlead := c.snapshotted
  132. // configuration on restart from recovered snapshot
  133. // seed member's configuration is all the same as previous one
  134. // except initial cluster string is now a single-node cluster
  135. clus.Members[oldlead].EtcdOnSnapshotRestore = clus.Members[oldlead].Etcd
  136. clus.Members[oldlead].EtcdOnSnapshotRestore.InitialClusterState = "existing"
  137. name := clus.Members[oldlead].Etcd.Name
  138. initClus := []string{}
  139. for _, u := range clus.Members[oldlead].Etcd.AdvertisePeerURLs {
  140. initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
  141. }
  142. clus.Members[oldlead].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
  143. clus.lg.Info(
  144. "restore snapshot and restart from snapshot request START",
  145. zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
  146. zap.Strings("initial-cluster", initClus),
  147. )
  148. err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT)
  149. clus.lg.Info(
  150. "restore snapshot and restart from snapshot request END",
  151. zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
  152. zap.Strings("initial-cluster", initClus),
  153. zap.Error(err),
  154. )
  155. if err != nil {
  156. return err
  157. }
  158. leaderc, err := clus.Members[oldlead].CreateEtcdClient()
  159. if err != nil {
  160. return err
  161. }
  162. defer leaderc.Close()
  163. // 7. Add another member to establish 2-node cluster.
  164. // 8. Add another member to establish 3-node cluster.
  165. // 9. Add more if any.
  166. idxs := make([]int, 0, len(c.injected))
  167. for idx := range c.injected {
  168. idxs = append(idxs, idx)
  169. }
  170. clus.lg.Info("member add START", zap.Int("members-to-add", len(idxs)))
  171. for i, idx := range idxs {
  172. clus.lg.Info(
  173. "member add request SENT",
  174. zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
  175. zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
  176. )
  177. ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
  178. _, err := leaderc.MemberAdd(ctx, clus.Members[idx].Etcd.AdvertisePeerURLs)
  179. cancel()
  180. clus.lg.Info(
  181. "member add request DONE",
  182. zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
  183. zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
  184. zap.Error(err),
  185. )
  186. if err != nil {
  187. return err
  188. }
  189. // start the added(new) member with fresh data
  190. clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd
  191. clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing"
  192. name := clus.Members[idx].Etcd.Name
  193. for _, u := range clus.Members[idx].Etcd.AdvertisePeerURLs {
  194. initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
  195. }
  196. clus.Members[idx].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
  197. clus.lg.Info(
  198. "restart from snapshot request SENT",
  199. zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
  200. zap.Strings("initial-cluster", initClus),
  201. )
  202. err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT)
  203. clus.lg.Info(
  204. "restart from snapshot request DONE",
  205. zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
  206. zap.Strings("initial-cluster", initClus),
  207. zap.Error(err),
  208. )
  209. if err != nil {
  210. return err
  211. }
  212. if i != len(c.injected)-1 {
  213. // wait until membership reconfiguration entry gets applied
  214. // TODO: test concurrent member add
  215. dur := 5 * clus.Members[idx].ElectionTimeout()
  216. clus.lg.Info(
  217. "waiting after restart from snapshot request",
  218. zap.Int("i", i),
  219. zap.Int("idx", idx),
  220. zap.Duration("sleep", dur),
  221. )
  222. time.Sleep(dur)
  223. } else {
  224. clus.lg.Info(
  225. "restart from snapshot request ALL END",
  226. zap.Int("i", i),
  227. zap.Int("idx", idx),
  228. )
  229. }
  230. }
  231. return nil
  232. }
  233. func (c *fetchSnapshotCaseQuorum) Desc() string {
  234. if c.desc != "" {
  235. return c.desc
  236. }
  237. return c.rpcpbCase.String()
  238. }
  239. func (c *fetchSnapshotCaseQuorum) TestCase() rpcpb.Case {
  240. return c.rpcpbCase
  241. }
  242. func new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus *Cluster) Case {
  243. c := &fetchSnapshotCaseQuorum{
  244. rpcpbCase: rpcpb.Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH,
  245. injected: make(map[int]struct{}),
  246. snapshotted: -1,
  247. }
  248. // simulate real life; machine replacements may happen
  249. // after some time since disaster
  250. return &caseDelay{
  251. Case: c,
  252. delayDuration: clus.GetCaseDelayDuration(),
  253. }
  254. }