barrier.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package recipe
  15. import (
  16. "context"
  17. v3 "go.etcd.io/etcd/clientv3"
  18. "go.etcd.io/etcd/mvcc/mvccpb"
  19. )
  20. // Barrier creates a key in etcd to block processes, then deletes the key to
  21. // release all blocked processes.
  22. type Barrier struct {
  23. client *v3.Client
  24. ctx context.Context
  25. key string
  26. }
  27. func NewBarrier(client *v3.Client, key string) *Barrier {
  28. return &Barrier{client, context.TODO(), key}
  29. }
  30. // Hold creates the barrier key causing processes to block on Wait.
  31. func (b *Barrier) Hold() error {
  32. _, err := newKey(b.client, b.key, v3.NoLease)
  33. return err
  34. }
  35. // Release deletes the barrier key to unblock all waiting processes.
  36. func (b *Barrier) Release() error {
  37. _, err := b.client.Delete(b.ctx, b.key)
  38. return err
  39. }
  40. // Wait blocks on the barrier key until it is deleted. If there is no key, Wait
  41. // assumes Release has already been called and returns immediately.
  42. func (b *Barrier) Wait() error {
  43. resp, err := b.client.Get(b.ctx, b.key, v3.WithFirstKey()...)
  44. if err != nil {
  45. return err
  46. }
  47. if len(resp.Kvs) == 0 {
  48. // key already removed
  49. return nil
  50. }
  51. _, err = WaitEvents(
  52. b.client,
  53. b.key,
  54. resp.Header.Revision,
  55. []mvccpb.Event_EventType{mvccpb.PUT, mvccpb.DELETE})
  56. return err
  57. }