@@ -32,25 +32,27 @@ import (
 )

 type watchServer struct {
+	lg *zap.Logger
+
 	clusterID int64
 	memberID  int64
+
 	sg        etcdserver.RaftStatusGetter
 	watchable mvcc.WatchableKV
-
-	ag AuthGetter
-
-	lg *zap.Logger
+	ag        AuthGetter
 }

 // NewWatchServer returns a new watch server.
 func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
 	return &watchServer{
+		lg: s.Cfg.Logger,
+
 		clusterID: int64(s.Cluster().ID()),
 		memberID:  int64(s.ID()),
+
 		sg:        s,
 		watchable: s.Watchable(),
 		ag:        s,
-		lg:        s.Cfg.Logger,
 	}
 }

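The hunk above only regroups fields and moves the lg logger to the top of watchServer and its initializer; behavior is unchanged. For orientation, the constructor is consumed elsewhere in this package when a member's gRPC services are registered. That wiring is not part of this diff, so the helper below is an illustrative sketch only, assuming the file's existing imports plus google.golang.org/grpc:

// Illustrative sketch, not part of this change: roughly how the watch
// service is hooked up to a member's gRPC server.
func registerWatchService(gs *grpc.Server, s *etcdserver.EtcdServer) {
	pb.RegisterWatchServer(gs, NewWatchServer(s))
}
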
@@ -79,8 +81,8 @@ func GetProgressReportInterval() time.Duration {
 // SetProgressReportInterval updates the current progress report interval (for testing).
 func SetProgressReportInterval(newTimeout time.Duration) {
 	progressReportIntervalMu.Lock()
-	defer progressReportIntervalMu.Unlock()
 	progressReportInterval = newTimeout
+	progressReportIntervalMu.Unlock()
 }

 // We send ctrl response inside the read loop. We do not want
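The change above drops the defer in favor of an explicit Unlock: the critical section is a single assignment with no early return to protect, so the deferred call only adds overhead. Since the setter is test-only, here is a hedged sketch of how a _test.go file in this package might use it; the helper is hypothetical, and GetProgressReportInterval may return a jittered value, which is fine for restoring a rough default in tests:

// Hypothetical test helper: shorten the progress-report interval so a watch
// test observes progress notifications quickly, then restore the old value.
func withShortProgressInterval(t *testing.T, d time.Duration) {
	t.Helper()
	old := GetProgressReportInterval() // may include jitter
	SetProgressReportInterval(d)
	t.Cleanup(func() { SetProgressReportInterval(old) })
}
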
@@ -95,54 +97,54 @@ const ctrlStreamBufLen = 16
 // and creates responses that forwarded to gRPC stream.
 // It also forwards control message like watch created and canceled.
 type serverWatchStream struct {
+	lg *zap.Logger
+
 	clusterID int64
 	memberID  int64
-	sg        etcdserver.RaftStatusGetter

+	sg        etcdserver.RaftStatusGetter
 	watchable mvcc.WatchableKV
+	ag        AuthGetter

 	gRPCStream  pb.Watch_WatchServer
 	watchStream mvcc.WatchStream
 	ctrlStream  chan *pb.WatchResponse

 	// mu protects progress, prevKV
-	mu sync.Mutex
-	// progress tracks the watchID that stream might need to send
-	// progress to.
+	mu sync.RWMutex
+	// tracks the watchID that stream might need to send progress to
 	// TODO: combine progress and prevKV into a single struct?
 	progress map[mvcc.WatchID]bool
-	prevKV   map[mvcc.WatchID]bool
+	// record watch IDs that need return previous key-value pair
+	prevKV map[mvcc.WatchID]bool

 	// closec indicates the stream is closed.
 	closec chan struct{}

 	// wg waits for the send loop to complete
 	wg sync.WaitGroup
-
-	ag AuthGetter
-
-	lg *zap.Logger
 }

 func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 	sws := serverWatchStream{
+		lg: ws.lg,
+
 		clusterID: ws.clusterID,
 		memberID:  ws.memberID,
-		sg:        ws.sg,

+		sg:        ws.sg,
 		watchable: ws.watchable,
+		ag:        ws.ag,

 		gRPCStream:  stream,
 		watchStream: ws.watchable.NewWatchStream(),
 		// chan for sending control response like watcher created and canceled.
 		ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
-		progress:   make(map[mvcc.WatchID]bool),
-		prevKV:     make(map[mvcc.WatchID]bool),
-		closec:     make(chan struct{}),

-		ag: ws.ag,
+		progress: make(map[mvcc.WatchID]bool),
+		prevKV:   make(map[mvcc.WatchID]bool),

-		lg: ws.lg,
+		closec: make(chan struct{}),
 	}

 	sws.wg.Add(1)
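The substantive change in this hunk is mu becoming a sync.RWMutex: the send loop reads progress and prevKV for every batch of events it forwards, while the recv loop only writes them when a watch is created or canceled, so read-side sharing keeps concurrent readers from serializing on each other. A minimal, self-contained sketch of that access pattern follows; it is an illustration under those assumptions, not code from this change:

package main

import "sync"

// watchFlags mimics the prevKV bookkeeping: read on every event batch,
// written only when a watch is created or canceled.
type watchFlags struct {
	mu     sync.RWMutex
	prevKV map[int64]bool
}

func (f *watchFlags) needPrevKV(id int64) bool {
	f.mu.RLock() // readers may hold the lock concurrently
	defer f.mu.RUnlock()
	return f.prevKV[id]
}

func (f *watchFlags) setPrevKV(id int64, on bool) {
	f.mu.Lock() // writers still get exclusive access
	defer f.mu.Unlock()
	f.prevKV[id] = on
}

func main() {
	f := &watchFlags{prevKV: make(map[int64]bool)}
	f.setPrevKV(1, true)
	_ = f.needPrevKV(1)
}
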
@@ -174,9 +176,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 			errc <- rerr
 		}
 	}()
+
 	select {
 	case err = <-errc:
 		close(sws.ctrlStream)
+
 	case <-stream.Context().Done():
 		err = stream.Context().Err()
 		// the only server-side cancellation is noleader for now.
@@ -184,6 +188,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 			err = rpctypes.ErrGRPCNoLeader
 		}
 	}
+
 	sws.close()
 	return err
 }
@@ -197,7 +202,6 @@ func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool
 		// if auth is enabled, IsRangePermitted() can cause an error
 		authInfo = &auth.AuthInfo{}
 	}
-
 	return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd) == nil
 }

@@ -280,6 +284,7 @@ func (sws *serverWatchStream) recvLoop() error {
 			case <-sws.closec:
 				return nil
 			}
+
 		case *pb.WatchRequest_CancelRequest:
 			if uv.CancelRequest != nil {
 				id := uv.CancelRequest.WatchId
@@ -296,6 +301,7 @@ func (sws *serverWatchStream) recvLoop() error {
 					sws.mu.Unlock()
 				}
 			}
+
 		default:
 			// we probably should not shutdown the entire stream when
 			// receive an valid command.
@@ -339,12 +345,11 @@ func (sws *serverWatchStream) sendLoop() {
 			// or define protocol buffer with []mvccpb.Event.
 			evs := wresp.Events
 			events := make([]*mvccpb.Event, len(evs))
-			sws.mu.Lock()
+			sws.mu.RLock()
 			needPrevKV := sws.prevKV[wresp.WatchID]
-			sws.mu.Unlock()
+			sws.mu.RUnlock()
 			for i := range evs {
 				events[i] = &evs[i]
-
 				if needPrevKV {
 					opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
 					r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
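This last hunk is the read side of the RWMutex change: checking prevKV per event batch now takes only the read lock, while the create and cancel paths in recvLoop keep the write lock. The surrounding code looks up the previous key-value pair by ranging at the revision just before the event's ModRevision. Below is a hedged sketch of that lookup, written as if it lived in this file so it can reuse the existing imports; the RangeResult fields are an assumption about the mvcc API, since they are not visible in this diff:

// Illustrative helper, not part of this change: read the value a key held
// immediately before the revision that produced ev, which is what the loop
// above does inline when needPrevKV is set.
func prevKVFor(kv mvcc.WatchableKV, ev *mvccpb.Event) *mvccpb.KeyValue {
	opt := mvcc.RangeOptions{Rev: ev.Kv.ModRevision - 1}
	r, err := kv.Range(ev.Kv.Key, nil, opt)
	if err != nil || r == nil || len(r.KVs) == 0 {
		return nil
	}
	return &r.KVs[0]
}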