Browse Source

Merge pull request #2979 from xiang90/fix_sendapp

raft: fix panic in send app
Xiang Li 10 years ago
parent
commit
e01d53b853
4 changed files with 148 additions and 55 deletions
  1. + 70 - 24
      raft/log.go
  2. + 57 - 19
      raft/log_test.go
  3. + 14 - 9
      raft/raft.go
  4. + 7 - 3
      raft/raft_test.go

+ 70 - 24
raft/log.go

@@ -114,7 +114,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
 		if !l.matchTerm(ne.Index, ne.Term) {
 			if ne.Index <= l.lastIndex() {
 				raftLogger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
-					ne.Index, l.term(ne.Index), ne.Term)
+					ne.Index, zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
 			}
 			return ne.Index
 		}
@@ -135,7 +135,11 @@ func (l *raftLog) unstableEntries() []pb.Entry {
 func (l *raftLog) nextEnts() (ents []pb.Entry) {
 	off := max(l.applied+1, l.firstIndex())
 	if l.committed+1 > off {
-		return l.slice(off, l.committed+1, noLimit)
+		ents, err := l.slice(off, l.committed+1, noLimit)
+		if err != nil {
+			raftLogger.Panicf("unexpected error when getting unapplied entries (%v)", err)
+		}
+		return ents
 	}
 	return nil
 }
@@ -193,38 +197,55 @@ func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
 
 func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
 
-func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) }
+func (l *raftLog) lastTerm() uint64 {
+	t, err := l.term(l.lastIndex())
+	if err != nil {
+		raftLogger.Panicf("unexpected error when getting the last term (%v)", err)
+	}
+	return t
+}
 
-func (l *raftLog) term(i uint64) uint64 {
+func (l *raftLog) term(i uint64) (uint64, error) {
 	// the valid term range is [index of dummy entry, last index]
 	dummyIndex := l.firstIndex() - 1
 	if i < dummyIndex || i > l.lastIndex() {
-		return 0
+		// TODO: return an error instead?
+		return 0, nil
 	}
 
 	if t, ok := l.unstable.maybeTerm(i); ok {
-		return t
+		return t, nil
 	}
 
 	t, err := l.storage.Term(i)
 	if err == nil {
-		return t
+		return t, nil
 	}
 	if err == ErrCompacted {
-		return 0
+		return 0, err
 	}
 	panic(err) // TODO(bdarnell)
 }
 
-func (l *raftLog) entries(i, maxsize uint64) []pb.Entry {
+func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
 	if i > l.lastIndex() {
-		return nil
+		return nil, nil
 	}
 	return l.slice(i, l.lastIndex()+1, maxsize)
 }
 
 // allEntries returns all entries in the log.
-func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex(), noLimit) }
+func (l *raftLog) allEntries() []pb.Entry {
+	ents, err := l.entries(l.firstIndex(), noLimit)
+	if err == nil {
+		return ents
+	}
+	if err == ErrCompacted { // try again if there was a racing compaction
+		return l.allEntries()
+	}
+	// TODO (xiangli): handle error?
+	panic(err)
+}
 
 // isUpToDate determines if the given (lastIndex,term) log is more up-to-date
 // by comparing the index and term of the last entries in the existing logs.
@@ -236,10 +257,16 @@ func (l *raftLog) isUpToDate(lasti, term uint64) bool {
 	return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
 }
 
-func (l *raftLog) matchTerm(i, term uint64) bool { return l.term(i) == term }
+func (l *raftLog) matchTerm(i, term uint64) bool {
+	t, err := l.term(i)
+	if err != nil {
+		return false
+	}
+	return t == term
+}
 
 func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
