Browse Source

raft: break Step into pieces

Blake Mizerany 11 years ago
parent
commit
6044b1a0d7
1 changed files with 97 additions and 88 deletions
  1. 97 88
      raft/raft.go

+ 97 - 88
raft/raft.go

@@ -49,6 +49,12 @@ var stmap = [...]string{
 	stateLeader:    "stateLeader",
 }
 
+var stepmap = [...]stepFunc{
+	stateFollower:  stepFollower,
+	stateCandidate: stepCandidate,
+	stateLeader:    stepLeader,
+}
+
 func (st stateType) String() string {
 	return stmap[int(st)]
 }
@@ -241,8 +247,7 @@ func (sm *stateMachine) Msgs() []Message {
 }
 
 func (sm *stateMachine) Step(m Message) (ok bool) {
-	switch m.Type {
-	case msgHup:
+	if m.Type == msgHup {
 		sm.becomeCandidate()
 		if sm.q() == sm.poll(sm.id, true) {
 			sm.becomeLeader()
@@ -256,43 +261,11 @@ func (sm *stateMachine) Step(m Message) (ok bool) {
 			sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)})
 		}
 		return true
-	case msgBeat:
-		if sm.state != stateLeader {
-			return true
-		}
-		sm.bcastAppend()
-		return
-	case msgProp:
-		if len(m.Entries) != 1 {
-			panic("unexpected length(entries) of a msgProp")
-		}
-
-		switch sm.lead {
-		case sm.id:
-			e := m.Entries[0]
-			if e.isConfig() {
-				if sm.pendingConf {
-					return false
-				}
-				sm.pendingConf = true
-			}
-			e.Term = sm.term
-
-			sm.log.append(sm.log.lastIndex(), e)
-			sm.ins[sm.id].update(sm.log.lastIndex())
-			sm.maybeCommit()
-			sm.bcastAppend()
-		case none:
-			// msgProp given without leader
-			return false
-		default:
-			m.To = sm.lead
-			sm.send(m)
-		}
-		return true
 	}
 
 	switch {
+	case m.Term == 0:
+		// local message
 	case m.Term > sm.term:
 		sm.becomeFollower(m.Term, m.From)
 	case m.Term < sm.term:
@@ -300,69 +273,105 @@ func (sm *stateMachine) Step(m Message) (ok bool) {
 		return true
 	}
 
-	handleAppendEntries := func() {
-		if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
-			sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
-		} else {
-			sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
-		}
+	return stepmap[sm.state](sm, m)
+}
+
+func (sm *stateMachine) handleAppendEntries(m Message) {
+	if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
+		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
+	} else {
+		sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
 	}
+}
 
-	switch sm.state {
-	case stateLeader:
-		switch m.Type {
-		case msgAppResp:
-			if m.Index < 0 {
-				sm.ins[m.From].decr()
-				sm.sendAppend(m.From)
-			} else {
-				sm.ins[m.From].update(m.Index)
-				if sm.maybeCommit() {
-					sm.bcastAppend()
-				}
-			}
-		case msgVote:
-			sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
+func (sm *stateMachine) addNode(id int) {
+	sm.ins[id] = &index{next: sm.log.lastIndex() + 1}
+	sm.pendingConf = false
+}
+
+func (sm *stateMachine) removeNode(id int) {
+	delete(sm.ins, id)
+	sm.pendingConf = false
+}
+
+type stepFunc func(sm *stateMachine, m Message) bool
+
+func stepLeader(sm *stateMachine, m Message) bool {
+	switch m.Type {
+	case msgBeat:
+		sm.bcastAppend()
+	case msgProp:
+		if len(m.Entries) != 1 {
+			panic("unexpected length(entries) of a msgProp")
 		}
-	case stateCandidate:
-		switch m.Type {
-		case msgApp:
-			sm.becomeFollower(sm.term, m.From)
-			handleAppendEntries()
-		case msgVote:
-			sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
-		case msgVoteResp:
-			gr := sm.poll(m.From, m.Index >= 0)
-			switch sm.q() {
-			case gr:
-				sm.becomeLeader()
-				sm.bcastAppend()
-			case len(sm.votes) - gr:
-				sm.becomeFollower(sm.term, none)
+		e := m.Entries[0]
+		if e.isConfig() {
+			if sm.pendingConf {
+				return false
 			}
+			sm.pendingConf = true
 		}
-	case stateFollower:
-		switch m.Type {
-		case msgApp:
-			handleAppendEntries()
-		case msgVote:
-			if (sm.vote == none || sm.vote == m.From) && sm.log.isUpToDate(m.Index, m.LogTerm) {
-				sm.vote = m.From
-				sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()})
-			} else {
-				sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
+		e.Term = sm.term
+
+		sm.log.append(sm.log.lastIndex(), e)
+		sm.ins[sm.id].update(sm.log.lastIndex())
+		sm.maybeCommit()
+		sm.bcastAppend()
+	case msgAppResp:
+		if m.Index < 0 {
+			sm.ins[m.From].decr()
+			sm.sendAppend(m.From)
+		} else {
+			sm.ins[m.From].update(m.Index)
+			if sm.maybeCommit() {
+				sm.bcastAppend()
 			}
 		}
+	case msgVote:
+		sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
 	}
 	return true
 }
 
-func (sm *stateMachine) addNode(id int) {
-	sm.ins[id] = &index{next: sm.log.lastIndex() + 1}
-	sm.pendingConf = false
+func stepCandidate(sm *stateMachine, m Message) bool {
+	switch m.Type {
+	case msgProp:
+		return false
+	case msgApp:
+		sm.becomeFollower(sm.term, m.From)
+		sm.handleAppendEntries(m)
+	case msgVote:
+		sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
+	case msgVoteResp:
+		gr := sm.poll(m.From, m.Index >= 0)
+		switch sm.q() {
+		case gr:
+			sm.becomeLeader()
+			sm.bcastAppend()
+		case len(sm.votes) - gr:
+			sm.becomeFollower(sm.term, none)
+		}
+	}
+	return true
 }
 
-func (sm *stateMachine) removeNode(id int) {
-	delete(sm.ins, id)
-	sm.pendingConf = false
+func stepFollower(sm *stateMachine, m Message) bool {
+	switch m.Type {
+	case msgProp:
+		if sm.lead == none {
+			return false
+		}
+		m.To = sm.lead
+		sm.send(m)
+	case msgApp:
+		sm.handleAppendEntries(m)
+	case msgVote:
+		if (sm.vote == none || sm.vote == m.From) && sm.log.isUpToDate(m.Index, m.LogTerm) {
+			sm.vote = m.From
+			sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()})
+		} else {
+			sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
+		}
+	}
+	return true
 }