|
@@ -159,13 +159,9 @@ func NewLease(c *Client) Lease {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
|
|
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
|
|
|
- cctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
- done := cancelWhenStop(cancel, l.stopCtx.Done())
|
|
|
|
|
- defer close(done)
|
|
|
|
|
-
|
|
|
|
|
for {
|
|
for {
|
|
|
r := &pb.LeaseGrantRequest{TTL: ttl}
|
|
r := &pb.LeaseGrantRequest{TTL: ttl}
|
|
|
- resp, err := l.remote.LeaseGrant(cctx, r)
|
|
|
|
|
|
|
+ resp, err := l.remote.LeaseGrant(ctx, r)
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
gresp := &LeaseGrantResponse{
|
|
gresp := &LeaseGrantResponse{
|
|
|
ResponseHeader: resp.GetHeader(),
|
|
ResponseHeader: resp.GetHeader(),
|
|
@@ -175,20 +171,16 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
|
|
|
}
|
|
}
|
|
|
return gresp, nil
|
|
return gresp, nil
|
|
|
}
|
|
}
|
|
|
- if isHaltErr(cctx, err) {
|
|
|
|
|
- return nil, toErr(cctx, err)
|
|
|
|
|
|
|
+ if isHaltErr(ctx, err) {
|
|
|
|
|
+ return nil, toErr(ctx, err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
|
|
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
|
|
|
- cctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
- done := cancelWhenStop(cancel, l.stopCtx.Done())
|
|
|
|
|
- defer close(done)
|
|
|
|
|
-
|
|
|
|
|
for {
|
|
for {
|
|
|
r := &pb.LeaseRevokeRequest{ID: int64(id)}
|
|
r := &pb.LeaseRevokeRequest{ID: int64(id)}
|
|
|
- resp, err := l.remote.LeaseRevoke(cctx, r)
|
|
|
|
|
|
|
+ resp, err := l.remote.LeaseRevoke(ctx, r)
|
|
|
|
|
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
return (*LeaseRevokeResponse)(resp), nil
|
|
return (*LeaseRevokeResponse)(resp), nil
|
|
@@ -200,13 +192,9 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
|
|
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
|
|
|
- cctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
- done := cancelWhenStop(cancel, l.stopCtx.Done())
|
|
|
|
|
- defer close(done)
|
|
|
|
|
-
|
|
|
|
|
for {
|
|
for {
|
|
|
r := toLeaseTimeToLiveRequest(id, opts...)
|
|
r := toLeaseTimeToLiveRequest(id, opts...)
|
|
|
- resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false))
|
|
|
|
|
|
|
+ resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false))
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
gresp := &LeaseTimeToLiveResponse{
|
|
gresp := &LeaseTimeToLiveResponse{
|
|
|
ResponseHeader: resp.GetHeader(),
|
|
ResponseHeader: resp.GetHeader(),
|
|
@@ -217,8 +205,8 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
|
|
|
}
|
|
}
|
|
|
return gresp, nil
|
|
return gresp, nil
|
|
|
}
|
|
}
|
|
|
- if isHaltErr(cctx, err) {
|
|
|
|
|
- return nil, toErr(cctx, err)
|
|
|
|
|
|
|
+ if isHaltErr(ctx, err) {
|
|
|
|
|
+ return nil, toErr(ctx, err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -264,12 +252,8 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
|
- cctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
- done := cancelWhenStop(cancel, l.stopCtx.Done())
|
|
|
|
|
- defer close(done)
|
|
|
|
|
-
|
|
|
|
|
for {
|
|
for {
|
|
|
- resp, err := l.keepAliveOnce(cctx, id)
|
|
|
|
|
|
|
+ resp, err := l.keepAliveOnce(ctx, id)
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
if resp.TTL == 0 {
|
|
if resp.TTL == 0 {
|
|
|
err = rpctypes.ErrLeaseNotFound
|
|
err = rpctypes.ErrLeaseNotFound
|
|
@@ -496,20 +480,3 @@ func (ka *keepAlive) Close() {
|
|
|
close(ch)
|
|
close(ch)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
-// cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
|
|
|
|
|
-// should be closed when the work is finished. When done fires, cancelWhenStop will release
|
|
|
|
|
-// its internal resource.
|
|
|
|
|
-func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} {
|
|
|
|
|
- done := make(chan struct{}, 1)
|
|
|
|
|
-
|
|
|
|
|
- go func() {
|
|
|
|
|
- select {
|
|
|
|
|
- case <-stopc:
|
|
|
|
|
- case <-done:
|
|
|
|
|
- }
|
|
|
|
|
- cancel()
|
|
|
|
|
- }()
|
|
|
|
|
-
|
|
|
|
|
- return done
|
|
|
|
|
-}
|
|
|