123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package v3rpc
- import (
- "context"
- "io"
- "math/rand"
- "sync"
- "time"
- "go.etcd.io/etcd/auth"
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/mvcc"
- "go.etcd.io/etcd/mvcc/mvccpb"
- "go.uber.org/zap"
- )
- type watchServer struct {
- lg *zap.Logger
- clusterID int64
- memberID int64
- maxRequestBytes int
- sg etcdserver.RaftStatusGetter
- watchable mvcc.WatchableKV
- ag AuthGetter
- }
- // NewWatchServer returns a new watch server.
- func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
- return &watchServer{
- lg: s.Cfg.Logger,
- clusterID: int64(s.Cluster().ID()),
- memberID: int64(s.ID()),
- maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes),
- sg: s,
- watchable: s.Watchable(),
- ag: s,
- }
- }
- var (
- // External test can read this with GetProgressReportInterval()
- // and change this to a small value to finish fast with
- // SetProgressReportInterval().
- progressReportInterval = 10 * time.Minute
- progressReportIntervalMu sync.RWMutex
- )
- // GetProgressReportInterval returns the current progress report interval (for testing).
- func GetProgressReportInterval() time.Duration {
- progressReportIntervalMu.RLock()
- interval := progressReportInterval
- progressReportIntervalMu.RUnlock()
- // add rand(1/10*progressReportInterval) as jitter so that etcdserver will not
- // send progress notifications to watchers around the same time even when watchers
- // are created around the same time (which is common when a client restarts itself).
- jitter := time.Duration(rand.Int63n(int64(interval) / 10))
- return interval + jitter
- }
- // SetProgressReportInterval updates the current progress report interval (for testing).
- func SetProgressReportInterval(newTimeout time.Duration) {
- progressReportIntervalMu.Lock()
- progressReportInterval = newTimeout
- progressReportIntervalMu.Unlock()
- }
- // We send ctrl response inside the read loop. We do not want
- // send to block read, but we still want ctrl response we sent to
- // be serialized. Thus we use a buffered chan to solve the problem.
- // A small buffer should be OK for most cases, since we expect the
- // ctrl requests are infrequent.
- const ctrlStreamBufLen = 16
- // serverWatchStream is an etcd server side stream. It receives requests
- // from client side gRPC stream. It receives watch events from mvcc.WatchStream,
- // and creates responses that forwarded to gRPC stream.
- // It also forwards control message like watch created and canceled.
- type serverWatchStream struct {
- lg *zap.Logger
- clusterID int64
- memberID int64
- maxRequestBytes int
- sg etcdserver.RaftStatusGetter
- watchable mvcc.WatchableKV
- ag AuthGetter
- gRPCStream pb.Watch_WatchServer
- watchStream mvcc.WatchStream
- ctrlStream chan *pb.WatchResponse
- // mu protects progress, prevKV, fragment
- mu sync.RWMutex
- // tracks the watchID that stream might need to send progress to
- // TODO: combine progress and prevKV into a single struct?
- progress map[mvcc.WatchID]bool
- // record watch IDs that need return previous key-value pair
- prevKV map[mvcc.WatchID]bool
- // records fragmented watch IDs
- fragment map[mvcc.WatchID]bool
- // closec indicates the stream is closed.
- closec chan struct{}
- // wg waits for the send loop to complete
- wg sync.WaitGroup
- }
- func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
- sws := serverWatchStream{
- lg: ws.lg,
- clusterID: ws.clusterID,
- memberID: ws.memberID,
- maxRequestBytes: ws.maxRequestBytes,
- sg: ws.sg,
- watchable: ws.watchable,
- ag: ws.ag,
- gRPCStream: stream,
- watchStream: ws.watchable.NewWatchStream(),
- // chan for sending control response like watcher created and canceled.
- ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
- progress: make(map[mvcc.WatchID]bool),
- prevKV: make(map[mvcc.WatchID]bool),
- fragment: make(map[mvcc.WatchID]bool),
- closec: make(chan struct{}),
- }
- sws.wg.Add(1)
- go func() {
- sws.sendLoop()
- sws.wg.Done()
- }()
- errc := make(chan error, 1)
- // Ideally recvLoop would also use sws.wg to signal its completion
- // but when stream.Context().Done() is closed, the stream's recv
- // may continue to block since it uses a different context, leading to
- // deadlock when calling sws.close().
- go func() {
- if rerr := sws.recvLoop(); rerr != nil {
- if isClientCtxErr(stream.Context().Err(), rerr) {
- if sws.lg != nil {
- sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
- } else {
- plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
- } else {
- plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
- }
- streamFailures.WithLabelValues("receive", "watch").Inc()
- }
- errc <- rerr
- }
- }()
- select {
- case err = <-errc:
- close(sws.ctrlStream)
- case <-stream.Context().Done():
- err = stream.Context().Err()
- // the only server-side cancellation is noleader for now.
- if err == context.Canceled {
- err = rpctypes.ErrGRPCNoLeader
- }
- }
- sws.close()
- return err
- }
- func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool {
- authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context())
- if err != nil {
- return false
- }
- if authInfo == nil {
- // if auth is enabled, IsRangePermitted() can cause an error
- authInfo = &auth.AuthInfo{}
- }
- return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd) == nil
- }
- func (sws *serverWatchStream) recvLoop() error {
- for {
- req, err := sws.gRPCStream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- switch uv := req.RequestUnion.(type) {
- case *pb.WatchRequest_CreateRequest:
- if uv.CreateRequest == nil {
- break
- }
- creq := uv.CreateRequest
- if len(creq.Key) == 0 {
- // \x00 is the smallest key
- creq.Key = []byte{0}
- }
- if len(creq.RangeEnd) == 0 {
- // force nil since watchstream.Watch distinguishes
- // between nil and []byte{} for single key / >=
- creq.RangeEnd = nil
- }
- if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
- // support >= key queries
- creq.RangeEnd = []byte{}
- }
- if !sws.isWatchPermitted(creq) {
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(sws.watchStream.Rev()),
- WatchId: creq.WatchId,
- Canceled: true,
- Created: true,
- CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
- }
- select {
- case sws.ctrlStream <- wr:
- case <-sws.closec:
- }
- return nil
- }
- filters := FiltersFromRequest(creq)
- wsrev := sws.watchStream.Rev()
- rev := creq.StartRevision
- if rev == 0 {
- rev = wsrev + 1
- }
- id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
- if err == nil {
- sws.mu.Lock()
- if creq.ProgressNotify {
- sws.progress[id] = true
- }
- if creq.PrevKv {
- sws.prevKV[id] = true
- }
- if creq.Fragment {
- sws.fragment[id] = true
- }
- sws.mu.Unlock()
- }
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(wsrev),
- WatchId: int64(id),
- Created: true,
- Canceled: err != nil,
- }
- if err != nil {
- wr.CancelReason = err.Error()
- }
- select {
- case sws.ctrlStream <- wr:
- case <-sws.closec:
- return nil
- }
- case *pb.WatchRequest_CancelRequest:
- if uv.CancelRequest != nil {
- id := uv.CancelRequest.WatchId
- err := sws.watchStream.Cancel(mvcc.WatchID(id))
- if err == nil {
- sws.ctrlStream <- &pb.WatchResponse{
- Header: sws.newResponseHeader(sws.watchStream.Rev()),
- WatchId: id,
- Canceled: true,
- }
- sws.mu.Lock()
- delete(sws.progress, mvcc.WatchID(id))
- delete(sws.prevKV, mvcc.WatchID(id))
- delete(sws.fragment, mvcc.WatchID(id))
- sws.mu.Unlock()
- }
- }
- case *pb.WatchRequest_ProgressRequest:
- if uv.ProgressRequest != nil {
- sws.ctrlStream <- &pb.WatchResponse{
- Header: sws.newResponseHeader(sws.watchStream.Rev()),
- WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
- }
- }
- default:
- // we probably should not shutdown the entire stream when
- // receive an valid command.
- // so just do nothing instead.
- continue
- }
- }
- }
- func (sws *serverWatchStream) sendLoop() {
- // watch ids that are currently active
- ids := make(map[mvcc.WatchID]struct{})
- // watch responses pending on a watch id creation message
- pending := make(map[mvcc.WatchID][]*pb.WatchResponse)
- interval := GetProgressReportInterval()
- progressTicker := time.NewTicker(interval)
- defer func() {
- progressTicker.Stop()
- // drain the chan to clean up pending events
- for ws := range sws.watchStream.Chan() {
- mvcc.ReportEventReceived(len(ws.Events))
- }
- for _, wrs := range pending {
- for _, ws := range wrs {
- mvcc.ReportEventReceived(len(ws.Events))
- }
- }
- }()
- for {
- select {
- case wresp, ok := <-sws.watchStream.Chan():
- if !ok {
- return
- }
- // TODO: evs is []mvccpb.Event type
- // either return []*mvccpb.Event from the mvcc package
- // or define protocol buffer with []mvccpb.Event.
- evs := wresp.Events
- events := make([]*mvccpb.Event, len(evs))
- sws.mu.RLock()
- needPrevKV := sws.prevKV[wresp.WatchID]
- sws.mu.RUnlock()
- for i := range evs {
- events[i] = &evs[i]
- if needPrevKV {
- opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
- r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
- if err == nil && len(r.KVs) != 0 {
- events[i].PrevKv = &(r.KVs[0])
- }
- }
- }
- canceled := wresp.CompactRevision != 0
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(wresp.Revision),
- WatchId: int64(wresp.WatchID),
- Events: events,
- CompactRevision: wresp.CompactRevision,
- Canceled: canceled,
- }
- if _, okID := ids[wresp.WatchID]; !okID {
- // buffer if id not yet announced
- wrs := append(pending[wresp.WatchID], wr)
- pending[wresp.WatchID] = wrs
- continue
- }
- mvcc.ReportEventReceived(len(evs))
- sws.mu.RLock()
- fragmented, ok := sws.fragment[wresp.WatchID]
- sws.mu.RUnlock()
- var serr error
- if !fragmented && !ok {
- serr = sws.gRPCStream.Send(wr)
- } else {
- serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)
- }
- if serr != nil {
- if isClientCtxErr(sws.gRPCStream.Context().Err(), serr) {
- if sws.lg != nil {
- sws.lg.Debug("failed to send watch response to gRPC stream", zap.Error(serr))
- } else {
- plog.Debugf("failed to send watch response to gRPC stream (%q)", serr.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to send watch response to gRPC stream", zap.Error(serr))
- } else {
- plog.Warningf("failed to send watch response to gRPC stream (%q)", serr.Error())
- }
- streamFailures.WithLabelValues("send", "watch").Inc()
- }
- return
- }
- sws.mu.Lock()
- if len(evs) > 0 && sws.progress[wresp.WatchID] {
- // elide next progress update if sent a key update
- sws.progress[wresp.WatchID] = false
- }
- sws.mu.Unlock()
- case c, ok := <-sws.ctrlStream:
- if !ok {
- return
- }
- if err := sws.gRPCStream.Send(c); err != nil {
- if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
- if sws.lg != nil {
- sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
- } else {
- plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
- } else {
- plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
- }
- streamFailures.WithLabelValues("send", "watch").Inc()
- }
- return
- }
- // track id creation
- wid := mvcc.WatchID(c.WatchId)
- if c.Canceled {
- delete(ids, wid)
- continue
- }
- if c.Created {
- // flush buffered events
- ids[wid] = struct{}{}
- for _, v := range pending[wid] {
- mvcc.ReportEventReceived(len(v.Events))
- if err := sws.gRPCStream.Send(v); err != nil {
- if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
- if sws.lg != nil {
- sws.lg.Debug("failed to send pending watch response to gRPC stream", zap.Error(err))
- } else {
- plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to send pending watch response to gRPC stream", zap.Error(err))
- } else {
- plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
- }
- streamFailures.WithLabelValues("send", "watch").Inc()
- }
- return
- }
- }
- delete(pending, wid)
- }
- case <-progressTicker.C:
- sws.mu.Lock()
- for id, ok := range sws.progress {
- if ok {
- sws.watchStream.RequestProgress(id)
- }
- sws.progress[id] = true
- }
- sws.mu.Unlock()
- case <-sws.closec:
- return
- }
- }
- }
- func sendFragments(
- wr *pb.WatchResponse,
- maxRequestBytes int,
- sendFunc func(*pb.WatchResponse) error) error {
- // no need to fragment if total request size is smaller
- // than max request limit or response contains only one event
- if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
- return sendFunc(wr)
- }
- ow := *wr
- ow.Events = make([]*mvccpb.Event, 0)
- ow.Fragment = true
- var idx int
- for {
- cur := ow
- for _, ev := range wr.Events[idx:] {
- cur.Events = append(cur.Events, ev)
- if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes {
- cur.Events = cur.Events[:len(cur.Events)-1]
- break
- }
- idx++
- }
- if idx == len(wr.Events) {
- // last response has no more fragment
- cur.Fragment = false
- }
- if err := sendFunc(&cur); err != nil {
- return err
- }
- if !cur.Fragment {
- break
- }
- }
- return nil
- }
- func (sws *serverWatchStream) close() {
- sws.watchStream.Close()
- close(sws.closec)
- sws.wg.Wait()
- }
- func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
- return &pb.ResponseHeader{
- ClusterId: uint64(sws.clusterID),
- MemberId: uint64(sws.memberID),
- Revision: rev,
- RaftTerm: sws.sg.Term(),
- }
- }
- func filterNoDelete(e mvccpb.Event) bool {
- return e.Type == mvccpb.DELETE
- }
- func filterNoPut(e mvccpb.Event) bool {
- return e.Type == mvccpb.PUT
- }
- // FiltersFromRequest returns "mvcc.FilterFunc" from a given watch create request.
- func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc {
- filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
- for _, ft := range creq.Filters {
- switch ft {
- case pb.WatchCreateRequest_NOPUT:
- filters = append(filters, filterNoPut)
- case pb.WatchCreateRequest_NODELETE:
- filters = append(filters, filterNoDelete)
- default:
- }
- }
- return filters
- }
|