lock_racer_command.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "sync"
  20. "github.com/coreos/etcd/clientv3/concurrency"
  21. "github.com/spf13/cobra"
  22. )
  23. // NewLockRacerCommand returns the cobra command for "lock-racer runner".
  24. func NewLockRacerCommand() *cobra.Command {
  25. cmd := &cobra.Command{
  26. Use: "lock-racer [name of lock (defaults to 'racers')]",
  27. Short: "Performs lock race operation",
  28. Run: runRacerFunc,
  29. }
  30. cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
  31. return cmd
  32. }
  33. func runRacerFunc(cmd *cobra.Command, args []string) {
  34. racers := "racers"
  35. if len(args) == 1 {
  36. racers = args[0]
  37. }
  38. if len(args) > 1 {
  39. ExitWithError(ExitBadArgs, errors.New("lock-racer takes at most one argument"))
  40. }
  41. rcs := make([]roundClient, totalClientConnections)
  42. ctx := context.Background()
  43. // mu ensures validate and release funcs are atomic.
  44. var mu sync.Mutex
  45. cnt := 0
  46. eps := endpointsFromFlag(cmd)
  47. dialTimeout := dialTimeoutFromCmd(cmd)
  48. for i := range rcs {
  49. var (
  50. s *concurrency.Session
  51. err error
  52. )
  53. rcs[i].c = newClient(eps, dialTimeout)
  54. for {
  55. s, err = concurrency.NewSession(rcs[i].c)
  56. if err == nil {
  57. break
  58. }
  59. }
  60. m := concurrency.NewMutex(s, racers)
  61. rcs[i].acquire = func() error { return m.Lock(ctx) }
  62. rcs[i].validate = func() error {
  63. mu.Lock()
  64. defer mu.Unlock()
  65. if cnt++; cnt != 1 {
  66. return fmt.Errorf("bad lock; count: %d", cnt)
  67. }
  68. return nil
  69. }
  70. rcs[i].release = func() error {
  71. mu.Lock()
  72. defer mu.Unlock()
  73. if err := m.Unlock(ctx); err != nil {
  74. return err
  75. }
  76. cnt = 0
  77. return nil
  78. }
  79. }
  80. // each client creates 1 key from NewMutex() and delete it from Unlock()
  81. // a round involves in 2*len(rcs) requests.
  82. doRounds(rcs, rounds, 2*len(rcs))
  83. }