Pārlūkot izejas kodu

Merge pull request #5814 from heyitsanthony/functest-refactor

etcd-tester: refactor cluster and failure code
Anthony Romano 9 gadi atpakaļ
vecāks
revīzija
3b80df7f4e

+ 75 - 181
tools/functional-tester/etcd-tester/cluster.go

@@ -22,10 +22,7 @@ import (
 	"time"
 
 	"golang.org/x/net/context"
-	"google.golang.org/grpc"
 
-	clientv2 "github.com/coreos/etcd/client"
-	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
 )
@@ -35,17 +32,14 @@ const peerURLPort = 2380
 type cluster struct {
 	v2Only bool // to be deprecated
 
-	agentEndpoints       []string
 	datadir              string
 	stressKeySize        int
 	stressKeySuffixRange int
 
-	Size       int
-	Agents     []client.Agent
-	Stressers  []Stresser
-	Names      []string
-	GRPCURLs   []string
-	ClientURLs []string
+	Size      int
+	Stressers []Stresser
+
+	Members []*member
 }
 
 type ClusterStatus struct {
@@ -56,68 +50,53 @@ type ClusterStatus struct {
 func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
 	c := &cluster{
 		v2Only:               isV2Only,
-		agentEndpoints:       agentEndpoints,
 		datadir:              datadir,
 		stressKeySize:        stressKeySize,
 		stressKeySuffixRange: stressKeySuffixRange,
 	}
-	if err := c.Bootstrap(); err != nil {
+	if err := c.bootstrap(agentEndpoints); err != nil {
 		return nil, err
 	}
 	return c, nil
 }
 
-func (c *cluster) Bootstrap() error {
-	size := len(c.agentEndpoints)
+func (c *cluster) bootstrap(agentEndpoints []string) error {
+	size := len(agentEndpoints)
 
-	agents := make([]client.Agent, size)
-	names := make([]string, size)
-	grpcURLs := make([]string, size)
-	clientURLs := make([]string, size)
-	peerURLs := make([]string, size)
-	members := make([]string, size)
-	for i, u := range c.agentEndpoints {
-		var err error
-		agents[i], err = client.NewAgent(u)
+	members := make([]*member, size)
+	memberNameURLs := make([]string, size)
+	for i, u := range agentEndpoints {
+		agent, err := client.NewAgent(u)
 		if err != nil {
 			return err
 		}
-
-		names[i] = fmt.Sprintf("etcd-%d", i)
-
 		host, _, err := net.SplitHostPort(u)
 		if err != nil {
 			return err
 		}
-		grpcURLs[i] = fmt.Sprintf("%s:2379", host)
-		clientURLs[i] = fmt.Sprintf("http://%s:2379", host)
-		peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort)
-
-		members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i])
+		members[i] = &member{
+			Agent:     agent,
+			Endpoint:  u,
+			Name:      fmt.Sprintf("etcd-%d", i),
+			ClientURL: fmt.Sprintf("http://%s:2379", host),
+			PeerURL:   fmt.Sprintf("http://%s:%d", host, peerURLPort),
+		}
+		memberNameURLs[i] = members[i].ClusterEntry()
 	}
-	clusterStr := strings.Join(members, ",")
+	clusterStr := strings.Join(memberNameURLs, ",")
 	token := fmt.Sprint(rand.Int())
 
-	for i, a := range agents {
-		flags := []string{
-			"--name", names[i],
+	for i, m := range members {
+		flags := append(
+			m.Flags(),
 			"--data-dir", c.datadir,
-
-			"--listen-client-urls", clientURLs[i],
-			"--advertise-client-urls", clientURLs[i],
-
-			"--listen-peer-urls", peerURLs[i],
-			"--initial-advertise-peer-urls", peerURLs[i],
-
 			"--initial-cluster-token", token,
-			"--initial-cluster", clusterStr,
-			"--initial-cluster-state", "new",
-		}
+			"--initial-cluster", clusterStr)
 
-		if _, err := a.Start(flags...); err != nil {
+		if _, err := m.Agent.Start(flags...); err != nil {
 			// cleanup
-			for j := 0; j < i; j++ {
-				agents[j].Terminate()
+			for _, m := range members[:i] {
+				m.Agent.Terminate()
 			}
 			return err
 		}
@@ -126,52 +105,55 @@ func (c *cluster) Bootstrap() error {
 	// TODO: Too intensive stressers can panic etcd member with
 	// 'out of memory' error. Put rate limits in server side.
 	stressN := 100
-	var stressers []Stresser
-	if c.v2Only {
-		for _, u := range clientURLs {
-			s := &stresserV2{
-				Endpoint:       u,
+	c.Stressers = make([]Stresser, len(members))
+	for i, m := range members {
+		if c.v2Only {
+			c.Stressers[i] = &stresserV2{
+				Endpoint:       m.ClientURL,
 				KeySize:        c.stressKeySize,
 				KeySuffixRange: c.stressKeySuffixRange,
 				N:              stressN,
 			}
-			go s.Stress()
-			stressers = append(stressers, s)
-		}
-	} else {
-		for _, u := range grpcURLs {
-			s := &stresser{
-				Endpoint:       u,
+		} else {
+			c.Stressers[i] = &stresser{
+				Endpoint:       m.grpcAddr(),
 				KeySize:        c.stressKeySize,
 				KeySuffixRange: c.stressKeySuffixRange,
 				N:              stressN,
 			}
-			go s.Stress()
-			stressers = append(stressers, s)
 		}
+		go c.Stressers[i].Stress()
 	}
 
 	c.Size = size
-	c.Agents = agents
-	c.Stressers = stressers
-	c.Names = names
-	c.GRPCURLs = grpcURLs
-	c.ClientURLs = clientURLs
+	c.Members = members
 	return nil
 }
 
+func (c *cluster) Reset() error {
+	eps := make([]string, len(c.Members))
+	for i, m := range c.Members {
+		eps[i] = m.Endpoint
+	}
+	return c.bootstrap(eps)
+}
+
 func (c *cluster) WaitHealth() error {
 	var err error
 	// wait 60s to check cluster health.
 	// TODO: set it to a reasonable value. It is set that high because
 	// follower may use long time to catch up the leader when reboot under
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
-	healthFunc, urls := setHealthKey, c.GRPCURLs
+	healthFunc := func(m *member) error { return m.SetHealthKeyV3() }
 	if c.v2Only {
-		healthFunc, urls = setHealthKeyV2, c.ClientURLs
+		healthFunc = func(m *member) error { return m.SetHealthKeyV2() }
 	}
 	for i := 0; i < 60; i++ {
-		err = healthFunc(urls)
+		for _, m := range c.Members {
+			if err = healthFunc(m); err != nil {
+				break
+			}
+		}
 		if err == nil {
 			return nil
 		}
@@ -186,27 +168,12 @@ func (c *cluster) GetLeader() (int, error) {
 	if c.v2Only {
 		return 0, nil
 	}
-
-	for i, ep := range c.GRPCURLs {
-		cli, err := clientv3.New(clientv3.Config{
-			Endpoints:   []string{ep},
-			DialTimeout: 5 * time.Second,
-		})
-		if err != nil {
-			return 0, err
-		}
-		defer cli.Close()
-
-		mapi := clientv3.NewMaintenance(cli)
-		resp, err := mapi.Status(context.Background(), ep)
-		if err != nil {
-			return 0, err
-		}
-		if resp.Header.MemberId == resp.Leader {
-			return i, nil
+	for i, m := range c.Members {
+		isLeader, err := m.IsLeader()
+		if isLeader || err != nil {
+			return i, err
 		}
 	}
-
 	return 0, fmt.Errorf("no leader found")
 }
 
@@ -221,8 +188,8 @@ func (c *cluster) Report() (success, failure int) {
 
 func (c *cluster) Cleanup() error {
 	var lasterr error
-	for _, a := range c.Agents {
-		if err := a.Cleanup(); err != nil {
+	for _, m := range c.Members {
+		if err := m.Agent.Cleanup(); err != nil {
 			lasterr = err
 		}
 	}
@@ -233,8 +200,8 @@ func (c *cluster) Cleanup() error {
 }
 
 func (c *cluster) Terminate() {
-	for _, a := range c.Agents {
-		a.Terminate()
+	for _, m := range c.Members {
+		m.Agent.Terminate()
 	}
 	for _, s := range c.Stressers {
 		s.Cancel()
@@ -246,10 +213,10 @@ func (c *cluster) Status() ClusterStatus {
 		AgentStatuses: make(map[string]client.Status),
 	}
 
-	for i, a := range c.Agents {
-		s, err := a.Status()
+	for _, m := range c.Members {
+		s, err := m.Agent.Status()
 		// TODO: add a.Desc() as a key of the map
-		desc := c.agentEndpoints[i]
+		desc := m.Endpoint
 		if err != nil {
 			cs.AgentStatuses[desc] = client.Status{State: "unknown"}
 			plog.Printf("failed to get the status of agent [%s]", desc)
@@ -259,64 +226,16 @@ func (c *cluster) Status() ClusterStatus {
 	return cs
 }
 
-// setHealthKey sets health key on all given urls.
-func setHealthKey(us []string) error {
-	for _, u := range us {
-		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
-		if err != nil {
-			return fmt.Errorf("%v (%s)", err, u)
-		}
-		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-		kvc := pb.NewKVClient(conn)
-		_, err = kvc.Put(ctx, &pb.PutRequest{Key: []byte("health"), Value: []byte("good")})
-		cancel()
-		conn.Close()
-		if err != nil {
-			return fmt.Errorf("%v (%s)", err, u)
-		}
-	}
-	return nil
-}
-
-// setHealthKeyV2 sets health key on all given urls.
-func setHealthKeyV2(us []string) error {
-	for _, u := range us {
-		cfg := clientv2.Config{
-			Endpoints: []string{u},
-		}
-		c, err := clientv2.New(cfg)
-		if err != nil {
-			return err
-		}
-		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-		kapi := clientv2.NewKeysAPI(c)
-		_, err = kapi.Set(ctx, "health", "good", nil)
-		cancel()
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
 	revs := make(map[string]int64)
 	hashes := make(map[string]int64)
-	for _, u := range c.GRPCURLs {
-		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
+	for _, m := range c.Members {
+		rev, hash, err := m.RevHash()
 		if err != nil {
 			return nil, nil, err
 		}
-		m := pb.NewMaintenanceClient(conn)
-		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		resp, err := m.Hash(ctx, &pb.HashRequest{})
-		cancel()
-		conn.Close()
-		if err != nil {
-			return nil, nil, err
-		}
-		revs[u] = resp.Header.Revision
-		hashes[u] = int64(resp.Hash)
+		revs[m.ClientURL] = rev
+		hashes[m.ClientURL] = hash
 	}
 	return revs, hashes, nil
 }
@@ -326,8 +245,9 @@ func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		return nil
 	}
 
-	for i, u := range c.GRPCURLs {
-		conn, derr := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
+	for i, m := range c.Members {
+		u := m.ClientURL
+		conn, derr := m.dialGRPC()
 		if derr != nil {
 			plog.Printf("[compact kv #%d] dial error %v (endpoint %s)", i, derr, u)
 			err = derr
@@ -360,45 +280,19 @@ func (c *cluster) checkCompact(rev int64) error {
 	if rev == 0 {
 		return nil
 	}
-	for _, u := range c.GRPCURLs {
-		cli, err := clientv3.New(clientv3.Config{
-			Endpoints:   []string{u},
-			DialTimeout: 5 * time.Second,
-		})
-		if err != nil {
-			return fmt.Errorf("%v (endpoint %s)", err, u)
-		}
-
-		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
-		wr, ok := <-wch
-		cancel()
-
-		cli.Close()
-
-		if !ok {
-			return fmt.Errorf("watch channel terminated (endpoint %s)", u)
-		}
-		if wr.CompactRevision != rev {
-			return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, u)
+	for _, m := range c.Members {
+		if err := m.CheckCompact(rev); err != nil {
+			return err
 		}
 	}
 	return nil
 }
 
 func (c *cluster) defrag() error {
-	for _, u := range c.GRPCURLs {
-		plog.Printf("defragmenting %s\n", u)
-		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
-		if err != nil {
+	for _, m := range c.Members {
+		if err := m.Defrag(); err != nil {
 			return err
 		}
-		mt := pb.NewMaintenanceClient(conn)
-		if _, err = mt.Defragment(context.Background(), &pb.DefragmentRequest{}); err != nil {
-			return err
-		}
-		conn.Close()
-		plog.Printf("defragmented %s\n", u)
 	}
 	return nil
 }

+ 65 - 305
tools/functional-tester/etcd-tester/failure.go

@@ -20,18 +20,6 @@ import (
 	"time"
 )
 
-const (
-	snapshotCount      = 10000
-	slowNetworkLatency = 500 // 500 millisecond
-	randomVariation    = 50
-
-	// Wait more when it recovers from slow network, because network layer
-	// needs extra time to propagate traffic control (tc command) change.
-	// Otherwise, we get different hash values from the previous revision.
-	// For more detail, please see https://github.com/coreos/etcd/issues/5121.
-	waitRecover = 5 * time.Second
-)
-
 type failure interface {
 	// Inject injeccts the failure into the testing cluster at the given
 	// round. When calling the function, the cluster should be in health.
@@ -47,355 +35,127 @@ type description string
 
 func (d description) Desc() string { return string(d) }
 
-type failureKillAll struct {
-	description
-}
-
-func newFailureKillAll() *failureKillAll {
-	return &failureKillAll{
-		description: "kill all members",
-	}
-}
-
-func (f *failureKillAll) Inject(c *cluster, round int) error {
-	for _, a := range c.Agents {
-		if err := a.Stop(); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (f *failureKillAll) Recover(c *cluster, round int) error {
-	for _, a := range c.Agents {
-		if _, err := a.Restart(); err != nil {
-			return err
-		}
-	}
-	return c.WaitHealth()
-}
-
-type failureKillMajority struct {
-	description
-}
-
-func newFailureKillMajority() *failureKillMajority {
-	return &failureKillMajority{
-		description: "kill majority of the cluster",
-	}
-}
-
-func (f *failureKillMajority) Inject(c *cluster, round int) error {
-	for i := range getToKillMap(c.Size, round) {
-		if err := c.Agents[i].Stop(); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (f *failureKillMajority) Recover(c *cluster, round int) error {
-	for i := range getToKillMap(c.Size, round) {
-		if _, err := c.Agents[i].Restart(); err != nil {
-			return err
-		}
-	}
-	return c.WaitHealth()
-}
-
-func getToKillMap(size int, seed int) map[int]bool {
-	m := make(map[int]bool)
-	r := rand.New(rand.NewSource(int64(seed)))
-	majority := size/2 + 1
-	for {
-		m[r.Intn(size)] = true
-		if len(m) >= majority {
-			return m
-		}
-	}
-}
+type injectMemberFunc func(*member) error
+type recoverMemberFunc func(*member) error
 
-type failureKillOne struct {
+type failureByFunc struct {
 	description
+	injectMember  injectMemberFunc
+	recoverMember recoverMemberFunc
 }
 
-func newFailureKillOne() *failureKillOne {
-	return &failureKillOne{
-		description: "kill one random member",
-	}
-}
-
-func (f *failureKillOne) Inject(c *cluster, round int) error {
-	i := round % c.Size
-	return c.Agents[i].Stop()
-}
-
-func (f *failureKillOne) Recover(c *cluster, round int) error {
-	i := round % c.Size
-	if _, err := c.Agents[i].Restart(); err != nil {
-		return err
-	}
-	return c.WaitHealth()
-}
-
-type failureKillLeader struct {
-	description
+type failureOne failureByFunc
+type failureAll failureByFunc
+type failureMajority failureByFunc
+type failureLeader struct {
+	failureByFunc
 	idx int
 }
 
-func newFailureKillLeader() *failureKillLeader {
-	return &failureKillLeader{
-		description: "kill leader member",
-	}
-}
+// failureDelay injects a failure and waits for a snapshot event
+type failureDelay struct{ failure }
 
-func (f *failureKillLeader) Inject(c *cluster, round int) error {
-	idx, err := c.GetLeader()
-	if err != nil {
-		return err
-	}
-	f.idx = idx
-	return c.Agents[idx].Stop()
+func (f *failureOne) Inject(c *cluster, round int) error {
+	return f.injectMember(c.Members[round%c.Size])
 }
 
-func (f *failureKillLeader) Recover(c *cluster, round int) error {
-	if _, err := c.Agents[f.idx].Restart(); err != nil {
+func (f *failureOne) Recover(c *cluster, round int) error {
+	if err := f.recoverMember(c.Members[round%c.Size]); err != nil {
 		return err
 	}
 	return c.WaitHealth()
 }
 
-// failureKillOneForLongTime kills one member for long time, and restart
-// after a snapshot is required.
-type failureKillOneForLongTime struct {
-	description
-}
-
-func newFailureKillOneForLongTime() *failureKillOneForLongTime {
-	return &failureKillOneForLongTime{
-		description: "kill one member for long time and expect it to recover from incoming snapshot",
-	}
-}
-
-func (f *failureKillOneForLongTime) Inject(c *cluster, round int) error {
-	i := round % c.Size
-	if err := c.Agents[i].Stop(); err != nil {
-		return err
-	}
-	if c.Size >= 3 {
-		start, _ := c.Report()
-		var end int
-		// Normal healthy cluster could accept 1000req/s at least.
-		// Give it 3-times time to create a new snapshot.
-		retry := snapshotCount / 1000 * 3
-		for j := 0; j < retry; j++ {
-			end, _ = c.Report()
-			// If the number of proposals committed is bigger than snapshot count,
-			// a new snapshot should have been created.
-			if end-start > snapshotCount {
-				return nil
-			}
-			time.Sleep(time.Second)
+func (f *failureAll) Inject(c *cluster, round int) error {
+	for _, m := range c.Members {
+		if err := f.injectMember(m); err != nil {
+			return err
 		}
-		return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry)
 	}
 	return nil
 }
 
-func (f *failureKillOneForLongTime) Recover(c *cluster, round int) error {
-	i := round % c.Size
-	if _, err := c.Agents[i].Restart(); err != nil {
-		return err
-	}
-	return c.WaitHealth()
-}
-
-// failureKillLeaderForLongTime kills the leader for long time, and restart
-// after a snapshot is required.
-type failureKillLeaderForLongTime struct {
-	description
-	idx int
-}
-
-func newFailureKillLeaderForLongTime() *failureKillLeaderForLongTime {
-	return &failureKillLeaderForLongTime{
-		description: "kill the leader for long time and expect it to recover from incoming snapshot",
-	}
-}
-
-func (f *failureKillLeaderForLongTime) Inject(c *cluster, round int) error {
-	idx, err := c.GetLeader()
-	if err != nil {
-		return err
-	}
-	f.idx = idx
-	if err := c.Agents[idx].Stop(); err != nil {
-		return err
-	}
-	if c.Size >= 3 {
-		start, _ := c.Report()
-		var end int
-		retry := snapshotCount / 1000 * 3
-		for j := 0; j < retry; j++ {
-			end, _ = c.Report()
-			if end-start > snapshotCount {
-				return nil
-			}
-			time.Sleep(time.Second)
+func (f *failureAll) Recover(c *cluster, round int) error {
+	for _, m := range c.Members {
+		if err := f.recoverMember(m); err != nil {
+			return err
 		}
-		return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry)
-	}
-	return nil
-}
-
-func (f *failureKillLeaderForLongTime) Recover(c *cluster, round int) error {
-	if _, err := c.Agents[f.idx].Restart(); err != nil {
-		return err
 	}
 	return c.WaitHealth()
 }
 
-type failureIsolate struct {
-	description
-}
-
-func newFailureIsolate() *failureIsolate {
-	return &failureIsolate{
-		description: "isolate one member",
-	}
-}
-
-func (f *failureIsolate) Inject(c *cluster, round int) error {
-	i := round % c.Size
-	return c.Agents[i].DropPort(peerURLPort)
-}
-
-func (f *failureIsolate) Recover(c *cluster, round int) error {
-	i := round % c.Size
-	if err := c.Agents[i].RecoverPort(peerURLPort); err != nil {
-		return err
-	}
-	return c.WaitHealth()
-}
-
-type failureIsolateAll struct {
-	description
-}
-
-func newFailureIsolateAll() *failureIsolateAll {
-	return &failureIsolateAll{
-		description: "isolate all members",
-	}
-}
-
-func (f *failureIsolateAll) Inject(c *cluster, round int) error {
-	for _, a := range c.Agents {
-		if err := a.DropPort(peerURLPort); err != nil {
+func (f *failureMajority) Inject(c *cluster, round int) error {
+	for i := range killMap(c.Size, round) {
+		if err := f.injectMember(c.Members[i]); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func (f *failureIsolateAll) Recover(c *cluster, round int) error {
-	for _, a := range c.Agents {
-		if err := a.RecoverPort(peerURLPort); err != nil {
+func (f *failureMajority) Recover(c *cluster, round int) error {
+	for i := range killMap(c.Size, round) {
+		if err := f.recoverMember(c.Members[i]); err != nil {
 			return err
 		}
 	}
-	return c.WaitHealth()
-}
-
-type failureSlowNetworkOneMember struct {
-	description
-}
-
-func newFailureSlowNetworkOneMember() *failureSlowNetworkOneMember {
-	desc := fmt.Sprintf("slow down one member's network by adding %d ms latency", slowNetworkLatency)
-	return &failureSlowNetworkOneMember{
-		description: description(desc),
-	}
-}
-
-func (f *failureSlowNetworkOneMember) Inject(c *cluster, round int) error {
-	i := round % c.Size
-	if err := c.Agents[i].SetLatency(slowNetworkLatency, randomVariation); err != nil {
-		c.Agents[i].RemoveLatency() // roll back
-		return err
-	}
 	return nil
 }
 
-func (f *failureSlowNetworkOneMember) Recover(c *cluster, round int) error {
-	i := round % c.Size
-	if err := c.Agents[i].RemoveLatency(); err != nil {
-		return err
-	}
-	time.Sleep(waitRecover)
-	return c.WaitHealth()
-}
-
-type failureSlowNetworkLeader struct {
-	description
-	idx int
-}
-
-func newFailureSlowNetworkLeader() *failureSlowNetworkLeader {
-	desc := fmt.Sprintf("slow down leader's network by adding %d ms latency", slowNetworkLatency)
-	return &failureSlowNetworkLeader{
-		description: description(desc),
-	}
-}
-
-func (f *failureSlowNetworkLeader) Inject(c *cluster, round int) error {
+func (f *failureLeader) Inject(c *cluster, round int) error {
 	idx, err := c.GetLeader()
 	if err != nil {
 		return err
 	}
 	f.idx = idx
-	if err := c.Agents[idx].SetLatency(slowNetworkLatency, randomVariation); err != nil {
-		c.Agents[idx].RemoveLatency() // roll back
-		return err
-	}
-	return nil
+	return f.injectMember(c.Members[idx])
 }
 
-func (f *failureSlowNetworkLeader) Recover(c *cluster, round int) error {
-	if err := c.Agents[f.idx].RemoveLatency(); err != nil {
+func (f *failureLeader) Recover(c *cluster, round int) error {
+	if err := f.recoverMember(c.Members[f.idx]); err != nil {
 		return err
 	}
-	time.Sleep(waitRecover)
 	return c.WaitHealth()
 }
 
-type failureSlowNetworkAll struct {
-	description
-}
+func (f *failureDelay) Inject(c *cluster, round int) error {
+	if err := f.failure.Inject(c, round); err != nil {
+		return err
+	}
 
-func newFailureSlowNetworkAll() *failureSlowNetworkAll {
-	return &failureSlowNetworkAll{
-		description: "slow down all members' network",
+	if c.Size < 3 {
+		return nil
 	}
-}
 
-func (f *failureSlowNetworkAll) Inject(c *cluster, round int) error {
-	for i, a := range c.Agents {
-		if err := a.SetLatency(slowNetworkLatency, randomVariation); err != nil {
-			for j := 0; j < i; j++ { // roll back
-				c.Agents[j].RemoveLatency()
-			}
-			return err
+	start, _ := c.Report()
+	end := start
+	// Normal healthy cluster could accept 1000req/s at least.
+	// Give it 3-times time to create a new snapshot.
+	retry := snapshotCount / 1000 * 3
+	for j := 0; j < retry; j++ {
+		end, _ = c.Report()
+		// If the number of proposals committed is bigger than snapshot count,
+		// a new snapshot should have been created.
+		if end-start > snapshotCount {
+			return nil
 		}
+		time.Sleep(time.Second)
 	}
-	return nil
+	return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry)
 }
 
-func (f *failureSlowNetworkAll) Recover(c *cluster, round int) error {
-	for _, a := range c.Agents {
-		if err := a.RemoveLatency(); err != nil {
-			return err
+func (f *failureDelay) Desc() string {
+	return f.failure.Desc() + " for a long time and expect it to recover from an incoming snapshot"
+}
+
+func killMap(size int, seed int) map[int]bool {
+	m := make(map[int]bool)
+	r := rand.New(rand.NewSource(int64(seed)))
+	majority := size/2 + 1
+	for {
+		m[r.Intn(size)] = true
+		if len(m) >= majority {
+			return m
 		}
 	}
-	time.Sleep(waitRecover)
-	return c.WaitHealth()
 }

+ 141 - 0
tools/functional-tester/etcd-tester/failure_agent.go

@@ -0,0 +1,141 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+const (
+	snapshotCount      = 10000
+	slowNetworkLatency = 500 // 500 millisecond
+	randomVariation    = 50
+
+	// Wait more when it recovers from slow network, because network layer
+	// needs extra time to propagate traffic control (tc command) change.
+	// Otherwise, we get different hash values from the previous revision.
+	// For more detail, please see https://github.com/coreos/etcd/issues/5121.
+	waitRecover = 5 * time.Second
+)
+
+func injectStop(m *member) error { return m.Agent.Stop() }
+func recoverStop(m *member) error {
+	_, err := m.Agent.Restart()
+	return err
+}
+
+func newFailureKillAll() failure {
+	return &failureAll{
+		description:   "kill all members",
+		injectMember:  injectStop,
+		recoverMember: recoverStop,
+	}
+}
+
+func newFailureKillMajority() failure {
+	return &failureMajority{
+		description:   "kill majority of the cluster",
+		injectMember:  injectStop,
+		recoverMember: recoverStop,
+	}
+}
+
+func newFailureKillOne() failure {
+	return &failureOne{
+		description:   "kill one random member",
+		injectMember:  injectStop,
+		recoverMember: recoverStop,
+	}
+}
+
+func newFailureKillLeader() failure {
+	ff := failureByFunc{
+		description:   "kill leader member",
+		injectMember:  injectStop,
+		recoverMember: recoverStop,
+	}
+	return &failureLeader{ff, 0}
+}
+
+func newFailureKillOneForLongTime() failure {
+	return &failureDelay{newFailureKillOne()}
+}
+
+func newFailureKillLeaderForLongTime() failure {
+	return &failureDelay{newFailureKillLeader()}
+}
+
+func injectDropPort(m *member) error  { return m.Agent.DropPort(peerURLPort) }
+func recoverDropPort(m *member) error { return m.Agent.RecoverPort(peerURLPort) }
+
+func newFailureIsolate() failure {
+	return &failureOne{
+		description:   "isolate one member",
+		injectMember:  injectDropPort,
+		recoverMember: recoverDropPort,
+	}
+}
+
+func newFailureIsolateAll() failure {
+	return &failureAll{
+		description:   "isolate all members",
+		injectMember:  injectDropPort,
+		recoverMember: recoverDropPort,
+	}
+}
+
+func injectLatency(m *member) error {
+	if err := m.Agent.SetLatency(slowNetworkLatency, randomVariation); err != nil {
+		m.Agent.RemoveLatency()
+		return err
+	}
+	return nil
+}
+
+func recoverLatency(m *member) error {
+	if err := m.Agent.RemoveLatency(); err != nil {
+		return err
+	}
+	time.Sleep(waitRecover)
+	return nil
+}
+
+func newFailureSlowNetworkOneMember() failure {
+	desc := fmt.Sprintf("slow down one member's network by adding %d ms latency", slowNetworkLatency)
+	return &failureOne{
+		description:   description(desc),
+		injectMember:  injectLatency,
+		recoverMember: recoverLatency,
+	}
+}
+
+func newFailureSlowNetworkLeader() failure {
+	desc := fmt.Sprintf("slow down leader's network by adding %d ms latency", slowNetworkLatency)
+	ff := failureByFunc{
+		description:   description(desc),
+		injectMember:  injectStop,
+		recoverMember: recoverStop,
+	}
+	return &failureLeader{ff, 0}
+}
+
+func newFailureSlowNetworkAll() failure {
+	return &failureAll{
+		description:   "slow down all members' network",
+		injectMember:  injectLatency,
+		recoverMember: recoverLatency,
+	}
+}

+ 160 - 0
tools/functional-tester/etcd-tester/member.go

@@ -0,0 +1,160 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"net/url"
+	"time"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+
+	clientv2 "github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
+)
+
+type member struct {
+	Agent     client.Agent
+	Endpoint  string
+	Name      string
+	ClientURL string
+	PeerURL   string
+}
+
+func (m *member) ClusterEntry() string { return m.Name + "=" + m.PeerURL }
+
+func (m *member) Flags() []string {
+	return []string{
+		"--name", m.Name,
+		"--listen-client-urls", m.ClientURL,
+		"--advertise-client-urls", m.ClientURL,
+		"--listen-peer-urls", m.PeerURL,
+		"--initial-advertise-peer-urls", m.PeerURL,
+		"--initial-cluster-state", "new",
+	}
+}
+
+func (m *member) CheckCompact(rev int64) error {
+	cli, err := m.newClientV3()
+	if err != nil {
+		return fmt.Errorf("%v (endpoint %s)", err, m.ClientURL)
+	}
+	defer cli.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
+	wr, ok := <-wch
+	cancel()
+
+	if !ok {
+		return fmt.Errorf("watch channel terminated (endpoint %s)", m.ClientURL)
+	}
+	if wr.CompactRevision != rev {
+		return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.ClientURL)
+	}
+
+	return nil
+}
+
+func (m *member) Defrag() error {
+	plog.Printf("defragmenting %s\n", m.ClientURL)
+	cli, err := m.newClientV3()
+	if err != nil {
+		return err
+	}
+	defer cli.Close()
+	if _, err = cli.Defragment(context.Background(), m.ClientURL); err != nil {
+		return err
+	}
+	plog.Printf("defragmented %s\n", m.ClientURL)
+	return nil
+}
+
+func (m *member) RevHash() (int64, int64, error) {
+	conn, err := m.dialGRPC()
+	if err != nil {
+		return 0, 0, err
+	}
+	mt := pb.NewMaintenanceClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	resp, err := mt.Hash(ctx, &pb.HashRequest{})
+	cancel()
+	conn.Close()
+	return resp.Header.Revision, int64(resp.Hash), nil
+}
+
+func (m *member) IsLeader() (bool, error) {
+	cli, err := m.newClientV3()
+	if err != nil {
+		return false, err
+	}
+	defer cli.Close()
+	resp, err := cli.Status(context.Background(), m.ClientURL)
+	if err != nil {
+		return false, err
+	}
+	return resp.Header.MemberId == resp.Leader, nil
+}
+
+func (m *member) SetHealthKeyV3() error {
+	cli, err := m.newClientV3()
+	if err != nil {
+		return fmt.Errorf("%v (%s)", err, m.ClientURL)
+	}
+	defer cli.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	_, err = cli.Put(ctx, "health", "good")
+	cancel()
+	if err != nil {
+		return fmt.Errorf("%v (%s)", err, m.ClientURL)
+	}
+	return nil
+}
+
+func (m *member) SetHealthKeyV2() error {
+	cfg := clientv2.Config{Endpoints: []string{m.ClientURL}}
+	c, err := clientv2.New(cfg)
+	if err != nil {
+		return err
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	kapi := clientv2.NewKeysAPI(c)
+	_, err = kapi.Set(ctx, "health", "good", nil)
+	cancel()
+	return err
+}
+
+func (m *member) newClientV3() (*clientv3.Client, error) {
+	return clientv3.New(clientv3.Config{
+		Endpoints:   []string{m.ClientURL},
+		DialTimeout: 5 * time.Second,
+	})
+}
+
+func (m *member) dialGRPC() (*grpc.ClientConn, error) {
+	return grpc.Dial(m.grpcAddr(), grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
+}
+
+// grpcAddr gets the host from clientURL so it works with grpc.Dial()
+func (m *member) grpcAddr() string {
+	u, err := url.Parse(m.ClientURL)
+	if err != nil {
+		panic(err)
+	}
+	return u.Host
+}

+ 58 - 74
tools/functional-tester/etcd-tester/tester.go

@@ -41,80 +41,15 @@ func (tt *tester) runLoop() {
 		tt.status.Failures = append(tt.status.Failures, f.Desc())
 	}
 
-	var (
-		round          int
-		prevCompactRev int64
-	)
-	for {
+	var prevCompactRev int64
+	for round := 0; round < tt.limit || tt.limit == -1; round++ {
 		tt.status.setRound(round)
-		tt.status.setCase(-1) // -1 so that logPrefix doesn't print out 'case'
 		roundTotalCounter.Inc()
 
-		var failed bool
-		for j, f := range tt.failures {
-			caseTotalCounter.WithLabelValues(f.Desc()).Inc()
-			tt.status.setCase(j)
-
-			if err := tt.cluster.WaitHealth(); err != nil {
-				plog.Printf("%s wait full health error: %v", tt.logPrefix(), err)
-				if err := tt.cleanup(); err != nil {
-					return
-				}
-				failed = true
-				break
-			}
-
-			plog.Printf("%s injecting failure %q", tt.logPrefix(), f.Desc())
-			if err := f.Inject(tt.cluster, round); err != nil {
-				plog.Printf("%s injection error: %v", tt.logPrefix(), err)
-				if err := tt.cleanup(); err != nil {
-					return
-				}
-				failed = true
-				break
-			}
-			plog.Printf("%s injected failure", tt.logPrefix())
-
-			plog.Printf("%s recovering failure %q", tt.logPrefix(), f.Desc())
-			if err := f.Recover(tt.cluster, round); err != nil {
-				plog.Printf("%s recovery error: %v", tt.logPrefix(), err)
-				if err := tt.cleanup(); err != nil {
-					return
-				}
-				failed = true
-				break
-			}
-			plog.Printf("%s recovered failure", tt.logPrefix())
-
-			if tt.cluster.v2Only {
-				plog.Printf("%s succeed!", tt.logPrefix())
-				continue
-			}
-
-			if !tt.consistencyCheck {
-				if err := tt.updateRevision(); err != nil {
-					plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
-					return
-				}
-				continue
-			}
-
-			var err error
-			failed, err = tt.checkConsistency()
-			if err != nil {
-				plog.Warningf("%s functional-tester returning with tt.checkConsistency error (%v)", tt.logPrefix(), err)
+		if ok, err := tt.doRound(round); !ok {
+			if err != nil || tt.cleanup() != nil {
 				return
 			}
-			if failed {
-				break
-			}
-			plog.Printf("%s succeed!", tt.logPrefix())
-		}
-
-		// -1 so that logPrefix doesn't print out 'case'
-		tt.status.setCase(-1)
-
-		if failed {
 			continue
 		}
 
@@ -139,13 +74,62 @@ func (tt *tester) runLoop() {
 				return
 			}
 		}
+	}
 
-		round++
-		if round == tt.limit {
-			plog.Printf("%s functional-tester is finished", tt.logPrefix())
-			break
+	plog.Printf("%s functional-tester is finished", tt.logPrefix())
+}
+
+func (tt *tester) doRound(round int) (bool, error) {
+	// -1 so that logPrefix doesn't print out 'case'
+	defer tt.status.setCase(-1)
+
+	for j, f := range tt.failures {
+		caseTotalCounter.WithLabelValues(f.Desc()).Inc()
+		tt.status.setCase(j)
+
+		if err := tt.cluster.WaitHealth(); err != nil {
+			plog.Printf("%s wait full health error: %v", tt.logPrefix(), err)
+			return false, nil
+		}
+
+		plog.Printf("%s injecting failure %q", tt.logPrefix(), f.Desc())
+		if err := f.Inject(tt.cluster, round); err != nil {
+			plog.Printf("%s injection error: %v", tt.logPrefix(), err)
+			return false, nil
+		}
+		plog.Printf("%s injected failure", tt.logPrefix())
+
+		plog.Printf("%s recovering failure %q", tt.logPrefix(), f.Desc())
+		if err := f.Recover(tt.cluster, round); err != nil {
+			plog.Printf("%s recovery error: %v", tt.logPrefix(), err)
+			return false, nil
+		}
+		plog.Printf("%s recovered failure", tt.logPrefix())
+
+		if tt.cluster.v2Only {
+			plog.Printf("%s succeed!", tt.logPrefix())
+			continue
+		}
+
+		if !tt.consistencyCheck {
+			if err := tt.updateRevision(); err != nil {
+				plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
+				return false, err
+			}
+			continue
+		}
+
+		failed, err := tt.checkConsistency()
+		if err != nil {
+			plog.Warningf("%s functional-tester returning with tt.checkConsistency error (%v)", tt.logPrefix(), err)
+			return false, err
+		}
+		if failed {
+			return false, nil
 		}
+		plog.Printf("%s succeed!", tt.logPrefix())
 	}
+	return true, nil
 }
 
 func (tt *tester) updateRevision() error {
@@ -260,7 +244,7 @@ func (tt *tester) cleanup() error {
 		return err
 	}
 
-	if err := tt.cluster.Bootstrap(); err != nil {
+	if err := tt.cluster.Reset(); err != nil {
 		plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err)
 		return err
 	}