|
|
@@ -122,23 +122,23 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
|
|
|
defer func() { stopc <- struct{}{} }()
|
|
|
wps.sendLoop()
|
|
|
}()
|
|
|
- if leaderc != nil {
|
|
|
- go func() {
|
|
|
- defer func() { stopc <- struct{}{} }()
|
|
|
- select {
|
|
|
- case <-leaderc:
|
|
|
- case <-ctx.Done():
|
|
|
- }
|
|
|
- }()
|
|
|
- }
|
|
|
+ // tear down watch if leader goes down or entire watch proxy is terminated
|
|
|
+ go func() {
|
|
|
+ defer func() { stopc <- struct{}{} }()
|
|
|
+ select {
|
|
|
+ case <-leaderc:
|
|
|
+ case <-ctx.Done():
|
|
|
+ case <-wp.ctx.Done():
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
<-stopc
|
|
|
+ cancel()
|
|
|
+
|
|
|
// recv/send may only shutdown after function exits;
|
|
|
// goroutine notifies proxy that stream is through
|
|
|
go func() {
|
|
|
- if leaderc != nil {
|
|
|
- <-stopc
|
|
|
- }
|
|
|
+ <-stopc
|
|
|
<-stopc
|
|
|
wps.close()
|
|
|
wp.wg.Done()
|