-	if maxIndex > l.committed && l.term(maxIndex) == term {
+	if maxIndex > l.committed && zeroTermOnErrCompacted(l.term(maxIndex)) == term {
 		l.commitTo(maxIndex)
 		return true
 	}
@@ -253,17 +280,19 @@ func (l *raftLog) restore(s pb.Snapshot) {
 }
 
 // slice returns a slice of log entries from lo through hi-1, inclusive.
-func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry {
-	l.mustCheckOutOfBounds(lo, hi)
+func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
+	err := l.mustCheckOutOfBounds(lo, hi)
+	if err != nil {
+		return nil, err
+	}
 	if lo == hi {
-		return nil
+		return nil, nil
 	}
 	var ents []pb.Entry
 	if lo < l.unstable.offset {
 		storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
 		if err == ErrCompacted {
-			// This should never fail because it has been checked before.
-			raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
+			return nil, err
 		} else if err == ErrUnavailable {
 			raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
 		} else if err != nil {
@@ -272,7 +301,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry {
 
 		// check if ents has reached the size limitation
 		if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
-			return storedEnts
+			return storedEnts, nil
 		}
 
 		ents = storedEnts
@@ -286,16 +315,33 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry {
 			ents = unstable
 		}
 	}
-	return limitSize(ents, maxSize)
+	return limitSize(ents, maxSize), nil
 }
 
 // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
-func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) {
+func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
 	if lo > hi {
 		raftLogger.Panicf("invalid slice %d > %d", lo, hi)
 	}
-	length := l.lastIndex() - l.firstIndex() + 1
-	if lo < l.firstIndex() || hi > l.firstIndex()+length {
-		raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex())
+	fi := l.firstIndex()
+	if lo < fi {
+		return ErrCompacted
+	}
+
+	length := l.lastIndex() - fi + 1
+	if lo < fi || hi > fi+length {
+		raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
 	}
+	return nil
+}
+
+func zeroTermOnErrCompacted(t uint64, err error) uint64 {
+	if err == nil {
+		return t
+	}
+	if err == ErrCompacted {
+		return 0
+	}
+	raftLogger.Panicf("unexpected error (%v)", err)
+	return 0
 }

+ 57 - 19
raft/log_test.go

@@ -132,11 +132,15 @@ func TestAppend(t *testing.T) {
 		if index != tt.windex {
 			t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex)
 		}
-		if g := raftLog.entries(1, noLimit); !reflect.DeepEqual(g, tt.wents) {
+		g, err := raftLog.entries(1, noLimit)
+		if err != nil {
+			t.Fatalf("#%d: unexpected error %v", i, err)
+		}
+		if !reflect.DeepEqual(g, tt.wents) {
 			t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents)
 		}
-		if g := raftLog.unstable.offset; g != tt.wunstable {
-			t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable)
+		if goff := raftLog.unstable.offset; goff != tt.wunstable {
+			t.Errorf("#%d: unstable = %d, want %d", i, goff, tt.wunstable)
 		}
 	}
 }
@@ -257,7 +261,10 @@ func TestLogMaybeAppend(t *testing.T) {
 				t.Errorf("#%d: committed = %d, want %d", i, gcommit, tt.wcommit)
 			}
 			if gappend && len(tt.ents) != 0 {
-				gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit)
+				gents, err := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit)
+				if err != nil {
+					t.Fatalf("unexpected error %v", err)
+				}
 				if !reflect.DeepEqual(tt.ents, gents) {
 					t.Errorf("%d: appended entries = %v, want %v", i, gents, tt.ents)
 				}
@@ -297,8 +304,8 @@ func TestCompactionSideEffects(t *testing.T) {
 	}
 
 	for j := offset; j <= raftLog.lastIndex(); j++ {
-		if raftLog.term(j) != j {
-			t.Errorf("term(%d) = %d, want %d", j, raftLog.term(j), j)
+		if mustTerm(raftLog.term(j)) != j {
+			t.Errorf("term(%d) = %d, want %d", j, mustTerm(raftLog.term(j)), j)
 		}
 	}
 
@@ -322,7 +329,10 @@ func TestCompactionSideEffects(t *testing.T) {
 		t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1)
 	}
 
