Browse Source

Merge pull request #4433 from gyuho/f0

functional-tester/etcd-tester: silent grpclog, get revs, sleep longer
Gyu-Ho Lee 10 years ago
parent
commit
054a4f3d7a

+ 7 - 0
tools/functional-tester/etcd-tester/stresser.go

@@ -16,6 +16,8 @@ package main
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"io/ioutil"
+	"log"
 	"math/rand"
 	"math/rand"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
@@ -24,10 +26,15 @@ import (
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/grpclog"
 	clientV2 "github.com/coreos/etcd/client"
 	clientV2 "github.com/coreos/etcd/client"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 )
 
 
+func init() {
+	grpclog.SetLogger(log.New(ioutil.Discard, "", 0))
+}
+
 type Stresser interface {
 type Stresser interface {
 	// Stress starts to stress the etcd cluster
 	// Stress starts to stress the etcd cluster
 	Stress() error
 	Stress() error

+ 73 - 31
tools/functional-tester/etcd-tester/tester.go

@@ -15,7 +15,6 @@
 package main
 package main
 
 
 import (
 import (
-	"fmt"
 	"log"
 	"log"
 	"sync"
 	"sync"
 	"time"
 	"time"
@@ -85,21 +84,53 @@ func (tt *tester) runLoop() {
 				s.Cancel()
 				s.Cancel()
 			}
 			}
 
 
-			log.Printf("etcd-tester: [round#%d case#%d] waiting 5s for pending PUTs to be committed across cluster...", i, j)
-			time.Sleep(5 * time.Second)
+			ok := false
+			for k := 0; k < 5; k++ {
+				time.Sleep(time.Second)
+				log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions...", i, j, k)
+				revs, err := tt.cluster.getRevision()
+				if err != nil {
+					if e := tt.cleanup(i, j); e != nil {
+						log.Printf("etcd-tester: [round#%d case#%d.%d] cleanup error: %v", i, j, k, e)
+						return
+					}
+					log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get revisions (%v)", i, j, k, err)
+					continue
+				}
+				if ok = isSameValueInMap(revs); ok {
+					log.Printf("etcd-tester: [round#%d case#%d] checking current revisions succeed!", i, j)
+					break
+				} else {
+					log.Printf("etcd-tester: [round#%d case#%d] current revisions %+v", i, j, revs)
+				}
+			}
+			if !ok {
+				log.Printf("etcd-tester: [round#%d case#%d] checking current revisions failure...", i, j)
+				if err := tt.cleanup(i, j); err != nil {
+					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
+					return
+				}
+				continue
+			}
 
 
-			log.Printf("etcd-tester: [round#%d case#%d] starting checking consistency...", i, j)
-			err := tt.cluster.checkConsistency()
+			log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j)
+			hashes, err := tt.cluster.getKVHash()
 			if err != nil {
 			if err != nil {
-				log.Printf("etcd-tester: [round#%d case#%d] checkConsistency error (%v)", i, j, err)
+				log.Printf("etcd-tester: [round#%d case#%d] getKVHash error (%v)", i, j, err)
 				if err := tt.cleanup(i, j); err != nil {
 				if err := tt.cleanup(i, j); err != nil {
 					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
 					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
 					return
 					return
 				}
 				}
-			} else {
-				log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
-				log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
 			}
 			}
+			if !isSameValueInMap(hashes) {
+				if err := tt.cleanup(i, j); err != nil {
+					log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
+					return
+				}
+				continue
+			}
+			log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
+			log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
 
 
 			log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
 			log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
 			for _, s := range tt.cluster.Stressers {
 			for _, s := range tt.cluster.Stressers {
@@ -152,44 +183,55 @@ func (s *Status) setCase(c int) {
 	s.Case = c
 	s.Case = c
 }
 }
 
 
-// checkConsistency stops the cluster for a moment and get the hashes of KV storages.
-func (c *cluster) checkConsistency() error {
-	hashes := make(map[string]uint32)
+func (c *cluster) getRevision() (map[string]int64, error) {
+	revs := make(map[string]int64)
 	for _, u := range c.GRPCURLs {
 	for _, u := range c.GRPCURLs {
 		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
 		if err != nil {
 		if err != nil {
-			return err
+			return nil, err
 		}
 		}
 		kvc := pb.NewKVClient(conn)
 		kvc := pb.NewKVClient(conn)
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		resp, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
+		if err != nil {
+			return nil, err
+		}
+		cancel()
+		revs[u] = resp.Header.Revision
+	}
+	return revs, nil
+}
 
 
+func (c *cluster) getKVHash() (map[string]int64, error) {
+	hashes := make(map[string]int64)
+	for _, u := range c.GRPCURLs {
+		conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
+		if err != nil {
+			return nil, err
+		}
+		kvc := pb.NewKVClient(conn)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		resp, err := kvc.Hash(ctx, &pb.HashRequest{})
 		resp, err := kvc.Hash(ctx, &pb.HashRequest{})
-		hv := resp.Hash
 		if resp != nil && err != nil {
 		if resp != nil && err != nil {
-			return err
+			return nil, err
 		}
 		}
 		cancel()
 		cancel()
-
-		hashes[u] = hv
-	}
-
-	if !checkConsistency(hashes) {
-		return fmt.Errorf("check consistency fails: %v", hashes)
+		hashes[u] = int64(resp.Hash)
 	}
 	}
-	return nil
+	return hashes, nil
 }
 }
 
 
-// checkConsistency returns true if all nodes have the same KV hash values.
-func checkConsistency(hashes map[string]uint32) bool {
-	var cv uint32
-	isConsistent := true
+func isSameValueInMap(hashes map[string]int64) bool {
+	var rv int64
+	ok := true
 	for _, v := range hashes {
 	for _, v := range hashes {
-		if cv == 0 {
-			cv = v
+		if rv == 0 {
+			rv = v
 		}
 		}
-		if cv != v {
-			isConsistent = false
+		if rv != v {
+			ok = false
+			break
 		}
 		}
 	}
 	}
-	return isConsistent
+	return ok
 }
 }