mutex.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package concurrency
  15. import (
  16. "sync"
  17. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  18. v3 "github.com/coreos/etcd/clientv3"
  19. )
  20. // Mutex implements the sync Locker interface with etcd
  21. type Mutex struct {
  22. client *v3.Client
  23. pfx string
  24. myKey string
  25. myRev int64
  26. }
  27. func NewMutex(client *v3.Client, pfx string) *Mutex {
  28. return &Mutex{client, pfx, "", -1}
  29. }
  30. // Lock locks the mutex with a cancellable context. If the context is cancelled
  31. // while trying to acquire the lock, the mutex tries to clean its stale lock entry.
  32. func (m *Mutex) Lock(ctx context.Context) error {
  33. s, err := NewSession(m.client)
  34. if err != nil {
  35. return err
  36. }
  37. // put self in lock waiters via myKey; oldest waiter holds lock
  38. m.myKey, m.myRev, err = NewUniqueKey(ctx, m.client, m.pfx, v3.WithLease(s.Lease()))
  39. // wait for deletion revisions prior to myKey
  40. err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
  41. // release lock key if cancelled
  42. select {
  43. case <-ctx.Done():
  44. m.Unlock()
  45. default:
  46. }
  47. return err
  48. }
  49. func (m *Mutex) Unlock() error {
  50. if _, err := m.client.Delete(m.client.Ctx(), m.myKey); err != nil {
  51. return err
  52. }
  53. m.myKey = "\x00"
  54. m.myRev = -1
  55. return nil
  56. }
  57. func (m *Mutex) IsOwner() v3.Cmp {
  58. return v3.Compare(v3.CreatedRevision(m.myKey), "=", m.myRev)
  59. }
  60. func (m *Mutex) Key() string { return m.myKey }
  61. type lockerMutex struct{ *Mutex }
  62. func (lm *lockerMutex) Lock() {
  63. if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil {
  64. panic(err)
  65. }
  66. }
  67. func (lm *lockerMutex) Unlock() {
  68. if err := lm.Mutex.Unlock(); err != nil {
  69. panic(err)
  70. }
  71. }
  72. // NewLocker creates a sync.Locker backed by an etcd mutex.
  73. func NewLocker(client *v3.Client, pfx string) sync.Locker {
  74. return &lockerMutex{NewMutex(client, pfx)}
  75. }