@@ -41,10 +41,8 @@ type LeaseGrantResponse struct {
 // LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
 type LeaseKeepAliveResponse struct {
 	*pb.ResponseHeader
-	ID       LeaseID
-	TTL      int64
-	Err      error
-	Deadline time.Time
+	ID  LeaseID
+	TTL int64
 }
 
 // LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
@@ -72,11 +70,23 @@ const (
 	NoLease LeaseID = 0
 
 	// retryConnWait is how long to wait before retrying on a lost leader
-	// or keep alive loop failure.
 	retryConnWait = 500 * time.Millisecond
 )
 
-type LeaseKeepAliveChan <-chan LeaseKeepAliveResponse
+// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
+//
+// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
+type ErrKeepAliveHalted struct {
+	Reason error
+}
+
+func (e ErrKeepAliveHalted) Error() string {
+	s := "etcdclient: leases keep alive halted"
+	if e.Reason != nil {
+		s += ": " + e.Reason.Error()
+	}
+	return s
+}
 
 type Lease interface {
 	// Grant creates a new lease.
@@ -88,24 +98,12 @@ type Lease interface {
 	// TimeToLive retrieves the lease information of the given lease ID.
 	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
 
-	// KeepAlive keeps the given lease alive forever. If the keepalive response posted to
-	// the channel is not consumed immediately, the lease client will continue sending keep alive requests
-	// to the etcd server at least every second until latest response is consumed.
-	//
-	// The KeepAlive channel closes if the underlying keep alive stream is interrupted in some
-	// way the client cannot handle itself; the error will be posted in the last keep
-	// alive message before closing. If there is no keepalive response within the
-	// lease's time-out, the channel will close with no error. In most cases calling
-	// KeepAlive again will re-establish keepalives with the target lease if it has not
-	// expired.
-	KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan
-
-	// KeepAliveOnce renews the lease once. The response corresponds to the
-	// first message from calling KeepAlive. If the response has a recoverable
-	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
-	//
-	// In most of the cases, Keepalive should be used instead of KeepAliveOnce.
-	KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse
+	// KeepAlive keeps the given lease alive forever.
+	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
+
+	// KeepAliveOnce renews the lease once. In most of the cases, Keepalive
+	// should be used instead of KeepAliveOnce.
+	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
 
 	// Close releases all resources Lease keeps for efficient communication
 	// with the etcd server.
@@ -115,8 +113,9 @@ type Lease interface {
 type lessor struct {
 	mu sync.Mutex // guards all fields
 
-	// donec is closed when all goroutines are torn down from Close()
-	donec chan struct{}
+	// donec is closed and loopErr is set when recvKeepAliveLoop stops
+	donec   chan struct{}
+	loopErr error
 
 	remote pb.LeaseClient
 
@@ -138,7 +137,7 @@ type lessor struct {
 
 // keepAlive multiplexes a keepalive for a lease over multiple channels
 type keepAlive struct {
-	chs  []chan<- LeaseKeepAliveResponse
+	chs  []chan<- *LeaseKeepAliveResponse
 	ctxs []context.Context
 	// deadline is the time the keep alive channels close if no response
 	deadline time.Time
@@ -220,22 +219,24 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
 	}
 }
 
-func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan {
-	ch := make(chan LeaseKeepAliveResponse, leaseResponseChSize)
+func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
+	ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
 
 	l.mu.Lock()
 	// ensure that recvKeepAliveLoop is still running
 	select {
 	case <-l.donec:
+		err := l.loopErr
+		l.mu.Unlock()
 		close(ch)
-		return ch
+		return ch, ErrKeepAliveHalted{Reason: err}
 	default:
 	}
 	ka, ok := l.keepAlives[id]
 	if !ok {
 		// create fresh keep alive
 		ka = &keepAlive{
-			chs:           []chan<- LeaseKeepAliveResponse{ch},
+			chs:           []chan<- *LeaseKeepAliveResponse{ch},
 			ctxs:          []context.Context{ctx},
 			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
 			nextKeepAlive: time.Now(),
@@ -251,51 +252,24 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan {
 
 	go l.keepAliveCtxCloser(id, ctx, ka.donec)
 	l.firstKeepAliveOnce.Do(func() {
-		go func() {
-			defer func() {
-				l.mu.Lock()
-				for _, ka := range l.keepAlives {
-					ka.Close(nil)
-				}
-				close(l.donec)
-				l.mu.Unlock()
-			}()
-
-			for l.stopCtx.Err() == nil {
-				err := l.recvKeepAliveLoop()
-				if err == context.Canceled {
-					// canceled by user; no error like WatchChan
-					err = nil
-				}
-				l.mu.Lock()
-				for _, ka := range l.keepAlives {
-					ka.Close(err)
-				}
-				l.keepAlives = make(map[LeaseID]*keepAlive)
-				l.mu.Unlock()
-				select {
-				case <-l.stopCtx.Done():
-				case <-time.After(retryConnWait):
-				}
-			}
-		}()
+		go l.recvKeepAliveLoop()
 		go l.deadlineLoop()
 	})
 
-	return ch
+	return ch, nil
 }
 
-func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
+func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	for {
-		resp := l.keepAliveOnce(ctx, id)
-		if resp.Err == nil {
+		resp, err := l.keepAliveOnce(ctx, id)
+		if err == nil {
 			if resp.TTL <= 0 {
-				resp.Err = rpctypes.ErrLeaseNotFound
+				err = rpctypes.ErrLeaseNotFound
 			}
-			return resp
+			return resp, err
 		}
-		if isHaltErr(ctx, resp.Err) {
-			return resp
+		if isHaltErr(ctx, err) {
+			return nil, toErr(ctx, err)
 		}
 	}
 }
@@ -365,7 +339,7 @@ func (l *lessor) closeRequireLeader() {
 			continue
 		}
 		// remove all channels that required a leader from keepalive
-		newChs := make([]chan<- LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
 		newCtxs := make([]context.Context, len(newChs))
 		newIdx := 0
 		for i := range ka.chs {
@@ -379,34 +353,45 @@ func (l *lessor) closeRequireLeader() {
 	}
 }
 
-func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
+func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
 	if err != nil {
-		return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
+		return nil, toErr(ctx, err)
 	}
 
 	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
 	if err != nil {
-		return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
+		return nil, toErr(ctx, err)
 	}
 
 	resp, rerr := stream.Recv()
 	if rerr != nil {
-		return LeaseKeepAliveResponse{Err: toErr(ctx, rerr)}
+		return nil, toErr(ctx, rerr)
 	}
 
-	return LeaseKeepAliveResponse{
+	karesp := &LeaseKeepAliveResponse{
 		ResponseHeader: resp.GetHeader(),
 		ID:             LeaseID(resp.ID),
 		TTL:            resp.TTL,
-		Deadline:       time.Now().Add(time.Duration(resp.TTL) * time.Second),
 	}
+	return karesp, nil
 }
 
 func (l *lessor) recvKeepAliveLoop() (gerr error) {
+	defer func() {
+		l.mu.Lock()
+		close(l.donec)
+		l.loopErr = gerr
+		for _, ka := range l.keepAlives {
+			ka.Close()
+		}
+		l.keepAlives = make(map[LeaseID]*keepAlive)
+		l.mu.Unlock()
+	}()
+
 	stream, serr := l.resetRecv()
 	for serr == nil {
 		resp, err := stream.Recv()
@@ -458,7 +443,6 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 		ResponseHeader: resp.GetHeader(),
 		ID:             LeaseID(resp.ID),
 		TTL:            resp.TTL,
-		Deadline:       time.Now().Add(time.Duration(resp.TTL) * time.Second),
 	}
 
 	l.mu.Lock()
@@ -472,7 +456,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	if karesp.TTL <= 0 {
 		// lease expired; close all keep alive channels
 		delete(l.keepAlives, karesp.ID)
-		ka.Close(nil)
+		ka.Close()
 		return
 	}
 
@@ -481,7 +465,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
 	for _, ch := range ka.chs {
 		select {
-		case ch <- *karesp:
+		case ch <- karesp:
 			ka.nextKeepAlive = nextKeepAlive
 		default:
 		}
@@ -502,7 +486,7 @@ func (l *lessor) deadlineLoop() {
 		for id, ka := range l.keepAlives {
 			if ka.deadline.Before(now) {
 				// waited too long for response; lease may be expired
-				ka.Close(nil)
+				ka.Close()
 				delete(l.keepAlives, id)
 			}
 		}
@@ -544,18 +528,9 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	}
 }
 
-func (ka *keepAlive) Close(err error) {
+func (ka *keepAlive) Close() {
 	close(ka.donec)
 	for _, ch := range ka.chs {
-		if err != nil {
-			// try to post error if buffer space available
-			select {
-			case ch <- LeaseKeepAliveResponse{Err: err}:
-			default:
-			}
-		}
 		close(ch)
 	}
-	// so keepAliveCtxClose doesn't double-close ka.chs
-	ka.chs, ka.ctxs = nil, nil
 }
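
A minimal caller-side sketch of the API change above, not part of the patch: KeepAlive and KeepAliveOnce now return an error directly (including ErrKeepAliveHalted) instead of carrying it in a per-response Err field, and the keepalive channel delivers *LeaseKeepAliveResponse values. The client value, import path, and helper name below are illustrative assumptions.

package leaseexample

import (
	"context"
	"log"

	"github.com/coreos/etcd/clientv3"
)

// keepLeaseAlive is an illustrative helper (not part of the diff) showing the
// new KeepAlive return values under the signatures changed above.
func keepLeaseAlive(cli *clientv3.Client) error {
	// Grant a lease with a 10-second TTL.
	grant, err := cli.Grant(context.Background(), 10)
	if err != nil {
		return err
	}
	// KeepAlive now reports failures through its error return,
	// e.g. ErrKeepAliveHalted when the keep alive loop has stopped.
	kaCh, err := cli.KeepAlive(context.Background(), grant.ID)
	if err != nil {
		return err
	}
	// Each value is a *clientv3.LeaseKeepAliveResponse; the channel closes
	// when the lease expires or the context is canceled.
	for resp := range kaCh {
		log.Printf("lease %d renewed, TTL=%d", resp.ID, resp.TTL)
	}
	return nil
}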