Browse Source

Merge pull request #4412 from gyuho/f0

functional-tester: fix grpc endpoint, consistent check
Xiang Li 10 years ago
parent
commit
2897fb0c5c

+ 3 - 3
tools/functional-tester/etcd-tester/cluster.go

@@ -89,6 +89,7 @@ func (c *cluster) Bootstrap() error {
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
+		grpcURLs[i] = fmt.Sprintf("%s:2378", host)
 		clientURLs[i] = fmt.Sprintf("http://%s:2379", host)
 		clientURLs[i] = fmt.Sprintf("http://%s:2379", host)
 		peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort)
 		peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort)
 
 
@@ -170,8 +171,7 @@ func (c *cluster) WaitHealth() error {
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
 	healthFunc, urls := setHealthKey, c.GRPCURLs
 	healthFunc, urls := setHealthKey, c.GRPCURLs
 	if c.v2Only {
 	if c.v2Only {
-		healthFunc = setHealthKeyV2
-		urls = c.ClientURLs
+		healthFunc, urls = setHealthKeyV2, c.ClientURLs
 	}
 	}
 	for i := 0; i < 60; i++ {
 	for i := 0; i < 60; i++ {
 		err = healthFunc(urls)
 		err = healthFunc(urls)
@@ -237,7 +237,7 @@ func setHealthKey(us []string) error {
 	for _, u := range us {
 	for _, u := range us {
 		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 		if err != nil {
 		if err != nil {
-			return fmt.Errorf("no connection available for %s (%v)", u, err)
+			return fmt.Errorf("%v (%s)", err, u)
 		}
 		}
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 		kvc := pb.NewKVClient(conn)
 		kvc := pb.NewKVClient(conn)

+ 1 - 1
tools/functional-tester/etcd-tester/stresser.go

@@ -55,7 +55,7 @@ type stresser struct {
 func (s *stresser) Stress() error {
 func (s *stresser) Stress() error {
 	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 	if err != nil {
 	if err != nil {
-		return fmt.Errorf("no connection available for %s (%v)", s.Endpoint, err)
+		return fmt.Errorf("%v (%s)", err, s.Endpoint)
 	}
 	}
 	kvc := pb.NewKVClient(conn)
 	kvc := pb.NewKVClient(conn)
 
 

+ 79 - 1
tools/functional-tester/etcd-tester/tester.go

@@ -15,9 +15,15 @@
 package main
 package main
 
 
 import (
 import (
+	"fmt"
 	"log"
 	"log"
 	"sync"
 	"sync"
 	"time"
 	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 )
 
 
 type tester struct {
 type tester struct {
@@ -68,7 +74,37 @@ func (tt *tester) runLoop() {
 				}
 				}
 				continue
 				continue
 			}
 			}
-			log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
+
+			if tt.cluster.v2Only {
+				log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
+				continue
+			}
+
+			log.Printf("etcd-tester: [round#%d case#%d] canceling the stressers...", i, j)
+			for _, s := range tt.cluster.Stressers {
+				s.Cancel()
+			}
+
+			log.Printf("etcd-tester: [round#%d case#%d] waiting 5s for pending PUTs to be committed across cluster...", i, j)
+			time.Sleep(5 * time.Second)
+
+			log.Printf("etcd-tester: [round#%d case#%d] starting checking consistency...", i, j)
+			err := tt.cluster.checkConsistency()
+			if err != nil {
+				log.Printf("etcd-tester: [round#%d case#%d] checkConsistency error (%v)", i, j, err)
+				if err := tt.cleanup(i, j); err != nil {
+					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
+					return
+				}
+			} else {
+				log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
+				log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
+			}
+
+			log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
+			for _, s := range tt.cluster.Stressers {
+				go s.Stress()
+			}
 		}
 		}
 	}
 	}
 }
 }
@@ -115,3 +151,45 @@ func (s *Status) setCase(c int) {
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 	s.Case = c
 	s.Case = c
 }
 }
+
+// checkConsistency stops the cluster for a moment and get the hashes of KV storages.
+func (c *cluster) checkConsistency() error {
+	hashes := make(map[string]uint32)
+	for _, u := range c.GRPCURLs {
+		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
+		if err != nil {
+			return err
+		}
+		kvc := pb.NewKVClient(conn)
+
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		resp, err := kvc.Hash(ctx, &pb.HashRequest{})
+		hv := resp.Hash
+		if resp != nil && err != nil {
+			return err
+		}
+		cancel()
+
+		hashes[u] = hv
+	}
+
+	if !checkConsistency(hashes) {
+		return fmt.Errorf("check consistency fails: %v", hashes)
+	}
+	return nil
+}
+
+// checkConsistency returns true if all nodes have the same KV hash values.
+func checkConsistency(hashes map[string]uint32) bool {
+	var cv uint32
+	isConsistent := true
+	for _, v := range hashes {
+		if cv == 0 {
+			cv = v
+		}
+		if cv != v {
+			isConsistent = false
+		}
+	}
+	return isConsistent
+}