node.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. // Package raft implements raft.
  2. package raft
  3. import "code.google.com/p/go.net/context"
  4. type stateResp struct {
  5. st State
  6. ents, cents []Entry
  7. msgs []Message
  8. }
  9. func (a State) Equal(b State) bool {
  10. return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
  11. }
  12. func (sr stateResp) containsUpdates(prev stateResp) bool {
  13. return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
  14. }
  15. type Node struct {
  16. ctx context.Context
  17. propc chan []byte
  18. recvc chan Message
  19. statec chan stateResp
  20. tickc chan struct{}
  21. }
  22. func Start(ctx context.Context, id int64, peers []int64) *Node {
  23. n := &Node{
  24. ctx: ctx,
  25. propc: make(chan []byte),
  26. recvc: make(chan Message),
  27. statec: make(chan stateResp),
  28. tickc: make(chan struct{}),
  29. }
  30. r := newRaft(id, peers)
  31. go n.run(r)
  32. return n
  33. }
  34. func (n *Node) run(r *raft) {
  35. propc := n.propc
  36. statec := n.statec
  37. var prev stateResp
  38. for {
  39. if r.hasLeader() {
  40. propc = n.propc
  41. } else {
  42. // We cannot accept proposals because we don't know who
  43. // to send them to, so we'll apply back-pressure and
  44. // block senders.
  45. propc = nil
  46. }
  47. sr := stateResp{
  48. r.State,
  49. r.raftLog.unstableEnts(),
  50. r.raftLog.nextEnts(),
  51. r.msgs,
  52. }
  53. if sr.containsUpdates(prev) {
  54. statec = n.statec
  55. } else {
  56. statec = nil
  57. }
  58. select {
  59. case p := <-propc:
  60. r.propose(p)
  61. case m := <-n.recvc:
  62. r.Step(m) // raft never returns an error
  63. case <-n.tickc:
  64. // r.tick()
  65. case statec <- sr:
  66. r.raftLog.resetNextEnts()
  67. r.raftLog.resetUnstable()
  68. r.msgs = nil
  69. case <-n.ctx.Done():
  70. return
  71. }
  72. }
  73. }
  74. func (n *Node) Tick() error {
  75. select {
  76. case n.tickc <- struct{}{}:
  77. return nil
  78. case <-n.ctx.Done():
  79. return n.ctx.Err()
  80. }
  81. }
  82. // Propose proposes data be appended to the log.
  83. func (n *Node) Propose(ctx context.Context, data []byte) error {
  84. select {
  85. case n.propc <- data:
  86. return nil
  87. case <-ctx.Done():
  88. return ctx.Err()
  89. case <-n.ctx.Done():
  90. return n.ctx.Err()
  91. }
  92. }
  93. // func (n *Node) SingleFlightPropose(ctx context.Context, id string, data []byte) error {
  94. // ch := n.getSingleFlightChan(id)
  95. // select {
  96. // case ch <- data:
  97. // return nil
  98. // case <-ctx.Done():
  99. // return ctx.Err()
  100. // case <-n.ctx.Done():
  101. // return n.ctx.Err()
  102. // }
  103. // }
  104. // Step advances the state machine using m.
  105. func (n *Node) Step(m Message) error {
  106. select {
  107. case n.recvc <- m:
  108. return nil
  109. case <-n.ctx.Done():
  110. return n.ctx.Err()
  111. }
  112. }
  113. // ReadState returns the current point-in-time state.
  114. func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) {
  115. select {
  116. case sr := <-n.statec:
  117. return sr.st, sr.ents, sr.cents, sr.msgs, nil
  118. case <-ctx.Done():
  119. return State{}, nil, nil, nil, ctx.Err()
  120. case <-n.ctx.Done():
  121. return State{}, nil, nil, nil, n.ctx.Err()
  122. }
  123. }