// Copyright 2016 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package clientv3 import ( "context" "errors" "fmt" "sync" "time" v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" mvccpb "github.com/coreos/etcd/mvcc/mvccpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) const ( EventTypeDelete = mvccpb.DELETE EventTypePut = mvccpb.PUT closeSendErrTimeout = 250 * time.Millisecond ) type Event mvccpb.Event type WatchChan <-chan WatchResponse type Watcher interface { // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. If revisions waiting to be sent over the // watch are compacted, then the watch will be canceled by the server, the // client will post a compacted error watch response, and the channel will close. // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed, // and "WatchResponse" from this closed channel has zero events and nil "Err()". // The context "ctx" MUST be canceled, as soon as watcher is no longer being used, // to release the associated resources. // // If the context is "context.Background/TODO", returned "WatchChan" will // not be closed and block until event is triggered, except when server // returns a non-recoverable error (e.g. ErrCompacted). // For example, when context passed with "WithRequireLeader" and the // connected server has no leader (e.g. due to network partition), // error "etcdserver: no leader" (ErrNoLeader) will be returned, // and then "WatchChan" is closed with non-nil "Err()". // In order to prevent a watch stream being stuck in a partitioned node, // make sure to wrap context with "WithRequireLeader". // // Otherwise, as long as the context has not been canceled or timed out, // watch will retry on other recoverable errors forever until reconnected. // // TODO: explicitly set context error in the last "WatchResponse" message and close channel? // Currently, client contexts are overwritten with "valCtx" that never closes. // TODO(v3.4): configure watch retry policy, limit maximum retry number // (see https://github.com/etcd-io/etcd/issues/8980) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan // RequestProgress requests a progress notify response be sent in all watch channels. RequestProgress(ctx context.Context) error // Close closes the watcher and cancels all watch requests. Close() error } type WatchResponse struct { Header pb.ResponseHeader Events []*Event // CompactRevision is the minimum revision the watcher may receive. CompactRevision int64 // Canceled is used to indicate watch failure. // If the watch failed and the stream was about to close, before the channel is closed, // the channel sends a final response that has Canceled set to true with a non-nil Err(). Canceled bool // Created is used to indicate the creation of the watcher. Created bool closeErr error // cancelReason is a reason of canceling watch cancelReason string } // IsCreate returns true if the event tells that the key is newly created. func (e *Event) IsCreate() bool { return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision } // IsModify returns true if the event tells that a new value is put on existing key. func (e *Event) IsModify() bool { return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision } // Err is the error value if this WatchResponse holds an error. func (wr *WatchResponse) Err() error { switch { case wr.closeErr != nil: return v3rpc.Error(wr.closeErr) case wr.CompactRevision != 0: return v3rpc.ErrCompacted case wr.Canceled: if len(wr.cancelReason) != 0 { return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason)) } return v3rpc.ErrFutureRev } return nil } // IsProgressNotify returns true if the WatchResponse is progress notification. func (wr *WatchResponse) IsProgressNotify() bool { return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0 } // watcher implements the Watcher interface type watcher struct { remote pb.WatchClient callOpts []grpc.CallOption // mu protects the grpc streams map mu sync.RWMutex // streams holds all the active grpc streams keyed by ctx value. streams map[string]*watchGrpcStream } // watchGrpcStream tracks all watch resources attached to a single grpc stream. type watchGrpcStream struct { owner *watcher remote pb.WatchClient callOpts []grpc.CallOption // ctx controls internal remote.Watch requests ctx context.Context // ctxKey is the key used when looking up this stream's context ctxKey string cancel context.CancelFunc // substreams holds all active watchers on this grpc stream substreams map[int64]*watcherStream // resuming holds all resuming watchers on this grpc stream resuming []*watcherStream // reqc sends a watch request from Watch() to the main goroutine reqc chan watchStreamRequest // respc receives data from the watch client respc chan *pb.WatchResponse // donec closes to broadcast shutdown donec chan struct{} // errc transmits errors from grpc Recv to the watch stream reconnect logic errc chan error // closingc gets the watcherStream of closing watchers closingc chan *watcherStream // wg is Done when all substream goroutines have exited wg sync.WaitGroup // resumec closes to signal that all substreams should begin resuming resumec chan struct{} // closeErr is the error that closed the watch stream closeErr error } // watchStreamRequest is a union of the supported watch request operation types type watchStreamRequest interface { toPB() *pb.WatchRequest } // watchRequest is issued by the subscriber to start a new watcher type watchRequest struct { ctx context.Context key string end string rev int64 // send created notification event if this field is true createdNotify bool // progressNotify is for progress updates progressNotify bool // fragmentation should be disabled by default // if true, split watch events when total exceeds // "--max-request-bytes" flag value + 512-byte fragment bool // filters is the list of events to filter out filters []pb.WatchCreateRequest_FilterType // get the previous key-value pair before the event happens prevKV bool // retc receives a chan WatchResponse once the watcher is established retc chan chan WatchResponse } // progressRequest is issued by the subscriber to request watch progress type progressRequest struct { } // watcherStream represents a registered watcher type watcherStream struct { // initReq is the request that initiated this request initReq watchRequest // outc publishes watch responses to subscriber outc chan WatchResponse // recvc buffers watch responses before publishing recvc chan *WatchResponse // donec closes when the watcherStream goroutine stops. donec chan struct{} // closing is set to true when stream should be scheduled to shutdown. closing bool // id is the registered watch id on the grpc stream id int64 // buf holds all events received from etcd but not yet consumed by the client buf []*WatchResponse } func NewWatcher(c *Client) Watcher { return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c) } func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { w := &watcher{ remote: wc, streams: make(map[string]*watchGrpcStream), } if c != nil { w.callOpts = c.callOpts } return w } // never closes var valCtxCh = make(chan struct{}) var zeroTime = time.Unix(0, 0) // ctx with only the values; never Done type valCtx struct{ context.Context } func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false } func (vc *valCtx) Done() <-chan struct{} { return valCtxCh } func (vc *valCtx) Err() error { return nil } func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { ctx, cancel := context.WithCancel(&valCtx{inctx}) wgs := &watchGrpcStream{ owner: w, remote: w.remote, callOpts: w.callOpts, ctx: ctx, ctxKey: streamKeyFromCtx(inctx), cancel: cancel, substreams: make(map[int64]*watcherStream), respc: make(chan *pb.WatchResponse), reqc: make(chan watchStreamRequest), donec: make(chan struct{}), errc: make(chan error, 1), closingc: make(chan *watcherStream), resumec: make(chan struct{}), } go wgs.run() return wgs } // Watch posts a watch request to run() and waits for a new watcher channel func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { ow := opWatch(key, opts...) var filters []pb.WatchCreateRequest_FilterType if ow.filterPut { filters = append(filters, pb.WatchCreateRequest_NOPUT) } if ow.filterDelete { filters = append(filters, pb.WatchCreateRequest_NODELETE) } wr := &watchRequest{ ctx: ctx, createdNotify: ow.createdNotify, key: string(ow.key), end: string(ow.end), rev: ow.rev, progressNotify: ow.progressNotify, fragment: ow.fragment, filters: filters, prevKV: ow.prevKV, retc: make(chan chan WatchResponse, 1), } ok := false ctxKey := streamKeyFromCtx(ctx) // find or allocate appropriate grpc watch stream w.mu.Lock() if w.streams == nil { // closed w.mu.Unlock() ch := make(chan WatchResponse) close(ch) return ch } wgs := w.streams[ctxKey] if wgs == nil { wgs = w.newWatcherGrpcStream(ctx) w.streams[ctxKey] = wgs } donec := wgs.donec reqc := wgs.reqc w.mu.Unlock() // couldn't create channel; return closed channel closeCh := make(chan WatchResponse, 1) // submit request select { case reqc <- wr: ok = true case <-wr.ctx.Done(): case <-donec: if wgs.closeErr != nil { closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs return w.Watch(ctx, key, opts...) } // receive channel if ok { select { case ret := <-wr.retc: return ret case <-ctx.Done(): case <-donec: if wgs.closeErr != nil { closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs return w.Watch(ctx, key, opts...) } } close(closeCh) return closeCh } func (w *watcher) Close() (err error) { w.mu.Lock() streams := w.streams w.streams = nil w.mu.Unlock() for _, wgs := range streams { if werr := wgs.close(); werr != nil { err = werr } } // Consider context.Canceled as a successful close if err == context.Canceled { err = nil } return err } // RequestProgress requests a progress notify response be sent in all watch channels. func (w *watcher) RequestProgress(ctx context.Context) (err error) { ctxKey := streamKeyFromCtx(ctx) w.mu.Lock() if w.streams == nil { w.mu.Unlock() return fmt.Errorf("no stream found for context") } wgs := w.streams[ctxKey] if wgs == nil { wgs = w.newWatcherGrpcStream(ctx) w.streams[ctxKey] = wgs } donec := wgs.donec reqc := wgs.reqc w.mu.Unlock() pr := &progressRequest{} select { case reqc <- pr: return nil case <-ctx.Done(): if err == nil { return ctx.Err() } return err case <-donec: if wgs.closeErr != nil { return wgs.closeErr } // retry; may have dropped stream from no ctxs return w.RequestProgress(ctx) } } func (w *watchGrpcStream) close() (err error) { w.cancel() <-w.donec select { case err = <-w.errc: default: } return toErr(w.ctx, err) } func (w *watcher) closeStream(wgs *watchGrpcStream) { w.mu.Lock() close(wgs.donec) wgs.cancel() if w.streams != nil { delete(w.streams, wgs.ctxKey) } w.mu.Unlock() } func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { // check watch ID for backward compatibility (<= v3.3) if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) // failed; no channel close(ws.recvc) return } ws.id = resp.WatchId w.substreams[ws.id] = ws } func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) { select { case ws.outc <- *resp: case <-ws.initReq.ctx.Done(): case <-time.After(closeSendErrTimeout): } close(ws.outc) } func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { // send channel response in case stream was never established select { case ws.initReq.retc <- ws.outc: default: } // close subscriber's channel if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) } else if ws.outc != nil { close(ws.outc) } if ws.id != -1 { delete(w.substreams, ws.id) return } for i := range w.resuming { if w.resuming[i] == ws { w.resuming[i] = nil return } } } // run is the root of the goroutines for managing a watcher client func (w *watchGrpcStream) run() { var wc pb.Watch_WatchClient var closeErr error // substreams marked to close but goroutine still running; needed for // avoiding double-closing recvc on grpc stream teardown closing := make(map[*watcherStream]struct{}) defer func() { w.closeErr = closeErr // shutdown substreams and resuming substreams for _, ws := range w.substreams { if _, ok := closing[ws]; !ok { close(ws.recvc) closing[ws] = struct{}{} } } for _, ws := range w.resuming { if _, ok := closing[ws]; ws != nil && !ok { close(ws.recvc) closing[ws] = struct{}{} } } w.joinSubstreams() for range closing { w.closeSubstream(<-w.closingc) } w.wg.Wait() w.owner.closeStream(w) }() // start a stream with the etcd grpc server if wc, closeErr = w.newWatchClient(); closeErr != nil { return } cancelSet := make(map[int64]struct{}) var cur *pb.WatchResponse for { select { // Watch() requested case req := <-w.reqc: switch wreq := req.(type) { case *watchRequest: outc := make(chan WatchResponse, 1) // TODO: pass custom watch ID? ws := &watcherStream{ initReq: *wreq, id: -1, outc: outc, // unbuffered so resumes won't cause repeat events recvc: make(chan *WatchResponse), } ws.donec = make(chan struct{}) w.wg.Add(1) go w.serveSubstream(ws, w.resumec) // queue up for watcher creation/resume w.resuming = append(w.resuming, ws) if len(w.resuming) == 1 { // head of resume queue, can register a new watcher wc.Send(ws.initReq.toPB()) } case *progressRequest: wc.Send(wreq.toPB()) } // new events from the watch client case pbresp := <-w.respc: if cur == nil || pbresp.Created || pbresp.Canceled { cur = pbresp } else if cur != nil && cur.WatchId == pbresp.WatchId { // merge new events cur.Events = append(cur.Events, pbresp.Events...) // update "Fragment" field; last response with "Fragment" == false cur.Fragment = pbresp.Fragment } switch { case pbresp.Created: // response to head of queue creation if ws := w.resuming[0]; ws != nil { w.addSubstream(pbresp, ws) w.dispatchEvent(pbresp) w.resuming[0] = nil } if ws := w.nextResume(); ws != nil { wc.Send(ws.initReq.toPB()) } // reset for next iteration cur = nil case pbresp.Canceled && pbresp.CompactRevision == 0: delete(cancelSet, pbresp.WatchId) if ws, ok := w.substreams[pbresp.WatchId]; ok { // signal to stream goroutine to update closingc close(ws.recvc) closing[ws] = struct{}{} } // reset for next iteration cur = nil case cur.Fragment: // watch response events are still fragmented // continue to fetch next fragmented event arrival continue default: // dispatch to appropriate watch stream ok := w.dispatchEvent(cur) // reset for next iteration cur = nil if ok { break } // watch response on unexpected watch id; cancel id if _, ok := cancelSet[pbresp.WatchId]; ok { break } cancelSet[pbresp.WatchId] = struct{}{} cr := &pb.WatchRequest_CancelRequest{ CancelRequest: &pb.WatchCancelRequest{ WatchId: pbresp.WatchId, }, } req := &pb.WatchRequest{RequestUnion: cr} wc.Send(req) } // watch client failed on Recv; spawn another if possible case err := <-w.errc: if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { closeErr = err return } if wc, closeErr = w.newWatchClient(); closeErr != nil { return } if ws := w.nextResume(); ws != nil { wc.Send(ws.initReq.toPB()) } cancelSet = make(map[int64]struct{}) case <-w.ctx.Done(): return case ws := <-w.closingc: w.closeSubstream(ws) delete(closing, ws) // no more watchers on this stream, shutdown if len(w.substreams)+len(w.resuming) == 0 { return } } } } // nextResume chooses the next resuming to register with the grpc stream. Abandoned // streams are marked as nil in the queue since the head must wait for its inflight registration. func (w *watchGrpcStream) nextResume() *watcherStream { for len(w.resuming) != 0 { if w.resuming[0] != nil { return w.resuming[0] } w.resuming = w.resuming[1:len(w.resuming)] } return nil } // dispatchEvent sends a WatchResponse to the appropriate watcher stream func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { events := make([]*Event, len(pbresp.Events)) for i, ev := range pbresp.Events { events[i] = (*Event)(ev) } // TODO: return watch ID? wr := &WatchResponse{ Header: *pbresp.Header, Events: events, CompactRevision: pbresp.CompactRevision, Created: pbresp.Created, Canceled: pbresp.Canceled, cancelReason: pbresp.CancelReason, } // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to // indicate they should be broadcast. if wr.IsProgressNotify() && pbresp.WatchId == -1 { return w.broadcastResponse(wr) } return w.unicastResponse(wr, pbresp.WatchId) } // broadcastResponse send a watch response to all watch substreams. func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool { for _, ws := range w.substreams { select { case ws.recvc <- wr: case <-ws.donec: } } return true } // unicastResponse sends a watch response to a specific watch substream. func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool { ws, ok := w.substreams[watchId] if !ok { return false } select { case ws.recvc <- wr: case <-ws.donec: return false } return true } // serveWatchClient forwards messages from the grpc stream to run() func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) { for { resp, err := wc.Recv() if err != nil { select { case w.errc <- err: case <-w.donec: } return } select { case w.respc <- resp: case <-w.donec: return } } } // serveSubstream forwards watch responses from run() to the subscriber func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) { if ws.closing { panic("created substream goroutine but substream is closing") } // nextRev is the minimum expected next revision nextRev := ws.initReq.rev resuming := false defer func() { if !resuming { ws.closing = true } close(ws.donec) if !resuming { w.closingc <- ws } w.wg.Done() }() emptyWr := &WatchResponse{} for { curWr := emptyWr outc := ws.outc if len(ws.buf) > 0 { curWr = ws.buf[0] } else { outc = nil } select { case outc <- *curWr: if ws.buf[0].Err() != nil { return } ws.buf[0] = nil ws.buf = ws.buf[1:] case wr, ok := <-ws.recvc: if !ok { // shutdown from closeSubstream return } if wr.Created { if ws.initReq.retc != nil { ws.initReq.retc <- ws.outc // to prevent next write from taking the slot in buffered channel // and posting duplicate create events ws.initReq.retc = nil // send first creation event only if requested if ws.initReq.createdNotify { ws.outc <- *wr } // once the watch channel is returned, a current revision // watch must resume at the store revision. This is necessary // for the following case to work as expected: // wch := m1.Watch("a") // m2.Put("a", "b") // <-wch // If the revision is only bound on the first observed event, // if wch is disconnected before the Put is issued, then reconnects // after it is committed, it'll miss the Put. if ws.initReq.rev == 0 { nextRev = wr.Header.Revision } } } else { // current progress of watch; <= store revision nextRev = wr.Header.Revision } if len(wr.Events) > 0 { nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1 } ws.initReq.rev = nextRev // created event is already sent above, // watcher should not post duplicate events if wr.Created { continue } // TODO pause channel if buffer gets too large ws.buf = append(ws.buf, wr) case <-w.ctx.Done(): return case <-ws.initReq.ctx.Done(): return case <-resumec: resuming = true return } } // lazily send cancel message if events on missing id } func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { // mark all substreams as resuming close(w.resumec) w.resumec = make(chan struct{}) w.joinSubstreams() for _, ws := range w.substreams { ws.id = -1 w.resuming = append(w.resuming, ws) } // strip out nils, if any var resuming []*watcherStream for _, ws := range w.resuming { if ws != nil { resuming = append(resuming, ws) } } w.resuming = resuming w.substreams = make(map[int64]*watcherStream) // connect to grpc stream while accepting watcher cancelation stopc := make(chan struct{}) donec := w.waitCancelSubstreams(stopc) wc, err := w.openWatchClient() close(stopc) <-donec // serve all non-closing streams, even if there's a client error // so that the teardown path can shutdown the streams as expected. for _, ws := range w.resuming { if ws.closing { continue } ws.donec = make(chan struct{}) w.wg.Add(1) go w.serveSubstream(ws, w.resumec) } if err != nil { return nil, v3rpc.Error(err) } // receive data from new grpc stream go w.serveWatchClient(wc) return wc, nil } func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} { var wg sync.WaitGroup wg.Add(len(w.resuming)) donec := make(chan struct{}) for i := range w.resuming { go func(ws *watcherStream) { defer wg.Done() if ws.closing { if ws.initReq.ctx.Err() != nil && ws.outc != nil { close(ws.outc) ws.outc = nil } return } select { case <-ws.initReq.ctx.Done(): // closed ws will be removed from resuming ws.closing = true close(ws.outc) ws.outc = nil w.wg.Add(1) go func() { defer w.wg.Done() w.closingc <- ws }() case <-stopc: } }(w.resuming[i]) } go func() { defer close(donec) wg.Wait() }() return donec } // joinSubstreams waits for all substream goroutines to complete. func (w *watchGrpcStream) joinSubstreams() { for _, ws := range w.substreams { <-ws.donec } for _, ws := range w.resuming { if ws != nil { <-ws.donec } } } var maxBackoff = 100 * time.Millisecond // openWatchClient retries opening a watch client until success or halt. // manually retry in case "ws==nil && err==nil" // TODO: remove FailFast=false func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { backoff := time.Millisecond for { select { case <-w.ctx.Done(): if err == nil { return nil, w.ctx.Err() } return nil, err default: } if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { break } if isHaltErr(w.ctx, err) { return nil, v3rpc.Error(err) } if isUnavailableErr(w.ctx, err) { // retry, but backoff if backoff < maxBackoff { // 25% backoff factor backoff = backoff + backoff/4 if backoff > maxBackoff { backoff = maxBackoff } } time.Sleep(backoff) } } return ws, nil } // toPB converts an internal watch request structure to its protobuf WatchRequest structure. func (wr *watchRequest) toPB() *pb.WatchRequest { req := &pb.WatchCreateRequest{ StartRevision: wr.rev, Key: []byte(wr.key), RangeEnd: []byte(wr.end), ProgressNotify: wr.progressNotify, Filters: wr.filters, PrevKv: wr.prevKV, Fragment: wr.fragment, } cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} return &pb.WatchRequest{RequestUnion: cr} } // toPB converts an internal progress request structure to its protobuf WatchRequest structure. func (pr *progressRequest) toPB() *pb.WatchRequest { req := &pb.WatchProgressRequest{} cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req} return &pb.WatchRequest{RequestUnion: cr} } func streamKeyFromCtx(ctx context.Context) string { if md, ok := metadata.FromOutgoingContext(ctx); ok { return fmt.Sprintf("%+v", md) } return "" }