浏览代码

Merge pull request #4238 from heyitsanthony/v3-recipes

contrib: v3 recipes
Anthony Romano 10 年之前
父节点
当前提交
ae05c87c2f

+ 63 - 0
contrib/recipes/barrier.go

@@ -0,0 +1,63 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+// Barrier creates a key in etcd to block processes, then deletes the key to
+// release all blocked processes.
+type Barrier struct {
+	client *EtcdClient
+	key    string
+}
+
+func NewBarrier(client *EtcdClient, key string) *Barrier {
+	return &Barrier{client, key}
+}
+
+// Hold creates the barrier key causing processes to block on Wait.
+func (b *Barrier) Hold() error {
+	_, err := NewKey(b.client, b.key, 0)
+	return err
+}
+
+// Release deletes the barrier key to unblock all waiting processes.
+func (b *Barrier) Release() error {
+	_, err := b.client.KV.DeleteRange(context.TODO(), &pb.DeleteRangeRequest{Key: []byte(b.key)})
+	return err
+}
+
+// Wait blocks on the barrier key until it is deleted. If there is no key, Wait
+// assumes Release has already been called and returns immediately.
+func (b *Barrier) Wait() error {
+	resp, err := NewRange(b.client, b.key).FirstKey()
+	if err != nil {
+		return err
+	}
+	if len(resp.Kvs) == 0 {
+		// key already removed
+		return nil
+	}
+	_, err = WaitEvents(
+		b.client,
+		b.key,
+		resp.Header.Revision,
+		[]storagepb.Event_EventType{storagepb.PUT, storagepb.DELETE})
+	return err
+}

+ 87 - 0
contrib/recipes/client.go

@@ -0,0 +1,87 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"errors"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	spb "github.com/coreos/etcd/storage/storagepb"
+)
+
+var (
+	ErrKeyExists    = errors.New("key already exists")
+	ErrWaitMismatch = errors.New("unexpected wait result")
+)
+
+type EtcdClient struct {
+	conn  *grpc.ClientConn
+	KV    pb.KVClient
+	Lease pb.LeaseClient
+	Watch pb.WatchClient
+}
+
+func NewEtcdClient(conn *grpc.ClientConn) *EtcdClient {
+	kv := pb.NewKVClient(conn)
+	lease := pb.NewLeaseClient(conn)
+	watch := pb.NewWatchClient(conn)
+	return &EtcdClient{conn, kv, lease, watch}
+}
+
+// deleteRevKey deletes a key by revision, returning false if key is missing
+func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) {
+	cmp := &pb.Compare{
+		Result:      pb.Compare_EQUAL,
+		Target:      pb.Compare_MOD,
+		Key:         []byte(key),
+		ModRevision: rev}
+	req := &pb.RequestUnion{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}
+	txnresp, err := ec.KV.Txn(
+		context.TODO(),
+		&pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil})
+	if err != nil {
+		return false, err
+	} else if txnresp.Succeeded == false {
+		return false, nil
+	}
+	return true, nil
+}
+
+func (ec *EtcdClient) claimFirstKey(kvs []*spb.KeyValue) (*spb.KeyValue, error) {
+	for _, kv := range kvs {
+		ok, err := ec.deleteRevKey(string(kv.Key), kv.ModRevision)
+		if err != nil {
+			return nil, err
+		} else if ok {
+			return kv, nil
+		}
+	}
+	return nil, nil
+}
+
+func putEmptyKey(kv pb.KVClient, key string) (*pb.PutResponse, error) {
+	return kv.Put(context.TODO(), &pb.PutRequest{Key: []byte(key), Value: []byte{}})
+}
+
+// deletePrefix performs a RangeRequest to get keys on a given prefix
+func deletePrefix(kv pb.KVClient, prefix string) (*pb.DeleteRangeResponse, error) {
+	return kv.DeleteRange(
+		context.TODO(),
+		&pb.DeleteRangeRequest{
+			Key:      []byte(prefix),
+			RangeEnd: []byte(prefixEnd(prefix))})
+}

+ 190 - 0
contrib/recipes/key.go

