| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package clientv3
- import (
- "fmt"
- "sync"
- "time"
- v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- )
- const (
- EventTypeDelete = mvccpb.DELETE
- EventTypePut = mvccpb.PUT
- closeSendErrTimeout = 250 * time.Millisecond
- )
- type Event mvccpb.Event
- type WatchChan <-chan WatchResponse
- type Watcher interface {
- // Watch watches on a key or prefix. The watched events will be returned
- // through the returned channel.
- // If the watch is slow or the required rev is compacted, the watch request
- // might be canceled from the server-side and the chan will be closed.
- // 'opts' can be: 'WithRev' and/or 'WithPrefix'.
- Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
- // Close closes the watcher and cancels all watch requests.
- Close() error
- }
- type WatchResponse struct {
- Header pb.ResponseHeader
- Events []*Event
- // CompactRevision is the minimum revision the watcher may receive.
- CompactRevision int64
- // Canceled is used to indicate watch failure.
- // If the watch failed and the stream was about to close, before the channel is closed,
- // the channel sends a final response that has Canceled set to true with a non-nil Err().
- Canceled bool
- // Created is used to indicate the creation of the watcher.
- Created bool
- closeErr error
- }
- // IsCreate returns true if the event tells that the key is newly created.
- func (e *Event) IsCreate() bool {
- return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
- }
- // IsModify returns true if the event tells that a new value is put on existing key.
- func (e *Event) IsModify() bool {
- return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
- }
- // Err is the error value if this WatchResponse holds an error.
- func (wr *WatchResponse) Err() error {
- switch {
- case wr.closeErr != nil:
- return v3rpc.Error(wr.closeErr)
- case wr.CompactRevision != 0:
- return v3rpc.ErrCompacted
- case wr.Canceled:
- return v3rpc.ErrFutureRev
- }
- return nil
- }
- // IsProgressNotify returns true if the WatchResponse is progress notification.
- func (wr *WatchResponse) IsProgressNotify() bool {
- return len(wr.Events) == 0 && !wr.Canceled && !wr.Created
- }
- // watcher implements the Watcher interface
- type watcher struct {
- remote pb.WatchClient
- // mu protects the grpc streams map
- mu sync.RWMutex
- // streams holds all the active grpc streams keyed by ctx value.
- streams map[string]*watchGrpcStream
- }
- type watchGrpcStream struct {
- owner *watcher
- remote pb.WatchClient
- // ctx controls internal remote.Watch requests
- ctx context.Context
- // ctxKey is the key used when looking up this stream's context
- ctxKey string
- cancel context.CancelFunc
- // mu protects the streams map
- mu sync.RWMutex
- // streams holds all active watchers
- streams map[int64]*watcherStream
- // reqc sends a watch request from Watch() to the main goroutine
- reqc chan *watchRequest
- // respc receives data from the watch client
- respc chan *pb.WatchResponse
- // stopc is sent to the main goroutine to stop all processing
- stopc chan struct{}
- // donec closes to broadcast shutdown
- donec chan struct{}
- // errc transmits errors from grpc Recv to the watch stream reconn logic
- errc chan error
- // the error that closed the watch stream
- closeErr error
- }
- // watchRequest is issued by the subscriber to start a new watcher
- type watchRequest struct {
- ctx context.Context
- key string
- end string
- rev int64
- // send created notification event if this field is true
- createdNotify bool
- // progressNotify is for progress updates
- progressNotify bool
- // filters is the list of events to filter out
- filters []pb.WatchCreateRequest_FilterType
- // get the previous key-value pair before the event happens
- prevKV bool
- // retc receives a chan WatchResponse once the watcher is established
- retc chan chan WatchResponse
- }
- // watcherStream represents a registered watcher
- type watcherStream struct {
- // initReq is the request that initiated this request
- initReq watchRequest
- // outc publishes watch responses to subscriber
- outc chan<- WatchResponse
- // recvc buffers watch responses before publishing
- recvc chan *WatchResponse
- id int64
- // lastRev is revision last successfully sent over outc
- lastRev int64
- // resumec indicates the stream must recover at a given revision
- resumec chan int64
- }
- func NewWatcher(c *Client) Watcher {
- return NewWatchFromWatchClient(pb.NewWatchClient(c.conn))
- }
- func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
- return &watcher{
- remote: wc,
- streams: make(map[string]*watchGrpcStream),
- }
- }
var (
	// valCtxCh never closes, so a receive from it blocks forever.
	valCtxCh = make(chan struct{})

	// zeroTime is the placeholder deadline reported by valCtx.
	zeroTime = time.Unix(0, 0)
)

