example_stm_test.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package concurrency_test
  15. import (
  16. "context"
  17. "fmt"
  18. "log"
  19. "math/rand"
  20. "sync"
  21. "go.etcd.io/etcd/clientv3"
  22. "go.etcd.io/etcd/clientv3/concurrency"
  23. )
  24. // ExampleSTM_apply shows how to use STM with a transactional
  25. // transfer between balances.
  26. func ExampleSTM_apply() {
  27. cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
  28. if err != nil {
  29. log.Fatal(err)
  30. }
  31. defer cli.Close()
  32. // set up "accounts"
  33. totalAccounts := 5
  34. for i := 0; i < totalAccounts; i++ {
  35. k := fmt.Sprintf("accts/%d", i)
  36. if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
  37. log.Fatal(err)
  38. }
  39. }
  40. exchange := func(stm concurrency.STM) error {
  41. from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
  42. if from == to {
  43. // nothing to do
  44. return nil
  45. }
  46. // read values
  47. fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
  48. fromV, toV := stm.Get(fromK), stm.Get(toK)
  49. fromInt, toInt := 0, 0
  50. fmt.Sscanf(fromV, "%d", &fromInt)
  51. fmt.Sscanf(toV, "%d", &toInt)
  52. // transfer amount
  53. xfer := fromInt / 2
  54. fromInt, toInt = fromInt-xfer, toInt+xfer
  55. // write back
  56. stm.Put(fromK, fmt.Sprintf("%d", fromInt))
  57. stm.Put(toK, fmt.Sprintf("%d", toInt))
  58. return nil
  59. }
  60. // concurrently exchange values between accounts
  61. var wg sync.WaitGroup
  62. wg.Add(10)
  63. for i := 0; i < 10; i++ {
  64. go func() {
  65. defer wg.Done()
  66. if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
  67. log.Fatal(serr)
  68. }
  69. }()
  70. }
  71. wg.Wait()
  72. // confirm account sum matches sum from beginning.
  73. sum := 0
  74. accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
  75. if err != nil {
  76. log.Fatal(err)
  77. }
  78. for _, kv := range accts.Kvs {
  79. v := 0
  80. fmt.Sscanf(string(kv.Value), "%d", &v)
  81. sum += v
  82. }
  83. fmt.Println("account sum is", sum)
  84. // Output:
  85. // account sum is 500
  86. }