lease.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  15. import (
  16. "sync"
  17. "time"
  18. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  19. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  20. "golang.org/x/net/context"
  21. )
  22. type (
  23. LeaseRevokeResponse pb.LeaseRevokeResponse
  24. LeaseID int64
  25. )
// LeaseGrantResponse is used to convert the protobuf grant response.
// Grant copies the header, lease ID, TTL and error string out of the
// server's pb.LeaseGrantResponse into this type.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	// ID is the ID of the newly granted lease.
	ID LeaseID
	// TTL is the time-to-live the server reported for the lease.
	TTL int64
	// Error is copied verbatim from the server's grant response.
	Error string
}
// LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
// It is what KeepAlive channels deliver and what KeepAliveOnce returns.
type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	// ID is the lease the response refers to.
	ID LeaseID
	// TTL is the remaining time-to-live in seconds (the lessor multiplies it
	// by time.Second); a value <= 0 means the lease no longer exists.
	TTL int64
}
  39. const (
  40. // defaultTTL is the assumed lease TTL used for the first keepalive
  41. // deadline before the actual TTL is known to the client.
  42. defaultTTL = 5 * time.Second
  43. // a small buffer to store unsent lease responses.
  44. leaseResponseChSize = 16
  45. // NoLease is a lease ID for the absence of a lease.
  46. NoLease LeaseID = 0
  47. )
// Lease is the client-side API for etcd leases: granting, revoking and
// renewing them.
type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
	// KeepAlive keeps the given lease alive until ctx is done. Renewal
	// responses are delivered on the returned channel, which is closed when
	// ctx is done, the lease expires or stops being renewed in time, or the
	// Lease is closed.
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
	// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
	// should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}
