Browse Source

raft: limit the size of msgApp

limit the max size of entries sent per message.
Lower the cost at probing state as we limit the size per message;
lower the penalty when aggressively decrease to a too low next.
Xiang Li 10 years ago
parent
commit
7571b2cde2
9 changed files with 126 additions and 38 deletions
  1. 13 7
      raft/log.go
  2. 30 12
      raft/log_test.go
  3. 2 2
      raft/multinode_test.go
  4. 13 6
      raft/raft.go
  5. 1 1
      raft/raft_test.go
  6. 7 3
      raft/storage.go
  7. 18 7
      raft/storage_test.go
  8. 15 0
      raft/util.go
  9. 27 0
      raft/util_test.go

+ 13 - 7
raft/log.go

@@ -136,7 +136,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
 func (l *raftLog) nextEnts() (ents []pb.Entry) {
 	off := max(l.applied+1, l.firstIndex())
 	if l.committed+1 > off {
-		return l.slice(off, l.committed+1)
+		return l.slice(off, l.committed+1, noLimit)
 	}
 	return nil
 }
@@ -217,15 +217,15 @@ func (l *raftLog) term(i uint64) uint64 {
 	panic(err) // TODO(bdarnell)
 }
 
-func (l *raftLog) entries(i uint64) []pb.Entry {
+func (l *raftLog) entries(i, maxsize uint64) []pb.Entry {
 	if i > l.lastIndex() {
 		return nil
 	}
-	return l.slice(i, l.lastIndex()+1)
+	return l.slice(i, l.lastIndex()+1, maxsize)
 }
 
 // allEntries returns all entries in the log.
-func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex()) }
+func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex(), noLimit) }
 
 // isUpToDate determines if the given (lastIndex,term) log is more up-to-date
 // by comparing the index and term of the last entries in the existing logs.
@@ -254,14 +254,14 @@ func (l *raftLog) restore(s pb.Snapshot) {
 }
 
 // slice returns a slice of log entries from lo through hi-1, inclusive.
-func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
+func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry {
 	l.mustCheckOutOfBounds(lo, hi)
 	if lo == hi {
 		return nil
 	}
 	var ents []pb.Entry
 	if lo < l.unstable.offset {
-		storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset))
+		storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
 		if err == ErrCompacted {
 			// This should never fail because it has been checked before.
 			raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
@@ -270,6 +270,12 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
 		} else if err != nil {
 			panic(err) // TODO(bdarnell)
 		}
+
+		// check if ents has reached the size limitation
+		if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
+			return storedEnts
+		}
+
 		ents = storedEnts
 	}
 	if hi > l.unstable.offset {
@@ -281,7 +287,7 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
 			ents = unstable
 		}
 	}
-	return ents
+	return limitSize(ents, maxSize)
 }
 
 // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)

+ 30 - 12
raft/log_test.go

@@ -132,7 +132,7 @@ func TestAppend(t *testing.T) {
 		if index != tt.windex {
 			t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex)
 		}
-		if g := raftLog.entries(1); !reflect.DeepEqual(g, tt.wents) {
+		if g := raftLog.entries(1, noLimit); !reflect.DeepEqual(g, tt.wents) {
 			t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents)
 		}
 		if g := raftLog.unstable.offset; g != tt.wunstable {
@@ -257,7 +257,7 @@ func TestLogMaybeAppend(t *testing.T) {
 				t.Errorf("#%d: committed = %d, want %d", i, gcommit, tt.wcommit)
 			}
 			if gappend && len(tt.ents) != 0 {
-				gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1)
+				gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit)
 				if !reflect.DeepEqual(tt.ents, gents) {
 					t.Errorf("%d: appended entries = %v, want %v", i, gents, tt.ents)
 				}
@@ -322,7 +322,7 @@ func TestCompactionSideEffects(t *testing.T) {
 		t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1)
 	}
 
-	ents := raftLog.entries(raftLog.lastIndex())
+	ents := raftLog.entries(raftLog.lastIndex(), noLimit)
 	if len(ents) != 1 {
 		t.Errorf("len(entries) = %d, want = %d", len(ents), 1)
 	}
@@ -691,25 +691,43 @@ func TestSlice(t *testing.T) {
 	var i uint64
 	offset := uint64(100)
 	num := uint64(100)
+	last := offset + num
+	half := offset + num/2
+	halfe := pb.Entry{Index: half, Term: half}
 
 	storage := NewMemoryStorage()
 	storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
+	for i = 1; i < num/2; i++ {
+		storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}})
+	}
 	l := newLog(storage)
