Browse Source

Merge pull request #4406 from gyuho/f0

*: functional-tester with v3
Gyu-Ho Lee 10 years ago
parent
commit
26f440be7c

+ 9 - 3
tools/functional-tester/etcd-agent/agent_test.go

@@ -23,20 +23,24 @@ import (
 const etcdPath = "./etcd"
 
 func TestAgentStart(t *testing.T) {
+	defer os.Remove("etcd.log")
+
 	a, dir := newTestAgent(t)
 	defer a.terminate()
 
-	err := a.start("-data-dir", dir)
+	err := a.start("--data-dir", dir)
 	if err != nil {
 		t.Fatal(err)
 	}
 }
 
 func TestAgentRestart(t *testing.T) {
+	defer os.Remove("etcd.log")
+
 	a, dir := newTestAgent(t)
 	defer a.terminate()
 
-	err := a.start("-data-dir", dir)
+	err := a.start("--data-dir", dir)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -52,9 +56,11 @@ func TestAgentRestart(t *testing.T) {
 }
 
 func TestAgentTerminate(t *testing.T) {
+	defer os.Remove("etcd.log")
+
 	a, dir := newTestAgent(t)
 
-	err := a.start("-data-dir", dir)
+	err := a.start("--data-dir", dir)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 1 - 0
tools/functional-tester/etcd-agent/rpc.go

@@ -30,6 +30,7 @@ func (a *Agent) serveRPC() {
 	if e != nil {
 		log.Fatal("agent:", e)
 	}
+	log.Println("agent listening on :9027")
 	go http.Serve(l, nil)
 }
 

+ 4 - 4
tools/functional-tester/etcd-agent/rpc_test.go

@@ -43,7 +43,7 @@ func TestRPCStart(t *testing.T) {
 		t.Fatal(err)
 	}
 	var pid int
-	err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid)
+	err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -66,7 +66,7 @@ func TestRPCRestart(t *testing.T) {
 		t.Fatal(err)
 	}
 	var pid int
-	err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid)
+	err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -111,7 +111,7 @@ func TestRPCTerminate(t *testing.T) {
 		t.Fatal(err)
 	}
 	var pid int
-	err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid)
+	err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -146,7 +146,7 @@ func TestRPCStatus(t *testing.T) {
 		t.Fatal(err)
 	}
 	var pid int
-	err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid)
+	err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 82 - 27
tools/functional-tester/etcd-tester/cluster.go

