Browse Source

Merge pull request #1823 from xiang90/raft_log

raft: logging state change events and events on bad path
Xiang Li 11 years ago
parent
commit
6692a8060e
4 changed files with 48 additions and 4 deletions
  1. 7 3
      raft/log.go
  2. 7 1
      raft/log_unstable.go
  3. 22 0
      raft/raft.go
  4. 12 0
      raft/raft_paper_test.go

+ 7 - 3
raft/log.go

@@ -111,9 +111,13 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
 // The index of the given entries MUST be continuously increasing.
 func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 {
 	// TODO(xiangli): validate the index of ents
-	for i, ne := range ents {
-		if !l.matchTerm(from+uint64(i), ne.Term) {
-			return from + uint64(i)
+	for offset, ne := range ents {
+		if i := from + uint64(offset); !l.matchTerm(ne.Index, ne.Term) {
+			if i <= l.lastIndex() {
+				log.Printf("raftlog: found conflict at index %d [existing term: %d, conflicting term: %d]",
+					i, l.term(i), ne.Term)
+			}
+			return i
 		}
 	}
 	return 0

+ 7 - 1
raft/log_unstable.go

@@ -16,7 +16,11 @@
 
 package raft
 
-import pb "github.com/coreos/etcd/raft/raftpb"
+import (
+	"log"
+
+	pb "github.com/coreos/etcd/raft/raftpb"
+)
 
 // unstable.entris[i] has raft log position i+unstable.offset.
 // Note that unstable.offset may be less than the highest log
@@ -102,6 +106,7 @@ func (u *unstable) restore(s pb.Snapshot) {
 func (u *unstable) truncateAndAppend(after uint64, ents []pb.Entry) {
 	switch {
 	case after < u.offset:
+		log.Printf("raftlog: replace the unstable entries from index %d", after+1)
 		// The log is being truncated to before our current offset
 		// portion, so set the offset and replace the entries
 		u.offset = after + 1
@@ -113,6 +118,7 @@ func (u *unstable) truncateAndAppend(after uint64, ents []pb.Entry) {
 	default:
 		// truncate to after and copy to u.entries
 		// then append
+		log.Printf("raftlog: truncate the unstable entries to index %d", after)
 		u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
 		u.entries = append(u.entries, ents...)
 	}

+ 22 - 0
raft/raft.go

@@ -19,6 +19,7 @@ package raft
 import (
 	"errors"
 	"fmt"
+	"log"
 	"math/rand"
 	"sort"
 
@@ -189,6 +190,11 @@ func (r *raft) String() string {
 }
 
 func (r *raft) poll(id uint64, v bool) (granted int) {
+	if v {
+		log.Printf("raft: %x received vote from %x at term %x", r.id, id, r.Term)
+	} else {
+		log.Printf("raft: %x received vote rejection from %x at term %x", r.id, id, r.Term)
+	}
 	if _, ok := r.votes[id]; !ok {
 		r.votes[id] = v
 	}
@@ -347,6 +353,7 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
 	r.tick = r.tickElection
 	r.lead = lead
 	r.state = StateFollower
+	log.Printf("raft: %x became follower at term %d", r.id, r.Term)
 }
 
 func (r *raft) becomeCandidate() {
@@ -359,6 +366,7 @@ func (r *raft) becomeCandidate() {
 	r.tick = r.tickElection
 	r.Vote = r.id
 	r.state = StateCandidate
+	log.Printf("raft: %x became candidate at term %d", r.id, r.Term)
 }
 
 func (r *raft) becomeLeader() {
@@ -381,6 +389,7 @@ func (r *raft) becomeLeader() {
 		r.pendingConf = true
 	}
 	r.appendEntry(pb.Entry{Data: nil})
+	log.Printf("raft: %x became leader at term %d", r.id, r.Term)
 }
 
 func (r *raft) campaign() {
@@ -425,6 +434,8 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
 	} else {
+		log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
+			r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
 	}
 }
@@ -477,6 +488,8 @@ func stepLeader(r *raft, m pb.Message) {
 			return
 		}
 		if m.Reject {
+			log.Printf("raft: %x received msgApp rejection from %x for index %d",
+				r.id, m.From, m.Index)
 			if r.prs[m.From].maybeDecrTo(m.Index) {
 				r.sendAppend(m.From)
 			}
@@ -487,6 +500,8 @@ func stepLeader(r *raft, m pb.Message) {
 			}
 		}
 	case pb.MsgVote:
+		log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
+			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 	}
 }
@@ -502,9 +517,12 @@ func stepCandidate(r *raft, m pb.Message) {
 		r.becomeFollower(m.Term, m.From)
 		r.handleSnapshot(m)
 	case pb.MsgVote:
+		log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
+			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 	case pb.MsgVoteResp:
 		gr := r.poll(m.From, !m.Reject)
+		log.Printf("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
 		switch r.q() {
 		case gr:
 			r.becomeLeader()
@@ -537,9 +555,13 @@ func stepFollower(r *raft, m pb.Message) {
 	case pb.MsgVote:
 		if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
 			r.elapsed = 0
+			log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %x",
+				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 			r.Vote = m.From
 			r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
 		} else {
+			log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
+				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 			r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		}
 	}

+ 12 - 0
raft/raft_paper_test.go

@@ -29,6 +29,10 @@ package raft
 
 import (
 	"fmt"
+	"io/ioutil"
+	"log"
+	"os"
+
 	"reflect"
 	"sort"
 	"testing"
@@ -292,9 +296,13 @@ func TestCandidateFallback(t *testing.T) {
 }
 
 func TestFollowerElectionTimeoutRandomized(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+	defer log.SetOutput(os.Stderr)
 	testNonleaderElectionTimeoutRandomized(t, StateFollower)
 }
 func TestCandidateElectionTimeoutRandomized(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+	defer log.SetOutput(os.Stderr)
 	testNonleaderElectionTimeoutRandomized(t, StateCandidate)
 }
 
@@ -329,9 +337,13 @@ func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
 }
 
 func TestFollowersElectioinTimeoutNonconflict(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+	defer log.SetOutput(os.Stderr)
 	testNonleadersElectionTimeoutNonconflict(t, StateFollower)
 }
 func TestCandidatesElectionTimeoutNonconflict(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+	defer log.SetOutput(os.Stderr)
 	testNonleadersElectionTimeoutNonconflict(t, StateCandidate)
 }