Merge pull request #819 from unihorn/97

fix(server): joinIndex is not set after recovery from full outage
Yicheng Qin 11 years ago
commit 2387ef3f21
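
In short: after a full-cluster outage, a recovered node whose own peer URL was already listed in the cluster reset its join index to 0, so the peer server came back without a usable join index. This commit threads the raft server into the StandbyServer so the index can be recovered from the raft log's commit index instead. Below is a condensed, runnable sketch of that logic; the raftNode and standby types are trimmed-down stand-ins for illustration, not etcd's real types.

package main

import "fmt"

// raftNode stands in for the raft.Server dependency; only the
// CommitIndex accessor matters here. (Hypothetical type.)
type raftNode struct{ commitIndex uint64 }

func (r *raftNode) CommitIndex() uint64 { return r.commitIndex }

// standby mirrors, in trimmed-down form, the StandbyServer fields
// this commit touches.
type standby struct {
	peerURL     string
	clusterURLs []string
	raft        *raftNode
	joinIndex   uint64
}

// join models the fixed logic in server/standby_server.go: if this
// node's own URL is already in the cluster (the recovery-from-full-outage
// case), recover the join index from the raft commit index instead of
// zeroing it.
func (s *standby) join() {
	for _, url := range s.clusterURLs {
		if s.peerURL == url {
			s.joinIndex = s.raft.CommitIndex() // was: s.joinIndex = 0
			return
		}
	}
}

func main() {
	s := &standby{
		peerURL:     "http://127.0.0.1:7001",
		clusterURLs: []string{"http://127.0.0.1:7001", "http://127.0.0.1:7002"},
		raft:        &raftNode{commitIndex: 42},
	}
	s.join()
	fmt.Println("joinIndex after recovery:", s.joinIndex) // prints 42, not 0
}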

+ 2 - 0
etcd/etcd.go

@@ -232,6 +232,7 @@ func (e *Etcd) Run() {
 		DataDir:    e.Config.DataDir,
 	}
 	e.StandbyServer = server.NewStandbyServer(ssConfig, client)
+	e.StandbyServer.SetRaftServer(raftServer)
 
 	// Generating config could be slow.
 	// Put it here so that listening starts right after the peer server starts.
@@ -347,6 +348,7 @@ func (e *Etcd) runServer() {
 			raftServer.SetElectionTimeout(electionTimeout)
 			raftServer.SetHeartbeatInterval(heartbeatInterval)
 			e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
+			e.StandbyServer.SetRaftServer(raftServer)
 
 			e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex())
 			e.setMode(PeerMode)
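
Note that the standby server is handed the raft server in both startup paths: Run() wires it when the StandbyServer is first constructed, and runServer() re-installs the newly created raft server when a standby node rejoins as a peer, presumably so the commit index is never read from a stale raft instance.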

+ 1 - 0
server/peer_server.go

@@ -214,6 +214,7 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo
 		// TODO(yichengq): Think about the action that should be done
 		// if it cannot connect to any of the previously known nodes.
 		log.Debugf("%s is restarting the cluster %v", name, possiblePeers)
+		s.SetJoinIndex(s.raftServer.CommitIndex())
 		toStart = true
 		return
 	}
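
This is the restart path for an existing cluster: a node recovering from a full outage reconnects to its previously known peers, and the join index is now seeded from the local raft log's commit index rather than left unset.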

+ 8 - 3
server/standby_server.go

@@ -36,8 +36,9 @@ type standbyInfo struct {
 }
 
 type StandbyServer struct {
-	Config StandbyServerConfig
-	client *Client
+	Config     StandbyServerConfig
+	client     *Client
+	raftServer raft.Server
 
 	standbyInfo
 	joinIndex uint64
@@ -62,6 +63,10 @@ func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer
 	return s
 }
 
+func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
+	s.raftServer = raftServer
+}
+
 func (s *StandbyServer) Start() {
 	s.Lock()
 	defer s.Unlock()
@@ -237,7 +242,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error {
 func (s *StandbyServer) join(peer string) error {
 	for _, url := range s.ClusterURLs() {
 		if s.Config.PeerURL == url {
-			s.joinIndex = 0
+			s.joinIndex = s.raftServer.CommitIndex()
 			return nil
 		}
 	}
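
This is the heart of the fix, matching the sketch above: join() used to zero joinIndex whenever the node's own PeerURL was already among the cluster URLs, which is exactly the situation after recovery from a full outage; with the injected raft server it can recover a meaningful index from CommitIndex() instead.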

+ 68 - 0
tests/functional/multi_node_kill_all_and_recovery_test.go

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"os"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
@@ -100,6 +101,8 @@ func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) {
 		t.Fatal("cannot create cluster")
 	}
 
+	time.Sleep(time.Second)
+
 	c := etcd.NewClient(nil)
 
 	go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
@@ -239,3 +242,68 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, len(result.Node.Nodes), 7)
 }
+
+// Create a five-node cluster
+// Kill all the nodes and restart them, then remove the leader
+func TestMultiNodeKillAllAndRecoveryAndRemoveLeader(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	stop := make(chan bool)
+	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
+
+	clusterSize := 5
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+	defer DestroyCluster(etcds)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	c := etcd.NewClient(nil)
+
+	go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
+	<-all
+	<-leaderChan
+	stop <- true
+
+	c.SyncCluster()
+
+	// kill all
+	DestroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	stop = make(chan bool)
+	leaderChan = make(chan string, 1)
+	all = make(chan bool, 1)
+
+	time.Sleep(time.Second)
+
+	for i := 0; i < clusterSize; i++ {
+		etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
+	}
+
+	go Monitor(clusterSize, 1, leaderChan, all, stop)
+
+	<-all
+	leader := <-leaderChan
+
+	_, err = c.Set("foo", "bar", 0)
+	if err != nil {
+		t.Fatalf("Recovery error: %s", err)
+	}
+
+	port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
+	num := port - 7000
+	resp, _ := tests.Delete(leader+"/v2/admin/machines/node"+strconv.Itoa(num), "application/json", nil)
+	if !assert.Equal(t, resp.StatusCode, 200) {
+		t.FailNow()
+	}
+
+	// Check that the old leader is in standby mode now
+	time.Sleep(time.Second)
+	resp, _ = tests.Get(leader + "/name")
+	assert.Equal(t, resp.StatusCode, 404)
+}
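
Two details of this test worth noting: the node number is derived from the leader's peer port (the functional-test harness appears to give node N the peer port 7000+N, so port 7001 maps to node1), and the final GET on /name expecting a 404 checks that the removed leader no longer answers as a peer, i.e. that it has dropped back to standby mode as the comment says.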

+ 2 - 1
tests/functional/remove_node_test.go

@@ -169,7 +169,8 @@ func TestRemovePausedNode(t *testing.T) {
 	if !assert.Equal(t, r.StatusCode, 200) {
 		t.FailNow()
 	}
-	time.Sleep(2 * time.Second)
+	// Wait for the standby instances to update their cluster config
+	time.Sleep(6 * time.Second)
 
 	resp, err := c.Get("_etcd/machines", false, false)
 	if err != nil {

+ 1 - 1
tests/functional/simple_snapshot_test.go

@@ -89,7 +89,7 @@ func TestSnapshot(t *testing.T) {
 
 	index, _ = strconv.Atoi(snapshots[0].Name()[2:6])
 
-	if index < 1010 || index > 1025 {
+	if index < 1010 || index > 1029 {
 		t.Fatal("wrong name of snapshot :", snapshots[0].Name())
 	}
 }