Browse Source

Merge pull request #799 from xiangli-cmu/deny_unknow_peer

hack(server): notify removed peers when they try to become candidates
Xiang Li 11 years ago
parent
commit
1e7a7b11dd

+ 10 - 0
server/peer_server.go

@@ -122,6 +122,8 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) {
 
 	raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
 
+	raftServer.AddEventListener(raft.RemovedEventType, s.removedEvent)
+
 	s.raftServer = raftServer
 	s.removedInLog = false
 
@@ -652,6 +654,14 @@ func (s *PeerServer) PeerStats() []byte {
 	return nil
 }
 
+// removedEvent is the raft.RemovedEventType listener: it runs when this machine
+// learns, while trying to become a candidate, that it was removed from the cluster.
+func (s *PeerServer) removedEvent(event raft.Event) {
+	// HACK(philips): we need to find a better notification for this; event itself is unused.
+	log.Infof("removed during cluster re-configuration")
+	s.asyncRemove()
+}
+
 // raftEventLogger converts events from the Raft server into log messages.
 func (s *PeerServer) raftEventLogger(event raft.Event) {
 	value := event.Value()

+ 79 - 0
tests/functional/remove_node_test.go

@@ -3,8 +3,10 @@ package test
 import (
 	"bytes"
 	"fmt"
+	"math/rand"
 	"net/http"
 	"os"
+	"syscall"
 	"testing"
 	"time"
 
@@ -148,3 +150,80 @@ func TestRemoveNode(t *testing.T) {
 		}
 	}
 }
+
+func TestRemovePausedNode(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 4 // one more than the activeSize of 3 configured below
+	_, etcds, _ := CreateCluster(clusterSize, procAttr, false)
+	defer DestroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+
+	r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":1, "syncInterval":1}`)) // NOTE(review): Put's second result is discarded; r is dereferenced on the next line regardless
+	if !assert.Equal(t, r.StatusCode, 200) {
+		t.FailNow()
+	}
+	time.Sleep(2 * time.Second) // presumably gives the cluster time to shrink to activeSize — confirm
+
+	resp, err := c.Get("_etcd/machines", false, false)
+	if err != nil {
+		panic(err)
+	}
+	if len(resp.Node.Nodes) != 3 {
+		t.Fatal("cannot remove peer")
+	}
+
+	for i := 0; i < clusterSize; i++ {
+		// pause a randomly chosen node, check it left the machine list, then resume it
+		idx := rand.Int() % clusterSize // the same idx may be drawn in more than one iteration
+
+		etcds[idx].Signal(syscall.SIGSTOP)
+		fmt.Printf("pause node%d and let standby node take its place\n", idx+1)
+		time.Sleep(4 * time.Second)
+
+		resp, err := c.Get("_etcd/machines", false, false)
+		if err != nil {
+			panic(err)
+		}
+		if len(resp.Node.Nodes) != 3 {
+			t.Fatal("cannot remove peer")
+		}
+		for i := 0; i < 3; i++ { // NOTE(review): shadows the outer loop's i
+			if resp.Node.Nodes[i].Key == fmt.Sprintf("node%d", idx+1) { // NOTE(review): compares Key to the bare name "nodeN"; verify Key is not a full path, else this never matches
+				t.Fatal("node should be removed")
+			}
+		}
+
+		etcds[idx].Signal(syscall.SIGCONT)
+		// give the resumed node time to at least switch to the candidate state
+		time.Sleep(time.Second)
+
+		stop := make(chan bool)
+		leaderChan := make(chan string, 1)
+		all := make(chan bool, 1)
+
+		go Monitor(clusterSize, clusterSize, leaderChan, all, stop) // Monitor is defined elsewhere; presumably signals on all/leaderChan once the cluster is up — confirm
+		<-all
+		<-leaderChan
+		stop <- true // unbuffered: blocks until the Monitor goroutine receives
+
+		resp, err = c.Get("_etcd/machines", false, false)
+		if err != nil {
+			panic(err)
+		}
+		if len(resp.Node.Nodes) != 3 {
+			t.Fatalf("add peer fails (%d != 3)", len(resp.Node.Nodes))
+		}
+		for i := 0; i < 3; i++ { // NOTE(review): shadows the outer loop's i
+			if resp.Node.Nodes[i].Key == fmt.Sprintf("node%d", idx+1) { // the resumed node must stay out of the machine list
+				t.Fatal("node should be removed")
+			}
+		}
+	}
+}

+ 2 - 1
third_party/github.com/goraft/raft/event.go

@@ -4,9 +4,10 @@ const (
 	StateChangeEventType  = "stateChange"
 	LeaderChangeEventType = "leaderChange"
 	TermChangeEventType   = "termChange"
-	CommitEventType   = "commit"
+	CommitEventType       = "commit"
 	AddPeerEventType      = "addPeer"
 	RemovePeerEventType   = "removePeer"
+	RemovedEventType      = "removed"
 
 	HeartbeatIntervalEventType        = "heartbeatInterval"
 	ElectionTimeoutThresholdEventType = "electionTimeoutThreshold"

+ 11 - 0
third_party/github.com/goraft/raft/server.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"hash/crc32"
 	"io/ioutil"
+	"math"
 	"os"
 	"path"
 	"sort"
@@ -1040,6 +1041,11 @@ func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
 		return true
 	}
 
+	if resp.Term == math.MaxUint64 {
+		s.debugln("got a removal notification, stopping")
+		s.DispatchEvent(newEvent(RemovedEventType, nil, nil))
+	}
+
 	if resp.Term > s.currentTerm {
 		s.debugln("server.candidate.vote.failed")
 		s.updateCurrentTerm(resp.Term, "")
@@ -1064,6 +1070,11 @@ func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
 
 // Processes a "request vote" request.
 func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
+	// Deny the vote request if the candidate is not in the current cluster
+	if _, ok := s.peers[req.CandidateName]; !ok {
+		s.debugln("server.rv.deny.vote: unknown peer ", req.CandidateName)
+		return newRequestVoteResponse(math.MaxUint64, false), false
+	}
 
 	// If the request is coming from an old term then reject it.
 	if req.Term < s.Term() {