Browse Source

*: update watch related proto

1. Add watch/cancel request
2. Add necessary fields in response to return watch error
3. Add watch_id into watch response
Xiang Li 10 years ago
parent
commit
ac330bb7c9

+ 2 - 2
etcdctlv3/command/watch_command.go

@@ -74,9 +74,9 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
 		var r *pb.WatchRequest
 		switch segs[0] {
 		case "watch":
-			r = &pb.WatchRequest{Key: []byte(segs[1])}
+			r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte(segs[1])}}
 		case "watchprefix":
-			r = &pb.WatchRequest{Prefix: []byte(segs[1])}
+			r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte(segs[1])}}
 		default:
 			fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n")
 			continue

+ 13 - 7
etcdserver/api/v3rpc/watch.go

@@ -48,14 +48,20 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
 			return err
 		}
 
-		var prefix bool
-		toWatch := req.Key
-		if len(req.Key) == 0 {
-			toWatch = req.Prefix
-			prefix = true
+		switch {
+		case req.CreateRequest != nil:
+			creq := req.CreateRequest
+			var prefix bool
+			toWatch := creq.Key
+			if len(creq.Key) == 0 {
+				toWatch = creq.Prefix
+				prefix = true
+			}
+			watcher.Watch(toWatch, prefix, creq.StartRevision)
+		default:
+			// TODO: support cancellation
+			panic("not implemented")
 		}
-		// TODO: support cancellation
-		watcher.Watch(toWatch, prefix, req.StartRevision)
 	}
 }
 

+ 429 - 16
etcdserver/etcdserverpb/rpc.pb.go

