Bläddra i källkod

Merge pull request #4677 from heyitsanthony/clientv3-wr-err

clientv3: add Err() to WatchResponse
Anthony Romano 9 år sedan
förälder
incheckning
b0a88ab287

+ 1 - 1
clientv3/concurrency/election.go

@@ -139,7 +139,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 
 			for kv == nil {
 				wr, ok := <-wch
-				if !ok || len(wr.Events) == 0 {
+				if !ok || wr.Err() != nil {
 					cancel()
 					return
 				}

+ 1 - 5
clientv3/concurrency/key.go

@@ -20,7 +20,6 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	v3 "github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
@@ -52,10 +51,7 @@ func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.O
 	if !ok {
 		return ctx.Err()
 	}
-	if len(wresp.Events) == 0 {
-		return v3rpc.ErrCompacted
-	}
-	return nil
+	return wresp.Err()
 }
 
 func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {

+ 42 - 2
clientv3/integration/watch_test.go

@@ -23,6 +23,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/integration"
 	"github.com/coreos/etcd/pkg/testutil"
 	storagepb "github.com/coreos/etcd/storage/storagepb"
@@ -347,8 +348,8 @@ func TestWatchInvalidFutureRevision(t *testing.T) {
 	if !ok {
 		t.Fatalf("expected wresp 'open'(ok true), but got ok %v", ok)
 	}
-	if !wresp.Canceled {
-		t.Fatalf("wresp.Canceled expected 'true', but got %v", wresp.Canceled)
+	if wresp.Err() != v3rpc.ErrFutureRev {
+		t.Fatalf("wresp.Err() expected ErrFutureRev, but got %v", wresp.Err())
 	}
 
 	_, ok = <-rch // ensure the channel is closed
@@ -356,3 +357,42 @@ func TestWatchInvalidFutureRevision(t *testing.T) {
 		t.Fatalf("expected wresp 'closed'(ok false), but got ok %v", ok)
 	}
 }
+
+// TestWatchCompactRevision ensures the CompactRevision error is given on a
+// compaction event ahead of a watcher.
+func TestWatchCompactRevision(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	// set some keys
+	kv := clientv3.NewKV(clus.RandClient())
+	for i := 0; i < 5; i++ {
+		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	w := clientv3.NewWatcher(clus.RandClient())
+	defer w.Close()
+
+	if err := kv.Compact(context.TODO(), 4); err != nil {
+		t.Fatal(err)
+	}
+	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2))
+
+	// get compacted error message
+	wresp, ok := <-wch
+	if !ok {
+		t.Fatalf("expected wresp, but got closed channel")
+	}
+	if wresp.Err() != v3rpc.ErrCompacted {
+		t.Fatalf("wresp.Err() expected ErrCompacteed, but got %v", wresp.Err())
+	}
+
+	// ensure the channel is closed
+	if wresp, ok = <-wch; ok {
+		t.Fatalf("expected closed channel, but got %v", wresp)
+	}
+}

+ 19 - 7
clientv3/watch.go

@@ -20,6 +20,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	storagepb "github.com/coreos/etcd/storage/storagepb"
 )
@@ -41,14 +42,25 @@ type Watcher interface {
 type WatchResponse struct {
 	Header pb.ResponseHeader
 	Events []*storagepb.Event
-	// CompactRevision is set to the compaction revision that
-	// caused the watcher to cancel.
+
+	// CompactRevision is the minimum revision the watcher may receive.
 	CompactRevision int64
 
-	// Canceled is 'true' when it has received wrong watch start revision.
+	// Canceled is set to indicate the channel is about to close.
 	Canceled bool
 }
 
+// Err is the error value if this WatchResponse holds an error.
+func (wr *WatchResponse) Err() error {
+	if wr.CompactRevision != 0 {
+		return v3rpc.ErrCompacted
+	}
+	if wr.Canceled {
+		return v3rpc.ErrFutureRev
+	}
+	return nil
+}
+
 // watcher implements the Watcher interface
 type watcher struct {
 	c      *Client
@@ -179,12 +191,13 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 		return
 	}
 	if resp.Canceled || resp.CompactRevision != 0 {
-		// compaction after start revision
+		// a cancel at id creation time means the start revision has
+		// been compacted out of the store
 		ret := make(chan WatchResponse, 1)
 		ret <- WatchResponse{
 			Header:          *resp.Header,
 			CompactRevision: resp.CompactRevision,
-			Canceled:        resp.Canceled}
+			Canceled:        true}
 		close(ret)
 		pendingReq.retc <- ret
 		return
@@ -375,8 +388,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
 		}
 		select {
 		case outc <- *curWr:
-			if len(wrs[0].Events) == 0 {
-				// compaction message
+			if wrs[0].Err() != nil {
 				closing = true
 				break
 			}

+ 2 - 2
etcdctlv3/command/snapshot_command.go

@@ -53,7 +53,7 @@ func snapshotCommandFunc(cmd *cobra.Command, args []string) {
 func snapshotToStdout(c *clientv3.Client) {
 	// must explicitly fetch first revision since no retry on stdout
 	wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
-	if len(wr.Events) > 0 {
+	if wr.Err() == nil {
 		wr.CompactRevision = 1
 	}
 	if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 {
@@ -111,7 +111,7 @@ func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 {
 	wc := s.SyncUpdates(context.TODO())
 
 	for wr := range wc {
-		if len(wr.Events) == 0 {
+		if wr.Err() != nil {
 			return wr.CompactRevision
 		}
 		for _, ev := range wr.Events {