Prechádzať zdrojové kódy

Merge pull request #4673 from gyuho/slow

functional-tester: add latency test (simulate slow network)
Gyu-Ho Lee 9 rokov pred
rodič
commit
793fb2cf64

+ 7 - 0
tools/functional-tester/etcd-agent/agent.go

@@ -172,6 +172,13 @@ func (a *Agent) recoverPort(port int) error {
 	return netutil.RecoverPort(port)
 }
 
+// setLatency hands ms and rv to netutil.SetLatency. A value of 0 for ms
+// removes the latency through netutil.RemoveLatency instead.
+func (a *Agent) setLatency(ms, rv int) error {
+	if ms != 0 {
+		return netutil.SetLatency(ms, rv)
+	}
+	return netutil.RemoveLatency()
+}
+
 // status returns a client.Status whose State field is the agent's current
 // state.
 func (a *Agent) status() client.Status {
 	return client.Status{State: a.state}
 }

+ 12 - 0
tools/functional-tester/etcd-agent/client/client.go

@@ -38,6 +38,10 @@ type Agent interface {
 	DropPort(port int) error
 	// RecoverPort stops dropping all network packets at the given port.
 	RecoverPort(port int) error
+	// SetLatency slows down network by introducing latency.
+	SetLatency(ms, rv int) error
+	// RemoveLatency removes latency introduced by SetLatency.
+	RemoveLatency() error
 	// Status returns the status of etcd on the agent
 	Status() (Status, error)
 }
@@ -93,6 +97,14 @@ func (a *agent) RecoverPort(port int) error {
 	return a.rpcClient.Call("Agent.RPCRecoverPort", port, nil)
 }
 
