Преглед на файлове

clientv3: expose event type in user API

- add another layer of abstraction in clientv3 for user, not expose internal storagepb ones
- provide commonly used routines IsCreate(), IsModify() on event
Hongchao Deng преди 9 години
родител
ревизия
aa11dafaf8
променени са 4 файла, в които са добавени 154 реда и са изтрити 6 реда
  1. 73 1
      clientv3/integration/watch_test.go
  2. 23 2
      clientv3/watch.go
  3. 55 0
      clientv3/watch_test.go
  4. 3 3
      contrib/recipes/watch.go

+ 73 - 1
clientv3/integration/watch_test.go

@@ -101,7 +101,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
 			t.Fatalf("expected watcher channel, got nil")
 		}
 		readyc <- struct{}{}
-		evs := []*storagepb.Event{}
+		evs := []*clientv3.Event{}
 		for i := 0; i < numKeyUpdates*2; i++ {
 			resp, ok := <-prefixc
 			if !ok {
@@ -430,3 +430,75 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 		t.Fatalf("watch response expected in %v, but timed out", pi)
 	}
 }
+
+func TestWatchEventType(t *testing.T) {
+	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer cluster.Terminate(t)
+
+	client := cluster.RandClient()
+	ctx := context.Background()
+	watchChan := client.Watch(ctx, "/", clientv3.WithPrefix())
+
+	if _, err := client.Put(ctx, "/toDelete", "foo"); err != nil {
+		t.Fatalf("Put failed: %v", err)
+	}
+	if _, err := client.Put(ctx, "/toDelete", "bar"); err != nil {
+		t.Fatalf("Put failed: %v", err)
+	}
+	if _, err := client.Delete(ctx, "/toDelete"); err != nil {
+		t.Fatalf("Delete failed: %v", err)
+	}
+	lcr, err := client.Lease.Create(ctx, 1)
+	if err != nil {
+		t.Fatalf("lease create failed: %v", err)
+	}
+	if _, err := client.Put(ctx, "/toExpire", "foo", clientv3.WithLease(clientv3.LeaseID(lcr.ID))); err != nil {
+		t.Fatalf("Put failed: %v", err)
+	}
+
+	tests := []struct {
+		et       storagepb.Event_EventType
+		isCreate bool
+		isModify bool
+	}{{
+		et:       clientv3.EventTypePut,
+		isCreate: true,
+	}, {
+		et:       clientv3.EventTypePut,
+		isModify: true,
+	}, {
+		et: clientv3.EventTypeDelete,
+	}, {
+		et:       clientv3.EventTypePut,
+		isCreate: true,
+	}, {
+		et: clientv3.EventTypeDelete,
+	}}
+
+	var res []*clientv3.Event
+
+	for {
+		select {
+		case wres := <-watchChan:
+			res = append(res, wres.Events...)
+		case <-time.After(10 * time.Second):
+			t.Fatalf("Should receive %d events and then break out loop", len(tests))
+		}
+		if len(res) == len(tests) {
+			break
+		}
+	}
+
+	for i, tt := range tests {
+		ev := res[i]
+		if tt.et != ev.Type {
+			t.Errorf("#%d: event type want=%s, get=%s", i, tt.et, ev.Type)
+		}
+		if tt.isCreate && !ev.IsCreate() {
+			t.Errorf("#%d: event should be CreateEvent", i)
+		}
+		if tt.isModify && !ev.IsModify() {
+			t.Errorf("#%d: event should be ModifyEvent", i)
+		}
+	}
+}

+ 23 - 2
clientv3/watch.go

@@ -25,6 +25,13 @@ import (
 	"google.golang.org/grpc"
 )
 
+const (
+	EventTypeDelete = storagepb.DELETE
+	EventTypePut    = storagepb.PUT
+)
+
+type Event storagepb.Event
+
 type WatchChan <-chan WatchResponse
 
 type Watcher interface {
@@ -41,7 +48,7 @@ type Watcher interface {
 
 type WatchResponse struct {
 	Header pb.ResponseHeader
-	Events []*storagepb.Event
+	Events []*Event
 
 	// CompactRevision is the minimum revision the watcher may receive.
 	CompactRevision int64
@@ -52,6 +59,16 @@ type WatchResponse struct {
 	Canceled bool
 }
 
+// IsCreate returns true if the event tells that the key is newly created.
+func (e *Event) IsCreate() bool {
+	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
+}
+
+// IsModify returns true if the event tells that a new value is put on existing key.
+func (e *Event) IsModify() bool {
+	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
+}
+
 // Err is the error value if this WatchResponse holds an error.
 func (wr *WatchResponse) Err() error {
 	if wr.CompactRevision != 0 {
@@ -352,10 +369,14 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
 	w.mu.RLock()
 	defer w.mu.RUnlock()
 	ws, ok := w.streams[pbresp.WatchId]
+	events := make([]*Event, len(pbresp.Events))
+	for i, ev := range pbresp.Events {
+		events[i] = (*Event)(ev)
+	}
 	if ok {
 		wr := &WatchResponse{
 			Header:          *pbresp.Header,
-			Events:          pbresp.Events,
+			Events:          events,
 			CompactRevision: pbresp.CompactRevision,
 			Canceled:        pbresp.Canceled}
 		ws.recvc <- wr

+ 55 - 0
clientv3/watch_test.go

@@ -0,0 +1,55 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"testing"
+
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+func TestEvent(t *testing.T) {
+	tests := []struct {
+		ev       *Event
+		isCreate bool
+		isModify bool
+	}{{
+		ev: &Event{
+			Type: EventTypePut,
+			Kv: &storagepb.KeyValue{
+				CreateRevision: 3,
+				ModRevision:    3,
+			},
+		},
+		isCreate: true,
+	}, {
+		ev: &Event{
+			Type: EventTypePut,
+			Kv: &storagepb.KeyValue{
+				CreateRevision: 3,
+				ModRevision:    4,
+			},
+		},
+		isModify: false,
+	}}
+	for i, tt := range tests {
+		if tt.isCreate && !tt.ev.IsCreate() {
+			t.Errorf("#%d: event should be Create event", i)
+		}
+		if tt.isModify && !tt.ev.IsModify() {
+			t.Errorf("#%d: event should be Modify event", i)
+		}
+	}
+}

+ 3 - 3
contrib/recipes/watch.go

@@ -21,7 +21,7 @@ import (
 )
 
 // WaitEvents waits on a key until it observes the given events and returns the final one.
-func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*clientv3.Event, error) {
 	wc := c.Watch(context.Background(), key, clientv3.WithRev(rev))
 	if wc == nil {
 		return nil, ErrNoWatcher
@@ -29,7 +29,7 @@ func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event
 	return waitEvents(wc, evs), nil
 }
 
-func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*clientv3.Event, error) {
 	wc := c.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
 	if wc == nil {
 		return nil, ErrNoWatcher
@@ -37,7 +37,7 @@ func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storag
 	return waitEvents(wc, evs), nil
 }
 
-func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event {
+func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *clientv3.Event {
 	i := 0
 	for wresp := range wc {
 		for _, ev := range wresp.Events {