node.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package raft
  2. import (
  3. "errors"
  4. "log"
  5. pb "github.com/coreos/etcd/raft/raftpb"
  6. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  7. )
var (
	// emptyState is the zero-valued HardState. IsEmptyHardState compares
	// against it to decide whether a HardState carries an update.
	emptyState = pb.HardState{}
	// ErrStopped is returned by Step (and therefore by Campaign, Propose and
	// Configure) when the node's done channel has been closed by Stop.
	ErrStopped = errors.New("raft: stopped")
)
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
	// Lead is the leader id this node currently reports; the run loop logs
	// a message whenever it changes.
	Lead int64
	// RaftState is the node's current StateType.
	// NOTE(review): StateType is declared outside this file; presumably
	// follower/candidate/leader — confirm against its definition.
	RaftState StateType
}
  18. func (a *SoftState) equal(b *SoftState) bool {
  19. return a.Lead == b.Lead && a.RaftState == b.RaftState
  20. }
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
//
// Values are built by newReady inside the node's run loop and delivered on
// the channel returned by Node.Ready, but only when containsUpdates is true.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	// It is left at its zero value (Index == 0) when the snapshot index has
	// not changed since the previous Ready.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	Messages []pb.Message
}
  46. func isHardStateEqual(a, b pb.HardState) bool {
  47. return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
  48. }
  49. func IsEmptyHardState(st pb.HardState) bool {
  50. return isHardStateEqual(st, emptyState)
  51. }
  52. func IsEmptySnap(sp pb.Snapshot) bool {
  53. return sp.Index == 0
  54. }
  55. func (rd Ready) containsUpdates() bool {
  56. return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || !IsEmptySnap(rd.Snapshot) ||
  57. len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
  58. }
// Node is the interface an application uses to drive a raft node: it feeds
// in ticks, proposals and peer messages, and consumes Ready values.
type Node interface {
	// Tick increments the internal logical clock for the Node by a single tick. Election
	// timeouts and heartbeat timeouts are in units of ticks.
	Tick()
	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error
	// Propose proposes that data be appended to the log.
	Propose(ctx context.Context, data []byte) error
	// Configure proposes a config change. At most one config can be in the process of going through consensus.
	// The application needs to call AddNode/RemoveNode when applying an EntryConfig type entry.
	Configure(ctx context.Context, conf pb.Config) error
	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	Step(ctx context.Context, msg pb.Message) error
	// Ready returns a channel that returns the current point-in-time state.
	Ready() <-chan Ready
	// Stop performs any necessary termination of the Node.
	Stop()
	// Compact hands d to the raft instance's compact routine.
	// NOTE(review): d is presumably the application's snapshot data — confirm
	// against raft.compact.
	Compact(d []byte)
	// AddNode adds a node with the given id to the peer list.
	// TODO: reject existing node
	AddNode(id int64)
	// RemoveNode removes a node with the given id from the peer list.
	// TODO: reject nonexistent node
	RemoveNode(id int64)
}
  85. // StartNode returns a new Node given a unique raft id, a list of raft peers, and
  86. // the election and heartbeat timeouts in units of ticks.
  87. func StartNode(id int64, peers []int64, election, heartbeat int) Node {
  88. n := newNode()
  89. r := newRaft(id, peers, election, heartbeat)
  90. go n.run(r)
  91. return &n
  92. }
  93. // RestartNode is identical to StartNode but takes an initial State and a slice
  94. // of entries. Generally this is used when restarting from a stable storage
  95. // log.
  96. func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
  97. n := newNode()
  98. r := newRaft(id, peers, election, heartbeat)
  99. if snapshot != nil {
  100. r.restore(*snapshot)
  101. }
  102. r.loadState(st)
  103. r.loadEnts(ents)
  104. go n.run(r)
  105. return &n
  106. }
