Browse Source

raft: add index to entry

Xiang Li 11 years ago
parent
commit
54b4f52e48
6 changed files with 24 additions and 28 deletions
  1. 1 1
      etcd/participant.go
  2. 6 6
      raft/log.go
  3. 9 14
      raft/log_test.go
  4. 1 1
      raft/node.go
  5. 1 0
      raft/raft.go
  6. 6 6
      raft/raft_test.go

+ 1 - 1
etcd/participant.go

@@ -201,7 +201,7 @@ func (p *participant) run() int64 {
 			return stopMode
 		}
 		p.apply(node.Next())
-		_, ents := node.UnstableEnts()
+		ents := node.UnstableEnts()
 		p.save(ents, node.UnstableState())
 		p.send(node.Msgs())
 		if node.IsRemoved() {

+ 6 - 6
raft/log.go

@@ -15,9 +15,10 @@ const (
 )
 
 type Entry struct {
-	Type int64
-	Term int64
-	Data []byte
+	Type  int64
+	Term  int64
+	Index int64
+	Data  []byte
 }
 
 func (e *Entry) isConfig() bool {
@@ -88,11 +89,10 @@ func (l *raftLog) findConflict(from int64, ents []Entry) int64 {
 	return -1
 }
 
-func (l *raftLog) unstableEnts() (int64, []Entry) {
-	offset := l.unstable
+func (l *raftLog) unstableEnts() []Entry {
 	ents := l.entries(l.unstable)
 	l.unstable = l.lastIndex() + 1
-	return offset, ents
+	return ents
 }
 
 func (l *raftLog) lastIndex() int64 {

+ 9 - 14
raft/log_test.go

@@ -77,7 +77,7 @@ func TestCompactionSideEffects(t *testing.T) {
 	raftLog := newLog()
 
 	for i = 0; i < lastIndex; i++ {
-		raftLog.append(int64(i), Entry{Term: int64(i + 1)})
+		raftLog.append(int64(i), Entry{Term: int64(i + 1), Index: int64(i + 1)})
 	}
 
 	raftLog.compact(500)
@@ -98,13 +98,13 @@ func TestCompactionSideEffects(t *testing.T) {
 		}
 	}
 
-	offset, unstableEnts := raftLog.unstableEnts()
-	if offset != 501 {
-		t.Errorf("offset(unstableEntries) = %d, want = %d", offset, 500)
-	}
+	unstableEnts := raftLog.unstableEnts()
 	if g := len(unstableEnts); g != 500 {
 		t.Errorf("len(unstableEntries) = %d, want = %d", g, 500)
 	}
+	if unstableEnts[0].Index != 501 {
+		t.Errorf("Index = %d, want = %d", unstableEnts[0].Index, 501)
+	}
 
 	prev := raftLog.lastIndex()
 	raftLog.append(raftLog.lastIndex(), Entry{Term: raftLog.lastIndex() + 1})
@@ -119,25 +119,21 @@ func TestCompactionSideEffects(t *testing.T) {
 }
 
 func TestUnstableEnts(t *testing.T) {
-	previousEnts := []Entry{{Term: 1}, {Term: 2}}
+	previousEnts := []Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
 	tests := []struct {
 		unstable  int64
-		woffset   int64
 		wents     []Entry
 		wunstable int64
 	}{
-		{3, 3, nil, 3},
-		{1, 1, []Entry{{Term: 1}, {Term: 2}}, 3},
+		{3, nil, 3},
+		{1, previousEnts, 3},
 	}
 
 	for i, tt := range tests {
 		raftLog := newLog()
 		raftLog.ents = append(raftLog.ents, previousEnts...)
 		raftLog.unstable = tt.unstable
-		offset, ents := raftLog.unstableEnts()
-		if offset != tt.woffset {
-			t.Errorf("#%d: offset = %d, want = %d", i, offset, tt.woffset)
-		}
+		ents := raftLog.unstableEnts()
 		if !reflect.DeepEqual(ents, tt.wents) {
 			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
 		}
@@ -145,7 +141,6 @@ func TestUnstableEnts(t *testing.T) {
 			t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable)
 		}
 	}
-
 }
 
 //TestCompaction ensures that the number of log entreis is correct after compactions.

+ 1 - 1
raft/node.go

@@ -219,7 +219,7 @@ func (n *Node) UpdateConf(t int64, c *Config) {
 
 // UnstableEnts retuens all the entries that need to be persistent.
 // The first return value is offset, and the second one is unstable entries.
-func (n *Node) UnstableEnts() (int64, []Entry) {
+func (n *Node) UnstableEnts() []Entry {
 	return n.sm.raftLog.unstableEnts()
 }
 

+ 1 - 0
raft/raft.go

@@ -301,6 +301,7 @@ func (sm *stateMachine) q() int {
 
 func (sm *stateMachine) appendEntry(e Entry) {
 	e.Term = sm.term.Get()
+	e.Index = sm.raftLog.lastIndex() + 1
 	sm.index.Set(sm.raftLog.append(sm.raftLog.lastIndex(), e))
 	sm.ins[sm.id].update(sm.raftLog.lastIndex())
 	sm.maybeCommit()

+ 6 - 6
raft/raft_test.go

@@ -210,7 +210,7 @@ func TestDuelingCandidates(t *testing.T) {
 	nt.recover()
 	nt.send(Message{From: 2, To: 2, Type: msgHup})
 
-	wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1}}, committed: 1}
+	wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1}
 	tests := []struct {
 		sm      *stateMachine
 		state   stateType
@@ -262,7 +262,7 @@ func TestCandidateConcede(t *testing.T) {
 	if g := a.term; g != 1 {
 		t.Errorf("term = %d, want %d", g, 1)
 	}
-	wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2})
+	wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
 	for i, p := range tt.peers {
 		if sm, ok := p.(*stateMachine); ok {
 			l := ltoa(sm.raftLog)
@@ -296,8 +296,8 @@ func TestOldMessages(t *testing.T) {
 
 	l := &raftLog{
 		ents: []Entry{
-			{}, {Type: Normal, Data: nil, Term: 1},
-			{Type: Normal, Data: nil, Term: 2}, {Type: Normal, Data: nil, Term: 3},
+			{}, {Type: Normal, Data: nil, Term: 1, Index: 1},
+			{Type: Normal, Data: nil, Term: 2, Index: 2}, {Type: Normal, Data: nil, Term: 3, Index: 3},
 		},
 		committed: 3,
 	}
@@ -351,7 +351,7 @@ func TestProposal(t *testing.T) {
 
 		wantLog := newLog()
 		if tt.success {
-			wantLog = &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2}
+			wantLog = &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
 		}
 		base := ltoa(wantLog)
 		for i, p := range tt.peers {
@@ -385,7 +385,7 @@ func TestProposalByProxy(t *testing.T) {
 		// propose via follower
 		tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}})
 
-		wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2}
+		wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
 		base := ltoa(wantLog)
 		for i, p := range tt.peers {
 			if sm, ok := p.(*stateMachine); ok {