node.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
package raft

import "fmt"
// Interface is the minimal message-driven surface declared by this package:
// a value that can be stepped with a Message and asked for a slice of Messages.
// *Node in this file provides both methods.
type Interface interface {
	// Step feeds the message m to the implementation.
	Step(m Message)
	// Msgs returns a slice of messages held by the implementation.
	Msgs() []Message
}
// tick is a count of logical clock ticks. Node uses it both for its
// configured timeouts and for its elapsed-tick counter.
type tick int
// Node pairs a state machine with tick-based timing: it tracks how many
// ticks have elapsed since the last reset and compares that count against
// the election or heartbeat timeout (see Tick).
type Node struct {
	// election timeout and heartbeat timeout in tick
	election  tick
	heartbeat tick

	// elapsed ticks after the last reset
	elapsed tick

	// sm is the underlying state machine that Step, Msgs and Next delegate to.
	sm *stateMachine
}
  15. func New(addr int, peer []int, heartbeat, election tick) *Node {
  16. if election < heartbeat*3 {
  17. panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
  18. }
  19. n := &Node{
  20. sm: newStateMachine(addr, peer),
  21. heartbeat: heartbeat,
  22. election: election,
  23. }
  24. return n
  25. }
  26. // Propose asynchronously proposes data be applied to the underlying state machine.
  27. func (n *Node) Propose(data []byte) {
  28. m := Message{Type: msgProp, Entries: []Entry{{Data: data}}}
  29. n.Step(m)
  30. }
  31. func (n *Node) Msgs() []Message {
  32. return n.sm.Msgs()
  33. }
  34. func (n *Node) Step(m Message) {
  35. l := len(n.sm.msgs)
  36. n.sm.Step(m)
  37. for _, m := range n.sm.msgs[l:] {
  38. // reset elapsed in two cases:
  39. // msgAppResp -> heard from the leader of the same term
  40. // msgVoteResp with grant -> heard from the candidate the node voted for
  41. switch m.Type {
  42. case msgAppResp:
  43. n.elapsed = 0
  44. case msgVoteResp:
  45. if m.Index >= 0 {
  46. n.elapsed = 0
  47. }
  48. }
  49. }
  50. }
  51. // Next advances the commit index and returns any new
  52. // commitable entries.
  53. func (n *Node) Next() []Entry {
  54. return n.sm.nextEnts()
  55. }
  56. // Tick triggers the node to do a tick.
  57. // If the current elapsed is greater or equal than the timeout,
  58. // node will send corresponding message to the statemachine.
  59. func (n *Node) Tick() {
  60. timeout, msgType := n.election, msgHup
  61. if n.sm.state == stateLeader {
  62. timeout, msgType = n.heartbeat, msgBeat
  63. }
  64. if n.elapsed >= timeout {
  65. n.Step(Message{Type: msgType})
  66. n.elapsed = 0
  67. } else {
  68. n.elapsed++
  69. }
  70. }