Browse Source

client: KeysAPI.[R]Watch -> Watcher w/ opts struct

Brian Waldon 11 years ago
parent
commit
01fc01ec69
5 changed files with 17 additions and 33 deletions
  1. 9 17
      client/keys.go
  2. 2 2
      discovery/discovery.go
  3. 2 10
      discovery/discovery_test.go
  4. 3 3
      integration/cluster_test.go
  5. 1 1
      integration/member_test.go

+ 9 - 17
client/keys.go

@@ -70,8 +70,12 @@ type KeysAPI interface {
 	Get(ctx context.Context, key string) (*Response, error)
 	RGet(ctx context.Context, key string) (*Response, error)
 
-	Watch(key string, idx uint64) Watcher
-	RWatch(key string, idx uint64) Watcher
+	Watcher(key string, opts WatcherOptions) Watcher
+}
+
+type WatcherOptions struct {
+	WaitIndex uint64
+	Recursive bool
 }
 
 type SetOptions struct {
@@ -184,26 +188,14 @@ func (k *httpKeysAPI) RGet(ctx context.Context, key string) (*Response, error) {
 	return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
 }
 
-func (k *httpKeysAPI) Watch(key string, idx uint64) Watcher {
-	return &httpWatcher{
-		client: k.client,
-		nextWait: waitAction{
-			Prefix:    k.prefix,
-			Key:       key,
-			WaitIndex: idx,
-			Recursive: false,
-		},
-	}
-}
-
-func (k *httpKeysAPI) RWatch(key string, idx uint64) Watcher {
+func (k *httpKeysAPI) Watcher(key string, opts WatcherOptions) Watcher {
 	return &httpWatcher{
 		client: k.client,
 		nextWait: waitAction{
 			Prefix:    k.prefix,
 			Key:       key,
-			WaitIndex: idx,
-			Recursive: true,
+			WaitIndex: opts.WaitIndex,
+			Recursive: opts.Recursive,
 		},
 	}
 }

+ 2 - 2
discovery/discovery.go

@@ -186,7 +186,7 @@ func (d *discovery) createSelf(contents string) error {
 	}
 
 	// ensure self appears on the server we connected to
-	w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
+	w := d.c.Watcher(d.selfKey(), client.WatcherOptions{WaitIndex: resp.Node.CreatedIndex})
 	_, err = w.Next(context.Background())
 	return err
 }
@@ -275,7 +275,7 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int, index uint64) (clien
 		nodes = nodes[:size]
 	}
 	// watch from the next index
-	w := d.c.RWatch(d.cluster, index+1)
+	w := d.c.Watcher(d.cluster, client.WatcherOptions{WaitIndex: index + 1, Recursive: true})
 	all := make(client.Nodes, len(nodes))
 	copy(all, nodes)
 	for _, n := range all {

+ 2 - 10
discovery/discovery_test.go

@@ -431,11 +431,7 @@ func (c *clientWithResp) Get(ctx context.Context, key string) (*client.Response,
 	return r, nil
 }
 
-func (c *clientWithResp) Watch(key string, waitIndex uint64) client.Watcher {
-	return c.w
-}
-
-func (c *clientWithResp) RWatch(key string, waitIndex uint64) client.Watcher {
+func (c *clientWithResp) Watcher(key string, opts client.WatcherOptions) client.Watcher {
 	return c.w
 }
 
@@ -453,11 +449,7 @@ func (c *clientWithErr) Get(ctx context.Context, key string) (*client.Response,
 	return &client.Response{}, c.err
 }
 
-func (c *clientWithErr) Watch(key string, waitIndex uint64) client.Watcher {
-	return c.w
-}
-
-func (c *clientWithErr) RWatch(key string, waitIndex uint64) client.Watcher {
+func (c *clientWithErr) Watcher(key string, opts client.WatcherOptions) client.Watcher {
 	return c.w
 }
 

+ 3 - 3
integration/cluster_test.go

@@ -142,7 +142,7 @@ func TestForceNewCluster(t *testing.T) {
 	cancel()
 	// ensure create has been applied in this machine
 	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
-	if _, err := kapi.Watch("/foo", resp.Node.ModifiedIndex).Next(ctx); err != nil {
+	if _, err := kapi.Watcher("/foo", client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
 		t.Fatalf("unexpected watch error: %v", err)
 	}
 	cancel()
@@ -163,7 +163,7 @@ func TestForceNewCluster(t *testing.T) {
 	kapi = client.NewKeysAPI(cc)
 	// ensure force restart keep the old data, and new cluster can make progress
 	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
-	if _, err := kapi.Watch("/foo", resp.Node.ModifiedIndex).Next(ctx); err != nil {
+	if _, err := kapi.Watcher("/foo", client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
 		t.Fatalf("unexpected watch error: %v", err)
 	}
 	cancel()
@@ -189,7 +189,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
 		mcc := mustNewHTTPClient(t, []string{u})
 		mkapi := client.NewKeysAPI(mcc)
 		mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
-		if _, err := mkapi.Watch(key, resp.Node.ModifiedIndex).Next(mctx); err != nil {
+		if _, err := mkapi.Watcher(key, client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil {
 			t.Fatalf("#%d: watch on %s error: %v", i, u, err)
 		}
 		mcancel()

+ 1 - 1
integration/member_test.go

@@ -95,7 +95,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
 		kapi := client.NewKeysAPI(cc)
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 		key := fmt.Sprintf("foo%d", i)
-		resps[i], err = kapi.Create(ctx, "/"+key, "bar", -1)
+		resps[i], err = kapi.Create(ctx, "/"+key, "bar")
 		if err != nil {
 			t.Fatalf("#%d: create on %s error: %v", i, m.URL(), err)
 		}