12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package clientv3
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "time"
- v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
- )
- const (
- EventTypeDelete = mvccpb.DELETE
- EventTypePut = mvccpb.PUT
- closeSendErrTimeout = 250 * time.Millisecond
- )
- type Event mvccpb.Event
- type WatchChan <-chan WatchResponse
- type Watcher interface {
- // Watch watches on a key or prefix. The watched events will be returned
- // through the returned channel. If revisions waiting to be sent over the
- // watch are compacted, then the watch will be canceled by the server, the
- // client will post a compacted error watch response, and the channel will close.
- // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
- // and "WatchResponse" from this closed channel has zero events and nil "Err()".
- // The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
- // to release the associated resources.
- //
- // If the context is "context.Background/TODO", returned "WatchChan" will
- // not be closed and block until event is triggered, except when server
- // returns a non-recoverable error (e.g. ErrCompacted).
- // For example, when context passed with "WithRequireLeader" and the
- // connected server has no leader (e.g. due to network partition),
- // error "etcdserver: no leader" (ErrNoLeader) will be returned,
- // and then "WatchChan" is closed with non-nil "Err()".
- // In order to prevent a watch stream being stuck in a partitioned node,
- // make sure to wrap context with "WithRequireLeader".
- //
- // Otherwise, as long as the context has not been canceled or timed out,
- // watch will retry on other recoverable errors forever until reconnected.
- //
- // TODO: explicitly set context error in the last "WatchResponse" message and close channel?
- // Currently, client contexts are overwritten with "valCtx" that never closes.
- // TODO(v3.4): configure watch retry policy, limit maximum retry number
- // (see https://github.com/etcd-io/etcd/issues/8980)
- Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
- // RequestProgress requests a progress notify response be sent in all watch channels.
- RequestProgress(ctx context.Context) error
- // Close closes the watcher and cancels all watch requests.
- Close() error
- }
- type WatchResponse struct {
- Header pb.ResponseHeader
- Events []*Event
- // CompactRevision is the minimum revision the watcher may receive.
- CompactRevision int64
- // Canceled is used to indicate watch failure.
- // If the watch failed and the stream was about to close, before the channel is closed,
- // the channel sends a final response that has Canceled set to true with a non-nil Err().
- Canceled bool
- // Created is used to indicate the creation of the watcher.
- Created bool
- closeErr error
- // cancelReason is a reason of canceling watch
- cancelReason string
- }
- // IsCreate returns true if the event tells that the key is newly created.
- func (e *Event) IsCreate() bool {
- return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
- }
- // IsModify returns true if the event tells that a new value is put on existing key.
- func (e *Event) IsModify() bool {
- return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
- }
- // Err is the error value if this WatchResponse holds an error.
- func (wr *WatchResponse) Err() error {
- switch {
- case wr.closeErr != nil:
- return v3rpc.Error(wr.closeErr)
- case wr.CompactRevision != 0:
- return v3rpc.ErrCompacted
- case wr.Canceled:
- if len(wr.cancelReason) != 0 {
- return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
- }
- return v3rpc.ErrFutureRev
- }
- return nil
- }
- // IsProgressNotify returns true if the WatchResponse is progress notification.
- func (wr *WatchResponse) IsProgressNotify() bool {
- return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
- }
- // watcher implements the Watcher interface
- type watcher struct {
- remote pb.WatchClient
- callOpts []grpc.CallOption
- // mu protects the grpc streams map
- mu sync.RWMutex
- // streams holds all the active grpc streams keyed by ctx value.
- streams map[string]*watchGrpcStream
- }
- // watchGrpcStream tracks all watch resources attached to a single grpc stream.
- type watchGrpcStream struct {
- owner *watcher
- remote pb.WatchClient
- callOpts []grpc.CallOption
- // ctx controls internal remote.Watch requests
- ctx context.Context
- // ctxKey is the key used when looking up this stream's context
- ctxKey string
- cancel context.CancelFunc
- // substreams holds all active watchers on this grpc stream
- substreams map[int64]*watcherStream
- // resuming holds all resuming watchers on this grpc stream
- resuming []*watcherStream
- // reqc sends a watch request from Watch() to the main goroutine
- reqc chan watchStreamRequest
- // respc receives data from the watch client
- respc chan *pb.WatchResponse
- // donec closes to broadcast shutdown
- donec chan struct{}
- // errc transmits errors from grpc Recv to the watch stream reconnect logic
- errc chan error
- // closingc gets the watcherStream of closing watchers
- closingc chan *watcherStream
- // wg is Done when all substream goroutines have exited
- wg sync.WaitGroup
- // resumec closes to signal that all substreams should begin resuming
- resumec chan struct{}
- // closeErr is the error that closed the watch stream
- closeErr error
- }
- // watchStreamRequest is a union of the supported watch request operation types
- type watchStreamRequest interface {
- toPB() *pb.WatchRequest
- }
- // watchRequest is issued by the subscriber to start a new watcher
- type watchRequest struct {
- ctx context.Context
- key string
- end string
- rev int64
- // send created notification event if this field is true
- createdNotify bool
- // progressNotify is for progress updates
- progressNotify bool
- // fragmentation should be disabled by default
- // if true, split watch events when total exceeds
- // "--max-request-bytes" flag value + 512-byte
- fragment bool
- // filters is the list of events to filter out
- filters []pb.WatchCreateRequest_FilterType
- // get the previous key-value pair before the event happens
- prevKV bool
- // retc receives a chan WatchResponse once the watcher is established
- retc chan chan WatchResponse
- }
// progressRequest is issued by the subscriber to request watch progress.
// It carries no fields.
type progressRequest struct{}
- // watcherStream represents a registered watcher
- type watcherStream struct {
- // initReq is the request that initiated this request
- initReq watchRequest
- // outc publishes watch responses to subscriber
- outc chan WatchResponse
- // recvc buffers watch responses before publishing
- recvc chan *WatchResponse
- // donec closes when the watcherStream goroutine stops.
- donec chan struct{}
- // closing is set to true when stream should be scheduled to shutdown.
- closing bool
- // id is the registered watch id on the grpc stream
- id int64
- // buf holds all events received from etcd but not yet consumed by the client
- buf []*WatchResponse
- }
- func NewWatcher(c *Client) Watcher {
- return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
- }
- func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
- w := &watcher{
- remote: wc,
- streams: make(map[string]*watchGrpcStream),
- }
- if c != nil {
- w.callOpts = c.callOpts
- }
- return w
- }
var (
	// valCtxCh is returned by valCtx.Done; it is never closed.
	valCtxCh = make(chan struct{})
	// zeroTime is the placeholder deadline returned by valCtx.Deadline.
	zeroTime = time.Unix(0, 0)
)

