Browse Source

integration: add integration test for remove member

Xiang Li 11 years ago
parent
commit
c26de66262
2 changed files with 75 additions and 0 deletions
  1. 8 0
      etcdserver/server.go
  2. 67 0
      integration/cluster_test.go

+ 8 - 0
etcdserver/server.go

@@ -186,6 +186,8 @@ type EtcdServer struct {
 	// Cache of the latest raft index and raft term the server has seen
 	raftIndex uint64
 	raftTerm  uint64
+
+	raftLead uint64
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -342,6 +344,7 @@ func (s *EtcdServer) run() {
 			s.node.Tick()
 		case rd := <-s.node.Ready():
 			if rd.SoftState != nil {
+				atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
 				nodes = rd.SoftState.Nodes
 				if rd.RaftState == raft.StateLeader {
 					syncC = s.SyncTicker
@@ -532,6 +535,11 @@ func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) }
 
 func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) }
 
+// Only for testing purpose
+// TODO: add Raft server interface to expose raft related info:
+// Index, Term, Lead, Committed, Applied, LastIndex, etc.
+func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.raftLead) }
+
 // configure sends a configuration change through consensus and
 // then waits for it to be applied to the server. It
 // will block until the change is performed or there is an error.

+ 67 - 0
integration/cluster_test.go

@@ -99,6 +99,27 @@ func testDoubleClusterSize(t *testing.T, size int) {
 	clusterMustProgress(t, c)
 }
 
+func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
+func TestDecreaseClusterSizeOf5(t *testing.T) {
+	t.Skip("enable after reducing the election collision rate")
+	// election collision rate is too high when enabling --race
+	testDecreaseClusterSize(t, 5)
+}
+
+func testDecreaseClusterSize(t *testing.T, size int) {
+	defer afterTest(t)
+	c := NewCluster(t, size)
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	for i := 0; i < size-1; i++ {
+		id := c.Members[len(c.Members)-1].s.ID()
+		c.RemoveMember(t, uint64(id))
+		c.waitLeader(t)
+	}
+	clusterMustProgress(t, c)
+}
+
 // clusterMustProgress ensures that cluster can make progress. It creates
 // a key first, and check the new key could be got from all client urls of
 // the cluster.
@@ -251,6 +272,32 @@ func (c *cluster) AddMember(t *testing.T) {
 	c.waitMembersMatch(t, c.HTTPMembers())
 }
 
+func (c *cluster) RemoveMember(t *testing.T, id uint64) {
+	// send remove request to the cluster
+	cc := mustNewHTTPClient(t, []string{c.URL(0)})
+	ma := client.NewMembersAPI(cc)
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
+		t.Fatalf("unexpected remove error %v", err)
+	}
+	cancel()
+	newMembers := make([]*member, 0)
+	for _, m := range c.Members {
+		if uint64(m.s.ID()) != id {
+			newMembers = append(newMembers, m)
+		} else {
+			select {
+			case <-m.s.StopNotify():
+				m.Terminate(t)
+			case <-time.After(time.Second):
+				t.Fatalf("failed to remove member %s in one second", m.s.ID())
+			}
+		}
+	}
+	c.Members = newMembers
+	c.waitMembersMatch(t, c.HTTPMembers())
+}
+
 func (c *cluster) Terminate(t *testing.T) {
 	for _, m := range c.Members {
 		m.Terminate(t)
@@ -274,6 +321,26 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) {
 	return
 }
 
+func (c *cluster) waitLeader(t *testing.T) {
+	possibleLead := make(map[uint64]bool)
+	var lead uint64
+	for _, m := range c.Members {
+		possibleLead[uint64(m.s.ID())] = true
+	}
+
+	for lead == 0 || !possibleLead[lead] {
+		lead = 0
+		for _, m := range c.Members {
+			if lead != 0 && lead != m.s.Lead() {
+				lead = 0
+				break
+			}
+			lead = m.s.Lead()
+		}
+		time.Sleep(10 * tickDuration)
+	}
+}
+
 func (c *cluster) name(i int) string {
 	return fmt.Sprint("node", i)
 }