lock_racer_command.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runner
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "sync"
  20. "github.com/coreos/etcd/clientv3/concurrency"
  21. "github.com/spf13/cobra"
  22. )
  23. // NewLockRacerCommand returns the cobra command for "lock-racer runner".
  24. func NewLockRacerCommand() *cobra.Command {
  25. cmd := &cobra.Command{
  26. Use: "lock-racer [name of lock (defaults to 'racers')]",
  27. Short: "Performs lock race operation",
  28. Run: runRacerFunc,
  29. }
  30. cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
  31. return cmd
  32. }
  33. func runRacerFunc(cmd *cobra.Command, args []string) {
  34. racers := "racers"
  35. if len(args) == 1 {
  36. racers = args[0]
  37. }
  38. if len(args) > 1 {
  39. ExitWithError(ExitBadArgs, errors.New("lock-racer takes at most one argument"))
  40. }
  41. rcs := make([]roundClient, totalClientConnections)
  42. ctx := context.Background()
  43. // mu ensures validate and release funcs are atomic.
  44. var mu sync.Mutex
  45. cnt := 0
  46. eps := endpointsFromFlag(cmd)
  47. for i := range rcs {
  48. var (
  49. s *concurrency.Session
  50. err error
  51. )
  52. rcs[i].c = newClient(eps, dialTimeout)
  53. for {
  54. s, err = concurrency.NewSession(rcs[i].c)
  55. if err == nil {
  56. break
  57. }
  58. }
  59. m := concurrency.NewMutex(s, racers)
  60. rcs[i].acquire = func() error { return m.Lock(ctx) }
  61. rcs[i].validate = func() error {
  62. mu.Lock()
  63. defer mu.Unlock()
  64. if cnt++; cnt != 1 {
  65. return fmt.Errorf("bad lock; count: %d", cnt)
  66. }
  67. return nil
  68. }
  69. rcs[i].release = func() error {
  70. mu.Lock()
  71. defer mu.Unlock()
  72. if err := m.Unlock(ctx); err != nil {
  73. return err
  74. }
  75. cnt = 0
  76. return nil
  77. }
  78. }
  79. // each client creates 1 key from NewMutex() and delete it from Unlock()
  80. // a round involves in 2*len(rcs) requests.
  81. doRounds(rcs, rounds, 2*len(rcs))
  82. }