@@ -0,0 +1,190 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/lease"
+)
+
+// Key is a key/revision pair created by the client and stored on etcd
+type RemoteKV struct {
+	client *EtcdClient
+	key    string
+	rev    int64
+	val    string
+}
+
+func NewKey(client *EtcdClient, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
+	return NewKV(client, key, "", leaseID)
+}
+
+func NewKV(client *EtcdClient, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+	rev, err := putNewKV(client, key, val, leaseID)
+	if err != nil {
+		return nil, err
+	}
+	return &RemoteKV{client, key, rev, val}, nil
+}
+
+func GetRemoteKV(client *EtcdClient, key string) (*RemoteKV, error) {
+	resp, err := client.KV.Range(
+		context.TODO(),
+		&pb.RangeRequest{Key: []byte(key)},
+	)
+	if err != nil {
+		return nil, err
+	}
+	rev := resp.Header.Revision
+	val := ""
+	if len(resp.Kvs) > 0 {
+		rev = resp.Kvs[0].ModRevision
+		val = string(resp.Kvs[0].Value)
+	}
+	return &RemoteKV{
+		client: client,
+		key:    key,
+		rev:    rev,
+		val:    val}, nil
+}
+
+func NewUniqueKey(client *EtcdClient, prefix string) (*RemoteKV, error) {
+	return NewUniqueKV(client, prefix, "", 0)
+}
+
+func NewUniqueKV(client *EtcdClient, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+	for {
+		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
+		rev, err := putNewKV(client, newKey, val, 0)
+		if err == nil {
+			return &RemoteKV{client, newKey, rev, val}, nil
+		}
+		if err != ErrKeyExists {
+			return nil, err
+		}
+	}
+}
+
+// putNewKV attempts to create the given key, only succeeding if the key did
+// not yet exist.
+func putNewKV(ec *EtcdClient, key, val string, leaseID lease.LeaseID) (int64, error) {
+	cmp := &pb.Compare{
+		Result: pb.Compare_EQUAL,
+		Target: pb.Compare_VERSION,
+		Key:    []byte(key)}
+	req := &pb.RequestUnion{
+		RequestPut: &pb.PutRequest{
+			Key:   []byte(key),
+			Value: []byte(val),
+			Lease: int64(leaseID)}}
+
+	txnresp, err := ec.KV.Txn(
+		context.TODO(),
+		&pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil})
+	if err != nil {
+		return 0, err
+	}
+	if txnresp.Succeeded == false {
+		return 0, ErrKeyExists
+	}
+	return txnresp.Header.Revision, nil
+}
+
+// NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
+func NewSequentialKV(client *EtcdClient, prefix, val string) (*RemoteKV, error) {
+	return newSequentialKV(client, prefix, val, 0)
+}
+
+// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
+// value and lease.  Note: a bookkeeping node __<prefix> is also allocated.
+func newSequentialKV(client *EtcdClient, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
+	resp, err := NewRange(client, prefix).LastKey()
+	if err != nil {
+		return nil, err
+	}
+
+	// add 1 to last key, if any
+	newSeqNum := 0
+	if len(resp.Kvs) != 0 {
+		fields := strings.Split(string(resp.Kvs[0].Key), "/")
+		_, err := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
+		if err != nil {
+			return nil, err
+		}
+		newSeqNum++
+	}
+	newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
+
+	// base prefix key must be current (i.e., <=) with the server update;
+	// the base key is important to avoid the following:
+	// N1: LastKey() == 1, start txn.
+	// N2: New Key 2, New Key 3, Delete Key 2
+	// N1: txn succeeds allocating key 2 when it shouldn't
+	baseKey := []byte("__" + prefix)
+	cmp := &pb.Compare{
+		Result: pb.Compare_LESS,
+		Target: pb.Compare_MOD,
+		Key:    []byte(baseKey),
+		// current revision might contain modification so +1
+		ModRevision: resp.Header.Revision + 1,
+	}
+	prPrefix := &pb.PutRequest{Key: baseKey, Lease: int64(leaseID)}
+	reqPrefix := &pb.RequestUnion{RequestPut: prPrefix}
+
+	prNewKey := &pb.PutRequest{
+		Key:   []byte(newKey),
+		Value: []byte(val),
+		Lease: int64(leaseID),
+	}
+	reqNewKey := &pb.RequestUnion{RequestPut: prNewKey}
+
+	txnresp, err := client.KV.Txn(
+		context.TODO(),
+		&pb.TxnRequest{
+			[]*pb.Compare{cmp},
+			[]*pb.RequestUnion{reqPrefix, reqNewKey}, nil})
+	if err != nil {
+		return nil, err
+	}
+	if txnresp.Succeeded == false {
+		return newSequentialKV(client, prefix, val, leaseID)
+	}
+	return &RemoteKV{client, newKey, txnresp.Header.Revision, val}, nil
+}
+
+func (rk *RemoteKV) Key() string     { return rk.key }
+func (rk *RemoteKV) Revision() int64 { return rk.rev }
+func (rk *RemoteKV) Value() string   { return rk.val }
+
+func (rk *RemoteKV) Delete() error {
+	if rk.client == nil {
+		return nil
+	}
+	req := &pb.DeleteRangeRequest{Key: []byte(rk.key)}
+	_, err := rk.client.KV.DeleteRange(context.TODO(), req)
+	rk.client = nil
+	return err
+}
+
+func (rk *RemoteKV) Put(val string) error {
+	req := &pb.PutRequest{Key: []byte(rk.key), Value: []byte(val)}
+	_, err := rk.client.KV.Put(context.TODO(), req)
+	return err
+}

+ 85 - 0
contrib/recipes/mutex.go

