raft.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /*
  2. Copyright 2015 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "encoding/json"
  16. "log"
  17. "os"
  18. "sort"
  19. "time"
  20. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  21. "github.com/coreos/etcd/pkg/pbutil"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft"
  24. "github.com/coreos/etcd/raft/raftpb"
  25. "github.com/coreos/etcd/rafthttp"
  26. "github.com/coreos/etcd/wal"
  27. "github.com/coreos/etcd/wal/walpb"
  28. )
  29. type RaftTimer interface {
  30. Index() uint64
  31. Term() uint64
  32. }
  33. type raftNode struct {
  34. raft.Node
  35. // config
  36. snapCount uint64 // number of entries to trigger a snapshot
  37. // utility
  38. ticker <-chan time.Time
  39. raftStorage *raft.MemoryStorage
  40. storage Storage
  41. // transport specifies the transport to send and receive msgs to members.
  42. // Sending messages MUST NOT block. It is okay to drop messages, since
  43. // clients should timeout and reissue their messages.
  44. // If transport is nil, server will panic.
  45. transport rafthttp.Transporter
  46. // Cache of the latest raft index and raft term the server has seen
  47. index uint64
  48. term uint64
  49. lead uint64
  50. }
  51. // for testing
  52. func (r *raftNode) pauseSending() {
  53. p := r.transport.(rafthttp.Pausable)
  54. p.Pause()
  55. }
  56. func (r *raftNode) resumeSending() {
  57. p := r.transport.(rafthttp.Pausable)
  58. p.Resume()
  59. }
  60. func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
  61. var err error
  62. member := cfg.Cluster.MemberByName(cfg.Name)
  63. metadata := pbutil.MustMarshal(
  64. &pb.Metadata{
  65. NodeID: uint64(member.ID),
  66. ClusterID: uint64(cfg.Cluster.ID()),
  67. },
  68. )
  69. if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
  70. log.Fatalf("etcdserver create snapshot directory error: %v", err)
  71. }
  72. if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
  73. log.Fatalf("etcdserver: create wal error: %v", err)
  74. }
  75. peers := make([]raft.Peer, len(ids))
  76. for i, id := range ids {
  77. ctx, err := json.Marshal((*cfg.Cluster).Member(id))
  78. if err != nil {
  79. log.Panicf("marshal member should never fail: %v", err)
  80. }
  81. peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
  82. }
  83. id = member.ID
  84. log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
  85. s = raft.NewMemoryStorage()
  86. n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s)
  87. return
  88. }
  89. func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
  90. var walsnap walpb.Snapshot
  91. if snapshot != nil {
  92. walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
  93. }
  94. w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
  95. cfg.Cluster.SetID(cid)
  96. log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
  97. s := raft.NewMemoryStorage()
  98. if snapshot != nil {
  99. s.ApplySnapshot(*snapshot)
  100. }
  101. s.SetHardState(st)
  102. s.Append(ents)
  103. n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
  104. return id, n, s, w
  105. }
  106. func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
  107. var walsnap walpb.Snapshot
  108. if snapshot != nil {
  109. walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
  110. }
  111. w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
  112. cfg.Cluster.SetID(cid)
  113. // discard the previously uncommitted entries
  114. for i, ent := range ents {
  115. if ent.Index > st.Commit {
  116. log.Printf("etcdserver: discarding %d uncommitted WAL entries ", len(ents)-i)
  117. ents = ents[:i]
  118. break
  119. }
  120. }
  121. // force append the configuration change entries
  122. toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
  123. ents = append(ents, toAppEnts...)
  124. // force commit newly appended entries
  125. err := w.Save(raftpb.HardState{}, toAppEnts)
  126. if err != nil {
  127. log.Fatalf("etcdserver: %v", err)
  128. }
  129. if len(ents) != 0 {
  130. st.Commit = ents[len(ents)-1].Index
  131. }
  132. log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
  133. s := raft.NewMemoryStorage()
  134. if snapshot != nil {
  135. s.ApplySnapshot(*snapshot)
  136. }
  137. s.SetHardState(st)
  138. s.Append(ents)
  139. n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
  140. return id, n, s, w
  141. }
  142. // getIDs returns an ordered set of IDs included in the given snapshot and
  143. // the entries. The given snapshot/entries can contain two kinds of
  144. // ID-related entry:
  145. // - ConfChangeAddNode, in which case the contained ID will be added into the set.
  146. // - ConfChangeAddRemove, in which case the contained ID will be removed from the set.
  147. func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
  148. ids := make(map[uint64]bool)
  149. if snap != nil {
  150. for _, id := range snap.Metadata.ConfState.Nodes {
  151. ids[id] = true
  152. }
  153. }
  154. for _, e := range ents {
  155. if e.Type != raftpb.EntryConfChange {
  156. continue
  157. }
  158. var cc raftpb.ConfChange
  159. pbutil.MustUnmarshal(&cc, e.Data)
  160. switch cc.Type {
  161. case raftpb.ConfChangeAddNode:
  162. ids[cc.NodeID] = true
  163. case raftpb.ConfChangeRemoveNode:
  164. delete(ids, cc.NodeID)
  165. default:
  166. log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
  167. }
  168. }
  169. sids := make(types.Uint64Slice, 0)
  170. for id := range ids {
  171. sids = append(sids, id)
  172. }
  173. sort.Sort(sids)
  174. return []uint64(sids)
  175. }
  176. // createConfigChangeEnts creates a series of Raft entries (i.e.
  177. // EntryConfChange) to remove the set of given IDs from the cluster. The ID
  178. // `self` is _not_ removed, even if present in the set.
  179. // If `self` is not inside the given ids, it creates a Raft entry to add a
  180. // default member with the given `self`.
  181. func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
  182. ents := make([]raftpb.Entry, 0)
  183. next := index + 1
  184. found := false
  185. for _, id := range ids {
  186. if id == self {
  187. found = true
  188. continue
  189. }
  190. cc := &raftpb.ConfChange{
  191. Type: raftpb.ConfChangeRemoveNode,
  192. NodeID: id,
  193. }
  194. e := raftpb.Entry{
  195. Type: raftpb.EntryConfChange,
  196. Data: pbutil.MustMarshal(cc),
  197. Term: term,
  198. Index: next,
  199. }
  200. ents = append(ents, e)
  201. next++
  202. }
  203. if !found {
  204. m := Member{
  205. ID: types.ID(self),
  206. RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
  207. }
  208. ctx, err := json.Marshal(m)
  209. if err != nil {
  210. log.Panicf("marshal member should never fail: %v", err)
  211. }
  212. cc := &raftpb.ConfChange{
  213. Type: raftpb.ConfChangeAddNode,
  214. NodeID: self,
  215. Context: ctx,
  216. }
  217. e := raftpb.Entry{
  218. Type: raftpb.EntryConfChange,
  219. Data: pbutil.MustMarshal(cc),
  220. Term: term,
  221. Index: next,
  222. }
  223. ents = append(ents, e)
  224. }
  225. return ents
  226. }