|
|
@@ -57,8 +57,10 @@ type Lease interface {
|
|
|
type lessor struct {
|
|
|
c *Client
|
|
|
|
|
|
- mu sync.Mutex // guards all fields
|
|
|
- conn *grpc.ClientConn // conn in-use
|
|
|
+ mu sync.Mutex // guards all fields
|
|
|
+ conn *grpc.ClientConn // conn in-use
|
|
|
+ initedc chan bool
|
|
|
+
|
|
|
remote pb.LeaseClient
|
|
|
|
|
|
stream pb.Lease_LeaseKeepAliveClient
|
|
|
@@ -76,12 +78,17 @@ func NewLease(c *Client) Lease {
|
|
|
c: c,
|
|
|
conn: c.ActiveConnection(),
|
|
|
|
|
|
+ initedc: make(chan bool, 1),
|
|
|
+
|
|
|
keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse),
|
|
|
deadlines: make(map[lease.LeaseID]time.Time),
|
|
|
}
|
|
|
|
|
|
l.remote = pb.NewLeaseClient(l.conn)
|
|
|
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
|
|
|
+
|
|
|
+ l.initedc <- false
|
|
|
+
|
|
|
go l.recvKeepAliveLoop()
|
|
|
go l.sendKeepAliveLoop()
|
|
|
|
|
|
@@ -103,7 +110,6 @@ func (l *lessor) Create(ctx context.Context, ttl int64) (*LeaseCreateResponse, e
|
|
|
if isRPCError(err) {
|
|
|
return nil, err
|
|
|
}
|
|
|
-
|
|
|
if nerr := l.switchRemoteAndStream(err); nerr != nil {
|
|
|
return nil, nerr
|
|
|
}
|
|
|
@@ -354,14 +360,17 @@ func (l *lessor) newStream() error {
|
|
|
}
|
|
|
|
|
|
func (l *lessor) initStream() bool {
|
|
|
- if l.getKeepAliveStream() != nil {
|
|
|
+ ok := <-l.initedc
|
|
|
+ if ok {
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
err := l.switchRemoteAndStream(nil)
|
|
|
if err == nil {
|
|
|
+ l.initedc <- true
|
|
|
return true
|
|
|
}
|
|
|
+ l.initedc <- false
|
|
|
return false
|
|
|
}
|
|
|
|