Browse Source

Merge pull request #2447 from yichengq/334

etcd-tester: initial stresser
Yicheng Qin 10 years ago
parent
commit
8b770f8a1a

+ 14 - 0
tools/functional-tester/etcd-tester/main.go

@@ -33,10 +33,24 @@ func main() {
 	}
 	defer c.Terminate()
 
+	stressers := make([]Stresser, len(c.ClientURLs))
+	for i, u := range c.ClientURLs {
+		s := &stresser{
+			Endpoint: u,
+			N:        200,
+		}
+		go s.Stress()
+		stressers[i] = s
+	}
+
 	t := &tester{
 		failures: []failure{newFailureBase(), newFailureKillAll()},
 		cluster:  c,
 		limit:    *limit,
 	}
 	t.runLoop()
+
+	for _, s := range stressers {
+		s.Cancel()
+	}
 }

+ 87 - 0
tools/functional-tester/etcd-tester/stresser.go

@@ -0,0 +1,87 @@
+package main
+
+import (
+	"net"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/client"
+)
+
+type Stresser interface {
+	// Stress starts to stress the etcd cluster
+	Stress() error
+	// Cancel cancels the stress test on the etcd cluster
+	Cancel()
+	// Report reports the success and failure of the stress test
+	Report() (success int, failure int)
+}
+
+type stresser struct {
+	Endpoint string
+	// TODO: not implemented
+	SuffixRange int
+
+	N int
+	// TODO: not implemented
+	Interval time.Duration
+
+	mu      sync.Mutex
+	failure int
+	success int
+
+	cancel func()
+}
+
+func (s *stresser) Stress() error {
+	cfg := client.Config{
+		Endpoints: []string{s.Endpoint},
+		Transport: &http.Transport{
+			Dial: (&net.Dialer{
+				Timeout:   time.Second,
+				KeepAlive: 30 * time.Second,
+			}).Dial,
+			MaxIdleConnsPerHost: s.N,
+		},
+	}
+	c, err := client.New(cfg)
+	if err != nil {
+		return err
+	}
+
+	kv := client.NewKeysAPI(c)
+	ctx, cancel := context.WithCancel(context.Background())
+	s.cancel = cancel
+
+	for i := 0; i < s.N; i++ {
+		go func() {
+			for {
+				_, err := kv.Set(ctx, "foo", "bar", nil)
+				if err == context.Canceled {
+					return
+				}
+				s.mu.Lock()
+				if err != nil {
+					s.failure++
+				}
+				s.success++
+				s.mu.Unlock()
+			}
+		}()
+	}
+
+	<-ctx.Done()
+	return nil
+}
+
+func (s *stresser) Cancel() {
+	s.cancel()
+}
+
+func (s *stresser) Report() (success int, failure int) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.success, s.failure
+}