-	ents := raftLog.entries(raftLog.lastIndex(), noLimit)
+	ents, err := raftLog.entries(raftLog.lastIndex(), noLimit)
+	if err != nil {
+		t.Fatalf("unexpected error %v", err)
+	}
 	if len(ents) != 1 {
 		t.Errorf("len(entries) = %d, want = %d", len(ents), 1)
 	}
@@ -555,8 +565,8 @@ func TestLogRestore(t *testing.T) {
 	if raftLog.unstable.offset != index+1 {
 		t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1)
 	}
-	if raftLog.term(index) != term {
-		t.Errorf("term = %d, want %d", raftLog.term(index), term)
+	if mustTerm(raftLog.term(index)) != term {
+		t.Errorf("term = %d, want %d", mustTerm(raftLog.term(index)), term)
 	}
 }
 
@@ -572,40 +582,49 @@ func TestIsOutOfBounds(t *testing.T) {
 
 	first := offset + 1
 	tests := []struct {
-		lo, hi uint64
-		wpainc bool
+		lo, hi        uint64
+		wpanic        bool
+		wErrCompacted bool
 	}{
 		{
 			first - 2, first + 1,
+			false,
 			true,
 		},
 		{
 			first - 1, first + 1,
+			false,
 			true,
 		},
 		{
 			first, first,
 			false,
+			false,
 		},
 		{
 			first + num/2, first + num/2,
 			false,
+			false,
 		},
 		{
 			first + num - 1, first + num - 1,
 			false,
+			false,
 		},
 		{
 			first + num, first + num,
 			false,
+			false,
 		},
 		{
 			first + num, first + num + 1,
 			true,
+			false,
 		},
 		{
 			first + num + 1, first + num + 1,
 			true,
+			false,
 		},
 	}
 
@@ -613,15 +632,21 @@ func TestIsOutOfBounds(t *testing.T) {
 		func() {
 			defer func() {
 				if r := recover(); r != nil {
-					if !tt.wpainc {
+					if !tt.wpanic {
 						t.Errorf("%d: panic = %v, want %v: %v", i, true, false, r)
 					}
 				}
 			}()
-			l.mustCheckOutOfBounds(tt.lo, tt.hi)
-			if tt.wpainc {
+			err := l.mustCheckOutOfBounds(tt.lo, tt.hi)
+			if tt.wpanic {
 				t.Errorf("%d: panic = %v, want %v", i, false, true)
 			}
+			if tt.wErrCompacted && err != ErrCompacted {
+				t.Errorf("%d: err = %v, want %v", i, err, ErrCompacted)
+			}
+			if !tt.wErrCompacted && err != nil {
+				t.Errorf("%d: unexpected err %v", i, err)
+			}
 		}()
 	}
 }
@@ -650,7 +675,7 @@ func TestTerm(t *testing.T) {
 	}
 
 	for j, tt := range tests {
-		term := l.term(tt.index)
+		term := mustTerm(l.term(tt.index))
 		if !reflect.DeepEqual(term, tt.w) {
 			t.Errorf("#%d: at = %d, want %d", j, term, tt.w)
 		}
@@ -680,7 +705,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		term := l.term(tt.index)
+		term := mustTerm(l.term(tt.index))
 		if !reflect.DeepEqual(term, tt.w) {
 			t.Errorf("#%d: at = %d, want %d", i, term, tt.w)
 		}
@@ -714,8 +739,8 @@ func TestSlice(t *testing.T) {
 		wpanic bool
 	}{
 		// test no limit
-		{offset - 1, offset + 1, noLimit, nil, true},
-		{offset, offset + 1, noLimit, nil, true},
+		{offset - 1, offset + 1, noLimit, nil, false},
+		{offset, offset + 1, noLimit, nil, false},
 		{half - 1, half + 1, noLimit, []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}}, false},
 		{half, half + 1, noLimit, []pb.Entry{{Index: half, Term: half}}, false},
 		{last - 1, last, noLimit, []pb.Entry{{Index: last - 1, Term: last - 1}}, false},
@@ -739,10 +764,23 @@ func TestSlice(t *testing.T) {
 					}
 				}
 			}()
