lease.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clientv3
  15. import (
  16. "sync"
  17. "time"
  18. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  19. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  20. "golang.org/x/net/context"
  21. "google.golang.org/grpc"
  22. )
  23. type (
  24. LeaseRevokeResponse pb.LeaseRevokeResponse
  25. LeaseID int64
  26. )
  27. // LeaseGrantResponse is used to convert the protobuf grant response.
  28. type LeaseGrantResponse struct {
  29. *pb.ResponseHeader
  30. ID LeaseID
  31. TTL int64
  32. Error string
  33. }
  34. // LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
  35. type LeaseKeepAliveResponse struct {
  36. *pb.ResponseHeader
  37. ID LeaseID
  38. TTL int64
  39. }
  40. const (
  41. // a small buffer to store unsent lease responses.
  42. leaseResponseChSize = 16
  43. // NoLease is a lease ID for the absence of a lease.
  44. NoLease LeaseID = 0
  45. )
  46. type Lease interface {
  47. // Grant creates a new lease.
  48. Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
  49. // Revoke revokes the given lease.
  50. Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
  51. // KeepAlive keeps the given lease alive forever.
  52. KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
  53. // KeepAliveOnce renews the lease once. In most of the cases, Keepalive
  54. // should be used instead of KeepAliveOnce.
  55. KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
  56. // Close releases all resources Lease keeps for efficient communication
  57. // with the etcd server.
  58. Close() error
  59. }
  60. type lessor struct {
  61. mu sync.Mutex // guards all fields
  62. // donec is closed when recvKeepAliveLoop stops
  63. donec chan struct{}
  64. rc *remoteClient
  65. remote pb.LeaseClient
  66. stream pb.Lease_LeaseKeepAliveClient
  67. streamCancel context.CancelFunc
  68. stopCtx context.Context
  69. stopCancel context.CancelFunc
  70. keepAlives map[LeaseID]*keepAlive
  71. }
  72. // keepAlive multiplexes a keepalive for a lease over multiple channels
  73. type keepAlive struct {
  74. chs []chan<- *LeaseKeepAliveResponse
  75. ctxs []context.Context
  76. // deadline is the next time to send a keep alive message
  77. deadline time.Time
  78. // donec is closed on lease revoke, expiration, or cancel.
  79. donec chan struct{}
  80. }
  81. func NewLease(c *Client) Lease {
  82. l := &lessor{
  83. donec: make(chan struct{}),
  84. keepAlives: make(map[LeaseID]*keepAlive),
  85. }
  86. f := func(conn *grpc.ClientConn) { l.remote = pb.NewLeaseClient(conn) }
  87. l.rc = newRemoteClient(c, f)
  88. l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
  89. go l.recvKeepAliveLoop()
  90. return l
  91. }
  92. func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
  93. cctx, cancel := context.WithCancel(ctx)
  94. done := cancelWhenStop(cancel, l.stopCtx.Done())
  95. defer close(done)
  96. for {
  97. r := &pb.LeaseGrantRequest{TTL: ttl}
  98. resp, err := l.getRemote().LeaseGrant(cctx, r)
  99. if err == nil {
  100. gresp := &LeaseGrantResponse{
  101. ResponseHeader: resp.GetHeader(),
  102. ID: LeaseID(resp.ID),
  103. TTL: resp.TTL,
  104. Error: resp.Error,
  105. }
  106. return gresp, nil
  107. }
  108. if isHaltErr(cctx, err) {
  109. return nil, rpctypes.Error(err)
  110. }
  111. if nerr := l.switchRemoteAndStream(err); nerr != nil {
  112. return nil, nerr
  113. }
  114. }
  115. }
  116. func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
  117. cctx, cancel := context.WithCancel(ctx)
  118. done := cancelWhenStop(cancel, l.stopCtx.Done())
  119. defer close(done)
  120. for {
  121. r := &pb.LeaseRevokeRequest{ID: int64(id)}
  122. resp, err := l.getRemote().LeaseRevoke(cctx, r)
  123. if err == nil {
  124. return (*LeaseRevokeResponse)(resp), nil
  125. }
  126. if isHaltErr(ctx, err) {
  127. return nil, rpctypes.Error(err)
  128. }
  129. if nerr := l.switchRemoteAndStream(err); nerr != nil {
  130. return nil, nerr
  131. }
  132. }
  133. }
  134. func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
  135. ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
  136. l.mu.Lock()
  137. ka, ok := l.keepAlives[id]
  138. if !ok {
  139. // create fresh keep alive
  140. ka = &keepAlive{
  141. chs: []chan<- *LeaseKeepAliveResponse{ch},
  142. ctxs: []context.Context{ctx},
  143. deadline: time.Now(),
  144. donec: make(chan struct{}),
  145. }
  146. l.keepAlives[id] = ka
  147. } else {
  148. // add channel and context to existing keep alive
  149. ka.ctxs = append(ka.ctxs, ctx)
  150. ka.chs = append(ka.chs, ch)
  151. }
  152. l.mu.Unlock()
  153. go l.keepAliveCtxCloser(id, ctx, ka.donec)
  154. return ch, nil
  155. }
  156. func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
  157. cctx, cancel := context.WithCancel(ctx)
  158. done := cancelWhenStop(cancel, l.stopCtx.Done())
  159. defer close(done)
  160. for {
  161. resp, err := l.keepAliveOnce(cctx, id)
  162. if err == nil {
  163. if resp.TTL == 0 {
  164. err = rpctypes.ErrLeaseNotFound
  165. }
  166. return resp, err
  167. }
  168. if isHaltErr(ctx, err) {
  169. return nil, rpctypes.Error(err)
  170. }
  171. nerr := l.switchRemoteAndStream(err)
  172. if nerr != nil {
  173. return nil, nerr
  174. }
  175. }
  176. }
  177. func (l *lessor) Close() error {
  178. l.stopCancel()
  179. <-l.donec
  180. return nil
  181. }
  182. func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
  183. select {
  184. case <-donec:
  185. return
  186. case <-l.donec:
  187. return
  188. case <-ctx.Done():
  189. }
  190. l.mu.Lock()
  191. defer l.mu.Unlock()
  192. ka, ok := l.keepAlives[id]
  193. if !ok {
  194. return
  195. }
  196. // close channel and remove context if still associated with keep alive
  197. for i, c := range ka.ctxs {
  198. if c == ctx {
  199. close(ka.chs[i])
  200. ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
  201. ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
  202. break
  203. }
  204. }
  205. // remove if no one more listeners
  206. if len(ka.chs) == 0 {
  207. delete(l.keepAlives, id)
  208. }
  209. }
  210. func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
  211. cctx, cancel := context.WithCancel(ctx)
  212. defer cancel()
  213. stream, err := l.getRemote().LeaseKeepAlive(cctx)
  214. if err != nil {
  215. return nil, rpctypes.Error(err)
  216. }
  217. err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
  218. if err != nil {
  219. return nil, rpctypes.Error(err)
  220. }
  221. resp, rerr := stream.Recv()
  222. if rerr != nil {
  223. return nil, rpctypes.Error(rerr)
  224. }
  225. karesp := &LeaseKeepAliveResponse{
  226. ResponseHeader: resp.GetHeader(),
  227. ID: LeaseID(resp.ID),
  228. TTL: resp.TTL,
  229. }
  230. return karesp, nil
  231. }
  232. func (l *lessor) recvKeepAliveLoop() {
  233. defer func() {
  234. l.mu.Lock()
  235. close(l.donec)
  236. for _, ka := range l.keepAlives {
  237. ka.Close()
  238. }
  239. l.keepAlives = make(map[LeaseID]*keepAlive)
  240. l.mu.Unlock()
  241. }()
  242. stream, serr := l.resetRecv()
  243. for serr == nil {
  244. resp, err := stream.Recv()
  245. if err != nil {
  246. if isHaltErr(l.stopCtx, err) {
  247. return
  248. }
  249. stream, serr = l.resetRecv()
  250. continue
  251. }
  252. l.recvKeepAlive(resp)
  253. }
  254. }
  255. // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
  256. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
  257. if err := l.switchRemoteAndStream(nil); err != nil {
  258. return nil, err
  259. }
  260. stream := l.getKeepAliveStream()
  261. go l.sendKeepAliveLoop(stream)
  262. return stream, nil
  263. }
  264. // recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
  265. func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
  266. karesp := &LeaseKeepAliveResponse{
  267. ResponseHeader: resp.GetHeader(),
  268. ID: LeaseID(resp.ID),
  269. TTL: resp.TTL,
  270. }
  271. l.mu.Lock()
  272. defer l.mu.Unlock()
  273. ka, ok := l.keepAlives[karesp.ID]
  274. if !ok {
  275. return
  276. }
  277. if karesp.TTL <= 0 {
  278. // lease expired; close all keep alive channels
  279. delete(l.keepAlives, karesp.ID)
  280. ka.Close()
  281. return
  282. }
  283. // send update to all channels
  284. nextDeadline := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
  285. for _, ch := range ka.chs {
  286. select {
  287. case ch <- karesp:
  288. ka.deadline = nextDeadline
  289. default:
  290. }
  291. }
  292. }
  293. // sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
  294. func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
  295. for {
  296. select {
  297. case <-time.After(500 * time.Millisecond):
  298. case <-stream.Context().Done():
  299. return
  300. case <-l.donec:
  301. return
  302. case <-l.stopCtx.Done():
  303. return
  304. }
  305. tosend := make([]LeaseID, 0)
  306. now := time.Now()
  307. l.mu.Lock()
  308. for id, ka := range l.keepAlives {
  309. if ka.deadline.Before(now) {
  310. tosend = append(tosend, id)
  311. }
  312. }
  313. l.mu.Unlock()
  314. for _, id := range tosend {
  315. r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
  316. if err := stream.Send(r); err != nil {
  317. // TODO do something with this error?
  318. return
  319. }
  320. }
  321. }
  322. }
  323. func (l *lessor) getRemote() pb.LeaseClient {
  324. l.rc.mu.Lock()
  325. defer l.rc.mu.Unlock()
  326. return l.remote
  327. }
  328. func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
  329. l.mu.Lock()
  330. defer l.mu.Unlock()
  331. return l.stream
  332. }
  333. func (l *lessor) switchRemoteAndStream(prevErr error) error {
  334. for {
  335. if prevErr != nil {
  336. err := l.rc.reconnectWait(l.stopCtx, prevErr)
  337. if err != nil {
  338. return rpctypes.Error(err)
  339. }
  340. }
  341. if prevErr = l.newStream(); prevErr == nil {
  342. return nil
  343. }
  344. }
  345. }
  346. func (l *lessor) newStream() error {
  347. sctx, cancel := context.WithCancel(l.stopCtx)
  348. stream, err := l.getRemote().LeaseKeepAlive(sctx)
  349. if err != nil {
  350. cancel()
  351. return rpctypes.Error(err)
  352. }
  353. l.mu.Lock()
  354. defer l.mu.Unlock()
  355. if l.stream != nil && l.streamCancel != nil {
  356. l.stream.CloseSend()
  357. l.streamCancel()
  358. }
  359. l.streamCancel = cancel
  360. l.stream = stream
  361. return nil
  362. }
  363. func (ka *keepAlive) Close() {
  364. close(ka.donec)
  365. for _, ch := range ka.chs {
  366. close(ch)
  367. }
  368. }
  // cancelWhenStop starts a goroutine that invokes cancel as soon as stopc
  // fires or the returned channel is signalled, whichever happens first. The
  // caller must close (or send on) the returned channel once its work is
  // finished so the goroutine can exit.
  func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} {
  	finished := make(chan struct{}, 1)

  	go func() {
  		// cancel runs whichever of the two signals arrives
  		defer cancel()
  		select {
  		case <-finished:
  		case <-stopc:
  		}
  	}()

  	return finished
  }