|
|
@@ -24,18 +24,20 @@ import (
|
|
|
storagepb "github.com/coreos/etcd/storage/storagepb"
|
|
|
)
|
|
|
|
|
|
+type WatchChan <-chan WatchResponse
|
|
|
+
|
|
|
type Watcher interface {
|
|
|
// Watch watches on a single key. The watched events will be returned
|
|
|
// through the returned channel.
|
|
|
// If the watch is slow or the required rev is compacted, the watch request
|
|
|
// might be canceled from the server-side and the chan will be closed.
|
|
|
- Watch(cxt context.Context, key string, rev int64) <-chan WatchResponse
|
|
|
+ Watch(ctx context.Context, key string, rev int64) WatchChan
|
|
|
|
|
|
- // Watch watches on a prefix. The watched events will be returned
|
|
|
+ // WatchPrefix watches on a prefix. The watched events will be returned
|
|
|
// through the returned channel.
|
|
|
// If the watch is slow or the required rev is compacted, the watch request
|
|
|
// might be canceled from the server-side and the chan will be closed.
|
|
|
- WatchPrefix(cxt context.Context, prefix string, rev int64) <-chan WatchResponse
|
|
|
+ WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan
|
|
|
|
|
|
// Close closes the watcher and cancels all watch requests.
|
|
|
Close() error
|
|
|
@@ -44,6 +46,9 @@ type Watcher interface {
|
|
|
type WatchResponse struct {
|
|
|
Header pb.ResponseHeader
|
|
|
Events []*storagepb.Event
|
|
|
+ // CompactRevision is set to the compaction revision that
|
|
|
+ // caused the watcher to cancel.
|
|
|
+ CompactRevision int64
|
|
|
}
|
|
|
|
|
|
// watcher implements the Watcher interface
|
|
|
@@ -122,11 +127,11 @@ func NewWatcher(c *Client) Watcher {
|
|
|
return w
|
|
|
}
|
|
|
|
|
|
-func (w *watcher) Watch(ctx context.Context, key string, rev int64) <-chan WatchResponse {
|
|
|
+func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan {
|
|
|
return w.watch(ctx, key, "", rev)
|
|
|
}
|
|
|
|
|
|
-func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) <-chan WatchResponse {
|
|
|
+func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan {
|
|
|
return w.watch(ctx, "", prefix, rev)
|
|
|
}
|
|
|
|
|
|
@@ -140,7 +145,7 @@ func (w *watcher) Close() error {
|
|
|
}
|
|
|
|
|
|
// watch posts a watch request to run() and waits for a new watcher channel
|
|
|
-func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) <-chan WatchResponse {
|
|
|
+func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan {
|
|
|
retc := make(chan chan WatchResponse, 1)
|
|
|
wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc}
|
|
|
// submit request
|
|
|
@@ -166,7 +171,18 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
|
|
|
if pendingReq == nil {
|
|
|
// no pending request; ignore
|
|
|
return
|
|
|
- } else if resp.WatchId == -1 || resp.Compacted {
|
|
|
+ }
|
|
|
+ if resp.CompactRevision != 0 {
|
|
|
+ // compaction after start revision
|
|
|
+ ret := make(chan WatchResponse, 1)
|
|
|
+ ret <- WatchResponse{
|
|
|
+ Header: *resp.Header,
|
|
|
+ CompactRevision: resp.CompactRevision}
|
|
|
+ close(ret)
|
|
|
+ pendingReq.retc <- ret
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if resp.WatchId == -1 {
|
|
|
// failed; no channel
|
|
|
pendingReq.retc <- nil
|
|
|
return
|
|
|
@@ -238,12 +254,6 @@ func (w *watcher) run() {
|
|
|
switch {
|
|
|
case pbresp.Canceled:
|
|
|
delete(cancelSet, pbresp.WatchId)
|
|
|
- case pbresp.Compacted:
|
|
|
- w.mu.Lock()
|
|
|
- if ws, ok := w.streams[pbresp.WatchId]; ok {
|
|
|
- w.closeStream(ws)
|
|
|
- }
|
|
|
- w.mu.Unlock()
|
|
|
case pbresp.Created:
|
|
|
// response to pending req, try to add
|
|
|
w.addStream(pbresp, pendingReq)
|
|
|
@@ -305,7 +315,10 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
|
|
defer w.mu.RUnlock()
|
|
|
ws, ok := w.streams[pbresp.WatchId]
|
|
|
if ok {
|
|
|
- wr := &WatchResponse{*pbresp.Header, pbresp.Events}
|
|
|
+ wr := &WatchResponse{
|
|
|
+ Header: *pbresp.Header,
|
|
|
+ Events: pbresp.Events,
|
|
|
+ CompactRevision: pbresp.CompactRevision}
|
|
|
ws.recvc <- wr
|
|
|
}
|
|
|
return ok
|
|
|
@@ -346,6 +359,11 @@ func (w *watcher) serveStream(ws *watcherStream) {
|
|
|
}
|
|
|
select {
|
|
|
case outc <- *curWr:
|
|
|
+ if len(wrs[0].Events) == 0 {
|
|
|
+ // compaction message
|
|
|
+ closing = true
|
|
|
+ break
|
|
|
+ }
|
|
|
newRev := wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
|
|
|
if newRev != ws.lastRev {
|
|
|
ws.lastRev = newRev
|