|
@@ -2,6 +2,8 @@ package raft
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"errors"
|
|
"errors"
|
|
|
|
|
+ "fmt"
|
|
|
|
|
+ golog "log"
|
|
|
"sort"
|
|
"sort"
|
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
|
)
|
|
)
|
|
@@ -78,6 +80,11 @@ type Message struct {
|
|
|
Snapshot Snapshot
|
|
Snapshot Snapshot
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (m Message) String() string {
|
|
|
|
|
+ return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d",
|
|
|
|
|
+ m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
type index struct {
|
|
type index struct {
|
|
|
match, next int64
|
|
match, next int64
|
|
|
}
|
|
}
|
|
@@ -93,6 +100,10 @@ func (in *index) decr() {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (in *index) String() string {
|
|
|
|
|
+ return fmt.Sprintf("n=%d m=%d", in.next, in.match)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// An AtomicInt is an int64 to be accessed atomically.
|
|
// An AtomicInt is an int64 to be accessed atomically.
|
|
|
type atomicInt int64
|
|
type atomicInt int64
|
|
|
|
|
|
|
@@ -154,6 +165,19 @@ func newStateMachine(id int64, peers []int64) *stateMachine {
|
|
|
return sm
|
|
return sm
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (sm *stateMachine) String() string {
|
|
|
|
|
+ s := fmt.Sprintf(`state=%v term=%d`, sm.state, sm.term)
|
|
|
|
|
+ switch sm.state {
|
|
|
|
|
+ case stateFollower:
|
|
|
|
|
+ s += fmt.Sprintf(" vote=%v lead=%v", sm.vote, sm.lead)
|
|
|
|
|
+ case stateCandidate:
|
|
|
|
|
+ s += fmt.Sprintf(` votes="%v"`, sm.votes)
|
|
|
|
|
+ case stateLeader:
|
|
|
|
|
+ s += fmt.Sprintf(` ins="%v"`, sm.ins)
|
|
|
|
|
+ }
|
|
|
|
|
+ return s
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (sm *stateMachine) setSnapshoter(snapshoter Snapshoter) {
|
|
func (sm *stateMachine) setSnapshoter(snapshoter Snapshoter) {
|
|
|
sm.snapshoter = snapshoter
|
|
sm.snapshoter = snapshoter
|
|
|
}
|
|
}
|
|
@@ -175,6 +199,7 @@ func (sm *stateMachine) send(m Message) {
|
|
|
m.ClusterId = sm.clusterId
|
|
m.ClusterId = sm.clusterId
|
|
|
m.From = sm.id
|
|
m.From = sm.id
|
|
|
m.Term = sm.term.Get()
|
|
m.Term = sm.term.Get()
|
|
|
|
|
+ golog.Printf("raft.send msg %v\n", m)
|
|
|
sm.msgs = append(sm.msgs, m)
|
|
sm.msgs = append(sm.msgs, m)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -321,6 +346,11 @@ func (sm *stateMachine) Msgs() []Message {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (sm *stateMachine) Step(m Message) (ok bool) {
|
|
func (sm *stateMachine) Step(m Message) (ok bool) {
|
|
|
|
|
+ golog.Printf("raft.step beforeState %v\n", sm)
|
|
|
|
|
+ golog.Printf("raft.step beforeLog %v\n", sm.log)
|
|
|
|
|
+ defer golog.Printf("raft.step afterLog %v\n", sm.log)
|
|
|
|
|
+ defer golog.Printf("raft.step afterState %v\n", sm)
|
|
|
|
|
+ golog.Printf("raft.step msg %v\n", m)
|
|
|
if m.Type == msgHup {
|
|
if m.Type == msgHup {
|
|
|
sm.becomeCandidate()
|
|
sm.becomeCandidate()
|
|
|
if sm.q() == sm.poll(sm.id, true) {
|
|
if sm.q() == sm.poll(sm.id, true) {
|