Browse Source

etcdserver: ranges in watcher rpc protocol

protocol change so watch requests are ranges; server rejects non-prefix ranges
Anthony Romano 9 years ago
parent
commit
8dbc6cfd43

+ 0 - 21
clientv3/op.go

@@ -69,27 +69,6 @@ func (op Op) toRequestUnion() *pb.RequestUnion {
 	}
 }
 
-func (op Op) toWatchRequest() *watchRequest {
-	switch op.t {
-	case tRange:
-		key := string(op.key)
-		prefix := ""
-		if op.end != nil {
-			prefix = key
-			key = ""
-		}
-		wr := &watchRequest{
-			key:    key,
-			prefix: prefix,
-			rev:    op.rev,
-		}
-		return wr
-
-	default:
-		panic("Only for tRange")
-	}
-}
-
 func (op Op) isWrite() bool {
 	return op.t != tRange
 }

+ 15 - 13
clientv3/watch.go

@@ -78,10 +78,10 @@ type watcher struct {
 
 // watchRequest is issued by the subscriber to start a new watcher
 type watchRequest struct {
-	ctx    context.Context
-	key    string
-	prefix string
-	rev    int64
+	ctx context.Context
+	key string
+	end string
+	rev int64
 	// retc receives a chan WatchResponse once the watcher is established
 	retc chan chan WatchResponse
 }
@@ -129,11 +129,14 @@ func NewWatcher(c *Client) Watcher {
 func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
 	ow := opWatch(key, opts...)
 
-	wr := ow.toWatchRequest()
-	wr.ctx = ctx
-
 	retc := make(chan chan WatchResponse, 1)
-	wr.retc = retc
+	wr := &watchRequest{
+		ctx:  ctx,
+		key:  string(ow.key),
+		end:  string(ow.end),
+		rev:  ow.rev,
+		retc: retc,
+	}
 
 	ok := false
 
@@ -502,11 +505,10 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
 
 // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
 func (wr *watchRequest) toPB() *pb.WatchRequest {
-	req := &pb.WatchCreateRequest{StartRevision: wr.rev}
-	if wr.key != "" {
-		req.Key = []byte(wr.key)
-	} else {
-		req.Prefix = []byte(wr.prefix)
+	req := &pb.WatchCreateRequest{
+		StartRevision: wr.rev,
+		Key:           []byte(wr.key),
+		RangeEnd:      []byte(wr.end),
 	}
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
 	return &pb.WatchRequest{RequestUnion: cr}

+ 45 - 28
etcdserver/api/v3rpc/watch.go

@@ -16,6 +16,7 @@ package v3rpc
 
 import (
 	"io"
+	"reflect"
 
 	"github.com/coreos/etcd/etcdserver"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -94,35 +95,33 @@ func (sws *serverWatchStream) recvLoop() error {
 
 		switch uv := req.RequestUnion.(type) {
 		case *pb.WatchRequest_CreateRequest:
-			if uv.CreateRequest != nil {
-				creq := uv.CreateRequest
-				var prefix bool
-				toWatch := creq.Key
-				if len(creq.Key) == 0 {
-					toWatch = creq.Prefix
-					prefix = true
-				}
+			if uv.CreateRequest == nil {
+				break
+			}
 
-				rev := creq.StartRevision
-				wsrev := sws.watchStream.Rev()
-				if rev == 0 {
-					// rev 0 watches past the current revision
-					rev = wsrev + 1
-				} else if rev > wsrev { // do not allow watching future revision.
-					sws.ctrlStream <- &pb.WatchResponse{
-						Header:   sws.newResponseHeader(wsrev),
-						WatchId:  -1,
-						Created:  true,
-						Canceled: true,
-					}
-					continue
-				}
-				id := sws.watchStream.Watch(toWatch, prefix, rev)
-				sws.ctrlStream <- &pb.WatchResponse{
-					Header:  sws.newResponseHeader(wsrev),
-					WatchId: int64(id),
-					Created: true,
-				}
+			creq := uv.CreateRequest
+			toWatch := creq.Key
+			isPrefix := len(creq.RangeEnd) != 0
+			badPrefix := isPrefix && !reflect.DeepEqual(getPrefix(toWatch), creq.RangeEnd)
+
+			rev := creq.StartRevision
+			wsrev := sws.watchStream.Rev()
+			futureRev := rev > wsrev
+			if rev == 0 {
+				// rev 0 watches past the current revision
+				rev = wsrev + 1
+			}
+			// do not allow future watch revision
+			// do not allow range that is not a prefix
+			id := storage.WatchID(-1)
+			if !futureRev && !badPrefix {
+				id = sws.watchStream.Watch(toWatch, isPrefix, rev)
+			}
+			sws.ctrlStream <- &pb.WatchResponse{
+				Header:   sws.newResponseHeader(wsrev),
+				WatchId:  int64(id),
+				Created:  true,
+				Canceled: futureRev || badPrefix,
 			}
 		case *pb.WatchRequest_CancelRequest:
 			if uv.CancelRequest != nil {
@@ -238,3 +237,21 @@ func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
 		RaftTerm:  sws.raftTimer.Term(),
 	}
 }
+
+// TODO: remove getPrefix when storage supports full range watchers
+
+func getPrefix(key []byte) []byte {
+	end := make([]byte, len(key))
+	copy(end, key)
+	for i := len(end) - 1; i >= 0; i-- {
+		if end[i] < 0xff {
+			end[i] = end[i] + 1
+			end = end[:i+1]
+			return end
+		}
+	}
+	// next prefix does not exist (e.g., 0xffff);
+	// default to WithFromKey policy
+	end = []byte{0}
+	return end
+}

+ 13 - 12
etcdserver/etcdserverpb/rpc.pb.go

@@ -870,8 +870,9 @@ func _WatchRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.B
 type WatchCreateRequest struct {
 	// the key to be watched
 	Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
-	// the prefix to be watched.
-	Prefix []byte `protobuf:"bytes,2,opt,name=prefix,proto3" json:"prefix,omitempty"`
+	// if the range_end is given, keys in [key, range_end) are watched
+	// NOTE: only range_end == prefixEnd(key) is accepted now
+	RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,proto3" json:"range_end,omitempty"`
 	// start_revision is an optional revision (including) to watch from. No start_revision is "now".
 	StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"`
 }
@@ -2588,12 +2589,12 @@ func (m *WatchCreateRequest) MarshalTo(data []byte) (int, error) {
 			i += copy(data[i:], m.Key)
 		}
 	}
-	if m.Prefix != nil {
-		if len(m.Prefix) > 0 {
+	if m.RangeEnd != nil {
+		if len(m.RangeEnd) > 0 {
 			data[i] = 0x12
 			i++
-			i = encodeVarintRpc(data, i, uint64(len(m.Prefix)))
-			i += copy(data[i:], m.Prefix)
+			i = encodeVarintRpc(data, i, uint64(len(m.RangeEnd)))
+			i += copy(data[i:], m.RangeEnd)
 		}
 	}
 	if m.StartRevision != 0 {
@@ -3592,8 +3593,8 @@ func (m *WatchCreateRequest) Size() (n int) {
 			n += 1 + l + sovRpc(uint64(l))
 		}
 	}
-	if m.Prefix != nil {
-		l = len(m.Prefix)
+	if m.RangeEnd != nil {
+		l = len(m.RangeEnd)
 		if l > 0 {
 			n += 1 + l + sovRpc(uint64(l))
 		}
@@ -6004,7 +6005,7 @@ func (m *WatchCreateRequest) Unmarshal(data []byte) error {
 			iNdEx = postIndex
 		case 2:
 			if wireType != 2 {
-				return fmt.Errorf("proto: wrong wireType = %d for field Prefix", wireType)
+				return fmt.Errorf("proto: wrong wireType = %d for field RangeEnd", wireType)
 			}
 			var byteLen int
 			for shift := uint(0); ; shift += 7 {
@@ -6028,9 +6029,9 @@ func (m *WatchCreateRequest) Unmarshal(data []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			m.Prefix = append(m.Prefix[:0], data[iNdEx:postIndex]...)
-			if m.Prefix == nil {
-				m.Prefix = []byte{}
+			m.RangeEnd = append(m.RangeEnd[:0], data[iNdEx:postIndex]...)
+			if m.RangeEnd == nil {
+				m.RangeEnd = []byte{}
 			}
 			iNdEx = postIndex
 		case 3:

+ 3 - 3
etcdserver/etcdserverpb/rpc.proto

@@ -262,11 +262,11 @@ message WatchRequest {
 message WatchCreateRequest {
   // the key to be watched
   bytes key = 1;
-  // the prefix to be watched.
-  bytes prefix = 2;
+  // if the range_end is given, keys in [key, range_end) are watched
+  // NOTE: only range_end == prefixEnd(key) is accepted now
+  bytes range_end = 2;
   // start_revision is an optional revision (including) to watch from. No start_revision is "now".
   int64 start_revision = 3;
-  // TODO: support Range watch?
 }
 
 message WatchCancelRequest {

+ 15 - 7
integration/v3_watch_test.go

@@ -71,7 +71,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 			[]string{"fooLong"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
-					Prefix: []byte("foo")}}},
+					Key:      []byte("foo"),
+					RangeEnd: []byte("fop")}}},
 
 			[]*pb.WatchResponse{
 				{
@@ -91,7 +92,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 			[]string{"foo"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
-					Prefix: []byte("helloworld")}}},
+					Key:      []byte("helloworld"),
+					RangeEnd: []byte("helloworle")}}},
 
 			[]*pb.WatchResponse{},
 		},
@@ -140,7 +142,8 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 			[]string{"foo", "foo", "foo"},
 			&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
-					Prefix: []byte("foo")}}},
+					Key:      []byte("foo"),
+					RangeEnd: []byte("fop")}}},
 
 			[]*pb.WatchResponse{
 				{
@@ -203,6 +206,11 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
 			t.Errorf("#%d: did not create watchid, got +%v", i, cresp)
 			continue
 		}
+		if cresp.Canceled {
+			t.Errorf("#%d: canceled watcher on create", i, cresp)
+			continue
+		}
+
 		createdWatchId := cresp.WatchId
 		if cresp.Header == nil || cresp.Header.Revision != 1 {
 			t.Errorf("#%d: header revision got +%v, wanted revison 1", i, cresp)
@@ -353,7 +361,7 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) {
 	progress := make(map[int64]int64)
 
 	wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
-		CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}}
+		CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), RangeEnd: []byte("fop")}}}
 	if err := wStream.Send(wreq); err != nil {
 		t.Fatalf("first watch request failed (%v)", err)
 	}
@@ -437,7 +445,7 @@ func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
 		} else {
 			wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 				CreateRequest: &pb.WatchCreateRequest{
-					Prefix: []byte("fo"), StartRevision: startRev}}}
+					Key: []byte("fo"), RangeEnd: []byte("fp"), StartRevision: startRev}}}
 		}
 		if err := wStream.Send(wreq); err != nil {
 			t.Fatalf("wStream.Send error: %v", err)
@@ -530,7 +538,7 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
 
 	wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 		CreateRequest: &pb.WatchCreateRequest{
-			Prefix: []byte("foo"), StartRevision: startRev}}}
+			Key: []byte("foo"), RangeEnd: []byte("fop"), StartRevision: startRev}}}
 	if err := wStream.Send(wreq); err != nil {
 		t.Fatalf("wStream.Send error: %v", err)
 	}
@@ -623,7 +631,7 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
 
 	wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
 		CreateRequest: &pb.WatchCreateRequest{
-			Prefix: []byte("foo"), StartRevision: 1}}}
+			Key: []byte("foo"), RangeEnd: []byte("fop"), StartRevision: 1}}}
 	if err := wStream.Send(wreq); err != nil {
 		t.Fatalf("wStream.Send error: %v", err)
 	}