Browse Source

raft: moved into new raft

Blake Mizerany 11 years ago
parent
commit
0453d09af6
12 changed files with 634 additions and 661 deletions
  1. 0 593
      raft/raft.go
  2. 0 0
      raft2/diff_test.go
  3. 0 0
      raft2/entry.pb.go
  4. 0 0
      raft2/info.pb.go
  5. 0 0
      raft2/log.go
  6. 0 0
      raft2/log_test.go
  7. 9 14
      raft2/node.go
  8. 586 18
      raft2/raft.go
  9. 39 36
      raft2/raft_test.go
  10. 0 0
      raft2/snapshot.go
  11. 0 0
      raft2/state.pb.go
  12. 0 0
      raft2/state.proto

+ 0 - 593
raft/raft.go

@@ -1,593 +0,0 @@
-package raft
-
-import (
-	"errors"
-	"fmt"
-	"sort"
-	"sync/atomic"
-)
-
-const none = -1
-
-type messageType int64
-
-const (
-	msgHup messageType = iota
-	msgBeat
-	msgProp
-	msgApp
-	msgAppResp
-	msgVote
-	msgVoteResp
-	msgSnap
-	msgDenied
-)
-
-var mtmap = [...]string{
-	msgHup:      "msgHup",
-	msgBeat:     "msgBeat",
-	msgProp:     "msgProp",
-	msgApp:      "msgApp",
-	msgAppResp:  "msgAppResp",
-	msgVote:     "msgVote",
-	msgVoteResp: "msgVoteResp",
-	msgSnap:     "msgSnap",
-	msgDenied:   "msgDenied",
-}
-
-func (mt messageType) String() string {
-	return mtmap[int64(mt)]
-}
-
-var errNoLeader = errors.New("no leader")
-
-const (
-	stateFollower stateType = iota
-	stateCandidate
-	stateLeader
-)
-
-type stateType int64
-
-var stmap = [...]string{
-	stateFollower:  "stateFollower",
-	stateCandidate: "stateCandidate",
-	stateLeader:    "stateLeader",
-}
-
-var stepmap = [...]stepFunc{
-	stateFollower:  stepFollower,
-	stateCandidate: stepCandidate,
-	stateLeader:    stepLeader,
-}
-
-func (st stateType) String() string {
-	return stmap[int64(st)]
-}
-
-var EmptyState = State{}
-
-type Message struct {
-	Type      messageType
-	ClusterId int64
-	To        int64
-	From      int64
-	Term      int64
-	LogTerm   int64
-	Index     int64
-	Entries   []Entry
-	Commit    int64
-	Snapshot  Snapshot
-}
-
-func (m Message) IsMsgApp() bool {
-	return m.Type == msgApp
-}
-
-func (m Message) String() string {
-	return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d",
-		m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries))
-}
-
-type index struct {
-	match, next int64
-}
-
-func (in *index) update(n int64) {
-	in.match = n
-	in.next = n + 1
-}
-
-func (in *index) decr() {
-	if in.next--; in.next < 1 {
-		in.next = 1
-	}
-}
-
-func (in *index) String() string {
-	return fmt.Sprintf("n=%d m=%d", in.next, in.match)
-}
-
-// atomicInt is an int64 to be accessed atomically.
-type atomicInt int64
-
-func (i *atomicInt) Set(n int64) {
-	atomic.StoreInt64((*int64)(i), n)
-}
-
-func (i *atomicInt) Get() int64 {
-	return atomic.LoadInt64((*int64)(i))
-}
-
-// int64Slice implements sort interface
-type int64Slice []int64
-
-func (p int64Slice) Len() int           { return len(p) }
-func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
-func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
-
-type stateMachine struct {
-	clusterId int64
-	id        int64
-
-	// the term we are participating in at any time
-	term  atomicInt
-	index atomicInt
-
-	// who we voted for in term
-	vote int64
-
-	// the log
-	raftLog *raftLog
-
-	ins map[int64]*index
-
-	state stateType
-
-	votes map[int64]bool
-
-	msgs []Message
-
-	// the leader id
-	lead atomicInt
-
-	// pending reconfiguration
-	pendingConf bool
-
-	unstableState State
-
-	// promotable indicates whether state machine could be promoted.
-	// New machine has to wait until it has been added to the cluster, or it
-	// may become the leader of the cluster without it.
-	promotable bool
-}
-
-func newStateMachine(id int64, peers []int64) *stateMachine {
-	if id == none {
-		panic("cannot use none id")
-	}
-	sm := &stateMachine{id: id, clusterId: none, lead: none, raftLog: newLog(), ins: make(map[int64]*index)}
-	for _, p := range peers {
-		sm.ins[p] = &index{}
-	}
-	sm.reset(0)
-	return sm
-}
-
-func (sm *stateMachine) String() string {
-	s := fmt.Sprintf(`state=%v term=%d`, sm.state, sm.term)
-	switch sm.state {
-	case stateFollower:
-		s += fmt.Sprintf(" vote=%v lead=%v", sm.vote, sm.lead)
-	case stateCandidate:
-		s += fmt.Sprintf(` votes="%v"`, sm.votes)
-	case stateLeader:
-		s += fmt.Sprintf(` ins="%v"`, sm.ins)
-	}
-	return s
-}
-
-func (sm *stateMachine) poll(id int64, v bool) (granted int) {
-	if _, ok := sm.votes[id]; !ok {
-		sm.votes[id] = v
-	}
-	for _, vv := range sm.votes {
-		if vv {
-			granted++
-		}
-	}
-	return granted
-}
-
-// send stamps m with the cluster id, sender id and current term, then appends it to sm.msgs.
-func (sm *stateMachine) send(m Message) {
-	m.ClusterId = sm.clusterId
-	m.From = sm.id
-	m.Term = sm.term.Get()
-	sm.msgs = append(sm.msgs, m)
-}
-
-// sendAppend sends an append RPC with entries to the given peer, or a snapshot when needSnapshot reports one is required.
-func (sm *stateMachine) sendAppend(to int64) {
-	in := sm.ins[to]
-	m := Message{}
-	m.To = to
-	m.Index = in.next - 1
-	if sm.needSnapshot(m.Index) {
-		m.Type = msgSnap
-		m.Snapshot = sm.raftLog.snapshot
-	} else {
-		m.Type = msgApp
-		m.LogTerm = sm.raftLog.term(in.next - 1)
-		m.Entries = sm.raftLog.entries(in.next)
-		m.Commit = sm.raftLog.committed
-	}
-	sm.send(m)
-}
-
-// sendHeartbeat sends an RPC without entries to the given peer.
-func (sm *stateMachine) sendHeartbeat(to int64) {
-	in := sm.ins[to]
-	index := max(in.next-1, sm.raftLog.lastIndex())
-	m := Message{
-		To:      to,
-		Type:    msgApp,
-		Index:   index,
-		LogTerm: sm.raftLog.term(index),
-		Commit:  sm.raftLog.committed,
-	}
-	sm.send(m)
-}
-
-// bcastAppend sends an append RPC with entries to every peer in sm.ins except itself.
-func (sm *stateMachine) bcastAppend() {
-	for i := range sm.ins {
-		if i == sm.id {
-			continue
-		}
-		sm.sendAppend(i)
-	}
-}
-
-// bcastHeartbeat sends an RPC without entries to every peer in sm.ins except itself.
-func (sm *stateMachine) bcastHeartbeat() {
-	for i := range sm.ins {
-		if i == sm.id {
-			continue
-		}
-		sm.sendHeartbeat(i)
-	}
-}
-
-func (sm *stateMachine) maybeCommit() bool {
-	// TODO(bmizerany): optimize.. Currently naive
-	mis := make(int64Slice, 0, len(sm.ins))
-	for i := range sm.ins {
-		mis = append(mis, sm.ins[i].match)
-	}
-	sort.Sort(sort.Reverse(mis))
-	mci := mis[sm.q()-1]
-
-	return sm.raftLog.maybeCommit(mci, sm.term.Get())
-}
-
-// nextEnts returns the appliable entries and updates the applied index
-func (sm *stateMachine) nextEnts() (ents []Entry) {
-	ents = sm.raftLog.nextEnts()
-	sm.raftLog.resetNextEnts()
-	return ents
-}
-
-func (sm *stateMachine) reset(term int64) {
-	sm.setTerm(term)
-	sm.lead.Set(none)
-	sm.setVote(none)
-	sm.votes = make(map[int64]bool)
-	for i := range sm.ins {
-		sm.ins[i] = &index{next: sm.raftLog.lastIndex() + 1}
-		if i == sm.id {
-			sm.ins[i].match = sm.raftLog.lastIndex()
-		}
-	}
-}
-
-func (sm *stateMachine) q() int {
-	return len(sm.ins)/2 + 1
-}
-
-func (sm *stateMachine) appendEntry(e Entry) {
-	e.Term = sm.term.Get()
-	e.Index = sm.raftLog.lastIndex() + 1
-	sm.index.Set(sm.raftLog.append(sm.raftLog.lastIndex(), e))
-	sm.ins[sm.id].update(sm.raftLog.lastIndex())
-	sm.maybeCommit()
-}
-
-func (sm *stateMachine) becomeFollower(term int64, lead int64) {
-	sm.reset(term)
-	sm.lead.Set(lead)
-	sm.state = stateFollower
-	sm.pendingConf = false
-}
-
-func (sm *stateMachine) becomeCandidate() {
-	// TODO(xiangli) remove the panic when the raft implementation is stable
-	if sm.state == stateLeader {
-		panic("invalid transition [leader -> candidate]")
-	}
-	sm.reset(sm.term.Get() + 1)
-	sm.setVote(sm.id)
-	sm.state = stateCandidate
-}
-
-func (sm *stateMachine) becomeLeader() {
-	// TODO(xiangli) remove the panic when the raft implementation is stable
-	if sm.state == stateFollower {
-		panic("invalid transition [follower -> leader]")
-	}
-	sm.reset(sm.term.Get())
-	sm.lead.Set(sm.id)
-	sm.state = stateLeader
-
-	for _, e := range sm.raftLog.entries(sm.raftLog.committed + 1) {
-		if e.isConfig() {
-			sm.pendingConf = true
-		}
-	}
-
-	sm.appendEntry(Entry{Type: Normal, Data: nil})
-}
-
-func (sm *stateMachine) Msgs() []Message {
-	msgs := sm.msgs
-	sm.msgs = make([]Message, 0)
-
-	return msgs
-}
-
-func (sm *stateMachine) Step(m Message) (ok bool) {
-	if m.Type == msgHup {
-		sm.becomeCandidate()
-		if sm.q() == sm.poll(sm.id, true) {
-			sm.becomeLeader()
-			return true
-		}
-		for i := range sm.ins {
-			if i == sm.id {
-				continue
-			}
-			lasti := sm.raftLog.lastIndex()
-			sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.raftLog.term(lasti)})
-		}
-		return true
-	}
-
-	switch {
-	case m.Term == 0:
-		// local message
-	case m.Term > sm.term.Get():
-		lead := m.From
-		if m.Type == msgVote {
-			lead = none
-		}
-		sm.becomeFollower(m.Term, lead)
-	case m.Term < sm.term.Get():
-		// ignore
-		return true
-	}
-
-	return stepmap[sm.state](sm, m)
-}
-
-func (sm *stateMachine) handleAppendEntries(m Message) {
-	if sm.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
-		sm.index.Set(sm.raftLog.lastIndex())
-		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()})
-	} else {
-		sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
-	}
-}
-
-func (sm *stateMachine) handleSnapshot(m Message) {
-	if sm.restore(m.Snapshot) {
-		sm.raftLog.unstableSnapshot = m.Snapshot
-		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()})
-	} else {
-		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.committed})
-	}
-}
-
-func (sm *stateMachine) addNode(id int64) {
-	sm.addIns(id, 0, sm.raftLog.lastIndex()+1)
-	sm.pendingConf = false
-	if id == sm.id {
-		sm.promotable = true
-	}
-}
-
-func (sm *stateMachine) removeNode(id int64) {
-	sm.deleteIns(id)
-	sm.pendingConf = false
-}
-
-type stepFunc func(sm *stateMachine, m Message) bool
-
-func stepLeader(sm *stateMachine, m Message) bool {
-	switch m.Type {
-	case msgBeat:
-		sm.bcastHeartbeat()
-	case msgProp:
-		if len(m.Entries) != 1 {
-			panic("unexpected length(entries) of a msgProp")
-		}
-		e := m.Entries[0]
-		if e.isConfig() {
-			if sm.pendingConf {
-				return false
-			}
-			sm.pendingConf = true
-		}
-		sm.appendEntry(e)
-		sm.bcastAppend()
-	case msgAppResp:
-		if m.Index < 0 {
-			sm.ins[m.From].decr()
-			sm.sendAppend(m.From)
-		} else {
-			sm.ins[m.From].update(m.Index)
-			if sm.maybeCommit() {
-				sm.bcastAppend()
-			}
-		}
-	case msgVote:
-		sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
-	}
-	return true
-}
-
-func stepCandidate(sm *stateMachine, m Message) bool {
-	switch m.Type {
-	case msgProp:
-		return false
-	case msgApp:
-		sm.becomeFollower(sm.term.Get(), m.From)
-		sm.handleAppendEntries(m)
-	case msgSnap:
-		sm.becomeFollower(m.Term, m.From)
-		sm.handleSnapshot(m)
-	case msgVote:
-		sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
-	case msgVoteResp:
-		gr := sm.poll(m.From, m.Index >= 0)
-		switch sm.q() {
-		case gr:
-			sm.becomeLeader()
-			sm.bcastAppend()
-		case len(sm.votes) - gr:
-			sm.becomeFollower(sm.term.Get(), none)
-		}
-	}
-	return true
-}
-
-func stepFollower(sm *stateMachine, m Message) bool {
-	switch m.Type {
-	case msgProp:
-		if sm.lead.Get() == none {
-			return false
-		}
-		m.To = sm.lead.Get()
-		sm.send(m)
-	case msgApp:
-		sm.lead.Set(m.From)
-		sm.handleAppendEntries(m)
-	case msgSnap:
-		sm.handleSnapshot(m)
-	case msgVote:
-		if (sm.vote == none || sm.vote == m.From) && sm.raftLog.isUpToDate(m.Index, m.LogTerm) {
-			sm.setVote(m.From)
-			sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.raftLog.lastIndex()})
-		} else {
-			sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
-		}
-	}
-	return true
-}
-
-func (sm *stateMachine) compact(d []byte) {
-	sm.raftLog.snap(d, sm.clusterId, sm.raftLog.applied, sm.raftLog.term(sm.raftLog.applied), sm.nodes())
-	sm.raftLog.compact(sm.raftLog.applied)
-}
-
-// restore recovers the statemachine from a snapshot. It restores the log and the
-// configuration of statemachine.
-func (sm *stateMachine) restore(s Snapshot) bool {
-	if s.Index <= sm.raftLog.committed {
-		return false
-	}
-
-	sm.raftLog.restore(s)
-	sm.index.Set(sm.raftLog.lastIndex())
-	sm.clusterId = s.ClusterId
-	sm.ins = make(map[int64]*index)
-	for _, n := range s.Nodes {
-		if n == sm.id {
-			sm.addIns(n, sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1)
-			sm.promotable = true
-		} else {
-			sm.addIns(n, 0, sm.raftLog.lastIndex()+1)
-		}
-	}
-	sm.pendingConf = false
-	return true
-}
-
-func (sm *stateMachine) needSnapshot(i int64) bool {
-	if i < sm.raftLog.offset {
-		if sm.raftLog.snapshot.IsEmpty() {
-			panic("need non-empty snapshot")
-		}
-		return true
-	}
-	return false
-}
-
-func (sm *stateMachine) nodes() []int64 {
-	nodes := make([]int64, 0, len(sm.ins))
-	for k := range sm.ins {
-		nodes = append(nodes, k)
-	}
-	return nodes
-}
-
-func (sm *stateMachine) setTerm(term int64) {
-	sm.term.Set(term)
-	sm.saveState()
-}
-
-func (sm *stateMachine) setVote(vote int64) {
-	sm.vote = vote
-	sm.saveState()
-}
-
-func (sm *stateMachine) addIns(id, match, next int64) {
-	sm.ins[id] = &index{next: next, match: match}
-	sm.saveState()
-}
-
-func (sm *stateMachine) deleteIns(id int64) {
-	delete(sm.ins, id)
-	sm.saveState()
-}
-
-// saveState saves the state to sm.unstableState
-// When there is a term change, vote change or configuration change, raft
-// must call saveState.
-func (sm *stateMachine) saveState() {
-	sm.setState(sm.vote, sm.term.Get(), sm.raftLog.committed)
-}
-
-func (sm *stateMachine) clearState() {
-	sm.setState(0, 0, 0)
-}
-
-func (sm *stateMachine) setState(vote, term, commit int64) {
-	sm.unstableState.Vote = vote
-	sm.unstableState.Term = term
-	sm.unstableState.Commit = commit
-}
-
-func (sm *stateMachine) loadEnts(ents []Entry) {
-	sm.raftLog.append(sm.raftLog.lastIndex(), ents...)
-	sm.raftLog.unstable = sm.raftLog.lastIndex() + 1
-}
-
-func (sm *stateMachine) loadState(state State) {
-	sm.raftLog.committed = state.Commit
-	sm.setTerm(state.Term)
-	sm.setVote(state.Vote)
-}
-
-func (s *State) IsEmpty() bool {
-	return s.Term == 0
-}

+ 0 - 0
raft/diff_test.go → raft2/diff_test.go


+ 0 - 0
raft/entry.pb.go → raft2/entry.pb.go


+ 0 - 0
raft/info.pb.go → raft2/info.pb.go


+ 0 - 0
raft/log.go → raft2/log.go


+ 0 - 0
raft/log_test.go → raft2/log_test.go


+ 9 - 14
raft2/node.go

@@ -9,14 +9,9 @@ type stateResp struct {
 	msgs  []Message
 }
 
-type proposal struct {
-	id   int64
-	data []byte
-}
-
 type Node struct {
 	ctx    context.Context
-	propc  chan proposal
+	propc  chan []byte
 	recvc  chan Message
 	statec chan stateResp
 	tickc  chan struct{}
@@ -25,7 +20,7 @@ type Node struct {
 func Start(ctx context.Context, name string, election, heartbeat int) *Node {
 	n := &Node{
 		ctx:    ctx,
-		propc:  make(chan proposal),
+		propc:  make(chan []byte),
 		recvc:  make(chan Message),
 		statec: make(chan stateResp),
 		tickc:  make(chan struct{}),
@@ -54,13 +49,13 @@ func (n *Node) run(r *raft) {
 
 		select {
 		case p := <-propc:
-			r.propose(p.id, p.data)
+			r.propose(p)
 		case m := <-n.recvc:
-			r.step(m)
+			r.Step(m) // raft never returns an error
 		case <-n.tickc:
-			r.tick()
-		case n.statec <- stateResp{r.State, r.ents, r.msgs}:
-			r.resetState()
+			// r.tick()
+		// case n.statec <- stateResp{r.State, r.ents, r.msgs}:
+		// r.resetState()
 		case <-n.ctx.Done():
 			return
 		}
@@ -77,9 +72,9 @@ func (n *Node) Tick() error {
 }
 
 // Propose proposes data be appended to the log.
-func (n *Node) Propose(id int64, data []byte) error {
+func (n *Node) Propose(data []byte) error {
 	select {
-	case n.propc <- proposal{id, data}:
+	case n.propc <- data:
 		return nil
 	case <-n.ctx.Done():
 		return n.ctx.Err()

+ 586 - 18
raft2/raft.go

@@ -1,35 +1,603 @@
 package raft
 
-type State struct {
-	CommitIndex int64
+import (
+	"errors"
+	"fmt"
+	"sort"
+	"sync/atomic"
+)
+
+const none = -1
+
+type messageType int64
+
+const (
+	msgHup messageType = iota
+	msgBeat
+	msgProp
+	msgApp
+	msgAppResp
+	msgVote
+	msgVoteResp
+	msgSnap
+	msgDenied
+)
+
+var mtmap = [...]string{
+	msgHup:      "msgHup",
+	msgBeat:     "msgBeat",
+	msgProp:     "msgProp",
+	msgApp:      "msgApp",
+	msgAppResp:  "msgAppResp",
+	msgVote:     "msgVote",
+	msgVoteResp: "msgVoteResp",
+	msgSnap:     "msgSnap",
+	msgDenied:   "msgDenied",
 }
 
+func (mt messageType) String() string {
+	return mtmap[int64(mt)]
+}
+
+var errNoLeader = errors.New("no leader")
+
+const (
+	stateFollower stateType = iota
+	stateCandidate
+	stateLeader
+)
+
+type stateType int64
+
+var stmap = [...]string{
+	stateFollower:  "stateFollower",
+	stateCandidate: "stateCandidate",
+	stateLeader:    "stateLeader",
+}
+
+var stepmap = [...]stepFunc{
+	stateFollower:  stepFollower,
+	stateCandidate: stepCandidate,
+	stateLeader:    stepLeader,
+}
+
+func (st stateType) String() string {
+	return stmap[int64(st)]
+}
+
+var EmptyState = State{}
+
 type Message struct {
-	State State
-	To    string
-	Data  []byte
+	Type      messageType
+	ClusterId int64
+	To        int64
+	From      int64
+	Term      int64
+	LogTerm   int64
+	Index     int64
+	Entries   []Entry
+	Commit    int64
+	Snapshot  Snapshot
 }
 
-type Entry struct {
-	Id    int64
-	Index int64
-	Data  []byte
+func (m Message) IsMsgApp() bool {
+	return m.Type == msgApp
 }
 
-type raft struct {
-	name string
+func (m Message) String() string {
+	return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d",
+		m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries))
+}
+
+type index struct {
+	match, next int64
+}
+
+func (in *index) update(n int64) {
+	in.match = n
+	in.next = n + 1
+}
+
+func (in *index) decr() {
+	if in.next--; in.next < 1 {
+		in.next = 1
+	}
+}
+
+func (in *index) String() string {
+	return fmt.Sprintf("n=%d m=%d", in.next, in.match)
+}
+
+// atomicInt is an int64 to be accessed atomically.
+type atomicInt int64
+
+func (i *atomicInt) Set(n int64) {
+	atomic.StoreInt64((*int64)(i), n)
+}
 
-	State
+func (i *atomicInt) Get() int64 {
+	return atomic.LoadInt64((*int64)(i))
+}
+
+// int64Slice implements sort interface
+type int64Slice []int64
 
+func (p int64Slice) Len() int           { return len(p) }
+func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+type raft struct {
+	// --- new stuff ---
+	name      string
 	election  int
 	heartbeat int
+	// -----------------
+
+	clusterId int64
+	id        int64
+
+	// the term we are participating in at any time
+	term  atomicInt
+	index atomicInt
+
+	// who we voted for in term
+	vote int64
+
+	// the log
+	raftLog *raftLog
+
+	ins map[int64]*index
+
+	state stateType
+
+	votes map[int64]bool
 
 	msgs []Message
-	ents []Entry
+
+	// the leader id
+	lead atomicInt
+
+	// pending reconfiguration
+	pendingConf bool
+
+	unstableState State
+
+	// promotable indicates whether state machine could be promoted.
+	// New machine has to wait until it has been added to the cluster, or it
+	// may become the leader of the cluster without it.
+	promotable bool
+}
+
+func newStateMachine(id int64, peers []int64) *raft {
+	if id == none {
+		panic("cannot use none id")
+	}
+	sm := &raft{id: id, clusterId: none, lead: none, raftLog: newLog(), ins: make(map[int64]*index)}
+	for _, p := range peers {
+		sm.ins[p] = &index{}
+	}
+	sm.reset(0)
+	return sm
+}
+
+func (r *raft) hasLeader() bool { return r.state != stateCandidate }
+
+func (r *raft) propose(data []byte) {
+	r.Step(Message{From: r.id, Type: msgProp, Entries: []Entry{{Data: data}}})
+}
+
+func (sm *raft) String() string {
+	s := fmt.Sprintf(`state=%v term=%d`, sm.state, sm.term)
+	switch sm.state {
+	case stateFollower:
+		s += fmt.Sprintf(" vote=%v lead=%v", sm.vote, sm.lead)
+	case stateCandidate:
+		s += fmt.Sprintf(` votes="%v"`, sm.votes)
+	case stateLeader:
+		s += fmt.Sprintf(` ins="%v"`, sm.ins)
+	}
+	return s
+}
+
+func (sm *raft) poll(id int64, v bool) (granted int) {
+	if _, ok := sm.votes[id]; !ok {
+		sm.votes[id] = v
+	}
+	for _, vv := range sm.votes {
+		if vv {
+			granted++
+		}
+	}
+	return granted
+}
+
+// send stamps m with the cluster id, sender id and current term, then appends it to sm.msgs.
+func (sm *raft) send(m Message) {
+	m.ClusterId = sm.clusterId
+	m.From = sm.id
+	m.Term = sm.term.Get()
+	sm.msgs = append(sm.msgs, m)
+}
+
+// sendAppend sends an append RPC with entries to the given peer, or a snapshot when needSnapshot reports one is required.
+func (sm *raft) sendAppend(to int64) {
+	in := sm.ins[to]
+	m := Message{}
+	m.To = to
+	m.Index = in.next - 1
+	if sm.needSnapshot(m.Index) {
+		m.Type = msgSnap
+		m.Snapshot = sm.raftLog.snapshot
+	} else {
+		m.Type = msgApp
+		m.LogTerm = sm.raftLog.term(in.next - 1)
+		m.Entries = sm.raftLog.entries(in.next)
+		m.Commit = sm.raftLog.committed
+	}
+	sm.send(m)
+}
+
+// sendHeartbeat sends an RPC without entries to the given peer.
+func (sm *raft) sendHeartbeat(to int64) {
+	in := sm.ins[to]
+	index := max(in.next-1, sm.raftLog.lastIndex())
+	m := Message{
+		To:      to,
+		Type:    msgApp,
+		Index:   index,
+		LogTerm: sm.raftLog.term(index),
+		Commit:  sm.raftLog.committed,
+	}
+	sm.send(m)
+}
+
+// bcastAppend sends an append RPC with entries to every peer in sm.ins except itself.
+func (sm *raft) bcastAppend() {
+	for i := range sm.ins {
+		if i == sm.id {
+			continue
+		}
+		sm.sendAppend(i)
+	}
+}
+
+// bcastHeartbeat sends an RPC without entries to every peer in sm.ins except itself.
+func (sm *raft) bcastHeartbeat() {
+	for i := range sm.ins {
+		if i == sm.id {
+			continue
+		}
+		sm.sendHeartbeat(i)
+	}
+}
+
+func (sm *raft) maybeCommit() bool {
+	// TODO(bmizerany): optimize.. Currently naive
+	mis := make(int64Slice, 0, len(sm.ins))
+	for i := range sm.ins {
+		mis = append(mis, sm.ins[i].match)
+	}
+	sort.Sort(sort.Reverse(mis))
+	mci := mis[sm.q()-1]
+
+	return sm.raftLog.maybeCommit(mci, sm.term.Get())
 }
 
-func (sm *raft) hasLeader() bool               { return false }
-func (sm *raft) step(m Message)                {}
-func (sm *raft) resetState()                   {}
-func (sm *raft) propose(id int64, data []byte) {}
-func (sm *raft) tick()                         {}
+// nextEnts returns the appliable entries and updates the applied index
+func (sm *raft) nextEnts() (ents []Entry) {
+	ents = sm.raftLog.nextEnts()
+	sm.raftLog.resetNextEnts()
+	return ents
+}
+
+func (sm *raft) reset(term int64) {
+	sm.setTerm(term)
+	sm.lead.Set(none)
+	sm.setVote(none)
+	sm.votes = make(map[int64]bool)
+	for i := range sm.ins {
+		sm.ins[i] = &index{next: sm.raftLog.lastIndex() + 1}
+		if i == sm.id {
+			sm.ins[i].match = sm.raftLog.lastIndex()
+		}
+	}
+}
+
+func (sm *raft) q() int {
+	return len(sm.ins)/2 + 1
+}
+
+func (sm *raft) appendEntry(e Entry) {
+	e.Term = sm.term.Get()
+	e.Index = sm.raftLog.lastIndex() + 1
+	sm.index.Set(sm.raftLog.append(sm.raftLog.lastIndex(), e))
+	sm.ins[sm.id].update(sm.raftLog.lastIndex())
+	sm.maybeCommit()
+}
+
+func (sm *raft) becomeFollower(term int64, lead int64) {
+	sm.reset(term)
+	sm.lead.Set(lead)
+	sm.state = stateFollower
+	sm.pendingConf = false
+}
+
+func (sm *raft) becomeCandidate() {
+	// TODO(xiangli) remove the panic when the raft implementation is stable
+	if sm.state == stateLeader {
+		panic("invalid transition [leader -> candidate]")
+	}
+	sm.reset(sm.term.Get() + 1)
+	sm.setVote(sm.id)
+	sm.state = stateCandidate
+}
+
+func (sm *raft) becomeLeader() {
+	// TODO(xiangli) remove the panic when the raft implementation is stable
+	if sm.state == stateFollower {
+		panic("invalid transition [follower -> leader]")
+	}
+	sm.reset(sm.term.Get())
+	sm.lead.Set(sm.id)
+	sm.state = stateLeader
+
+	for _, e := range sm.raftLog.entries(sm.raftLog.committed + 1) {
+		if e.isConfig() {
+			sm.pendingConf = true
+		}
+	}
+
+	sm.appendEntry(Entry{Type: Normal, Data: nil})
+}
+
+func (sm *raft) ReadMessages() []Message {
+	msgs := sm.msgs
+	sm.msgs = make([]Message, 0)
+
+	return msgs
+}
+
+func (sm *raft) Step(m Message) error {
+	if m.Type == msgHup {
+		sm.becomeCandidate()
+		if sm.q() == sm.poll(sm.id, true) {
+			sm.becomeLeader()
+		}
+		for i := range sm.ins {
+			if i == sm.id {
+				continue
+			}
+			lasti := sm.raftLog.lastIndex()
+			sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.raftLog.term(lasti)})
+		}
+	}
+
+	switch {
+	case m.Term == 0:
+		// local message
+	case m.Term > sm.term.Get():
+		lead := m.From
+		if m.Type == msgVote {
+			lead = none
+		}
+		sm.becomeFollower(m.Term, lead)
+	case m.Term < sm.term.Get():
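+		// NOTE(review): not actually ignored; unlike the removed raft/raft.go Step there is no early return here, so the message still reaches the step function below.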
+	}
+
+	stepmap[sm.state](sm, m)
+	return nil
+}
+
+func (sm *raft) handleAppendEntries(m Message) {
+	if sm.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
+		sm.index.Set(sm.raftLog.lastIndex())
+		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()})
+	} else {
+		sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
+	}
+}
+
+func (sm *raft) handleSnapshot(m Message) {
+	if sm.restore(m.Snapshot) {
+		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()})
+	} else {
+		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.committed})
+	}
+}
+
+func (sm *raft) addNode(id int64) {
+	sm.addIns(id, 0, sm.raftLog.lastIndex()+1)
+	sm.pendingConf = false
+	if id == sm.id {
+		sm.promotable = true
+	}
+}
+
+func (sm *raft) removeNode(id int64) {
+	sm.deleteIns(id)
+	sm.pendingConf = false
+}
+
+type stepFunc func(sm *raft, m Message) bool
+
+func stepLeader(sm *raft, m Message) bool {
+	switch m.Type {
+	case msgBeat:
+		sm.bcastHeartbeat()
+	case msgProp:
+		if len(m.Entries) != 1 {
+			panic("unexpected length(entries) of a msgProp")
+		}
+		e := m.Entries[0]
+		if e.isConfig() {
+			if sm.pendingConf {
+				return false
+			}
+			sm.pendingConf = true
+		}
+		sm.appendEntry(e)
+		sm.bcastAppend()
+	case msgAppResp:
+		if m.Index < 0 {
+			sm.ins[m.From].decr()
+			sm.sendAppend(m.From)
+		} else {
+			sm.ins[m.From].update(m.Index)
+			if sm.maybeCommit() {
+				sm.bcastAppend()
+			}
+		}
+	case msgVote:
+		sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
+	}
+	return true
+}
+
+func stepCandidate(sm *raft, m Message) bool {
+	switch m.Type {
+	case msgProp:
+		return false
+	case msgApp:
+		sm.becomeFollower(sm.term.Get(), m.From)
+		sm.handleAppendEntries(m)
+	case msgSnap:
+		sm.becomeFollower(m.Term, m.From)
+		sm.handleSnapshot(m)
+	case msgVote:
+		sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
+	case msgVoteResp:
+		gr := sm.poll(m.From, m.Index >= 0)
+		switch sm.q() {
+		case gr:
+			sm.becomeLeader()
+			sm.bcastAppend()
+		case len(sm.votes) - gr:
+			sm.becomeFollower(sm.term.Get(), none)
+		}
+	}
+	return true
+}
+
+func stepFollower(sm *raft, m Message) bool {
+	switch m.Type {
+	case msgProp:
+		if sm.lead.Get() == none {
+			return false
+		}
+		m.To = sm.lead.Get()
+		sm.send(m)
+	case msgApp:
+		sm.lead.Set(m.From)
+		sm.handleAppendEntries(m)
+	case msgSnap:
+		sm.handleSnapshot(m)
+	case msgVote:
+		if (sm.vote == none || sm.vote == m.From) && sm.raftLog.isUpToDate(m.Index, m.LogTerm) {
+			sm.setVote(m.From)
+			sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.raftLog.lastIndex()})
+		} else {
+			sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
+		}
+	}
+	return true
+}
+
+func (sm *raft) compact(d []byte) {
+	sm.raftLog.snap(d, sm.raftLog.applied, sm.raftLog.term(sm.raftLog.applied), sm.nodes())
+	sm.raftLog.compact(sm.raftLog.applied)
+}
+
+// restore recovers the statemachine from a snapshot. It restores the log and the
+// configuration of statemachine.
+func (sm *raft) restore(s Snapshot) bool {
+	if s.Index <= sm.raftLog.committed {
+		return false
+	}
+
+	sm.raftLog.restore(s)
+	sm.index.Set(sm.raftLog.lastIndex())
+	sm.ins = make(map[int64]*index)
+	for _, n := range s.Nodes {
+		if n == sm.id {
+			sm.addIns(n, sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1)
+		} else {
+			sm.addIns(n, 0, sm.raftLog.lastIndex()+1)
+		}
+	}
+	sm.pendingConf = false
+	return true
+}
+
+func (sm *raft) needSnapshot(i int64) bool {
+	if i < sm.raftLog.offset {
+		if sm.raftLog.snapshot.IsEmpty() {
+			panic("need non-empty snapshot")
+		}
+		return true
+	}
+	return false
+}
+
+func (sm *raft) nodes() []int64 {
+	nodes := make([]int64, 0, len(sm.ins))
+	for k := range sm.ins {
+		nodes = append(nodes, k)
+	}
+	return nodes
+}
+
+func (sm *raft) setTerm(term int64) {
+	sm.term.Set(term)
+	sm.saveState()
+}
+
+func (sm *raft) setVote(vote int64) {
+	sm.vote = vote
+	sm.saveState()
+}
+
+func (sm *raft) addIns(id, match, next int64) {
+	sm.ins[id] = &index{next: next, match: match}
+	sm.saveState()
+}
+
+func (sm *raft) deleteIns(id int64) {
+	delete(sm.ins, id)
+	sm.saveState()
+}
+
+// saveState saves the state to sm.unstableState
+// When there is a term change, vote change or configuration change, raft
+// must call saveState.
+func (sm *raft) saveState() {
+	sm.setState(sm.vote, sm.term.Get(), sm.raftLog.committed)
+}
+
+func (sm *raft) clearState() {
+	sm.setState(0, 0, 0)
+}
+
+func (sm *raft) setState(vote, term, commit int64) {
+	sm.unstableState.Vote = vote
+	sm.unstableState.Term = term
+	sm.unstableState.Commit = commit
+}
+
+func (sm *raft) loadEnts(ents []Entry) {
+	if !sm.raftLog.isEmpty() {
+		panic("cannot load entries when log is not empty")
+	}
+	sm.raftLog.append(0, ents...)
+	sm.raftLog.unstable = sm.raftLog.lastIndex() + 1
+}
+
+func (sm *raft) loadState(state State) {
+	sm.raftLog.committed = state.Commit
+	sm.setTerm(state.Term)
+	sm.setVote(state.Vote)
+}
+
+func (s *State) IsEmpty() bool {
+	return s.Term == 0
+}

+ 39 - 36
raft/raft_test.go → raft2/raft_test.go

@@ -8,6 +8,11 @@ import (
 	"testing"
 )
 
+type Interface interface {
+	Step(m Message) error
+	ReadMessages() []Message
+}
+
 func TestLeaderElection(t *testing.T) {
 	tests := []struct {
 		*network
@@ -28,7 +33,7 @@ func TestLeaderElection(t *testing.T) {
 
 	for i, tt := range tests {
 		tt.send(Message{From: 0, To: 0, Type: msgHup})
-		sm := tt.network.peers[0].(*stateMachine)
+		sm := tt.network.peers[0].(*raft)
 		if sm.state != tt.state {
 			t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
 		}
@@ -71,7 +76,7 @@ func TestLogReplication(t *testing.T) {
 		}
 
 		for j, x := range tt.network.peers {
-			sm := x.(*stateMachine)
+			sm := x.(*raft)
 
 			if sm.raftLog.committed != tt.wcommitted {
 				t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
@@ -104,7 +109,7 @@ func TestSingleNodeCommit(t *testing.T) {
 	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
-	sm := tt.peers[0].(*stateMachine)
+	sm := tt.peers[0].(*raft)
 	if sm.raftLog.committed != 3 {
 		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
 	}
@@ -125,7 +130,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
-	sm := tt.peers[0].(*stateMachine)
+	sm := tt.peers[0].(*raft)
 	if sm.raftLog.committed != 1 {
 		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
 	}
@@ -139,7 +144,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	tt.send(Message{From: 1, To: 1, Type: msgHup})
 
 	// no log entries from previous term should be committed
-	sm = tt.peers[1].(*stateMachine)
+	sm = tt.peers[1].(*raft)
 	if sm.raftLog.committed != 1 {
 		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
 	}
@@ -177,7 +182,7 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
 	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
-	sm := tt.peers[0].(*stateMachine)
+	sm := tt.peers[0].(*raft)
 	if sm.raftLog.committed != 1 {
 		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
 	}
@@ -211,7 +216,7 @@ func TestDuelingCandidates(t *testing.T) {
 
 	wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1}
 	tests := []struct {
-		sm      *stateMachine
+		sm      *raft
 		state   stateType
 		term    int64
 		raftLog *raftLog
@@ -229,7 +234,7 @@ func TestDuelingCandidates(t *testing.T) {
 			t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
 		}
 		base := ltoa(tt.raftLog)
-		if sm, ok := nt.peers[int64(i)].(*stateMachine); ok {
+		if sm, ok := nt.peers[int64(i)].(*raft); ok {
 			l := ltoa(sm.raftLog)
 			if g := diffu(base, l); g != "" {
 				t.Errorf("#%d: diff:\n%s", i, g)
@@ -254,7 +259,7 @@ func TestCandidateConcede(t *testing.T) {
 	// send a proposal to 2 to flush out a msgApp to 0
 	tt.send(Message{From: 2, To: 2, Type: msgProp, Entries: []Entry{{Data: data}}})
 
-	a := tt.peers[0].(*stateMachine)
+	a := tt.peers[0].(*raft)
 	if g := a.state; g != stateFollower {
 		t.Errorf("state = %s, want %s", g, stateFollower)
 	}
@@ -263,7 +268,7 @@ func TestCandidateConcede(t *testing.T) {
 	}
 	wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
 	for i, p := range tt.peers {
-		if sm, ok := p.(*stateMachine); ok {
+		if sm, ok := p.(*raft); ok {
 			l := ltoa(sm.raftLog)
 			if g := diffu(wantLog, l); g != "" {
 				t.Errorf("#%d: diff:\n%s", i, g)
@@ -278,7 +283,7 @@ func TestSingleNodeCandidate(t *testing.T) {
 	tt := newNetwork(nil)
 	tt.send(Message{From: 0, To: 0, Type: msgHup})
 
-	sm := tt.peers[0].(*stateMachine)
+	sm := tt.peers[0].(*raft)
 	if sm.state != stateLeader {
 		t.Errorf("state = %d, want %d", sm.state, stateLeader)
 	}
@@ -302,7 +307,7 @@ func TestOldMessages(t *testing.T) {
 	}
 	base := ltoa(l)
 	for i, p := range tt.peers {
-		if sm, ok := p.(*stateMachine); ok {
+		if sm, ok := p.(*raft); ok {
 			l := ltoa(sm.raftLog)
 			if g := diffu(base, l); g != "" {
 				t.Errorf("#%d: diff:\n%s", i, g)
@@ -354,7 +359,7 @@ func TestProposal(t *testing.T) {
 		}
 		base := ltoa(wantLog)
 		for i, p := range tt.peers {
-			if sm, ok := p.(*stateMachine); ok {
+			if sm, ok := p.(*raft); ok {
 				l := ltoa(sm.raftLog)
 				if g := diffu(base, l); g != "" {
 					t.Errorf("#%d: diff:\n%s", i, g)
@@ -363,7 +368,7 @@ func TestProposal(t *testing.T) {
 				t.Logf("#%d: empty log", i)
 			}
 		}
-		sm := tt.network.peers[0].(*stateMachine)
+		sm := tt.network.peers[0].(*raft)
 		if g := sm.term.Get(); g != 1 {
 			t.Errorf("#%d: term = %d, want %d", i, g, 1)
 		}
@@ -387,7 +392,7 @@ func TestProposalByProxy(t *testing.T) {
 		wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
 		base := ltoa(wantLog)
 		for i, p := range tt.peers {
-			if sm, ok := p.(*stateMachine); ok {
+			if sm, ok := p.(*raft); ok {
 				l := ltoa(sm.raftLog)
 				if g := diffu(base, l); g != "" {
 					t.Errorf("#%d: diff:\n%s", i, g)
@@ -396,7 +401,7 @@ func TestProposalByProxy(t *testing.T) {
 				t.Logf("#%d: empty log", i)
 			}
 		}
-		sm := tt.peers[0].(*stateMachine)
+		sm := tt.peers[0].(*raft)
 		if g := sm.term.Get(); g != 1 {
 			t.Errorf("#%d: term = %d, want %d", i, g, 1)
 		}
@@ -436,7 +441,7 @@ func TestCommit(t *testing.T) {
 		for j := 0; j < len(tt.matches); j++ {
 			ins[int64(j)] = &index{tt.matches[j], tt.matches[j] + 1}
 		}
-		sm := &stateMachine{raftLog: &raftLog{ents: tt.logs}, ins: ins, term: atomicInt(tt.smTerm)}
+		sm := &raft{raftLog: &raftLog{ents: tt.logs}, ins: ins, term: atomicInt(tt.smTerm)}
 		sm.maybeCommit()
 		if g := sm.raftLog.committed; g != tt.w {
 			t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
@@ -473,7 +478,7 @@ func TestHandleMsgApp(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := &stateMachine{
+		sm := &raft{
 			state:   stateFollower,
 			term:    2,
 			raftLog: &raftLog{committed: 0, ents: []Entry{{}, {Term: 1}, {Term: 2}}},
@@ -486,7 +491,7 @@ func TestHandleMsgApp(t *testing.T) {
 		if sm.raftLog.committed != tt.wCommit {
 			t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
 		}
-		m := sm.Msgs()
+		m := sm.ReadMessages()
 		if len(m) != 1 {
 			t.Errorf("#%d: msg = nil, want 1")
 		}
@@ -535,7 +540,7 @@ func TestRecvMsgVote(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := &stateMachine{
+		sm := &raft{
 			state:   tt.state,
 			vote:    tt.voteFor,
 			raftLog: &raftLog{ents: []Entry{{}, {Term: 2}, {Term: 2}}},
@@ -543,7 +548,7 @@ func TestRecvMsgVote(t *testing.T) {
 
 		sm.Step(Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term})
 
-		msgs := sm.Msgs()
+		msgs := sm.ReadMessages()
 		if g := len(msgs); g != 1 {
 			t.Errorf("#%d: len(msgs) = %d, want 1", i, g)
 			continue
@@ -724,9 +729,9 @@ func TestLeaderAppResp(t *testing.T) {
 		sm.raftLog = &raftLog{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
 		sm.becomeCandidate()
 		sm.becomeLeader()
-		sm.Msgs()
+		sm.ReadMessages()
 		sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term.Get()})
-		msgs := sm.Msgs()
+		msgs := sm.ReadMessages()
 
 		if len(msgs) != tt.wmsgNum {
 			t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
@@ -761,7 +766,7 @@ func TestRecvMsgBeat(t *testing.T) {
 		sm.state = tt.state
 		sm.Step(Message{From: 0, To: 0, Type: msgBeat})
 
-		msgs := sm.Msgs()
+		msgs := sm.ReadMessages()
 		if len(msgs) != tt.wMsg {
 			t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
 		}
@@ -826,7 +831,7 @@ func TestProvideSnap(t *testing.T) {
 	sm.becomeLeader()
 
 	sm.Step(Message{From: 0, To: 0, Type: msgBeat})
-	msgs := sm.Msgs()
+	msgs := sm.ReadMessages()
 	if len(msgs) != 1 {
 		t.Errorf("len(msgs) = %d, want 1", len(msgs))
 	}
@@ -840,7 +845,7 @@ func TestProvideSnap(t *testing.T) {
 	sm.ins[1].next = sm.raftLog.offset
 
 	sm.Step(Message{From: 1, To: 0, Type: msgAppResp, Index: -1})
-	msgs = sm.Msgs()
+	msgs = sm.ReadMessages()
 	if len(msgs) != 1 {
 		t.Errorf("len(msgs) = %d, want 1", len(msgs))
 	}
@@ -874,14 +879,14 @@ func TestSlowNodeRestore(t *testing.T) {
 	for j := 0; j < defaultCompactThreshold+1; j++ {
 		nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}})
 	}
-	lead := nt.peers[0].(*stateMachine)
+	lead := nt.peers[0].(*raft)
 	lead.nextEnts()
 	lead.compact(nil)
 
 	nt.recover()
 	nt.send(Message{From: 0, To: 0, Type: msgBeat})
 
-	follower := nt.peers[2].(*stateMachine)
+	follower := nt.peers[2].(*raft)
 	if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
 		t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
 	}
@@ -928,13 +933,13 @@ func TestUnstableState(t *testing.T) {
 	sm.clearState()
 }
 
-func ents(terms ...int64) *stateMachine {
+func ents(terms ...int64) *raft {
 	ents := []Entry{{}}
 	for _, term := range terms {
 		ents = append(ents, Entry{Term: term})
 	}
 
-	sm := &stateMachine{raftLog: &raftLog{ents: ents}}
+	sm := &raft{raftLog: &raftLog{ents: ents}}
 	sm.reset(0)
 	return sm
 }
@@ -964,7 +969,7 @@ func newNetwork(peers ...Interface) *network {
 		case nil:
 			sm := newStateMachine(nid, defaultPeerAddrs)
 			npeers[nid] = sm
-		case *stateMachine:
+		case *raft:
 			v.id = nid
 			v.ins = make(map[int64]*index)
 			for i := 0; i < size; i++ {
@@ -972,8 +977,6 @@ func newNetwork(peers ...Interface) *network {
 			}
 			v.reset(0)
 			npeers[nid] = v
-		case *Node:
-			npeers[v.sm.id] = v
 		default:
 			npeers[nid] = v
 		}
@@ -990,7 +993,7 @@ func (nw *network) send(msgs ...Message) {
 		m := msgs[0]
 		p := nw.peers[m.To]
 		p.Step(m)
-		msgs = append(msgs[1:], nw.filter(p.Msgs())...)
+		msgs = append(msgs[1:], nw.filter(p.ReadMessages())...)
 	}
 }
 
@@ -1049,7 +1052,7 @@ type connem struct {
 
 type blackHole struct{}
 
-func (blackHole) Step(Message) bool { return true }
-func (blackHole) Msgs() []Message   { return nil }
+func (blackHole) Step(Message) error      { return nil }
+func (blackHole) ReadMessages() []Message { return nil }
 
 var nopStepper = &blackHole{}

+ 0 - 0
raft/snapshot.go → raft2/snapshot.go


+ 0 - 0
raft/state.pb.go → raft2/state.pb.go


+ 0 - 0
raft/state.proto → raft2/state.proto