// lessor is the Lease implementation returned by NewLease. All KeepAlive
// registrations share a single lease stream: sendKeepAliveLoop sends renewal
// requests on it and recvKeepAliveLoop dispatches the responses.
type lessor struct {
	mu sync.Mutex // guards stream, streamCancel and keepAlives; held while donec is closed
	// donec is closed when recvKeepAliveLoop stops
	donec chan struct{}
	// remote is the lease RPC client; set once in NewLease.
	remote pb.LeaseClient
	// stream is the current shared keep alive stream and streamCancel
	// cancels its context; both are replaced by newStream.
	stream       pb.Lease_LeaseKeepAliveClient
	streamCancel context.CancelFunc
	// stopCtx is cancelled by Close (via stopCancel); every stream and
	// per-call context is derived from or tied to it.
	stopCtx    context.Context
	stopCancel context.CancelFunc
	// keepAlives maps each lease being kept alive to its listeners.
	keepAlives map[LeaseID]*keepAlive
	// firstKeepAliveTimeout is the timeout for the first keepalive request
	// before the actual TTL is known to the lease client
	firstKeepAliveTimeout time.Duration
}
// keepAlive multiplexes a keepalive for a lease over multiple channels.
// chs and ctxs are parallel slices: ctxs[i] is the context passed to the
// KeepAlive call that returned chs[i].
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	// (deadlineLoop reaps the entry once it has passed)
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	// (sendKeepAliveLoop sends once it has passed)
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}
  87. func NewLease(c *Client) Lease {
  88. l := &lessor{
  89. donec: make(chan struct{}),
  90. keepAlives: make(map[LeaseID]*keepAlive),
  91. remote: pb.NewLeaseClient(c.conn),
  92. firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
  93. }
  94. if l.firstKeepAliveTimeout == time.Second {
  95. l.firstKeepAliveTimeout = defaultTTL
  96. }
  97. l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
  98. go l.recvKeepAliveLoop()
  99. go l.deadlineLoop()
  100. return l
  101. }
  102. func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
  103. cctx, cancel := context.WithCancel(ctx)
  104. done := cancelWhenStop(cancel, l.stopCtx.Done())
  105. defer close(done)
  106. for {
  107. r := &pb.LeaseGrantRequest{TTL: ttl}
  108. resp, err := l.remote.LeaseGrant(cctx, r)
  109. if err == nil {
  110. gresp := &LeaseGrantResponse{
  111. ResponseHeader: resp.GetHeader(),
  112. ID: LeaseID(resp.ID),
  113. TTL: resp.TTL,
  114. Error: resp.Error,
  115. }
  116. return gresp, nil
  117. }
  118. if isHaltErr(cctx, err) {
  119. return nil, toErr(ctx, err)
  120. }
  121. if nerr := l.newStream(); nerr != nil {
  122. return nil, nerr
  123. }
  124. }
  125. }
  126. func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
  127. cctx, cancel := context.WithCancel(ctx)
  128. done := cancelWhenStop(cancel, l.stopCtx.Done())
  129. defer close(done)
  130. for {
  131. r := &pb.LeaseRevokeRequest{ID: int64(id)}
  132. resp, err := l.remote.LeaseRevoke(cctx, r)
  133. if err == nil {
  134. return (*LeaseRevokeResponse)(resp), nil
  135. }
  136. if isHaltErr(ctx, err) {
  137. return nil, toErr(ctx, err)
  138. }
  139. if nerr := l.newStream(); nerr != nil {
  140. return nil, nerr
  141. }
  142. }
  143. }
  144. func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
  145. ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
  146. l.mu.Lock()
  147. ka, ok := l.keepAlives[id]
  148. if !ok {
  149. // create fresh keep alive
  150. ka = &keepAlive{
  151. chs: []chan<- *LeaseKeepAliveResponse{ch},
  152. ctxs: []context.Context{ctx},
  153. deadline: time.Now().Add(l.firstKeepAliveTimeout),
  154. nextKeepAlive: time.Now(),
  155. donec: make(chan struct{}),
  156. }
  157. l.keepAlives[id] = ka
  158. } else {
  159. // add channel and context to existing keep alive
  160. ka.ctxs = append(ka.ctxs, ctx)
  161. ka.chs = append(ka.chs, ch)
  162. }
  163. l.mu.Unlock()
  164. go l.keepAliveCtxCloser(id, ctx, ka.donec)
  165. return ch, nil
  166. }
  167. func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
  168. cctx, cancel := context.WithCancel(ctx)
  169. done := cancelWhenStop(cancel, l.stopCtx.Done())
  170. defer close(done)
  171. for {
  172. resp, err := l.keepAliveOnce(cctx, id)
  173. if err == nil {
  174. if resp.TTL == 0 {
  175. err = rpctypes.ErrLeaseNotFound
  176. }
  177. return resp, err
  178. }
  179. if isHaltErr(ctx, err) {
  180. return nil, toErr(ctx, err)
  181. }
  182. if nerr := l.newStream(); nerr != nil {
  183. return nil, nerr
  184. }
  185. }
  186. }
  187. func (l *lessor) Close() error {
  188. l.stopCancel()
  189. <-l.donec
  190. return nil
  191. }
  192. func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
  193. select {
  194. case <-donec:
  195. return
  196. case <-l.donec:
  197. return
  198. case <-ctx.Done():
  199. }
  200. l.mu.Lock()
  201. defer l.mu.Unlock()
  202. ka, ok := l.keepAlives[id]
  203. if !ok {
  204. return
  205. }
  206. // close channel and remove context if still associated with keep alive
  207. for i, c := range ka.ctxs {
  208. if c == ctx {
  209. close(ka.chs[i])
  210. ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
  211. ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
  212. break
  213. }
  214. }
  215. // remove if no one more listeners
  216. if len(ka.chs) == 0 {
  217. delete(l.keepAlives, id)
  218. }
  219. }
  220. func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
  221. cctx, cancel := context.WithCancel(ctx)
  222. defer cancel()
  223. stream, err := l.remote.LeaseKeepAlive(cctx)
  224. if err != nil {
  225. return nil, toErr(ctx, err)
  226. }
  227. err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
  228. if err != nil {
  229. return nil, toErr(ctx, err)
  230. }
  231. resp, rerr := stream.Recv()
  232. if rerr != nil {
  233. return nil, toErr(ctx, rerr)
  234. }
  235. karesp := &LeaseKeepAliveResponse{
  236. ResponseHeader: resp.GetHeader(),
  237. ID: LeaseID(resp.ID),
  238. TTL: resp.TTL,
  239. }
  240. return karesp, nil
  241. }
  242. func (l *lessor) recvKeepAliveLoop() {
  243. defer func() {
  244. l.mu.Lock()
  245. close(l.donec)
  246. for _, ka := range l.keepAlives {
  247. ka.Close()
  248. }
  249. l.keepAlives = make(map[LeaseID]*keepAlive)
  250. l.mu.Unlock()
  251. }()
  252. stream, serr := l.resetRecv()
  253. for serr == nil {
  254. resp, err := stream.Recv()
  255. if err != nil {
  256. if isHaltErr(l.stopCtx, err) {
  257. return
  258. }
  259. stream, serr = l.resetRecv()
  260. continue
  261. }
  262. l.recvKeepAlive(resp)
  263. }
  264. }
  265. // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
  266. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
  267. if err := l.newStream(); err != nil {
  268. return nil, err
  269. }
  270. stream := l.getKeepAliveStream()
  271. go l.sendKeepAliveLoop(stream)
  272. return stream, nil
  273. }
  274. // recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
  275. func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
  276. karesp := &LeaseKeepAliveResponse{
  277. ResponseHeader: resp.GetHeader(),
  278. ID: LeaseID(resp.ID),
  279. TTL: resp.TTL,
  280. }
  281. l.mu.Lock()
  282. defer l.mu.Unlock()
  283. ka, ok := l.keepAlives[karesp.ID]
  284. if !ok {
  285. return
  286. }
  287. if karesp.TTL <= 0 {
  288. // lease expired; close all keep alive channels
  289. delete(l.keepAlives, karesp.ID)
  290. ka.Close()
  291. return
  292. }
  293. // send update to all channels
  294. nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
  295. ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
  296. for _, ch := range ka.chs {
  297. select {
  298. case ch <- karesp:
  299. ka.nextKeepAlive = nextKeepAlive
  300. default:
  301. }
  302. }
  303. }
  304. // deadlineLoop reaps any keep alive channels that have not recieved a resposne within
  305. // the lease TTL
  306. func (l *lessor) deadlineLoop() {
  307. for {
  308. select {
  309. case <-time.After(time.Second):
  310. case <-l.donec:
  311. return
  312. }
  313. now := time.Now()
  314. l.mu.Lock()
  315. for id, ka := range l.keepAlives {
  316. if ka.deadline.Before(now) {
  317. // waited too long for response; lease may be expired
  318. ka.Close()
  319. delete(l.keepAlives, id)
  320. }
  321. }
  322. l.mu.Unlock()
  323. }
  324. }
  325. // sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
  326. func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
  327. for {
  328. select {
  329. case <-time.After(500 * time.Millisecond):
  330. case <-stream.Context().Done():
  331. return
  332. case <-l.donec:
  333. return
  334. case <-l.stopCtx.Done():
  335. return
  336. }
  337. tosend := make([]LeaseID, 0)
  338. now := time.Now()
  339. l.mu.Lock()
  340. for id, ka := range l.keepAlives {
  341. if ka.nextKeepAlive.Before(now) {
  342. tosend = append(tosend, id)
  343. }
  344. }
  345. l.mu.Unlock()
  346. for _, id := range tosend {
  347. r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
  348. if err := stream.Send(r); err != nil {
  349. // TODO do something with this error?
  350. return
  351. }
  352. }
  353. }
  354. }
  355. func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
  356. l.mu.Lock()
  357. defer l.mu.Unlock()
  358. return l.stream
  359. }
  360. func (l *lessor) newStream() error {
  361. sctx, cancel := context.WithCancel(l.stopCtx)
  362. stream, err := l.remote.LeaseKeepAlive(sctx)
  363. if err != nil {
  364. cancel()
  365. return toErr(sctx, err)
  366. }
  367. l.mu.Lock()
  368. defer l.mu.Unlock()
  369. if l.stream != nil && l.streamCancel != nil {
  370. l.stream.CloseSend()
  371. l.streamCancel()
  372. }
  373. l.streamCancel = cancel
  374. l.stream = stream
  375. return nil
  376. }
  377. func (ka *keepAlive) Close() {
  378. close(ka.donec)
  379. for _, ch := range ka.chs {
  380. close(ch)
  381. }
  382. }
  383. // cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
  384. // should be closed when the work is finished. When done fires, cancelWhenStop will release
  385. // its internal resource.
  386. func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} {
  387. done := make(chan struct{}, 1)
  388. go func() {
  389. select {
  390. case <-stopc:
  391. case <-done:
  392. }
  393. cancel()
  394. }()
  395. return done
  396. }