@@ -0,0 +1,85 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+// Mutex implements the sync Locker interface with etcd
+type Mutex struct {
+	client *EtcdClient
+	key    string
+	myKey  *RemoteKV
+}
+
+func NewMutex(client *EtcdClient, key string) *Mutex {
+	return &Mutex{client, key, nil}
+}
+
+func (m *Mutex) Lock() (err error) {
+	// put self in lock waiters via myKey; oldest waiter holds lock
+	m.myKey, err = NewUniqueKey(m.client, m.key)
+	if err != nil {
+		return err
+	}
+	// find oldest element in waiters via revision of insertion
+	resp, err := NewRange(m.client, m.key).FirstRev()
+	if err != nil {
+		return err
+	}
+	// if myKey is oldest in waiters, then myKey holds the lock
+	if m.myKey.Revision() == resp.Kvs[0].CreateRevision {
+		return nil
+	}
+	// otherwise myKey isn't lowest, so there must be a key prior to myKey
+	lastKey, err := NewRangeRev(m.client, m.key, m.myKey.Revision()-1).LastRev()
+	if err != nil {
+		return err
+	}
+	// wait for release on prior key
+	_, err = WaitEvents(
+		m.client,
+		string(lastKey.Kvs[0].Key),
+		m.myKey.Revision()-1,
+		[]storagepb.Event_EventType{storagepb.DELETE})
+	// myKey now oldest
+	return err
+}
+
+func (m *Mutex) Unlock() error {
+	err := m.myKey.Delete()
+	m.myKey = nil
+	return err
+}
+
+type lockerMutex struct{ *Mutex }
+
+func (lm *lockerMutex) Lock() {
+	if err := lm.Mutex.Lock(); err != nil {
+		panic(err)
+	}
+}
+func (lm *lockerMutex) Unlock() {
+	if err := lm.Mutex.Unlock(); err != nil {
+		panic(err)
+	}
+}
+
+func NewLocker(client *EtcdClient, key string) sync.Locker {
+	return &lockerMutex{NewMutex(client, key)}
+}

+ 77 - 0
contrib/recipes/priority_queue.go

@@ -0,0 +1,77 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"fmt"
+
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+// PriorityQueue implements a multi-reader, multi-writer distributed queue.
+type PriorityQueue struct {
+	client *EtcdClient
+	key    string
+}
+
+// NewPriorityQueue creates an etcd priority queue.
+func NewPriorityQueue(client *EtcdClient, key string) *PriorityQueue {
+	return &PriorityQueue{client, key + "/"}
+}
+
+// Enqueue puts a value into a queue with a given priority.
+func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
+	prefix := fmt.Sprintf("%s%05d", q.key, pr)
+	_, err := NewSequentialKV(q.client, prefix, val)
+	return err
+}
+
+// Dequeue returns Enqueued()'d items in FIFO order. If the
+// queue is empty, Dequeue blocks until items are available.
+func (q *PriorityQueue) Dequeue() (string, error) {
+	// TODO: fewer round trips by fetching more than one key
+	resp, err := NewRange(q.client, q.key).FirstKey()
+	if err != nil {
+		return "", err
+	}
+
+	kv, err := q.client.claimFirstKey(resp.Kvs)
+	if err != nil {
+		return "", err
+	} else if kv != nil {
+		return string(kv.Value), nil
+	} else if resp.More {
+		// missed some items, retry to read in more
+		return q.Dequeue()
+	}
+
+	// nothing to dequeue; wait on items
+	ev, err := WaitPrefixEvents(
+		q.client,
+		q.key,
+		resp.Header.Revision,
+		[]storagepb.Event_EventType{storagepb.PUT})
+	if err != nil {
+		return "", err
+	}
+
+	ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision)
+	if err != nil {
+		return "", err
+	} else if !ok {
+		return q.Dequeue()
+	}
+	return string(ev.Kv.Value), err
+}

+ 72 - 0
contrib/recipes/queue.go

@@ -0,0 +1,72 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+// Queue implements a multi-reader, multi-writer distributed queue.
+type Queue struct {
+	client    *EtcdClient
+	keyPrefix string
+}
+
+func NewQueue(client *EtcdClient, keyPrefix string) *Queue {
+	return &Queue{client, keyPrefix}
+}
+
+func (q *Queue) Enqueue(val string) error {
+	_, err := NewUniqueKV(q.client, q.keyPrefix, val, 0)
+	return err
+}
+
+// Dequeue returns Enqueued()'d elements in FIFO order. If the
+// queue is empty, Dequeue blocks until elements are available.
+func (q *Queue) Dequeue() (string, error) {
+	// TODO: fewer round trips by fetching more than one key
+	resp, err := NewRange(q.client, q.keyPrefix).FirstRev()
+	if err != nil {
+		return "", err
+	}
+
+	kv, err := q.client.claimFirstKey(resp.Kvs)
+	if err != nil {
+		return "", err
+	} else if kv != nil {
+		return string(kv.Value), nil
+	} else if resp.More {
+		// missed some items, retry to read in more
+		return q.Dequeue()
+	}
+
+	// nothing yet; wait on elements
+	ev, err := WaitPrefixEvents(
+		q.client,
+		q.keyPrefix,
+		resp.Header.Revision,
+		[]storagepb.Event_EventType{storagepb.PUT})
+	if err != nil {
+		return "", err
+	}
+
+	ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision)
+	if err != nil {
+		return "", err
+	} else if !ok {
+		return q.Dequeue()
+	}
+	return string(ev.Kv.Value), err
+}

+ 107 - 0
contrib/recipes/range.go

