@@ -106,6 +106,7 @@ type watcher struct {
 	streams map[string]*watchGrpcStream
 }
 
+// watchGrpcStream tracks all watch resources attached to a single grpc stream.
 type watchGrpcStream struct {
 	owner  *watcher
 	remote pb.WatchClient
@@ -116,10 +117,10 @@ type watchGrpcStream struct {
 	ctxKey string
 	cancel context.CancelFunc
 
-	// mu protects the streams map
-	mu sync.RWMutex
-	// streams holds all active watchers
-	streams map[int64]*watcherStream
+	// substreams holds all active watchers on this grpc stream
+	substreams map[int64]*watcherStream
+	// resuming holds all resuming watchers on this grpc stream
+	resuming []*watcherStream
 
 	// reqc sends a watch request from Watch() to the main goroutine
 	reqc chan *watchRequest
@@ -134,7 +135,9 @@ type watchGrpcStream struct {
 	// closingc gets the watcherStream of closing watchers
 	closingc chan *watcherStream
 
-	// the error that closed the watch stream
+	// resumec closes to signal that all substreams should begin resuming
+	resumec chan struct{}
+	// closeErr is the error that closed the watch stream
 	closeErr error
 }
 
@@ -162,15 +165,18 @@ type watcherStream struct {
 	initReq watchRequest
 
 	// outc publishes watch responses to subscriber
-	outc chan<- WatchResponse
+	outc chan WatchResponse
 	// recvc buffers watch responses before publishing
 	recvc chan *WatchResponse
-	id    int64
+	// donec closes when the watcherStream goroutine stops.
+	donec chan struct{}
+	// closing is set to true when stream should be scheduled to shutdown.
+	closing bool
+	// id is the registered watch id on the grpc stream
+	id int64
 
-	// lastRev is revision last successfully sent over outc
-	lastRev int64
-	// resumec indicates the stream must recover at a given revision
-	resumec chan int64
+	// buf holds all events received from etcd but not yet consumed by the client
+	buf []*WatchResponse
 }
 
 func NewWatcher(c *Client) Watcher {
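The reworked watcherStream is a small pipeline: run() pushes responses into recvc, a dedicated goroutine accumulates them in buf and drains them to outc, donec closes when that goroutine exits, and closing flags a substream scheduled for teardown. A minimal, self-contained sketch of the same buffer-and-forward shape (names here are illustrative, not etcd's API):

package main

import "fmt"

// forward drains recvc into an unbounded buffer and publishes to outc,
// mirroring the buf/recvc/outc relationship in watcherStream.
func forward(recvc <-chan int, outc chan<- int, donec chan<- struct{}) {
	defer close(donec)
	var buf []int
	for {
		var out chan<- int // nil (disabled in select) unless something is buffered
		var head int
		if len(buf) > 0 {
			out, head = outc, buf[0]
		}
		select {
		case v, ok := <-recvc:
			if !ok { // closed by the owner: shut down
				return
			}
			buf = append(buf, v) // unbounded, like ws.buf
		case out <- head:
			buf = buf[1:]
		}
	}
}

func main() {
	recvc, outc, donec := make(chan int), make(chan int), make(chan struct{})
	go forward(recvc, outc, donec)
	recvc <- 1
	recvc <- 2
	fmt.Println(<-outc, <-outc) // 1 2
	close(recvc)
	<-donec
}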
@@ -198,12 +204,12 @@ func (vc *valCtx) Err() error { return nil }
 func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 	ctx, cancel := context.WithCancel(&valCtx{inctx})
 	wgs := &watchGrpcStream{
-		owner:   w,
-		remote:  w.remote,
-		ctx:     ctx,
-		ctxKey:  fmt.Sprintf("%v", inctx),
-		cancel:  cancel,
-		streams: make(map[int64]*watcherStream),
+		owner:      w,
+		remote:     w.remote,
+		ctx:        ctx,
+		ctxKey:     fmt.Sprintf("%v", inctx),
+		cancel:     cancel,
+		substreams: make(map[int64]*watcherStream),
 		respc:    make(chan *pb.WatchResponse),
 		reqc:     make(chan *watchRequest),
@@ -211,6 +217,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 		donec:    make(chan struct{}),
 		errc:     make(chan error, 1),
 		closingc: make(chan *watcherStream),
+		resumec:  make(chan struct{}),
 	}
 	go wgs.run()
 	return wgs
@@ -220,8 +227,6 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
 	ow := opWatch(key, opts...)
 
-	retc := make(chan chan WatchResponse, 1)
-
 	var filters []pb.WatchCreateRequest_FilterType
 	if ow.filterPut {
 		filters = append(filters, pb.WatchCreateRequest_NOPUT)
@@ -239,7 +244,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
 		progressNotify: ow.progressNotify,
 		filters:        filters,
 		prevKV:         ow.prevKV,
-		retc:           retc,
+		retc:           make(chan chan WatchResponse, 1),
 	}
 
 	ok := false
@@ -283,7 +288,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
 	// receive channel
 	if ok {
 		select {
-		case ret := <-retc:
+		case ret := <-wr.retc:
 			return ret
 		case <-ctx.Done():
 		case <-donec:
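Watch() now carries the reply channel inside the request itself (wr.retc) instead of a shared local, so each request independently gets its subscriber channel back from the stream's main goroutine. A rough sketch of that chan-of-chan handshake with hypothetical names:

package main

import "fmt"

// request carries its own reply channel, like watchRequest.retc.
type request struct {
	key  string
	retc chan chan string // buffered(1) so the server never blocks on the reply
}

func server(reqc <-chan *request) {
	for req := range reqc {
		events := make(chan string, 1)
		events <- "created: " + req.key
		req.retc <- events // hand the subscriber channel back to the caller
	}
}

func main() {
	reqc := make(chan *request)
	go server(reqc)
	req := &request{key: "/a", retc: make(chan chan string, 1)}
	reqc <- req
	events := <-req.retc // the channel Watch() would return to its caller
	fmt.Println(<-events)
}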
@@ -314,12 +319,7 @@ func (w *watcher) Close() (err error) {
 }
 
 func (w *watchGrpcStream) Close() (err error) {
-	w.mu.Lock()
-	if w.stopc != nil {
-		close(w.stopc)
-		w.stopc = nil
-	}
-	w.mu.Unlock()
+	close(w.stopc)
 	<-w.donec
 	select {
 	case err = <-w.errc:
@@ -328,71 +328,57 @@ func (w *watchGrpcStream) Close() (err error) {
 	return toErr(w.ctx, err)
 }
 
-func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
-	if pendingReq == nil {
-		// no pending request; ignore
-		return
-	}
-	if resp.Canceled || resp.CompactRevision != 0 {
-		// a cancel at id creation time means the start revision has
-		// been compacted out of the store
-		ret := make(chan WatchResponse, 1)
-		ret <- WatchResponse{
-			Header:          *resp.Header,
-			CompactRevision: resp.CompactRevision,
-			Canceled:        true}
-		close(ret)
-		pendingReq.retc <- ret
-		return
+func (w *watcher) closeStream(wgs *watchGrpcStream) {
+	w.mu.Lock()
+	close(wgs.donec)
+	wgs.cancel()
+	if w.streams != nil {
+		delete(w.streams, wgs.ctxKey)
 	}
+	w.mu.Unlock()
+}
 
-	ret := make(chan WatchResponse)
+func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
 	if resp.WatchId == -1 {
 		// failed; no channel
-		close(ret)
-		pendingReq.retc <- ret
+		close(ws.recvc)
 		return
 	}
+	ws.id = resp.WatchId
+	w.substreams[ws.id] = ws
+}
 
-	ws := &watcherStream{
-		initReq: *pendingReq,
-		id:      resp.WatchId,
-		outc:    ret,
-		// buffered so unlikely to block on sending while holding mu
-		recvc:   make(chan *WatchResponse, 4),
-		resumec: make(chan int64),
-	}
-
-	if pendingReq.rev == 0 {
-		// note the header revision so that a put following a current watcher
-		// disconnect will arrive on the watcher channel after reconnect
-		ws.initReq.rev = resp.Header.Revision
+func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
+	select {
+	case ws.outc <- *resp:
+	case <-ws.initReq.ctx.Done():
+	case <-time.After(closeSendErrTimeout):
 	}
-
-	w.mu.Lock()
-	w.streams[ws.id] = ws
-	w.mu.Unlock()
-
-	// pass back the subscriber channel for the watcher
-	pendingReq.retc <- ret
-
-	// send messages to subscriber
-	go w.serveStream(ws)
+	close(ws.outc)
 }
 
-func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
-	w.mu.Lock()
-	// cancels request stream; subscriber receives nil channel
-	close(ws.initReq.retc)
+func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
+	// send channel response in case stream was never established
+	select {
+	case ws.initReq.retc <- ws.outc:
+	default:
+	}
 	// close subscriber's channel
-	close(ws.outc)
-	delete(w.streams, ws.id)
-	empty := len(w.streams) == 0
-	if empty && w.stopc != nil {
-		w.stopc = nil
+	if closeErr := w.closeErr; closeErr != nil {
+		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
+	} else {
+		close(ws.outc)
+	}
+	if ws.id != -1 {
+		delete(w.substreams, ws.id)
+		return
+	}
+	for i := range w.resuming {
+		if w.resuming[i] == ws {
+			w.resuming[i] = nil
+			return
+		}
 	}
-	w.mu.Unlock()
-	return empty
 }
 
 // run is the root of the goroutines for managing a watcher client
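closeSubstream first offers outc back through initReq.retc in case Watch() never received its channel, then either closes outc or hands the close error to sendCloseSubstream, which bounds the final send so a subscriber that stopped reading cannot wedge teardown. The timed-send pattern on its own, assuming an illustrative 250ms budget in place of closeSendErrTimeout:

package main

import (
	"fmt"
	"time"
)

// sendWithTimeout mirrors sendCloseSubstream: try to deliver a final
// message, but give up if the subscriber is gone or too slow.
func sendWithTimeout(outc chan<- string, msg string, cancel <-chan struct{}) {
	select {
	case outc <- msg:
	case <-cancel: // subscriber's context ended
	case <-time.After(250 * time.Millisecond): // illustrative budget
	}
	close(outc)
}

func main() {
	outc := make(chan string, 1)
	sendWithTimeout(outc, "stream closed: no leader", nil) // nil cancel: never fires
	for m := range outc {
		fmt.Println(m)
	}
}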
@@ -400,67 +386,79 @@ func (w *watchGrpcStream) run() {
 	var wc pb.Watch_WatchClient
 	var closeErr error
 
+	// substreams marked to close but goroutine still running; needed for
+	// avoiding double-closing recvc on grpc stream teardown
+	closing := make(map[*watcherStream]struct{})
+
 	defer func() {
-		w.owner.mu.Lock()
 		w.closeErr = closeErr
-		if w.owner.streams != nil {
-			delete(w.owner.streams, w.ctxKey)
+		// shutdown substreams and resuming substreams
+		for _, ws := range w.substreams {
+			if _, ok := closing[ws]; !ok {
+				close(ws.recvc)
+			}
+		}
+		for _, ws := range w.resuming {
+			if _, ok := closing[ws]; ws != nil && !ok {
+				close(ws.recvc)
+			}
+		}
+		w.joinSubstreams()
+		for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- {
+			w.closeSubstream(<-w.closingc)
 		}
-		close(w.donec)
-		w.owner.mu.Unlock()
-		w.cancel()
-	}()
 
-	// already stopped?
-	w.mu.RLock()
-	stopc := w.stopc
-	w.mu.RUnlock()
-	if stopc == nil {
-		return
-	}
+		w.owner.closeStream(w)
+	}()
 
 	// start a stream with the etcd grpc server
 	if wc, closeErr = w.newWatchClient(); closeErr != nil {
 		return
 	}
 
-	var pendingReq, failedReq *watchRequest
-	curReqC := w.reqc
 	cancelSet := make(map[int64]struct{})
 
 	for {
 		select {
 		// Watch() requested
-		case pendingReq = <-curReqC:
-			// no more watch requests until there's a response
-			curReqC = nil
-			if err := wc.Send(pendingReq.toPB()); err == nil {
-				// pendingReq now waits on w.respc
-				break
+		case wreq := <-w.reqc:
+			outc := make(chan WatchResponse, 1)
+			ws := &watcherStream{
+				initReq: *wreq,
+				id:      -1,
+				outc:    outc,
+				// unbuffered so resumes won't cause repeat events
+				recvc: make(chan *WatchResponse),
+			}
+
+			ws.donec = make(chan struct{})
+			go w.serveSubstream(ws, w.resumec)
+
+			// queue up for watcher creation/resume
+			w.resuming = append(w.resuming, ws)
+			if len(w.resuming) == 1 {
+				// head of resume queue, can register a new watcher
+				wc.Send(ws.initReq.toPB())
 			}
-			failedReq = pendingReq
 		// New events from the watch client
 		case pbresp := <-w.respc:
 			switch {
 			case pbresp.Created:
-				// response to pending req, try to add
-				w.addStream(pbresp, pendingReq)
-				pendingReq = nil
-				curReqC = w.reqc
-				w.dispatchEvent(pbresp)
+				// response to head of queue creation
+				if ws := w.resuming[0]; ws != nil {
+					w.addSubstream(pbresp, ws)
+					w.dispatchEvent(pbresp)
+					w.resuming[0] = nil
+				}
+				if ws := w.nextResume(); ws != nil {
+					wc.Send(ws.initReq.toPB())
+				}
 			case pbresp.Canceled:
 				delete(cancelSet, pbresp.WatchId)
-				// shutdown serveStream, if any
-				w.mu.Lock()
-				if ws, ok := w.streams[pbresp.WatchId]; ok {
+				if ws, ok := w.substreams[pbresp.WatchId]; ok {
+					// signal to stream goroutine to update closingc
 					close(ws.recvc)
-					delete(w.streams, ws.id)
-				}
-				numStreams := len(w.streams)
-				w.mu.Unlock()
-				if numStreams == 0 {
-					// don't leak watcher streams
-					return
+					closing[ws] = struct{}{}
 				}
 			default:
 				// dispatch to appropriate watch stream
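run() serializes watcher registration: each new substream is appended to w.resuming, only the head of the queue has an in-flight create request, and a Created response clears the head so nextResume() can send the next one. A toy model of that single-in-flight FIFO with tombstones for abandoned entries (hypothetical type, not the real bookkeeping):

package main

import "fmt"

// createQueue models w.resuming: a FIFO where only the head request is
// in flight; abandoned entries become tombstones skipped on ack.
type createQueue struct {
	pending []string // "" marks a tombstone
}

// add enqueues a request and reports whether it should be sent now
// (it is the only entry, so nothing else is in flight).
func (q *createQueue) add(req string) bool {
	q.pending = append(q.pending, req)
	return len(q.pending) == 1
}

// ack clears the in-flight head and returns the next request to send.
func (q *createQueue) ack() (string, bool) {
	q.pending = q.pending[1:]
	for len(q.pending) > 0 {
		if q.pending[0] != "" {
			return q.pending[0], true
		}
		q.pending = q.pending[1:] // skip tombstone
	}
	return "", false
}

func main() {
	q := &createQueue{}
	fmt.Println(q.add("watch:/a")) // true: head of queue, send immediately
	fmt.Println(q.add("watch:/b")) // false: wait for /a's Created ack
	next, ok := q.ack()            // server acked /a
	fmt.Println(next, ok)          // watch:/b true
}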
@@ -481,7 +479,6 @@ func (w *watchGrpcStream) run() {
 			wc.Send(req)
 		}
 		// watch client failed to recv; spawn another if possible
-		// TODO report watch client errors from errc?
 		case err := <-w.errc:
 			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
 				closeErr = err
@@ -490,43 +487,41 @@ func (w *watchGrpcStream) run() {
 			if wc, closeErr = w.newWatchClient(); closeErr != nil {
 				return
 			}
-			curReqC = w.reqc
-			if pendingReq != nil {
-				failedReq = pendingReq
+			if ws := w.nextResume(); ws != nil {
+				wc.Send(ws.initReq.toPB())
 			}
 			cancelSet = make(map[int64]struct{})
-		case <-stopc:
+		case <-w.stopc:
 			return
 		case ws := <-w.closingc:
-			if w.closeStream(ws) {
+			w.closeSubstream(ws)
+			delete(closing, ws)
+			if len(w.substreams)+len(w.resuming) == 0 {
+				// no more watchers on this stream, shutdown
 				return
 			}
 		}
+	}
+}
 
-	// send failed; queue for retry
-	if failedReq != nil {
-		go func(wr *watchRequest) {
-			select {
-			case w.reqc <- wr:
-			case <-wr.ctx.Done():
-			case <-w.donec:
-			}
-		}(pendingReq)
-		failedReq = nil
-		pendingReq = nil
+// nextResume chooses the next resuming watcher to register with the grpc stream.
+// Abandoned streams are marked as nil in the queue since the head must wait for its inflight registration.
+func (w *watchGrpcStream) nextResume() *watcherStream {
+	for len(w.resuming) != 0 {
+		if w.resuming[0] != nil {
+			return w.resuming[0]
 		}
+		w.resuming = w.resuming[1:len(w.resuming)]
 	}
+	return nil
 }
 
 // dispatchEvent sends a WatchResponse to the appropriate watcher stream
 func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
-	w.mu.RLock()
-	defer w.mu.RUnlock()
-	ws, ok := w.streams[pbresp.WatchId]
+	ws, ok := w.substreams[pbresp.WatchId]
 	if !ok {
 		return false
 	}
-
 	events := make([]*Event, len(pbresp.Events))
 	for i, ev := range pbresp.Events {
 		events[i] = (*Event)(ev)
@@ -538,7 +533,11 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
 		Created:  pbresp.Created,
 		Canceled: pbresp.Canceled,
 	}
-	ws.recvc <- wr
+	select {
+	case ws.recvc <- wr:
+	case <-ws.donec:
+		return false
+	}
 	return true
 }
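dispatchEvent no longer takes a lock; it selects between the substream's recvc and its donec, so a substream whose goroutine has exited can never block the stream's main loop. The same guarded-send pattern in isolation:

package main

import "fmt"

// trySend delivers v unless the receiver's goroutine has already
// exited (signaled by donec), mirroring dispatchEvent's select.
func trySend(recvc chan<- int, donec <-chan struct{}, v int) bool {
	select {
	case recvc <- v:
		return true
	case <-donec:
		return false
	}
}

func main() {
	recvc := make(chan int, 1)
	donec := make(chan struct{})
	fmt.Println(trySend(recvc, donec, 7)) // true: buffered send succeeds
	close(donec)
	fmt.Println(trySend(recvc, donec, 8)) // false: buffer full and donec closed
}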
@@ -561,140 +560,126 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
 	}
 }
 
-// serveStream forwards watch responses from run() to the subscriber
-func (w *watchGrpcStream) serveStream(ws *watcherStream) {
+// serveSubstream forwards watch responses from run() to the subscriber
+func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
+	if ws.closing {
+		panic("created substream goroutine but substream is closing")
+	}
+
+	// nextRev is the minimum expected next revision
+	nextRev := ws.initReq.rev
+	resuming := false
 	defer func() {
-		// signal that this watcherStream is finished
-		select {
-		case w.closingc <- ws:
-		case <-w.donec:
-			w.closeStream(ws)
+		if !resuming {
+			ws.closing = true
+		}
+		ws.initReq.rev = nextRev
+		close(ws.donec)
+		if !resuming {
+			w.closingc <- ws
 		}
 	}()
 
-	var closeErr error
 	emptyWr := &WatchResponse{}
-	wrs := []*WatchResponse{}
-	resuming := false
-	closing := false
-	for !closing {
+	for {
 		curWr := emptyWr
 		outc := ws.outc
 
-		// ignore created event if create notify is not requested or
-		// we already sent the initial created event (when we are on the resume path).
-		if len(wrs) > 0 && wrs[0].Created &&
-			(!ws.initReq.createdNotify || ws.lastRev != 0) {
-			wrs = wrs[1:]
+		if len(ws.buf) > 0 && ws.buf[0].Created {
+			select {
+			case ws.initReq.retc <- ws.outc:
+				// send first creation event only if requested
+				if !ws.initReq.createdNotify {
+					ws.buf = ws.buf[1:]
+				}
+			default:
+			}
 		}
 
-		if len(wrs) > 0 {
-			curWr = wrs[0]
+		if len(ws.buf) > 0 {
+			curWr = ws.buf[0]
 		} else {
 			outc = nil
 		}
 		select {
 		case outc <- *curWr:
-			if wrs[0].Err() != nil {
-				closing = true
-				break
-			}
-			var newRev int64
-			if len(wrs[0].Events) > 0 {
-				newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
-			} else {
-				newRev = wrs[0].Header.Revision
-			}
-			if newRev != ws.lastRev {
-				ws.lastRev = newRev
+			if ws.buf[0].Err() != nil {
+				return
 			}
-			wrs[0] = nil
-			wrs = wrs[1:]
+			ws.buf[0] = nil
+			ws.buf = ws.buf[1:]
 		case wr, ok := <-ws.recvc:
 			if !ok {
-				// shutdown from closeStream
+				// shutdown from closeSubstream
 				return
 			}
-			// resume up to last seen event if disconnected
-			if resuming && wr.Err() == nil {
-				resuming = false
-				// trim events already seen
-				for i := 0; i < len(wr.Events); i++ {
-					if wr.Events[i].Kv.ModRevision > ws.lastRev {
-						wr.Events = wr.Events[i:]
-						break
-					}
-				}
-				// only forward new events
-				if wr.Events[0].Kv.ModRevision == ws.lastRev {
-					break
-				}
-			}
-			resuming = false
-			// TODO don't keep buffering if subscriber stops reading
-			wrs = append(wrs, wr)
-		case resumeRev := <-ws.resumec:
-			wrs = nil
-			resuming = true
-			if resumeRev == -1 {
-				// pause serving stream while resume gets set up
-				break
-			}
-			if resumeRev != ws.lastRev {
-				panic("unexpected resume revision")
+			// TODO pause channel if buffer gets too large
+			ws.buf = append(ws.buf, wr)
+			nextRev = wr.Header.Revision
+			if len(wr.Events) > 0 {
+				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
 			}
-		case <-w.donec:
-			closing = true
-			closeErr = w.closeErr
 		case <-ws.initReq.ctx.Done():
-			closing = true
-		}
-	}
-
-	// try to send off close error
-	if closeErr != nil {
-		select {
-		case ws.outc <- WatchResponse{closeErr: w.closeErr}:
-		case <-w.donec:
-		case <-time.After(closeSendErrTimeout):
+			return
+		case <-resumec:
+			resuming = true
+			return
 		}
 	}
-
 	// lazily send cancel message if events on missing id
 }
 
 func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
-	ws, rerr := w.resume()
-	if rerr != nil {
-		return nil, rerr
+	// connect to grpc stream
+	wc, err := w.openWatchClient()
+	if err != nil {
+		return nil, v3rpc.Error(err)
+	}
+	// mark all substreams as resuming
+	if len(w.substreams)+len(w.resuming) > 0 {
+		close(w.resumec)
+		w.resumec = make(chan struct{})
+		w.joinSubstreams()
+		for _, ws := range w.substreams {
+			ws.id = -1
+			w.resuming = append(w.resuming, ws)
+		}
+		for _, ws := range w.resuming {
+			if ws == nil || ws.closing {
+				continue
+			}
+			ws.donec = make(chan struct{})
+			go w.serveSubstream(ws, w.resumec)
+		}
 	}
-	go w.serveWatchClient(ws)
-	return ws, nil
+	w.substreams = make(map[int64]*watcherStream)
+	// receive data from new grpc stream
+	go w.serveWatchClient(wc)
+	return wc, nil
 }
 
-// resume creates a new WatchClient with all current watchers reestablished
-func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
-	for {
-		if ws, err = w.openWatchClient(); err != nil {
-			break
-		} else if err = w.resumeWatchers(ws); err == nil {
-			break
+// joinSubstreams waits for all substream goroutines to complete
+func (w *watchGrpcStream) joinSubstreams() {
+	for _, ws := range w.substreams {
+		<-ws.donec
+	}
+	for _, ws := range w.resuming {
+		if ws != nil {
+			<-ws.donec
 		}
 	}
-	return ws, v3rpc.Error(err)
 }
 
 // openWatchClient retries opening a watchclient until retryConnection fails
 func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
-		w.mu.Lock()
-		stopc := w.stopc
-		w.mu.Unlock()
-		if stopc == nil {
+		select {
+		case <-w.stopc:
 			if err == nil {
-				err = context.Canceled
+				return nil, context.Canceled
 			}
 			return nil, err
+		default:
 		}
 		if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
 			break
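On reconnect, newWatchClient broadcasts the resume signal by closing w.resumec: every serveSubstream goroutine returns with resuming set, joinSubstreams waits on each donec, and the substreams are re-registered from their saved initReq.rev. A small sketch of close-as-broadcast plus join, under those assumptions:

package main

import (
	"fmt"
	"sync"
)

// worker stands in for serveSubstream: it runs until the shared
// resumec closes, then returns so the owner can re-register it.
func worker(id int, resumec <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	<-resumec // a closed channel unblocks every receiver at once
	fmt.Printf("substream %d: resuming\n", id)
}

func main() {
	resumec := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(i, resumec, &wg)
	}
	close(resumec) // broadcast resume to all workers
	wg.Wait()      // like joinSubstreams: wait for the goroutines to stop
	resumec = make(chan struct{})
	_ = resumec // a fresh channel serves the next resume epoch
}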
@@ -706,63 +691,6 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
 	return ws, nil
 }
 
-// resumeWatchers rebuilds every registered watcher on a new client
-func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
-	w.mu.RLock()
-	streams := make([]*watcherStream, 0, len(w.streams))
-	for _, ws := range w.streams {
-		streams = append(streams, ws)
-	}
-	w.mu.RUnlock()
-
-	for _, ws := range streams {
-		// drain recvc so no old WatchResponses (e.g., Created messages)
-		// are processed while resuming
-		ws.drain()
-
-		// pause serveStream
-		ws.resumec <- -1
-
-		// reconstruct watcher from initial request
-		if ws.lastRev != 0 {
-			ws.initReq.rev = ws.lastRev
-		}
-		if err := wc.Send(ws.initReq.toPB()); err != nil {
-			return err
-		}
-
-		// wait for request ack
-		resp, err := wc.Recv()
-		if err != nil {
-			return err
-		} else if len(resp.Events) != 0 || !resp.Created {
-			return fmt.Errorf("watcher: unexpected response (%+v)", resp)
-		}
-
-		// id may be different since new remote watcher; update map
-		w.mu.Lock()
-		delete(w.streams, ws.id)
-		ws.id = resp.WatchId
-		w.streams[ws.id] = ws
-		w.mu.Unlock()
-
-		// unpause serveStream
-		ws.resumec <- ws.lastRev
-	}
-	return nil
-}
-
-// drain removes all buffered WatchResponses from the stream's receive channel.
-func (ws *watcherStream) drain() {
-	for {
-		select {
-		case <-ws.recvc:
-		default:
-			return
-		}
-	}
-}
-
 // toPB converts an internal watch request structure to its protobuf message
 func (wr *watchRequest) toPB() *pb.WatchRequest {
 	req := &pb.WatchCreateRequest{