// valCtx is a context that exposes only the values of the wrapped context:
// it reports no deadline, is never Done, and never has an error.
type valCtx struct{ context.Context }

// Deadline always reports that no deadline is set.
func (*valCtx) Deadline() (deadline time.Time, ok bool) { return zeroTime, false }

// Done returns a channel that never closes.
func (*valCtx) Done() <-chan struct{} { return valCtxCh }

// Err always returns nil.
func (*valCtx) Err() error { return nil }
- func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
- ctx, cancel := context.WithCancel(&valCtx{inctx})
- wgs := &watchGrpcStream{
- owner: w,
- remote: w.remote,
- callOpts: w.callOpts,
- ctx: ctx,
- ctxKey: streamKeyFromCtx(inctx),
- cancel: cancel,
- substreams: make(map[int64]*watcherStream),
- respc: make(chan *pb.WatchResponse),
- reqc: make(chan watchStreamRequest),
- donec: make(chan struct{}),
- errc: make(chan error, 1),
- closingc: make(chan *watcherStream),
- resumec: make(chan struct{}),
- }
- go wgs.run()
- return wgs
- }
- // Watch posts a watch request to run() and waits for a new watcher channel
- func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
- ow := opWatch(key, opts...)
- var filters []pb.WatchCreateRequest_FilterType
- if ow.filterPut {
- filters = append(filters, pb.WatchCreateRequest_NOPUT)
- }
- if ow.filterDelete {
- filters = append(filters, pb.WatchCreateRequest_NODELETE)
- }
- wr := &watchRequest{
- ctx: ctx,
- createdNotify: ow.createdNotify,
- key: string(ow.key),
- end: string(ow.end),
- rev: ow.rev,
- progressNotify: ow.progressNotify,
- fragment: ow.fragment,
- filters: filters,
- prevKV: ow.prevKV,
- retc: make(chan chan WatchResponse, 1),
- }
- ok := false
- ctxKey := streamKeyFromCtx(ctx)
- // find or allocate appropriate grpc watch stream
- w.mu.Lock()
- if w.streams == nil {
- // closed
- w.mu.Unlock()
- ch := make(chan WatchResponse)
- close(ch)
- return ch
- }
- wgs := w.streams[ctxKey]
- if wgs == nil {
- wgs = w.newWatcherGrpcStream(ctx)
- w.streams[ctxKey] = wgs
- }
- donec := wgs.donec
- reqc := wgs.reqc
- w.mu.Unlock()
- // couldn't create channel; return closed channel
- closeCh := make(chan WatchResponse, 1)
- // submit request
- select {
- case reqc <- wr:
- ok = true
- case <-wr.ctx.Done():
- case <-donec:
- if wgs.closeErr != nil {
- closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
- break
- }
- // retry; may have dropped stream from no ctxs
- return w.Watch(ctx, key, opts...)
- }
- // receive channel
- if ok {
- select {
- case ret := <-wr.retc:
- return ret
- case <-ctx.Done():
- case <-donec:
- if wgs.closeErr != nil {
- closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
- break
- }
- // retry; may have dropped stream from no ctxs
- return w.Watch(ctx, key, opts...)
- }
- }
- close(closeCh)
- return closeCh
- }
- func (w *watcher) Close() (err error) {
- w.mu.Lock()
- streams := w.streams
- w.streams = nil
- w.mu.Unlock()
- for _, wgs := range streams {
- if werr := wgs.close(); werr != nil {
- err = werr
- }
- }
- // Consider context.Canceled as a successful close
- if err == context.Canceled {
- err = nil
- }
- return err
- }
- // RequestProgress requests a progress notify response be sent in all watch channels.
- func (w *watcher) RequestProgress(ctx context.Context) (err error) {
- ctxKey := streamKeyFromCtx(ctx)
- w.mu.Lock()
- if w.streams == nil {
- w.mu.Unlock()
- return fmt.Errorf("no stream found for context")
- }
- wgs := w.streams[ctxKey]
- if wgs == nil {
- wgs = w.newWatcherGrpcStream(ctx)
- w.streams[ctxKey] = wgs
- }
- donec := wgs.donec
- reqc := wgs.reqc
- w.mu.Unlock()
- pr := &progressRequest{}
- select {
- case reqc <- pr:
- return nil
- case <-ctx.Done():
- if err == nil {
- return ctx.Err()
- }
- return err
- case <-donec:
- if wgs.closeErr != nil {
- return wgs.closeErr
- }
- // retry; may have dropped stream from no ctxs
- return w.RequestProgress(ctx)
- }
- }
- func (w *watchGrpcStream) close() (err error) {
- w.cancel()
- <-w.donec
- select {
- case err = <-w.errc:
- default:
- }
- return toErr(w.ctx, err)
- }
- func (w *watcher) closeStream(wgs *watchGrpcStream) {
- w.mu.Lock()
- close(wgs.donec)
- wgs.cancel()
- if w.streams != nil {
- delete(w.streams, wgs.ctxKey)
- }
- w.mu.Unlock()
- }
- func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
- // check watch ID for backward compatibility (<= v3.3)
- if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
- w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
- // failed; no channel
- close(ws.recvc)
- return
- }
- ws.id = resp.WatchId
- w.substreams[ws.id] = ws
- }
- func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
- select {
- case ws.outc <- *resp:
- case <-ws.initReq.ctx.Done():
- case <-time.After(closeSendErrTimeout):
- }
- close(ws.outc)
- }
- func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
- // send channel response in case stream was never established
- select {
- case ws.initReq.retc <- ws.outc:
- default:
- }
- // close subscriber's channel
- if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
- go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
- } else if ws.outc != nil {
- close(ws.outc)
- }
- if ws.id != -1 {
- delete(w.substreams, ws.id)
- return
- }
- for i := range w.resuming {
- if w.resuming[i] == ws {
- w.resuming[i] = nil
- return
- }
- }
- }
- // run is the root of the goroutines for managing a watcher client
- func (w *watchGrpcStream) run() {
- var wc pb.Watch_WatchClient
- var closeErr error
- // substreams marked to close but goroutine still running; needed for
- // avoiding double-closing recvc on grpc stream teardown
- closing := make(map[*watcherStream]struct{})
- defer func() {
- w.closeErr = closeErr
- // shutdown substreams and resuming substreams
- for _, ws := range w.substreams {
- if _, ok := closing[ws]; !ok {
- close(ws.recvc)
- closing[ws] = struct{}{}
- }
- }
- for _, ws := range w.resuming {
- if _, ok := closing[ws]; ws != nil && !ok {
- close(ws.recvc)
- closing[ws] = struct{}{}
- }
- }
- w.joinSubstreams()
- for range closing {
- w.closeSubstream(<-w.closingc)
- }
- w.wg.Wait()
- w.owner.closeStream(w)
- }()
- // start a stream with the etcd grpc server
- if wc, closeErr = w.newWatchClient(); closeErr != nil {
- return
- }
- cancelSet := make(map[int64]struct{})
- var cur *pb.WatchResponse
- for {
- select {
- // Watch() requested
- case req := <-w.reqc:
- switch wreq := req.(type) {
- case *watchRequest:
- outc := make(chan WatchResponse, 1)
- // TODO: pass custom watch ID?
- ws := &watcherStream{
- initReq: *wreq,
- id: -1,
- outc: outc,
- // unbuffered so resumes won't cause repeat events
- recvc: make(chan *WatchResponse),
- }
- ws.donec = make(chan struct{})
- w.wg.Add(1)
- go w.serveSubstream(ws, w.resumec)
- // queue up for watcher creation/resume
- w.resuming = append(w.resuming, ws)
- if len(w.resuming) == 1 {
- // head of resume queue, can register a new watcher
- wc.Send(ws.initReq.toPB())
- }
- case *progressRequest:
- wc.Send(wreq.toPB())
- }
- // new events from the watch client
- case pbresp := <-w.respc:
- if cur == nil || pbresp.Created || pbresp.Canceled {
- cur = pbresp
- } else if cur != nil && cur.WatchId == pbresp.WatchId {
- // merge new events
- cur.Events = append(cur.Events, pbresp.Events...)
- // update "Fragment" field; last response with "Fragment" == false
- cur.Fragment = pbresp.Fragment
- }
- switch {
- case pbresp.Created:
- // response to head of queue creation
- if ws := w.resuming[0]; ws != nil {
- w.addSubstream(pbresp, ws)
- w.dispatchEvent(pbresp)
- w.resuming[0] = nil
- }
- if ws := w.nextResume(); ws != nil {
- wc.Send(ws.initReq.toPB())
- }
- // reset for next iteration
- cur = nil
- case pbresp.Canceled && pbresp.CompactRevision == 0:
- delete(cancelSet, pbresp.WatchId)
- if ws, ok := w.substreams[pbresp.WatchId]; ok {
- // signal to stream goroutine to update closingc
- close(ws.recvc)
- closing[ws] = struct{}{}
- }
- // reset for next iteration
- cur = nil
- case cur.Fragment:
- // watch response events are still fragmented
- // continue to fetch next fragmented event arrival
- continue
- default:
- // dispatch to appropriate watch stream
- ok := w.dispatchEvent(cur)
- // reset for next iteration
- cur = nil
- if ok {
- break
- }
- // watch response on unexpected watch id; cancel id
- if _, ok := cancelSet[pbresp.WatchId]; ok {
- break
- }
- cancelSet[pbresp.WatchId] = struct{}{}
- cr := &pb.WatchRequest_CancelRequest{
- CancelRequest: &pb.WatchCancelRequest{
- WatchId: pbresp.WatchId,
- },
- }
- req := &pb.WatchRequest{RequestUnion: cr}
- wc.Send(req)
- }
- // watch client failed on Recv; spawn another if possible
- case err := <-w.errc:
- if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
- closeErr = err
- return
- }
- if wc, closeErr = w.newWatchClient(); closeErr != nil {
- return
- }
- if ws := w.nextResume(); ws != nil {
- wc.Send(ws.initReq.toPB())
- }
- cancelSet = make(map[int64]struct{})
- case <-w.ctx.Done():
- return
- case ws := <-w.closingc:
- w.closeSubstream(ws)
- delete(closing, ws)
- // no more watchers on this stream, shutdown
- if len(w.substreams)+len(w.resuming) == 0 {
- return
- }
- }
- }
- }
- // nextResume chooses the next resuming to register with the grpc stream. Abandoned
- // streams are marked as nil in the queue since the head must wait for its inflight registration.
- func (w *watchGrpcStream) nextResume() *watcherStream {
- for len(w.resuming) != 0 {
- if w.resuming[0] != nil {
- return w.resuming[0]
- }
- w.resuming = w.resuming[1:len(w.resuming)]
- }
- return nil
- }
- // dispatchEvent sends a WatchResponse to the appropriate watcher stream
- func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
- events := make([]*Event, len(pbresp.Events))
- for i, ev := range pbresp.Events {
- events[i] = (*Event)(ev)
- }
- // TODO: return watch ID?
- wr := &WatchResponse{
- Header: *pbresp.Header,
- Events: events,
- CompactRevision: pbresp.CompactRevision,
- Created: pbresp.Created,
- Canceled: pbresp.Canceled,
- cancelReason: pbresp.CancelReason,
- }
- // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
- // indicate they should be broadcast.
- if wr.IsProgressNotify() && pbresp.WatchId == -1 {
- return w.broadcastResponse(wr)
- }
- return w.unicastResponse(wr, pbresp.WatchId)
- }
- // broadcastResponse send a watch response to all watch substreams.
- func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
- for _, ws := range w.substreams {
- select {
- case ws.recvc <- wr:
- case <-ws.donec:
- }
- }
- return true
- }
- // unicastResponse sends a watch response to a specific watch substream.
- func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
- ws, ok := w.substreams[watchId]
- if !ok {
- return false
- }
- select {
- case ws.recvc <- wr:
- case <-ws.donec:
- return false
- }
- return true
- }
- // serveWatchClient forwards messages from the grpc stream to run()
- func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
- for {
- resp, err := wc.Recv()
- if err != nil {
- select {
- case w.errc <- err:
- case <-w.donec:
- }
- return
- }
- select {
- case w.respc <- resp:
- case <-w.donec:
- return
- }
- }
- }
- // serveSubstream forwards watch responses from run() to the subscriber
- func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
- if ws.closing {
- panic("created substream goroutine but substream is closing")
- }
- // nextRev is the minimum expected next revision
- nextRev := ws.initReq.rev
- resuming := false
- defer func() {
- if !resuming {
- ws.closing = true
- }
- close(ws.donec)
- if !resuming {
- w.closingc <- ws
- }
- w.wg.Done()
- }()
- emptyWr := &WatchResponse{}
- for {
- curWr := emptyWr
- outc := ws.outc
- if len(ws.buf) > 0 {
- curWr = ws.buf[0]
- } else {
- outc = nil
- }
- select {
- case outc <- *curWr:
- if ws.buf[0].Err() != nil {
- return
- }
- ws.buf[0] = nil
- ws.buf = ws.buf[1:]
- case wr, ok := <-ws.recvc:
- if !ok {
- // shutdown from closeSubstream
- return
- }
- if wr.Created {
- if ws.initReq.retc != nil {
- ws.initReq.retc <- ws.outc
- // to prevent next write from taking the slot in buffered channel
- // and posting duplicate create events
- ws.initReq.retc = nil
- // send first creation event only if requested
- if ws.initReq.createdNotify {
- ws.outc <- *wr
- }
- // once the watch channel is returned, a current revision
- // watch must resume at the store revision. This is necessary
- // for the following case to work as expected:
- // wch := m1.Watch("a")
- // m2.Put("a", "b")
- // <-wch
- // If the revision is only bound on the first observed event,
- // if wch is disconnected before the Put is issued, then reconnects
- // after it is committed, it'll miss the Put.
- if ws.initReq.rev == 0 {
- nextRev = wr.Header.Revision
- }
- }
- } else {
- // current progress of watch; <= store revision
- nextRev = wr.Header.Revision
- }
- if len(wr.Events) > 0 {
- nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
- }
- ws.initReq.rev = nextRev
- // created event is already sent above,
- // watcher should not post duplicate events
- if wr.Created {
- continue
- }
- // TODO pause channel if buffer gets too large
- ws.buf = append(ws.buf, wr)
- case <-w.ctx.Done():
- return
- case <-ws.initReq.ctx.Done():
- return
- case <-resumec:
- resuming = true
- return
- }
- }
- // lazily send cancel message if events on missing id
- }
- func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
- // mark all substreams as resuming
- close(w.resumec)
- w.resumec = make(chan struct{})
- w.joinSubstreams()
- for _, ws := range w.substreams {
- ws.id = -1
- w.resuming = append(w.resuming, ws)
- }
- // strip out nils, if any
- var resuming []*watcherStream
- for _, ws := range w.resuming {
- if ws != nil {
- resuming = append(resuming, ws)
- }
- }
- w.resuming = resuming
- w.substreams = make(map[int64]*watcherStream)
- // connect to grpc stream while accepting watcher cancelation
- stopc := make(chan struct{})
- donec := w.waitCancelSubstreams(stopc)
- wc, err := w.openWatchClient()
- close(stopc)
- <-donec
- // serve all non-closing streams, even if there's a client error
- // so that the teardown path can shutdown the streams as expected.
- for _, ws := range w.resuming {
- if ws.closing {
- continue
- }
- ws.donec = make(chan struct{})
- w.wg.Add(1)
- go w.serveSubstream(ws, w.resumec)
- }
- if err != nil {
- return nil, v3rpc.Error(err)
- }
- // receive data from new grpc stream
- go w.serveWatchClient(wc)
- return wc, nil
- }
- func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
- var wg sync.WaitGroup
- wg.Add(len(w.resuming))
- donec := make(chan struct{})
- for i := range w.resuming {
- go func(ws *watcherStream) {
- defer wg.Done()
- if ws.closing {
- if ws.initReq.ctx.Err() != nil && ws.outc != nil {
- close(ws.outc)
- ws.outc = nil
- }
- return
- }
- select {
- case <-ws.initReq.ctx.Done():
- // closed ws will be removed from resuming
- ws.closing = true
- close(ws.outc)
- ws.outc = nil
- w.wg.Add(1)
- go func() {
- defer w.wg.Done()
- w.closingc <- ws
- }()
- case <-stopc:
- }
- }(w.resuming[i])
- }
- go func() {
- defer close(donec)
- wg.Wait()
- }()
- return donec
- }
- // joinSubstreams waits for all substream goroutines to complete.
- func (w *watchGrpcStream) joinSubstreams() {
- for _, ws := range w.substreams {
- <-ws.donec
- }
- for _, ws := range w.resuming {
- if ws != nil {
- <-ws.donec
- }
- }
- }
- var maxBackoff = 100 * time.Millisecond
- // openWatchClient retries opening a watch client until success or halt.
- // manually retry in case "ws==nil && err==nil"
- // TODO: remove FailFast=false
- func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
- backoff := time.Millisecond
- for {
- select {
- case <-w.ctx.Done():
- if err == nil {
- return nil, w.ctx.Err()
- }
- return nil, err
- default:
- }
- if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
- break
- }
- if isHaltErr(w.ctx, err) {
- return nil, v3rpc.Error(err)
- }
- if isUnavailableErr(w.ctx, err) {
- // retry, but backoff
- if backoff < maxBackoff {
- // 25% backoff factor
- backoff = backoff + backoff/4
- if backoff > maxBackoff {
- backoff = maxBackoff
- }
- }
- time.Sleep(backoff)
- }
- }
- return ws, nil
- }
- // toPB converts an internal watch request structure to its protobuf WatchRequest structure.
- func (wr *watchRequest) toPB() *pb.WatchRequest {
- req := &pb.WatchCreateRequest{
- StartRevision: wr.rev,
- Key: []byte(wr.key),
- RangeEnd: []byte(wr.end),
- ProgressNotify: wr.progressNotify,
- Filters: wr.filters,
- PrevKv: wr.prevKV,
- Fragment: wr.fragment,
- }
- cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
- return &pb.WatchRequest{RequestUnion: cr}
- }
- // toPB converts an internal progress request structure to its protobuf WatchRequest structure.
- func (pr *progressRequest) toPB() *pb.WatchRequest {
- req := &pb.WatchProgressRequest{}
- cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
- return &pb.WatchRequest{RequestUnion: cr}
- }
- func streamKeyFromCtx(ctx context.Context) string {
- if md, ok := metadata.FromOutgoingContext(ctx); ok {
- return fmt.Sprintf("%+v", md)
- }
- return ""
- }
|