node.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // Package raft implements raft.
  2. package raft
  3. import (
  4. "code.google.com/p/go.net/context"
  5. )
  6. type Ready struct {
  7. // The current state of a Node
  8. State
  9. // Entries specifies entries to be saved to stable storage BEFORE
  10. // Messages are sent.
  11. Entries []Entry
  12. // CommittedEntries specifies entries to be committed to a
  13. // store/state-machine. These have previously been committed to stable
  14. // store.
  15. CommittedEntries []Entry
  16. // Messages specifies outbound messages to be sent AFTER Entries are
  17. // committed to stable storage.
  18. Messages []Message
  19. }
  20. func (a State) Equal(b State) bool {
  21. return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
  22. }
  23. func (rd Ready) containsUpdates(prev Ready) bool {
  24. return !prev.State.Equal(rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
  25. }
  26. type Node struct {
  27. ctx context.Context
  28. propc chan Message
  29. recvc chan Message
  30. readyc chan Ready
  31. tickc chan struct{}
  32. }
  33. func Start(ctx context.Context, id int64, peers []int64) Node {
  34. n := Node{
  35. ctx: ctx,
  36. propc: make(chan Message),
  37. recvc: make(chan Message),
  38. readyc: make(chan Ready),
  39. tickc: make(chan struct{}),
  40. }
  41. r := newRaft(id, peers)
  42. go n.run(r)
  43. return n
  44. }
  45. func (n *Node) run(r *raft) {
  46. propc := n.propc
  47. readyc := n.readyc
  48. var prev Ready
  49. for {
  50. if r.hasLeader() {
  51. propc = n.propc
  52. } else {
  53. // We cannot accept proposals because we don't know who
  54. // to send them to, so we'll apply back-pressure and
  55. // block senders.
  56. propc = nil
  57. }
  58. rd := Ready{
  59. r.State,
  60. r.raftLog.unstableEnts(),
  61. r.raftLog.nextEnts(),
  62. r.msgs,
  63. }
  64. if rd.containsUpdates(prev) {
  65. readyc = n.readyc
  66. } else {
  67. readyc = nil
  68. }
  69. select {
  70. case m := <-propc:
  71. m.From = r.id
  72. r.Step(m)
  73. case m := <-n.recvc:
  74. r.Step(m) // raft never returns an error
  75. case <-n.tickc:
  76. // r.tick()
  77. case readyc <- rd:
  78. r.raftLog.resetNextEnts()
  79. r.raftLog.resetUnstable()
  80. r.msgs = nil
  81. case <-n.ctx.Done():
  82. return
  83. }
  84. }
  85. }
  86. func (n *Node) Tick() error {
  87. select {
  88. case n.tickc <- struct{}{}:
  89. return nil
  90. case <-n.ctx.Done():
  91. return n.ctx.Err()
  92. }
  93. }
  94. // Propose proposes data be appended to the log.
  95. func (n *Node) Propose(ctx context.Context, id int64, data []byte) error {
  96. return n.Step(ctx, Message{Type: msgProp, Entries: []Entry{{Id: id, Data: data}}})
  97. }
  98. // Step advances the state machine using msgs. The ctx.Err() will be returned,
  99. // if any.
  100. func (n *Node) Step(ctx context.Context, m Message) error {
  101. ch := n.recvc
  102. if m.Type == msgProp {
  103. ch = n.propc
  104. }
  105. select {
  106. case ch <- m:
  107. return nil
  108. case <-ctx.Done():
  109. return ctx.Err()
  110. case <-n.ctx.Done():
  111. return n.ctx.Err()
  112. }
  113. }
  114. // ReadState returns the current point-in-time state.
  115. func (n *Node) Ready() <-chan Ready {
  116. return n.readyc
  117. }
  118. type byMsgType []Message
  119. func (msgs byMsgType) Len() int { return len(msgs) }
  120. func (msgs byMsgType) Less(i, j int) bool { return msgs[i].Type == msgProp }
  121. func (msgs byMsgType) Swap(i, j int) { msgs[i], msgs[j] = msgs[i], msgs[j] }