Browse Source

integration: overlapped create and put v3 watcher test

Anthony Romano 10 years ago
parent
commit
155412bbfa
1 changed files with 80 additions and 0 deletions
  1. 80 0
      integration/v3_watch_test.go

+ 80 - 0
integration/v3_watch_test.go

@@ -321,6 +321,86 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
 	clus.Terminate(t)
 }
 
+// TestV3WatchCurrentPutOverlap ensures current watchers receive all events with
+// overlapping puts.
+func TestV3WatchCurrentPutOverlap(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, wErr := clus.RandClient().Watch.Watch(ctx)
+	if wErr != nil {
+		t.Fatalf("wAPI.Watch error: %v", wErr)
+	}
+
+	// last mod_revision that will be observed
+	nrRevisions := 32
+	// first revision already allocated as empty revision
+	for i := 1; i < nrRevisions; i++ {
+		go func() {
+			kvc := clus.RandClient().KV
+			req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
+			if _, err := kvc.Put(context.TODO(), req); err != nil {
+				t.Fatalf("couldn't put key (%v)", err)
+			}
+		}()
+	}
+
+	// maps watcher to current expected revision
+	progress := make(map[int64]int64)
+
+	wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+		CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}}
+	if err := wStream.Send(wreq); err != nil {
+		t.Fatalf("first watch request failed (%v)", err)
+	}
+
+	more := true
+	progress[-1] = 0 // watcher creation pending
+	for more {
+		resp, err := wStream.Recv()
+		if err != nil {
+			t.Fatalf("wStream.Recv error: %v", err)
+		}
+
+		if resp.Created {
+			// accept events > header revision
+			progress[resp.WatchId] = resp.Header.Revision + 1
+			if resp.Header.Revision == int64(nrRevisions) {
+				// covered all revisions; create no more watchers
+				progress[-1] = int64(nrRevisions) + 1
+			} else if err := wStream.Send(wreq); err != nil {
+				t.Fatalf("watch request failed (%v)", err)
+			}
+		} else if len(resp.Events) == 0 {
+			t.Fatalf("got events %v, want non-empty", resp.Events)
+		} else {
+			wRev, ok := progress[resp.WatchId]
+			if !ok {
+				t.Fatalf("got %+v, but watch id shouldn't exist ", resp)
+			}
+			if resp.Events[0].Kv.ModRevision != wRev {
+				t.Fatalf("got %+v, wanted first revision %d", resp, wRev)
+			}
+			lastRev := resp.Events[len(resp.Events)-1].Kv.ModRevision
+			progress[resp.WatchId] = lastRev + 1
+		}
+		more = false
+		for _, v := range progress {
+			if v <= int64(nrRevisions) {
+				more = true
+				break
+			}
+		}
+	}
+
+	if rok, nr := waitResponse(wStream, time.Second); !rok {
+		t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
+	}
+}
+
 func TestV3WatchMultipleWatchersSynced(t *testing.T) {
 	defer testutil.AfterTest(t)
 	testV3WatchMultipleWatchers(t, 0)