Browse Source

raft/etcd: recover node

Yicheng Qin 11 years ago
parent
commit
a28dc4559b
5 changed files with 21 additions and 21 deletions
  1. 11 9
      etcd/participant.go
  2. 0 1
      raft/log.go
  3. 7 6
      raft/node.go
  4. 1 1
      raft/node_test.go
  5. 2 4
      raft/raft.go

+ 11 - 9
etcd/participant.go

@@ -152,22 +152,24 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
 			log.Printf("id=%x participant.snapload err=%s\n", p.id, err)
 			return nil, err
 		}
+		var logIndex int64
 		if s != nil {
-			p.node.Restore(*s)
-			if err := p.Recovery(s.Data); err != nil {
-				panic(err)
-			}
-			log.Printf("id=%x recovered index=%d\n", p.id, s.Index)
+			logIndex = s.Index
 		}
-
-		n, err := wal.Read(walDir, 0)
+		n, err := wal.Read(walDir, logIndex)
 		if err != nil {
 			return nil, err
 		}
 		p.id = n.Id
-		p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection)
+		p.node.Node = raft.Recover(s, n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection)
 		p.apply(p.node.Next())
-		log.Printf("id=%x participant.load path=%s state=\"%+v\" len(ents)=%d", p.id, walDir, n.State, len(n.Ents))
+		log.Printf("id=%x participant.load path=%s snap=%+v state=\"%+v\" len(ents)=%d", p.id, p.cfg.DataDir, s, n.State, len(n.Ents))
+		if s != nil {
+			if err := p.Recovery(s.Data); err != nil {
+				panic(err)
+			}
+			log.Printf("id=%x participant.store.recovered index=%d\n", p.id, s.Index)
+		}
 		if w, err = wal.Open(walDir); err != nil {
 			return nil, err
 		}

+ 0 - 1
raft/log.go

@@ -171,7 +171,6 @@ func (l *raftLog) restore(s Snapshot) {
 	l.applied = s.Index
 	l.offset = s.Index
 	l.snapshot = s
-	l.unstableSnapshot = s
 }
 
 func (l *raftLog) at(i int64) *Entry {

+ 7 - 6
raft/node.go

@@ -52,10 +52,15 @@ func New(id int64, heartbeat, election tick) *Node {
 	return n
 }
 
-func Recover(id int64, ents []Entry, state State, heartbeat, election tick) *Node {
+func Recover(s *Snapshot, id int64, ents []Entry, state State, heartbeat, election tick) *Node {
 	n := New(id, heartbeat, election)
+	if s != nil {
+		n.sm.restore(*s)
+	}
 	n.sm.loadEnts(ents)
-	n.sm.loadState(state)
+	if !state.IsEmpty() {
+		n.sm.loadState(state)
+	}
 	return n
 }
 
@@ -231,10 +236,6 @@ func (n *Node) UpdateConf(t int64, c *Config) {
 	n.propose(t, data)
 }
 
-func (n *Node) Restore(s Snapshot) bool {
-	return n.sm.restore(s)
-}
-
 // UnstableEnts retuens all the entries that need to be persistent.
 // The first return value is offset, and the second one is unstable entries.
 func (n *Node) UnstableEnts() []Entry {

+ 1 - 1
raft/node_test.go

@@ -192,7 +192,7 @@ func TestRecover(t *testing.T) {
 	ents := []Entry{{Term: 1}, {Term: 2}, {Term: 3}}
 	state := State{Term: 500, Vote: 1, Commit: 3}
 
-	n := Recover(0, ents, state, defaultHeartbeat, defaultElection)
+	n := Recover(nil, 0, ents, state, defaultHeartbeat, defaultElection)
 	if g := n.Next(); !reflect.DeepEqual(g, ents) {
 		t.Errorf("ents = %+v, want %+v", g, ents)
 	}

+ 2 - 4
raft/raft.go

@@ -388,6 +388,7 @@ func (sm *stateMachine) handleAppendEntries(m Message) {
 
 func (sm *stateMachine) handleSnapshot(m Message) {
 	if sm.restore(m.Snapshot) {
+		sm.raftLog.unstableSnapshot = m.Snapshot
 		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()})
 	} else {
 		sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.committed})
@@ -574,10 +575,7 @@ func (sm *stateMachine) setState(vote, term, commit int64) {
 }
 
 func (sm *stateMachine) loadEnts(ents []Entry) {
-	if !sm.raftLog.isEmpty() {
-		panic("cannot load entries when log is not empty")
-	}
-	sm.raftLog.append(0, ents...)
+	sm.raftLog.append(sm.raftLog.lastIndex(), ents...)
 	sm.raftLog.unstable = sm.raftLog.lastIndex() + 1
 }