Browse Source

Merge pull request #7444 from heyitsanthony/lock-service

grpc lock service
Anthony Romano 8 years ago
parent
commit
1a6be700d8

+ 12 - 1
clientv3/client.go

@@ -77,6 +77,14 @@ func New(cfg Config) (*Client, error) {
 	return newClient(&cfg)
 }
 
+// NewCtxClient creates a client with a context but no underlying grpc
+// connection. This is useful for embedded cases that override the
+// service interface implementations and do not need connection management.
+func NewCtxClient(ctx context.Context) *Client {
+	cctx, cancel := context.WithCancel(ctx)
+	return &Client{ctx: cctx, cancel: cancel}
+}
+
 // NewFromURL creates a new etcdv3 client from a URL.
 func NewFromURL(url string) (*Client, error) {
 	return New(Config{Endpoints: []string{url}})
@@ -87,7 +95,10 @@ func (c *Client) Close() error {
 	c.cancel()
 	c.Watcher.Close()
 	c.Lease.Close()
-	return toErr(c.ctx, c.conn.Close())
+	if c.conn != nil {
+		return toErr(c.ctx, c.conn.Close())
+	}
+	return c.ctx.Err()
 }
 
 // Ctx is a context for "out of band" messages (e.g., for sending

+ 1 - 1
clientv3/concurrency/election.go

@@ -69,7 +69,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
 		}
 	}
 
-	err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
+	_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
 	if err != nil {
 		// clean up in case of context cancel
 		select {

+ 5 - 4
clientv3/concurrency/key.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 
 	v3 "github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"golang.org/x/net/context"
 )
@@ -46,19 +47,19 @@ func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) e
 
 // waitDeletes efficiently waits until all keys matching the prefix and no greater
 // than the create revision.
-func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error {
+func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
 	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
 	for {
 		resp, err := client.Get(ctx, pfx, getOpts...)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		if len(resp.Kvs) == 0 {
-			return nil
+			return resp.Header, nil
 		}
 		lastKey := string(resp.Kvs[0].Key)
 		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
-			return err
+			return nil, err
 		}
 	}
 }

+ 10 - 4
clientv3/concurrency/mutex.go

@@ -19,6 +19,7 @@ import (
 	"sync"
 
 	v3 "github.com/coreos/etcd/clientv3"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 )
 
@@ -29,13 +30,14 @@ type Mutex struct {
 	pfx   string
 	myKey string
 	myRev int64
+	hdr   *pb.ResponseHeader
 }
 
 func NewMutex(s *Session, pfx string) *Mutex {
-	return &Mutex{s, pfx + "/", "", -1}
+	return &Mutex{s, pfx + "/", "", -1, nil}
 }
 
