kv_test.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ordering
  15. import (
  16. "context"
  17. "errors"
  18. "sync"
  19. "testing"
  20. "time"
  21. "github.com/coreos/etcd/clientv3"
  22. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  23. "github.com/coreos/etcd/integration"
  24. "github.com/coreos/etcd/pkg/testutil"
  25. gContext "golang.org/x/net/context"
  26. )
  27. func TestDetectKvOrderViolation(t *testing.T) {
  28. var errOrderViolation = errors.New("Detected Order Violation")
  29. defer testutil.AfterTest(t)
  30. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  31. defer clus.Terminate(t)
  32. cfg := clientv3.Config{
  33. Endpoints: []string{
  34. clus.Members[0].GRPCAddr(),
  35. clus.Members[1].GRPCAddr(),
  36. clus.Members[2].GRPCAddr(),
  37. },
  38. }
  39. cli, err := clientv3.New(cfg)
  40. ctx := context.TODO()
  41. if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
  42. t.Fatal(err)
  43. }
  44. // ensure that the second member has the current revision for the key foo
  45. if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
  46. t.Fatal(err)
  47. }
  48. // stop third member in order to force the member to have an outdated revision
  49. clus.Members[2].Stop(t)
  50. time.Sleep(1 * time.Second) // give enough time for operation
  51. _, err = cli.Put(ctx, "foo", "buzz")
  52. if err != nil {
  53. t.Fatal(err)
  54. }
  55. // perform get request against the first member, in order to
  56. // set up kvOrdering to expect "foo" revisions greater than that of
  57. // the third member.
  58. orderingKv := NewKV(cli.KV,
  59. func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  60. return errOrderViolation
  61. })
  62. _, err = orderingKv.Get(ctx, "foo")
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. // ensure that only the third member is queried during requests
  67. clus.Members[0].Stop(t)
  68. clus.Members[1].Stop(t)
  69. clus.Members[2].Restart(t)
  70. // force OrderingKv to query the third member
  71. cli.SetEndpoints(clus.Members[2].GRPCAddr())
  72. _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
  73. if err != errOrderViolation {
  74. t.Fatalf("expected %v, got %v", errOrderViolation, err)
  75. }
  76. }
  77. func TestDetectTxnOrderViolation(t *testing.T) {
  78. var errOrderViolation = errors.New("Detected Order Violation")
  79. defer testutil.AfterTest(t)
  80. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  81. defer clus.Terminate(t)
  82. cfg := clientv3.Config{
  83. Endpoints: []string{
  84. clus.Members[0].GRPCAddr(),
  85. clus.Members[1].GRPCAddr(),
  86. clus.Members[2].GRPCAddr(),
  87. },
  88. }
  89. cli, err := clientv3.New(cfg)
  90. ctx := context.TODO()
  91. if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
  92. t.Fatal(err)
  93. }
  94. // ensure that the second member has the current revision for the key foo
  95. if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
  96. t.Fatal(err)
  97. }
  98. // stop third member in order to force the member to have an outdated revision
  99. clus.Members[2].Stop(t)
  100. time.Sleep(1 * time.Second) // give enough time for operation
  101. if _, err = clus.Client(1).Put(ctx, "foo", "buzz"); err != nil {
  102. t.Fatal(err)
  103. }
  104. // perform get request against the first member, in order to
  105. // set up kvOrdering to expect "foo" revisions greater than that of
  106. // the third member.
  107. orderingKv := NewKV(cli.KV,
  108. func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  109. return errOrderViolation
  110. })
  111. orderingTxn := orderingKv.Txn(ctx)
  112. _, err = orderingTxn.If(
  113. clientv3.Compare(clientv3.Value("b"), ">", "a"),
  114. ).Then(
  115. clientv3.OpGet("foo"),
  116. ).Commit()
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. // ensure that only the third member is queried during requests
  121. clus.Members[0].Stop(t)
  122. clus.Members[1].Stop(t)
  123. clus.Members[2].Restart(t)
  124. // force OrderingKv to query the third member
  125. cli.SetEndpoints(clus.Members[2].GRPCAddr())
  126. _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
  127. orderingTxn = orderingKv.Txn(ctx)
  128. _, err = orderingTxn.If(
  129. clientv3.Compare(clientv3.Value("b"), ">", "a"),
  130. ).Then(
  131. clientv3.OpGet("foo", clientv3.WithSerializable()),
  132. ).Commit()
  133. if err != errOrderViolation {
  134. t.Fatalf("expected %v, got %v", errOrderViolation, err)
  135. }
  136. }
  137. type mockKV struct {
  138. clientv3.KV
  139. response clientv3.OpResponse
  140. }
  141. func (kv *mockKV) Do(ctx gContext.Context, op clientv3.Op) (clientv3.OpResponse, error) {
  142. return kv.response, nil
  143. }
  144. var rangeTests = []struct {
  145. prevRev int64
  146. response *clientv3.GetResponse
  147. }{
  148. {
  149. 5,
  150. &clientv3.GetResponse{
  151. Header: &pb.ResponseHeader{
  152. Revision: 5,
  153. },
  154. },
  155. },
  156. {
  157. 5,
  158. &clientv3.GetResponse{
  159. Header: &pb.ResponseHeader{
  160. Revision: 4,
  161. },
  162. },
  163. },
  164. {
  165. 5,
  166. &clientv3.GetResponse{
  167. Header: &pb.ResponseHeader{
  168. Revision: 6,
  169. },
  170. },
  171. },
  172. }
  173. func TestKvOrdering(t *testing.T) {
  174. for i, tt := range rangeTests {
  175. mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
  176. kv := &kvOrdering{
  177. mKV,
  178. func(r *clientv3.GetResponse) OrderViolationFunc {
  179. return func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  180. r.Header.Revision++
  181. return nil
  182. }
  183. }(tt.response),
  184. tt.prevRev,
  185. sync.RWMutex{},
  186. }
  187. res, err := kv.Get(nil, "mockKey")
  188. if err != nil {
  189. t.Errorf("#%d: expected response %+v, got error %+v", i, tt.response, err)
  190. }
  191. if rev := res.Header.Revision; rev < tt.prevRev {
  192. t.Errorf("#%d: expected revision %d, got %d", i, tt.prevRev, rev)
  193. }
  194. }
  195. }
  196. var txnTests = []struct {
  197. prevRev int64
  198. response *clientv3.TxnResponse
  199. }{
  200. {
  201. 5,
  202. &clientv3.TxnResponse{
  203. Header: &pb.ResponseHeader{
  204. Revision: 5,
  205. },
  206. },
  207. },
  208. {
  209. 5,
  210. &clientv3.TxnResponse{
  211. Header: &pb.ResponseHeader{
  212. Revision: 8,
  213. },
  214. },
  215. },
  216. {
  217. 5,
  218. &clientv3.TxnResponse{
  219. Header: &pb.ResponseHeader{
  220. Revision: 4,
  221. },
  222. },
  223. },
  224. }
  225. func TestTxnOrdering(t *testing.T) {
  226. for i, tt := range txnTests {
  227. mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()}
  228. kv := &kvOrdering{
  229. mKV,
  230. func(r *clientv3.TxnResponse) OrderViolationFunc {
  231. return func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  232. r.Header.Revision++
  233. return nil
  234. }
  235. }(tt.response),
  236. tt.prevRev,
  237. sync.RWMutex{},
  238. }
  239. txn := &txnOrdering{
  240. kv.Txn(context.Background()),
  241. kv,
  242. context.Background(),
  243. sync.Mutex{},
  244. []clientv3.Cmp{},
  245. []clientv3.Op{},
  246. []clientv3.Op{},
  247. }
  248. res, err := txn.Commit()
  249. if err != nil {
  250. t.Errorf("#%d: expected response %+v, got error %+v", i, tt.response, err)
  251. }
  252. if rev := res.Header.Revision; rev < tt.prevRev {
  253. t.Errorf("#%d: expected revision %d, got %d", i, tt.prevRev, rev)
  254. }
  255. }
  256. }