|
|
@@ -26,19 +26,43 @@ import (
|
|
|
"github.com/coreos/etcd/pkg/testutil"
|
|
|
)
|
|
|
|
|
|
-func TestBlackholePutWithoutKeealiveEnabled(t *testing.T) {
|
|
|
+func TestBalancerUnderBlackholeNoKeepAlivePut(t *testing.T) {
|
|
|
+ testBalancerUnderBlackholeNoKeepAliveMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
|
|
|
+ _, err := cli.Put(ctx, "foo", "bar")
|
|
|
+ return err
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func TestBalancerUnderBlackholeNoKeepAliveDelete(t *testing.T) {
|
|
|
+ testBalancerUnderBlackholeNoKeepAliveMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
|
|
|
+ _, err := cli.Delete(ctx, "foo")
|
|
|
+ return err
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func TestBalancerUnderBlackholeNoKeepAliveTxn(t *testing.T) {
|
|
|
+ testBalancerUnderBlackholeNoKeepAliveMutable(t, func(cli *clientv3.Client, ctx context.Context) error {
|
|
|
+ _, err := cli.Txn(ctx).
|
|
|
+ If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
|
|
|
+ Then(clientv3.OpPut("foo", "bar")).
|
|
|
+ Else(clientv3.OpPut("foo", "baz")).Commit()
|
|
|
+ return err
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func testBalancerUnderBlackholeNoKeepAliveMutable(t *testing.T, op func(*clientv3.Client, context.Context) error) {
|
|
|
defer testutil.AfterTest(t)
|
|
|
|
|
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
|
|
|
- Size: 2,
|
|
|
- GRPCKeepAliveMinTime: 1 * time.Millisecond,
|
|
|
- SkipCreatingClient: true},
|
|
|
- ) // avoid too_many_pings
|
|
|
-
|
|
|
+ Size: 2,
|
|
|
+ SkipCreatingClient: true,
|
|
|
+ })
|
|
|
defer clus.Terminate(t)
|
|
|
|
|
|
+ eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
|
|
|
+
|
|
|
ccfg := clientv3.Config{
|
|
|
- Endpoints: []string{clus.Members[0].GRPCAddr()},
|
|
|
+ Endpoints: []string{eps[0]},
|
|
|
DialTimeout: 1 * time.Second,
|
|
|
}
|
|
|
cli, err := clientv3.New(ccfg)
|
|
|
@@ -47,23 +71,29 @@ func TestBlackholePutWithoutKeealiveEnabled(t *testing.T) {
|
|
|
}
|
|
|
defer cli.Close()
|
|
|
|
|
|
- // wait for ep[0] to be pinned
|
|
|
+ // wait for eps[0] to be pinned
|
|
|
mustWaitPinReady(t, cli)
|
|
|
|
|
|
- cli.SetEndpoints(clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr())
|
|
|
+ // add all eps to list, so that when the original pined one fails
|
|
|
+ // the client can switch to other available eps
|
|
|
+ cli.SetEndpoints(eps...)
|
|
|
+
|
|
|
+ // blackhole eps[0]
|
|
|
clus.Members[0].Blackhole()
|
|
|
|
|
|
// fail first due to blackhole, retry should succeed
|
|
|
- // when gRPC supports better retry on non-delivered request, the first put can succeed.
|
|
|
- ctx, c1 := context.WithTimeout(context.Background(), time.Second)
|
|
|
- defer c1()
|
|
|
- if _, err = cli.Put(ctx, "foo", "bar"); err != context.DeadlineExceeded {
|
|
|
- t.Fatalf("err = %v, want %v", err, context.DeadlineExceeded)
|
|
|
- }
|
|
|
-
|
|
|
- ctx, c2 := context.WithTimeout(context.Background(), time.Second)
|
|
|
- defer c2()
|
|
|
- if _, err = cli.Put(ctx, "foo", "bar"); err != nil {
|
|
|
- t.Errorf("put failed with error %v", err)
|
|
|
+ // TODO: first mutable operation can succeed
|
|
|
+ // when gRPC supports better retry on non-delivered request
|
|
|
+ for i := 0; i < 2; i++ {
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
+ err = op(cli, ctx)
|
|
|
+ cancel()
|
|
|
+ if i == 0 {
|
|
|
+ if err != context.DeadlineExceeded {
|
|
|
+ t.Fatalf("#%d: err = %v, want %v", i, err, context.DeadlineExceeded)
|
|
|
+ }
|
|
|
+ } else if err != nil {
|
|
|
+ t.Errorf("#%d: mutable operation failed with error %v", i, err)
|
|
|
+ }
|
|
|
}
|
|
|
}
|