瀏覽代碼

Merge pull request #8283 from heyitsanthony/cancel-compact-rpc

v3rpc: set Canceled=true on compacted watch
Anthony Romano 8 年之前
父節點
當前提交
16943f04e2
共有 4 個文件被更改,包括 7 次插入1 次删除
  1. 3 0
      clientv3/integration/watch_test.go
  2. 1 1
      clientv3/watch.go
  3. 2 0
      etcdserver/api/v3rpc/watch.go
  4. 1 0
      proxy/grpcproxy/watcher.go

+ 3 - 0
clientv3/integration/watch_test.go

@@ -514,6 +514,9 @@ func TestWatchCompactRevision(t *testing.T) {
 	if wresp.Err() != rpctypes.ErrCompacted {
 		t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
 	}
+	if !wresp.Canceled {
+		t.Fatalf("wresp.Canceled expected true, got %+v", wresp)
+	}
 
 	// ensure the channel is closed
 	if wresp, ok = <-wch; ok {

+ 1 - 1
clientv3/watch.go

@@ -461,7 +461,7 @@ func (w *watchGrpcStream) run() {
 				if ws := w.nextResume(); ws != nil {
 					wc.Send(ws.initReq.toPB())
 				}
-			case pbresp.Canceled:
+			case pbresp.Canceled && pbresp.CompactRevision == 0:
 				delete(cancelSet, pbresp.WatchId)
 				if ws, ok := w.substreams[pbresp.WatchId]; ok {
 					// signal to stream goroutine to update closingc

+ 2 - 0
etcdserver/api/v3rpc/watch.go

@@ -321,11 +321,13 @@ func (sws *serverWatchStream) sendLoop() {
 				}
 			}
 
+			canceled := wresp.CompactRevision != 0
 			wr := &pb.WatchResponse{
 				Header:          sws.newResponseHeader(wresp.Revision),
 				WatchId:         int64(wresp.WatchID),
 				Events:          events,
 				CompactRevision: wresp.CompactRevision,
+				Canceled:        canceled,
 			}
 
 			if _, hasId := ids[wresp.WatchID]; !hasId {

+ 1 - 0
proxy/grpcproxy/watcher.go

@@ -111,6 +111,7 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
 		Header:          &wr.Header,
 		Created:         wr.Created,
 		CompactRevision: wr.CompactRevision,
+		Canceled:        wr.Canceled,
 		WatchId:         w.id,
 		Events:          events,
 	})