@@ -23,13 +23,18 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	etcdclient "github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+
+	clientV2 "github.com/coreos/etcd/client"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
 )
 
 const peerURLPort = 2380
 
 type cluster struct {
+	v2Only bool // to be deprecated
+
 	agentEndpoints       []string
 	datadir              string
 	stressKeySize        int
@@ -39,6 +44,7 @@ type cluster struct {
 	Agents     []client.Agent
 	Stressers  []Stresser
 	Names      []string
+	GRPCURLs   []string
 	ClientURLs []string
 }
 
@@ -47,8 +53,9 @@ type ClusterStatus struct {
 }
 
 // newCluster starts and returns a new cluster. The caller should call Terminate when finished, to shut it down.
-func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int) (*cluster, error) {
+func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
 	c := &cluster{
+		v2Only:               isV2Only,
 		agentEndpoints:       agentEndpoints,
 		datadir:              datadir,
 		stressKeySize:        stressKeySize,
@@ -65,6 +72,7 @@ func (c *cluster) Bootstrap() error {
 
 	agents := make([]client.Agent, size)
 	names := make([]string, size)
+	grpcURLs := make([]string, size)
 	clientURLs := make([]string, size)
 	peerURLs := make([]string, size)
 	members := make([]string, size)
@@ -90,18 +98,28 @@ func (c *cluster) Bootstrap() error {
 	token := fmt.Sprint(rand.Int())
 
 	for i, a := range agents {
-		_, err := a.Start(
-			"-name", names[i],
-			"-data-dir", c.datadir,
-			"-advertise-client-urls", clientURLs[i],
-			"-listen-client-urls", clientURLs[i],
-			"-initial-advertise-peer-urls", peerURLs[i],
-			"-listen-peer-urls", peerURLs[i],
-			"-initial-cluster-token", token,
-			"-initial-cluster", clusterStr,
-			"-initial-cluster-state", "new",
-		)
-		if err != nil {
+		flags := []string{
+			"--name", names[i],
+			"--data-dir", c.datadir,
+
+			"--listen-client-urls", clientURLs[i],
+			"--advertise-client-urls", clientURLs[i],
+
+			"--listen-peer-urls", peerURLs[i],
+			"--initial-advertise-peer-urls", peerURLs[i],
+
+			"--initial-cluster-token", token,
+			"--initial-cluster", clusterStr,
+			"--initial-cluster-state", "new",
+		}
+		if !c.v2Only {
+			flags = append(flags,
+				"--experimental-v3demo",
+				"--experimental-gRPC-addr", grpcURLs[i],
+			)
+		}
+
+		if _, err := a.Start(flags...); err != nil {
 			// cleanup
 			for j := 0; j < i; j++ {
 				agents[j].Terminate()
@@ -110,22 +128,36 @@ func (c *cluster) Bootstrap() error {
 		}
 	}
 
-	stressers := make([]Stresser, len(clientURLs))
-	for i, u := range clientURLs {
-		s := &stresser{
-			Endpoint:       u,
-			KeySize:        c.stressKeySize,
-			KeySuffixRange: c.stressKeySuffixRange,
-			N:              200,
+	var stressers []Stresser
+	if c.v2Only {
+		for _, u := range clientURLs {
+			s := &stresserV2{
+				Endpoint:       u,
+				KeySize:        c.stressKeySize,
+				KeySuffixRange: c.stressKeySuffixRange,
+				N:              200,
+			}
+			go s.Stress()
+			stressers = append(stressers, s)
+		}
+	} else {
+		for _, u := range grpcURLs {
+			s := &stresser{
+				Endpoint:       u,
+				KeySize:        c.stressKeySize,
+				KeySuffixRange: c.stressKeySuffixRange,
+				N:              200,
+			}
+			go s.Stress()
+			stressers = append(stressers, s)
 		}
-		go s.Stress()
-		stressers[i] = s
 	}
 
 	c.Size = size
 	c.Agents = agents
 	c.Stressers = stressers
 	c.Names = names
+	c.GRPCURLs = grpcURLs
 	c.ClientURLs = clientURLs
 	return nil
 }
@@ -136,8 +168,13 @@ func (c *cluster) WaitHealth() error {
 	// TODO: set it to a reasonable value. It is set that high because
 	// follower may use long time to catch up the leader when reboot under
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
+	healthFunc, urls := setHealthKey, c.GRPCURLs
+	if c.v2Only {
+		healthFunc = setHealthKeyV2
+		urls = c.ClientURLs
+	}
 	for i := 0; i < 60; i++ {
-		err = setHealthKey(c.ClientURLs)
+		err = healthFunc(urls)
 		if err == nil {
 			return nil
 		}
@@ -198,15 +235,33 @@ func (c *cluster) Status() ClusterStatus {
 // setHealthKey sets health key on all given urls.
 func setHealthKey(us []string) error {
 	for _, u := range us {
-		cfg := etcdclient.Config{
+		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
+		if err != nil {
+			return fmt.Errorf("no connection available for %s (%v)", u, err)
+		}
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		kvc := pb.NewKVClient(conn)
+		_, err = kvc.Put(ctx, &pb.PutRequest{Key: []byte("health"), Value: []byte("good")})
+		cancel()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// setHealthKeyV2 sets health key on all given urls.
+func setHealthKeyV2(us []string) error {
+	for _, u := range us {
+		cfg := clientV2.Config{
 			Endpoints: []string{u},
 		}
-		c, err := etcdclient.New(cfg)
+		c, err := clientV2.New(cfg)
 		if err != nil {
 			return err
 		}
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-		kapi := etcdclient.NewKeysAPI(c)
+		kapi := clientV2.NewKeysAPI(c)
 		_, err = kapi.Set(ctx, "health", "good", nil)
 		cancel()
 		if err != nil {

+ 7 - 6
tools/functional-tester/etcd-tester/main.go

@@ -22,15 +22,16 @@ import (
 )
 
 func main() {
-	endpointStr := flag.String("agent-endpoints", ":9027", "HTTP RPC endpoints of agents")
-	datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine")
-	stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd")
-	stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd")
-	limit := flag.Int("limit", 3, "the limit of rounds to run failure set")
+	endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.")
+	datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.")
+	stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.")
+	stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.")
+	limit := flag.Int("limit", 3, "the limit of rounds to run failure set.")
+	isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
 	flag.Parse()
 
 	endpoints := strings.Split(*endpointStr, ",")
-	c, err := newCluster(endpoints, *datadir, *stressKeySize, *stressKeySuffixRange)
+	c, err := newCluster(endpoints, *datadir, *stressKeySize, *stressKeySuffixRange, *isV2Only)
 	if err != nil {
 		log.Fatal(err)
 	}

+ 72 - 8
tools/functional-tester/etcd-tester/stresser.go

@@ -23,7 +23,9 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	clientV2 "github.com/coreos/etcd/client"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
 type Stresser interface {
@@ -41,6 +43,68 @@ type stresser struct {
 	KeySize        int
 	KeySuffixRange int
 
+	N int
+
+	mu      sync.Mutex
+	failure int
+	success int
+
+	cancel func()
+}
+
+func (s *stresser) Stress() error {
+	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
+	if err != nil {
+		return fmt.Errorf("no connection available for %s (%v)", s.Endpoint, err)
+	}
+	kvc := pb.NewKVClient(conn)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	s.cancel = cancel
+
+	for i := 0; i < s.N; i++ {
+		go func(i int) {
+			for {
+				putctx, putcancel := context.WithTimeout(ctx, 5*time.Second)
+				_, err := kvc.Put(putctx, &pb.PutRequest{
+					Key:   []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
+					Value: []byte(randStr(s.KeySize)),
+				})
+				putcancel()
+				if err == context.Canceled {
+					return
+				}
+				s.mu.Lock()
+				if err != nil {
+					s.failure++
+				} else {
+					s.success++
+				}
+				s.mu.Unlock()
+			}
+		}(i)
+	}
+
+	<-ctx.Done()
+	return nil
+}
+
+func (s *stresser) Cancel() {
+	s.cancel()
+}
+
+func (s *stresser) Report() (success int, failure int) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.success, s.failure
+}
+
+type stresserV2 struct {
+	Endpoint string
+
+	KeySize        int
+	KeySuffixRange int
+
 	N int
 	// TODO: not implemented
 	Interval time.Duration
@@ -52,8 +116,8 @@ type stresser struct {
 	cancel func()
 }
 
-func (s *stresser) Stress() error {
-	cfg := client.Config{
+func (s *stresserV2) Stress() error {
+	cfg := clientV2.Config{
 		Endpoints: []string{s.Endpoint},
 		Transport: &http.Transport{
 			Dial: (&net.Dialer{
@@ -63,19 +127,19 @@ func (s *stresser) Stress() error {
 			MaxIdleConnsPerHost: s.N,
 		},
 	}
-	c, err := client.New(cfg)
+	c, err := clientV2.New(cfg)
 	if err != nil {
 		return err
 	}
 
-	kv := client.NewKeysAPI(c)
+	kv := clientV2.NewKeysAPI(c)
 	ctx, cancel := context.WithCancel(context.Background())
 	s.cancel = cancel
 
 	for i := 0; i < s.N; i++ {
 		go func() {
 			for {
-				setctx, setcancel := context.WithTimeout(ctx, client.DefaultRequestTimeout)
+				setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
 				key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))
 				_, err := kv.Set(setctx, key, randStr(s.KeySize), nil)
 				setcancel()
@@ -97,11 +161,11 @@ func (s *stresser) Stress() error {
 	return nil
 }
 
-func (s *stresser) Cancel() {
+func (s *stresserV2) Cancel() {
 	s.cancel()
 }
 
-func (s *stresser) Report() (success int, failure int) {
+func (s *stresserV2) Report() (success int, failure int) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	return s.success, s.failure