|
|
@@ -106,27 +106,35 @@ func TestTxnReadRetry(t *testing.T) {
|
|
|
defer clus.Terminate(t)
|
|
|
|
|
|
kv := clus.Client(0)
|
|
|
- clus.Members[0].Stop(t)
|
|
|
- <-clus.Members[0].StopNotify()
|
|
|
|
|
|
- donec := make(chan struct{})
|
|
|
- go func() {
|
|
|
- ctx := context.TODO()
|
|
|
- _, err := kv.Txn(ctx).Then(clientv3.OpGet("foo")).Commit()
|
|
|
- if err != nil {
|
|
|
- t.Fatalf("expected response, got error %v", err)
|
|
|
+ thenOps := [][]clientv3.Op{
|
|
|
+ {clientv3.OpGet("foo")},
|
|
|
+ {clientv3.OpTxn(nil, []clientv3.Op{clientv3.OpGet("foo")}, nil)},
|
|
|
+ {clientv3.OpTxn(nil, nil, nil)},
|
|
|
+ {},
|
|
|
+ }
|
|
|
+ for i := range thenOps {
|
|
|
+ clus.Members[0].Stop(t)
|
|
|
+ <-clus.Members[0].StopNotify()
|
|
|
+
|
|
|
+ donec := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ _, err := kv.Txn(context.TODO()).Then(thenOps[i]...).Commit()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("expected response, got error %v", err)
|
|
|
+ }
|
|
|
+ donec <- struct{}{}
|
|
|
+ }()
|
|
|
+ // wait for txn to fail on disconnect
|
|
|
+ time.Sleep(100 * time.Millisecond)
|
|
|
+
|
|
|
+ // restart node; client should resume
|
|
|
+ clus.Members[0].Restart(t)
|
|
|
+ select {
|
|
|
+ case <-donec:
|
|
|
+ case <-time.After(2 * clus.Members[1].ServerConfig.ReqTimeout()):
|
|
|
+ t.Fatalf("waited too long")
|
|
|
}
|
|
|
- donec <- struct{}{}
|
|
|
- }()
|
|
|
- // wait for txn to fail on disconnect
|
|
|
- time.Sleep(100 * time.Millisecond)
|
|
|
-
|
|
|
- // restart node; client should resume
|
|
|
- clus.Members[0].Restart(t)
|
|
|
- select {
|
|
|
- case <-donec:
|
|
|
- case <-time.After(2 * clus.Members[1].ServerConfig.ReqTimeout()):
|
|
|
- t.Fatalf("waited too long")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -179,3 +187,41 @@ func TestTxnCompareRange(t *testing.T) {
|
|
|
t.Fatal("expected prefix compare to false, got compares as true")
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func TestTxnNested(t *testing.T) {
|
|
|
+ defer testutil.AfterTest(t)
|
|
|
+
|
|
|
+ clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
|
|
+ defer clus.Terminate(t)
|
|
|
+
|
|
|
+ kv := clus.Client(0)
|
|
|
+
|
|
|
+ tresp, err := kv.Txn(context.TODO()).
|
|
|
+ If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
|
|
|
+ Then(
|
|
|
+ clientv3.OpPut("foo", "bar"),
|
|
|
+ clientv3.OpTxn(nil, []clientv3.Op{clientv3.OpPut("abc", "123")}, nil)).
|
|
|
+ Else(clientv3.OpPut("foo", "baz")).Commit()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ if len(tresp.Responses) != 2 {
|
|
|
+ t.Errorf("expected 2 top-level txn responses, got %+v", tresp.Responses)
|
|
|
+ }
|
|
|
+
|
|
|
+ // check txn writes were applied
|
|
|
+ resp, err := kv.Get(context.TODO(), "foo")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ if len(resp.Kvs) != 1 || string(resp.Kvs[0].Value) != "bar" {
|
|
|
+ t.Errorf("unexpected Get response %+v", resp)
|
|
|
+ }
|
|
|
+ resp, err = kv.Get(context.TODO(), "abc")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ if len(resp.Kvs) != 1 || string(resp.Kvs[0].Value) != "123" {
|
|
|
+ t.Errorf("unexpected Get response %+v", resp)
|
|
|
+ }
|
|
|
+}
|