@@ -355,6 +355,29 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader {
 }
 
 type WatchRequest struct {
+	CreateRequest *WatchCreateRequest `protobuf:"bytes,1,opt,name=create_request" json:"create_request,omitempty"`
+	CancelRequest *WatchCancelRequest `protobuf:"bytes,2,opt,name=cancel_request" json:"cancel_request,omitempty"`
+}
+
+func (m *WatchRequest) Reset()         { *m = WatchRequest{} }
+func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
+func (*WatchRequest) ProtoMessage()    {}
+
+func (m *WatchRequest) GetCreateRequest() *WatchCreateRequest {
+	if m != nil {
+		return m.CreateRequest
+	}
+	return nil
+}
+
+func (m *WatchRequest) GetCancelRequest() *WatchCancelRequest {
+	if m != nil {
+		return m.CancelRequest
+	}
+	return nil
+}
+
+type WatchCreateRequest struct {
 	// the key to be watched
 	Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
 	// the prefix to be watched.
@@ -363,13 +386,39 @@ type WatchRequest struct {
 	StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"`
 }
 
-func (m *WatchRequest) Reset()         { *m = WatchRequest{} }
-func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
-func (*WatchRequest) ProtoMessage()    {}
+func (m *WatchCreateRequest) Reset()         { *m = WatchCreateRequest{} }
+func (m *WatchCreateRequest) String() string { return proto.CompactTextString(m) }
+func (*WatchCreateRequest) ProtoMessage()    {}
+
+type WatchCancelRequest struct {
+	WatchId int64 `protobuf:"varint,1,opt,name=watch_id,proto3" json:"watch_id,omitempty"`
+}
+
+func (m *WatchCancelRequest) Reset()         { *m = WatchCancelRequest{} }
+func (m *WatchCancelRequest) String() string { return proto.CompactTextString(m) }
+func (*WatchCancelRequest) ProtoMessage()    {}
 
 type WatchResponse struct {
-	Header *ResponseHeader    `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
-	Events []*storagepb.Event `protobuf:"bytes,2,rep,name=events" json:"events,omitempty"`
+	Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
+	// watch_id is the ID of the watching the response sent to.
+	WatchId int64 `protobuf:"varint,2,opt,name=watch_id,proto3" json:"watch_id,omitempty"`
+	// If the response is for a create watch request, created is set to true.
+	// Client should record the watch_id and prepare for receiving events for
+	// that watching from the same stream.
+	// All events sent to the created watching will attach with the same watch_id.
+	Created bool `protobuf:"varint,3,opt,name=created,proto3" json:"created,omitempty"`
+	// If the response is for a cancel watch request, cancel is set to true.
+	// No further events will be sent to the canceled watching.
+	Canceled bool `protobuf:"varint,4,opt,name=canceled,proto3" json:"canceled,omitempty"`
+	// If a watching tries to watch at a compacted index, compacted will be set to true.
+	//
+	// This happens when creating a watching at a compacted revision or the watching cannot
+	// catch up with the progress of the KV.
+	//
+	// Client should treat the watching as canceled and should not try to create any
+	// watching with same start_revision again.
+	Compacted bool               `protobuf:"varint,5,opt,name=compacted,proto3" json:"compacted,omitempty"`
+	Events    []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"`
 }
 
 func (m *WatchResponse) Reset()         { *m = WatchResponse{} }
@@ -685,7 +734,7 @@ func NewWatchClient(cc *grpc.ClientConn) WatchClient {
 }
 
 func (c *watchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error) {
-	stream, err := grpc.NewClientStream(ctx, &_Watch_serviceDesc.Streams[0], c.cc, "/etcdserverpb.watch/Watch", opts...)
+	stream, err := grpc.NewClientStream(ctx, &_Watch_serviceDesc.Streams[0], c.cc, "/etcdserverpb.Watch/Watch", opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -756,7 +805,7 @@ func (x *watchWatchServer) Recv() (*WatchRequest, error) {
 }
 
 var _Watch_serviceDesc = grpc.ServiceDesc{
-	ServiceName: "etcdserverpb.watch",
+	ServiceName: "etcdserverpb.Watch",
 	HandlerType: (*WatchServer)(nil),
 	Methods:     []grpc.MethodDesc{},
 	Streams: []grpc.StreamDesc{
@@ -1513,6 +1562,44 @@ func (m *WatchRequest) Marshal() (data []byte, err error) {
 }
 
 func (m *WatchRequest) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.CreateRequest != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.CreateRequest.Size()))
+		n12, err := m.CreateRequest.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n12
+	}
+	if m.CancelRequest != nil {
+		data[i] = 0x12
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.CancelRequest.Size()))
+		n13, err := m.CancelRequest.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n13
+	}
+	return i, nil
+}
+
+func (m *WatchCreateRequest) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *WatchCreateRequest) MarshalTo(data []byte) (int, error) {
 	var i int
 	_ = i
 	var l int
@@ -1541,6 +1628,29 @@ func (m *WatchRequest) MarshalTo(data []byte) (int, error) {
 	return i, nil
 }
 
+func (m *WatchCancelRequest) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *WatchCancelRequest) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.WatchId != 0 {
+		data[i] = 0x8
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.WatchId))
+	}
+	return i, nil
+}
+
 func (m *WatchResponse) Marshal() (data []byte, err error) {
 	size := m.Size()
 	data = make([]byte, size)
@@ -1560,15 +1670,50 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n12, err := m.Header.MarshalTo(data[i:])
+		n14, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n12
+		i += n14
+	}
+	if m.WatchId != 0 {
+		data[i] = 0x10
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.WatchId))
+	}
+	if m.Created {
+		data[i] = 0x18
+		i++
+		if m.Created {
+			data[i] = 1
+		} else {
+			data[i] = 0
+		}
+		i++
+	}
+	if m.Canceled {
+		data[i] = 0x20
+		i++
+		if m.Canceled {
+			data[i] = 1
+		} else {
+			data[i] = 0
+		}
+		i++
+	}
+	if m.Compacted {
+		data[i] = 0x28
+		i++
+		if m.Compacted {
+			data[i] = 1
+		} else {
+			data[i] = 0
+		}
+		i++
 	}
 	if len(m.Events) > 0 {
 		for _, msg := range m.Events {
-			data[i] = 0x12
+			data[i] = 0x5a
 			i++
 			i = encodeVarintRpc(data, i, uint64(msg.Size()))
 			n, err := msg.MarshalTo(data[i:])
@@ -1623,11 +1768,11 @@ func (m *LeaseCreateResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n13, err := m.Header.MarshalTo(data[i:])
+		n15, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n13
+		i += n15
 	}
 	if m.LeaseId != 0 {
 		data[i] = 0x10
@@ -1690,11 +1835,11 @@ func (m *LeaseRevokeResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n14, err := m.Header.MarshalTo(data[i:])
+		n16, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n14
+		i += n16
 	}
 	return i, nil
 }
@@ -1741,11 +1886,11 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n15, err := m.Header.MarshalTo(data[i:])
+		n17, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n15
+		i += n17
 	}
 	if m.LeaseId != 0 {
 		data[i] = 0x10
@@ -2039,6 +2184,20 @@ func (m *CompactionResponse) Size() (n int) {
 }
 
 func (m *WatchRequest) Size() (n int) {
+	var l int
+	_ = l
+	if m.CreateRequest != nil {
+		l = m.CreateRequest.Size()
+		n += 1 + l + sovRpc(uint64(l))
+	}
+	if m.CancelRequest != nil {
+		l = m.CancelRequest.Size()
+		n += 1 + l + sovRpc(uint64(l))
+	}
+	return n
+}
+
+func (m *WatchCreateRequest) Size() (n int) {
 	var l int
 	_ = l
 	if m.Key != nil {
@@ -2059,6 +2218,15 @@ func (m *WatchRequest) Size() (n int) {
 	return n
 }
 
+func (m *WatchCancelRequest) Size() (n int) {
+	var l int
+	_ = l
+	if m.WatchId != 0 {
+		n += 1 + sovRpc(uint64(m.WatchId))
+	}
+	return n
+}
+
 func (m *WatchResponse) Size() (n int) {
 	var l int
 	_ = l
@@ -2066,6 +2234,18 @@ func (m *WatchResponse) Size() (n int) {
 		l = m.Header.Size()
 		n += 1 + l + sovRpc(uint64(l))
 	}
+	if m.WatchId != 0 {
+		n += 1 + sovRpc(uint64(m.WatchId))
+	}
+	if m.Created {
+		n += 2
+	}
+	if m.Canceled {
+		n += 2
+	}
+	if m.Compacted {
+		n += 2
+	}
 	if len(m.Events) > 0 {
 		for _, e := range m.Events {
 			l = e.Size()
@@ -3704,6 +3884,111 @@ func (m *CompactionResponse) Unmarshal(data []byte) error {
 	return nil
 }
 func (m *WatchRequest) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field CreateRequest", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthRpc
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.CreateRequest == nil {
+				m.CreateRequest = &WatchCreateRequest{}
+			}
+			if err := m.CreateRequest.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field CancelRequest", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthRpc
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.CancelRequest == nil {
+				m.CancelRequest = &WatchCancelRequest{}
+			}
+			if err := m.CancelRequest.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			iNdEx -= sizeOfWire
+			skippy, err := skipRpc(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthRpc
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	return nil
+}
+func (m *WatchCreateRequest) Unmarshal(data []byte) error {
 	l := len(data)
 	iNdEx := 0
 	for iNdEx < l {
@@ -3814,6 +4099,67 @@ func (m *WatchRequest) Unmarshal(data []byte) error {
 
 	return nil
 }
+func (m *WatchCancelRequest) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field WatchId", wireType)
+			}
+			m.WatchId = 0
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.WatchId |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			iNdEx -= sizeOfWire
+			skippy, err := skipRpc(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthRpc
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	return nil
+}
 func (m *WatchResponse) Unmarshal(data []byte) error {
 	l := len(data)
 	iNdEx := 0
@@ -3864,6 +4210,73 @@ func (m *WatchResponse) Unmarshal(data []byte) error {
 			}
 			iNdEx = postIndex
 		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field WatchId", wireType)
+			}
+			m.WatchId = 0
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.WatchId |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Created", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Created = bool(v != 0)
+		case 4:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Canceled", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Canceled = bool(v != 0)
+		case 5:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Compacted", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Compacted = bool(v != 0)
+		case 11:
 			if wireType != 2 {
 				return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType)
 			}

+ 32 - 4
etcdserver/etcdserverpb/rpc.proto

@@ -32,7 +32,7 @@ service KV {
   rpc Compact(CompactionRequest) returns (CompactionResponse) {}
 }
 
-service watch {
+service Watch {
   // Watch watches the events happening or happened. Both input and output
   // are stream. One watch rpc can watch for multiple keys or prefixs and
   // get a stream of events. The whole events history can be watched unless
@@ -197,6 +197,13 @@ message CompactionResponse {
 }
 
 message WatchRequest {
+  oneof request_union {
+    WatchCreateRequest create_request = 1;
+    WatchCancelRequest cancel_request = 2;
+  }
+}
+
+message WatchCreateRequest {
   // the key to be watched
   bytes key = 1;
   // the prefix to be watched.
@@ -204,13 +211,34 @@ message WatchRequest {
   // start_revision is an optional revision (including) to watch from. No start_revision is "now".
   int64 start_revision = 3;
   // TODO: support Range watch?
-  // TODO: support notification every time interval or revision increase?
-  // TODO: support cancel watch if the server cannot reach with majority?
+}
+
+message WatchCancelRequest {
+  int64 watch_id = 1;
 }
 
 message WatchResponse {
   ResponseHeader header = 1;
-  repeated storagepb.Event events = 2;
+  // watch_id is the ID of the watching the response sent to.
+  int64 watch_id = 2;
+  // If the response is for a create watch request, created is set to true.
+  // Client should record the watch_id and prepare for receiving events for
+  // that watching from the same stream.
+  // All events sent to the created watching will attach with the same watch_id.
+  bool created = 3;
+  // If the response is for a cancel watch request, cancel is set to true.
+  // No further events will be sent to the canceled watching.
+  bool canceled = 4;
+  // If a watching tries to watch at a compacted index, compacted will be set to true.
+  //
+  // This happens when creating a watching at a compacted revision or the watching cannot
+  // catch up with the progress of the KV.
+  //
+  // Client should treat the watching as canceled and should not try to create any
+  // watching with same start_revision again. 
+  bool compacted = 5;
+
+  repeated storagepb.Event events = 11;
 }
 
 message LeaseCreateRequest {

+ 1 - 1
tools/benchmark/cmd/watch.go

@@ -109,7 +109,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 	go func() {
 		for i := 0; i < watchTotal; i++ {
 			requests <- etcdserverpb.WatchRequest{
-				Key: watched[i%(len(watched))],
+				CreateRequest: &etcdserverpb.WatchCreateRequest{Key: watched[i%(len(watched))]},
 			}
 		}
 		close(requests)