
Merge pull request #6670 from fanminshi/lease_stressor

functional-tester: add lease stresser
fanmin shi 9 years ago
parent
commit
20bdb315f5

+ 99 - 0
tools/functional-tester/etcd-tester/checks.go

@@ -16,7 +16,10 @@ package main
 
 import (
 	"fmt"
+	"strings"
 	"time"
+
+	"golang.org/x/net/context"
 )
 
 type Checker interface {
@@ -34,6 +37,8 @@ type hashChecker struct {
 
 func newHashChecker(hrg hashAndRevGetter) Checker { return &hashChecker{hrg} }
 
+const leaseCheckerTimeout = 10 * time.Second
+
 func (hc *hashChecker) Check() (err error) {
 	plog.Printf("fetching current revisions...")
 	var (
@@ -69,6 +74,100 @@ func (hc *hashChecker) Check() (err error) {
 	return nil
 }
 
+type leaseChecker struct {
+	leaseStressers []Stresser
+}
+
+func newLeaseChecker(leaseStressers []Stresser) Checker { return &leaseChecker{leaseStressers} }
+
+func (lc *leaseChecker) Check() error {
+	plog.Info("lease stresser invariant check...")
+	errc := make(chan error)
+	for _, ls := range lc.leaseStressers {
+		go func(s Stresser) { errc <- lc.checkInvariant(s) }(ls)
+	}
+	var errs []error
+	for i := 0; i < len(lc.leaseStressers); i++ {
+		if err := <-errc; err != nil {
+			errs = append(errs, err)
+		}
+	}
+
+	if len(errs) == 0 {
+		return nil
+	}
+	return fmt.Errorf("lease stresser encounters error: (%v)", fromErrsToString(errs))
+}
+
+func fromErrsToString(errs []error) string {
+	stringArr := make([]string, len(errs))
+	for i, err := range errs {
+		stringArr[i] = err.Error()
+	}
+	return strings.Join(stringArr, ",")
+}
+
+func (lc *leaseChecker) checkInvariant(lStresser Stresser) error {
+	ls := lStresser.(*leaseStresser)
+	if err := checkLeasesExpired(ls); err != nil {
+		return err
+	}
+	ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
+	return checkLeasesAlive(ls)
+}
+
+func checkLeasesExpired(ls *leaseStresser) error {
+	plog.Infof("revoked leases %v", ls.revokedLeases.getLeasesMap())
+	return checkLeases(true, ls, ls.revokedLeases.getLeasesMap())
+}
+
+func checkLeasesAlive(ls *leaseStresser) error {
+	plog.Infof("alive leases %v", ls.aliveLeases.getLeasesMap())
+	return checkLeases(false, ls, ls.aliveLeases.getLeasesMap())
+}
+
+func checkLeases(expired bool, ls *leaseStresser, leases map[int64]time.Time) error {
+	ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
+	defer cancel()
+	for leaseID := range leases {
+		keysExpired, err := ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
+		if err != nil {
+			plog.Errorf("hasKeysAttachedToLeaseExpired error: (%v)", err)
+			return err
+		}
+		leaseExpired, err := ls.hasLeaseExpired(ctx, leaseID)
+		if err != nil {
+			plog.Errorf("hasLeaseExpired error: (%v)", err)
+			return err
+		}
+		if leaseExpired != keysExpired {
+			return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
+		}
+		if leaseExpired != expired {
+			return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
+		}
+	}
+	return nil
+}
+
+type compositeChecker struct {
+	checkers []Checker
+}
+
+func newCompositeChecker(checkers []Checker) Checker {
+	return &compositeChecker{checkers}
+}
+
+func (cchecker *compositeChecker) Check() error {
+	for _, checker := range cchecker.checkers {
+		if err := checker.Check(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 type noChecker struct{}
 
 func newNoChecker() Checker        { return &noChecker{} }
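
For orientation, the compositeChecker added above simply fans a single Check() call out to every registered checker and stops at the first failure. Below is a minimal, self-contained sketch of that composition; the Checker interface and compositeChecker mirror the diff, while okChecker and failChecker are hypothetical stand-ins for hashChecker and leaseChecker:

package main

import (
	"errors"
	"fmt"
)

// Checker mirrors the tester's interface: a single Check method that reports
// whether a cluster invariant currently holds.
type Checker interface {
	Check() error
}

// okChecker and failChecker are hypothetical stand-ins for hashChecker and
// leaseChecker, used only to exercise the composite below.
type okChecker struct{}

func (okChecker) Check() error { return nil }

type failChecker struct{ msg string }

func (f failChecker) Check() error { return errors.New(f.msg) }

// compositeChecker runs its children in order and returns the first error,
// matching the behavior added in checks.go.
type compositeChecker struct{ checkers []Checker }

func (c *compositeChecker) Check() error {
	for _, ck := range c.checkers {
		if err := ck.Check(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c := &compositeChecker{checkers: []Checker{okChecker{}, failChecker{"lease invariant violated"}}}
	fmt.Println(c.Check()) // prints the first failing checker's error
}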

+ 18 - 7
tools/functional-tester/etcd-tester/cluster.go

@@ -45,9 +45,10 @@ type cluster struct {
 	consistencyCheck bool
 	Size             int
 
-	Stressers     []Stresser
-	stressBuilder stressBuilder
-	Checker       Checker
+	Stressers            []Stresser
+	stressBuilder        stressBuilder
+	leaseStresserBuilder leaseStresserBuilder
+	Checker              Checker
 
 	Members []*member
 }
@@ -99,18 +100,27 @@ func (c *cluster) bootstrap() error {
 		}
 	}
 
-	c.Stressers = make([]Stresser, len(members))
+	c.Stressers = make([]Stresser, 0)
+	leaseStressers := make([]Stresser, len(members))
 	for i, m := range members {
-		c.Stressers[i] = c.stressBuilder(m)
+		lStresser := c.leaseStresserBuilder(m)
+		leaseStressers[i] = lStresser
+		c.Stressers = append(c.Stressers, c.stressBuilder(m), lStresser)
+	}
+
+	for i := range c.Stressers {
 		go c.Stressers[i].Stress()
 	}
 
+	var checkers []Checker
 	if c.consistencyCheck && !c.v2Only {
-		c.Checker = newHashChecker(hashAndRevGetter(c))
+		checkers = append(checkers, newHashChecker(hashAndRevGetter(c)), newLeaseChecker(leaseStressers))
 	} else {
-		c.Checker = newNoChecker()
+		checkers = append(checkers, newNoChecker())
 	}
 
+	c.Checker = newCompositeChecker(checkers)
+
 	c.Size = size
 	c.Members = members
 	return nil
@@ -176,6 +186,7 @@ func (c *cluster) Cleanup() error {
 	for _, s := range c.Stressers {
 		s.Cancel()
 	}
+
 	return lasterr
 }
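
The bootstrap change above interleaves a kv stresser and a lease stresser per member, but keeps the lease stressers in a separate slice so the lease checker can later inspect their alive/revoked lease maps. A rough, self-contained sketch of that wiring follows; member, fakeStresser, and both builder bodies are simplified assumptions, not the tester's real definitions:

package main

import "fmt"

// Stresser and member are trimmed stand-ins for the tester's types; only the
// shape needed to show the bootstrap wiring is kept.
type Stresser interface {
	Stress() error
	Cancel()
}

type member struct{ name string }

type fakeStresser struct{ kind, target string }

func (s *fakeStresser) Stress() error { fmt.Println(s.kind, "stresser running against", s.target); return nil }
func (s *fakeStresser) Cancel()       {}

type stressBuilder func(m *member) Stresser
type leaseStresserBuilder func(m *member) Stresser

func main() {
	members := []*member{{"m0"}, {"m1"}, {"m2"}}
	sb := stressBuilder(func(m *member) Stresser { return &fakeStresser{"kv", m.name} })
	lsb := leaseStresserBuilder(func(m *member) Stresser { return &fakeStresser{"lease", m.name} })

	// mirror bootstrap(): every member gets both a kv stresser and a lease
	// stresser; lease stressers are also collected separately so that
	// newLeaseChecker can check their invariants after a failure round.
	var stressers []Stresser
	leaseStressers := make([]Stresser, len(members))
	for i, m := range members {
		ls := lsb(m)
		leaseStressers[i] = ls
		stressers = append(stressers, sb(m), ls)
	}
	// the real code launches each Stress() in its own goroutine
	for _, s := range stressers {
		s.Stress()
	}
	fmt.Println(len(stressers), "stressers total,", len(leaseStressers), "of them lease stressers")
}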
 

+ 387 - 0
tools/functional-tester/etcd-tester/lease_stresser.go

@@ -0,0 +1,387 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+// TTL is the time to live, in seconds, for each lease created by the stresser
+const TTL = 30
+
+type leaseStressConfig struct {
+	numLeases    int
+	keysPerLease int
+	qps          int
+}
+
+type leaseStresser struct {
+	endpoint string
+	cancel   func()
+	conn     *grpc.ClientConn
+	kvc      pb.KVClient
+	lc       pb.LeaseClient
+	ctx      context.Context
+
+	success      int
+	failure      int
+	numLeases    int
+	keysPerLease int
+
+	aliveLeases   *atomicLeases
+	revokedLeases *atomicLeases
+
+	runWg   sync.WaitGroup
+	aliveWg sync.WaitGroup
+}
+
+type atomicLeases struct {
+	// rwLock protects read/write access to the leases map,
+	// which is accessed and modified by different goroutines.
+	rwLock sync.RWMutex
+	leases map[int64]time.Time
+}
+
+func (al *atomicLeases) add(leaseID int64, t time.Time) {
+	al.rwLock.Lock()
+	al.leases[leaseID] = t
+	al.rwLock.Unlock()
+}
+
+func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
+	al.rwLock.RLock()
+	rv, ok = al.leases[leaseID]
+	al.rwLock.RUnlock()
+	return rv, ok
+}
+
+func (al *atomicLeases) remove(leaseID int64) {
+	al.rwLock.Lock()
+	delete(al.leases, leaseID)
+	al.rwLock.Unlock()
+}
+
+func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
+	leasesCopy := make(map[int64]time.Time)
+	al.rwLock.RLock()
+	for k, v := range al.leases {
+		leasesCopy[k] = v
+	}
+	al.rwLock.RUnlock()
+	return leasesCopy
+}
+
+type leaseStresserBuilder func(m *member) Stresser
+
+func newLeaseStresserBuilder(s string, lsConfig *leaseStressConfig) leaseStresserBuilder {
+	// TODO: probably need to combine newLeaseStresserBuilder with newStresserBuilder to have a unified stresser builder.
+	switch s {
+	case "nop":
+		return func(*member) Stresser {
+			return &nopStresser{
+				start: time.Now(),
+				qps:   lsConfig.qps,
+			}
+		}
+	case "default":
+		return func(mem *member) Stresser {
+			return &leaseStresser{
+				endpoint:     mem.grpcAddr(),
+				numLeases:    lsConfig.numLeases,
+				keysPerLease: lsConfig.keysPerLease,
+			}
+		}
+	default:
+		plog.Panicf("unknown stresser type: %s\n", s)
+	}
+	// never reach here
+	return nil
+}
+
+func (ls *leaseStresser) setupOnce() error {
+	if ls.aliveLeases != nil {
+		return nil
+	}
+	if ls.numLeases == 0 {
+		panic("expect numLeases to be set")
+	}
+	if ls.keysPerLease == 0 {
+		panic("expect keysPerLease to be set")
+	}
+
+	conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure())
+	if err != nil {
+		return fmt.Errorf("%v (%s)", err, ls.endpoint)
+	}
+	ls.conn = conn
+	ls.kvc = pb.NewKVClient(conn)
+	ls.lc = pb.NewLeaseClient(conn)
+
+	ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
+	ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
+	return nil
+}
+
+func (ls *leaseStresser) Stress() error {
+	plog.Infof("lease Stresser %v starting ...", ls.endpoint)
+	if err := ls.setupOnce(); err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ls.cancel = cancel
+	ls.ctx = ctx
+
+	ls.runWg.Add(1)
+	go ls.run()
+	return nil
+}
+
+func (ls *leaseStresser) run() {
+	defer ls.runWg.Done()
+	ls.restartKeepAlives()
+	for ls.ctx.Err() == nil {
+		plog.Debugf("creating lease on %v ", ls.endpoint)
+		ls.createLeases()
+		plog.Debugf("done creating lease on %v ", ls.endpoint)
+		plog.Debugf("dropping lease on %v ", ls.endpoint)
+		ls.randomlyDropLeases()
+		plog.Debugf("done dropping lease on %v ", ls.endpoint)
+	}
+}
+
+func (ls *leaseStresser) restartKeepAlives() {
+	for leaseID := range ls.aliveLeases.getLeasesMap() {
+		ls.aliveWg.Add(1)
+		go func(id int64) {
+			ls.keepLeaseAlive(id)
+		}(leaseID)
+	}
+}
+
+func (ls *leaseStresser) createLeases() {
+	neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
+	var wg sync.WaitGroup
+	for i := 0; i < neededLeases; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			leaseID, err := ls.createLease()
+			if err != nil {
+				plog.Errorf("lease creation error: (%v)", err)
+				return
+			}
+			plog.Debugf("lease %v created ", leaseID)
+			// if attaching keys to the lease fails, don't add the lease to the aliveLeases map,
+			// because the invariant check on the lease would fail when its keys are not found
+			if err := ls.attachKeysWithLease(leaseID); err != nil {
+				return
+			}
+			ls.aliveLeases.add(leaseID, time.Now())
+			// keep track of all the keepLeaseAlive goroutines
+			ls.aliveWg.Add(1)
+			go ls.keepLeaseAlive(leaseID)
+		}()
+	}
+	wg.Wait()
+}
+
+func (ls *leaseStresser) randomlyDropLeases() {
+	var wg sync.WaitGroup
+	for l := range ls.aliveLeases.getLeasesMap() {
+		wg.Add(1)
+		go func(leaseID int64) {
+			defer wg.Done()
+			dropped, err := ls.randomlyDropLease(leaseID)
+			// if randomlyDropLease returned an error (e.g. the context was canceled), remove the lease from aliveLeases
+			// because we can't tell whether the lease was actually dropped or not.
+			if err != nil {
+				ls.aliveLeases.remove(leaseID)
+				return
+			}
+			if !dropped {
+				return
+			}
+			plog.Debugf("lease %v dropped ", leaseID)
+			ls.revokedLeases.add(leaseID, time.Now())
+			ls.aliveLeases.remove(leaseID)
+		}(l)
+	}
+	wg.Wait()
+}
+
+func (ls *leaseStresser) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
+	ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
+	return ls.lc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
+}
+
+func (ls *leaseStresser) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
+	resp, err := ls.getLeaseByID(ctx, leaseID)
+	plog.Debugf("hasLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
+	if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
+		return true, nil
+	}
+	return false, err
+}
+
+// The keys attached to a lease have the format "<leaseID>_<idx>", where idx is the order of key creation.
+// Since each key embeds its leaseID, ranging over the "<leaseID>" prefix
+// determines whether the keys attached to a given leaseID have been deleted.
+func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
+	// plog.Infof("retriving keys attached to lease %v", leaseID)
+	resp, err := ls.kvc.Range(ctx, &pb.RangeRequest{
+		Key:      []byte(fmt.Sprintf("%d", leaseID)),
+		RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
+	}, grpc.FailFast(false))
+	plog.Debugf("hasKeysAttachedToLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
+	if err != nil {
+		plog.Errorf("retriving keys attached to lease %v error: (%v)", leaseID, err)
+		return false, err
+	}
+	return len(resp.Kvs) == 0, nil
+}
+
+func (ls *leaseStresser) createLease() (int64, error) {
+	resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: TTL})
+	if err != nil {
+		return -1, err
+	}
+	return resp.ID, nil
+}
+
+func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
+	defer ls.aliveWg.Done()
+	ctx, cancel := context.WithCancel(ls.ctx)
+	stream, err := ls.lc.LeaseKeepAlive(ctx)
+	for {
+		select {
+		case <-time.After(500 * time.Millisecond):
+		case <-ls.ctx.Done():
+			plog.Debugf("keepLeaseAlive lease %v context canceled ", leaseID)
+			_, leaseDropped := ls.revokedLeases.read(leaseID)
+			// it is possible that a lease exists in both the revokedLeases and aliveLeases maps.
+			// this scenario occurs when a lease is renewed at the moment it is revoked:
+			// if the renew request arrives before the revoke request but the client receives the renew response after the revoke response,
+			// the revoking logic removes the lease from the aliveLeases map and puts it into the revokedLeases map,
+			// and immediately afterwards the renewing logic (below) updates the lease's timestamp by adding it back to the aliveLeases map.
+			// therefore, a lease can exist in both maps and must be removed from aliveLeases.
+			// it is also possible that a lease expires during the invariant-checking phase but not during the keepLeaseAlive() phase.
+			// this happens when an alive lease is just about to expire as keepLeaseAlive() exits and then expires while the invariant is being checked.
+			// to avoid that, before the keepalive loop exits we check whether each lease was renewed within the last TTL/2 seconds.
+			// if it was, the invariant check has at least TTL/2 seconds before the lease expires, which is long enough for the check to finish.
+			// if it was not, we remove the lease from the alive map so that it cannot expire during the invariant check.
+			renewTime, _ := ls.aliveLeases.read(leaseID)
+			if leaseDropped || renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
+				ls.aliveLeases.remove(leaseID)
+				plog.Debugf("keepLeaseAlive lease %v has not been renewed. drop it.", leaseID)
+			}
+			return
+		}
+
+		if err != nil {
+			plog.Debugf("keepLeaseAlive lease %v creates stream error: (%v)", leaseID, err)
+			cancel()
+			ctx, cancel = context.WithCancel(ls.ctx)
+			stream, err = ls.lc.LeaseKeepAlive(ctx)
+			continue
+		}
+		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
+		plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID)
+		if err != nil {
+			plog.Debugf("keepLeaseAlive stream sends lease %v error (%v) ", leaseID, err)
+			continue
+		}
+		leaseRenewTime := time.Now()
+		plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeed", leaseID)
+		respRC, err := stream.Recv()
+		if err != nil {
+			plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v) ", leaseID, err)
+			continue
+		}
+		// the lease has expired once its TTL becomes 0;
+		// don't send further keepalives for an expired lease
+		if respRC.TTL <= 0 {
+			plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0 ", leaseID)
+			ls.aliveLeases.remove(leaseID)
+			return
+		}
+		// update lease's renew time
+		plog.Debugf("keepLeaseAlive renew lease %v", leaseID)
+		ls.aliveLeases.add(leaseID, leaseRenewTime)
+	}
+}
+
+// attachKeysWithLease attaches keys to the given lease.
+// each key is the concatenation of the leaseID, '_', and the order of key creation,
+// e.g. 5186835655248304152_0 for the first created key and 5186835655248304152_1 for the second
+func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
+	var txnPuts []*pb.RequestOp
+	for j := 0; j < ls.keysPerLease; j++ {
+		txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)),
+			Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}}
+		txnPuts = append(txnPuts, txnput)
+	}
+	// keep retrying until the lease is not found or the context is canceled
+	for ls.ctx.Err() == nil {
+		txn := &pb.TxnRequest{Success: txnPuts}
+		_, err := ls.kvc.Txn(ls.ctx, txn)
+		if err == nil {
+			return nil
+		}
+		if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
+			return err
+		}
+	}
+
+	return ls.ctx.Err()
+}
+
+// randomlyDropLease revokes the lease only when rand.Intn(2) returns 0,
+// giving each lease a 50% chance of being dropped.
+func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
+	if rand.Intn(2) != 0 {
+		return false, nil
+	}
+	// keep retrying until the lease is revoked or the context is canceled
+	for ls.ctx.Err() == nil {
+		_, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
+		if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
+			return true, nil
+		}
+	}
+	plog.Debugf("randomlyDropLease error: (%v)", ls.ctx.Err())
+	return false, ls.ctx.Err()
+}
+
+func (ls *leaseStresser) Cancel() {
+	plog.Debugf("lease stresser %q is canceling...", ls.endpoint)
+	ls.cancel()
+	ls.runWg.Wait()
+	ls.aliveWg.Wait()
+	plog.Infof("lease stresser %q is canceled", ls.endpoint)
+}
+
+func (ls *leaseStresser) Report() (int, int) {
+	return ls.success, ls.failure
+}
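
The subtle part of keepLeaseAlive is its exit path: a lease that was renewed within the last TTL/2 seconds is left in aliveLeases (the invariant checker then has at least TTL/2 seconds before expiry), while a revoked or stale lease is removed so it cannot expire mid-check. A small sketch of just that decision; shouldDropOnExit is a hypothetical helper, not a function in the stresser:

package main

import (
	"fmt"
	"time"
)

// TTL matches the stresser's lease time-to-live, in seconds.
const TTL = 30

// shouldDropOnExit captures the exit-time decision in keepLeaseAlive: drop the
// lease if it was already revoked, or if its last successful renewal is older
// than TTL/2, since the invariant check could then race with the lease
// expiring on the server.
func shouldDropOnExit(revoked bool, lastRenew, now time.Time) bool {
	return revoked || lastRenew.Add(TTL/2*time.Second).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(shouldDropOnExit(false, now.Add(-5*time.Second), now))  // false: renewed recently, keep it
	fmt.Println(shouldDropOnExit(false, now.Add(-20*time.Second), now)) // true: renewal is stale, drop it
	fmt.Println(shouldDropOnExit(true, now, now))                       // true: already revoked elsewhere
}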

+ 12 - 5
tools/functional-tester/etcd-tester/main.go

@@ -58,6 +58,7 @@ func main() {
 	pports := portsFromArg(*peerPorts, len(eps), defaultPeerPort)
 	fports := portsFromArg(*failpointPorts, len(eps), defaultFailpointPort)
 	agents := make([]agentConfig, len(eps))
+
 	for i := range eps {
 		agents[i].endpoint = eps[i]
 		agents[i].clientPort = cports[i]
@@ -74,11 +75,18 @@ func main() {
 		v2:             *isV2Only,
 	}
 
+	lsConfig := &leaseStressConfig{
+		numLeases:    10,
+		keysPerLease: 10,
+		qps:          *stressQPS, // only used to create the nop stresser in leaseStresserBuilder
+	}
+
 	c := &cluster{
-		agents:           agents,
-		v2Only:           *isV2Only,
-		stressBuilder:    newStressBuilder(*stresserType, sConfig),
-		consistencyCheck: *consistencyCheck,
+		agents:               agents,
+		v2Only:               *isV2Only,
+		stressBuilder:        newStressBuilder(*stresserType, sConfig),
+		leaseStresserBuilder: newLeaseStresserBuilder(*stresserType, lsConfig),
+		consistencyCheck:     *consistencyCheck,
 	}
 
 	if err := c.bootstrap(); err != nil {
@@ -121,7 +129,6 @@ func main() {
 			schedule[i] = failures[caseNum]
 		}
 	}
-
 	t := &tester{
 		failures: schedule,
 		cluster:  c,

+ 1 - 1
tools/functional-tester/etcd-tester/tester.go

@@ -92,7 +92,6 @@ func (tt *tester) doRound(round int) (bool, error) {
 			plog.Printf("%s wait full health error: %v", tt.logPrefix(), err)
 			return false, nil
 		}
-
 		plog.Printf("%s injecting failure %q", tt.logPrefix(), f.Desc())
 		if err := f.Inject(tt.cluster, round); err != nil {
 			plog.Printf("%s injection error: %v", tt.logPrefix(), err)
@@ -147,6 +146,7 @@ func (tt *tester) checkConsistency() (err error) {
 	if err = tt.cluster.Checker.Check(); err != nil {
 		plog.Printf("%s %v", tt.logPrefix(), err)
 	}
+
 	return err
 }