|
|
@@ -21,18 +21,20 @@ import (
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
- "github.com/coreos/etcd/lease"
|
|
|
)
|
|
|
|
|
|
type (
|
|
|
LeaseCreateResponse pb.LeaseCreateResponse
|
|
|
LeaseRevokeResponse pb.LeaseRevokeResponse
|
|
|
LeaseKeepAliveResponse pb.LeaseKeepAliveResponse
|
|
|
+ LeaseID int64
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
// a small buffer to store unsent lease responses.
|
|
|
leaseResponseChSize = 16
|
|
|
+ // NoLease is a lease ID for the absence of a lease.
|
|
|
+ NoLease LeaseID = 0
|
|
|
)
|
|
|
|
|
|
type Lease interface {
|
|
|
@@ -40,14 +42,14 @@ type Lease interface {
|
|
|
Create(ctx context.Context, ttl int64) (*LeaseCreateResponse, error)
|
|
|
|
|
|
// Revoke revokes the given lease.
|
|
|
- Revoke(ctx context.Context, id lease.LeaseID) (*LeaseRevokeResponse, error)
|
|
|
+ Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
|
|
|
|
|
|
// KeepAlive keeps the given lease alive forever.
|
|
|
- KeepAlive(ctx context.Context, id lease.LeaseID) (<-chan *LeaseKeepAliveResponse, error)
|
|
|
+ KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
|
|
|
|
|
|
// KeepAliveOnce renews the lease once. In most of the cases, Keepalive
|
|
|
// should be used instead of KeepAliveOnce.
|
|
|
- KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error)
|
|
|
+ KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
|
|
|
|
|
|
// Close releases all resources Lease keeps for efficient communication
|
|
|
// with the etcd server.
|
|
|
@@ -71,7 +73,7 @@ type lessor struct {
|
|
|
stopCtx context.Context
|
|
|
stopCancel context.CancelFunc
|
|
|
|
|
|
- keepAlives map[lease.LeaseID]*keepAlive
|
|
|
+ keepAlives map[LeaseID]*keepAlive
|
|
|
}
|
|
|
|
|
|
// keepAlive multiplexes a keepalive for a lease over multiple channels
|
|
|
@@ -90,7 +92,7 @@ func NewLease(c *Client) Lease {
|
|
|
conn: c.ActiveConnection(),
|
|
|
|
|
|
donec: make(chan struct{}),
|
|
|
- keepAlives: make(map[lease.LeaseID]*keepAlive),
|
|
|
+ keepAlives: make(map[LeaseID]*keepAlive),
|
|
|
}
|
|
|
|
|
|
l.remote = pb.NewLeaseClient(l.conn)
|
|
|
@@ -121,7 +123,7 @@ func (l *lessor) Create(ctx context.Context, ttl int64) (*LeaseCreateResponse, e
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (l *lessor) Revoke(ctx context.Context, id lease.LeaseID) (*LeaseRevokeResponse, error) {
|
|
|
+func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
|
|
|
cctx, cancel := context.WithCancel(ctx)
|
|
|
done := cancelWhenStop(cancel, l.stopCtx.Done())
|
|
|
defer close(done)
|
|
|
@@ -143,7 +145,7 @@ func (l *lessor) Revoke(ctx context.Context, id lease.LeaseID) (*LeaseRevokeResp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (l *lessor) KeepAlive(ctx context.Context, id lease.LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
|
|
|
+func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
|
|
|
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
|
|
|
|
|
|
l.mu.Lock()
|
|
|
@@ -169,7 +171,7 @@ func (l *lessor) KeepAlive(ctx context.Context, id lease.LeaseID) (<-chan *Lease
|
|
|
return ch, nil
|
|
|
}
|
|
|
|
|
|
-func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
|
+func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
|
cctx, cancel := context.WithCancel(ctx)
|
|
|
done := cancelWhenStop(cancel, l.stopCtx.Done())
|
|
|
defer close(done)
|
|
|
@@ -193,7 +195,7 @@ func (l *lessor) Close() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (l *lessor) keepAliveCtxCloser(id lease.LeaseID, ctx context.Context, donec <-chan struct{}) {
|
|
|
+func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
|
|
|
select {
|
|
|
case <-donec:
|
|
|
return
|
|
|
@@ -225,7 +227,7 @@ func (l *lessor) keepAliveCtxCloser(id lease.LeaseID, ctx context.Context, donec
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
|
+func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
|
stream, err := l.getRemote().LeaseKeepAlive(ctx)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -251,7 +253,7 @@ func (l *lessor) recvKeepAliveLoop() {
|
|
|
for _, ka := range l.keepAlives {
|
|
|
ka.Close()
|
|
|
}
|
|
|
- l.keepAlives = make(map[lease.LeaseID]*keepAlive)
|
|
|
+ l.keepAlives = make(map[LeaseID]*keepAlive)
|
|
|
l.mu.Unlock()
|
|
|
}()
|
|
|
|
|
|
@@ -281,7 +283,7 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
|
|
|
|
|
// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
|
|
|
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
|
|
|
- id := lease.LeaseID(resp.ID)
|
|
|
+ id := LeaseID(resp.ID)
|
|
|
|
|
|
l.mu.Lock()
|
|
|
defer l.mu.Unlock()
|
|
|
@@ -320,7 +322,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- tosend := make([]lease.LeaseID, 0)
|
|
|
+ tosend := make([]LeaseID, 0)
|
|
|
|
|
|
now := time.Now()
|
|
|
l.mu.Lock()
|