clientv3/integration: shorten keepalive timeout

Xiang, 8 years ago
parent commit 3899f9e3c5

1 changed file with 22 additions and 20 deletions

clientv3/integration/watch_keepalive_test.go  (+22, -20)

@@ -33,17 +33,23 @@ func TestWatchKeepAlive(t *testing.T) {
 	defer testutil.AfterTest(t)
 
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
-		Size:                 3,
-		GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings
-	})
+		Size:                 2,
+		GRPCKeepAliveMinTime: 1 * time.Millisecond, // avoid too_many_pings
+	})
+
 	defer clus.Terminate(t)
 
 	ccfg := clientv3.Config{
 		Endpoints:            []string{clus.Members[0].GRPCAddr()},
 		DialTimeout:          3 * time.Second,
-		DialKeepAliveTime:    2 * time.Second,
-		DialKeepAliveTimeout: 2 * time.Second,
+		DialKeepAliveTime:    1 * time.Second,
+		DialKeepAliveTimeout: 500 * time.Millisecond,
 	}
+
+	// gRPC internal implementation related.
+	pingInterval := ccfg.DialKeepAliveTime + ccfg.DialKeepAliveTimeout
+	timeout := pingInterval + 2*time.Second // 2s for a slow machine to process the watch and reset connections
+
 	cli, err := clientv3.New(ccfg)
 	if err != nil {
 		t.Fatal(err)
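
Note: the pingInterval introduced above assumes that clientv3 forwards DialKeepAliveTime and DialKeepAliveTimeout to gRPC as HTTP/2 keepalive pings. A minimal sketch of that assumed wiring against the public google.golang.org/grpc/keepalive API (illustration only, not code from this commit):

    // Illustration only: how the client is assumed to translate its keepalive
    // settings into a gRPC dial option.
    package example

    import (
    	"time"

    	"google.golang.org/grpc"
    	"google.golang.org/grpc/keepalive"
    )

    // keepAliveDialOption pings after keepAliveTime of inactivity and closes the
    // connection if no ack arrives within keepAliveTimeout.
    func keepAliveDialOption(keepAliveTime, keepAliveTimeout time.Duration) grpc.DialOption {
    	return grpc.WithKeepaliveParams(keepalive.ClientParameters{
    		Time:    keepAliveTime,
    		Timeout: keepAliveTimeout,
    	})
    }

Under that assumption a blackholed member stops acking pings, so the connection to ep[0] should be torn down roughly pingInterval (1.5s) after the blackhole, well inside the timeout used by the selects below.
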
@@ -55,38 +61,34 @@ func TestWatchKeepAlive(t *testing.T) {
 		t.Fatalf("watch failed on creation")
 	}
 
-	clus.Members[0].Blackhole()
-
-	// expects endpoint switch to ep[1]
+	// the client can switch to ep[1] once it detects the failure of ep[0]
 	cli.SetEndpoints(clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr())
 
-	// ep[0] keepalive time-out after DialKeepAliveTime + DialKeepAliveTimeout
-	// wait extra for processing network error for endpoint switching
-	timeout := ccfg.DialKeepAliveTime + ccfg.DialKeepAliveTimeout + ccfg.DialTimeout
-	time.Sleep(timeout)
+	clus.Members[0].Blackhole()
 
 	if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
 		t.Fatal(err)
 	}
 	select {
 	case <-wch:
-	case <-time.After(5 * time.Second):
-		t.Fatal("took too long to receive events")
+	case <-time.After(timeout):
+		t.Error("took too long to receive watch events")
 	}
 
 	clus.Members[0].Unblackhole()
 	clus.Members[1].Blackhole()
-	defer clus.Members[1].Unblackhole()
 
-	// wait for ep[0] recover, ep[1] fail
-	time.Sleep(timeout)
+	// make sure client 0 can connect to member 0 after removing the blackhole.
+	if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil {
+		t.Fatal(err)
+	}
 
-	if _, err = clus.Client(0).Put(context.TODO(), "foo", "bar"); err != nil {
+	if _, err = clus.Client(0).Put(context.TODO(), "foo", "bar1"); err != nil {
 		t.Fatal(err)
 	}
 	select {
 	case <-wch:
-	case <-time.After(5 * time.Second):
-		t.Fatal("took too long to receive events")
+	case <-time.After(timeout):
+		t.Error("took too long to receive watch events")
 	}
 }
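
For reference, the arithmetic behind the new wait bounds, as a compilable sketch (constant names are illustrative, not from this commit):

    package example

    import "time"

    // New values: pingInterval = 1s + 500ms = 1.5s, so each select waits at most
    // pingInterval + 2s = 3.5s. The removed code instead slept
    // DialKeepAliveTime + DialKeepAliveTimeout + DialTimeout = 2s + 2s + 3s = 7s
    // before each check.
    const (
    	dialKeepAliveTime    = 1 * time.Second
    	dialKeepAliveTimeout = 500 * time.Millisecond
    	pingInterval         = dialKeepAliveTime + dialKeepAliveTimeout // 1.5s
    	watchEventTimeout    = pingInterval + 2*time.Second             // 3.5s
    )

So the test now bounds each wait at roughly 3.5s and reacts as soon as the watch event arrives, instead of sleeping a fixed 7s twice.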