@@ -0,0 +1,107 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type Range struct {
+	kv     pb.KVClient
+	key    []byte
+	rev    int64
+	keyEnd []byte
+}
+
+func NewRange(client *EtcdClient, key string) *Range {
+	return NewRangeRev(client, key, 0)
+}
+
+func NewRangeRev(client *EtcdClient, key string, rev int64) *Range {
+	return &Range{client.KV, []byte(key), rev, prefixEnd(key)}
+}
+
+// Prefix performs a RangeRequest to get keys matching <key>*
+func (r *Range) Prefix() (*pb.RangeResponse, error) {
+	return r.kv.Range(
+		context.TODO(),
+		&pb.RangeRequest{
+			Key:      prefixNext(string(r.key)),
+			RangeEnd: r.keyEnd,
+			Revision: r.rev})
+}
+
+// OpenInterval gets the keys in the set <key>* - <key>
+func (r *Range) OpenInterval() (*pb.RangeResponse, error) {
+	return r.kv.Range(
+		context.TODO(),
+		&pb.RangeRequest{Key: r.key, RangeEnd: r.keyEnd, Revision: r.rev})
+}
+
+func (r *Range) FirstKey() (*pb.RangeResponse, error) {
+	return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_KEY)
+}
+
+func (r *Range) LastKey() (*pb.RangeResponse, error) {
+	return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_KEY)
+}
+
+func (r *Range) FirstRev() (*pb.RangeResponse, error) {
+	return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD)
+}
+
+func (r *Range) LastRev() (*pb.RangeResponse, error) {
+	return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD)
+}
+
+func (r *Range) FirstCreate() (*pb.RangeResponse, error) {
+	return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD)
+}
+
+func (r *Range) LastCreate() (*pb.RangeResponse, error) {
+	return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD)
+}
+
+// topTarget gets the first key for a given sort order and target
+func (r *Range) topTarget(order pb.RangeRequest_SortOrder, target pb.RangeRequest_SortTarget) (*pb.RangeResponse, error) {
+	return r.kv.Range(
+		context.TODO(),
+		&pb.RangeRequest{
+			Key:        r.key,
+			RangeEnd:   r.keyEnd,
+			Limit:      1,
+			Revision:   r.rev,
+			SortOrder:  order,
+			SortTarget: target})
+}
+
+// prefixNext returns the first key possibly matched by <prefix>* - <prefix>
+func prefixNext(prefix string) []byte {
+	return append([]byte(prefix), 0)
+}
+
+// prefixEnd returns the last key possibly matched by <prefix>*
+func prefixEnd(prefix string) []byte {
+	keyEnd := []byte(prefix)
+	for i := len(keyEnd) - 1; i >= 0; i-- {
+		if keyEnd[i] < 0xff {
+			keyEnd[i] = keyEnd[i] + 1
+			keyEnd = keyEnd[:i+1]
+			break
+		}
+	}
+	return keyEnd
+}

+ 94 - 0
contrib/recipes/rwmutex.go

@@ -0,0 +1,94 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+type RWMutex struct {
+	client *EtcdClient
+	key    string
+	myKey  *RemoteKV
+}
+
+func NewRWMutex(client *EtcdClient, key string) *RWMutex {
+	return &RWMutex{client, key, nil}
+}
+
+func (rwm *RWMutex) RLock() error {
+	// XXX: make reads ephemeral locks?
+	rk, err := NewUniqueKey(rwm.client, rwm.key+"/read")
+	if err != nil {
+		return err
+	}
+	rwm.myKey = rk
+
+	// if there are nodes with "write-" and a lower
+	// revision number than us we must wait
+	resp, err := NewRange(rwm.client, rwm.key+"/write").FirstRev()
+	if err != nil {
+		return err
+	}
+	if len(resp.Kvs) == 0 || resp.Kvs[0].ModRevision > rk.Revision() {
+		// no blocking since no write key
+		return nil
+	}
+	return rwm.waitOnLowest()
+}
+
+func (rwm *RWMutex) Lock() error {
+	rk, err := NewUniqueKey(rwm.client, rwm.key+"/write")
+	if err != nil {
+		return err
+	}
+	rwm.myKey = rk
+
+	for {
+		// any key of lower rev number blocks the write lock
+		resp, err := NewRangeRev(rwm.client, rwm.key, rk.Revision()-1).LastRev()
+		if err != nil {
+			return err
+		}
+		if len(resp.Kvs) == 0 {
+			// no matching for revision before myKey; acquired
+			return nil
+		}
+		if err := rwm.waitOnLowest(); err != nil {
+			return err
+		}
+		//  get the new lowest, etc until this is the only one left
+	}
+
+	return nil
+}
+
+func (rwm *RWMutex) waitOnLowest() error {
+	// must block; get key before ek for waiting
+	lastKey, err := NewRangeRev(rwm.client, rwm.key, rwm.myKey.Revision()-1).LastRev()
+	if err != nil {
+		return err
+	}
+	// wait for release on prior key
+	_, err = WaitEvents(
+		rwm.client,
+		string(lastKey.Kvs[0].Key),
+		rwm.myKey.Revision(),
+		[]storagepb.Event_EventType{storagepb.DELETE})
+	return err
+}
+
+func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }
+func (rwm *RWMutex) Unlock() error  { return rwm.myKey.Delete() }

+ 104 - 0
contrib/recipes/stm.go

