force_cluster.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "log"
  16. "github.com/coreos/etcd/pkg/pbutil"
  17. "github.com/coreos/etcd/pkg/types"
  18. "github.com/coreos/etcd/raft"
  19. "github.com/coreos/etcd/raft/raftpb"
  20. "github.com/coreos/etcd/wal"
  21. )
  22. func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) {
  23. var err error
  24. if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
  25. log.Fatalf("etcdserver: open wal error: %v", err)
  26. }
  27. id, cid, st, ents, err := readWAL(w, index)
  28. if err != nil {
  29. log.Fatalf("etcdserver: read wal error: %v", err)
  30. }
  31. cfg.Cluster.SetID(cid)
  32. // discard the previously uncommitted entries
  33. if len(ents) != 0 {
  34. ents = ents[:st.Commit+1]
  35. }
  36. // force append the configuration change entries
  37. toAppEnts := createConfigChangeEnts(getIDset(snapshot, ents), uint64(id), st.Term, st.Commit)
  38. ents = append(ents, toAppEnts...)
  39. // force commit newly appended entries
  40. for _, e := range toAppEnts {
  41. err := w.SaveEntry(&e)
  42. if err != nil {
  43. log.Fatalf("etcdserver: %v", err)
  44. }
  45. }
  46. if len(ents) != 0 {
  47. st.Commit = ents[len(ents)-1].Index
  48. }
  49. log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
  50. n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
  51. return
  52. }
  53. // getIDset returns a set of IDs included in the given snapshot and the entries.
  54. // The given snapshot contians a list of IDs.
  55. // The given entries might contain two kinds of ID related entry.
  56. // If the entry type is Add, the contained ID will be added into the set.
  57. // If the entry type is Remove, the contained ID will be removed from the set.
  58. func getIDset(snap *raftpb.Snapshot, ents []raftpb.Entry) map[uint64]bool {
  59. ids := make(map[uint64]bool)
  60. if snap != nil {
  61. for _, id := range snap.Nodes {
  62. ids[id] = true
  63. }
  64. }
  65. for _, e := range ents {
  66. if e.Type != raftpb.EntryConfChange {
  67. continue
  68. }
  69. var cc raftpb.ConfChange
  70. pbutil.MustUnmarshal(&cc, e.Data)
  71. switch cc.Type {
  72. case raftpb.ConfChangeAddNode:
  73. ids[cc.NodeID] = true
  74. case raftpb.ConfChangeRemoveNode:
  75. delete(ids, cc.NodeID)
  76. default:
  77. log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
  78. }
  79. }
  80. return ids
  81. }
  82. func createConfigChangeEnts(ids map[uint64]bool, self uint64, term, index uint64) []raftpb.Entry {
  83. ents := make([]raftpb.Entry, 0)
  84. next := index + 1
  85. for id := range ids {
  86. if id == self {
  87. continue
  88. }
  89. cc := &raftpb.ConfChange{
  90. Type: raftpb.ConfChangeRemoveNode,
  91. NodeID: id,
  92. }
  93. e := raftpb.Entry{
  94. Type: raftpb.EntryConfChange,
  95. Data: pbutil.MustMarshal(cc),
  96. Term: term,
  97. Index: next,
  98. }
  99. ents = append(ents, e)
  100. next++
  101. }
  102. return ents
  103. }