-			g := l.slice(tt.from, tt.to, tt.limit)
+			g, err := l.slice(tt.from, tt.to, tt.limit)
+			if tt.from <= offset && err != ErrCompacted {
+				t.Fatalf("#%d: err = %v, want %v", j, err, ErrCompacted)
+			}
+			if tt.from > offset && err != nil {
+				t.Fatalf("#%d: unexpected error %v", j, err)
+			}
 			if !reflect.DeepEqual(g, tt.w) {
 				t.Errorf("#%d: from %d to %d = %v, want %v", j, tt.from, tt.to, g, tt.w)
 			}
 		}()
 	}
 }
+
+func mustTerm(term uint64, err error) uint64 {
+	if err != nil {
+		panic(err)
+	}
+	return term
+}

+ 14 - 9
raft/raft.go

@@ -243,7 +243,11 @@ func (r *raft) sendAppend(to uint64) {
 	}
 	m := pb.Message{}
 	m.To = to
-	if r.needSnapshot(pr.Next) {
+
+	term, errt := r.raftLog.term(pr.Next - 1)
+	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
+
+	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
 		m.Type = pb.MsgSnap
 		snapshot, err := r.raftLog.snapshot()
 		if err != nil {
@@ -261,8 +265,8 @@ func (r *raft) sendAppend(to uint64) {
 	} else {
 		m.Type = pb.MsgApp
 		m.Index = pr.Next - 1
-		m.LogTerm = r.raftLog.term(pr.Next - 1)
-		m.Entries = r.raftLog.entries(pr.Next, r.maxMsgSize)
+		m.LogTerm = term
+		m.Entries = ents
 		m.Commit = r.raftLog.committed
 		if n := len(m.Entries); n != 0 {
 			switch pr.State {
@@ -413,7 +417,12 @@ func (r *raft) becomeLeader() {
 	r.tick = r.tickHeartbeat
 	r.lead = r.id
 	r.state = StateLeader
-	for _, e := range r.raftLog.entries(r.raftLog.committed+1, noLimit) {
+	ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
+	if err != nil {
+		raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err)
+	}
+
+	for _, e := range ents {
 		if e.Type != pb.EntryConfChange {
 			continue
 		}
@@ -658,7 +667,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
 	} else {
 		raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
-			r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
+			r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
 	}
 }
@@ -712,10 +721,6 @@ func (r *raft) restore(s pb.Snapshot) bool {
 	return true
 }
 
-func (r *raft) needSnapshot(i uint64) bool {
-	return i < r.raftLog.firstIndex()
-}
-
 // promotable indicates whether state machine can be promoted to leader,
 // which is true when its own id is in progress list.
 func (r *raft) promotable() bool {

+ 7 - 3
raft/raft_test.go

@@ -1480,8 +1480,8 @@ func TestRestore(t *testing.T) {
 	if sm.raftLog.lastIndex() != s.Metadata.Index {
 		t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
 	}
-	if sm.raftLog.term(s.Metadata.Index) != s.Metadata.Term {
-		t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Metadata.Index), s.Metadata.Term)
+	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
+		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
 	}
 	sg := sm.nodes()
 	if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
@@ -1630,7 +1630,11 @@ func TestStepIgnoreConfig(t *testing.T) {
 	pendingConf := r.pendingConf
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
 	wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
-	if ents := r.raftLog.entries(index+1, noLimit); !reflect.DeepEqual(ents, wents) {
+	ents, err := r.raftLog.entries(index+1, noLimit)
+	if err != nil {
+		t.Fatalf("unexpected error %v", err)
+	}
+	if !reflect.DeepEqual(ents, wents) {
 		t.Errorf("ents = %+v, want %+v", ents, wents)
 	}
 	if r.pendingConf != pendingConf {