@@ -0,0 +1,104 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// STM implements software transactional memory over etcd
+type STM struct {
+	client *EtcdClient
+	// rset holds the read key's value and revision of read
+	rset map[string]*RemoteKV
+	// wset holds the write key and its value
+	wset map[string]string
+	// aborted is whether user aborted the txn
+	aborted bool
+	apply   func(*STM) error
+}
+
+// NewSTM creates new transaction loop for a given apply function.
+func NewSTM(client *EtcdClient, apply func(*STM) error) <-chan error {
+	s := &STM{client: client, apply: apply}
+	errc := make(chan error, 1)
+	go func() {
+		var err error
+		for {
+			s.clear()
+			if err = apply(s); err != nil || s.aborted {
+				break
+			}
+			if ok, err := s.commit(); ok || err != nil {
+				break
+			}
+		}
+		errc <- err
+	}()
+	return errc
+}
+
+// Abort abandons the apply loop, letting the transaction close without a commit.
+func (s *STM) Abort() { s.aborted = true }
+
+// Get returns the value for a given key, inserting the key into the txn's readset.
+func (s *STM) Get(key string) (string, error) {
+	if wv, ok := s.wset[key]; ok {
+		return wv, nil
+	}
+	if rk, ok := s.rset[key]; ok {
+		return rk.Value(), nil
+	}
+	rk, err := GetRemoteKV(s.client, key)
+	if err != nil {
+		return "", err
+	}
+	// TODO: setup watchers to abort txn early
+	s.rset[key] = rk
+	return rk.Value(), nil
+}
+
+// Put adds a value for a key to the write set.
+func (s *STM) Put(key string, val string) { s.wset[key] = val }
+
+// commit attempts to apply the txn's changes to the server.
+func (s *STM) commit() (ok bool, err error) {
+	// read set must not change
+	cmps := []*pb.Compare{}
+	for k, rk := range s.rset {
+		// use < to support updating keys that don't exist yet
+		cmp := &pb.Compare{
+			Result:      pb.Compare_LESS,
+			Target:      pb.Compare_MOD,
+			Key:         []byte(k),
+			ModRevision: rk.Revision() + 1,
+		}
+		cmps = append(cmps, cmp)
+	}
+	// apply all writes
+	puts := []*pb.RequestUnion{}
+	for k, v := range s.wset {
+		put := &pb.PutRequest{Key: []byte(k), Value: []byte(v)}
+		puts = append(puts, &pb.RequestUnion{RequestPut: put})
+	}
+	txnresp, err := s.client.KV.Txn(context.TODO(), &pb.TxnRequest{cmps, puts, nil})
+	return txnresp.Succeeded, err
+}
+
+func (s *STM) clear() {
+	s.rset = make(map[string]*RemoteKV)
+	s.wset = make(map[string]string)
+}

+ 147 - 0
contrib/recipes/watch.go

