node.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // Package raft implements raft.
  2. package raft
  3. import "code.google.com/p/go.net/context"
  4. type stateResp struct {
  5. state State
  6. ents, cents []Entry
  7. msgs []Message
  8. }
  9. func (sr stateResp) containsUpdates(prev stateResp) bool {
  10. return !prev.state.Equal(sr.state) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
  11. }
  12. type Node struct {
  13. ctx context.Context
  14. propc chan []byte
  15. recvc chan Message
  16. statec chan stateResp
  17. tickc chan struct{}
  18. }
  19. func Start(ctx context.Context, name string, election, heartbeat int) *Node {
  20. n := &Node{
  21. ctx: ctx,
  22. propc: make(chan []byte),
  23. recvc: make(chan Message),
  24. statec: make(chan stateResp),
  25. tickc: make(chan struct{}),
  26. }
  27. r := &raft{
  28. name: name,
  29. election: election,
  30. heartbeat: heartbeat,
  31. }
  32. go n.run(r)
  33. return n
  34. }
  35. func (n *Node) run(r *raft) {
  36. propc := n.propc
  37. statec := n.statec
  38. var prev stateResp
  39. for {
  40. if r.hasLeader() {
  41. propc = n.propc
  42. } else {
  43. // We cannot accept proposals because we don't know who
  44. // to send them to, so we'll apply back-pressure and
  45. // block senders.
  46. propc = nil
  47. }
  48. sr := stateResp{
  49. r.State,
  50. r.raftLog.unstableEnts(),
  51. r.raftLog.nextEnts(),
  52. r.msgs,
  53. }
  54. if sr.containsUpdates(prev) {
  55. statec = n.statec
  56. } else {
  57. statec = nil
  58. }
  59. select {
  60. case p := <-propc:
  61. r.propose(p)
  62. case m := <-n.recvc:
  63. r.Step(m) // raft never returns an error
  64. case <-n.tickc:
  65. // r.tick()
  66. case statec <- sr:
  67. r.raftLog.resetNextEnts()
  68. r.raftLog.resetUnstable()
  69. r.msgs = nil
  70. case <-n.ctx.Done():
  71. return
  72. }
  73. }
  74. }
  75. func (n *Node) Tick() error {
  76. select {
  77. case n.tickc <- struct{}{}:
  78. return nil
  79. case <-n.ctx.Done():
  80. return n.ctx.Err()
  81. }
  82. }
  83. // Propose proposes data be appended to the log.
  84. func (n *Node) Propose(data []byte) error {
  85. select {
  86. case n.propc <- data:
  87. return nil
  88. case <-n.ctx.Done():
  89. return n.ctx.Err()
  90. }
  91. }
  92. // Step advances the state machine using m.
  93. func (n *Node) Step(m Message) error {
  94. select {
  95. case n.recvc <- m:
  96. return nil
  97. case <-n.ctx.Done():
  98. return n.ctx.Err()
  99. }
  100. }
  101. // ReadState returns the current point-in-time state.
  102. func (n *Node) ReadState() (st State, ents, cents []Entry, msgs []Message, err error) {
  103. select {
  104. case sr := <-n.statec:
  105. return sr.state, sr.ents, sr.cents, sr.msgs, nil
  106. case <-n.ctx.Done():
  107. return State{}, nil, nil, nil, n.ctx.Err()
  108. }
  109. }