node.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package raft
  2. import (
  3. "errors"
  4. "log"
  5. "reflect"
  6. pb "github.com/coreos/etcd/raft/raftpb"
  7. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  8. )
var (
	// emptyState is the zero-valued HardState. IsEmptyHardState compares
	// against it to decide whether a HardState carries an update.
	emptyState = pb.HardState{}

	// ErrStopped is returned by methods on Nodes that have been stopped.
	ErrStopped = errors.New("raft: stopped")
)
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
	// Lead is the leader ID reported in this state; the node's run loop
	// logs a message whenever it differs from the previously seen value.
	Lead uint64
	// RaftState is presumably the node's current role (follower,
	// candidate, leader) — TODO confirm against the StateType definition.
	RaftState StateType
	// Nodes is presumably the list of member IDs known to this node —
	// TODO confirm against raft.softState.
	Nodes []uint64
	// ShouldStop: NOTE(review) meaning is not visible in this file; it looks
	// like a hint that the application should stop this node — confirm.
	ShouldStop bool
}
  22. func (a *SoftState) equal(b *SoftState) bool {
  23. return reflect.DeepEqual(a, b)
  24. }
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update
	// (see IsEmptyHardState).
	pb.HardState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	// It is empty (see IsEmptySnap) when there is no new snapshot.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	Messages []pb.Message
}
// compact is the request that Node.Compact hands to the run loop over
// compactc; the run loop forwards its three fields to raft.compact.
// Compact builds this struct positionally, so the field order matters.
type compact struct {
	index uint64   // log index passed to Compact
	nodes []uint64 // node configuration passed to Compact
	data  []byte   // snapshot data passed to Compact
}
  55. func isHardStateEqual(a, b pb.HardState) bool {
  56. return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
  57. }
  58. // IsEmptyHardState returns true if the given HardState is empty.
  59. func IsEmptyHardState(st pb.HardState) bool {
  60. return isHardStateEqual(st, emptyState)
  61. }
  62. // IsEmptySnap returns true if the given Snapshot is empty.
  63. func IsEmptySnap(sp pb.Snapshot) bool {
  64. return sp.Index == 0
  65. }
  66. func (rd Ready) containsUpdates() bool {
  67. return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || !IsEmptySnap(rd.Snapshot) ||
  68. len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
  69. }
// Node represents a node in a raft cluster.
type Node interface {
	// Tick increments the internal logical clock for the Node by a single tick. Election
	// timeouts and heartbeat timeouts are in units of ticks.
	Tick()

	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error

	// Propose proposes that data be appended to the log.
	Propose(ctx context.Context, data []byte) error

	// ProposeConfChange proposes a config change.
	// At most one ConfChange can be in the process of going through consensus.
	// The application needs to call ApplyConfChange when applying an
	// EntryConfChange type entry.
	ProposeConfChange(ctx context.Context, cc pb.ConfChange) error

	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	Step(ctx context.Context, msg pb.Message) error

	// Ready returns a channel that returns the current point-in-time state.
	Ready() <-chan Ready

	// ApplyConfChange applies a config change to the local node.
	// TODO: reject existing node when add node
	// TODO: reject non-existent node when remove node
	ApplyConfChange(cc pb.ConfChange)

	// Stop performs any necessary termination of the Node.
	Stop()

	// Compact discards the entire log up to the given index. It also
	// generates a raft snapshot containing the given nodes configuration
	// and the given snapshot data.
	// It is the caller's responsibility to ensure the given configuration
	// and snapshot data match the actual point-in-time configuration and snapshot
	// at the given index.
	Compact(index uint64, nodes []uint64, d []byte)
}
// Peer describes one initial cluster member handed to StartNode, which
// turns each Peer into a ConfChangeAddNode entry at the head of the log.
type Peer struct {
	// ID becomes the NodeID of the generated ConfChange.
	ID uint64
	// Context is copied verbatim into the Context of the generated ConfChange.
	Context []byte
}
  105. // StartNode returns a new Node given a unique raft id, a list of raft peers, and
  106. // the election and heartbeat timeouts in units of ticks.
  107. // It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
  108. func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
  109. n := newNode()
  110. r := newRaft(id, nil, election, heartbeat)
  111. ents := make([]pb.Entry, len(peers))
  112. for i, peer := range peers {
  113. cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
  114. data, err := cc.Marshal()
  115. if err != nil {
  116. panic("unexpected marshal error")
  117. }
  118. ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
  119. }
  120. r.raftLog.append(0, ents...)
  121. r.raftLog.committed = uint64(len(ents))
  122. go n.run(r)
  123. return &n
  124. }
  125. // RestartNode is identical to StartNode but takes an initial State and a slice
  126. // of entries. Generally this is used when restarting from a stable storage
  127. // log.
  128. func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
  129. n := newNode()
  130. r := newRaft(id, nil, election, heartbeat)
  131. if snapshot != nil {
  132. r.restore(*snapshot)
  133. }
  134. r.loadState(st)
  135. r.loadEnts(ents)
  136. go n.run(r)
  137. return &n
  138. }
// node is the canonical implementation of the Node interface.
// All of its channels are created unbuffered by newNode and are serviced by
// the run loop.
type node struct {
	propc    chan pb.Message    // msgProp messages from step; read by run only while a leader is known
	recvc    chan pb.Message    // every other message from step
	compactc chan compact       // requests from Compact
	confc    chan pb.ConfChange // config changes from ApplyConfChange
	readyc   chan Ready         // Ready values sent by run; exposed via Ready()
	tickc    chan struct{}      // one value per Tick call
	done     chan struct{}      // closed by Stop to end run and unblock callers
}
  149. func newNode() node {
  150. return node{
  151. propc: make(chan pb.Message),
  152. recvc: make(chan pb.Message),
  153. compactc: make(chan compact),
  154. confc: make(chan pb.ConfChange),
  155. readyc: make(chan Ready),
  156. tickc: make(chan struct{}),
  157. done: make(chan struct{}),
  158. }
  159. }
  160. func (n *node) Stop() {
  161. close(n.done)
  162. }
