@@ -18,7 +18,7 @@ import (
 	"sync"
 
 	"golang.org/x/net/context"
-	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 
 	"github.com/coreos/etcd/clientv3"
@@ -31,50 +31,35 @@ type watchProxy struct {
 	cw  clientv3.Watcher
 	ctx context.Context
 
-	ranges *watchRanges
+	leader *leader
 
-	// retryLimiter controls the create watch retry rate on lost leaders.
-	retryLimiter *rate.Limiter
+	ranges *watchRanges
 
-	// mu protects leaderc updates.
-	mu      sync.RWMutex
-	leaderc chan struct{}
+	// mu protects adding outstanding watch servers through wg.
+	mu sync.Mutex
 
 	// wg waits until all outstanding watch servers quit.
 	wg sync.WaitGroup
 }
 
-const (
-	lostLeaderKey  = "__lostleader" // watched to detect leader loss
-	retryPerSecond = 10
-)
-
 func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
+	cctx, cancel := context.WithCancel(c.Ctx())
 	wp := &watchProxy{
-		cw:           c.Watcher,
-		ctx:          c.Ctx(),
-		retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
-		leaderc:      make(chan struct{}),
+		cw:     c.Watcher,
+		ctx:    cctx,
+		leader: newLeader(c.Ctx(), c.Watcher),
 	}
 	wp.ranges = newWatchRanges(wp)
 	ch := make(chan struct{})
 	go func() {
 		defer close(ch)
-		// a new streams without opening any watchers won't catch
-		// a lost leader event, so have a special watch to monitor it
-		rev := int64((uint64(1) << 63) - 2)
-		lctx := clientv3.WithRequireLeader(wp.ctx)
-		for wp.ctx.Err() == nil {
-			wch := wp.cw.Watch(lctx, lostLeaderKey, clientv3.WithRev(rev))
-			for range wch {
-			}
-			wp.mu.Lock()
-			close(wp.leaderc)
-			wp.leaderc = make(chan struct{})
-			wp.mu.Unlock()
-			wp.retryLimiter.Wait(wp.ctx)
-		}
+		<-wp.leader.stopNotify()
 		wp.mu.Lock()
+		select {
+		case <-wp.ctx.Done():
+		case <-wp.leader.disconnectNotify():
+			cancel()
+		}
 		<-wp.ctx.Done()
 		wp.mu.Unlock()
 		wp.wg.Wait()
@@ -104,11 +89,19 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 		cancel: cancel,
 	}
 
-	var leaderc <-chan struct{}
+	var lostLeaderC <-chan struct{}
 	if md, ok := metadata.FromContext(stream.Context()); ok {
 		v := md[rpctypes.MetadataRequireLeaderKey]
 		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
-			leaderc = wp.lostLeaderNotify()
+			lostLeaderC = wp.leader.lostNotify()
+			// if leader is known to be lost at creation time, avoid
+			// letting events through at all
+			select {
+			case <-lostLeaderC:
+				wp.wg.Done()
+				return rpctypes.ErrNoLeader
+			default:
+			}
 		}
 	}
 
@@ -127,7 +120,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	go func() {
 		defer func() { stopc <- struct{}{} }()
 		select {
-		case <-leaderc:
+		case <-lostLeaderC:
 		case <-ctx.Done():
 		case <-wp.ctx.Done():
 		}
@@ -146,19 +139,15 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	}()
 
 	select {
-	case <-leaderc:
+	case <-lostLeaderC:
 		return rpctypes.ErrNoLeader
+	case <-wp.leader.disconnectNotify():
+		return grpc.ErrClientConnClosing
 	default:
 		return wps.ctx.Err()
 	}
 }
 
-func (wp *watchProxy) lostLeaderNotify() <-chan struct{} {
-	wp.mu.RLock()
-	defer wp.mu.RUnlock()
-	return wp.leaderc
-}
-
 // watchProxyStream forwards etcd watch events to a proxied client stream.
 type watchProxyStream struct {
 	ranges *watchRanges
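
Note: the leader helper this patch relies on (newLeader, stopNotify, disconnectNotify, lostNotify) is defined outside these hunks. Below is a minimal sketch of what such a helper could look like, reconstructed from the call sites above and from the lost-leader watch that NewWatchProxy used to run inline; everything beyond those four method names is an assumption, not the patch's actual leader.go.

// Sketch only: inferred from the call sites in this patch, not etcd's
// real leader.go. Channel semantics are guesses from usage.
package grpcproxy

import (
	"sync"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
)

// lostLeaderKey is carried over from the watch this patch removes.
const lostLeaderKey = "__lostleader" // watched to detect leader loss

// leader tracks leadership through a single RequireLeader watch shared
// by all proxied streams.
type leader struct {
	ctx context.Context
	w   clientv3.Watcher

	// mu protects leaderc updates.
	mu       sync.RWMutex
	leaderc  chan struct{}
	disconnc chan struct{}
	donec    chan struct{}
}

func newLeader(ctx context.Context, w clientv3.Watcher) *leader {
	l := &leader{
		ctx:      clientv3.WithRequireLeader(ctx),
		w:        w,
		leaderc:  make(chan struct{}),
		disconnc: make(chan struct{}),
		donec:    make(chan struct{}),
	}
	go l.recvLoop()
	return l
}

func (l *leader) recvLoop() {
	defer close(l.donec)
	// watch a far-future revision on a special key; no events ever
	// arrive, so the stream only terminates on leader loss or on
	// client disconnect
	rev := int64((uint64(1) << 63) - 2)
	for l.ctx.Err() == nil {
		wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev))
		for range wch {
		}
		if l.ctx.Err() != nil {
			break
		}
		// stream broke while the context is live: the leader was lost;
		// signal waiters, then rearm for the next term (a production
		// version would rate-limit these retries, as the deleted
		// retryLimiter did)
		l.mu.Lock()
		close(l.leaderc)
		l.leaderc = make(chan struct{})
		l.mu.Unlock()
	}
	// context is done: the client connection is gone for good
	close(l.disconnc)
}

// lostNotify returns a channel closed when the current leader is lost.
func (l *leader) lostNotify() <-chan struct{} {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.leaderc
}

// disconnectNotify returns a channel closed once the client disconnects.
func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }

// stopNotify returns a channel closed when leadership tracking stops.
func (l *leader) stopNotify() <-chan struct{} { return l.donec }

Centralizing the __lostleader watch this way gives every proxied stream one shared leadership signal in place of the per-proxy loop deleted above, which is also why watchProxy no longer needs the RWMutex-guarded leaderc field.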