|
|
@@ -42,6 +42,7 @@ type watchServer struct {
|
|
|
lg *zap.Logger
|
|
|
}
|
|
|
|
|
|
+// NewWatchServer returns a new watch server.
|
|
|
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
|
|
|
return &watchServer{
|
|
|
clusterID: int64(s.Cluster().ID()),
|
|
|
@@ -61,6 +62,7 @@ var (
|
|
|
progressReportIntervalMu sync.RWMutex
|
|
|
)
|
|
|
|
|
|
+// GetProgressReportInterval returns the current progress report interval (for testing).
|
|
|
func GetProgressReportInterval() time.Duration {
|
|
|
progressReportIntervalMu.RLock()
|
|
|
interval := progressReportInterval
|
|
|
@@ -74,20 +76,19 @@ func GetProgressReportInterval() time.Duration {
|
|
|
return interval + jitter
|
|
|
}
|
|
|
|
|
|
+// SetProgressReportInterval updates the current progress report interval (for testing).
|
|
|
func SetProgressReportInterval(newTimeout time.Duration) {
|
|
|
progressReportIntervalMu.Lock()
|
|
|
defer progressReportIntervalMu.Unlock()
|
|
|
progressReportInterval = newTimeout
|
|
|
}
|
|
|
|
|
|
-const (
|
|
|
- // We send ctrl response inside the read loop. We do not want
|
|
|
- // send to block read, but we still want ctrl response we sent to
|
|
|
- // be serialized. Thus we use a buffered chan to solve the problem.
|
|
|
- // A small buffer should be OK for most cases, since we expect the
|
|
|
- // ctrl requests are infrequent.
|
|
|
- ctrlStreamBufLen = 16
|
|
|
-)
|
|
|
+// We send ctrl response inside the read loop. We do not want
|
|
|
+// send to block read, but we still want ctrl response we sent to
|
|
|
+// be serialized. Thus we use a buffered chan to solve the problem.
|
|
|
+// A small buffer should be OK for most cases, since we expect the
|
|
|
+// ctrl requests are infrequent.
|
|
|
+const ctrlStreamBufLen = 16
|
|
|
|
|
|
// serverWatchStream is an etcd server side stream. It receives requests
|
|
|
// from client side gRPC stream. It receives watch events from mvcc.WatchStream,
|
|
|
@@ -362,7 +363,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
Canceled: canceled,
|
|
|
}
|
|
|
|
|
|
- if _, hasId := ids[wresp.WatchID]; !hasId {
|
|
|
+ if _, okID := ids[wresp.WatchID]; !okID {
|
|
|
// buffer if id not yet announced
|
|
|
wrs := append(pending[wresp.WatchID], wr)
|
|
|
pending[wresp.WatchID] = wrs
|
|
|
@@ -446,6 +447,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
}
|
|
|
delete(pending, wid)
|
|
|
}
|
|
|
+
|
|
|
case <-progressTicker.C:
|
|
|
sws.mu.Lock()
|
|
|
for id, ok := range sws.progress {
|
|
|
@@ -455,6 +457,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
sws.progress[id] = true
|
|
|
}
|
|
|
sws.mu.Unlock()
|
|
|
+
|
|
|
case <-sws.closec:
|
|
|
return
|
|
|
}
|
|
|
@@ -484,6 +487,7 @@ func filterNoPut(e mvccpb.Event) bool {
|
|
|
return e.Type == mvccpb.PUT
|
|
|
}
|
|
|
|
|
|
+// FiltersFromRequest returns "mvcc.FilterFunc" from a given watch create request.
|
|
|
func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc {
|
|
|
filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
|
|
|
for _, ft := range creq.Filters {
|