| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- // Package raft implements the Raft consensus algorithm.
- package raft
- import "code.google.com/p/go.net/context"
- type stateResp struct {
- st State
- ents, cents []Entry
- msgs []Message
- }
- func (a State) Equal(b State) bool {
- return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
- }
- func (sr stateResp) containsUpdates(prev stateResp) bool {
- return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0
- }
- type Node struct {
- ctx context.Context
- propc chan []byte
- recvc chan Message
- statec chan stateResp
- tickc chan struct{}
- }
- func Start(ctx context.Context, id int64, peers []int64) *Node {
- n := &Node{
- ctx: ctx,
- propc: make(chan []byte),
- recvc: make(chan Message),
- statec: make(chan stateResp),
- tickc: make(chan struct{}),
- }
- r := newRaft(id, peers)
- go n.run(r)
- return n
- }
- func (n *Node) run(r *raft) {
- propc := n.propc
- statec := n.statec
- var prev stateResp
- for {
- if r.hasLeader() {
- propc = n.propc
- } else {
- // We cannot accept proposals because we don't know who
- // to send them to, so we'll apply back-pressure and
- // block senders.
- propc = nil
- }
- sr := stateResp{
- r.State,
- r.raftLog.unstableEnts(),
- r.raftLog.nextEnts(),
- r.msgs,
- }
- if sr.containsUpdates(prev) {
- statec = n.statec
- } else {
- statec = nil
- }
- select {
- case p := <-propc:
- r.propose(p)
- case m := <-n.recvc:
- r.Step(m) // raft never returns an error
- case <-n.tickc:
- // r.tick()
- case statec <- sr:
- r.raftLog.resetNextEnts()
- r.raftLog.resetUnstable()
- r.msgs = nil
- case <-n.ctx.Done():
- return
- }
- }
- }
- func (n *Node) Tick() error {
- select {
- case n.tickc <- struct{}{}:
- return nil
- case <-n.ctx.Done():
- return n.ctx.Err()
- }
- }
- // Propose proposes data be appended to the log.
- func (n *Node) Propose(ctx context.Context, data []byte) error {
- select {
- case n.propc <- data:
- return nil
- case <-ctx.Done():
- return ctx.Err()
- case <-n.ctx.Done():
- return n.ctx.Err()
- }
- }
- // func (n *Node) SingleFlightPropose(ctx context.Context, id string, data []byte) error {
- // ch := n.getSingleFlightChan(id)
- // select {
- // case ch <- data:
- // return nil
- // case <-ctx.Done():
- // return ctx.Err()
- // case <-n.ctx.Done():
- // return n.ctx.Err()
- // }
- // }
- // Step advances the state machine using m.
- func (n *Node) Step(m Message) error {
- select {
- case n.recvc <- m:
- return nil
- case <-n.ctx.Done():
- return n.ctx.Err()
- }
- }
- // ReadState returns the current point-in-time state.
- func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) {
- select {
- case sr := <-n.statec:
- return sr.st, sr.ents, sr.cents, sr.msgs, nil
- case <-ctx.Done():
- return State{}, nil, nil, nil, ctx.Err()
- case <-n.ctx.Done():
- return State{}, nil, nil, nil, n.ctx.Err()
- }
- }
|