|
|
@@ -17,10 +17,12 @@ package integration
|
|
|
import (
|
|
|
"reflect"
|
|
|
"sort"
|
|
|
+ "sync"
|
|
|
"testing"
|
|
|
"time"
|
|
|
|
|
|
"github.com/coreos/etcd/clientv3"
|
|
|
+ "github.com/coreos/etcd/clientv3/concurrency"
|
|
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
|
"github.com/coreos/etcd/integration"
|
|
|
"github.com/coreos/etcd/pkg/testutil"
|
|
|
@@ -574,3 +576,57 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
|
|
|
t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// TestV3LeaseFailureOverlap issues Grant and Keepalive requests to a cluster
|
|
|
+// before, during, and after quorum loss to confirm Grant/Keepalive tolerates
|
|
|
+// transient cluster failure.
|
|
|
+func TestV3LeaseFailureOverlap(t *testing.T) {
|
|
|
+ clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
|
|
|
+ defer clus.Terminate(t)
|
|
|
+
|
|
|
+ numReqs := 5
|
|
|
+ cli := clus.Client(0)
|
|
|
+
|
|
|
+ // bring up a session, tear it down
|
|
|
+ updown := func(i int) error {
|
|
|
+ sess, err := concurrency.NewSession(cli)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ ch := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ defer close(ch)
|
|
|
+ sess.Close()
|
|
|
+ }()
|
|
|
+ select {
|
|
|
+ case <-ch:
|
|
|
+ case <-time.After(time.Minute / 4):
|
|
|
+ t.Fatalf("timeout %d", i)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ mkReqs := func(n int) {
|
|
|
+ wg.Add(numReqs)
|
|
|
+ for i := 0; i < numReqs; i++ {
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ err := updown(n)
|
|
|
+ if err == nil || err == rpctypes.ErrTimeoutDueToConnectionLost {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ t.Fatal(err)
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ mkReqs(1)
|
|
|
+ clus.Members[1].Stop(t)
|
|
|
+ mkReqs(2)
|
|
|
+ time.Sleep(time.Second)
|
|
|
+ mkReqs(3)
|
|
|
+ clus.Members[1].Restart(t)
|
|
|
+ mkReqs(4)
|
|
|
+ wg.Wait()
|
|
|
+}
|