-// Lock locks the mutex with a cancellable context. If the context is cancelled
+// Lock locks the mutex with a cancelable context. If the context is canceled
 // while trying to acquire the lock, the mutex tries to clean its stale lock entry.
 func (m *Mutex) Lock(ctx context.Context) error {
 	s := m.s
@@ -57,14 +59,15 @@ func (m *Mutex) Lock(ctx context.Context) error {
 	}
 
 	// wait for deletion revisions prior to myKey
-	err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
+	hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
 	// release lock key if cancelled
 	select {
 	case <-ctx.Done():
 		m.Unlock(client.Ctx())
 	default:
+		m.hdr = hdr
 	}
-	return err
+	return werr
 }
 
 func (m *Mutex) Unlock(ctx context.Context) error {
@@ -83,6 +86,9 @@ func (m *Mutex) IsOwner() v3.Cmp {
 
 func (m *Mutex) Key() string { return m.myKey }
 
+// Header is the response header received from etcd on acquiring the lock.
+func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
+
 type lockerMutex struct{ *Mutex }
 
 func (lm *lockerMutex) Lock() {

+ 5 - 0
embed/serve.go

@@ -25,6 +25,9 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3client"
+	"github.com/coreos/etcd/etcdserver/api/v3lock"
+	"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/transport"
@@ -68,6 +71,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 
 	if sctx.insecure {
 		gs := v3rpc.Server(s, nil)
+		v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
 		if sctx.serviceRegister != nil {
 			sctx.serviceRegister(gs)
 		}
@@ -95,6 +99,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 
 	if sctx.secure {
 		gs := v3rpc.Server(s, tlscfg)
+		v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
 		if sctx.serviceRegister != nil {
 			sctx.serviceRegister(gs)
 		}

+ 16 - 0
etcdserver/api/v3client/doc.go

@@ -0,0 +1,16 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package v3client provides clientv3 interfaces from an etcdserver.
+package v3client

+ 40 - 0
etcdserver/api/v3client/v3client.go

@@ -0,0 +1,40 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3client
+
+import (
+	"context"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
+	"github.com/coreos/etcd/proxy/grpcproxy/adapter"
+)
+
+func New(s *etcdserver.EtcdServer) *clientv3.Client {
+	c := clientv3.NewCtxClient(context.Background())
+
+	kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
+	c.KV = clientv3.NewKVFromKVClient(kvc)
+
+	lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
+	c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)
+
+	wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
+	c.Watcher = clientv3.NewWatchFromWatchClient(wc)
+
+	return c
+}

+ 16 - 0
etcdserver/api/v3lock/doc.go

@@ -0,0 +1,16 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package v3lock provides a v3 locking service from an etcdserver.
+package v3lock

+ 56 - 0
etcdserver/api/v3lock/lock.go

@@ -0,0 +1,56 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3lock
+
+import (
+	"golang.org/x/net/context"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
+	"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
+)
+
+type lockServer struct {
+	c *clientv3.Client
+}
+
+func NewLockServer(c *clientv3.Client) v3lockpb.LockServer {
+	return &lockServer{c}
+}
+
+func (ls *lockServer) Lock(ctx context.Context, req *v3lockpb.LockRequest) (*v3lockpb.LockResponse, error) {
+	s, err := concurrency.NewSession(
+		ls.c,
+		concurrency.WithLease(clientv3.LeaseID(req.Lease)),
+		concurrency.WithContext(ctx),
+	)
+	if err != nil {
+		return nil, err
+	}
+	s.Orphan()
+	m := concurrency.NewMutex(s, string(req.Name))
+	if err = m.Lock(ctx); err != nil {
+		return nil, err
+	}
+	return &v3lockpb.LockResponse{Header: m.Header(), Key: []byte(m.Key())}, nil
+}
+
+func (ls *lockServer) Unlock(ctx context.Context, req *v3lockpb.UnlockRequest) (*v3lockpb.UnlockResponse, error) {
+	resp, err := ls.c.Delete(ctx, string(req.Key))
+	if err != nil {
+		return nil, err
+	}
+	return &v3lockpb.UnlockResponse{Header: resp.Header}, nil
+}

+ 948 - 0
etcdserver/api/v3lock/v3lockpb/v3lock.pb.go

@@ -0,0 +1,948 @@
+// Code generated by protoc-gen-gogo.
+// source: v3lock.proto
+// DO NOT EDIT!
+
+/*
+	Package v3lockpb is a generated protocol buffer package.
+
+	It is generated from these files:
+		v3lock.proto
+
+	It has these top-level messages:
+		LockRequest
+		LockResponse
+		UnlockRequest
+		UnlockResponse
+*/
+package v3lockpb
+
+import (
+	"fmt"
+
+	proto "github.com/golang/protobuf/proto"
+
+	math "math"
+
+	etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	context "golang.org/x/net/context"
+
+	grpc "google.golang.org/grpc"
+
+	io "io"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type LockRequest struct {
+	// name is the identifier for the distributed shared lock to be acquired.
+	Name []byte `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	// lease is the ID of the lease that will be attached to ownership of the
+	// lock. If the lease expires or is revoked and currently holds the lock,
+	// the lock is automatically released. Calls to Lock with the same lease will
+	// be treated as a single acquistion; locking twice with the same lease is a
+	// no-op.
+	Lease int64 `protobuf:"varint,2,opt,name=lease,proto3" json:"lease,omitempty"`
+}
+
+func (m *LockRequest) Reset()                    { *m = LockRequest{} }
+func (m *LockRequest) String() string            { return proto.CompactTextString(m) }
+func (*LockRequest) ProtoMessage()               {}
+func (*LockRequest) Descriptor() ([]byte, []int) { return fileDescriptorV3Lock, []int{0} }
+
+type LockResponse struct {
+	Header *etcdserverpb.ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
+	// key is a key that will exist on etcd for the duration that the Lock caller
+	// owns the lock. Users should not modify this key or the lock may exhibit
+	// undefined behavior.
+	Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (m *LockResponse) Reset()                    { *m = LockResponse{} }
+func (m *LockResponse) String() string            { return proto.CompactTextString(m) }
+func (*LockResponse) ProtoMessage()               {}
+func (*LockResponse) Descriptor() ([]byte, []int) { return fileDescriptorV3Lock, []int{1} }
+
+func (m *LockResponse) GetHeader() *etcdserverpb.ResponseHeader {
+	if m != nil {
+		return m.Header
+	}
+	return nil
+}
+
+type UnlockRequest struct {
+	// key is the lock ownership key granted by Lock.
+	Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (m *UnlockRequest) Reset()                    { *m = UnlockRequest{} }
+func (m *UnlockRequest) String() string            { return proto.CompactTextString(m) }
+func (*UnlockRequest) ProtoMessage()               {}
+func (*UnlockRequest) Descriptor() ([]byte, []int) { return fileDescriptorV3Lock, []int{2} }
+
+type UnlockResponse struct {
+	Header *etcdserverpb.ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
+}
+
+func (m *UnlockResponse) Reset()                    { *m = UnlockResponse{} }
+func (m *UnlockResponse) String() string            { return proto.CompactTextString(m) }
+func (*UnlockResponse) ProtoMessage()               {}
+func (*UnlockResponse) Descriptor() ([]byte, []int) { return fileDescriptorV3Lock, []int{3} }
+
+func (m *UnlockResponse) GetHeader() *etcdserverpb.ResponseHeader {
+	if m != nil {
+		return m.Header
+	}
+	return nil
+}
+
+func init() {
+	proto.RegisterType((*LockRequest)(nil), "v3lockpb.LockRequest")
+	proto.RegisterType((*LockResponse)(nil), "v3lockpb.LockResponse")
+	proto.RegisterType((*UnlockRequest)(nil), "v3lockpb.UnlockRequest")
+	proto.RegisterType((*UnlockResponse)(nil), "v3lockpb.UnlockResponse")
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// Client API for Lock service
+
+type LockClient interface {
+	// Lock acquires a distributed shared lock on a given named lock.
+	// On success, it will return a unique key that exists so long as the
+	// lock is held by the caller. This key can be used in conjunction with
+	// transactions to safely ensure updates to etcd only occur while holding
+	// lock ownership. The lock is held until Unlock is called on the key or the
+	// lease associate with the owner expires.
+	Lock(ctx context.Context, in *LockRequest, opts ...grpc.CallOption) (*LockResponse, error)
+	// Unlock takes a key returned by Lock and releases the hold on lock. The
+	// next Lock caller waiting for the lock will then be woken up and given
+	// ownership of the lock.
+	Unlock(ctx context.Context, in *UnlockRequest, opts ...grpc.CallOption) (*UnlockResponse, error)
+}
+
+type lockClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewLockClient(cc *grpc.ClientConn) LockClient {
+	return &lockClient{cc}
+}
+
+func (c *lockClient) Lock(ctx context.Context, in *LockRequest, opts ...grpc.CallOption) (*LockResponse, error) {
+	out := new(LockResponse)
+	err := grpc.Invoke(ctx, "/v3lockpb.Lock/Lock", in, out, c.cc, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+func (c *lockClient) Unlock(ctx context.Context, in *UnlockRequest, opts ...grpc.CallOption) (*UnlockResponse, error) {
+	out := new(UnlockResponse)
+	err := grpc.Invoke(ctx, "/v3lockpb.Lock/Unlock", in, out, c.cc, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// Server API for Lock service
+
+type LockServer interface {
+	// Lock acquires a distributed shared lock on a given named lock.
+	// On success, it will return a unique key that exists so long as the
+	// lock is held by the caller. This key can be used in conjunction with
+	// transactions to safely ensure updates to etcd only occur while holding
+	// lock ownership. The lock is held until Unlock is called on the key or the
+	// lease associate with the owner expires.
+	Lock(context.Context, *LockRequest) (*LockResponse, error)
+	// Unlock takes a key returned by Lock and releases the hold on lock. The
+	// next Lock caller waiting for the lock will then be woken up and given
+	// ownership of the lock.
+	Unlock(context.Context, *UnlockRequest) (*UnlockResponse, error)
+}
+
+func RegisterLockServer(s *grpc.Server, srv LockServer) {
+	s.RegisterService(&_Lock_serviceDesc, srv)
+}
+
+func _Lock_Lock_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(LockRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(LockServer).Lock(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/v3lockpb.Lock/Lock",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(LockServer).Lock(ctx, req.(*LockRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+func _Lock_Unlock_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(UnlockRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(LockServer).Unlock(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/v3lockpb.Lock/Unlock",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(LockServer).Unlock(ctx, req.(*UnlockRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _Lock_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "v3lockpb.Lock",
+	HandlerType: (*LockServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Lock",
+			Handler:    _Lock_Lock_Handler,
+		},
+		{
+			MethodName: "Unlock",
+			Handler:    _Lock_Unlock_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "v3lock.proto",
+}
+
+func (m *LockRequest) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalTo(dAtA)
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *LockRequest) MarshalTo(dAtA []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if len(m.Name) > 0 {
+		dAtA[i] = 0xa
+		i++
+		i = encodeVarintV3Lock(dAtA, i, uint64(len(m.Name)))
+		i += copy(dAtA[i:], m.Name)
+	}
+	if m.Lease != 0 {
+		dAtA[i] = 0x10
+		i++
+		i = encodeVarintV3Lock(dAtA, i, uint64(m.Lease))
+	}
+	return i, nil
+}
+
+func (m *LockResponse) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalTo(dAtA)
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *LockResponse) MarshalTo(dAtA []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.Header != nil {
+		dAtA[i] = 0xa
+		i++
+		i = encodeVarintV3Lock(dAtA, i, uint64(m.Header.Size()))
+		n1, err := m.Header.MarshalTo(dAtA[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n1
+	}
+	if len(m.Key) > 0 {
+		dAtA[i] = 0x12
+		i++
+		i = encodeVarintV3Lock(dAtA, i, uint64(len(m.Key)))
+		i += copy(dAtA[i:], m.Key)
+	}
+	return i, nil
+}
+
+func (m *UnlockRequest) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalTo(dAtA)
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *UnlockRequest) MarshalTo(dAtA []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if len(m.Key) > 0 {
+		dAtA[i] = 0xa
+		i++
+		i = encodeVarintV3Lock(dAtA, i, uint64(len(m.Key)))
+		i += copy(dAtA[i:], m.Key)
+	}
+	return i, nil
+}
+
+func (m *UnlockResponse) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalTo(dAtA)
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *UnlockResponse) MarshalTo(dAtA []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.Header != nil {
+		dAtA[i] = 0xa
+		i++
+		i = encodeVarintV3Lock(dAtA, i, uint64(m.Header.Size()))
+		n2, err := m.Header.MarshalTo(dAtA[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n2
+	}
+	return i, nil
+}
+
+func encodeFixed64V3Lock(dAtA []byte, offset int, v uint64) int {
+	dAtA[offset] = uint8(v)
+	dAtA[offset+1] = uint8(v >> 8)
+	dAtA[offset+2] = uint8(v >> 16)
+	dAtA[offset+3] = uint8(v >> 24)
+	dAtA[offset+4] = uint8(v >> 32)
+	dAtA[offset+5] = uint8(v >> 40)
+	dAtA[offset+6] = uint8(v >> 48)
+	dAtA[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32V3Lock(dAtA []byte, offset int, v uint32) int {
+	dAtA[offset] = uint8(v)
+	dAtA[offset+1] = uint8(v >> 8)
+	dAtA[offset+2] = uint8(v >> 16)
+	dAtA[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintV3Lock(dAtA []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		dAtA[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	dAtA[offset] = uint8(v)
+	return offset + 1
+}
+func (m *LockRequest) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Name)
+	if l > 0 {
+		n += 1 + l + sovV3Lock(uint64(l))
+	}
+	if m.Lease != 0 {
+		n += 1 + sovV3Lock(uint64(m.Lease))
+	}
+	return n
+}
+
+func (m *LockResponse) Size() (n int) {
+	var l int
+	_ = l
+	if m.Header != nil {
+		l = m.Header.Size()
+		n += 1 + l + sovV3Lock(uint64(l))
+	}
+	l = len(m.Key)
+	if l > 0 {
+		n += 1 + l + sovV3Lock(uint64(l))
+	}
+	return n
+}
+
+func (m *UnlockRequest) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Key)
+	if l > 0 {
+		n += 1 + l + sovV3Lock(uint64(l))
+	}
+	return n
+}
+
+func (m *UnlockResponse) Size() (n int) {
+	var l int
+	_ = l
+	if m.Header != nil {
+		l = m.Header.Size()
+		n += 1 + l + sovV3Lock(uint64(l))
+	}
+	return n
+}
+
+func sovV3Lock(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozV3Lock(x uint64) (n int) {
+	return sovV3Lock(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *LockRequest) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowV3Lock
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: LockRequest: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: LockRequest: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if byteLen < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			postIndex := iNdEx + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Name = append(m.Name[:0], dAtA[iNdEx:postIndex]...)
+			if m.Name == nil {
+				m.Name = []byte{}
+			}
+			iNdEx = postIndex
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Lease", wireType)
+			}
+			m.Lease = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				m.Lease |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			iNdEx = preIndex
+			skippy, err := skipV3Lock(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *LockResponse) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowV3Lock
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: LockResponse: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: LockResponse: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Header == nil {
+				m.Header = &etcdserverpb.ResponseHeader{}
+			}
+			if err := m.Header.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if byteLen < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			postIndex := iNdEx + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
+			if m.Key == nil {
+				m.Key = []byte{}
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipV3Lock(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *UnlockRequest) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowV3Lock
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: UnlockRequest: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: UnlockRequest: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if byteLen < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			postIndex := iNdEx + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
+			if m.Key == nil {
+				m.Key = []byte{}
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipV3Lock(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *UnlockResponse) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowV3Lock
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: UnlockResponse: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: UnlockResponse: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Header == nil {
+				m.Header = &etcdserverpb.ResponseHeader{}
+			}
+			if err := m.Header.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipV3Lock(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthV3Lock
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func skipV3Lock(dAtA []byte) (n int, err error) {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return 0, ErrIntOverflowV3Lock
+			}
+			if iNdEx >= l {
+				return 0, io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		wireType := int(wire & 0x7)
+		switch wireType {
+		case 0:
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				iNdEx++
+				if dAtA[iNdEx-1] < 0x80 {
+					break
+				}
+			}
+			return iNdEx, nil
+		case 1:
+			iNdEx += 8
+			return iNdEx, nil
+		case 2:
+			var length int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowV3Lock
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				length |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			iNdEx += length
+			if length < 0 {
+				return 0, ErrInvalidLengthV3Lock
+			}
+			return iNdEx, nil
+		case 3:
+			for {
+				var innerWire uint64
+				var start int = iNdEx
+				for shift := uint(0); ; shift += 7 {
+					if shift >= 64 {
+						return 0, ErrIntOverflowV3Lock
+					}
+					if iNdEx >= l {
+						return 0, io.ErrUnexpectedEOF
+					}
+					b := dAtA[iNdEx]
+					iNdEx++
+					innerWire |= (uint64(b) & 0x7F) << shift
+					if b < 0x80 {
+						break
+					}
+				}
+				innerWireType := int(innerWire & 0x7)
+				if innerWireType == 4 {
+					break
+				}
+				next, err := skipV3Lock(dAtA[start:])
+				if err != nil {
+					return 0, err
+				}
+				iNdEx = start + next
+			}
+			return iNdEx, nil
+		case 4:
+			return iNdEx, nil
+		case 5:
+			iNdEx += 4
+			return iNdEx, nil
+		default:
+			return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+		}
+	}
+	panic("unreachable")
+}
+
+var (
+	ErrInvalidLengthV3Lock = fmt.Errorf("proto: negative length found during unmarshaling")
+	ErrIntOverflowV3Lock   = fmt.Errorf("proto: integer overflow")
+)
+
+func init() { proto.RegisterFile("v3lock.proto", fileDescriptorV3Lock) }
+
+var fileDescriptorV3Lock = []byte{
+	// 336 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x33, 0xce, 0xc9,
+	0x4f, 0xce, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf0, 0x0a, 0x92, 0xa4, 0x44,
+	0xd2, 0xf3, 0xd3, 0xf3, 0xc1, 0x82, 0xfa, 0x20, 0x16, 0x44, 0x5e, 0x4a, 0x2d, 0xb5, 0x24, 0x39,
+	0x45, 0x1f, 0x44, 0x14, 0xa7, 0x16, 0x95, 0xa5, 0x16, 0x21, 0x31, 0x0b, 0x92, 0xf4, 0x8b, 0x0a,
+	0x92, 0xa1, 0xea, 0x64, 0xd2, 0xf3, 0xf3, 0xd3, 0x73, 0x52, 0xf5, 0x13, 0x0b, 0x32, 0xf5, 0x13,
+	0xf3, 0xf2, 0xf2, 0x4b, 0x12, 0x4b, 0x32, 0xf3, 0xf3, 0x8a, 0x21, 0xb2, 0x4a, 0xe6, 0x5c, 0xdc,
+	0x3e, 0xf9, 0xc9, 0xd9, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42, 0x5c, 0x2c, 0x79,
+	0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x60, 0xb6, 0x90, 0x08, 0x17, 0x6b,
+	0x4e, 0x6a, 0x62, 0x71, 0xaa, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x73, 0x10, 0x84, 0xa3, 0x14, 0xc6,
+	0xc5, 0x03, 0xd1, 0x58, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x2a, 0x64, 0xc2, 0xc5, 0x96, 0x91, 0x9a,
+	0x98, 0x92, 0x5a, 0x04, 0xd6, 0xcb, 0x6d, 0x24, 0xa3, 0x87, 0xec, 0x1e, 0x3d, 0x98, 0x3a, 0x0f,
+	0xb0, 0x9a, 0x20, 0xa8, 0x5a, 0x21, 0x01, 0x2e, 0xe6, 0xec, 0xd4, 0x4a, 0xb0, 0xc9, 0x3c, 0x41,
+	0x20, 0xa6, 0x92, 0x22, 0x17, 0x6f, 0x68, 0x5e, 0x0e, 0x92, 0x93, 0xa0, 0x4a, 0x18, 0x11, 0x4a,
+	0xdc, 0xb8, 0xf8, 0x60, 0x4a, 0x28, 0xb1, 0xdc, 0x68, 0x17, 0x23, 0x17, 0x0b, 0xc8, 0x0f, 0x42,
+	0x21, 0x50, 0x5a, 0x54, 0x0f, 0x16, 0xe6, 0x7a, 0x48, 0x81, 0x22, 0x25, 0x86, 0x2e, 0x0c, 0x31,
+	0x4d, 0x49, 0xb6, 0xe9, 0xf2, 0x93, 0xc9, 0x4c, 0xe2, 0x4a, 0x42, 0xfa, 0x65, 0xc6, 0x89, 0x39,
+	0x05, 0x19, 0x89, 0xfa, 0x20, 0x55, 0x60, 0xc2, 0x8a, 0x51, 0x4b, 0x28, 0x86, 0x8b, 0x0d, 0xe2,
+	0x4c, 0x21, 0x71, 0x84, 0x01, 0x28, 0x7e, 0x93, 0x92, 0xc0, 0x94, 0x80, 0x9a, 0x2d, 0x0f, 0x36,
+	0x5b, 0x52, 0x49, 0x04, 0xd5, 0xec, 0xd2, 0x3c, 0xa8, 0xe9, 0x4e, 0x02, 0x27, 0x1e, 0xc9, 0x31,
+	0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0xe0,
+	0x18, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xb6, 0xa0, 0x26, 0x28, 0x47, 0x02, 0x00, 0x00,
+}

+ 65 - 0
etcdserver/api/v3lock/v3lockpb/v3lock.proto

@@ -0,0 +1,65 @@
+syntax = "proto3";
+package v3lockpb;
+
+import "gogoproto/gogo.proto";
+import "etcd/etcdserver/etcdserverpb/rpc.proto";
+
+// for grpc-gateway
+import "google/api/annotations.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+
+// The lock service exposes client-side locking facilities as a gRPC interface.
+service Lock {
+  // Lock acquires a distributed shared lock on a given named lock.
+  // On success, it will return a unique key that exists so long as the
+  // lock is held by the caller. This key can be used in conjunction with
+  // transactions to safely ensure updates to etcd only occur while holding
+  // lock ownership. The lock is held until Unlock is called on the key or the
+  // lease associate with the owner expires.
+  rpc Lock(LockRequest) returns (LockResponse) {
+      option (google.api.http) = {
+        post: "/v3alpha/lock/lock"
+        body: "*"
+    };
+  }
+
+  // Unlock takes a key returned by Lock and releases the hold on lock. The
+  // next Lock caller waiting for the lock will then be woken up and given
+  // ownership of the lock.
+  rpc Unlock(UnlockRequest) returns (UnlockResponse) {
+      option (google.api.http) = {
+        post: "/v3alpha/lock/unlock"
+        body: "*"
+    };
+  }
+}
+
+message LockRequest {
+  // name is the identifier for the distributed shared lock to be acquired.
+  bytes name = 1;
+  // lease is the ID of the lease that will be attached to ownership of the
+  // lock. If the lease expires or is revoked and currently holds the lock,
+  // the lock is automatically released. Calls to Lock with the same lease will
+  // be treated as a single acquistion; locking twice with the same lease is a
+  // no-op.
+  int64 lease = 2;
+}
+
+message LockResponse {
+  etcdserverpb.ResponseHeader header = 1;
+  // key is a key that will exist on etcd for the duration that the Lock caller
+  // owns the lock. Users should not modify this key or the lock may exhibit
+  // undefined behavior.
+  bytes key = 2;
+}
+
+message UnlockRequest {
+  // key is the lock ownership key granted by Lock.
+  bytes key = 1;
+}
+
+message UnlockResponse {
+  etcdserverpb.ResponseHeader header = 1;
+}

+ 12 - 0
integration/cluster.go

@@ -38,6 +38,9 @@ import (
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v2http"
+	"github.com/coreos/etcd/etcdserver/api/v3client"
+	"github.com/coreos/etcd/etcdserver/api/v3lock"
+	lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
@@ -459,6 +462,9 @@ type member struct {
 	grpcServer *grpc.Server
 	grpcAddr   string
 	grpcBridge *bridge
+
+	// serverClient is a clientv3 that directly calls the etcdserver.
+	serverClient *clientv3.Client
 }
 
 func (m *member) GRPCAddr() string { return m.grpcAddr }
@@ -652,6 +658,8 @@ func (m *member) Launch() error {
 			}
 		}
 		m.grpcServer = v3rpc.Server(m.s, tlscfg)
+		m.serverClient = v3client.New(m.s)
+		lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
 		go m.grpcServer.Serve(m.grpcListener)
 	}
 
@@ -695,6 +703,10 @@ func (m *member) Close() {
 		m.grpcBridge.Close()
 		m.grpcBridge = nil
 	}
+	if m.serverClient != nil {
+		m.serverClient.Close()
+		m.serverClient = nil
+	}
 	if m.grpcServer != nil {
 		m.grpcServer.Stop()
 		m.grpcServer = nil

+ 4 - 3
integration/cluster_proxy.go

@@ -22,6 +22,7 @@ import (
 	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/proxy/grpcproxy"
+	"github.com/coreos/etcd/proxy/grpcproxy/adapter"
 )
 
 var (
@@ -48,9 +49,9 @@ func toGRPC(c *clientv3.Client) grpcAPI {
 	lp, lpch := grpcproxy.NewLeaseProxy(c)
 	grpc := grpcAPI{
 		pb.NewClusterClient(c.ActiveConnection()),
-		grpcproxy.KvServerToKvClient(kvp),
-		grpcproxy.LeaseServerToLeaseClient(lp),
-		grpcproxy.WatchServerToWatchClient(wp),
+		adapter.KvServerToKvClient(kvp),
+		adapter.LeaseServerToLeaseClient(lp),
+		adapter.WatchServerToWatchClient(wp),
 		pb.NewMaintenanceClient(c.ActiveConnection()),
 		pb.NewAuthClient(c.ActiveConnection()),
 	}

+ 76 - 0
integration/v3lock_grpc_test.go

@@ -0,0 +1,76 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+	"testing"
+	"time"
+
+	lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+	"golang.org/x/net/context"
+)
+
+// TestV3LockLockWaiter tests that a client will wait for a lock, then acquire it
+// once it is unlocked.
+func TestV3LockLockWaiter(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	lease1, err1 := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
+	if err1 != nil {
+		t.Fatal(err1)
+	}
+	lease2, err2 := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
+	if err2 != nil {
+		t.Fatal(err2)
+	}
+
+	lc := lockpb.NewLockClient(clus.Client(0).ActiveConnection())
+	l1, lerr1 := lc.Lock(context.TODO(), &lockpb.LockRequest{Name: []byte("foo"), Lease: lease1.ID})
+	if lerr1 != nil {
+		t.Fatal(lerr1)
+	}
+
+	lockc := make(chan struct{})
+	go func() {
+		l2, lerr2 := lc.Lock(context.TODO(), &lockpb.LockRequest{Name: []byte("foo"), Lease: lease2.ID})
+		if lerr2 != nil {
+			t.Fatal(lerr2)
+		}
+		if l1.Header.Revision >= l2.Header.Revision {
+			t.Fatalf("expected l1 revision < l2 revision, got %d >= %d", l1.Header.Revision, l2.Header.Revision)
+		}
+		close(lockc)
+	}()
+
+	select {
+	case <-time.After(200 * time.Millisecond):
+	case <-lockc:
+		t.Fatalf("locked before unlock")
+	}
+
+	if _, uerr := lc.Unlock(context.TODO(), &lockpb.UnlockRequest{Key: l1.Key}); uerr != nil {
+		t.Fatal(uerr)
+	}
+
+	select {
+	case <-time.After(200 * time.Millisecond):
+		t.Fatalf("waiter did not lock after unlock")
+	case <-lockc:
+	}
+}

+ 1 - 1
proxy/grpcproxy/chan_stream.go → proxy/grpcproxy/adapter/chan_stream.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package grpcproxy
+package adapter
 
 import (
 	"golang.org/x/net/context"

+ 17 - 0
proxy/grpcproxy/adapter/doc.go

@@ -0,0 +1,17 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package adapter provides gRPC adapters between client and server
+// gRPC interfaces without needing to go through a gRPC connection.
+package adapter

+ 1 - 1
proxy/grpcproxy/kv_client_adapter.go → proxy/grpcproxy/adapter/kv_client_adapter.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package grpcproxy
+package adapter
 
 import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

+ 1 - 1
proxy/grpcproxy/lease_client_adapter.go → proxy/grpcproxy/adapter/lease_client_adapter.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package grpcproxy
+package adapter
 
 import (
 	"golang.org/x/net/context"

+ 2 - 2
proxy/grpcproxy/watch_client_adapter.go → proxy/grpcproxy/adapter/watch_client_adapter.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package grpcproxy
+package adapter
 
 import (
 	"errors"
@@ -23,7 +23,7 @@ import (
 	"google.golang.org/grpc/metadata"
 )
 
-var errAlreadySentHeader = errors.New("grpcproxy: already send header")
+var errAlreadySentHeader = errors.New("adapter: already sent header")
 
 type ws2wc struct{ wserv pb.WatchServer }
 

+ 1 - 1
scripts/genproto.sh

@@ -17,7 +17,7 @@ if ! [[ $(protoc --version) =~ "3.1.0" ]]; then
 fi
 
 # directories containing protos to be built
-DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb"
+DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb"
 
 # exact version of protoc-gen-gogo to build
 GOGO_PROTO_SHA="8d70fb3182befc465c4a1eac8ad4d38ff49778e2"