node.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
package raft

import "fmt"
  2. type Interface interface {
  3. Step(m Message)
  4. }
  5. type tick int
  6. type Node struct {
  7. // election timeout and heartbeat timeout in tick
  8. election tick
  9. heartbeat tick
  10. // elapsed ticks after the last reset
  11. elapsed tick
  12. sm *stateMachine
  13. next Interface
  14. }
  15. func New(k, addr int, heartbeat, election tick, next Interface) *Node {
  16. if election < heartbeat*3 {
  17. panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
  18. }
  19. n := &Node{
  20. sm: newStateMachine(k, addr),
  21. next: next,
  22. heartbeat: heartbeat,
  23. election: election,
  24. }
  25. return n
  26. }
  27. // Propose asynchronously proposes data be applied to the underlying state machine.
  28. func (n *Node) Propose(data []byte) {
  29. m := Message{Type: msgProp, Data: data}
  30. n.Step(m)
  31. }
  32. func (n *Node) Step(m Message) {
  33. n.sm.Step(m)
  34. ms := n.sm.Msgs()
  35. for _, m := range ms {
  36. // reset elapsed in two cases:
  37. // msgAppResp -> heard from the leader of the same term
  38. // msgVoteResp with grant -> heard from the candidate the node voted for
  39. switch m.Type {
  40. case msgAppResp:
  41. n.elapsed = 0
  42. case msgVoteResp:
  43. if m.Index >= 0 {
  44. n.elapsed = 0
  45. }
  46. }
  47. n.next.Step(m)
  48. }
  49. }
  50. // Next advances the commit index and returns any new
  51. // commitable entries.
  52. func (n *Node) Next() []Entry {
  53. return n.sm.nextEnts()
  54. }
  55. // Tick triggers the node to do a tick.
  56. // If the current elapsed is greater or equal than the timeout,
  57. // node will send corresponding message to the statemachine.
  58. func (n *Node) Tick() {
  59. timeout, msgType := n.election, msgHup
  60. if n.sm.state == stateLeader {
  61. timeout, msgType = n.heartbeat, msgBeat
  62. }
  63. if n.elapsed >= timeout {
  64. n.Step(Message{Type: msgType})
  65. n.elapsed = 0
  66. } else {
  67. n.elapsed++
  68. }
  69. }