node.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package raft
  2. import (
  3. "encoding/json"
  4. golog "log"
  5. "sync/atomic"
  6. )
  7. type Interface interface {
  8. Step(m Message) bool
  9. Msgs() []Message
  10. }
  11. type tick int64
  12. type Config struct {
  13. NodeId int64
  14. Addr string
  15. Context []byte
  16. }
  17. type Node struct {
  18. sm *stateMachine
  19. elapsed tick
  20. election tick
  21. heartbeat tick
  22. // TODO: it needs garbage collection later
  23. rmNodes map[int64]struct{}
  24. removed bool
  25. }
  26. func New(id int64, heartbeat, election tick) *Node {
  27. if election < heartbeat*3 {
  28. panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
  29. }
  30. n := &Node{
  31. heartbeat: heartbeat,
  32. election: election,
  33. sm: newStateMachine(id, []int64{id}),
  34. rmNodes: make(map[int64]struct{}),
  35. }
  36. return n
  37. }
  38. func (n *Node) Id() int64 {
  39. return atomic.LoadInt64(&n.sm.id)
  40. }
  41. func (n *Node) Index() int64 { return n.sm.index.Get() }
  42. func (n *Node) Term() int64 { return n.sm.term.Get() }
  43. func (n *Node) Applied() int64 { return n.sm.log.applied }
  44. func (n *Node) HasLeader() bool { return n.Leader() != none }
  45. func (n *Node) IsLeader() bool { return n.Leader() == n.Id() }
  46. func (n *Node) Leader() int64 { return n.sm.lead.Get() }
  47. func (n *Node) IsRemoved() bool { return n.removed }
  48. // Propose asynchronously proposes data be applied to the underlying state machine.
  49. func (n *Node) Propose(data []byte) { n.propose(Normal, data) }
  50. func (n *Node) propose(t int64, data []byte) {
  51. n.Step(Message{Type: msgProp, Entries: []Entry{{Type: t, Data: data}}})
  52. }
  53. func (n *Node) Campaign() { n.Step(Message{Type: msgHup}) }
  54. func (n *Node) Add(id int64, addr string, context []byte) {
  55. n.updateConf(AddNode, &Config{NodeId: id, Addr: addr, Context: context})
  56. }
  57. func (n *Node) Remove(id int64) { n.updateConf(RemoveNode, &Config{NodeId: id}) }
  58. func (n *Node) Msgs() []Message { return n.sm.Msgs() }
  59. func (n *Node) Step(m Message) bool {
  60. if m.Type == msgDenied {
  61. n.removed = true
  62. return false
  63. }
  64. if m.Term != 0 {
  65. if _, ok := n.rmNodes[m.From]; ok {
  66. n.sm.send(Message{To: m.From, Type: msgDenied})
  67. return true
  68. }
  69. }
  70. l := len(n.sm.msgs)
  71. if !n.sm.Step(m) {
  72. return false
  73. }
  74. for _, m := range n.sm.msgs[l:] {
  75. switch m.Type {
  76. case msgAppResp:
  77. // We just heard from the leader of the same term.
  78. n.elapsed = 0
  79. case msgVoteResp:
  80. // We just heard from the candidate the node voted for.
  81. if m.Index >= 0 {
  82. n.elapsed = 0
  83. }
  84. }
  85. }
  86. return true
  87. }
  88. // Next returns all the appliable entries
  89. func (n *Node) Next() []Entry {
  90. ents := n.sm.nextEnts()
  91. for i := range ents {
  92. switch ents[i].Type {
  93. case Normal:
  94. case AddNode:
  95. c := new(Config)
  96. if err := json.Unmarshal(ents[i].Data, c); err != nil {
  97. golog.Println(err)
  98. continue
  99. }
  100. n.sm.addNode(c.NodeId)
  101. delete(n.rmNodes, c.NodeId)
  102. case RemoveNode:
  103. c := new(Config)
  104. if err := json.Unmarshal(ents[i].Data, c); err != nil {
  105. golog.Println(err)
  106. continue
  107. }
  108. n.sm.removeNode(c.NodeId)
  109. n.rmNodes[c.NodeId] = struct{}{}
  110. if c.NodeId == n.sm.id {
  111. n.removed = true
  112. }
  113. default:
  114. panic("unexpected entry type")
  115. }
  116. }
  117. return ents
  118. }
  119. // Tick triggers the node to do a tick.
  120. // If the current elapsed is greater or equal than the timeout,
  121. // node will send corresponding message to the statemachine.
  122. func (n *Node) Tick() {
  123. if !n.sm.promotable() {
  124. return
  125. }
  126. timeout, msgType := n.election, msgHup
  127. if n.sm.state == stateLeader {
  128. timeout, msgType = n.heartbeat, msgBeat
  129. }
  130. if n.elapsed >= timeout {
  131. n.Step(Message{Type: msgType})
  132. n.elapsed = 0
  133. } else {
  134. n.elapsed++
  135. }
  136. }
  137. func (n *Node) updateConf(t int64, c *Config) {
  138. data, err := json.Marshal(c)
  139. if err != nil {
  140. panic(err)
  141. }
  142. n.propose(t, data)
  143. }