Browse Source

storage, v3: pass compaction revision through watchresponse

Anthony Romano 10 years ago
parent
commit
ee1a03167d

+ 2 - 2
clientv3/watch.go

@@ -166,7 +166,7 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 	if pendingReq == nil {
 		// no pending request; ignore
 		return
-	} else if resp.WatchId == -1 || resp.Compacted {
+	} else if resp.WatchId == -1 || resp.CompactRevision != 0 {
 		// failed; no channel
 		pendingReq.retc <- nil
 		return
@@ -238,7 +238,7 @@ func (w *watcher) run() {
 			switch {
 			case pbresp.Canceled:
 				delete(cancelSet, pbresp.WatchId)
-			case pbresp.Compacted:
+			case pbresp.CompactRevision != 0:
 				w.mu.Lock()
 				if ws, ok := w.streams[pbresp.WatchId]; ok {
 					w.closeStream(ws)

+ 4 - 4
etcdserver/api/v3rpc/watch.go

@@ -146,10 +146,10 @@ func (sws *serverWatchStream) sendLoop() {
 			}
 
 			err := sws.gRPCStream.Send(&pb.WatchResponse{
-				Header:    sws.newResponseHeader(wresp.Revision),
-				WatchId:   int64(wresp.WatchID),
-				Events:    events,
-				Compacted: wresp.Compacted,
+				Header:          sws.newResponseHeader(wresp.Revision),
+				WatchId:         int64(wresp.WatchID),
+				Events:          events,
+				CompactRevision: wresp.CompactRevision,
 			})
 			storage.ReportEventReceived()
 			if err != nil {

+ 11 - 16
etcdserver/etcdserverpb/rpc.pb.go

@@ -890,15 +890,16 @@ type WatchResponse struct {
 	// If the response is for a cancel watch request, cancel is set to true.
 	// No further events will be sent to the canceled watching.
 	Canceled bool `protobuf:"varint,4,opt,name=canceled,proto3" json:"canceled,omitempty"`
-	// If a watching tries to watch at a compacted index, compacted will be set to true.
+	// CompactRevision is set to the minimum index if a watching tries to watch
+	// at a compacted index.
 	//
 	// This happens when creating a watching at a compacted revision or the watching cannot
 	// catch up with the progress of the KV.
 	//
 	// Client should treat the watching as canceled and should not try to create any
 	// watching with same start_revision again.
-	Compacted bool               `protobuf:"varint,5,opt,name=compacted,proto3" json:"compacted,omitempty"`
-	Events    []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"`
+	CompactRevision int64              `protobuf:"varint,5,opt,name=compact_revision,proto3" json:"compact_revision,omitempty"`
+	Events          []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"`
 }
 
 func (m *WatchResponse) Reset()         { *m = WatchResponse{} }
@@ -2651,15 +2652,10 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) {
 		}
 		i++
 	}
-	if m.Compacted {
+	if m.CompactRevision != 0 {
 		data[i] = 0x28
 		i++
-		if m.Compacted {
-			data[i] = 1
-		} else {
-			data[i] = 0
-		}
-		i++
+		i = encodeVarintRpc(data, i, uint64(m.CompactRevision))
 	}
 	if len(m.Events) > 0 {
 		for _, msg := range m.Events {
@@ -3601,8 +3597,8 @@ func (m *WatchResponse) Size() (n int) {
 	if m.Canceled {
 		n += 2
 	}
-	if m.Compacted {
-		n += 2
+	if m.CompactRevision != 0 {
+		n += 1 + sovRpc(uint64(m.CompactRevision))
 	}
 	if len(m.Events) > 0 {
 		for _, e := range m.Events {
@@ -6185,9 +6181,9 @@ func (m *WatchResponse) Unmarshal(data []byte) error {
 			m.Canceled = bool(v != 0)
 		case 5:
 			if wireType != 0 {
-				return fmt.Errorf("proto: wrong wireType = %d for field Compacted", wireType)
+				return fmt.Errorf("proto: wrong wireType = %d for field CompactRevision", wireType)
 			}
-			var v int
+			m.CompactRevision = 0
 			for shift := uint(0); ; shift += 7 {
 				if shift >= 64 {
 					return ErrIntOverflowRpc
@@ -6197,12 +6193,11 @@ func (m *WatchResponse) Unmarshal(data []byte) error {
 				}
 				b := data[iNdEx]
 				iNdEx++
-				v |= (int(b) & 0x7F) << shift
+				m.CompactRevision |= (int64(b) & 0x7F) << shift
 				if b < 0x80 {
 					break
 				}
 			}
-			m.Compacted = bool(v != 0)
 		case 11:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType)

+ 3 - 2
etcdserver/etcdserverpb/rpc.proto

@@ -282,14 +282,15 @@ message WatchResponse {
   // If the response is for a cancel watch request, cancel is set to true.
   // No further events will be sent to the canceled watching.
   bool canceled = 4;
-  // If a watching tries to watch at a compacted index, compacted will be set to true.
+  // CompactRevision is set to the minimum index if a watching tries to watch
+  // at a compacted index.
   //
   // This happens when creating a watching at a compacted revision or the watching cannot
   // catch up with the progress of the KV.
   //
   // Client should treat the watching as canceled and should not try to create any
   // watching with same start_revision again.
-  bool compacted = 5;
+  int64 compact_revision  = 5;
 
   repeated storagepb.Event events = 11;
 }

+ 1 - 1
storage/watchable_store.go

@@ -300,7 +300,7 @@ func (s *watchableStore) syncWatchers() {
 
 			if w.cur < compactionRev {
 				select {
-				case w.ch <- WatchResponse{WatchID: w.id, Compacted: true}:
+				case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactionRev}:
 					s.unsynced.delete(w)
 				default:
 					// retry next time

+ 2 - 2
storage/watchable_store_test.go

@@ -247,8 +247,8 @@ func TestWatchCompacted(t *testing.T) {
 		if resp.WatchID != wt {
 			t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt)
 		}
-		if resp.Compacted != true {
-			t.Errorf("resp.Compacted = %v, want %v", resp.Compacted, true)
+		if resp.CompactRevision == 0 {
+			t.Errorf("resp.Compacted = %v, want %v", resp.CompactRevision, compactRev)
 		}
 	case <-time.After(1 * time.Second):
 		t.Fatalf("failed to receive response (timeout)")

+ 2 - 2
storage/watcher.go

@@ -68,8 +68,8 @@ type WatchResponse struct {
 	// inside Events.
 	Revision int64
 
-	// Compacted is set when the watcher is cancelled due to compaction.
-	Compacted bool
+	// CompactRevision is set when the watcher is cancelled due to compaction.
+	CompactRevision int64
 }
 
 // watchStream contains a collection of watchers that share