-	for i = 1; i < num; i++ {
+	for i = num / 2; i < num; i++ {
 		l.append(pb.Entry{Index: offset + i, Term: offset + i})
 	}
 
 	tests := []struct {
-		from   uint64
-		to     uint64
+		from  uint64
+		to    uint64
+		limit uint64
+
 		w      []pb.Entry
 		wpanic bool
 	}{
-		{offset - 1, offset + 1, nil, true},
-		{offset, offset + 1, nil, true},
-		{offset + num/2, offset + num/2 + 1, []pb.Entry{{Index: offset + num/2, Term: offset + num/2}}, false},
-		{offset + num - 1, offset + num, []pb.Entry{{Index: offset + num - 1, Term: offset + num - 1}}, false},
-		{offset + num, offset + num + 1, nil, true},
+		// test no limit
+		{offset - 1, offset + 1, noLimit, nil, true},
+		{offset, offset + 1, noLimit, nil, true},
+		{half - 1, half + 1, noLimit, []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}}, false},
+		{half, half + 1, noLimit, []pb.Entry{{Index: half, Term: half}}, false},
+		{last - 1, last, noLimit, []pb.Entry{{Index: last - 1, Term: last - 1}}, false},
+		{last, last + 1, noLimit, nil, true},
+
+		// test limit
+		{half - 1, half + 1, 0, []pb.Entry{{Index: half - 1, Term: half - 1}}, false},
+		{half - 1, half + 1, uint64(halfe.Size() + 1), []pb.Entry{{Index: half - 1, Term: half - 1}}, false},
+		{half - 1, half + 1, uint64(halfe.Size() * 2), []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}}, false},
+		{half - 1, half + 2, uint64(halfe.Size() * 3), []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}, {Index: half + 1, Term: half + 1}}, false},
+		{half, half + 2, uint64(halfe.Size()), []pb.Entry{{Index: half, Term: half}}, false},
+		{half, half + 2, uint64(halfe.Size() * 2), []pb.Entry{{Index: half, Term: half}, {Index: half + 1, Term: half + 1}}, false},
 	}
 
 	for j, tt := range tests {
@@ -721,7 +739,7 @@ func TestSlice(t *testing.T) {
 					}
 				}
 			}()
-			g := l.slice(tt.from, tt.to)
+			g := l.slice(tt.from, tt.to, tt.limit)
 			if !reflect.DeepEqual(g, tt.w) {
 				t.Errorf("#%d: from %d to %d = %v, want %v", j, tt.from, tt.to, g, tt.w)
 			}

+ 2 - 2
raft/multinode_test.go

