v3_lease_test.go 24 KB


  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "fmt"
  18. "testing"
  19. "time"
  20. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  21. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  22. "github.com/coreos/etcd/internal/mvcc/mvccpb"
  23. "github.com/coreos/etcd/pkg/testutil"
  24. "google.golang.org/grpc/metadata"
  25. )
  26. // TestV3LeasePrmote ensures the newly elected leader can promote itself
  27. // to the primary lessor, refresh the leases and start to manage leases.
  28. // TODO: use customized clock to make this test go faster?
  29. func TestV3LeasePrmote(t *testing.T) {
  30. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  31. defer clus.Terminate(t)
  32. // create lease
  33. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 3})
  34. ttl := time.Duration(lresp.TTL) * time.Second
  35. afterGrant := time.Now()
  36. if err != nil {
  37. t.Fatal(err)
  38. }
  39. if lresp.Error != "" {
  40. t.Fatal(lresp.Error)
  41. }
  42. // wait until the lease is going to expire.
  43. time.Sleep(time.Until(afterGrant.Add(ttl - time.Second)))
  44. // kill the current leader, all leases should be refreshed.
  45. toStop := clus.waitLeader(t, clus.Members)
  46. beforeStop := time.Now()
  47. clus.Members[toStop].Stop(t)
  48. var toWait []*member
  49. for i, m := range clus.Members {
  50. if i != toStop {
  51. toWait = append(toWait, m)
  52. }
  53. }
  54. clus.waitLeader(t, toWait)
  55. clus.Members[toStop].Restart(t)
  56. clus.waitLeader(t, clus.Members)
  57. afterReelect := time.Now()
  58. // ensure lease is refreshed by waiting for a "long" time.
  59. // it was going to expire anyway.
  60. time.Sleep(time.Until(beforeStop.Add(ttl - time.Second)))
  61. if !leaseExist(t, clus, lresp.ID) {
  62. t.Error("unexpected lease not exists")
  63. }
  64. // wait until the renewed lease is expected to expire.
  65. time.Sleep(time.Until(afterReelect.Add(ttl)))
  66. // wait for up to 10 seconds for lease to expire.
  67. expiredCondition := func() (bool, error) {
  68. return !leaseExist(t, clus, lresp.ID), nil
  69. }
  70. expired, err := testutil.Poll(100*time.Millisecond, 10*time.Second, expiredCondition)
  71. if err != nil {
  72. t.Error(err)
  73. }
  74. if !expired {
  75. t.Error("unexpected lease exists")
  76. }
  77. }
  78. // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
  79. func TestV3LeaseRevoke(t *testing.T) {
  80. defer testutil.AfterTest(t)
  81. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  82. lc := toGRPC(clus.RandClient()).Lease
  83. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
  84. return err
  85. })
  86. }
  87. // TestV3LeaseGrantById ensures leases may be created by a given id.
  88. func TestV3LeaseGrantByID(t *testing.T) {
  89. defer testutil.AfterTest(t)
  90. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  91. defer clus.Terminate(t)
  92. // create fixed lease
  93. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  94. context.TODO(),
  95. &pb.LeaseGrantRequest{ID: 1, TTL: 1})
  96. if err != nil {
  97. t.Errorf("could not create lease 1 (%v)", err)
  98. }
  99. if lresp.ID != 1 {
  100. t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
  101. }
  102. // create duplicate fixed lease
  103. _, err = toGRPC(clus.RandClient()).Lease.LeaseGrant(
  104. context.TODO(),
  105. &pb.LeaseGrantRequest{ID: 1, TTL: 1})
  106. if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
  107. t.Error(err)
  108. }
  109. // create fresh fixed lease
  110. lresp, err = toGRPC(clus.RandClient()).Lease.LeaseGrant(
  111. context.TODO(),
  112. &pb.LeaseGrantRequest{ID: 2, TTL: 1})
  113. if err != nil {
  114. t.Errorf("could not create lease 2 (%v)", err)
  115. }
  116. if lresp.ID != 2 {
  117. t.Errorf("got id %v, wanted id %v", lresp.ID, 2)
  118. }
  119. }
  120. // TestV3LeaseExpire ensures a key is deleted once a key expires.
  121. func TestV3LeaseExpire(t *testing.T) {
  122. defer testutil.AfterTest(t)
  123. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  124. // let lease lapse; wait for deleted key
  125. ctx, cancel := context.WithCancel(context.Background())
  126. defer cancel()
  127. wStream, err := toGRPC(clus.RandClient()).Watch.Watch(ctx)
  128. if err != nil {
  129. return err
  130. }
  131. wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  132. CreateRequest: &pb.WatchCreateRequest{
  133. Key: []byte("foo"), StartRevision: 1}}}
  134. if err := wStream.Send(wreq); err != nil {
  135. return err
  136. }
  137. if _, err := wStream.Recv(); err != nil {
  138. // the 'created' message
  139. return err
  140. }
  141. if _, err := wStream.Recv(); err != nil {
  142. // the 'put' message
  143. return err
  144. }
  145. errc := make(chan error, 1)
  146. go func() {
  147. resp, err := wStream.Recv()
  148. switch {
  149. case err != nil:
  150. errc <- err
  151. case len(resp.Events) != 1:
  152. fallthrough
  153. case resp.Events[0].Type != mvccpb.DELETE:
  154. errc <- fmt.Errorf("expected key delete, got %v", resp)
  155. default:
  156. errc <- nil
  157. }
  158. }()
  159. select {
  160. case <-time.After(15 * time.Second):
  161. return fmt.Errorf("lease expiration too slow")
  162. case err := <-errc:
  163. return err
  164. }
  165. })
  166. }
  167. // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
  168. func TestV3LeaseKeepAlive(t *testing.T) {
  169. defer testutil.AfterTest(t)
  170. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  171. lc := toGRPC(clus.RandClient()).Lease
  172. lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
  173. ctx, cancel := context.WithCancel(context.Background())
  174. defer cancel()
  175. lac, err := lc.LeaseKeepAlive(ctx)
  176. if err != nil {
  177. return err
  178. }
  179. defer lac.CloseSend()
  180. // renew long enough so lease would've expired otherwise
  181. for i := 0; i < 3; i++ {
  182. if err = lac.Send(lreq); err != nil {
  183. return err
  184. }
  185. lresp, rxerr := lac.Recv()
  186. if rxerr != nil {
  187. return rxerr
  188. }
  189. if lresp.ID != leaseID {
  190. return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID)
  191. }
  192. time.Sleep(time.Duration(lresp.TTL/2) * time.Second)
  193. }
  194. _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
  195. return err
  196. })
  197. }
  198. // TestV3LeaseExists creates a lease on a random client and confirms it exists in the cluster.
  199. func TestV3LeaseExists(t *testing.T) {
  200. defer testutil.AfterTest(t)
  201. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  202. defer clus.Terminate(t)
  203. // create lease
  204. ctx0, cancel0 := context.WithCancel(context.Background())
  205. defer cancel0()
  206. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  207. ctx0,
  208. &pb.LeaseGrantRequest{TTL: 30})
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. if lresp.Error != "" {
  213. t.Fatal(lresp.Error)
  214. }
  215. if !leaseExist(t, clus, lresp.ID) {
  216. t.Error("unexpected lease not exists")
  217. }
  218. }
  219. // TestV3LeaseLeases creates leases and confirms list RPC fetches created ones.
  220. func TestV3LeaseLeases(t *testing.T) {
  221. defer testutil.AfterTest(t)
  222. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  223. defer clus.Terminate(t)
  224. ctx0, cancel0 := context.WithCancel(context.Background())
  225. defer cancel0()
  226. // create leases
  227. ids := []int64{}
  228. for i := 0; i < 5; i++ {
  229. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  230. ctx0,
  231. &pb.LeaseGrantRequest{TTL: 30})
  232. if err != nil {
  233. t.Fatal(err)
  234. }
  235. if lresp.Error != "" {
  236. t.Fatal(lresp.Error)
  237. }
  238. ids = append(ids, lresp.ID)
  239. }
  240. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseLeases(
  241. context.Background(),
  242. &pb.LeaseLeasesRequest{})
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. for i := range lresp.Leases {
  247. if lresp.Leases[i].ID != ids[i] {
  248. t.Fatalf("#%d: lease ID expected %d, got %d", i, ids[i], lresp.Leases[i].ID)
  249. }
  250. }
  251. }
  252. // TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
  253. // it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
  254. // related issue https://github.com/coreos/etcd/issues/6978
  255. func TestV3LeaseRenewStress(t *testing.T) {
  256. testLeaseStress(t, stressLeaseRenew)
  257. }
  258. // TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
  259. // it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found.
  260. // related issue https://github.com/coreos/etcd/issues/6978
  261. func TestV3LeaseTimeToLiveStress(t *testing.T) {
  262. testLeaseStress(t, stressLeaseTimeToLive)
  263. }
  264. func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
  265. defer testutil.AfterTest(t)
  266. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  267. defer clus.Terminate(t)
  268. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  269. defer cancel()
  270. errc := make(chan error)
  271. for i := 0; i < 30; i++ {
  272. for j := 0; j < 3; j++ {
  273. go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
  274. }
  275. }
  276. for i := 0; i < 90; i++ {
  277. if err := <-errc; err != nil {
  278. t.Fatal(err)
  279. }
  280. }
  281. }
  282. func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
  283. defer func() {
  284. if tctx.Err() != nil {
  285. reterr = nil
  286. }
  287. }()
  288. lac, err := lc.LeaseKeepAlive(tctx)
  289. if err != nil {
  290. return err
  291. }
  292. for tctx.Err() == nil {
  293. resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
  294. if gerr != nil {
  295. continue
  296. }
  297. err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
  298. if err != nil {
  299. continue
  300. }
  301. rresp, rxerr := lac.Recv()
  302. if rxerr != nil {
  303. continue
  304. }
  305. if rresp.TTL == 0 {
  306. return fmt.Errorf("TTL shouldn't be 0 so soon")
  307. }
  308. }
  309. return nil
  310. }
  311. func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
  312. defer func() {
  313. if tctx.Err() != nil {
  314. reterr = nil
  315. }
  316. }()
  317. for tctx.Err() == nil {
  318. resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
  319. if gerr != nil {
  320. continue
  321. }
  322. _, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
  323. if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
  324. return kerr
  325. }
  326. }
  327. return nil
  328. }
  329. func TestV3PutOnNonExistLease(t *testing.T) {
  330. defer testutil.AfterTest(t)
  331. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  332. defer clus.Terminate(t)
  333. ctx, cancel := context.WithCancel(context.Background())
  334. defer cancel()
  335. badLeaseID := int64(0x12345678)
  336. putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: badLeaseID}
  337. _, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
  338. if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseNotFound) {
  339. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCLeaseNotFound)
  340. }
  341. }
  342. // TestV3GetNonExistLease ensures client retrieving nonexistent lease on a follower doesn't result node panic
  343. // related issue https://github.com/coreos/etcd/issues/6537
  344. func TestV3GetNonExistLease(t *testing.T) {
  345. defer testutil.AfterTest(t)
  346. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  347. defer clus.Terminate(t)
  348. ctx, cancel := context.WithCancel(context.Background())
  349. defer cancel()
  350. lc := toGRPC(clus.RandClient()).Lease
  351. lresp, err := lc.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 10})
  352. if err != nil {
  353. t.Errorf("failed to create lease %v", err)
  354. }
  355. _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  356. if err != nil {
  357. t.Fatal(err)
  358. }
  359. leaseTTLr := &pb.LeaseTimeToLiveRequest{
  360. ID: lresp.ID,
  361. Keys: true,
  362. }
  363. for _, client := range clus.clients {
  364. // quorum-read to ensure revoke completes before TimeToLive
  365. if _, err := toGRPC(client).KV.Range(ctx, &pb.RangeRequest{Key: []byte("_")}); err != nil {
  366. t.Fatal(err)
  367. }
  368. resp, err := toGRPC(client).Lease.LeaseTimeToLive(ctx, leaseTTLr)
  369. if err != nil {
  370. t.Fatalf("expected non nil error, but go %v", err)
  371. }
  372. if resp.TTL != -1 {
  373. t.Fatalf("expected TTL to be -1, but got %v", resp.TTL)
  374. }
  375. }
  376. }
  377. // TestV3LeaseSwitch tests a key can be switched from one lease to another.
  378. func TestV3LeaseSwitch(t *testing.T) {
  379. defer testutil.AfterTest(t)
  380. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  381. defer clus.Terminate(t)
  382. key := "foo"
  383. // create lease
  384. ctx, cancel := context.WithCancel(context.Background())
  385. defer cancel()
  386. lresp1, err1 := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 30})
  387. if err1 != nil {
  388. t.Fatal(err1)
  389. }
  390. lresp2, err2 := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 30})
  391. if err2 != nil {
  392. t.Fatal(err2)
  393. }
  394. // attach key on lease1 then switch it to lease2
  395. put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID}
  396. _, err := toGRPC(clus.RandClient()).KV.Put(ctx, put1)
  397. if err != nil {
  398. t.Fatal(err)
  399. }
  400. put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID}
  401. _, err = toGRPC(clus.RandClient()).KV.Put(ctx, put2)
  402. if err != nil {
  403. t.Fatal(err)
  404. }
  405. // revoke lease1 should not remove key
  406. _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID})
  407. if err != nil {
  408. t.Fatal(err)
  409. }
  410. rreq := &pb.RangeRequest{Key: []byte("foo")}
  411. rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
  412. if err != nil {
  413. t.Fatal(err)
  414. }
  415. if len(rresp.Kvs) != 1 {
  416. t.Fatalf("unexpect removal of key")
  417. }
  418. // revoke lease2 should remove key
  419. _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID})
  420. if err != nil {
  421. t.Fatal(err)
  422. }
  423. rresp, err = toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
  424. if err != nil {
  425. t.Fatal(err)
  426. }
  427. if len(rresp.Kvs) != 0 {
  428. t.Fatalf("lease removed but key remains")
  429. }
  430. }
  431. // TestV3LeaseFailover ensures the old leader drops lease keepalive requests within
  432. // election timeout after it loses its quorum. And the new leader extends the TTL of
  433. // the lease to at least TTL + election timeout.
  434. func TestV3LeaseFailover(t *testing.T) {
  435. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  436. defer clus.Terminate(t)
  437. toIsolate := clus.waitLeader(t, clus.Members)
  438. lc := toGRPC(clus.Client(toIsolate)).Lease
  439. // create lease
  440. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 5})
  441. if err != nil {
  442. t.Fatal(err)
  443. }
  444. if lresp.Error != "" {
  445. t.Fatal(lresp.Error)
  446. }
  447. // isolate the current leader with its followers.
  448. clus.Members[toIsolate].Pause()
  449. lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
  450. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  451. mctx := metadata.NewOutgoingContext(context.Background(), md)
  452. ctx, cancel := context.WithCancel(mctx)
  453. defer cancel()
  454. lac, err := lc.LeaseKeepAlive(ctx)
  455. if err != nil {
  456. t.Fatal(err)
  457. }
  458. defer lac.CloseSend()
  459. // send keep alive to old leader until the old leader starts
  460. // to drop lease request.
  461. var expectedExp time.Time
  462. for {
  463. if err = lac.Send(lreq); err != nil {
  464. break
  465. }
  466. lkresp, rxerr := lac.Recv()
  467. if rxerr != nil {
  468. break
  469. }
  470. expectedExp = time.Now().Add(time.Duration(lkresp.TTL) * time.Second)
  471. time.Sleep(time.Duration(lkresp.TTL/2) * time.Second)
  472. }
  473. clus.Members[toIsolate].Resume()
  474. clus.waitLeader(t, clus.Members)
  475. // lease should not expire at the last received expire deadline.
  476. time.Sleep(time.Until(expectedExp) - 500*time.Millisecond)
  477. if !leaseExist(t, clus, lresp.ID) {
  478. t.Error("unexpected lease not exists")
  479. }
  480. }
  481. // TestV3LeaseRequireLeader ensures that a Recv will get a leader
  482. // loss error if there is no leader.
  483. func TestV3LeaseRequireLeader(t *testing.T) {
  484. defer testutil.AfterTest(t)
  485. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  486. defer clus.Terminate(t)
  487. lc := toGRPC(clus.Client(0)).Lease
  488. clus.Members[1].Stop(t)
  489. clus.Members[2].Stop(t)
  490. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  491. mctx := metadata.NewOutgoingContext(context.Background(), md)
  492. ctx, cancel := context.WithCancel(mctx)
  493. defer cancel()
  494. lac, err := lc.LeaseKeepAlive(ctx)
  495. if err != nil {
  496. t.Fatal(err)
  497. }
  498. donec := make(chan struct{})
  499. go func() {
  500. defer close(donec)
  501. resp, err := lac.Recv()
  502. if err == nil {
  503. t.Fatalf("got response %+v, expected error", resp)
  504. }
  505. if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  506. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  507. }
  508. }()
  509. select {
  510. case <-time.After(5 * time.Second):
  511. t.Fatal("did not receive leader loss error (in 5-sec)")
  512. case <-donec:
  513. }
  514. }
  515. const fiveMinTTL int64 = 300
  516. // TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key.
  517. func TestV3LeaseRecoverAndRevoke(t *testing.T) {
  518. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  519. defer clus.Terminate(t)
  520. kvc := toGRPC(clus.Client(0)).KV
  521. lsc := toGRPC(clus.Client(0)).Lease
  522. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  523. if err != nil {
  524. t.Fatal(err)
  525. }
  526. if lresp.Error != "" {
  527. t.Fatal(lresp.Error)
  528. }
  529. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  530. if err != nil {
  531. t.Fatal(err)
  532. }
  533. // restart server and ensure lease still exists
  534. clus.Members[0].Stop(t)
  535. clus.Members[0].Restart(t)
  536. clus.waitLeader(t, clus.Members)
  537. // overwrite old client with newly dialed connection
  538. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  539. nc, err := NewClientV3(clus.Members[0])
  540. if err != nil {
  541. t.Fatal(err)
  542. }
  543. kvc = toGRPC(nc).KV
  544. lsc = toGRPC(nc).Lease
  545. defer nc.Close()
  546. // revoke should delete the key
  547. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  548. if err != nil {
  549. t.Fatal(err)
  550. }
  551. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  552. if err != nil {
  553. t.Fatal(err)
  554. }
  555. if len(rresp.Kvs) != 0 {
  556. t.Fatalf("lease removed but key remains")
  557. }
  558. }
  559. // TestV3LeaseRevokeAndRecover ensures that revoked key stays deleted after restart.
  560. func TestV3LeaseRevokeAndRecover(t *testing.T) {
  561. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  562. defer clus.Terminate(t)
  563. kvc := toGRPC(clus.Client(0)).KV
  564. lsc := toGRPC(clus.Client(0)).Lease
  565. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  566. if err != nil {
  567. t.Fatal(err)
  568. }
  569. if lresp.Error != "" {
  570. t.Fatal(lresp.Error)
  571. }
  572. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  573. if err != nil {
  574. t.Fatal(err)
  575. }
  576. // revoke should delete the key
  577. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  578. if err != nil {
  579. t.Fatal(err)
  580. }
  581. // restart server and ensure revoked key doesn't exist
  582. clus.Members[0].Stop(t)
  583. clus.Members[0].Restart(t)
  584. clus.waitLeader(t, clus.Members)
  585. // overwrite old client with newly dialed connection
  586. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  587. nc, err := NewClientV3(clus.Members[0])
  588. if err != nil {
  589. t.Fatal(err)
  590. }
  591. kvc = toGRPC(nc).KV
  592. defer nc.Close()
  593. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  594. if err != nil {
  595. t.Fatal(err)
  596. }
  597. if len(rresp.Kvs) != 0 {
  598. t.Fatalf("lease removed but key remains")
  599. }
  600. }
  601. // TestV3LeaseRecoverKeyWithDetachedLease ensures that revoking a detached lease after restart
  602. // does not delete the key.
  603. func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) {
  604. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  605. defer clus.Terminate(t)
  606. kvc := toGRPC(clus.Client(0)).KV
  607. lsc := toGRPC(clus.Client(0)).Lease
  608. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  609. if err != nil {
  610. t.Fatal(err)
  611. }
  612. if lresp.Error != "" {
  613. t.Fatal(lresp.Error)
  614. }
  615. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  616. if err != nil {
  617. t.Fatal(err)
  618. }
  619. // overwrite lease with none
  620. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
  621. if err != nil {
  622. t.Fatal(err)
  623. }
  624. // restart server and ensure lease still exists
  625. clus.Members[0].Stop(t)
  626. clus.Members[0].Restart(t)
  627. clus.waitLeader(t, clus.Members)
  628. // overwrite old client with newly dialed connection
  629. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  630. nc, err := NewClientV3(clus.Members[0])
  631. if err != nil {
  632. t.Fatal(err)
  633. }
  634. kvc = toGRPC(nc).KV
  635. lsc = toGRPC(nc).Lease
  636. defer nc.Close()
  637. // revoke the detached lease
  638. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  639. if err != nil {
  640. t.Fatal(err)
  641. }
  642. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  643. if err != nil {
  644. t.Fatal(err)
  645. }
  646. if len(rresp.Kvs) != 1 {
  647. t.Fatalf("only detached lease removed, key should remain")
  648. }
  649. }
  650. func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
  651. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  652. defer clus.Terminate(t)
  653. kvc := toGRPC(clus.Client(0)).KV
  654. lsc := toGRPC(clus.Client(0)).Lease
  655. var leaseIDs []int64
  656. for i := 0; i < 2; i++ {
  657. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  658. if err != nil {
  659. t.Fatal(err)
  660. }
  661. if lresp.Error != "" {
  662. t.Fatal(lresp.Error)
  663. }
  664. leaseIDs = append(leaseIDs, lresp.ID)
  665. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  666. if err != nil {
  667. t.Fatal(err)
  668. }
  669. }
  670. // restart server and ensure lease still exists
  671. clus.Members[0].Stop(t)
  672. clus.Members[0].Restart(t)
  673. clus.waitLeader(t, clus.Members)
  674. for i, leaseID := range leaseIDs {
  675. if !leaseExist(t, clus, leaseID) {
  676. t.Errorf("#%d: unexpected lease not exists", i)
  677. }
  678. }
  679. // overwrite old client with newly dialed connection
  680. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  681. nc, err := NewClientV3(clus.Members[0])
  682. if err != nil {
  683. t.Fatal(err)
  684. }
  685. kvc = toGRPC(nc).KV
  686. lsc = toGRPC(nc).Lease
  687. defer nc.Close()
  688. // revoke the old lease
  689. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[0]})
  690. if err != nil {
  691. t.Fatal(err)
  692. }
  693. // key should still exist
  694. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  695. if err != nil {
  696. t.Fatal(err)
  697. }
  698. if len(rresp.Kvs) != 1 {
  699. t.Fatalf("only detached lease removed, key should remain")
  700. }
  701. // revoke the latest lease
  702. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[1]})
  703. if err != nil {
  704. t.Fatal(err)
  705. }
  706. rresp, err = kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  707. if err != nil {
  708. t.Fatal(err)
  709. }
  710. if len(rresp.Kvs) != 0 {
  711. t.Fatalf("lease removed but key remains")
  712. }
  713. }
  714. // acquireLeaseAndKey creates a new lease and creates an attached key.
  715. func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
  716. // create lease
  717. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  718. context.TODO(),
  719. &pb.LeaseGrantRequest{TTL: 1})
  720. if err != nil {
  721. return 0, err
  722. }
  723. if lresp.Error != "" {
  724. return 0, fmt.Errorf(lresp.Error)
  725. }
  726. // attach to key
  727. put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
  728. if _, err := toGRPC(clus.RandClient()).KV.Put(context.TODO(), put); err != nil {
  729. return 0, err
  730. }
  731. return lresp.ID, nil
  732. }
  733. // testLeaseRemoveLeasedKey performs some action while holding a lease with an
  734. // attached key "foo", then confirms the key is gone.
  735. func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
  736. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  737. defer clus.Terminate(t)
  738. leaseID, err := acquireLeaseAndKey(clus, "foo")
  739. if err != nil {
  740. t.Fatal(err)
  741. }
  742. if err = act(clus, leaseID); err != nil {
  743. t.Fatal(err)
  744. }
  745. // confirm no key
  746. rreq := &pb.RangeRequest{Key: []byte("foo")}
  747. rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
  748. if err != nil {
  749. t.Fatal(err)
  750. }
  751. if len(rresp.Kvs) != 0 {
  752. t.Fatalf("lease removed but key remains")
  753. }
  754. }
  755. func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool {
  756. l := toGRPC(clus.RandClient()).Lease
  757. _, err := l.LeaseGrant(context.Background(), &pb.LeaseGrantRequest{ID: leaseID, TTL: 5})
  758. if err == nil {
  759. _, err = l.LeaseRevoke(context.Background(), &pb.LeaseRevokeRequest{ID: leaseID})
  760. if err != nil {
  761. t.Fatalf("failed to check lease %v", err)
  762. }
  763. return false
  764. }
  765. if eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
  766. return true
  767. }
  768. t.Fatalf("unexpecter error %v", err)
  769. return true
  770. }