|
@@ -3,6 +3,7 @@ package raft
|
|
|
import (
|
|
import (
|
|
|
"errors"
|
|
"errors"
|
|
|
"sort"
|
|
"sort"
|
|
|
|
|
+ "sync/atomic"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const none = -1
|
|
const none = -1
|
|
@@ -89,6 +90,19 @@ func (in *index) decr() {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// An AtomicInt is an int64 to be accessed atomically.
|
|
|
|
|
+type atomicInt int64
|
|
|
|
|
+
|
|
|
|
|
+// Add atomically adds n to i.
|
|
|
|
|
+func (i *atomicInt) Set(n int64) {
|
|
|
|
|
+ atomic.StoreInt64((*int64)(i), n)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Get atomically gets the value of i.
|
|
|
|
|
+func (i *atomicInt) Get() int64 {
|
|
|
|
|
+ return atomic.LoadInt64((*int64)(i))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
type stateMachine struct {
|
|
type stateMachine struct {
|
|
|
id int64
|
|
id int64
|
|
|
|
|
|
|
@@ -110,7 +124,7 @@ type stateMachine struct {
|
|
|
msgs []Message
|
|
msgs []Message
|
|
|
|
|
|
|
|
// the leader id
|
|
// the leader id
|
|
|
- lead int64
|
|
|
|
|
|
|
+ lead atomicInt
|
|
|
|
|
|
|
|
// pending reconfiguration
|
|
// pending reconfiguration
|
|
|
pendingConf bool
|
|
pendingConf bool
|
|
@@ -197,7 +211,7 @@ func (sm *stateMachine) nextEnts() (ents []Entry) {
|
|
|
|
|
|
|
|
func (sm *stateMachine) reset(term int) {
|
|
func (sm *stateMachine) reset(term int) {
|
|
|
sm.term = term
|
|
sm.term = term
|
|
|
- sm.lead = none
|
|
|
|
|
|
|
+ sm.lead.Set(none)
|
|
|
sm.vote = none
|
|
sm.vote = none
|
|
|
sm.votes = make(map[int64]bool)
|
|
sm.votes = make(map[int64]bool)
|
|
|
for i := range sm.ins {
|
|
for i := range sm.ins {
|
|
@@ -228,7 +242,7 @@ func (sm *stateMachine) promotable() bool {
|
|
|
|
|
|
|
|
func (sm *stateMachine) becomeFollower(term int, lead int64) {
|
|
func (sm *stateMachine) becomeFollower(term int, lead int64) {
|
|
|
sm.reset(term)
|
|
sm.reset(term)
|
|
|
- sm.lead = lead
|
|
|
|
|
|
|
+ sm.lead.Set(lead)
|
|
|
sm.state = stateFollower
|
|
sm.state = stateFollower
|
|
|
sm.pendingConf = false
|
|
sm.pendingConf = false
|
|
|
}
|
|
}
|
|
@@ -249,7 +263,7 @@ func (sm *stateMachine) becomeLeader() {
|
|
|
panic("invalid transition [follower -> leader]")
|
|
panic("invalid transition [follower -> leader]")
|
|
|
}
|
|
}
|
|
|
sm.reset(sm.term)
|
|
sm.reset(sm.term)
|
|
|
- sm.lead = sm.id
|
|
|
|
|
|
|
+ sm.lead.Set(sm.id)
|
|
|
sm.state = stateLeader
|
|
sm.state = stateLeader
|
|
|
|
|
|
|
|
for _, e := range sm.log.entries(sm.log.committed + 1) {
|
|
for _, e := range sm.log.entries(sm.log.committed + 1) {
|
|
@@ -384,10 +398,10 @@ func stepCandidate(sm *stateMachine, m Message) bool {
|
|
|
func stepFollower(sm *stateMachine, m Message) bool {
|
|
func stepFollower(sm *stateMachine, m Message) bool {
|
|
|
switch m.Type {
|
|
switch m.Type {
|
|
|
case msgProp:
|
|
case msgProp:
|
|
|
- if sm.lead == none {
|
|
|
|
|
|
|
+ if sm.lead.Get() == none {
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
- m.To = sm.lead
|
|
|
|
|
|
|
+ m.To = sm.lead.Get()
|
|
|
sm.send(m)
|
|
sm.send(m)
|
|
|
case msgApp:
|
|
case msgApp:
|
|
|
sm.handleAppendEntries(m)
|
|
sm.handleAppendEntries(m)
|