// run is the node's event loop. It owns r: every access to the raft state
// machine in this file happens here, driven by the node's channels. Each
// iteration builds a Ready from what changed since the last delivered one,
// then waits for exactly one event: an incoming proposal or message, a
// compaction or config-change request, a tick, delivery of the Ready, or
// Stop. It returns once done is closed.
func (n *node) run(r *raft) {
	// propc and readyc start out nil. A nil channel never becomes ready in
	// a select, so the matching case is disabled until the local variable
	// is pointed at the real channel.
	var propc chan pb.Message
	var readyc chan Ready

	lead := None
	// State as of the last Ready handed to the application; newReady diffs
	// against these to report only changes.
	prevSoftSt := r.softState()
	prevHardSt := r.HardState
	prevSnapi := r.raftLog.snapshot.Index

	for {
		rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi)
		// Only offer rd on readyc when it actually contains something.
		if rd.containsUpdates() {
			readyc = n.readyc
		} else {
			readyc = nil
		}

		if rd.SoftState != nil && lead != rd.SoftState.Lead {
			log.Printf("raft: leader changed from %#x to %#x", lead, rd.SoftState.Lead)
			lead = rd.SoftState.Lead
			// Accept proposals only while a leader is known; otherwise
			// callers of Propose block until their ctx ends or the node stops.
			if r.hasLeader() {
				propc = n.propc
			} else {
				propc = nil
			}
		}

		select {
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case m := <-propc:
			// Proposals originate locally, so stamp them with our own id.
			m.From = r.id
			r.Step(m)
		case m := <-n.recvc:
			r.Step(m) // raft never returns an error
		case c := <-n.compactc:
			r.compact(c.index, c.nodes, c.data)
		case cc := <-n.confc:
			switch cc.Type {
			case pb.ConfChangeAddNode:
				r.addNode(cc.NodeID)
			case pb.ConfChangeRemoveNode:
				r.removeNode(cc.NodeID)
			default:
				panic("unexpected conf type")
			}
		case <-n.tickc:
			r.tick()
		case readyc <- rd:
			// rd was taken by the application: remember what it has seen so
			// the next Ready only reports newer changes.
			if rd.SoftState != nil {
				prevSoftSt = rd.SoftState
			}
			if !IsEmptyHardState(rd.HardState) {
				prevHardSt = rd.HardState
			}
			if !IsEmptySnap(rd.Snapshot) {
				prevSnapi = rd.Snapshot.Index
			}
			// TODO(yichengq): we assume that all committed config
			// entries will be applied to make things easy for now.
			// TODO(yichengq): it may have race because applied is set
			// before entries are applied.
			r.raftLog.resetNextEnts()
			r.raftLog.resetUnstable()
			// The pending messages travelled with rd; drop them here.
			r.msgs = nil
		case <-n.done:
			return
		}
	}
}
  230. // Tick increments the internal logical clock for this Node. Election timeouts
  231. // and heartbeat timeouts are in units of ticks.
  232. func (n *node) Tick() {
  233. select {
  234. case n.tickc <- struct{}{}:
  235. case <-n.done:
  236. }
  237. }
  238. func (n *node) Campaign(ctx context.Context) error {
  239. return n.step(ctx, pb.Message{Type: msgHup})
  240. }
  241. func (n *node) Propose(ctx context.Context, data []byte) error {
  242. return n.step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
  243. }
  244. func (n *node) Step(ctx context.Context, m pb.Message) error {
  245. // ignore unexpected local messages receiving over network
  246. if m.Type == msgHup || m.Type == msgBeat {
  247. // TODO: return an error?
  248. return nil
  249. }
  250. return n.step(ctx, m)
  251. }
  252. func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
  253. data, err := cc.Marshal()
  254. if err != nil {
  255. return err
  256. }
  257. return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
  258. }
  259. // Step advances the state machine using msgs. The ctx.Err() will be returned,
  260. // if any.
  261. func (n *node) step(ctx context.Context, m pb.Message) error {
  262. ch := n.recvc
  263. if m.Type == msgProp {
  264. ch = n.propc
  265. }
  266. select {
  267. case ch <- m:
  268. return nil
  269. case <-ctx.Done():
  270. return ctx.Err()
  271. case <-n.done:
  272. return ErrStopped
  273. }
  274. }
  275. func (n *node) Ready() <-chan Ready {
  276. return n.readyc
  277. }
  278. func (n *node) ApplyConfChange(cc pb.ConfChange) {
  279. select {
  280. case n.confc <- cc:
  281. case <-n.done:
  282. }
  283. }
  284. func (n *node) Compact(index uint64, nodes []uint64, d []byte) {
  285. select {
  286. case n.compactc <- compact{index, nodes, d}:
  287. case <-n.done:
  288. }
  289. }
  290. func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi uint64) Ready {
  291. rd := Ready{
  292. Entries: r.raftLog.unstableEnts(),
  293. CommittedEntries: r.raftLog.nextEnts(),
  294. Messages: r.msgs,
  295. }
  296. if softSt := r.softState(); !softSt.equal(prevSoftSt) {
  297. rd.SoftState = softSt
  298. }
  299. if !isHardStateEqual(r.HardState, prevHardSt) {
  300. rd.HardState = r.HardState
  301. }
  302. if prevSnapi != r.raftLog.snapshot.Index {
  303. rd.Snapshot = r.raftLog.snapshot
  304. }
  305. return rd
  306. }