// valCtx is a ctx with only the values of the wrapped context; it is never
// Done, has no deadline and reports no error.
type valCtx struct{ context.Context }

// Deadline always reports that no deadline is set.
func (vc *valCtx) Deadline() (deadline time.Time, ok bool) {
	return zeroTime, false
}

// Done returns a channel that is never closed.
func (vc *valCtx) Done() <-chan struct{} {
	return valCtxCh
}

// Err always returns nil.
func (vc *valCtx) Err() error {
	return nil
}
- func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
- ctx, cancel := context.WithCancel(&valCtx{inctx})
- wgs := &watchGrpcStream{
- owner: w,
- remote: w.remote,
- ctx: ctx,
- ctxKey: fmt.Sprintf("%v", inctx),
- cancel: cancel,
- streams: make(map[int64]*watcherStream),
- respc: make(chan *pb.WatchResponse),
- reqc: make(chan *watchRequest),
- stopc: make(chan struct{}),
- donec: make(chan struct{}),
- errc: make(chan error, 1),
- }
- go wgs.run()
- return wgs
- }
- // Watch posts a watch request to run() and waits for a new watcher channel
- func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
- ow := opWatch(key, opts...)
- retc := make(chan chan WatchResponse, 1)
- var filters []pb.WatchCreateRequest_FilterType
- if ow.filterPut {
- filters = append(filters, pb.WatchCreateRequest_NOPUT)
- }
- if ow.filterDelete {
- filters = append(filters, pb.WatchCreateRequest_NODELETE)
- }
- wr := &watchRequest{
- ctx: ctx,
- createdNotify: ow.createdNotify,
- key: string(ow.key),
- end: string(ow.end),
- rev: ow.rev,
- progressNotify: ow.progressNotify,
- filters: filters,
- prevKV: ow.prevKV,
- retc: retc,
- }
- ok := false
- ctxKey := fmt.Sprintf("%v", ctx)
- // find or allocate appropriate grpc watch stream
- w.mu.Lock()
- if w.streams == nil {
- // closed
- w.mu.Unlock()
- ch := make(chan WatchResponse)
- close(ch)
- return ch
- }
- wgs := w.streams[ctxKey]
- if wgs == nil {
- wgs = w.newWatcherGrpcStream(ctx)
- w.streams[ctxKey] = wgs
- }
- donec := wgs.donec
- reqc := wgs.reqc
- w.mu.Unlock()
- // couldn't create channel; return closed channel
- closeCh := make(chan WatchResponse, 1)
- // submit request
- select {
- case reqc <- wr:
- ok = true
- case <-wr.ctx.Done():
- wgs.stopIfEmpty()
- case <-donec:
- if wgs.closeErr != nil {
- closeCh <- WatchResponse{closeErr: wgs.closeErr}
- break
- }
- // retry; may have dropped stream from no ctxs
- return w.Watch(ctx, key, opts...)
- }
- // receive channel
- if ok {
- select {
- case ret := <-retc:
- return ret
- case <-ctx.Done():
- case <-donec:
- if wgs.closeErr != nil {
- closeCh <- WatchResponse{closeErr: wgs.closeErr}
- break
- }
- // retry; may have dropped stream from no ctxs
- return w.Watch(ctx, key, opts...)
- }
- }
- close(closeCh)
- return closeCh
- }
- func (w *watcher) Close() (err error) {
- w.mu.Lock()
- streams := w.streams
- w.streams = nil
- w.mu.Unlock()
- for _, wgs := range streams {
- if werr := wgs.Close(); werr != nil {
- err = werr
- }
- }
- return err
- }
- func (w *watchGrpcStream) Close() (err error) {
- w.mu.Lock()
- if w.stopc != nil {
- close(w.stopc)
- w.stopc = nil
- }
- w.mu.Unlock()
- <-w.donec
- select {
- case err = <-w.errc:
- default:
- }
- return toErr(w.ctx, err)
- }
- func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
- if pendingReq == nil {
- // no pending request; ignore
- return
- }
- if resp.Canceled || resp.CompactRevision != 0 {
- // a cancel at id creation time means the start revision has
- // been compacted out of the store
- ret := make(chan WatchResponse, 1)
- ret <- WatchResponse{
- Header: *resp.Header,
- CompactRevision: resp.CompactRevision,
- Canceled: true}
- close(ret)
- pendingReq.retc <- ret
- return
- }
- ret := make(chan WatchResponse)
- if resp.WatchId == -1 {
- // failed; no channel
- close(ret)
- pendingReq.retc <- ret
- return
- }
- ws := &watcherStream{
- initReq: *pendingReq,
- id: resp.WatchId,
- outc: ret,
- // buffered so unlikely to block on sending while holding mu
- recvc: make(chan *WatchResponse, 4),
- resumec: make(chan int64),
- }
- if pendingReq.rev == 0 {
- // note the header revision so that a put following a current watcher
- // disconnect will arrive on the watcher channel after reconnect
- ws.initReq.rev = resp.Header.Revision
- }
- w.mu.Lock()
- w.streams[ws.id] = ws
- w.mu.Unlock()
- // pass back the subscriber channel for the watcher
- pendingReq.retc <- ret
- // send messages to subscriber
- go w.serveStream(ws)
- }
- // closeStream closes the watcher resources and removes it
- func (w *watchGrpcStream) closeStream(ws *watcherStream) {
- w.mu.Lock()
- // cancels request stream; subscriber receives nil channel
- close(ws.initReq.retc)
- // close subscriber's channel
- close(ws.outc)
- delete(w.streams, ws.id)
- w.mu.Unlock()
- }
- // run is the root of the goroutines for managing a watcher client
- func (w *watchGrpcStream) run() {
- var wc pb.Watch_WatchClient
- var closeErr error
- defer func() {
- w.owner.mu.Lock()
- w.closeErr = closeErr
- if w.owner.streams != nil {
- delete(w.owner.streams, w.ctxKey)
- }
- close(w.donec)
- w.owner.mu.Unlock()
- w.cancel()
- }()
- // already stopped?
- w.mu.RLock()
- stopc := w.stopc
- w.mu.RUnlock()
- if stopc == nil {
- return
- }
- // start a stream with the etcd grpc server
- if wc, closeErr = w.newWatchClient(); closeErr != nil {
- return
- }
- var pendingReq, failedReq *watchRequest
- curReqC := w.reqc
- cancelSet := make(map[int64]struct{})
- for {
- select {
- // Watch() requested
- case pendingReq = <-curReqC:
- // no more watch requests until there's a response
- curReqC = nil
- if err := wc.Send(pendingReq.toPB()); err == nil {
- // pendingReq now waits on w.respc
- break
- }
- failedReq = pendingReq
- // New events from the watch client
- case pbresp := <-w.respc:
- switch {
- case pbresp.Created:
- // response to pending req, try to add
- w.addStream(pbresp, pendingReq)
- pendingReq = nil
- curReqC = w.reqc
- w.dispatchEvent(pbresp)
- case pbresp.Canceled:
- delete(cancelSet, pbresp.WatchId)
- // shutdown serveStream, if any
- w.mu.Lock()
- if ws, ok := w.streams[pbresp.WatchId]; ok {
- close(ws.recvc)
- delete(w.streams, ws.id)
- }
- numStreams := len(w.streams)
- w.mu.Unlock()
- if numStreams == 0 {
- // don't leak watcher streams
- return
- }
- default:
- // dispatch to appropriate watch stream
- if ok := w.dispatchEvent(pbresp); ok {
- break
- }
- // watch response on unexpected watch id; cancel id
- if _, ok := cancelSet[pbresp.WatchId]; ok {
- break
- }
- cancelSet[pbresp.WatchId] = struct{}{}
- cr := &pb.WatchRequest_CancelRequest{
- CancelRequest: &pb.WatchCancelRequest{
- WatchId: pbresp.WatchId,
- },
- }
- req := &pb.WatchRequest{RequestUnion: cr}
- wc.Send(req)
- }
- // watch client failed to recv; spawn another if possible
- // TODO report watch client errors from errc?
- case err := <-w.errc:
- if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
- closeErr = err
- return
- }
- if wc, closeErr = w.newWatchClient(); closeErr != nil {
- return
- }
- curReqC = w.reqc
- if pendingReq != nil {
- failedReq = pendingReq
- }
- cancelSet = make(map[int64]struct{})
- case <-stopc:
- return
- }
- // send failed; queue for retry
- if failedReq != nil {
- go func(wr *watchRequest) {
- select {
- case w.reqc <- wr:
- case <-wr.ctx.Done():
- case <-w.donec:
- }
- }(pendingReq)
- failedReq = nil
- pendingReq = nil
- }
- }
- }
- // dispatchEvent sends a WatchResponse to the appropriate watcher stream
- func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
- w.mu.RLock()
- defer w.mu.RUnlock()
- ws, ok := w.streams[pbresp.WatchId]
- if !ok {
- return false
- }
- events := make([]*Event, len(pbresp.Events))
- for i, ev := range pbresp.Events {
- events[i] = (*Event)(ev)
- }
- wr := &WatchResponse{
- Header: *pbresp.Header,
- Events: events,
- CompactRevision: pbresp.CompactRevision,
- Created: pbresp.Created,
- Canceled: pbresp.Canceled,
- }
- ws.recvc <- wr
- return true
- }
- // serveWatchClient forwards messages from the grpc stream to run()
- func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
- for {
- resp, err := wc.Recv()
- if err != nil {
- select {
- case w.errc <- err:
- case <-w.donec:
- }
- return
- }
- select {
- case w.respc <- resp:
- case <-w.donec:
- return
- }
- }
- }
- // serveStream forwards watch responses from run() to the subscriber
- func (w *watchGrpcStream) serveStream(ws *watcherStream) {
- var closeErr error
- emptyWr := &WatchResponse{}
- wrs := []*WatchResponse{}
- resuming := false
- closing := false
- for !closing {
- curWr := emptyWr
- outc := ws.outc
- // ignore created event if create notify is not requested or
- // we already sent the initial created event (when we are on the resume path).
- if len(wrs) > 0 && wrs[0].Created &&
- (!ws.initReq.createdNotify || ws.lastRev != 0) {
- wrs = wrs[1:]
- }
- if len(wrs) > 0 {
- curWr = wrs[0]
- } else {
- outc = nil
- }
- select {
- case outc <- *curWr:
- if wrs[0].Err() != nil {
- closing = true
- break
- }
- var newRev int64
- if len(wrs[0].Events) > 0 {
- newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
- } else {
- newRev = wrs[0].Header.Revision
- }
- if newRev != ws.lastRev {
- ws.lastRev = newRev
- }
- wrs[0] = nil
- wrs = wrs[1:]
- case wr, ok := <-ws.recvc:
- if !ok {
- // shutdown from closeStream
- return
- }
- // resume up to last seen event if disconnected
- if resuming && wr.Err() == nil {
- resuming = false
- // trim events already seen
- for i := 0; i < len(wr.Events); i++ {
- if wr.Events[i].Kv.ModRevision > ws.lastRev {
- wr.Events = wr.Events[i:]
- break
- }
- }
- // only forward new events
- if wr.Events[0].Kv.ModRevision == ws.lastRev {
- break
- }
- }
- resuming = false
- // TODO don't keep buffering if subscriber stops reading
- wrs = append(wrs, wr)
- case resumeRev := <-ws.resumec:
- wrs = nil
- resuming = true
- if resumeRev == -1 {
- // pause serving stream while resume gets set up
- break
- }
- if resumeRev != ws.lastRev {
- panic("unexpected resume revision")
- }
- case <-w.donec:
- closing = true
- closeErr = w.closeErr
- case <-ws.initReq.ctx.Done():
- closing = true
- }
- }
- // try to send off close error
- if closeErr != nil {
- select {
- case ws.outc <- WatchResponse{closeErr: w.closeErr}:
- case <-w.donec:
- case <-time.After(closeSendErrTimeout):
- }
- }
- w.closeStream(ws)
- w.stopIfEmpty()
- // lazily send cancel message if events on missing id
- }
- func (wgs *watchGrpcStream) stopIfEmpty() {
- wgs.mu.Lock()
- if len(wgs.streams) == 0 && wgs.stopc != nil {
- close(wgs.stopc)
- wgs.stopc = nil
- }
- wgs.mu.Unlock()
- }
- func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
- ws, rerr := w.resume()
- if rerr != nil {
- return nil, rerr
- }
- go w.serveWatchClient(ws)
- return ws, nil
- }
- // resume creates a new WatchClient with all current watchers reestablished
- func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
- for {
- if ws, err = w.openWatchClient(); err != nil {
- break
- } else if err = w.resumeWatchers(ws); err == nil {
- break
- }
- }
- return ws, v3rpc.Error(err)
- }
- // openWatchClient retries opening a watchclient until retryConnection fails
- func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
- for {
- w.mu.Lock()
- stopc := w.stopc
- w.mu.Unlock()
- if stopc == nil {
- if err == nil {
- err = context.Canceled
- }
- return nil, err
- }
- if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
- break
- }
- if isHaltErr(w.ctx, err) {
- return nil, v3rpc.Error(err)
- }
- }
- return ws, nil
- }
- // resumeWatchers rebuilds every registered watcher on a new client
- func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
- w.mu.RLock()
- streams := make([]*watcherStream, 0, len(w.streams))
- for _, ws := range w.streams {
- streams = append(streams, ws)
- }
- w.mu.RUnlock()
- for _, ws := range streams {
- // drain recvc so no old WatchResponses (e.g., Created messages)
- // are processed while resuming
- ws.drain()
- // pause serveStream
- ws.resumec <- -1
- // reconstruct watcher from initial request
- if ws.lastRev != 0 {
- ws.initReq.rev = ws.lastRev
- }
- if err := wc.Send(ws.initReq.toPB()); err != nil {
- return err
- }
- // wait for request ack
- resp, err := wc.Recv()
- if err != nil {
- return err
- } else if len(resp.Events) != 0 || !resp.Created {
- return fmt.Errorf("watcher: unexpected response (%+v)", resp)
- }
- // id may be different since new remote watcher; update map
- w.mu.Lock()
- delete(w.streams, ws.id)
- ws.id = resp.WatchId
- w.streams[ws.id] = ws
- w.mu.Unlock()
- // unpause serveStream
- ws.resumec <- ws.lastRev
- }
- return nil
- }
- // drain removes all buffered WatchResponses from the stream's receive channel.
- func (ws *watcherStream) drain() {
- for {
- select {
- case <-ws.recvc:
- default:
- return
- }
- }
- }
- // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
- func (wr *watchRequest) toPB() *pb.WatchRequest {
- req := &pb.WatchCreateRequest{
- StartRevision: wr.rev,
- Key: []byte(wr.key),
- RangeEnd: []byte(wr.end),
- ProgressNotify: wr.progressNotify,
- Filters: wr.filters,
- PrevKv: wr.prevKV,
- }
- cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
- return &pb.WatchRequest{RequestUnion: cr}
- }
|