node.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // Package raft implements raft.
  2. package raft
  3. import (
  4. "sort"
  5. "code.google.com/p/go.net/context"
  6. )
  7. type Ready struct {
  8. // The current state of a Node
  9. State
  10. // Entries specifies entries to be saved to stable storage BEFORE
  11. // Messages are sent.
  12. Entries []Entry
  13. // CommittedEntries specifies entries to be committed to a
  14. // store/state-machine. These have previously been committed to stable
  15. // store.
  16. CommittedEntries []Entry
  17. // Messages specifies outbound messages to be sent AFTER Entries are
  18. // committed to stable storage.
  19. Messages []Message
  20. }
  21. func (a State) Equal(b State) bool {
  22. return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
  23. }
  24. func (rd Ready) containsUpdates(prev Ready) bool {
  25. return !prev.State.Equal(rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
  26. }
  27. type Node struct {
  28. ctx context.Context
  29. propc chan Message
  30. recvc chan Message
  31. readyc chan Ready
  32. tickc chan struct{}
  33. }
  34. func Start(ctx context.Context, id int64, peers []int64) Node {
  35. n := Node{
  36. ctx: ctx,
  37. propc: make(chan Message),
  38. recvc: make(chan Message),
  39. readyc: make(chan Ready),
  40. tickc: make(chan struct{}),
  41. }
  42. r := newRaft(id, peers)
  43. go n.run(r)
  44. return n
  45. }
  46. func (n *Node) run(r *raft) {
  47. propc := n.propc
  48. readyc := n.readyc
  49. var prev Ready
  50. for {
  51. if r.hasLeader() {
  52. propc = n.propc
  53. } else {
  54. // We cannot accept proposals because we don't know who
  55. // to send them to, so we'll apply back-pressure and
  56. // block senders.
  57. propc = nil
  58. }
  59. rd := Ready{
  60. r.State,
  61. r.raftLog.unstableEnts(),
  62. r.raftLog.nextEnts(),
  63. r.msgs,
  64. }
  65. if rd.containsUpdates(prev) {
  66. readyc = n.readyc
  67. } else {
  68. readyc = nil
  69. }
  70. select {
  71. case m := <-propc:
  72. m.From = r.id
  73. r.Step(m)
  74. case m := <-n.recvc:
  75. r.Step(m) // raft never returns an error
  76. case <-n.tickc:
  77. // r.tick()
  78. case readyc <- rd:
  79. r.raftLog.resetNextEnts()
  80. r.raftLog.resetUnstable()
  81. r.msgs = nil
  82. case <-n.ctx.Done():
  83. return
  84. }
  85. }
  86. }
  87. func (n *Node) Tick() error {
  88. select {
  89. case n.tickc <- struct{}{}:
  90. return nil
  91. case <-n.ctx.Done():
  92. return n.ctx.Err()
  93. }
  94. }
  95. // Propose proposes data be appended to the log.
  96. func (n *Node) Propose(ctx context.Context, id int64, data []byte) error {
  97. return n.Step(ctx, []Message{{Type: msgProp, Entries: []Entry{{Id: id, Data: data}}}})
  98. }
  99. // Step advances the state machine using msgs. Proposals are priotized last so
  100. // that any votes and vote requests will not be wedged behind proposals and
  101. // prevent this cluster from making progress. The ctx.Err() will be returned,
  102. // if any.
  103. func (n *Node) Step(ctx context.Context, msgs []Message) error {
  104. sort.Sort(sort.Reverse(byMsgType(msgs)))
  105. for _, m := range msgs {
  106. ch := n.recvc
  107. if m.Type == msgProp {
  108. ch = n.propc
  109. }
  110. select {
  111. case ch <- m:
  112. return nil
  113. case <-ctx.Done():
  114. return ctx.Err()
  115. case <-n.ctx.Done():
  116. return n.ctx.Err()
  117. }
  118. }
  119. return nil
  120. }
  121. // ReadState returns the current point-in-time state.
  122. func (n *Node) Ready() <-chan Ready {
  123. return n.readyc
  124. }
  125. type byMsgType []Message
  126. func (msgs byMsgType) Len() int { return len(msgs) }
  127. func (msgs byMsgType) Less(i, j int) bool { return msgs[i].Type == msgProp }
  128. func (msgs byMsgType) Swap(i, j int) { msgs[i], msgs[j] = msgs[i], msgs[j] }