+// SetLatency invokes the remote "Agent.RPCSetLatency" method, sending ms and
+// rv as a two-element slice, and returns the RPC call's error.
+func (a *agent) SetLatency(ms, rv int) error {
+	args := []int{ms, rv}
+	return a.rpcClient.Call("Agent.RPCSetLatency", args, nil)
+}
+
+func (a *agent) RemoveLatency() error {
+	return a.rpcClient.Call("Agent.RPCRemoveLatency", struct{}{}, nil)
+}
+
 func (a *agent) Status() (Status, error) {
 	var s Status
 	err := a.rpcClient.Call("Agent.RPCStatus", struct{}{}, &s)

+ 22 - 0
tools/functional-tester/etcd-agent/rpc.go

@@ -15,6 +15,7 @@
 package main
 
 import (
+	"fmt"
 	"net"
 	"net/http"
 	"net/rpc"
@@ -102,6 +103,27 @@ func (a *Agent) RPCRecoverPort(port int, reply *struct{}) error {
 	return nil
 }
 
+// RPCSetLatency is the RPC handler for adding network latency. args must
+// hold exactly two values: the latency in milliseconds and the variation
+// (+/-) in milliseconds. A latency of 0 removes the latency. reply is unused.
+//
+// It returns an error when args is malformed or when the latency cannot be
+// applied, so that the remote caller can react (for example, roll back).
+func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error {
+	if len(args) != 2 {
+		return fmt.Errorf("SetLatency needs two args, got (%v)", args)
+	}
+	plog.Printf("set latency of %dms (+/- %dms)", args[0], args[1])
+	if err := a.setLatency(args[0], args[1]); err != nil {
+		// Log on the agent side and also report the failure to the caller;
+		// returning nil here would make a failed injection look successful.
+		plog.Println("error setting latency", err)
+		return err
+	}
+	return nil
+}
+
+// RPCRemoveLatency is the RPC handler for removing latency. args and reply
+// are unused.
+//
+// It returns the underlying error when the latency cannot be removed, so
+// that the remote caller does not mistake a failed removal for a success.
+func (a *Agent) RPCRemoveLatency(args struct{}, reply *struct{}) error {
+	plog.Println("removing latency")
+	// A latency of 0 makes setLatency remove the latency.
+	if err := a.setLatency(0, 0); err != nil {
+		plog.Println("error removing latency", err)
+		return err
+	}
+	return nil
+}
+
 func (a *Agent) RPCStatus(args struct{}, status *client.Status) error {
 	*status = a.status()
 	return nil

+ 96 - 1
tools/functional-tester/etcd-tester/failure.go

@@ -20,7 +20,11 @@ import (
 	"time"
 )
 
-const snapshotCount = 10000
+// slowNetworkLatency and randomVariation are the two arguments (ms, rv) that
+// the slow-network failures pass to SetLatency: the latency to add, in
+// milliseconds, and the +/- variation around it, in milliseconds.
+const (
+	snapshotCount      = 10000
+	slowNetworkLatency = 1000 // 1-second
+	randomVariation    = 50
+)
 
 type failure interface {
 	// Inject injects the failure into the testing cluster at the given
@@ -295,3 +299,94 @@ func (f *failureIsolateAll) Recover(c *cluster, round int) error {
 	}
 	return c.WaitHealth()
 }
+
+// failureSlowNetworkOneMember is the failure case that adds latency to the
+// network of a single member, chosen by round.
+type failureSlowNetworkOneMember struct {
+	description
+}
+
+// newFailureSlowNetworkOneMember builds the failure case with a description
+// that includes the configured latency.
+func newFailureSlowNetworkOneMember() *failureSlowNetworkOneMember {
+	f := new(failureSlowNetworkOneMember)
+	f.description = description(fmt.Sprintf("slow down one member's network by adding %d ms latency", slowNetworkLatency))
+	return f
+}
+
+// Inject adds latency to the member at index round % c.Size. If that fails,
+// it asks the same agent to remove the latency again and returns the error.
+func (f *failureSlowNetworkOneMember) Inject(c *cluster, round int) error {
+	target := c.Agents[round%c.Size]
+	err := target.SetLatency(slowNetworkLatency, randomVariation)
+	if err == nil {
+		return nil
+	}
+	target.RemoveLatency() // roll back
+	return err
+}
+
+// Recover removes the latency from the member at index round % c.Size and
+// then waits for the cluster to become healthy.
+func (f *failureSlowNetworkOneMember) Recover(c *cluster, round int) error {
+	target := c.Agents[round%c.Size]
+	err := target.RemoveLatency()
+	if err != nil {
+		return err
+	}
+	return c.WaitHealth()
+}
+
+// failureSlowNetworkLeader is the failure case that adds latency to the
+// leader's network.
+type failureSlowNetworkLeader struct {
+	description
+	// idx is the member index recorded by Inject and reused by Recover.
+	idx int
+}
+
+// newFailureSlowNetworkLeader builds the failure case with a description
+// that includes the configured latency.
+func newFailureSlowNetworkLeader() *failureSlowNetworkLeader {
+	f := new(failureSlowNetworkLeader)
+	f.description = description(fmt.Sprintf("slow down leader's network by adding %d ms latency", slowNetworkLatency))
+	return f
+}
+
+// Inject looks up the current leader, remembers its index for Recover, and
+// adds latency to its network. If adding the latency fails, it asks the same
+// agent to remove it again and returns the error.
+func (f *failureSlowNetworkLeader) Inject(c *cluster, round int) error {
+	leader, err := c.GetLeader()
+	if err != nil {
+		return err
+	}
+	f.idx = leader
+	agent := c.Agents[leader]
+	if err = agent.SetLatency(slowNetworkLatency, randomVariation); err == nil {
+		return nil
+	}
+	agent.RemoveLatency() // roll back
+	return err
+}
+
+// Recover removes the latency from the member recorded by Inject and then
+// waits for the cluster to become healthy.
+func (f *failureSlowNetworkLeader) Recover(c *cluster, round int) error {
+	slowed := c.Agents[f.idx]
+	err := slowed.RemoveLatency()
+	if err != nil {
+		return err
+	}
+	return c.WaitHealth()
+}
+
+// failureSlowNetworkAll is the failure case that adds latency to every
+// member's network.
+type failureSlowNetworkAll struct {
+	description
+}
+
+// newFailureSlowNetworkAll builds the failure case with its fixed
+// description.
+func newFailureSlowNetworkAll() *failureSlowNetworkAll {
+	f := new(failureSlowNetworkAll)
+	f.description = "slow down all members' network"
+	return f
+}
+
+// Inject adds latency to every member's network. If any agent fails, the
+// latency is removed again from the failing agent and from every agent that
+// was slowed down before it, and the error is returned.
+func (f *failureSlowNetworkAll) Inject(c *cluster, round int) error {
+	for i, a := range c.Agents {
+		if err := a.SetLatency(slowNetworkLatency, randomVariation); err != nil {
+			// Roll back agents 0..i. The failing agent i is included, as in
+			// the one-member and leader failures, in case the failed call
+			// left latency partially applied.
+			for _, touched := range c.Agents[:i+1] {
+				touched.RemoveLatency() // best effort; the SetLatency error is what gets reported
+			}
+			return err
+		}
+	}
+	return nil
+}
+
+func (f *failureSlowNetworkAll) Recover(c *cluster, round int) error {
+	for _, a := range c.Agents {
+		if err := a.RemoveLatency(); err != nil {
+			return err
+		}
+	}
+	return c.WaitHealth()
+}

+ 3 - 0
tools/functional-tester/etcd-tester/main.go

@@ -53,6 +53,9 @@ func main() {
 		newFailureKillLeaderForLongTime(),
 		newFailureIsolate(),
 		newFailureIsolateAll(),
+		newFailureSlowNetworkOneMember(),
+		newFailureSlowNetworkLeader(),
+		newFailureSlowNetworkAll(),
 	}
 
 	schedule := failures