// Kinds of membership change carried by a conf value on node.confc.
const (
	// confAdd makes the run loop call raft.addNode.
	confAdd = iota
	// confRemove makes the run loop call raft.removeNode.
	confRemove
)
// conf is a membership-change request sent from AddNode/RemoveNode to the
// run loop over node.confc.
type conf struct {
	// typ is confAdd or confRemove; any other value makes the run loop panic.
	typ int
	// id is the node id to add or remove.
	id int64
}
// node is the canonical implementation of the Node interface.
// All channels are unbuffered (see newNode); the run goroutine is the only
// party that touches the underlying raft instance.
type node struct {
	// propc carries msgProp messages from Step. The run loop only receives
	// from it while the raft instance reported a leader at the last leader
	// change.
	propc chan pb.Message
	// recvc carries every non-proposal message from Step.
	recvc chan pb.Message
	// compactc carries the argument of Compact to raft.compact.
	compactc chan []byte
	// confc carries AddNode/RemoveNode requests.
	confc chan conf
	// readyc is where the run loop publishes Ready values; it is exposed
	// receive-only through Ready().
	readyc chan Ready
	// tickc carries one signal per Tick call.
	tickc chan struct{}
	// done is closed by Stop to terminate the run loop and unblock callers.
	done chan struct{}
}
  125. func newNode() node {
  126. return node{
  127. propc: make(chan pb.Message),
  128. recvc: make(chan pb.Message),
  129. compactc: make(chan []byte),
  130. confc: make(chan conf),
  131. readyc: make(chan Ready),
  132. tickc: make(chan struct{}),
  133. done: make(chan struct{}),
  134. }
  135. }
  136. func (n *node) Stop() {
  137. close(n.done)
  138. }
  139. func (n *node) run(r *raft) {
  140. var propc chan pb.Message
  141. var readyc chan Ready
  142. lead := None
  143. prevSoftSt := r.softState()
  144. prevHardSt := r.HardState
  145. prevSnapi := r.raftLog.snapshot.Index
  146. for {
  147. rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi)
  148. if rd.containsUpdates() {
  149. readyc = n.readyc
  150. } else {
  151. readyc = nil
  152. }
  153. if rd.SoftState != nil && lead != rd.SoftState.Lead {
  154. log.Printf("raft: leader changed from %#x to %#x", lead, rd.SoftState.Lead)
  155. lead = rd.SoftState.Lead
  156. if r.hasLeader() {
  157. propc = n.propc
  158. } else {
  159. propc = nil
  160. }
  161. }
  162. select {
  163. // TODO: buffer the config propose if there exists one
  164. case m := <-propc:
  165. m.From = r.id
  166. r.Step(m)
  167. case m := <-n.recvc:
  168. r.Step(m) // raft never returns an error
  169. case d := <-n.compactc:
  170. r.compact(d)
  171. case c := <-n.confc:
  172. switch c.typ {
  173. case confAdd:
  174. r.addNode(c.id)
  175. case confRemove:
  176. r.removeNode(c.id)
  177. default:
  178. panic("unexpected conf type")
  179. }
  180. case <-n.tickc:
  181. r.tick()
  182. case readyc <- rd:
  183. if rd.SoftState != nil {
  184. prevSoftSt = rd.SoftState
  185. }
  186. if !IsEmptyHardState(rd.HardState) {
  187. prevHardSt = rd.HardState
  188. }
  189. if !IsEmptySnap(rd.Snapshot) {
  190. prevSnapi = rd.Snapshot.Index
  191. }
  192. // TODO(yichengq): we assume that all committed config
  193. // entries will be applied to make things easy for now.
  194. // TODO(yichengq): it may have race because applied is set
  195. // before entries are applied.
  196. r.raftLog.resetNextEnts()
  197. r.raftLog.resetUnstable()
  198. r.msgs = nil
  199. case <-n.done:
  200. return
  201. }
  202. }
  203. }
  204. // Tick increments the internal logical clock for this Node. Election timeouts
  205. // and heartbeat timeouts are in units of ticks.
  206. func (n *node) Tick() {
  207. select {
  208. case n.tickc <- struct{}{}:
  209. case <-n.done:
  210. }
  211. }
  212. func (n *node) Campaign(ctx context.Context) error {
  213. return n.Step(ctx, pb.Message{Type: msgHup})
  214. }
  215. func (n *node) Propose(ctx context.Context, data []byte) error {
  216. return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
  217. }
  218. func (n *node) Configure(ctx context.Context, conf pb.Config) error {
  219. data, err := conf.Marshal()
  220. if err != nil {
  221. return err
  222. }
  223. return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}})
  224. }
  225. // Step advances the state machine using msgs. The ctx.Err() will be returned,
  226. // if any.
  227. func (n *node) Step(ctx context.Context, m pb.Message) error {
  228. ch := n.recvc
  229. if m.Type == msgProp {
  230. ch = n.propc
  231. }
  232. select {
  233. case ch <- m:
  234. return nil
  235. case <-ctx.Done():
  236. return ctx.Err()
  237. case <-n.done:
  238. return ErrStopped
  239. }
  240. }
  241. func (n *node) Ready() <-chan Ready {
  242. return n.readyc
  243. }
  244. func (n *node) Compact(d []byte) {
  245. select {
  246. case n.compactc <- d:
  247. case <-n.done:
  248. }
  249. }
  250. func (n *node) AddNode(id int64) {
  251. select {
  252. case n.confc <- conf{typ: confAdd, id: id}:
  253. case <-n.done:
  254. }
  255. }
  256. func (n *node) RemoveNode(id int64) {
  257. select {
  258. case n.confc <- conf{typ: confRemove, id: id}:
  259. case <-n.done:
  260. }
  261. }
  262. func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready {
  263. rd := Ready{
  264. Entries: r.raftLog.unstableEnts(),
  265. CommittedEntries: r.raftLog.nextEnts(),
  266. Messages: r.msgs,
  267. }
  268. if softSt := r.softState(); !softSt.equal(prevSoftSt) {
  269. rd.SoftState = softSt
  270. }
  271. if !isHardStateEqual(r.HardState, prevHardSt) {
  272. rd.HardState = r.HardState
  273. }
  274. if prevSnapi != r.raftLog.snapshot.Index {
  275. rd.Snapshot = r.raftLog.snapshot
  276. }
  277. return rd
  278. }