@@ -140,7 +140,7 @@ func TestMultiNodePropose(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	entries, err := s.Entries(lastIndex, lastIndex+1)
+	entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -191,7 +191,7 @@ func TestMultiNodeProposeConfig(t *testing.T) {
 	}
 	mn.Stop()
 
-	entries, err := s.Entries(lastIndex, lastIndex+1)
+	entries, err := s.Entries(lastIndex, lastIndex+1, noLimit)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 13 - 6
raft/raft.go

@@ -17,6 +17,7 @@ package raft
 import (
 	"errors"
 	"fmt"
+	"math"
 	"math/rand"
 	"sort"
 	"strings"
@@ -26,6 +27,7 @@ import (
 
 // None is a placeholder node ID used when there is no leader.
 const None uint64 = 0
+const noLimit = math.MaxUint64
 
 var errNoLeader = errors.New("no leader")
 
@@ -189,7 +191,8 @@ type raft struct {
 	// the log
 	raftLog *raftLog
 
-	prs map[uint64]*Progress
+	maxMsgSize uint64
+	prs        map[uint64]*Progress
 
 	state StateType
 
@@ -231,9 +234,13 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
 		peers = cs.Nodes
 	}
 	r := &raft{
-		id:               id,
-		lead:             None,
-		raftLog:          raftlog,
+		id:      id,
+		lead:    None,
+		raftLog: raftlog,
+		// 4MB for now and hard code it
+		// TODO(xiang): add a config arguement into newRaft after we add
+		// the max inflight message field.
+		maxMsgSize:       4 * 1024 * 1024,
 		prs:              make(map[uint64]*Progress),
 		electionTimeout:  election,
 		heartbeatTimeout: heartbeat,
@@ -314,7 +321,7 @@ func (r *raft) sendAppend(to uint64) {
 		m.Type = pb.MsgApp
 		m.Index = pr.Next - 1
 		m.LogTerm = r.raftLog.term(pr.Next - 1)
-		m.Entries = r.raftLog.entries(pr.Next)
+		m.Entries = r.raftLog.entries(pr.Next, r.maxMsgSize)
 		m.Commit = r.raftLog.committed
 		if n := len(m.Entries); n != 0 {
 			switch pr.State {
@@ -463,7 +470,7 @@ func (r *raft) becomeLeader() {
 	r.tick = r.tickHeartbeat
 	r.lead = r.id
 	r.state = StateLeader
-	for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
+	for _, e := range r.raftLog.entries(r.raftLog.committed+1, noLimit) {
 		if e.Type != pb.EntryConfChange {
 			continue
 		}

+ 1 - 1
raft/raft_test.go

@@ -1629,7 +1629,7 @@ func TestStepIgnoreConfig(t *testing.T) {
 	pendingConf := r.pendingConf
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
 	wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
-	if ents := r.raftLog.entries(index + 1); !reflect.DeepEqual(ents, wents) {
+	if ents := r.raftLog.entries(index+1, noLimit); !reflect.DeepEqual(ents, wents) {
 		t.Errorf("ents = %+v, want %+v", ents, wents)
 	}
 	if r.pendingConf != pendingConf {

+ 7 - 3
raft/storage.go

@@ -41,7 +41,9 @@ type Storage interface {
 	// InitialState returns the saved HardState and ConfState information.
 	InitialState() (pb.HardState, pb.ConfState, error)
 	// Entries returns a slice of log entries in the range [lo,hi).
-	Entries(lo, hi uint64) ([]pb.Entry, error)
+	// MaxSize limits the total size of the log entries returned, but
+	// Entries returns at least one entry if any.
+	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
 	// Term returns the term of entry i, which must be in the range
 	// [FirstIndex()-1, LastIndex()]. The term of the entry before
 	// FirstIndex is retained for matching purposes even though the
@@ -92,7 +94,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
 }
 
 // Entries implements the Storage interface.
-func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
+func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
 	ms.Lock()
 	defer ms.Unlock()
 	offset := ms.ents[0].Index
@@ -106,7 +108,9 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
 	if len(ms.ents) == 1 {
 		return nil, ErrUnavailable
 	}
-	return ms.ents[lo-offset : hi-offset], nil
+
+	ents := ms.ents[lo-offset : hi-offset]
+	return limitSize(ents, maxSize), nil
 }
 
 // Term implements the Storage interface.

+ 18 - 7
raft/storage_test.go

@@ -15,6 +15,7 @@
 package raft
 
 import (
+	"math"
 	"reflect"
 	"testing"
 
@@ -50,22 +51,32 @@ func TestStorageTerm(t *testing.T) {
 }
 
 func TestStorageEntries(t *testing.T) {
-	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}
 	tests := []struct {
-		lo, hi uint64
+		lo, hi, maxsize uint64
 
 		werr     error
 		wentries []pb.Entry
 	}{
-		{2, 6, ErrCompacted, nil},
-		{3, 4, ErrCompacted, nil},
-		{4, 5, nil, []pb.Entry{{Index: 4, Term: 4}}},
-		{4, 6, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		{2, 6, math.MaxUint64, ErrCompacted, nil},
+		{3, 4, math.MaxUint64, ErrCompacted, nil},
+		{4, 5, math.MaxUint64, nil, []pb.Entry{{Index: 4, Term: 4}}},
+		{4, 6, math.MaxUint64, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		{4, 7, math.MaxUint64, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}},
+		// even if maxsize is zero, the first entry should be returned
+		{4, 7, 0, nil, []pb.Entry{{Index: 4, Term: 4}}},
+		// limit to 2
+		{4, 7, uint64(ents[1].Size() + ents[2].Size()), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		// limit to 2
+		{4, 7, uint64(ents[1].Size() + ents[2].Size() + ents[3].Size()/2), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		{4, 7, uint64(ents[1].Size() + ents[2].Size() + ents[3].Size() - 1), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		// all
+		{4, 7, uint64(ents[1].Size() + ents[2].Size() + ents[3].Size()), nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}},
 	}
 
 	for i, tt := range tests {
 		s := &MemoryStorage{ents: ents}
-		entries, err := s.Entries(tt.lo, tt.hi)
+		entries, err := s.Entries(tt.lo, tt.hi, tt.maxsize)
 		if err != tt.werr {
 			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
 		}

+ 15 - 0
raft/util.go

@@ -93,3 +93,18 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string {
 	}
 	return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
 }
+
+func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
+	if len(ents) == 0 {
+		return ents
+	}
+	size := ents[0].Size()
+	var limit int
+	for limit = 1; limit < len(ents); limit++ {
+		size += ents[limit].Size()
+		if uint64(size) > maxSize {
+			break
+		}
+	}
+	return ents[:limit]
+}

+ 27 - 0
raft/util_test.go

@@ -15,6 +15,8 @@
 package raft
 
 import (
+	"math"
+	"reflect"
 	"strings"
 	"testing"
 
@@ -43,3 +45,28 @@ func TestDescribeEntry(t *testing.T) {
 		t.Errorf("unexpected custom output: %s", customFormatted)
 	}
 }
+
+func TestLimitSize(t *testing.T) {
+	ents := []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}
+	tests := []struct {
+		maxsize  uint64
+		wentries []pb.Entry
+	}{
+		{math.MaxUint64, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}},
+		// even if maxsize is zero, the first entry should be returned
+		{0, []pb.Entry{{Index: 4, Term: 4}}},
+		// limit to 2
+		{uint64(ents[0].Size() + ents[1].Size()), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		// limit to 2
+		{uint64(ents[0].Size() + ents[1].Size() + ents[2].Size()/2), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		{uint64(ents[0].Size() + ents[1].Size() + ents[2].Size() - 1), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+		// all
+		{uint64(ents[0].Size() + ents[1].Size() + ents[2].Size()), []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}},
+	}
+
+	for i, tt := range tests {
+		if !reflect.DeepEqual(limitSize(ents, tt.maxsize), tt.wentries) {
+			t.Errorf("#%d: entries = %v, want %v", i, limitSize(ents, tt.maxsize), tt.wentries)
+		}
+	}
+}