@@ -0,0 +1,147 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package recipe
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+type Watcher struct {
+	wstream pb.Watch_WatchClient
+	donec   chan struct{}
+	id      storage.WatchID
+	recvc   chan *storagepb.Event
+	lastErr error
+}
+
+func NewWatcher(c *EtcdClient, key string, rev int64) (*Watcher, error) {
+	return newWatcher(c, key, rev, false)
+}
+
+func NewPrefixWatcher(c *EtcdClient, prefix string, rev int64) (*Watcher, error) {
+	return newWatcher(c, prefix, rev, true)
+}
+
+func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, error) {
+	w, err := c.Watch.Watch(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	req := &pb.WatchCreateRequest{StartRevision: rev}
+	if isPrefix {
+		req.Prefix = []byte(key)
+	} else {
+		req.Key = []byte(key)
+	}
+
+	if err := w.Send(&pb.WatchRequest{CreateRequest: req}); err != nil {
+		return nil, err
+	}
+
+	wresp, err := w.Recv()
+	if err != nil {
+		return nil, err
+	}
+	if len(wresp.Events) != 0 || wresp.Created != true {
+		return nil, ErrWaitMismatch
+	}
+	ret := &Watcher{
+		wstream: w,
+		donec:   make(chan struct{}),
+		id:      storage.WatchID(wresp.WatchId),
+		recvc:   make(chan *storagepb.Event),
+	}
+	go ret.recvLoop()
+	return ret, nil
+}
+
+func (w *Watcher) Close() error {
+	if w.wstream == nil {
+		return w.lastErr
+	}
+	req := &pb.WatchCancelRequest{WatchId: int64(w.id)}
+	err := w.wstream.Send(&pb.WatchRequest{CancelRequest: req})
+	if err != nil && w.lastErr == nil {
+		return err
+	}
+	w.wstream.CloseSend()
+	w.donec <- struct{}{}
+	<-w.donec
+	w.wstream = nil
+	return w.lastErr
+}
+
+func (w *Watcher) Chan() <-chan *storagepb.Event { return w.recvc }
+
+func (w *Watcher) recvLoop() {
+	defer close(w.donec)
+	for {
+		wresp, err := w.wstream.Recv()
+		if err != nil {
+			w.lastErr = err
+			break
+		}
+		for i := range wresp.Events {
+			select {
+			case <-w.donec:
+				close(w.recvc)
+				return
+			case w.recvc <- wresp.Events[i]:
+			}
+		}
+	}
+	close(w.recvc)
+	<-w.donec
+}
+
+func (w *Watcher) waitEvents(evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+	i := 0
+	for {
+		ev, ok := <-w.recvc
+		if !ok {
+			break
+		}
+		if ev.Type == evs[i] {
+			i++
+			if i == len(evs) {
+				return ev, nil
+			}
+		}
+	}
+	return nil, w.Close()
+}
+
+// WaitEvents waits on a key until it observes the given events and returns the final one.
+func WaitEvents(c *EtcdClient, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+	w, err := NewWatcher(c, key, rev)
+	if err != nil {
+		return nil, err
+	}
+	defer w.Close()
+	return w.waitEvents(evs)
+}
+
+func WaitPrefixEvents(c *EtcdClient, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
+	w, err := NewPrefixWatcher(c, prefix, rev)
+	if err != nil {
+		return nil, err
+	}
+	defer w.Close()
+	return w.waitEvents(evs)
+}

+ 74 - 0
integration/v3_barrier_test.go

@@ -0,0 +1,74 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.package recipe
+package integration
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/contrib/recipes"
+)
+
+func TestBarrierSingleNode(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	testBarrier(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
+}
+
+func TestBarrierMultiNode(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	testBarrier(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
+}
+
+func testBarrier(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
+	b := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier")
+	if err := b.Hold(); err != nil {
+		t.Fatalf("could not hold barrier (%v)", err)
+	}
+	if err := b.Hold(); err == nil {
+		t.Fatalf("able to double-hold barrier")
+	}
+
+	donec := make(chan struct{})
+	for i := 0; i < waiters; i++ {
+		go func() {
+			b := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier")
+			if err := b.Wait(); err != nil {
+				t.Fatalf("could not wait on barrier (%v)", err)
+			}
+			donec <- struct{}{}
+		}()
+	}
+
+	select {
+	case <-donec:
+		t.Fatalf("barrier did not wait")
+	default:
+	}
+
+	if err := b.Release(); err != nil {
+		t.Fatalf("could not release barrier (%v)", err)
+	}
+
+	timerC := time.After(time.Duration(waiters*100) * time.Millisecond)
+	for i := 0; i < waiters; i++ {
+		select {
+		case <-timerC:
+			t.Fatalf("barrier timed out")
+		case <-donec:
+		}
+	}
+}

+ 136 - 0
integration/v3_lock_test.go

@@ -0,0 +1,136 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package integration
+
+import (
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/contrib/recipes"
+)
+
+func TestMutexSingleNode(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	testMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
+}
+
+func TestMutexMultiNode(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	testMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
+}
+
+func testMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
+	// stream lock acquistions
+	lockedC := make(chan *recipe.Mutex, 1)
+	for i := 0; i < waiters; i++ {
+		go func() {
+			m := recipe.NewMutex(recipe.NewEtcdClient(chooseConn()), "test-mutex")
+			if err := m.Lock(); err != nil {
+				t.Fatalf("could not wait on lock (%v)", err)
+			}
+			lockedC <- m
+		}()
+	}
+	// unlock locked mutexes
+	timerC := time.After(time.Duration(waiters) * time.Second)
+	for i := 0; i < waiters; i++ {
+		select {
+		case <-timerC:
+			t.Fatalf("timed out waiting for lock %d", i)
+		case m := <-lockedC:
+			// lock acquired with m
+			select {
+			case <-lockedC:
+				t.Fatalf("lock %d followers did not wait", i)
+			default:
+			}
+			if err := m.Unlock(); err != nil {
+				t.Fatalf("could not release lock (%v)", err)
+			}
+		}
+	}
+}
+
+func BenchmarkMutex4Waiters(b *testing.B) {
+	// XXX switch tests to use TB interface
+	clus := newClusterGRPC(nil, &clusterConfig{size: 3})
+	defer clus.Terminate(nil)
+	for i := 0; i < b.N; i++ {
+		testMutex(nil, 4, func() *grpc.ClientConn { return clus.RandConn() })
+	}
+}
+
+func TestRWMutexSingleNode(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	testRWMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] })
+}
+
+func TestRWMutexMultiNode(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	testRWMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() })
+}
+
+func testRWMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) {
+	// stream rwlock acquistions
+	rlockedC := make(chan *recipe.RWMutex, 1)
+	wlockedC := make(chan *recipe.RWMutex, 1)
+	for i := 0; i < waiters; i++ {
+		go func() {
+			rwm := recipe.NewRWMutex(recipe.NewEtcdClient(chooseConn()), "test-rwmutex")
+			if rand.Intn(1) == 0 {
+				if err := rwm.RLock(); err != nil {
+					t.Fatalf("could not rlock (%v)", err)
+				}
+				rlockedC <- rwm
+			} else {
+				if err := rwm.Lock(); err != nil {
+					t.Fatalf("could not lock (%v)", err)
+				}
+				wlockedC <- rwm
+			}
+		}()
+	}
+	// unlock locked rwmutexes
+	timerC := time.After(time.Duration(waiters) * time.Second)
+	for i := 0; i < waiters; i++ {
+		select {
+		case <-timerC:
+			t.Fatalf("timed out waiting for lock %d", i)
+		case wl := <-wlockedC:
+			select {
+			case <-rlockedC:
+				t.Fatalf("rlock %d readers did not wait", i)
+			default:
+			}
+			if err := wl.Unlock(); err != nil {
+				t.Fatalf("could not release lock (%v)", err)
+			}
+		case rl := <-rlockedC:
+			select {
+			case <-wlockedC:
+				t.Fatalf("rlock %d writers did not wait", i)
+			default:
+			}
+			if err := rl.RUnlock(); err != nil {
+				t.Fatalf("could not release rlock (%v)", err)
+			}
+		}
+	}
+}

