force_cluster.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "encoding/json"
  16. "log"
  17. "sort"
  18. "github.com/coreos/etcd/pkg/pbutil"
  19. "github.com/coreos/etcd/pkg/types"
  20. "github.com/coreos/etcd/raft"
  21. "github.com/coreos/etcd/raft/raftpb"
  22. "github.com/coreos/etcd/wal"
  23. "github.com/coreos/etcd/wal/walpb"
  24. )
  25. func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
  26. var walsnap walpb.Snapshot
  27. if snapshot != nil {
  28. walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
  29. }
  30. w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
  31. cfg.Cluster.SetID(cid)
  32. // discard the previously uncommitted entries
  33. for i, ent := range ents {
  34. if ent.Index > st.Commit {
  35. log.Printf("etcdserver: discarding %d uncommitted WAL entries ", len(ents)-i)
  36. ents = ents[:i]
  37. break
  38. }
  39. }
  40. // force append the configuration change entries
  41. toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
  42. ents = append(ents, toAppEnts...)
  43. // force commit newly appended entries
  44. err := w.Save(raftpb.HardState{}, toAppEnts)
  45. if err != nil {
  46. log.Fatalf("etcdserver: %v", err)
  47. }
  48. if len(ents) != 0 {
  49. st.Commit = ents[len(ents)-1].Index
  50. }
  51. log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
  52. s := raft.NewMemoryStorage()
  53. if snapshot != nil {
  54. s.ApplySnapshot(*snapshot)
  55. }
  56. s.SetHardState(st)
  57. s.Append(ents)
  58. n := raft.RestartNode(uint64(id), 10, 1, s)
  59. return id, n, s, w
  60. }
  61. // getIDs returns an ordered set of IDs included in the given snapshot and
  62. // the entries. The given snapshot/entries can contain two kinds of
  63. // ID-related entry:
  64. // - ConfChangeAddNode, in which case the contained ID will be added into the set.
  65. // - ConfChangeAddRemove, in which case the contained ID will be removed from the set.
  66. func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
  67. ids := make(map[uint64]bool)
  68. if snap != nil {
  69. for _, id := range snap.Metadata.ConfState.Nodes {
  70. ids[id] = true
  71. }
  72. }
  73. for _, e := range ents {
  74. if e.Type != raftpb.EntryConfChange {
  75. continue
  76. }
  77. var cc raftpb.ConfChange
  78. pbutil.MustUnmarshal(&cc, e.Data)
  79. switch cc.Type {
  80. case raftpb.ConfChangeAddNode:
  81. ids[cc.NodeID] = true
  82. case raftpb.ConfChangeRemoveNode:
  83. delete(ids, cc.NodeID)
  84. default:
  85. log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
  86. }
  87. }
  88. sids := make(types.Uint64Slice, 0)
  89. for id := range ids {
  90. sids = append(sids, id)
  91. }
  92. sort.Sort(sids)
  93. return []uint64(sids)
  94. }
  95. // createConfigChangeEnts creates a series of Raft entries (i.e.
  96. // EntryConfChange) to remove the set of given IDs from the cluster. The ID
  97. // `self` is _not_ removed, even if present in the set.
  98. // If `self` is not inside the given ids, it creates a Raft entry to add a
  99. // default member with the given `self`.
  100. func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
  101. ents := make([]raftpb.Entry, 0)
  102. next := index + 1
  103. found := false
  104. for _, id := range ids {
  105. if id == self {
  106. found = true
  107. continue
  108. }
  109. cc := &raftpb.ConfChange{
  110. Type: raftpb.ConfChangeRemoveNode,
  111. NodeID: id,
  112. }
  113. e := raftpb.Entry{
  114. Type: raftpb.EntryConfChange,
  115. Data: pbutil.MustMarshal(cc),
  116. Term: term,
  117. Index: next,
  118. }
  119. ents = append(ents, e)
  120. next++
  121. }
  122. if !found {
  123. m := Member{
  124. ID: types.ID(self),
  125. RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
  126. }
  127. ctx, err := json.Marshal(m)
  128. if err != nil {
  129. log.Panicf("marshal member should never fail: %v", err)
  130. }
  131. cc := &raftpb.ConfChange{
  132. Type: raftpb.ConfChangeAddNode,
  133. NodeID: self,
  134. Context: ctx,
  135. }
  136. e := raftpb.Entry{
  137. Type: raftpb.EntryConfChange,
  138. Data: pbutil.MustMarshal(cc),
  139. Term: term,
  140. Index: next,
  141. }
  142. ents = append(ents, e)
  143. }
  144. return ents
  145. }