barrier.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package recipe
  15. import (
  16. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  17. v3 "github.com/coreos/etcd/clientv3"
  18. "github.com/coreos/etcd/storage/storagepb"
  19. )
  20. // Barrier creates a key in etcd to block processes, then deletes the key to
  21. // release all blocked processes.
  22. type Barrier struct {
  23. client *v3.Client
  24. kv v3.KV
  25. ctx context.Context
  26. key string
  27. }
  28. func NewBarrier(client *v3.Client, key string) *Barrier {
  29. return &Barrier{client, v3.NewKV(client), context.TODO(), key}
  30. }
  31. // Hold creates the barrier key causing processes to block on Wait.
  32. func (b *Barrier) Hold() error {
  33. _, err := NewKey(b.kv, b.key, 0)
  34. return err
  35. }
  36. // Release deletes the barrier key to unblock all waiting processes.
  37. func (b *Barrier) Release() error {
  38. _, err := b.kv.Delete(b.ctx, b.key)
  39. return err
  40. }
  41. // Wait blocks on the barrier key until it is deleted. If there is no key, Wait
  42. // assumes Release has already been called and returns immediately.
  43. func (b *Barrier) Wait() error {
  44. resp, err := b.kv.Get(b.ctx, b.key, v3.WithFirstKey()...)
  45. if err != nil {
  46. return err
  47. }
  48. if len(resp.Kvs) == 0 {
  49. // key already removed
  50. return nil
  51. }
  52. _, err = WaitEvents(
  53. b.client,
  54. b.key,
  55. resp.Header.Revision,
  56. []storagepb.Event_EventType{storagepb.PUT, storagepb.DELETE})
  57. return err
  58. }