+ 219 - 0
integration/v3_queue_test.go

@@ -0,0 +1,219 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.package recipe
+package integration
+
+import (
+	"fmt"
+	"math/rand"
+	"sync/atomic"
+	"testing"
+
+	"github.com/coreos/etcd/contrib/recipes"
+)
+
+const (
+	manyQueueClients    = 3
+	queueItemsPerClient = 2
+)
+
+// TestQueueOneReaderOneWriter confirms the queue is FIFO
+func TestQueueOneReaderOneWriter(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	defer clus.Terminate(t)
+
+	go func() {
+		etcdc := recipe.NewEtcdClient(clus.RandConn())
+		q := recipe.NewQueue(etcdc, "testq")
+		for i := 0; i < 5; i++ {
+			if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
+				t.Fatalf("error enqueuing (%v)", err)
+			}
+		}
+	}()
+
+	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	q := recipe.NewQueue(etcdc, "testq")
+	for i := 0; i < 5; i++ {
+		s, err := q.Dequeue()
+		if err != nil {
+			t.Fatalf("error dequeueing (%v)", err)
+		}
+		if s != fmt.Sprintf("%d", i) {
+			t.Fatalf("expected dequeue value %v, got %v", s, i)
+		}
+	}
+}
+
+func TestQueueManyReaderOneWriter(t *testing.T) {
+	testQueueNReaderMWriter(t, manyQueueClients, 1)
+}
+
+func TestQueueOneReaderManyWriter(t *testing.T) {
+	testQueueNReaderMWriter(t, 1, manyQueueClients)
+}
+
+func TestQueueManyReaderManyWriter(t *testing.T) {
+	testQueueNReaderMWriter(t, manyQueueClients, manyQueueClients)
+}
+
+// BenchmarkQueue benchmarks Queues using many/many readers/writers
+func BenchmarkQueue(b *testing.B) {
+	// XXX switch tests to use TB interface
+	clus := newClusterGRPC(nil, &clusterConfig{size: 3})
+	defer clus.Terminate(nil)
+	for i := 0; i < b.N; i++ {
+		testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients)
+	}
+}
+
+// TestPrQueue tests whether priority queues respect priorities.
+func TestPrQueueOneReaderOneWriter(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	defer clus.Terminate(t)
+
+	// write out five items with random priority
+	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	q := recipe.NewPriorityQueue(etcdc, "testprq")
+	for i := 0; i < 5; i++ {
+		// [0, 2] priority for priority collision to test seq keys
+		pr := uint16(rand.Intn(3))
+		if err := q.Enqueue(fmt.Sprintf("%d", pr), pr); err != nil {
+			t.Fatalf("error enqueuing (%v)", err)
+		}
+	}
+
+	// read back items; confirm priority order is respected
+	lastPr := -1
+	for i := 0; i < 5; i++ {
+		s, err := q.Dequeue()
+		if err != nil {
+			t.Fatalf("error dequeueing (%v)", err)
+		}
+		curPr := 0
+		if _, err := fmt.Sscanf(s, "%d", &curPr); err != nil {
+			t.Fatalf(`error parsing item "%s" (%v)`, s, err)
+		}
+		if lastPr > curPr {
+			t.Fatalf("expected priority %v > %v", curPr, lastPr)
+		}
+	}
+}
+
+func TestPrQueueManyReaderManyWriter(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	rqs := newPriorityQueues(clus, manyQueueClients)
+	wqs := newPriorityQueues(clus, manyQueueClients)
+	testReadersWriters(t, rqs, wqs)
+}
+
+// BenchmarkQueue benchmarks Queues using n/n readers/writers
+func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
+	// XXX switch tests to use TB interface
+	clus := newClusterGRPC(nil, &clusterConfig{size: 3})
+	defer clus.Terminate(nil)
+	rqs := newPriorityQueues(clus, 1)
+	wqs := newPriorityQueues(clus, 1)
+	for i := 0; i < b.N; i++ {
+		testReadersWriters(nil, rqs, wqs)
+	}
+}
+
+func testQueueNReaderMWriter(t *testing.T, n int, m int) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+	testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
+}
+
+func newQueues(clus *clusterV3, n int) (qs []testQueue) {
+	for i := 0; i < n; i++ {
+		etcdc := recipe.NewEtcdClient(clus.RandConn())
+		qs = append(qs, recipe.NewQueue(etcdc, "q"))
+	}
+	return qs
+}
+
+func newPriorityQueues(clus *clusterV3, n int) (qs []testQueue) {
+	for i := 0; i < n; i++ {
+		etcdc := recipe.NewEtcdClient(clus.RandConn())
+		q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}
+		qs = append(qs, q)
+	}
+	return qs
+}
+
+func testReadersWriters(t *testing.T, rqs []testQueue, wqs []testQueue) {
+	rerrc := make(chan error)
+	werrc := make(chan error)
+	manyWriters(wqs, queueItemsPerClient, werrc)
+	manyReaders(rqs, len(wqs)*queueItemsPerClient, rerrc)
+	for range wqs {
+		if err := <-werrc; err != nil {
+			t.Errorf("error writing (%v)", err)
+		}
+	}
+	for range rqs {
+		if err := <-rerrc; err != nil {
+			t.Errorf("error reading (%v)", err)
+		}
+	}
+}
+
+func manyReaders(qs []testQueue, totalReads int, errc chan<- error) {
+	var rxReads int32
+	for _, q := range qs {
+		go func(q testQueue) {
+			for {
+				total := atomic.AddInt32(&rxReads, 1)
+				if int(total) > totalReads {
+					break
+				}
+				if _, err := q.Dequeue(); err != nil {
+					errc <- err
+					return
+				}
+			}
+			errc <- nil
+		}(q)
+	}
+}
+
+func manyWriters(qs []testQueue, writesEach int, errc chan<- error) {
+	for _, q := range qs {
+		go func(q testQueue) {
+			for j := 0; j < writesEach; j++ {
+				if err := q.Enqueue("foo"); err != nil {
+					errc <- err
+					return
+				}
+			}
+			errc <- nil
+		}(q)
+	}
+}
+
+type testQueue interface {
+	Enqueue(val string) error
+	Dequeue() (string, error)
+}
+
+type flatPriorityQueue struct{ *recipe.PriorityQueue }
+
+func (q *flatPriorityQueue) Enqueue(val string) error {
+	// randomized to stress dequeuing logic; order isn't important
+	return q.PriorityQueue.Enqueue(val, uint16(rand.Intn(2)))
+}
+func (q *flatPriorityQueue) Dequeue() (string, error) {
+	return q.PriorityQueue.Dequeue()
+}

