lock_racer_command.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "github.com/coreos/etcd/clientv3/concurrency"
  20. "github.com/spf13/cobra"
  21. )
  22. // NewLockRacerCommand returns the cobra command for "lock-racer runner".
  23. func NewLockRacerCommand() *cobra.Command {
  24. cmd := &cobra.Command{
  25. Use: "lock-racer [name of lock (defaults to 'racers')]",
  26. Short: "Performs lock race operation",
  27. Run: runRacerFunc,
  28. }
  29. cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
  30. return cmd
  31. }
  32. func runRacerFunc(cmd *cobra.Command, args []string) {
  33. racers := "racers"
  34. if len(args) == 1 {
  35. racers = args[0]
  36. }
  37. if len(args) > 1 {
  38. ExitWithError(ExitBadArgs, errors.New("lock-racer takes at most one argument"))
  39. }
  40. rcs := make([]roundClient, totalClientConnections)
  41. ctx := context.Background()
  42. cnt := 0
  43. eps := endpointsFromFlag(cmd)
  44. dialTimeout := dialTimeoutFromCmd(cmd)
  45. for i := range rcs {
  46. var (
  47. s *concurrency.Session
  48. err error
  49. )
  50. rcs[i].c = newClient(eps, dialTimeout)
  51. for {
  52. s, err = concurrency.NewSession(rcs[i].c)
  53. if err == nil {
  54. break
  55. }
  56. }
  57. m := concurrency.NewMutex(s, racers)
  58. rcs[i].acquire = func() error { return m.Lock(ctx) }
  59. rcs[i].validate = func() error {
  60. if cnt++; cnt != 1 {
  61. return fmt.Errorf("bad lock; count: %d", cnt)
  62. }
  63. return nil
  64. }
  65. rcs[i].release = func() error {
  66. if err := m.Unlock(ctx); err != nil {
  67. return err
  68. }
  69. cnt = 0
  70. return nil
  71. }
  72. }
  73. // each client creates 1 key from NewMutex() and delete it from Unlock()
  74. // a round involves in 2*len(rcs) requests.
  75. doRounds(rcs, rounds, 2*len(rcs))
  76. }