|
@@ -42,12 +42,25 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
var (
|
|
|
- // expose for testing purpose. External test can change this to a
|
|
|
|
|
- // small value to finish fast. The type is int32 instead of time.Duration
|
|
|
|
|
- // in order to placate the race detector by setting the value with atomic stores.
|
|
|
|
|
- ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000) // 10 minutes
|
|
|
|
|
|
|
+ // External test can read this with GetProgressReportInterval()
|
|
|
|
|
+ // and change this to a small value to finish fast with
|
|
|
|
|
+ // SetProgressReportInterval().
|
|
|
|
|
+ progressReportInterval = 10 * time.Minute
|
|
|
|
|
+ progressReportIntervalMu sync.RWMutex
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+func GetProgressReportInterval() time.Duration {
|
|
|
|
|
+ progressReportIntervalMu.RLock()
|
|
|
|
|
+ defer progressReportIntervalMu.RUnlock()
|
|
|
|
|
+ return progressReportInterval
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func SetProgressReportInterval(newTimeout time.Duration) {
|
|
|
|
|
+ progressReportIntervalMu.Lock()
|
|
|
|
|
+ defer progressReportIntervalMu.Unlock()
|
|
|
|
|
+ progressReportInterval = newTimeout
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
const (
|
|
const (
|
|
|
// We send ctrl response inside the read loop. We do not want
|
|
// We send ctrl response inside the read loop. We do not want
|
|
|
// send to block read, but we still want ctrl response we sent to
|
|
// send to block read, but we still want ctrl response we sent to
|
|
@@ -166,7 +179,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
// watch responses pending on a watch id creation message
|
|
// watch responses pending on a watch id creation message
|
|
|
pending := make(map[storage.WatchID][]*pb.WatchResponse)
|
|
pending := make(map[storage.WatchID][]*pb.WatchResponse)
|
|
|
|
|
|
|
|
- interval := time.Duration(ProgressReportIntervalMilliseconds) * time.Millisecond
|
|
|
|
|
|
|
+ interval := GetProgressReportInterval()
|
|
|
progressTicker := time.NewTicker(interval)
|
|
progressTicker := time.NewTicker(interval)
|
|
|
defer progressTicker.Stop()
|
|
defer progressTicker.Stop()
|
|
|
|
|
|