|
|
@@ -804,21 +804,39 @@ func TestWatchCancelOnServer(t *testing.T) {
|
|
|
defer cluster.Terminate(t)
|
|
|
|
|
|
client := cluster.RandClient()
|
|
|
+ numWatches := 10
|
|
|
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
+ cancels := make([]context.CancelFunc, numWatches)
|
|
|
+ for i := 0; i < numWatches; i++ {
|
|
|
+ // use WithTimeout to force separate streams in client
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
- client.Watch(ctx, "a", clientv3.WithCreatedNotify())
|
|
|
- cancel()
|
|
|
+ cancels[i] = cancel
|
|
|
+ w := client.Watch(ctx, fmt.Sprintf("%d", i), clientv3.WithCreatedNotify())
|
|
|
+ <-w
|
|
|
+ }
|
|
|
+
|
|
|
+ // get max watches; proxy tests have leadership watches, so total may be >numWatches
|
|
|
+ maxWatches, _ := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
|
|
|
+
|
|
|
+ // cancel all and wait for cancels to propagate to etcd server
|
|
|
+ for i := 0; i < numWatches; i++ {
|
|
|
+ cancels[i]()
|
|
|
}
|
|
|
- // wait for cancels to propagate
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
|
- watchers, err := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
|
|
|
+ minWatches, err := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- if watchers != "0" {
|
|
|
- t.Fatalf("expected 0 watchers, got %q", watchers)
|
|
|
+
|
|
|
+ maxWatchV, minWatchV := 0, 0
|
|
|
+ n, serr := fmt.Sscanf(maxWatches+" "+minWatches, "%d %d", &maxWatchV, &minWatchV)
|
|
|
+ if n != 2 || serr != nil {
|
|
|
+ t.Fatalf("expected n=2 and err=nil, got n=%d and err=%v", n, serr)
|
|
|
+ }
|
|
|
+
|
|
|
+ if maxWatchV-minWatchV != numWatches {
|
|
|
+ t.Fatalf("expected %d canceled watchers, got %d", numWatches, maxWatchV-minWatchV)
|
|
|
}
|
|
|
}
|
|
|
|