server: recover peerHub when restarting from snapshot

Yicheng Qin, 11 years ago
commit fab17f216d
2 changed files with 55 additions and 26 deletions
  1. etcd/etcd_test.go (+34 -25)
  2. etcd/participant.go (+21 -1)

etcd/etcd_test.go (+34 -25)

@@ -230,7 +230,7 @@ func TestTakingSnapshot(t *testing.T) {
 		cl.Participant(0).Set("/foo", false, "bar", store.Permanent)
 	}
 	snap := cl.Participant(0).node.GetSnap()
-	if snap.Index != defaultCompact {
+	if snap.Index != int64(defaultCompact) {
 		t.Errorf("snap.Index = %d, want %d", snap.Index, defaultCompact)
 	}
 }
@@ -273,7 +273,7 @@ func TestRestoreSnapshotFromLeader(t *testing.T) {
 	}
 
 	// check new proposal could be committed in the new machine
-	wch, err := ts.Participant().Watch("/foo", false, false, defaultCompact)
+	wch, err := ts.Participant().Watch("/foo", false, false, uint64(defaultCompact))
 	if err != nil {
 		t.Errorf("watch err = %v", err)
 	}
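
Note: both conversions above are needed because this commit turns defaultCompact from an untyped constant into an int variable in etcd/participant.go (see the second file below). An untyped constant adapts to the type it is compared against or passed as; a typed variable does not. A minimal illustration of the difference, with hypothetical names cCompact and vCompact:

package main

const cCompact = 10000 // untyped constant
var vCompact = 10000   // typed variable: int

func main() {
	var index int64 = 10000
	_ = index == cCompact        // ok: untyped constant converts to int64
	_ = index == int64(vCompact) // variable needs an explicit conversion
	// _ = index == vCompact     // compile error: mismatched types int64 and int
}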
@@ -314,34 +314,43 @@ func TestSaveSnapshot(t *testing.T) {
 func TestRestoreSnapshotFromDisk(t *testing.T) {
 	defer afterTest(t)
 
-	cl := testCluster{Size: 1}
-	cl.Start()
-	defer cl.Destroy()
+	tests := []int{1, 3, 5}
 
-	lead, _ := cl.Leader()
-	for i := 0; i < defaultCompact+10; i++ {
-		cl.Participant(lead).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent)
-	}
+	// TODO(xiangli): tunable compact; reduce testing time
+	oldDefaultCompact := defaultCompact
+	defaultCompact = 10
+	defer func() { defaultCompact = oldDefaultCompact }()
 
-	cl.Stop()
-	cl.Restart()
+	for _, tt := range tests {
+		cl := testCluster{Size: tt}
+		cl.Start()
+		defer cl.Destroy()
 
-	lead, _ = cl.Leader()
-	// check store is recovered
-	for i := 0; i < defaultCompact+10; i++ {
-		ev, err := cl.Participant(lead).Store.Get(fmt.Sprint("/foo", i), false, false)
-		if err != nil {
-			t.Errorf("get err = %v", err)
-			continue
+		lead, _ := cl.Leader()
+		for i := 0; i < defaultCompact+10; i++ {
+			cl.Participant(lead).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent)
 		}
-		w := fmt.Sprint("bar", i)
-		if g := *ev.Node.Value; g != w {
-			t.Errorf("value = %v, want %v", g, w)
+
+		cl.Stop()
+		cl.Restart()
+
+		lead, _ = cl.Leader()
+		// check store is recovered
+		for i := 0; i < defaultCompact+10; i++ {
+			ev, err := cl.Participant(lead).Store.Get(fmt.Sprint("/foo", i), false, false)
+			if err != nil {
+				t.Errorf("get err = %v", err)
+				continue
+			}
+			w := fmt.Sprint("bar", i)
+			if g := *ev.Node.Value; g != w {
+				t.Errorf("value = %v, want %v", g, w)
+			}
+		}
+		// check new proposal could be submitted
+		if _, err := cl.Participant(lead).Set("/foo", false, "bar", store.Permanent); err != nil {
+			t.Fatal(err)
 		}
-	}
-	// check new proposal could be submitted
-	if _, err := cl.Participant(lead).Set("/foo", false, "bar", store.Permanent); err != nil {
-		t.Fatal(err)
 	}
 }
 

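The rewritten test runs the restore-from-disk scenario at cluster sizes 1, 3, and 5, and temporarily shrinks defaultCompact so each run snapshots after 10 entries instead of 10000. A minimal, self-contained sketch of the save/override/restore idiom it relies on (the package and names here are hypothetical, not etcd's):

package knob

import "testing"

var compactThreshold = 10000 // package-level default, a var so tests can tune it

func TestFastSnapshot(t *testing.T) {
	old := compactThreshold
	compactThreshold = 10                     // shrink so the test finishes quickly
	defer func() { compactThreshold = old }() // restore for later tests
}

One caveat in the test itself: defer cl.Destroy() sits inside the loop, so all three clusters are torn down only when the whole test function returns, not between iterations.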
etcd/participant.go (+21 -1)

@@ -22,6 +22,7 @@ import (
 	"log"
 	"log"
 	"math/rand"
 	"math/rand"
 	"net/http"
 	"net/http"
+	"net/url"
 	"os"
 	"os"
 	"path"
 	"path"
 	"time"
 	"time"
@@ -37,7 +38,6 @@ import (
 const (
 	defaultHeartbeat = 1
 	defaultElection  = 5
-	defaultCompact   = 10000
 
 	maxBufferedProposal = 128
 
@@ -58,6 +58,8 @@ const (
 )
 
 var (
+	defaultCompact = 10000
+
 	tmpErr      = fmt.Errorf("try again")
 	stopErr     = fmt.Errorf("server is stopped")
 	raftStopErr = fmt.Errorf("raft is stopped")
@@ -158,6 +160,24 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
 				panic(err)
 			}
 			log.Printf("id=%x participant.store.recovered index=%d\n", p.id, s.Index)
+
+			for _, node := range s.Nodes {
+				pp := path.Join(v2machineKVPrefix, fmt.Sprint(node))
+				ev, err := p.Store.Get(pp, false, false)
+				if err != nil {
+					panic(err)
+				}
+				q, err := url.ParseQuery(*ev.Node.Value)
+				if err != nil {
+					panic(err)
+				}
+				peer, err := p.peerHub.add(node, q["raft"][0])
+				if err != nil {
+					panic(err)
+				}
+				peer.participate()
+			}
+
 			snapIndex = s.Index
 		}
 		n, err := wal.Read(walDir, snapIndex)
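
The block added to newParticipant is the heart of the fix: after the store is recovered from the snapshot, each node id in s.Nodes has a machine-registry entry under v2machineKVPrefix whose value is a URL query string carrying the peer's raft address, and each such peer is re-added to the peerHub. A self-contained sketch of that lookup with simplified stand-in types (kvStore, peerHub, recoverPeers, and the prefix string are illustrative assumptions, not etcd's actual API):

package recovery

import (
	"fmt"
	"net/url"
	"path"
)

// Key prefix for machine registry entries; the exact string is an assumption.
const v2machineKVPrefix = "/_etcd/machines"

// kvStore is a simplified stand-in for the participant's Store.
type kvStore interface {
	Get(key string) (string, error)
}

// peerHub is a simplified stand-in that records peer raft URLs by node id.
type peerHub struct{ peers map[int64]string }

func (h *peerHub) add(id int64, raftURL string) error {
	h.peers[id] = raftURL
	return nil
}

// recoverPeers mirrors the recovery loop: for each node id recorded in the
// snapshot, read its registry entry, parse the stored query string, and
// re-register the peer's raft URL.
func recoverPeers(s kvStore, hub *peerHub, nodes []int64) error {
	for _, node := range nodes {
		pp := path.Join(v2machineKVPrefix, fmt.Sprint(node))
		val, err := s.Get(pp)
		if err != nil {
			return err
		}
		q, err := url.ParseQuery(val) // entry stored as a query string, e.g. raft=...&etcd=...
		if err != nil {
			return err
		}
		raft := q.Get("raft")
		if raft == "" {
			return fmt.Errorf("node %d: registry entry has no raft url", node)
		}
		if err := hub.add(node, raft); err != nil {
			return err
		}
	}
	return nil
}

Unlike this sketch, the real code panics on any failure, treating an unresolvable machine entry during snapshot recovery as corrupted state; it also indexes q["raft"][0] directly, which assumes the key is always present in the registry value.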