Browse Source

Merge pull request #4581 from heyitsanthony/recipes-clientv3

contrib/recipes: use clientv3 APIs
Anthony Romano 10 years ago
parent
commit
ab9f925c04

+ 11 - 9
contrib/recipes/barrier.go

@@ -16,38 +16,40 @@ package recipe
 
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 // Barrier creates a key in etcd to block processes, then deletes the key to
 // release all blocked processes.
 type Barrier struct {
-	client *clientv3.Client
-	key    string
+	client *v3.Client
+	kv     v3.KV
+	ctx    context.Context
+
+	key string
 }
 
-func NewBarrier(client *clientv3.Client, key string) *Barrier {
-	return &Barrier{client, key}
+func NewBarrier(client *v3.Client, key string) *Barrier {
+	return &Barrier{client, v3.NewKV(client), context.TODO(), key}
 }
 
 // Hold creates the barrier key causing processes to block on Wait.
 func (b *Barrier) Hold() error {
-	_, err := NewKey(b.client, b.key, 0)
+	_, err := NewKey(b.kv, b.key, 0)
 	return err
 }
 
 // Release deletes the barrier key to unblock all waiting processes.
 func (b *Barrier) Release() error {
-	_, err := b.client.KV.DeleteRange(context.TODO(), &pb.DeleteRangeRequest{Key: []byte(b.key)})
+	_, err := b.kv.Delete(b.ctx, b.key)
 	return err
 }
 
 // Wait blocks on the barrier key until it is deleted. If there is no key, Wait
 // assumes Release has already been called and returns immediately.
 func (b *Barrier) Wait() error {
-	resp, err := NewRange(b.client, b.key).FirstKey()
+	resp, err := b.kv.Get(b.ctx, b.key, withFirstKey()...)
 	if err != nil {
 		return err
 	}

+ 10 - 34
contrib/recipes/client.go

@@ -18,7 +18,7 @@ import (
 	"errors"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	v3 "github.com/coreos/etcd/clientv3"
 	spb "github.com/coreos/etcd/storage/storagepb"
 )
 
@@ -26,25 +26,14 @@ var (
 	ErrKeyExists      = errors.New("key already exists")
 	ErrWaitMismatch   = errors.New("unexpected wait result")
 	ErrTooManyClients = errors.New("too many clients")
+	ErrNoWatcher      = errors.New("no watcher channel")
 )
 
 // deleteRevKey deletes a key by revision, returning false if key is missing
-func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) {
-	cmp := &pb.Compare{
-		Result:      pb.Compare_EQUAL,
-		Target:      pb.Compare_MOD,
-		Key:         []byte(key),
-		TargetUnion: &pb.Compare_ModRevision{ModRevision: rev},
-	}
-	req := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
-		RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}
-	txnresp, err := kvc.Txn(
-		context.TODO(),
-		&pb.TxnRequest{
-			Compare: []*pb.Compare{cmp},
-			Success: []*pb.RequestUnion{req},
-			Failure: nil,
-		})
+func deleteRevKey(kv v3.KV, key string, rev int64) (bool, error) {
+	cmp := v3.Compare(v3.ModifiedRevision(key), "=", rev)
+	req := v3.OpDelete(key)
+	txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
 	if err != nil {
 		return false, err
 	} else if txnresp.Succeeded == false {
@@ -53,27 +42,14 @@ func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) {
 	return true, nil
 }
 
-func claimFirstKey(kvc pb.KVClient, kvs []*spb.KeyValue) (*spb.KeyValue, error) {
-	for _, kv := range kvs {
-		ok, err := deleteRevKey(kvc, string(kv.Key), kv.ModRevision)
+func claimFirstKey(kv v3.KV, kvs []*spb.KeyValue) (*spb.KeyValue, error) {
+	for _, k := range kvs {
+		ok, err := deleteRevKey(kv, string(k.Key), k.ModRevision)
 		if err != nil {
 			return nil, err
 		} else if ok {
-			return kv, nil
+			return k, nil
 		}
 	}
 	return nil, nil
 }
-
-func putEmptyKey(kv pb.KVClient, key string) (*pb.PutResponse, error) {
-	return kv.Put(context.TODO(), &pb.PutRequest{Key: []byte(key), Value: []byte{}})
-}
-
-// deletePrefix performs a RangeRequest to get keys on a given prefix
-func deletePrefix(kv pb.KVClient, prefix string) (*pb.DeleteRangeResponse, error) {
-	return kv.DeleteRange(
-		context.TODO(),
-		&pb.DeleteRangeRequest{
-			Key:      []byte(prefix),
-			RangeEnd: []byte(prefixEnd(prefix))})
-}

+ 17 - 10
contrib/recipes/double_barrier.go

@@ -17,7 +17,6 @@ package recipe
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
@@ -25,13 +24,22 @@ import (
 // blocks again on Leave until all processes have left.
 type DoubleBarrier struct {
 	client *clientv3.Client
-	key    string // key for the collective barrier
-	count  int
-	myKey  *EphemeralKV // current key for this process on the barrier
+	kv     clientv3.KV
+	ctx    context.Context
+
+	key   string // key for the collective barrier
+	count int
+	myKey *EphemeralKV // current key for this process on the barrier
 }
 
 func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
-	return &DoubleBarrier{client, key, count, nil}
+	return &DoubleBarrier{
+		client: client,
+		kv:     clientv3.NewKV(client),
+		ctx:    context.TODO(),
+		key:    key,
+		count:  count,
+	}
 }
 
 // Enter waits for "count" processes to enter the barrier then returns
@@ -42,7 +50,7 @@ func (b *DoubleBarrier) Enter() error {
 	}
 	b.myKey = ek
 
-	resp, err := NewRange(b.client, b.key+"/waiters").Prefix()
+	resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
 	if err != nil {
 		return err
 	}
@@ -53,7 +61,7 @@ func (b *DoubleBarrier) Enter() error {
 
 	if len(resp.Kvs) == b.count {
 		// unblock waiters
-		_, err = putEmptyKey(b.client.KV, b.key+"/ready")
+		_, err = b.kv.Put(b.ctx, b.key+"/ready", "")
 		return err
 	}
 
@@ -67,7 +75,7 @@ func (b *DoubleBarrier) Enter() error {
 
 // Leave waits for "count" processes to leave the barrier then returns
 func (b *DoubleBarrier) Leave() error {
-	resp, err := NewRange(b.client, b.key+"/waiters").Prefix()
+	resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
 	if len(resp.Kvs) == 0 {
 		return nil
 	}
@@ -85,8 +93,7 @@ func (b *DoubleBarrier) Leave() error {
 
 	if len(resp.Kvs) == 1 {
 		// this is the only node in the barrier; finish up
-		req := &pb.DeleteRangeRequest{Key: []byte(b.key + "/ready")}
-		if _, err = b.client.KV.DeleteRange(context.TODO(), req); err != nil {
+		if _, err = b.kv.Delete(b.ctx, b.key+"/ready"); err != nil {
 			return err
 		}
 		return b.myKey.Delete()

+ 12 - 10
contrib/recipes/election.go

@@ -14,20 +14,24 @@
 package recipe
 
 import (
-	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 type Election struct {
-	client    *clientv3.Client
+	client *v3.Client
+	kv     v3.KV
+	ctx    context.Context
+
 	keyPrefix string
 	leaderKey *EphemeralKV
 }
 
 // NewElection returns a new election on a given key prefix.
-func NewElection(client *clientv3.Client, keyPrefix string) *Election {
-	return &Election{client, keyPrefix, nil}
+func NewElection(client *v3.Client, keyPrefix string) *Election {
+	return &Election{client, v3.NewKV(client), context.TODO(), keyPrefix, nil}
 }
 
 // Volunteer puts a value as eligible for the election. It blocks until
@@ -58,7 +62,7 @@ func (e *Election) Resign() (err error) {
 
 // Leader returns the leader value for the current election.
 func (e *Election) Leader() (string, error) {
-	resp, err := NewRange(e.client, e.keyPrefix).FirstCreate()
+	resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...)
 	if err != nil {
 		return "", err
 	} else if len(resp.Kvs) == 0 {
@@ -70,7 +74,7 @@ func (e *Election) Leader() (string, error) {
 
 // Wait waits for a leader to be elected, returning the leader value.
 func (e *Election) Wait() (string, error) {
-	resp, err := NewRange(e.client, e.keyPrefix).FirstCreate()
+	resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...)
 	if err != nil {
 		return "", err
 	} else if len(resp.Kvs) != 0 {
@@ -89,10 +93,8 @@ func (e *Election) Wait() (string, error) {
 }
 
 func (e *Election) waitLeadership(tryKey *EphemeralKV) error {
-	resp, err := NewRangeRev(
-		e.client,
-		e.keyPrefix,
-		tryKey.Revision()-1).LastCreate()
+	opts := append(withLastCreate(), v3.WithRev(tryKey.Revision()-1))
+	resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...)
 	if err != nil {
 		return err
 	} else if len(resp.Kvs) == 0 {

+ 47 - 89
contrib/recipes/key.go

@@ -20,36 +20,32 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/lease"
 )
 
 // Key is a key/revision pair created by the client and stored on etcd
 type RemoteKV struct {
-	client *clientv3.Client
-	key    string
-	rev    int64
-	val    string
+	kv  v3.KV
+	key string
+	rev int64
+	val string
 }
 
-func NewKey(client *clientv3.Client, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
-	return NewKV(client, key, "", leaseID)
+func NewKey(kv v3.KV, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
+	return NewKV(kv, key, "", leaseID)
 }
 
-func NewKV(client *clientv3.Client, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
-	rev, err := putNewKV(client, key, val, leaseID)
+func NewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+	rev, err := putNewKV(kv, key, val, leaseID)
 	if err != nil {
 		return nil, err
 	}
-	return &RemoteKV{client, key, rev, val}, nil
+	return &RemoteKV{kv, key, rev, val}, nil
 }
 
-func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) {
-	resp, err := client.KV.Range(
-		context.TODO(),
-		&pb.RangeRequest{Key: []byte(key)},
-	)
+func GetRemoteKV(kv v3.KV, key string) (*RemoteKV, error) {
+	resp, err := kv.Get(context.TODO(), key)
 	if err != nil {
 		return nil, err
 	}
@@ -59,23 +55,19 @@ func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) {
 		rev = resp.Kvs[0].ModRevision
 		val = string(resp.Kvs[0].Value)
 	}
-	return &RemoteKV{
-		client: client,
-		key:    key,
-		rev:    rev,
-		val:    val}, nil
+	return &RemoteKV{kv: kv, key: key, rev: rev, val: val}, nil
 }
 
-func NewUniqueKey(client *clientv3.Client, prefix string) (*RemoteKV, error) {
-	return NewUniqueKV(client, prefix, "", 0)
+func NewUniqueKey(kv v3.KV, prefix string) (*RemoteKV, error) {
+	return NewUniqueKV(kv, prefix, "", 0)
 }
 
-func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+func NewUniqueKV(kv v3.KV, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
 	for {
 		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
-		rev, err := putNewKV(client, newKey, val, 0)
+		rev, err := putNewKV(kv, newKey, val, 0)
 		if err == nil {
-			return &RemoteKV{client, newKey, rev, val}, nil
+			return &RemoteKV{kv, newKey, rev, val}, nil
 		}
 		if err != ErrKeyExists {
 			return nil, err
@@ -85,22 +77,10 @@ func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lea
 
 // putNewKV attempts to create the given key, only succeeding if the key did
 // not yet exist.
-func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int64, error) {
-	cmp := &pb.Compare{
-		Result:      pb.Compare_EQUAL,
-		Target:      pb.Compare_VERSION,
-		Key:         []byte(key),
-		TargetUnion: &pb.Compare_Version{Version: 0}}
-
-	req := &pb.RequestUnion{
-		Request: &pb.RequestUnion_RequestPut{
-			RequestPut: &pb.PutRequest{
-				Key:   []byte(key),
-				Value: []byte(val),
-				Lease: int64(leaseID)}}}
-	txnresp, err := ec.KV.Txn(
-		context.TODO(),
-		&pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil})
+func putNewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (int64, error) {
+	cmp := v3.Compare(v3.Version(key), "=", 0)
+	req := v3.OpPut(key, val, v3.WithLease(leaseID))
+	txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
 	if err != nil {
 		return 0, err
 	}
@@ -111,14 +91,14 @@ func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int6
 }
 
 // NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
-func NewSequentialKV(client *clientv3.Client, prefix, val string) (*RemoteKV, error) {
-	return newSequentialKV(client, prefix, val, 0)
+func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
+	return newSequentialKV(kv, prefix, val, 0)
 }
 
 // newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
 // value and lease.  Note: a bookkeeping node __<prefix> is also allocated.
-func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
-	resp, err := NewRange(client, prefix).LastKey()
+func newSequentialKV(kv v3.KV, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+	resp, err := kv.Get(context.TODO(), prefix, withLastKey()...)
 	if err != nil {
 		return nil, err
 	}
@@ -127,9 +107,9 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.
 	newSeqNum := 0
 	if len(resp.Kvs) != 0 {
 		fields := strings.Split(string(resp.Kvs[0].Key), "/")
-		_, err := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
-		if err != nil {
-			return nil, err
+		_, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
+		if serr != nil {
+			return nil, serr
 		}
 		newSeqNum++
 	}
@@ -140,42 +120,22 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.
 	// N1: LastKey() == 1, start txn.
 	// N2: New Key 2, New Key 3, Delete Key 2
 	// N1: txn succeeds allocating key 2 when it shouldn't
-	baseKey := []byte("__" + prefix)
-	cmp := &pb.Compare{
-		Result: pb.Compare_LESS,
-		Target: pb.Compare_MOD,
-		Key:    []byte(baseKey),
-		// current revision might contain modification so +1
-		TargetUnion: &pb.Compare_ModRevision{ModRevision: resp.Header.Revision + 1},
-	}
+	baseKey := "__" + prefix
+
+	// current revision might contain modification so +1
+	cmp := v3.Compare(v3.ModifiedRevision(baseKey), "<", resp.Header.Revision+1)
+	reqPrefix := v3.OpPut(baseKey, "", v3.WithLease(leaseID))
+	reqNewKey := v3.OpPut(newKey, val, v3.WithLease(leaseID))
 
-	reqPrefix := &pb.RequestUnion{
-		Request: &pb.RequestUnion_RequestPut{
-			RequestPut: &pb.PutRequest{
-				Key:   baseKey,
-				Lease: int64(leaseID),
-			}}}
-
-	reqNewKey := &pb.RequestUnion{
-		Request: &pb.RequestUnion_RequestPut{
-			RequestPut: &pb.PutRequest{
-				Key:   []byte(newKey),
-				Value: []byte(val),
-				Lease: int64(leaseID),
-			}}}
-
-	txnresp, err := client.KV.Txn(
-		context.TODO(),
-		&pb.TxnRequest{
-			[]*pb.Compare{cmp},
-			[]*pb.RequestUnion{reqPrefix, reqNewKey}, nil})
+	txn := kv.Txn(context.TODO())
+	txnresp, err := txn.If(cmp).Then(reqPrefix, reqNewKey).Commit()
 	if err != nil {
 		return nil, err
 	}
 	if txnresp.Succeeded == false {
-		return newSequentialKV(client, prefix, val, leaseID)
+		return newSequentialKV(kv, prefix, val, leaseID)
 	}
-	return &RemoteKV{client, newKey, txnresp.Header.Revision, val}, nil
+	return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
 }
 
 func (rk *RemoteKV) Key() string     { return rk.key }
@@ -183,18 +143,16 @@ func (rk *RemoteKV) Revision() int64 { return rk.rev }
 func (rk *RemoteKV) Value() string   { return rk.val }
 
 func (rk *RemoteKV) Delete() error {
-	if rk.client == nil {
+	if rk.kv == nil {
 		return nil
 	}
-	req := &pb.DeleteRangeRequest{Key: []byte(rk.key)}
-	_, err := rk.client.KV.DeleteRange(context.TODO(), req)
-	rk.client = nil
+	_, err := rk.kv.Delete(context.TODO(), rk.key)
+	rk.kv = nil
 	return err
 }
 
 func (rk *RemoteKV) Put(val string) error {
-	req := &pb.PutRequest{Key: []byte(rk.key), Value: []byte(val)}
-	_, err := rk.client.KV.Put(context.TODO(), req)
+	_, err := rk.kv.Put(context.TODO(), rk.key, val)
 	return err
 }
 
@@ -202,12 +160,12 @@ func (rk *RemoteKV) Put(val string) error {
 type EphemeralKV struct{ RemoteKV }
 
 // NewEphemeralKV creates a new key/value pair associated with a session lease
-func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, error) {
+func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
 	leaseID, err := SessionLease(client)
 	if err != nil {
 		return nil, err
 	}
-	k, err := NewKV(client, key, val, leaseID)
+	k, err := NewKV(v3.NewKV(client), key, val, leaseID)
 	if err != nil {
 		return nil, err
 	}
@@ -215,12 +173,12 @@ func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, err
 }
 
 // NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
-func NewUniqueEphemeralKey(client *clientv3.Client, prefix string) (*EphemeralKV, error) {
+func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) {
 	return NewUniqueEphemeralKV(client, prefix, "")
 }
 
 // NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
-func NewUniqueEphemeralKV(client *clientv3.Client, prefix, val string) (ek *EphemeralKV, err error) {
+func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) {
 	for {
 		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
 		ek, err = NewEphemeralKV(client, newKey, val)

+ 19 - 35
contrib/recipes/lease.go

@@ -15,11 +15,9 @@ package recipe
 
 import (
 	"sync"
-	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 )
 
@@ -32,8 +30,9 @@ type clientLeaseMgr struct {
 }
 
 type leaseKeepAlive struct {
-	id    lease.LeaseID
-	donec chan struct{}
+	id     lease.LeaseID
+	cancel context.CancelFunc
+	donec  <-chan struct{}
 }
 
 func SessionLease(client *clientv3.Client) (lease.LeaseID, error) {
@@ -49,13 +48,10 @@ func SessionLeaseTTL(client *clientv3.Client, ttl int64) (lease.LeaseID, error)
 // would fail) or if transferring lease ownership.
 func StopSessionLease(client *clientv3.Client) {
 	clientLeases.mu.Lock()
-	lka, ok := clientLeases.leases[client]
-	if ok {
-		delete(clientLeases.leases, client)
-	}
+	lka := clientLeases.leases[client]
 	clientLeases.mu.Unlock()
 	if lka != nil {
-		lka.donec <- struct{}{}
+		lka.cancel()
 		<-lka.donec
 	}
 }
@@ -67,8 +63,7 @@ func RevokeSessionLease(client *clientv3.Client) (err error) {
 	clientLeases.mu.Unlock()
 	StopSessionLease(client)
 	if lka != nil {
-		req := &pb.LeaseRevokeRequest{ID: int64(lka.id)}
-		_, err = client.Lease.LeaseRevoke(context.TODO(), req)
+		_, err = clientv3.NewLease(client).Revoke(context.TODO(), lka.id)
 	}
 	return err
 }
@@ -80,48 +75,37 @@ func (clm *clientLeaseMgr) sessionLease(client *clientv3.Client, ttl int64) (lea
 		return lka.id, nil
 	}
 
-	resp, err := client.Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: ttl})
+	lc := clientv3.NewLease(client)
+	resp, err := lc.Create(context.TODO(), ttl)
 	if err != nil {
 		return lease.NoLease, err
 	}
 	id := lease.LeaseID(resp.ID)
 
 	ctx, cancel := context.WithCancel(context.Background())
-	keepAlive, err := client.Lease.LeaseKeepAlive(ctx)
+	keepAlive, err := lc.KeepAlive(ctx, id)
 	if err != nil || keepAlive == nil {
 		return lease.NoLease, err
 	}
 
-	lka := &leaseKeepAlive{id: id, donec: make(chan struct{})}
+	donec := make(chan struct{})
+	lka := &leaseKeepAlive{
+		id:     id,
+		cancel: cancel,
+		donec:  donec}
 	clm.leases[client] = lka
 
-	// keep the lease alive until client error
+	// keep the lease alive until client error or cancelled context
 	go func() {
 		defer func() {
-			keepAlive.CloseSend()
 			clm.mu.Lock()
 			delete(clm.leases, client)
 			clm.mu.Unlock()
-			cancel()
-			close(lka.donec)
+			lc.Close()
+			close(donec)
 		}()
-
-		ttl := resp.TTL
-		for {
-			lreq := &pb.LeaseKeepAliveRequest{ID: int64(id)}
-			select {
-			case <-lka.donec:
-				return
-			case <-time.After(time.Duration(ttl/2) * time.Second):
-			}
-			if err := keepAlive.Send(lreq); err != nil {
-				break
-			}
-			resp, err := keepAlive.Recv()
-			if err != nil {
-				break
-			}
-			ttl = resp.TTL
+		for range keepAlive {
+			// eat messages until keep alive channel closes
 		}
 	}()
 

+ 15 - 10
contrib/recipes/mutex.go

@@ -17,29 +17,33 @@ package recipe
 import (
 	"sync"
 
-	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 // Mutex implements the sync Locker interface with etcd
 type Mutex struct {
-	client *clientv3.Client
-	key    string
-	myKey  *RemoteKV
+	client *v3.Client
+	kv     v3.KV
+	ctx    context.Context
+
+	key   string
+	myKey *EphemeralKV
 }
 
-func NewMutex(client *clientv3.Client, key string) *Mutex {
-	return &Mutex{client, key, nil}
+func NewMutex(client *v3.Client, key string) *Mutex {
+	return &Mutex{client, v3.NewKV(client), context.TODO(), key, nil}
 }
 
 func (m *Mutex) Lock() (err error) {
 	// put self in lock waiters via myKey; oldest waiter holds lock
-	m.myKey, err = NewUniqueKey(m.client, m.key)
+	m.myKey, err = NewUniqueEphemeralKey(m.client, m.key)
 	if err != nil {
 		return err
 	}
 	// find oldest element in waiters via revision of insertion
-	resp, err := NewRange(m.client, m.key).FirstRev()
+	resp, err := m.kv.Get(m.ctx, m.key, withFirstRev()...)
 	if err != nil {
 		return err
 	}
@@ -48,7 +52,8 @@ func (m *Mutex) Lock() (err error) {
 		return nil
 	}
 	// otherwise myKey isn't lowest, so there must be a key prior to myKey
-	lastKey, err := NewRangeRev(m.client, m.key, m.myKey.Revision()-1).LastRev()
+	opts := append(withLastRev(), v3.WithRev(m.myKey.Revision()-1))
+	lastKey, err := m.kv.Get(m.ctx, m.key, opts...)
 	if err != nil {
 		return err
 	}
@@ -81,6 +86,6 @@ func (lm *lockerMutex) Unlock() {
 	}
 }
 
-func NewLocker(client *clientv3.Client, key string) sync.Locker {
+func NewLocker(client *v3.Client, key string) sync.Locker {
 	return &lockerMutex{NewMutex(client, key)}
 }

+ 11 - 8
contrib/recipes/priority_queue.go

@@ -17,25 +17,28 @@ package recipe
 import (
 	"fmt"
 
-	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 // PriorityQueue implements a multi-reader, multi-writer distributed queue.
 type PriorityQueue struct {
-	client *clientv3.Client
+	client *v3.Client
+	kv     v3.KV
+	ctx    context.Context
 	key    string
 }
 
 // NewPriorityQueue creates an etcd priority queue.
-func NewPriorityQueue(client *clientv3.Client, key string) *PriorityQueue {
-	return &PriorityQueue{client, key + "/"}
+func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
+	return &PriorityQueue{client, v3.NewKV(client), context.TODO(), key + "/"}
 }
 
 // Enqueue puts a value into a queue with a given priority.
 func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
 	prefix := fmt.Sprintf("%s%05d", q.key, pr)
-	_, err := NewSequentialKV(q.client, prefix, val)
+	_, err := NewSequentialKV(q.kv, prefix, val)
 	return err
 }
 
@@ -43,12 +46,12 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
 // queue is empty, Dequeue blocks until items are available.
 func (q *PriorityQueue) Dequeue() (string, error) {
 	// TODO: fewer round trips by fetching more than one key
-	resp, err := NewRange(q.client, q.key).FirstKey()
+	resp, err := q.kv.Get(q.ctx, q.key, withFirstKey()...)
 	if err != nil {
 		return "", err
 	}
 
-	kv, err := claimFirstKey(q.client.KV, resp.Kvs)
+	kv, err := claimFirstKey(q.kv, resp.Kvs)
 	if err != nil {
 		return "", err
 	} else if kv != nil {
@@ -68,7 +71,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
 		return "", err
 	}
 
-	ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
+	ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision)
 	if err != nil {
 		return "", err
 	} else if !ok {

+ 12 - 8
contrib/recipes/queue.go

@@ -15,22 +15,26 @@
 package recipe
 
 import (
-	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 // Queue implements a multi-reader, multi-writer distributed queue.
 type Queue struct {
-	client    *clientv3.Client
+	client *v3.Client
+	kv     v3.KV
+	ctx    context.Context
+
 	keyPrefix string
 }
 
-func NewQueue(client *clientv3.Client, keyPrefix string) *Queue {
-	return &Queue{client, keyPrefix}
+func NewQueue(client *v3.Client, keyPrefix string) *Queue {
+	return &Queue{client, v3.NewKV(client), context.TODO(), keyPrefix}
 }
 
 func (q *Queue) Enqueue(val string) error {
-	_, err := NewUniqueKV(q.client, q.keyPrefix, val, 0)
+	_, err := NewUniqueKV(q.kv, q.keyPrefix, val, 0)
 	return err
 }
 
@@ -38,12 +42,12 @@ func (q *Queue) Enqueue(val string) error {
 // queue is empty, Dequeue blocks until elements are available.
 func (q *Queue) Dequeue() (string, error) {
 	// TODO: fewer round trips by fetching more than one key
-	resp, err := NewRange(q.client, q.keyPrefix).FirstRev()
+	resp, err := q.kv.Get(q.ctx, q.keyPrefix, withFirstRev()...)
 	if err != nil {
 		return "", err
 	}
 
-	kv, err := claimFirstKey(q.client.KV, resp.Kvs)
+	kv, err := claimFirstKey(q.kv, resp.Kvs)
 	if err != nil {
 		return "", err
 	} else if kv != nil {
@@ -63,7 +67,7 @@ func (q *Queue) Dequeue() (string, error) {
 		return "", err
 	}
 
-	ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
+	ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision)
 	if err != nil {
 		return "", err
 	} else if !ok {

+ 14 - 88
contrib/recipes/range.go

@@ -15,94 +15,20 @@
 package recipe
 
 import (
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	v3 "github.com/coreos/etcd/clientv3"
 )
 
-type Range struct {
-	kv     pb.KVClient
-	key    []byte
-	rev    int64
-	keyEnd []byte
-}
-
-func NewRange(client *clientv3.Client, key string) *Range {
-	return NewRangeRev(client, key, 0)
-}
-
-func NewRangeRev(client *clientv3.Client, key string, rev int64) *Range {
-	return &Range{client.KV, []byte(key), rev, prefixEnd(key)}
-}
-
-// Prefix performs a RangeRequest to get keys matching <key>*
-func (r *Range) Prefix() (*pb.RangeResponse, error) {
-	return r.kv.Range(
-		context.TODO(),
-		&pb.RangeRequest{
-			Key:      prefixNext(string(r.key)),
-			RangeEnd: r.keyEnd,
-			Revision: r.rev})
-}
-
-// OpenInterval gets the keys in the set <key>* - <key>
-func (r *Range) OpenInterval() (*pb.RangeResponse, error) {
-	return r.kv.Range(
-		context.TODO(),
-		&pb.RangeRequest{Key: r.key, RangeEnd: r.keyEnd, Revision: r.rev})
-}
-
-func (r *Range) FirstKey() (*pb.RangeResponse, error) {
-	return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_KEY)
-}
-
-func (r *Range) LastKey() (*pb.RangeResponse, error) {
-	return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_KEY)
-}
-
-func (r *Range) FirstRev() (*pb.RangeResponse, error) {
-	return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD)
-}
-
-func (r *Range) LastRev() (*pb.RangeResponse, error) {
-	return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD)
-}
-
-func (r *Range) FirstCreate() (*pb.RangeResponse, error) {
-	return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD)
-}
-
-func (r *Range) LastCreate() (*pb.RangeResponse, error) {
-	return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD)
-}
-
-// topTarget gets the first key for a given sort order and target
-func (r *Range) topTarget(order pb.RangeRequest_SortOrder, target pb.RangeRequest_SortTarget) (*pb.RangeResponse, error) {
-	return r.kv.Range(
-		context.TODO(),
-		&pb.RangeRequest{
-			Key:        r.key,
-			RangeEnd:   r.keyEnd,
-			Limit:      1,
-			Revision:   r.rev,
-			SortOrder:  order,
-			SortTarget: target})
-}
-
-// prefixNext returns the first key possibly matched by <prefix>* - <prefix>
-func prefixNext(prefix string) []byte {
-	return append([]byte(prefix), 0)
-}
-
-// prefixEnd returns the last key possibly matched by <prefix>*
-func prefixEnd(prefix string) []byte {
-	keyEnd := []byte(prefix)
-	for i := len(keyEnd) - 1; i >= 0; i-- {
-		if keyEnd[i] < 0xff {
-			keyEnd[i] = keyEnd[i] + 1
-			keyEnd = keyEnd[:i+1]
-			break
-		}
-	}
-	return keyEnd
+func withFirstCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortAscend) }
+func withLastCreate() []v3.OpOption  { return withTop(v3.SortByCreatedRev, v3.SortDescend) }
+func withFirstKey() []v3.OpOption    { return withTop(v3.SortByKey, v3.SortAscend) }
+func withLastKey() []v3.OpOption     { return withTop(v3.SortByKey, v3.SortDescend) }
+func withFirstRev() []v3.OpOption    { return withTop(v3.SortByModifiedRev, v3.SortAscend) }
+func withLastRev() []v3.OpOption     { return withTop(v3.SortByModifiedRev, v3.SortDescend) }
+
+// withTop gets the first key over the get's prefix given a sort order
+func withTop(target v3.SortTarget, order v3.SortOrder) []v3.OpOption {
+	return []v3.OpOption{
+		v3.WithPrefix(),
+		v3.WithSort(target, order),
+		v3.WithLimit(1)}
 }

+ 19 - 14
contrib/recipes/rwmutex.go

@@ -15,23 +15,26 @@
 package recipe
 
 import (
-	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
 type RWMutex struct {
-	client *clientv3.Client
-	key    string
-	myKey  *RemoteKV
+	client *v3.Client
+	kv     v3.KV
+	ctx    context.Context
+
+	key   string
+	myKey *EphemeralKV
 }
 
-func NewRWMutex(client *clientv3.Client, key string) *RWMutex {
-	return &RWMutex{client, key, nil}
+func NewRWMutex(client *v3.Client, key string) *RWMutex {
+	return &RWMutex{client, v3.NewKV(client), context.TODO(), key, nil}
 }
 
 func (rwm *RWMutex) RLock() error {
-	// XXX: make reads ephemeral locks?
-	rk, err := NewUniqueKey(rwm.client, rwm.key+"/read")
+	rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/read")
 	if err != nil {
 		return err
 	}
@@ -39,7 +42,7 @@ func (rwm *RWMutex) RLock() error {
 
 	// if there are nodes with "write-" and a lower
 	// revision number than us we must wait
-	resp, err := NewRange(rwm.client, rwm.key+"/write").FirstRev()
+	resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", withFirstRev()...)
 	if err != nil {
 		return err
 	}
@@ -51,21 +54,22 @@ func (rwm *RWMutex) RLock() error {
 }
 
 func (rwm *RWMutex) Lock() error {
-	rk, err := NewUniqueKey(rwm.client, rwm.key+"/write")
+	rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/write")
 	if err != nil {
 		return err
 	}
 	rwm.myKey = rk
 
 	for {
-		// any key of lower rev number blocks the write lock
-		resp, err := NewRangeRev(rwm.client, rwm.key, rk.Revision()-1).LastRev()
+		// find any key of lower rev number blocks the write lock
+		opts := append(withLastRev(), v3.WithRev(rk.Revision()-1))
+		resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...)
 		if err != nil {
 			return err
 		}
 		if len(resp.Kvs) == 0 {
 			// no matching for revision before myKey; acquired
-			return nil
+			break
 		}
 		if err := rwm.waitOnLowest(); err != nil {
 			return err
@@ -78,7 +82,8 @@ func (rwm *RWMutex) Lock() error {
 
 func (rwm *RWMutex) waitOnLowest() error {
 	// must block; get key before ek for waiting
-	lastKey, err := NewRangeRev(rwm.client, rwm.key, rwm.myKey.Revision()-1).LastRev()
+	opts := append(withLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
+	lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...)
 	if err != nil {
 		return err
 	}

+ 15 - 23
contrib/recipes/stm.go

@@ -16,13 +16,13 @@ package recipe
 
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	v3 "github.com/coreos/etcd/clientv3"
 )
 
 // STM implements software transactional memory over etcd
 type STM struct {
-	client *clientv3.Client
+	client *v3.Client
+	kv     v3.KV
 	// rset holds the read key's value and revision of read
 	rset map[string]*RemoteKV
 	// wset holds the write key and its value
@@ -33,8 +33,8 @@ type STM struct {
 }
 
 // NewSTM creates new transaction loop for a given apply function.
-func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error {
-	s := &STM{client: client, apply: apply}
+func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error {
+	s := &STM{client: client, kv: v3.NewKV(client), apply: apply}
 	errc := make(chan error, 1)
 	go func() {
 		var err error
@@ -43,7 +43,8 @@ func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error {
 			if err = apply(s); err != nil || s.aborted {
 				break
 			}
-			if ok, err := s.commit(); ok || err != nil {
+			if ok, cerr := s.commit(); ok || cerr != nil {
+				err = cerr
 				break
 			}
 		}
@@ -63,7 +64,7 @@ func (s *STM) Get(key string) (string, error) {
 	if rk, ok := s.rset[key]; ok {
 		return rk.Value(), nil
 	}
-	rk, err := GetRemoteKV(s.client, key)
+	rk, err := GetRemoteKV(s.kv, key)
 	if err != nil {
 		return "", err
 	}
@@ -76,30 +77,21 @@ func (s *STM) Get(key string) (string, error) {
 func (s *STM) Put(key string, val string) { s.wset[key] = val }
 
 // commit attempts to apply the txn's changes to the server.
-func (s *STM) commit() (ok bool, err error) {
+func (s *STM) commit() (ok bool, rr error) {
 	// read set must not change
-	cmps := []*pb.Compare{}
+	cmps := make([]v3.Cmp, 0, len(s.rset))
 	for k, rk := range s.rset {
 		// use < to support updating keys that don't exist yet
-		cmp := &pb.Compare{
-			Result:      pb.Compare_LESS,
-			Target:      pb.Compare_MOD,
-			Key:         []byte(k),
-			TargetUnion: &pb.Compare_ModRevision{ModRevision: rk.Revision() + 1},
-		}
+		cmp := v3.Compare(v3.ModifiedRevision(k), "<", rk.Revision()+1)
 		cmps = append(cmps, cmp)
 	}
+
 	// apply all writes
-	puts := []*pb.RequestUnion{}
+	puts := make([]v3.Op, 0, len(s.wset))
 	for k, v := range s.wset {
-		puts = append(puts, &pb.RequestUnion{
-			Request: &pb.RequestUnion_RequestPut{
-				RequestPut: &pb.PutRequest{
-					Key:   []byte(k),
-					Value: []byte(v),
-				}}})
+		puts = append(puts, v3.OpPut(k, v))
 	}
-	txnresp, err := s.client.KV.Txn(context.TODO(), &pb.TxnRequest{cmps, puts, nil})
+	txnresp, err := s.kv.Txn(context.TODO()).If(cmps...).Then(puts...).Commit()
 	return txnresp.Succeeded, err
 }
 

+ 24 - 121
contrib/recipes/watch.go

@@ -17,138 +17,41 @@ package recipe
 import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/clientv3"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-	"github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
-type Watcher struct {
-	wstream pb.Watch_WatchClient
-	cancel  context.CancelFunc
-	donec   chan struct{}
-	id      storage.WatchID
-	recvc   chan *storagepb.Event
-	lastErr error
-}
-
-func NewWatcher(c *clientv3.Client, key string, rev int64) (*Watcher, error) {
-	return newWatcher(c, key, rev, false)
-}
-
-func NewPrefixWatcher(c *clientv3.Client, prefix string, rev int64) (*Watcher, error) {
-	return newWatcher(c, prefix, rev, true)
-}
-
-func newWatcher(c *clientv3.Client, key string, rev int64, isPrefix bool) (*Watcher, error) {
-	ctx, cancel := context.WithCancel(context.Background())
-	w, err := c.Watch.Watch(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	req := &pb.WatchCreateRequest{StartRevision: rev}
-	if isPrefix {
-		req.Prefix = []byte(key)
-	} else {
-		req.Key = []byte(key)
-	}
-
-	if err := w.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{CreateRequest: req}}); err != nil {
-		return nil, err
-	}
-
-	wresp, err := w.Recv()
-	if err != nil {
-		return nil, err
-	}
-	if len(wresp.Events) != 0 || wresp.Created != true {
-		return nil, ErrWaitMismatch
-	}
-	ret := &Watcher{
-		wstream: w,
-		cancel:  cancel,
-		donec:   make(chan struct{}),
-		id:      storage.WatchID(wresp.WatchId),
-		recvc:   make(chan *storagepb.Event),
-	}
-	go ret.recvLoop()
-	return ret, nil
-}
-
-func (w *Watcher) Close() error {
-	defer w.cancel()
-	if w.wstream == nil {
-		return w.lastErr
-	}
-	req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CancelRequest{
-		CancelRequest: &pb.WatchCancelRequest{
-			WatchId: int64(w.id)}}}
-	err := w.wstream.Send(req)
-	if err != nil && w.lastErr == nil {
-		return err
+// WaitEvents waits on a key until it observes the given events and returns the final one.
+func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+	w := clientv3.NewWatcher(c)
+	wc := w.Watch(context.Background(), key, rev)
+	if wc == nil {
+		w.Close()
+		return nil, ErrNoWatcher
 	}
-	w.wstream.CloseSend()
-	w.donec <- struct{}{}
-	<-w.donec
-	w.wstream = nil
-	return w.lastErr
+	return waitEvents(wc, evs), w.Close()
 }
 
-func (w *Watcher) Chan() <-chan *storagepb.Event { return w.recvc }
-
-func (w *Watcher) recvLoop() {
-	defer close(w.donec)
-	for {
-		wresp, err := w.wstream.Recv()
-		if err != nil {
-			w.lastErr = err
-			break
-		}
-		for i := range wresp.Events {
-			select {
-			case <-w.donec:
-				close(w.recvc)
-				return
-			case w.recvc <- wresp.Events[i]:
-			}
-		}
+func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+	w := clientv3.NewWatcher(c)
+	wc := w.WatchPrefix(context.Background(), prefix, rev)
+	if wc == nil {
+		w.Close()
+		return nil, ErrNoWatcher
 	}
-	close(w.recvc)
-	<-w.donec
+	return waitEvents(wc, evs), w.Close()
 }
 
-func (w *Watcher) waitEvents(evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event {
 	i := 0
-	for {
-		ev, ok := <-w.recvc
-		if !ok {
-			break
-		}
-		if ev.Type == evs[i] {
-			i++
-			if i == len(evs) {
-				return ev, nil
+	for wresp := range wc {
+		for _, ev := range wresp.Events {
+			if ev.Type == evs[i] {
+				i++
+				if i == len(evs) {
+					return ev
+				}
 			}
 		}
 	}
-	return nil, w.Close()
-}
-
-// WaitEvents waits on a key until it observes the given events and returns the final one.
-func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
-	w, err := NewWatcher(c, key, rev)
-	if err != nil {
-		return nil, err
-	}
-	defer w.Close()
-	return w.waitEvents(evs)
-}
-
-func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
-	w, err := NewPrefixWatcher(c, prefix, rev)
-	if err != nil {
-		return nil, err
-	}
-	defer w.Close()
-	return w.waitEvents(evs)
+	return nil
 }

+ 5 - 4
integration/v3_stm_test.go

@@ -19,6 +19,7 @@ import (
 	"strconv"
 	"testing"
 
+	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/contrib/recipes"
 )
 
@@ -30,7 +31,7 @@ func TestSTMConflict(t *testing.T) {
 	etcdc := clus.RandClient()
 	keys := make([]*recipe.RemoteKV, 5)
 	for i := 0; i < len(keys); i++ {
-		rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0)
+		rk, err := recipe.NewKV(v3.NewKV(etcdc), fmt.Sprintf("foo-%d", i), "100", 0)
 		if err != nil {
 			t.Fatalf("could not make key (%v)", err)
 		}
@@ -75,7 +76,7 @@ func TestSTMConflict(t *testing.T) {
 	// ensure sum matches initial sum
 	sum := 0
 	for _, oldRK := range keys {
-		rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key())
+		rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), oldRK.Key())
 		if err != nil {
 			t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err)
 		}
@@ -102,7 +103,7 @@ func TestSTMPutNewKey(t *testing.T) {
 		t.Fatalf("error on stm txn (%v)", err)
 	}
 
-	rk, err := recipe.GetRemoteKV(etcdc, "foo")
+	rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo")
 	if err != nil {
 		t.Fatalf("error fetching key (%v)", err)
 	}
@@ -128,7 +129,7 @@ func TestSTMAbort(t *testing.T) {
 		t.Fatalf("error on stm txn (%v)", err)
 	}
 
-	rk, err := recipe.GetRemoteKV(etcdc, "foo")
+	rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo")
 	if err != nil {
 		t.Fatalf("error fetching key (%v)", err)
 	}