| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- // Package raft implements raft.
- package raft
- import "code.google.com/p/go.net/context"
- type stateResp struct {
- state State
- ents []Entry
- msgs []Message
- }
- type proposal struct {
- id int64
- data []byte
- }
- type Node struct {
- ctx context.Context
- propc chan proposal
- recvc chan Message
- statec chan stateResp
- tickc chan struct{}
- }
- func Start(ctx context.Context, name string, election, heartbeat int) *Node {
- n := &Node{
- ctx: ctx,
- propc: make(chan proposal),
- recvc: make(chan Message),
- statec: make(chan stateResp),
- tickc: make(chan struct{}),
- }
- r := &raft{
- name: name,
- election: election,
- heartbeat: heartbeat,
- }
- go n.run(r)
- return n
- }
- func (n *Node) run(r *raft) {
- propc := n.propc
- for {
- if r.hasLeader() {
- propc = n.propc
- } else {
- // We cannot accept proposals because we don't know who
- // to send them to, so we'll apply back-pressure and
- // block senders.
- propc = nil
- }
- select {
- case p := <-propc:
- r.propose(p.id, p.data)
- case m := <-n.recvc:
- r.step(m)
- case <-n.tickc:
- r.tick()
- case n.statec <- stateResp{r.State, r.ents, r.msgs}:
- r.resetState()
- case <-n.ctx.Done():
- return
- }
- }
- }
- func (n *Node) Tick() error {
- select {
- case n.tickc <- struct{}{}:
- return nil
- case <-n.ctx.Done():
- return n.ctx.Err()
- }
- }
- // Propose proposes data be appended to the log.
- func (n *Node) Propose(id int64, data []byte) error {
- select {
- case n.propc <- proposal{id, data}:
- return nil
- case <-n.ctx.Done():
- return n.ctx.Err()
- }
- }
- // Step advances the state machine using m.
- func (n *Node) Step(m Message) error {
- select {
- case n.recvc <- m:
- return nil
- case <-n.ctx.Done():
- return n.ctx.Err()
- }
- }
- // ReadState returns the current point-in-time state.
- func (n *Node) ReadState() (State, []Entry, []Message, error) {
- select {
- case sr := <-n.statec:
- return sr.state, sr.ents, sr.msgs, nil
- case <-n.ctx.Done():
- return State{}, nil, nil, n.ctx.Err()
- }
- }
|