|
@@ -16,6 +16,7 @@ package v3rpc
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"io"
|
|
"io"
|
|
|
|
|
+ "sync"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/etcdserver"
|
|
"github.com/coreos/etcd/etcdserver"
|
|
@@ -72,6 +73,8 @@ type serverWatchStream struct {
|
|
|
// progress tracks the watchID that stream might need to send
|
|
// progress tracks the watchID that stream might need to send
|
|
|
// progress to.
|
|
// progress to.
|
|
|
progress map[storage.WatchID]bool
|
|
progress map[storage.WatchID]bool
|
|
|
|
|
+ // mu protects progress
|
|
|
|
|
+ mu sync.Mutex
|
|
|
|
|
|
|
|
// closec indicates the stream is closed.
|
|
// closec indicates the stream is closed.
|
|
|
closec chan struct{}
|
|
closec chan struct{}
|
|
@@ -145,7 +148,9 @@ func (sws *serverWatchStream) recvLoop() error {
|
|
|
WatchId: id,
|
|
WatchId: id,
|
|
|
Canceled: true,
|
|
Canceled: true,
|
|
|
}
|
|
}
|
|
|
|
|
+ sws.mu.Lock()
|
|
|
delete(sws.progress, storage.WatchID(id))
|
|
delete(sws.progress, storage.WatchID(id))
|
|
|
|
|
+ sws.mu.Unlock()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
// TODO: do we need to return error back to client?
|
|
// TODO: do we need to return error back to client?
|
|
@@ -200,9 +205,11 @@ func (sws *serverWatchStream) sendLoop() {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ sws.mu.Lock()
|
|
|
if _, ok := sws.progress[wresp.WatchID]; ok {
|
|
if _, ok := sws.progress[wresp.WatchID]; ok {
|
|
|
sws.progress[wresp.WatchID] = false
|
|
sws.progress[wresp.WatchID] = false
|
|
|
}
|
|
}
|
|
|
|
|
+ sws.mu.Unlock()
|
|
|
|
|
|
|
|
case c, ok := <-sws.ctrlStream:
|
|
case c, ok := <-sws.ctrlStream:
|
|
|
if !ok {
|
|
if !ok {
|