Browse Source

*: support removing the leader from a 2 members cluster

Xiang Li 11 years ago
parent
commit
152676f43a
3 changed files with 21 additions and 4 deletions
  1. 15 3
      etcdserver/server.go
  2. 1 1
      integration/cluster_test.go
  3. 5 0
      raft/node.go

+ 15 - 3
etcdserver/server.go

@@ -435,7 +435,9 @@ func (s *EtcdServer) run() {
 				}
 				if len(ents) > 0 {
 					if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
-						return
+						m1 := fmt.Sprintf("etcdserver: removed local member %s from cluster %s", s.ID(), s.Cluster.ID())
+						m2 := fmt.Sprint("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+						go s.stopWithDelay(10*100*time.Millisecond, m1, m2)
 					}
 				}
 			}
@@ -460,14 +462,26 @@ func (s *EtcdServer) run() {
 // Stop stops the server gracefully, and shuts down the running goroutine.
 // Stop should be called after a Start(s), otherwise it will block forever.
 func (s *EtcdServer) Stop() {
+	s.stopWithMessages()
+}
+
+func (s *EtcdServer) stopWithMessages(msgs ...string) {
 	select {
 	case s.stop <- struct{}{}:
+		for _, msg := range msgs {
+			log.Println(msg)
+		}
 	case <-s.done:
 		return
 	}
 	<-s.done
 }
 
+func (s *EtcdServer) stopWithDelay(d time.Duration, msgs ...string) {
+	time.Sleep(d)
+	s.stopWithMessages(msgs...)
+}
+
 // StopNotify returns a channel that receives a empty struct
 // when the server is stopped.
 func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
@@ -784,8 +798,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		id := types.ID(cc.NodeID)
 		s.Cluster.RemoveMember(id)
 		if id == s.id {
-			log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID())
-			log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
 			return true, nil
 		} else {
 			s.sendhub.Remove(id)

+ 1 - 1
integration/cluster_test.go

@@ -113,7 +113,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
 	defer c.Terminate(t)
 
 	// TODO: remove the last but one member
-	for i := 0; i < size-2; i++ {
+	for i := 0; i < size-1; i++ {
 		id := c.Members[len(c.Members)-1].s.ID()
 		c.RemoveMember(t, uint64(id))
 		c.waitLeader(t, c.Members)

+ 5 - 0
raft/node.go

@@ -284,6 +284,11 @@ func (n *node) run(r *raft) {
 			case pb.ConfChangeAddNode:
 				r.addNode(cc.NodeID)
 			case pb.ConfChangeRemoveNode:
+				// block incoming proposal when local node is
+				// removed
+				if cc.NodeID == r.id {
+					n.propc = nil
+				}
 				r.removeNode(cc.NodeID)
 			case pb.ConfChangeUpdateNode:
 				r.resetPendingConf()