+ 138 - 0
integration/v3_stm_test.go

@@ -0,0 +1,138 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.package recipe
+package integration
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"testing"
+
+	"github.com/coreos/etcd/contrib/recipes"
+)
+
+// TestSTMConflict tests that conflicts are retried.
+func TestSTMConflict(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+
+	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	keys := make([]*recipe.RemoteKV, 5)
+	for i := 0; i < len(keys); i++ {
+		rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0)
+		if err != nil {
+			t.Fatalf("could not make key (%v)", err)
+		}
+		keys[i] = rk
+	}
+
+	errc := make([]<-chan error, len(keys))
+	for i, rk := range keys {
+		curEtcdc := recipe.NewEtcdClient(clus.RandConn())
+		srcKey := rk.Key()
+		applyf := func(stm *recipe.STM) error {
+			src, err := stm.Get(srcKey)
+			if err != nil {
+				return err
+			}
+			// must be different key to avoid double-adding
+			dstKey := srcKey
+			for dstKey == srcKey {
+				dstKey = keys[rand.Intn(len(keys))].Key()
+			}
+			dst, err := stm.Get(dstKey)
+			if err != nil {
+				return err
+			}
+			srcV, _ := strconv.ParseInt(src, 10, 64)
+			dstV, _ := strconv.ParseInt(dst, 10, 64)
+			xfer := int64(rand.Intn(int(srcV)) / 2)
+			stm.Put(srcKey, fmt.Sprintf("%d", srcV-xfer))
+			stm.Put(dstKey, fmt.Sprintf("%d", dstV+xfer))
+			return nil
+		}
+		errc[i] = recipe.NewSTM(curEtcdc, applyf)
+	}
+
+	// wait for txns
+	for _, ch := range errc {
+		if err := <-ch; err != nil {
+			t.Fatalf("apply failed (%v)", err)
+		}
+	}
+
+	// ensure sum matches initial sum
+	sum := 0
+	for _, oldRK := range keys {
+		rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key())
+		if err != nil {
+			t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err)
+		}
+		v, _ := strconv.ParseInt(rk.Value(), 10, 64)
+		sum += int(v)
+	}
+	if sum != len(keys)*100 {
+		t.Fatalf("bad sum. got %d, expected %d", sum, len(keys)*100)
+	}
+}
+
+// TestSTMPut confirms a STM put on a new key is visible after commit.
+func TestSTMPutNewKey(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	defer clus.Terminate(t)
+
+	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	applyf := func(stm *recipe.STM) error {
+		stm.Put("foo", "bar")
+		return nil
+	}
+	errc := recipe.NewSTM(etcdc, applyf)
+	if err := <-errc; err != nil {
+		t.Fatalf("error on stm txn (%v)", err)
+	}
+
+	rk, err := recipe.GetRemoteKV(etcdc, "foo")
+	if err != nil {
+		t.Fatalf("error fetching key (%v)", err)
+	}
+	if rk.Value() != "bar" {
+		t.Fatalf("bad value. got %v, expected bar", rk.Value())
+	}
+}
+
+// TestSTMAbort tests that an aborted txn does not modify any keys.
+func TestSTMAbort(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 1})
+	defer clus.Terminate(t)
+
+	etcdc := recipe.NewEtcdClient(clus.RandConn())
+	applyf := func(stm *recipe.STM) error {
+		stm.Put("foo", "baz")
+		stm.Abort()
+		stm.Put("foo", "baz")
+		return nil
+	}
+	errc := recipe.NewSTM(etcdc, applyf)
+	if err := <-errc; err != nil {
+		t.Fatalf("error on stm txn (%v)", err)
+	}
+
+	rk, err := recipe.GetRemoteKV(etcdc, "foo")
+	if err != nil {
+		t.Fatalf("error fetching key (%v)", err)
+	}
+	if rk.Value() != "" {
+		t.Fatalf("bad value. got %v, expected empty string", rk.Value())
+	}
+}