kv_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ordering
  15. import (
  16. "context"
  17. gContext "context"
  18. "errors"
  19. "sync"
  20. "testing"
  21. "time"
  22. "go.etcd.io/etcd/clientv3"
  23. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  24. "go.etcd.io/etcd/integration"
  25. "go.etcd.io/etcd/pkg/testutil"
  26. )
  27. func TestDetectKvOrderViolation(t *testing.T) {
  28. var errOrderViolation = errors.New("Detected Order Violation")
  29. defer testutil.AfterTest(t)
  30. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  31. defer clus.Terminate(t)
  32. cfg := clientv3.Config{
  33. Endpoints: []string{
  34. clus.Members[0].GRPCAddr(),
  35. clus.Members[1].GRPCAddr(),
  36. clus.Members[2].GRPCAddr(),
  37. },
  38. }
  39. cli, err := clientv3.New(cfg)
  40. if err != nil {
  41. t.Fatal(err)
  42. }
  43. ctx := context.TODO()
  44. if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
  45. t.Fatal(err)
  46. }
  47. // ensure that the second member has the current revision for the key foo
  48. if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
  49. t.Fatal(err)
  50. }
  51. // stop third member in order to force the member to have an outdated revision
  52. clus.Members[2].Stop(t)
  53. time.Sleep(1 * time.Second) // give enough time for operation
  54. _, err = cli.Put(ctx, "foo", "buzz")
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. // perform get request against the first member, in order to
  59. // set up kvOrdering to expect "foo" revisions greater than that of
  60. // the third member.
  61. orderingKv := NewKV(cli.KV,
  62. func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  63. return errOrderViolation
  64. })
  65. _, err = orderingKv.Get(ctx, "foo")
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. // ensure that only the third member is queried during requests
  70. clus.Members[0].Stop(t)
  71. clus.Members[1].Stop(t)
  72. clus.Members[2].Restart(t)
  73. // force OrderingKv to query the third member
  74. cli.SetEndpoints(clus.Members[2].GRPCAddr())
  75. time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
  76. _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
  77. if err != errOrderViolation {
  78. t.Fatalf("expected %v, got %v", errOrderViolation, err)
  79. }
  80. }
  81. func TestDetectTxnOrderViolation(t *testing.T) {
  82. var errOrderViolation = errors.New("Detected Order Violation")
  83. defer testutil.AfterTest(t)
  84. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  85. defer clus.Terminate(t)
  86. cfg := clientv3.Config{
  87. Endpoints: []string{
  88. clus.Members[0].GRPCAddr(),
  89. clus.Members[1].GRPCAddr(),
  90. clus.Members[2].GRPCAddr(),
  91. },
  92. }
  93. cli, err := clientv3.New(cfg)
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. ctx := context.TODO()
  98. if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
  99. t.Fatal(err)
  100. }
  101. // ensure that the second member has the current revision for the key foo
  102. if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
  103. t.Fatal(err)
  104. }
  105. // stop third member in order to force the member to have an outdated revision
  106. clus.Members[2].Stop(t)
  107. time.Sleep(1 * time.Second) // give enough time for operation
  108. if _, err = clus.Client(1).Put(ctx, "foo", "buzz"); err != nil {
  109. t.Fatal(err)
  110. }
  111. // perform get request against the first member, in order to
  112. // set up kvOrdering to expect "foo" revisions greater than that of
  113. // the third member.
  114. orderingKv := NewKV(cli.KV,
  115. func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  116. return errOrderViolation
  117. })
  118. orderingTxn := orderingKv.Txn(ctx)
  119. _, err = orderingTxn.If(
  120. clientv3.Compare(clientv3.Value("b"), ">", "a"),
  121. ).Then(
  122. clientv3.OpGet("foo"),
  123. ).Commit()
  124. if err != nil {
  125. t.Fatal(err)
  126. }
  127. // ensure that only the third member is queried during requests
  128. clus.Members[0].Stop(t)
  129. clus.Members[1].Stop(t)
  130. clus.Members[2].Restart(t)
  131. // force OrderingKv to query the third member
  132. cli.SetEndpoints(clus.Members[2].GRPCAddr())
  133. time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
  134. _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
  135. if err != errOrderViolation {
  136. t.Fatalf("expected %v, got %v", errOrderViolation, err)
  137. }
  138. orderingTxn = orderingKv.Txn(ctx)
  139. _, err = orderingTxn.If(
  140. clientv3.Compare(clientv3.Value("b"), ">", "a"),
  141. ).Then(
  142. clientv3.OpGet("foo", clientv3.WithSerializable()),
  143. ).Commit()
  144. if err != errOrderViolation {
  145. t.Fatalf("expected %v, got %v", errOrderViolation, err)
  146. }
  147. }
  148. type mockKV struct {
  149. clientv3.KV
  150. response clientv3.OpResponse
  151. }
  152. func (kv *mockKV) Do(ctx gContext.Context, op clientv3.Op) (clientv3.OpResponse, error) {
  153. return kv.response, nil
  154. }
  155. var rangeTests = []struct {
  156. prevRev int64
  157. response *clientv3.GetResponse
  158. }{
  159. {
  160. 5,
  161. &clientv3.GetResponse{
  162. Header: &pb.ResponseHeader{
  163. Revision: 5,
  164. },
  165. },
  166. },
  167. {
  168. 5,
  169. &clientv3.GetResponse{
  170. Header: &pb.ResponseHeader{
  171. Revision: 4,
  172. },
  173. },
  174. },
  175. {
  176. 5,
  177. &clientv3.GetResponse{
  178. Header: &pb.ResponseHeader{
  179. Revision: 6,
  180. },
  181. },
  182. },
  183. }
  184. func TestKvOrdering(t *testing.T) {
  185. for i, tt := range rangeTests {
  186. mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()}
  187. kv := &kvOrdering{
  188. mKV,
  189. func(r *clientv3.GetResponse) OrderViolationFunc {
  190. return func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  191. r.Header.Revision++
  192. return nil
  193. }
  194. }(tt.response),
  195. tt.prevRev,
  196. sync.RWMutex{},
  197. }
  198. res, err := kv.Get(nil, "mockKey")
  199. if err != nil {
  200. t.Errorf("#%d: expected response %+v, got error %+v", i, tt.response, err)
  201. }
  202. if rev := res.Header.Revision; rev < tt.prevRev {
  203. t.Errorf("#%d: expected revision %d, got %d", i, tt.prevRev, rev)
  204. }
  205. }
  206. }
  207. var txnTests = []struct {
  208. prevRev int64
  209. response *clientv3.TxnResponse
  210. }{
  211. {
  212. 5,
  213. &clientv3.TxnResponse{
  214. Header: &pb.ResponseHeader{
  215. Revision: 5,
  216. },
  217. },
  218. },
  219. {
  220. 5,
  221. &clientv3.TxnResponse{
  222. Header: &pb.ResponseHeader{
  223. Revision: 8,
  224. },
  225. },
  226. },
  227. {
  228. 5,
  229. &clientv3.TxnResponse{
  230. Header: &pb.ResponseHeader{
  231. Revision: 4,
  232. },
  233. },
  234. },
  235. }
  236. func TestTxnOrdering(t *testing.T) {
  237. for i, tt := range txnTests {
  238. mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()}
  239. kv := &kvOrdering{
  240. mKV,
  241. func(r *clientv3.TxnResponse) OrderViolationFunc {
  242. return func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
  243. r.Header.Revision++
  244. return nil
  245. }
  246. }(tt.response),
  247. tt.prevRev,
  248. sync.RWMutex{},
  249. }
  250. txn := &txnOrdering{
  251. kv.Txn(context.Background()),
  252. kv,
  253. context.Background(),
  254. sync.Mutex{},
  255. []clientv3.Cmp{},
  256. []clientv3.Op{},
  257. []clientv3.Op{},
  258. }
  259. res, err := txn.Commit()
  260. if err != nil {
  261. t.Errorf("#%d: expected response %+v, got error %+v", i, tt.response, err)
  262. }
  263. if rev := res.Header.Revision; rev < tt.prevRev {
  264. t.Errorf("#%d: expected revision %d, got %d", i, tt.prevRev, rev)
  265. }
  266. }
  267. }