Browse Source

Merge pull request #2441 from yichengq/334

tools/functional-tester: make it work basically
Yicheng Qin 11 years ago
parent
commit
0a04eec481

+ 14 - 3
tools/functional-tester/etcd-agent/agent.go

@@ -22,8 +22,9 @@ import (
 )
 
 type Agent struct {
-	cmd *exec.Cmd
-	l   net.Listener
+	cmd     *exec.Cmd
+	logfile *os.File
+	l       net.Listener
 }
 
 func newAgent(etcd string) (*Agent, error) {
@@ -34,12 +35,20 @@ func newAgent(etcd string) (*Agent, error) {
 	}
 
 	c := exec.Command(etcd)
-	return &Agent{cmd: c}, nil
+
+	f, err := os.Create("etcd.log")
+	if err != nil {
+		return nil, err
+	}
+
+	return &Agent{cmd: c, logfile: f}, nil
 }
 
 // start starts a new etcd process with the given args.
 func (a *Agent) start(args ...string) error {
 	a.cmd = exec.Command(a.cmd.Path, args...)
+	a.cmd.Stdout = a.logfile
+	a.cmd.Stderr = a.logfile
 	return a.cmd.Start()
 }
 
@@ -56,6 +65,8 @@ func (a *Agent) stop() error {
 // restart restarts the stopped etcd process.
 func (a *Agent) restart() error {
 	a.cmd = exec.Command(a.cmd.Path, a.cmd.Args[1:]...)
+	a.cmd.Stdout = a.logfile
+	a.cmd.Stderr = a.logfile
 	return a.cmd.Start()
 }
 

+ 1 - 1
tools/functional-tester/etcd-agent/client/client.go

@@ -36,7 +36,7 @@ type agent struct {
 }
 
 func NewAgent(endpoint string) (Agent, error) {
-	c, err := rpc.Dial("tcp", endpoint)
+	c, err := rpc.DialHTTP("tcp", endpoint)
 	if err != nil {
 		return nil, err
 	}

+ 16 - 1
tools/functional-tester/etcd-agent/main.go

@@ -14,6 +14,21 @@
 
 package main
 
+import (
+	"flag"
+	"log"
+)
+
 func main() {
-	panic("not implemented")
+	etcdPath := flag.String("etcd-path", "/opt/etcd/bin/etcd", "")
+	flag.Parse()
+
+	a, err := newAgent(*etcdPath)
+	if err != nil {
+		log.Fatal(err)
+	}
+	a.serveRPC()
+
+	var done chan struct{}
+	<-done
 }

+ 4 - 1
tools/functional-tester/etcd-agent/rpc.go

@@ -32,20 +32,22 @@ func (a *Agent) serveRPC() {
 }
 
 func (a *Agent) RPCStart(args []string, pid *int) error {
+	log.Printf("rpc: start etcd with args %v", args)
 	err := a.start(args...)
 	if err != nil {
 		return err
 	}
-	log.Print("start", a.cmd.Args)
 	*pid = a.cmd.Process.Pid
 	return nil
 }
 
 func (a *Agent) RPCStop(args struct{}, reply *struct{}) error {
+	log.Printf("rpc: stop etcd")
 	return a.stop()
 }
 
 func (a *Agent) RPCRestart(args struct{}, pid *int) error {
+	log.Printf("rpc: restart etcd")
 	err := a.restart()
 	if err != nil {
 		return err
@@ -55,6 +57,7 @@ func (a *Agent) RPCRestart(args struct{}, pid *int) error {
 }
 
 func (a *Agent) RPCTerminate(args struct{}, reply *struct{}) error {
+	log.Printf("rpc: terminate etcd")
 	return a.terminate()
 }
 

+ 143 - 0
tools/functional-tester/etcd-tester/cluster.go

@@ -0,0 +1,143 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"net"
+	"strings"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	etcdclient "github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
+)
+
+type cluster struct {
+	agentEndpoints []string
+	datadir        string
+
+	Size       int
+	Agents     []client.Agent
+	Names      []string
+	ClientURLs []string
+}
+
+// newCluster starts and returns a new cluster. The caller should call Terminate when finished, to shut it down.
+func newCluster(agentEndpoints []string, datadir string) (*cluster, error) {
+	c := &cluster{
+		agentEndpoints: agentEndpoints,
+		datadir:        datadir,
+	}
+	if err := c.Bootstrap(); err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+func (c *cluster) Bootstrap() error {
+	size := len(c.agentEndpoints)
+
+	agents := make([]client.Agent, size)
+	names := make([]string, size)
+	clientURLs := make([]string, size)
+	peerURLs := make([]string, size)
+	members := make([]string, size)
+	for i, u := range c.agentEndpoints {
+		var err error
+		agents[i], err = client.NewAgent(u)
+		if err != nil {
+			return err
+		}
+
+		names[i] = fmt.Sprintf("etcd-%d", i)
+
+		host, _, err := net.SplitHostPort(u)
+		if err != nil {
+			return err
+		}
+		clientURLs[i] = fmt.Sprintf("http://%s:2379", host)
+		peerURLs[i] = fmt.Sprintf("http://%s:2380", host)
+
+		members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i])
+	}
+	clusterStr := strings.Join(members, ",")
+	token := fmt.Sprint(rand.Int())
+
+	for i, a := range agents {
+		_, err := a.Start(
+			"-name", names[i],
+			"-data-dir", c.datadir,
+			"-advertise-client-urls", clientURLs[i],
+			"-listen-client-urls", clientURLs[i],
+			"-initial-advertise-peer-urls", peerURLs[i],
+			"-listen-peer-urls", peerURLs[i],
+			"-initial-cluster-token", token,
+			"-initial-cluster", clusterStr,
+			"-initial-cluster-state", "new",
+		)
+		if err != nil {
+			// cleanup
+			for j := 0; j < i; j++ {
+				agents[j].Terminate()
+			}
+			return err
+		}
+	}
+
+	c.Size = size
+	c.Agents = agents
+	c.Names = names
+	c.ClientURLs = clientURLs
+	return nil
+}
+
+func (c *cluster) WaitHealth() error {
+	var err error
+	for i := 0; i < 10; i++ {
+		err = setHealthKey(c.ClientURLs)
+		if err == nil {
+			return nil
+		}
+		time.Sleep(time.Second)
+	}
+	return err
+}
+
+func (c *cluster) Terminate() {
+	for _, a := range c.Agents {
+		a.Terminate()
+	}
+}
+
+// setHealthKey sets health key on all given urls.
+func setHealthKey(us []string) error {
+	for _, u := range us {
+		cfg := etcdclient.Config{
+			Endpoints: []string{u},
+		}
+		c, err := etcdclient.New(cfg)
+		if err != nil {
+			return err
+		}
+		kapi := etcdclient.NewKeysAPI(c)
+		_, err = kapi.Set(context.TODO(), "health", "good", nil)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 6 - 4
tools/functional-tester/etcd-tester/failure.go

@@ -14,14 +14,16 @@
 
 package main
 
-import "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
-
 type failure interface {
 	// inject the failure into the testing cluster
-	Inject(agents []client.Agent) error
+	Inject(c *cluster) error
 	// recover the injected failure and wait for the
 	// recovery of the testing cluster
-	Recover(agents []client.Agent) error
+	Recover(c *cluster) error
 	// return a description of the failure
 	Desc() string
 }
+
+type description string
+
+func (d description) Desc() string { return string(d) }

+ 43 - 0
tools/functional-tester/etcd-tester/failure_killall.go

@@ -0,0 +1,43 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+type failureKillAll struct {
+	description
+}
+
+func newFailureKillAll() *failureKillAll {
+	return &failureKillAll{
+		description: "kill all members",
+	}
+}
+
+func (f *failureKillAll) Inject(c *cluster) error {
+	for _, a := range c.Agents {
+		if err := a.Stop(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (f *failureKillAll) Recover(c *cluster) error {
+	for _, a := range c.Agents {
+		if _, err := a.Restart(); err != nil {
+			return err
+		}
+	}
+	return c.WaitHealth()
+}

+ 29 - 0
tools/functional-tester/etcd-tester/failure_no.go

@@ -0,0 +1,29 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+type failureBase struct {
+	description
+}
+
+func newFailureBase() *failureBase {
+	return &failureBase{
+		description: "do nothing",
+	}
+}
+
+func (f *failureBase) Inject(c *cluster) error { return nil }
+
+func (f *failureBase) Recover(c *cluster) error { return nil }

+ 24 - 1
tools/functional-tester/etcd-tester/main.go

@@ -14,6 +14,29 @@
 
 package main
 
+import (
+	"flag"
+	"log"
+	"strings"
+)
+
 func main() {
-	panic("not implemented")
+	endpointStr := flag.String("agent-endpoints", ":9027", "")
+	datadir := flag.String("data-dir", "agent.etcd", "")
+	limit := flag.Int("limit", 3, "")
+	flag.Parse()
+
+	endpoints := strings.Split(*endpointStr, ",")
+	c, err := newCluster(endpoints, *datadir)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer c.Terminate()
+
+	t := &tester{
+		failures: []failure{newFailureBase(), newFailureKillAll()},
+		cluster:  c,
+		limit:    *limit,
+	}
+	t.runLoop()
 }

+ 32 - 22
tools/functional-tester/etcd-tester/tester.go

@@ -14,41 +14,51 @@
 
 package main
 
-import (
-	"fmt"
-
-	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
-)
+import "log"
 
 type tester struct {
 	failures []failure
-	agents   []client.Agent
+	cluster  *cluster
 	limit    int
 }
 
 func (tt *tester) runLoop() {
 	for i := 0; i < tt.limit; i++ {
 		for j, f := range tt.failures {
-			fmt.Println("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc())
-			fmt.Println("etcd-tester: [round#%d case#%d] start injecting failure...", i, j)
-			if err := f.Inject(tt.agents); err != nil {
-				fmt.Println("etcd-tester: [round#%d case#%d] injection failing...", i, j)
-				tt.cleanup(i, j)
+			if err := tt.cluster.WaitHealth(); err != nil {
+				log.Printf("etcd-tester: [round#%d case#%d] wait full health error: %v", i, j, err)
+				if err := tt.cleanup(i, j); err != nil {
+					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
+					return
+				}
+				continue
+			}
+			log.Printf("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc())
+			log.Printf("etcd-tester: [round#%d case#%d] start injecting failure...", i, j)
+			if err := f.Inject(tt.cluster); err != nil {
+				log.Printf("etcd-tester: [round#%d case#%d] injection error: %v", i, j, err)
+				if err := tt.cleanup(i, j); err != nil {
+					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
+					return
+				}
+				continue
 			}
-			fmt.Println("etcd-tester: [round#%d case#%d] start recovering failure...", i, j)
-			if err := f.Recover(tt.agents); err != nil {
-				fmt.Println("etcd-tester: [round#%d case#%d] recovery failing...", i, j)
-				tt.cleanup(i, j)
+			log.Printf("etcd-tester: [round#%d case#%d] start recovering failure...", i, j)
+			if err := f.Recover(tt.cluster); err != nil {
+				log.Printf("etcd-tester: [round#%d case#%d] recovery error: %v", i, j, err)
+				if err := tt.cleanup(i, j); err != nil {
+					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
+					return
+				}
+				continue
 			}
-			fmt.Println("etcd-tester: [round#%d case#%d] succeed!", i, j)
+			log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
 		}
 	}
 }
 
-func (tt *tester) cleanup(i, j int) {
-	fmt.Println("etcd-tester: [round#%d case#%d] cleaning up...", i, j)
-	for _, a := range tt.agents {
-		a.Terminate()
-		a.Start()
-	}
+func (tt *tester) cleanup(i, j int) error {
+	log.Printf("etcd-tester: [round#%d case#%d] cleaning up...", i, j)
+	tt.cluster.Terminate()
+	return tt.cluster.Bootstrap()
 }