v3_stm_test.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "strconv"
  20. "testing"
  21. v3 "go.etcd.io/etcd/clientv3"
  22. "go.etcd.io/etcd/clientv3/concurrency"
  23. "go.etcd.io/etcd/pkg/testutil"
  24. )
  25. // TestSTMConflict tests that conflicts are retried.
  26. func TestSTMConflict(t *testing.T) {
  27. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  28. defer clus.Terminate(t)
  29. etcdc := clus.RandClient()
  30. keys := make([]string, 5)
  31. for i := 0; i < len(keys); i++ {
  32. keys[i] = fmt.Sprintf("foo-%d", i)
  33. if _, err := etcdc.Put(context.TODO(), keys[i], "100"); err != nil {
  34. t.Fatalf("could not make key (%v)", err)
  35. }
  36. }
  37. errc := make(chan error)
  38. for i := range keys {
  39. curEtcdc := clus.RandClient()
  40. srcKey := keys[i]
  41. applyf := func(stm concurrency.STM) error {
  42. src := stm.Get(srcKey)
  43. // must be different key to avoid double-adding
  44. dstKey := srcKey
  45. for dstKey == srcKey {
  46. dstKey = keys[rand.Intn(len(keys))]
  47. }
  48. dst := stm.Get(dstKey)
  49. srcV, _ := strconv.ParseInt(src, 10, 64)
  50. dstV, _ := strconv.ParseInt(dst, 10, 64)
  51. if srcV == 0 {
  52. // can't rand.Intn on 0, so skip this transaction
  53. return nil
  54. }
  55. xfer := int64(rand.Intn(int(srcV)) / 2)
  56. stm.Put(srcKey, fmt.Sprintf("%d", srcV-xfer))
  57. stm.Put(dstKey, fmt.Sprintf("%d", dstV+xfer))
  58. return nil
  59. }
  60. go func() {
  61. iso := concurrency.WithIsolation(concurrency.RepeatableReads)
  62. _, err := concurrency.NewSTM(curEtcdc, applyf, iso)
  63. errc <- err
  64. }()
  65. }
  66. // wait for txns
  67. for range keys {
  68. if err := <-errc; err != nil {
  69. t.Fatalf("apply failed (%v)", err)
  70. }
  71. }
  72. // ensure sum matches initial sum
  73. sum := 0
  74. for _, oldkey := range keys {
  75. rk, err := etcdc.Get(context.TODO(), oldkey)
  76. if err != nil {
  77. t.Fatalf("couldn't fetch key %s (%v)", oldkey, err)
  78. }
  79. v, _ := strconv.ParseInt(string(rk.Kvs[0].Value), 10, 64)
  80. sum += int(v)
  81. }
  82. if sum != len(keys)*100 {
  83. t.Fatalf("bad sum. got %d, expected %d", sum, len(keys)*100)
  84. }
  85. }
  86. // TestSTMPutNewKey confirms a STM put on a new key is visible after commit.
  87. func TestSTMPutNewKey(t *testing.T) {
  88. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  89. defer clus.Terminate(t)
  90. etcdc := clus.RandClient()
  91. applyf := func(stm concurrency.STM) error {
  92. stm.Put("foo", "bar")
  93. return nil
  94. }
  95. iso := concurrency.WithIsolation(concurrency.RepeatableReads)
  96. if _, err := concurrency.NewSTM(etcdc, applyf, iso); err != nil {
  97. t.Fatalf("error on stm txn (%v)", err)
  98. }
  99. resp, err := etcdc.Get(context.TODO(), "foo")
  100. if err != nil {
  101. t.Fatalf("error fetching key (%v)", err)
  102. }
  103. if string(resp.Kvs[0].Value) != "bar" {
  104. t.Fatalf("bad value. got %+v, expected 'bar' value", resp)
  105. }
  106. }
  107. // TestSTMAbort tests that an aborted txn does not modify any keys.
  108. func TestSTMAbort(t *testing.T) {
  109. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  110. defer clus.Terminate(t)
  111. etcdc := clus.RandClient()
  112. ctx, cancel := context.WithCancel(context.TODO())
  113. applyf := func(stm concurrency.STM) error {
  114. stm.Put("foo", "baz")
  115. cancel()
  116. stm.Put("foo", "bap")
  117. return nil
  118. }
  119. iso := concurrency.WithIsolation(concurrency.RepeatableReads)
  120. sctx := concurrency.WithAbortContext(ctx)
  121. if _, err := concurrency.NewSTM(etcdc, applyf, iso, sctx); err == nil {
  122. t.Fatalf("no error on stm txn")
  123. }
  124. resp, err := etcdc.Get(context.TODO(), "foo")
  125. if err != nil {
  126. t.Fatalf("error fetching key (%v)", err)
  127. }
  128. if len(resp.Kvs) != 0 {
  129. t.Fatalf("bad value. got %+v, expected nothing", resp)
  130. }
  131. }
  132. // TestSTMSerialize tests that serialization is honored when serializable.
  133. func TestSTMSerialize(t *testing.T) {
  134. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  135. defer clus.Terminate(t)
  136. etcdc := clus.RandClient()
  137. // set up initial keys
  138. keys := make([]string, 5)
  139. for i := 0; i < len(keys); i++ {
  140. keys[i] = fmt.Sprintf("foo-%d", i)
  141. }
  142. // update keys in full batches
  143. updatec := make(chan struct{})
  144. go func() {
  145. defer close(updatec)
  146. for i := 0; i < 5; i++ {
  147. s := fmt.Sprintf("%d", i)
  148. ops := []v3.Op{}
  149. for _, k := range keys {
  150. ops = append(ops, v3.OpPut(k, s))
  151. }
  152. if _, err := etcdc.Txn(context.TODO()).Then(ops...).Commit(); err != nil {
  153. t.Errorf("couldn't put keys (%v)", err)
  154. }
  155. updatec <- struct{}{}
  156. }
  157. }()
  158. // read all keys in txn, make sure all values match
  159. errc := make(chan error)
  160. for range updatec {
  161. curEtcdc := clus.RandClient()
  162. applyf := func(stm concurrency.STM) error {
  163. vs := []string{}
  164. for i := range keys {
  165. vs = append(vs, stm.Get(keys[i]))
  166. }
  167. for i := range vs {
  168. if vs[0] != vs[i] {
  169. return fmt.Errorf("got vs[%d] = %v, want %v", i, vs[i], vs[0])
  170. }
  171. }
  172. return nil
  173. }
  174. go func() {
  175. iso := concurrency.WithIsolation(concurrency.Serializable)
  176. _, err := concurrency.NewSTM(curEtcdc, applyf, iso)
  177. errc <- err
  178. }()
  179. }
  180. for i := 0; i < 5; i++ {
  181. if err := <-errc; err != nil {
  182. t.Error(err)
  183. }
  184. }
  185. }
  186. // TestSTMApplyOnConcurrentDeletion ensures that concurrent key deletion
  187. // fails the first GET revision comparison within STM; trigger retry.
  188. func TestSTMApplyOnConcurrentDeletion(t *testing.T) {
  189. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  190. defer clus.Terminate(t)
  191. etcdc := clus.RandClient()
  192. if _, err := etcdc.Put(context.TODO(), "foo", "bar"); err != nil {
  193. t.Fatal(err)
  194. }
  195. donec, readyc := make(chan struct{}), make(chan struct{})
  196. go func() {
  197. <-readyc
  198. if _, err := etcdc.Delete(context.TODO(), "foo"); err != nil {
  199. t.Error(err)
  200. }
  201. close(donec)
  202. }()
  203. try := 0
  204. applyf := func(stm concurrency.STM) error {
  205. try++
  206. stm.Get("foo")
  207. if try == 1 {
  208. // trigger delete to make GET rev comparison outdated
  209. close(readyc)
  210. <-donec
  211. }
  212. stm.Put("foo2", "bar2")
  213. return nil
  214. }
  215. iso := concurrency.WithIsolation(concurrency.RepeatableReads)
  216. if _, err := concurrency.NewSTM(etcdc, applyf, iso); err != nil {
  217. t.Fatalf("error on stm txn (%v)", err)
  218. }
  219. if try != 2 {
  220. t.Fatalf("STM apply expected to run twice, got %d", try)
  221. }
  222. resp, err := etcdc.Get(context.TODO(), "foo2")
  223. if err != nil {
  224. t.Fatalf("error fetching key (%v)", err)
  225. }
  226. if string(resp.Kvs[0].Value) != "bar2" {
  227. t.Fatalf("bad value. got %+v, expected 'bar2' value", resp)
  228. }
  229. }
  230. func TestSTMSerializableSnapshotPut(t *testing.T) {
  231. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  232. defer clus.Terminate(t)
  233. cli := clus.Client(0)
  234. // key with lower create/mod revision than keys being updated
  235. _, err := cli.Put(context.TODO(), "a", "0")
  236. testutil.AssertNil(t, err)
  237. tries := 0
  238. applyf := func(stm concurrency.STM) error {
  239. if tries > 2 {
  240. return fmt.Errorf("too many retries")
  241. }
  242. tries++
  243. stm.Get("a")
  244. stm.Put("b", "1")
  245. return nil
  246. }
  247. iso := concurrency.WithIsolation(concurrency.SerializableSnapshot)
  248. _, err = concurrency.NewSTM(cli, applyf, iso)
  249. testutil.AssertNil(t, err)
  250. _, err = concurrency.NewSTM(cli, applyf, iso)
  251. testutil.AssertNil(t, err)
  252. resp, err := cli.Get(context.TODO(), "b")
  253. testutil.AssertNil(t, err)
  254. if resp.Kvs[0].Version != 2 {
  255. t.Fatalf("bad version. got %+v, expected version 2", resp)
  256. }
  257. }