lease.go

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"sync"
	"time"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

type (
	LeaseRevokeResponse pb.LeaseRevokeResponse
	LeaseID             int64
)

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	ID    LeaseID
	TTL   int64
	Error string
}

// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	ID  LeaseID
	TTL int64
}

// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
type LeaseTimeToLiveResponse struct {
	*pb.ResponseHeader
	ID LeaseID `json:"id"`

	// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
	TTL int64 `json:"ttl"`

	// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
	GrantedTTL int64 `json:"granted-ttl"`

	// Keys is the list of keys attached to this lease.
	Keys [][]byte `json:"keys"`
}

// LeaseStatus represents a lease status.
type LeaseStatus struct {
	ID LeaseID `json:"id"`
	// TODO: TTL int64
}

const (
	// defaultTTL is the assumed lease TTL used for the first keepalive
	// deadline before the actual TTL is known to the client.
	defaultTTL = 5 * time.Second
	// NoLease is a lease ID for the absence of a lease.
	NoLease LeaseID = 0

	// retryConnWait is how long to wait before retrying a request after an error.
	retryConnWait = 500 * time.Millisecond
)

// LeaseResponseChSize is the size of the buffer used to store unsent lease responses.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16

// ErrKeepAliveHalted is returned if the client's keepalive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
	Reason error
}

func (e ErrKeepAliveHalted) Error() string {
	s := "etcdclient: leases keep alive halted"
	if e.Reason != nil {
		s += ": " + e.Reason.Error()
	}
	return s
}
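
// exampleIsKeepAliveHalted is an illustrative sketch, not part of the original
// file: because KeepAlive returns ErrKeepAliveHalted by value, a caller can use
// a type assertion to tell a halted renewal loop apart from other errors.
func exampleIsKeepAliveHalted(err error) bool {
	_, halted := err.(ErrKeepAliveHalted)
	return halted
}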

type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// KeepAlive keeps the given lease alive forever.
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

	// KeepAliveOnce renews the lease once. In most cases, KeepAlive
	// should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}
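
// exampleLeaseUsage is an illustrative sketch, not part of the original file.
// It shows the typical Lease flow: grant a lease, keep it alive until the
// context is canceled, then release client-side resources. The *Client value,
// the 10-second TTL, and the error handling here are assumptions for the
// example only.
func exampleLeaseUsage(ctx context.Context, cli *Client) error {
	lease := NewLease(cli)
	defer lease.Close()

	grant, err := lease.Grant(ctx, 10) // request a 10-second TTL
	if err != nil {
		return err
	}

	kaCh, err := lease.KeepAlive(ctx, grant.ID)
	if err != nil {
		return err
	}
	// The channel closes when the lease expires, is revoked, or ctx is canceled.
	for resp := range kaCh {
		_ = resp.TTL // each response carries the refreshed TTL
	}
	return nil
}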

type lessor struct {
	mu sync.Mutex // guards all fields

	// donec is closed and loopErr is set when recvKeepAliveLoop stops
	donec   chan struct{}
	loopErr error

	remote pb.LeaseClient

	stream       pb.Lease_LeaseKeepAliveClient
	streamCancel context.CancelFunc

	stopCtx    context.Context
	stopCancel context.CancelFunc

	keepAlives map[LeaseID]*keepAlive

	// firstKeepAliveTimeout is the timeout for the first keepalive request
	// before the actual TTL is known to the lease client
	firstKeepAliveTimeout time.Duration

	// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
	firstKeepAliveOnce sync.Once

	callOpts []grpc.CallOption
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}

func NewLease(c *Client) Lease {
	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
}

func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
	l := &lessor{
		donec:                 make(chan struct{}),
		keepAlives:            make(map[LeaseID]*keepAlive),
		remote:                remote,
		firstKeepAliveTimeout: keepAliveTimeout,
	}
	if l.firstKeepAliveTimeout == time.Second {
		l.firstKeepAliveTimeout = defaultTTL
	}
	if c != nil {
		l.callOpts = c.callOpts
	}
	reqLeaderCtx := WithRequireLeader(context.Background())
	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
	return l
}

func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
	r := &pb.LeaseGrantRequest{TTL: ttl}
	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseGrantResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			Error:          resp.Error,
		}
		return gresp, nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
	r := &pb.LeaseRevokeRequest{ID: int64(id)}
	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
	if err == nil {
		return (*LeaseRevokeResponse)(resp), nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
	r := toLeaseTimeToLiveRequest(id, opts...)
	resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseTimeToLiveResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			GrantedTTL:     resp.GrantedTTL,
			Keys:           resp.Keys,
		}
		return gresp, nil
	}
	return nil, toErr(ctx, err)
}
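
// exampleLeaseTimeToLive is an illustrative sketch, not part of the original
// file. It shows a TimeToLive query that also asks for the attached keys;
// WithAttachedKeys is assumed to be the LeaseOption defined elsewhere in this
// package.
func exampleLeaseTimeToLive(ctx context.Context, l Lease, id LeaseID) (int64, error) {
	resp, err := l.TimeToLive(ctx, id, WithAttachedKeys())
	if err != nil {
		return 0, err
	}
	// A non-positive TTL means the lease has already expired or been revoked.
	return resp.TTL, nil
}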

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(id, ctx, ka.donec)
	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}
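
// exampleSharedKeepAlive is an illustrative sketch, not part of the original
// file. Because keepalives for a single lease ID are multiplexed, two
// KeepAlive calls with the same ID share one underlying keepalive while each
// caller receives its own response channel.
func exampleSharedKeepAlive(ctx context.Context, l Lease, id LeaseID) error {
	chA, err := l.KeepAlive(ctx, id)
	if err != nil {
		return err
	}
	chB, err := l.KeepAlive(ctx, id)
	if err != nil {
		return err
	}
	// Both channels observe renewals of the same underlying lease.
	go func() {
		for range chA {
		}
	}()
	for range chB {
	}
	return nil
}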

func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
	for {
		resp, err := l.keepAliveOnce(ctx, id)
		if err == nil {
			if resp.TTL <= 0 {
				err = rpctypes.ErrLeaseNotFound
			}
			return resp, err
		}
		if isHaltErr(ctx, err) {
			return nil, toErr(ctx, err)
		}
	}
}
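
// exampleManualRenewal is an illustrative sketch, not part of the original
// file. It shows driving renewal by hand with KeepAliveOnce on a fixed ticker
// instead of relying on the KeepAlive loop; the 3-second interval is an
// arbitrary assumption for the example.
func exampleManualRenewal(ctx context.Context, l Lease, id LeaseID) error {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if _, err := l.KeepAliveOnce(ctx, id); err != nil {
				// rpctypes.ErrLeaseNotFound means the lease has expired or been revoked.
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}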

func (l *lessor) Close() error {
	l.stopCancel()
	// close for synchronous teardown if stream goroutines never launched
	l.firstKeepAliveOnce.Do(func() { close(l.donec) })
	<-l.donec
	return nil
}

func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
	select {
	case <-donec:
		return
	case <-l.donec:
		return
	case <-ctx.Done():
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[id]
	if !ok {
		return
	}

	// close channel and remove context if still associated with keep alive
	for i, c := range ka.ctxs {
		if c == ctx {
			close(ka.chs[i])
			ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
			ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
			break
		}
	}
	// remove the keepalive if no more listeners remain
	if len(ka.chs) == 0 {
		delete(l.keepAlives, id)
	}
}

// closeRequireLeader scans keepAlives for ctxs that have require leader
// and closes the associated channels.
func (l *lessor) closeRequireLeader() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, ka := range l.keepAlives {
		reqIdxs := 0
		// find all require-leader channels, close them, and mark them nil
		for i, ctx := range ka.ctxs {
			md, ok := metadata.FromOutgoingContext(ctx)
			if !ok {
				continue
			}
			ks := md[rpctypes.MetadataRequireLeaderKey]
			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
				continue
			}
			close(ka.chs[i])
			ka.chs[i] = nil
			reqIdxs++
		}
		if reqIdxs == 0 {
			continue
		}
		// remove all channels that required a leader from keepalive
		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
		newCtxs := make([]context.Context, len(newChs))
		newIdx := 0
		for i := range ka.chs {
			if ka.chs[i] == nil {
				continue
			}
			// keep each surviving channel paired with its own context
			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
			newIdx++
		}
		ka.chs, ka.ctxs = newChs, newCtxs
	}
}

func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
	if err != nil {
		return nil, toErr(ctx, err)
	}

	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
	if err != nil {
		return nil, toErr(ctx, err)
	}

	resp, rerr := stream.Recv()
	if rerr != nil {
		return nil, toErr(ctx, rerr)
	}

	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
	return karesp, nil
}

func (l *lessor) recvKeepAliveLoop() (gerr error) {
	defer func() {
		l.mu.Lock()
		close(l.donec)
		l.loopErr = gerr
		for _, ka := range l.keepAlives {
			ka.close()
		}
		l.keepAlives = make(map[LeaseID]*keepAlive)
		l.mu.Unlock()
	}()

	for {
		stream, err := l.resetRecv()
		if err != nil {
			if canceledByCaller(l.stopCtx, err) {
				return err
			}
		} else {
			for {
				resp, err := stream.Recv()
				if err != nil {
					if canceledByCaller(l.stopCtx, err) {
						return err
					}
					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
						l.closeRequireLeader()
					}
					break
				}
				l.recvKeepAlive(resp)
			}
		}

		select {
		case <-time.After(retryConnWait):
			continue
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
	sctx, cancel := context.WithCancel(l.stopCtx)
	stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
	if err != nil {
		cancel()
		return nil, err
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stream != nil && l.streamCancel != nil {
		l.streamCancel()
	}

	l.streamCancel = cancel
	l.stream = stream

	go l.sendKeepAliveLoop(stream)
	return stream, nil
}

// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[karesp.ID]
	if !ok {
		return
	}

	if karesp.TTL <= 0 {
		// lease expired; close all keep alive channels
		delete(l.keepAlives, karesp.ID)
		ka.close()
		return
	}

	// send update to all channels; schedule the next keepalive at one-third of
	// the TTL so the renewal arrives well before the lease expires
	nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}

// deadlineLoop reaps any keep alive channels that have not received a response
// within the lease TTL
func (l *lessor) deadlineLoop() {
	for {
		select {
		case <-time.After(time.Second):
		case <-l.donec:
			return
		}
		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.deadline.Before(now) {
				// waited too long for response; lease may be expired
				ka.close()
				delete(l.keepAlives, id)
			}
		}
		l.mu.Unlock()
	}
}

// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
	for {
		var tosend []LeaseID

		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.nextKeepAlive.Before(now) {
				tosend = append(tosend, id)
			}
		}
		l.mu.Unlock()

		for _, id := range tosend {
			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
			if err := stream.Send(r); err != nil {
				// TODO do something with this error?
				return
			}
		}

		select {
		case <-time.After(500 * time.Millisecond):
		case <-stream.Context().Done():
			return
		case <-l.donec:
			return
		case <-l.stopCtx.Done():
			return
		}
	}
}

func (ka *keepAlive) close() {
	close(ka.donec)
	for _, ch := range ka.chs {
		close(ch)
	}
}