|
|
@@ -1079,3 +1079,72 @@ func TestV3WatchWithFilter(t *testing.T) {
|
|
|
t.Fatal("failed to receive delete event")
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func TestV3WatchWithPrevKV(t *testing.T) {
|
|
|
+ clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
|
|
+ defer clus.Terminate(t)
|
|
|
+
|
|
|
+ tests := []struct {
|
|
|
+ key string
|
|
|
+ end string
|
|
|
+ vals []string
|
|
|
+ }{{
|
|
|
+ key: "foo",
|
|
|
+ end: "fop",
|
|
|
+ vals: []string{"bar1", "bar2"},
|
|
|
+ }, {
|
|
|
+ key: "/abc",
|
|
|
+ end: "/abd",
|
|
|
+ vals: []string{"first", "second"},
|
|
|
+ }}
|
|
|
+ for i, tt := range tests {
|
|
|
+ kvc := toGRPC(clus.RandClient()).KV
|
|
|
+ if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte(tt.key), Value: []byte(tt.vals[0])}); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ ws, werr := toGRPC(clus.RandClient()).Watch.Watch(context.TODO())
|
|
|
+ if werr != nil {
|
|
|
+ t.Fatal(werr)
|
|
|
+ }
|
|
|
+
|
|
|
+ req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
|
|
|
+ CreateRequest: &pb.WatchCreateRequest{
|
|
|
+ Key: []byte(tt.key),
|
|
|
+ RangeEnd: []byte(tt.end),
|
|
|
+ PrevKv: true,
|
|
|
+ }}}
|
|
|
+ if err := ws.Send(req); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ if _, err := ws.Recv(); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte(tt.key), Value: []byte(tt.vals[1])}); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ recv := make(chan *pb.WatchResponse)
|
|
|
+ go func() {
|
|
|
+ // check received PUT
|
|
|
+ resp, rerr := ws.Recv()
|
|
|
+ if rerr != nil {
|
|
|
+ t.Fatal(rerr)
|
|
|
+ }
|
|
|
+ recv <- resp
|
|
|
+ }()
|
|
|
+
|
|
|
+ select {
|
|
|
+ case resp := <-recv:
|
|
|
+ if tt.vals[1] != string(resp.Events[0].Kv.Value) {
|
|
|
+ t.Errorf("#%d: unequal value: want=%s, get=%s", i, tt.vals[1], resp.Events[0].Kv.Value)
|
|
|
+ }
|
|
|
+ if tt.vals[0] != string(resp.Events[0].PrevKv.Value) {
|
|
|
+ t.Errorf("#%d: unequal value: want=%s, get=%s", i, tt.vals[0], resp.Events[0].PrevKv.Value)
|
|
|
+ }
|
|
|
+ case <-time.After(30 * time.Second):
|
|
|
+ t.Error("timeout waiting for watch response")
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|