v3_lease_test.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "fmt"
  18. "testing"
  19. "time"
  20. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  21. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  22. "github.com/coreos/etcd/mvcc/mvccpb"
  23. "github.com/coreos/etcd/pkg/testutil"
  24. "google.golang.org/grpc/metadata"
  25. )
  26. // TestV3LeasePrmote ensures the newly elected leader can promote itself
  27. // to the primary lessor, refresh the leases and start to manage leases.
  28. // TODO: use customized clock to make this test go faster?
  29. func TestV3LeasePrmote(t *testing.T) {
  30. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  31. defer clus.Terminate(t)
  32. // create lease
  33. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 3})
  34. ttl := time.Duration(lresp.TTL) * time.Second
  35. afterGrant := time.Now()
  36. if err != nil {
  37. t.Fatal(err)
  38. }
  39. if lresp.Error != "" {
  40. t.Fatal(lresp.Error)
  41. }
  42. // wait until the lease is going to expire.
  43. time.Sleep(time.Until(afterGrant.Add(ttl - time.Second)))
  44. // kill the current leader, all leases should be refreshed.
  45. toStop := clus.waitLeader(t, clus.Members)
  46. beforeStop := time.Now()
  47. clus.Members[toStop].Stop(t)
  48. var toWait []*member
  49. for i, m := range clus.Members {
  50. if i != toStop {
  51. toWait = append(toWait, m)
  52. }
  53. }
  54. clus.waitLeader(t, toWait)
  55. clus.Members[toStop].Restart(t)
  56. clus.waitLeader(t, clus.Members)
  57. afterReelect := time.Now()
  58. // ensure lease is refreshed by waiting for a "long" time.
  59. // it was going to expire anyway.
  60. time.Sleep(time.Until(beforeStop.Add(ttl - time.Second)))
  61. if !leaseExist(t, clus, lresp.ID) {
  62. t.Error("unexpected lease not exists")
  63. }
  64. // wait until the renewed lease is expected to expire.
  65. time.Sleep(time.Until(afterReelect.Add(ttl)))
  66. // wait for up to 10 seconds for lease to expire.
  67. expiredCondition := func() (bool, error) {
  68. return !leaseExist(t, clus, lresp.ID), nil
  69. }
  70. expired, err := testutil.Poll(100*time.Millisecond, 10*time.Second, expiredCondition)
  71. if err != nil {
  72. t.Error(err)
  73. }
  74. if !expired {
  75. t.Error("unexpected lease exists")
  76. }
  77. }
  78. // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
  79. func TestV3LeaseRevoke(t *testing.T) {
  80. defer testutil.AfterTest(t)
  81. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  82. lc := toGRPC(clus.RandClient()).Lease
  83. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
  84. return err
  85. })
  86. }
  87. // TestV3LeaseGrantById ensures leases may be created by a given id.
  88. func TestV3LeaseGrantByID(t *testing.T) {
  89. defer testutil.AfterTest(t)
  90. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  91. defer clus.Terminate(t)
  92. // create fixed lease
  93. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  94. context.TODO(),
  95. &pb.LeaseGrantRequest{ID: 1, TTL: 1})
  96. if err != nil {
  97. t.Errorf("could not create lease 1 (%v)", err)
  98. }
  99. if lresp.ID != 1 {
  100. t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
  101. }
  102. // create duplicate fixed lease
  103. _, err = toGRPC(clus.RandClient()).Lease.LeaseGrant(
  104. context.TODO(),
  105. &pb.LeaseGrantRequest{ID: 1, TTL: 1})
  106. if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
  107. t.Error(err)
  108. }
  109. // create fresh fixed lease
  110. lresp, err = toGRPC(clus.RandClient()).Lease.LeaseGrant(
  111. context.TODO(),
  112. &pb.LeaseGrantRequest{ID: 2, TTL: 1})
  113. if err != nil {
  114. t.Errorf("could not create lease 2 (%v)", err)
  115. }
  116. if lresp.ID != 2 {
  117. t.Errorf("got id %v, wanted id %v", lresp.ID, 2)
  118. }
  119. }
  120. // TestV3LeaseExpire ensures a key is deleted once a key expires.
  121. func TestV3LeaseExpire(t *testing.T) {
  122. defer testutil.AfterTest(t)
  123. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  124. // let lease lapse; wait for deleted key
  125. ctx, cancel := context.WithCancel(context.Background())
  126. defer cancel()
  127. wStream, err := toGRPC(clus.RandClient()).Watch.Watch(ctx)
  128. if err != nil {
  129. return err
  130. }
  131. wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  132. CreateRequest: &pb.WatchCreateRequest{
  133. Key: []byte("foo"), StartRevision: 1}}}
  134. if err := wStream.Send(wreq); err != nil {
  135. return err
  136. }
  137. if _, err := wStream.Recv(); err != nil {
  138. // the 'created' message
  139. return err
  140. }
  141. if _, err := wStream.Recv(); err != nil {
  142. // the 'put' message
  143. return err
  144. }
  145. errc := make(chan error, 1)
  146. go func() {
  147. resp, err := wStream.Recv()
  148. switch {
  149. case err != nil:
  150. errc <- err
  151. case len(resp.Events) != 1:
  152. fallthrough
  153. case resp.Events[0].Type != mvccpb.DELETE:
  154. errc <- fmt.Errorf("expected key delete, got %v", resp)
  155. default:
  156. errc <- nil
  157. }
  158. }()
  159. select {
  160. case <-time.After(15 * time.Second):
  161. return fmt.Errorf("lease expiration too slow")
  162. case err := <-errc:
  163. return err
  164. }
  165. })
  166. }
  167. // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
  168. func TestV3LeaseKeepAlive(t *testing.T) {
  169. defer testutil.AfterTest(t)
  170. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  171. lc := toGRPC(clus.RandClient()).Lease
  172. lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
  173. ctx, cancel := context.WithCancel(context.Background())
  174. defer cancel()
  175. lac, err := lc.LeaseKeepAlive(ctx)
  176. if err != nil {
  177. return err
  178. }
  179. defer lac.CloseSend()
  180. // renew long enough so lease would've expired otherwise
  181. for i := 0; i < 3; i++ {
  182. if err = lac.Send(lreq); err != nil {
  183. return err
  184. }
  185. lresp, rxerr := lac.Recv()
  186. if rxerr != nil {
  187. return rxerr
  188. }
  189. if lresp.ID != leaseID {
  190. return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID)
  191. }
  192. time.Sleep(time.Duration(lresp.TTL/2) * time.Second)
  193. }
  194. _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
  195. return err
  196. })
  197. }
  198. // TestV3LeaseExists creates a lease on a random client and confirms it exists in the cluster.
  199. func TestV3LeaseExists(t *testing.T) {
  200. defer testutil.AfterTest(t)
  201. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  202. defer clus.Terminate(t)
  203. // create lease
  204. ctx0, cancel0 := context.WithCancel(context.Background())
  205. defer cancel0()
  206. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  207. ctx0,
  208. &pb.LeaseGrantRequest{TTL: 30})
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. if lresp.Error != "" {
  213. t.Fatal(lresp.Error)
  214. }
  215. if !leaseExist(t, clus, lresp.ID) {
  216. t.Error("unexpected lease not exists")
  217. }
  218. }
  219. // TestV3LeaseLeases creates leases and confirms list RPC fetches created ones.
  220. func TestV3LeaseLeases(t *testing.T) {
  221. defer testutil.AfterTest(t)
  222. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  223. defer clus.Terminate(t)
  224. ctx0, cancel0 := context.WithCancel(context.Background())
  225. defer cancel0()
  226. // create leases
  227. ids := []int64{}
  228. for i := 0; i < 5; i++ {
  229. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  230. ctx0,
  231. &pb.LeaseGrantRequest{TTL: 30})
  232. if err != nil {
  233. t.Fatal(err)
  234. }
  235. if lresp.Error != "" {
  236. t.Fatal(lresp.Error)
  237. }
  238. ids = append(ids, lresp.ID)
  239. }
  240. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseLeases(
  241. context.Background(),
  242. &pb.LeaseLeasesRequest{})
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. for i := range lresp.Leases {
  247. if lresp.Leases[i].ID != ids[i] {
  248. t.Fatalf("#%d: lease ID expected %d, got %d", i, ids[i], lresp.Leases[i].ID)
  249. }
  250. }
  251. }
  252. // TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
  253. // it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
  254. // related issue https://github.com/coreos/etcd/issues/6978
  255. func TestV3LeaseRenewStress(t *testing.T) {
  256. testLeaseStress(t, stressLeaseRenew)
  257. }
  258. // TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
  259. // it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found.
  260. // related issue https://github.com/coreos/etcd/issues/6978
  261. func TestV3LeaseTimeToLiveStress(t *testing.T) {
  262. testLeaseStress(t, stressLeaseTimeToLive)
  263. }
  264. func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
  265. defer testutil.AfterTest(t)
  266. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  267. defer clus.Terminate(t)
  268. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  269. defer cancel()
  270. errc := make(chan error)
  271. for i := 0; i < 30; i++ {
  272. for j := 0; j < 3; j++ {
  273. go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
  274. }
  275. }
  276. for i := 0; i < 90; i++ {
  277. if err := <-errc; err != nil {
  278. t.Fatal(err)
  279. }
  280. }
  281. }
  282. func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
  283. defer func() {
  284. if tctx.Err() != nil {
  285. reterr = nil
  286. }
  287. }()
  288. lac, err := lc.LeaseKeepAlive(tctx)
  289. if err != nil {
  290. return err
  291. }
  292. for tctx.Err() == nil {
  293. resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
  294. if gerr != nil {
  295. continue
  296. }
  297. err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
  298. if err != nil {
  299. continue
  300. }
  301. rresp, rxerr := lac.Recv()
  302. if rxerr != nil {
  303. continue
  304. }
  305. if rresp.TTL == 0 {
  306. return fmt.Errorf("TTL shouldn't be 0 so soon")
  307. }
  308. }
  309. return nil
  310. }
  311. func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
  312. defer func() {
  313. if tctx.Err() != nil {
  314. reterr = nil
  315. }
  316. }()
  317. for tctx.Err() == nil {
  318. resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
  319. if gerr != nil {
  320. continue
  321. }
  322. _, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
  323. if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
  324. return kerr
  325. }
  326. }
  327. return nil
  328. }
  329. func TestV3PutOnNonExistLease(t *testing.T) {
  330. defer testutil.AfterTest(t)
  331. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  332. defer clus.Terminate(t)
  333. ctx, cancel := context.WithCancel(context.Background())
  334. defer cancel()
  335. badLeaseID := int64(0x12345678)
  336. putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: badLeaseID}
  337. _, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
  338. if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseNotFound) {
  339. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCLeaseNotFound)
  340. }
  341. }
  342. // TestV3GetNonExistLease ensures client retrieving nonexistent lease on a follower doesn't result node panic
  343. // related issue https://github.com/coreos/etcd/issues/6537
  344. func TestV3GetNonExistLease(t *testing.T) {
  345. defer testutil.AfterTest(t)
  346. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  347. defer clus.Terminate(t)
  348. ctx, cancel := context.WithCancel(context.Background())
  349. defer cancel()
  350. lc := toGRPC(clus.RandClient()).Lease
  351. lresp, err := lc.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 10})
  352. if err != nil {
  353. t.Errorf("failed to create lease %v", err)
  354. }
  355. _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  356. if err != nil {
  357. t.Fatal(err)
  358. }
  359. leaseTTLr := &pb.LeaseTimeToLiveRequest{
  360. ID: lresp.ID,
  361. Keys: true,
  362. }
  363. for _, client := range clus.clients {
  364. // quorum-read to ensure revoke completes before TimeToLive
  365. if _, err := toGRPC(client).KV.Range(ctx, &pb.RangeRequest{Key: []byte("_")}); err != nil {
  366. t.Fatal(err)
  367. }
  368. resp, err := toGRPC(client).Lease.LeaseTimeToLive(ctx, leaseTTLr)
  369. if err != nil {
  370. t.Fatalf("expected non nil error, but go %v", err)
  371. }
  372. if resp.TTL != -1 {
  373. t.Fatalf("expected TTL to be -1, but got %v", resp.TTL)
  374. }
  375. }
  376. }
  377. // TestV3LeaseSwitch tests a key can be switched from one lease to another.
  378. func TestV3LeaseSwitch(t *testing.T) {
  379. defer testutil.AfterTest(t)
  380. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  381. defer clus.Terminate(t)
  382. key := "foo"
  383. // create lease
  384. ctx, cancel := context.WithCancel(context.Background())
  385. defer cancel()
  386. lresp1, err1 := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 30})
  387. if err1 != nil {
  388. t.Fatal(err1)
  389. }
  390. lresp2, err2 := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 30})
  391. if err2 != nil {
  392. t.Fatal(err2)
  393. }
  394. // attach key on lease1 then switch it to lease2
  395. put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID}
  396. _, err := toGRPC(clus.RandClient()).KV.Put(ctx, put1)
  397. if err != nil {
  398. t.Fatal(err)
  399. }
  400. put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID}
  401. _, err = toGRPC(clus.RandClient()).KV.Put(ctx, put2)
  402. if err != nil {
  403. t.Fatal(err)
  404. }
  405. // revoke lease1 should not remove key
  406. _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID})
  407. if err != nil {
  408. t.Fatal(err)
  409. }
  410. rreq := &pb.RangeRequest{Key: []byte("foo")}
  411. rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
  412. if err != nil {
  413. t.Fatal(err)
  414. }
  415. if len(rresp.Kvs) != 1 {
  416. t.Fatalf("unexpect removal of key")
  417. }
  418. // revoke lease2 should remove key
  419. _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID})
  420. if err != nil {
  421. t.Fatal(err)
  422. }
  423. rresp, err = toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
  424. if err != nil {
  425. t.Fatal(err)
  426. }
  427. if len(rresp.Kvs) != 0 {
  428. t.Fatalf("lease removed but key remains")
  429. }
  430. }
  431. // TestV3LeaseFailover ensures the old leader drops lease keepalive requests within
  432. // election timeout after it loses its quorum. And the new leader extends the TTL of
  433. // the lease to at least TTL + election timeout.
  434. func TestV3LeaseFailover(t *testing.T) {
  435. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  436. defer clus.Terminate(t)
  437. toIsolate := clus.waitLeader(t, clus.Members)
  438. lc := toGRPC(clus.Client(toIsolate)).Lease
  439. // create lease
  440. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 5})
  441. if err != nil {
  442. t.Fatal(err)
  443. }
  444. if lresp.Error != "" {
  445. t.Fatal(lresp.Error)
  446. }
  447. // isolate the current leader with its followers.
  448. clus.Members[toIsolate].Pause()
  449. lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
  450. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  451. mctx := metadata.NewOutgoingContext(context.Background(), md)
  452. ctx, cancel := context.WithCancel(mctx)
  453. lac, err := lc.LeaseKeepAlive(ctx)
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. defer func() {
  458. lac.CloseSend()
  459. cancel()
  460. }()
  461. // send keep alive to old leader until the old leader starts
  462. // to drop lease request.
  463. var expectedExp time.Time
  464. for {
  465. if err = lac.Send(lreq); err != nil {
  466. break
  467. }
  468. lkresp, rxerr := lac.Recv()
  469. if rxerr != nil {
  470. break
  471. }
  472. expectedExp = time.Now().Add(time.Duration(lkresp.TTL) * time.Second)
  473. time.Sleep(time.Duration(lkresp.TTL/2) * time.Second)
  474. }
  475. clus.Members[toIsolate].Resume()
  476. clus.waitLeader(t, clus.Members)
  477. // lease should not expire at the last received expire deadline.
  478. time.Sleep(time.Until(expectedExp) - 500*time.Millisecond)
  479. if !leaseExist(t, clus, lresp.ID) {
  480. t.Error("unexpected lease not exists")
  481. }
  482. }
  483. // TestV3LeaseRequireLeader ensures that a Recv will get a leader
  484. // loss error if there is no leader.
  485. func TestV3LeaseRequireLeader(t *testing.T) {
  486. defer testutil.AfterTest(t)
  487. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  488. defer clus.Terminate(t)
  489. lc := toGRPC(clus.Client(0)).Lease
  490. clus.Members[1].Stop(t)
  491. clus.Members[2].Stop(t)
  492. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  493. mctx := metadata.NewOutgoingContext(context.Background(), md)
  494. ctx, cancel := context.WithCancel(mctx)
  495. defer cancel()
  496. lac, err := lc.LeaseKeepAlive(ctx)
  497. if err != nil {
  498. t.Fatal(err)
  499. }
  500. donec := make(chan struct{})
  501. go func() {
  502. defer close(donec)
  503. resp, err := lac.Recv()
  504. if err == nil {
  505. t.Fatalf("got response %+v, expected error", resp)
  506. }
  507. if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  508. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  509. }
  510. }()
  511. select {
  512. case <-time.After(5 * time.Second):
  513. t.Fatal("did not receive leader loss error (in 5-sec)")
  514. case <-donec:
  515. }
  516. }
  517. const fiveMinTTL int64 = 300
  518. // TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key.
  519. func TestV3LeaseRecoverAndRevoke(t *testing.T) {
  520. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  521. defer clus.Terminate(t)
  522. kvc := toGRPC(clus.Client(0)).KV
  523. lsc := toGRPC(clus.Client(0)).Lease
  524. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  525. if err != nil {
  526. t.Fatal(err)
  527. }
  528. if lresp.Error != "" {
  529. t.Fatal(lresp.Error)
  530. }
  531. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  532. if err != nil {
  533. t.Fatal(err)
  534. }
  535. // restart server and ensure lease still exists
  536. clus.Members[0].Stop(t)
  537. clus.Members[0].Restart(t)
  538. clus.waitLeader(t, clus.Members)
  539. // overwrite old client with newly dialed connection
  540. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  541. nc, err := NewClientV3(clus.Members[0])
  542. if err != nil {
  543. t.Fatal(err)
  544. }
  545. kvc = toGRPC(nc).KV
  546. lsc = toGRPC(nc).Lease
  547. defer nc.Close()
  548. // revoke should delete the key
  549. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  550. if err != nil {
  551. t.Fatal(err)
  552. }
  553. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  554. if err != nil {
  555. t.Fatal(err)
  556. }
  557. if len(rresp.Kvs) != 0 {
  558. t.Fatalf("lease removed but key remains")
  559. }
  560. }
  561. // TestV3LeaseRevokeAndRecover ensures that revoked key stays deleted after restart.
  562. func TestV3LeaseRevokeAndRecover(t *testing.T) {
  563. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  564. defer clus.Terminate(t)
  565. kvc := toGRPC(clus.Client(0)).KV
  566. lsc := toGRPC(clus.Client(0)).Lease
  567. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  568. if err != nil {
  569. t.Fatal(err)
  570. }
  571. if lresp.Error != "" {
  572. t.Fatal(lresp.Error)
  573. }
  574. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  575. if err != nil {
  576. t.Fatal(err)
  577. }
  578. // revoke should delete the key
  579. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  580. if err != nil {
  581. t.Fatal(err)
  582. }
  583. // restart server and ensure revoked key doesn't exist
  584. clus.Members[0].Stop(t)
  585. clus.Members[0].Restart(t)
  586. clus.waitLeader(t, clus.Members)
  587. // overwrite old client with newly dialed connection
  588. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  589. nc, err := NewClientV3(clus.Members[0])
  590. if err != nil {
  591. t.Fatal(err)
  592. }
  593. kvc = toGRPC(nc).KV
  594. defer nc.Close()
  595. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  596. if err != nil {
  597. t.Fatal(err)
  598. }
  599. if len(rresp.Kvs) != 0 {
  600. t.Fatalf("lease removed but key remains")
  601. }
  602. }
  603. // TestV3LeaseRecoverKeyWithDetachedLease ensures that revoking a detached lease after restart
  604. // does not delete the key.
  605. func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) {
  606. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  607. defer clus.Terminate(t)
  608. kvc := toGRPC(clus.Client(0)).KV
  609. lsc := toGRPC(clus.Client(0)).Lease
  610. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  611. if err != nil {
  612. t.Fatal(err)
  613. }
  614. if lresp.Error != "" {
  615. t.Fatal(lresp.Error)
  616. }
  617. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  618. if err != nil {
  619. t.Fatal(err)
  620. }
  621. // overwrite lease with none
  622. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
  623. if err != nil {
  624. t.Fatal(err)
  625. }
  626. // restart server and ensure lease still exists
  627. clus.Members[0].Stop(t)
  628. clus.Members[0].Restart(t)
  629. clus.waitLeader(t, clus.Members)
  630. // overwrite old client with newly dialed connection
  631. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  632. nc, err := NewClientV3(clus.Members[0])
  633. if err != nil {
  634. t.Fatal(err)
  635. }
  636. kvc = toGRPC(nc).KV
  637. lsc = toGRPC(nc).Lease
  638. defer nc.Close()
  639. // revoke the detached lease
  640. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  641. if err != nil {
  642. t.Fatal(err)
  643. }
  644. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  645. if err != nil {
  646. t.Fatal(err)
  647. }
  648. if len(rresp.Kvs) != 1 {
  649. t.Fatalf("only detached lease removed, key should remain")
  650. }
  651. }
  652. func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
  653. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  654. defer clus.Terminate(t)
  655. kvc := toGRPC(clus.Client(0)).KV
  656. lsc := toGRPC(clus.Client(0)).Lease
  657. var leaseIDs []int64
  658. for i := 0; i < 2; i++ {
  659. lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
  660. if err != nil {
  661. t.Fatal(err)
  662. }
  663. if lresp.Error != "" {
  664. t.Fatal(lresp.Error)
  665. }
  666. leaseIDs = append(leaseIDs, lresp.ID)
  667. _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
  668. if err != nil {
  669. t.Fatal(err)
  670. }
  671. }
  672. // restart server and ensure lease still exists
  673. clus.Members[0].Stop(t)
  674. clus.Members[0].Restart(t)
  675. clus.waitLeader(t, clus.Members)
  676. for i, leaseID := range leaseIDs {
  677. if !leaseExist(t, clus, leaseID) {
  678. t.Errorf("#%d: unexpected lease not exists", i)
  679. }
  680. }
  681. // overwrite old client with newly dialed connection
  682. // otherwise, error with "grpc: RPC failed fast due to transport failure"
  683. nc, err := NewClientV3(clus.Members[0])
  684. if err != nil {
  685. t.Fatal(err)
  686. }
  687. kvc = toGRPC(nc).KV
  688. lsc = toGRPC(nc).Lease
  689. defer nc.Close()
  690. // revoke the old lease
  691. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[0]})
  692. if err != nil {
  693. t.Fatal(err)
  694. }
  695. // key should still exist
  696. rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  697. if err != nil {
  698. t.Fatal(err)
  699. }
  700. if len(rresp.Kvs) != 1 {
  701. t.Fatalf("only detached lease removed, key should remain")
  702. }
  703. // revoke the latest lease
  704. _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[1]})
  705. if err != nil {
  706. t.Fatal(err)
  707. }
  708. rresp, err = kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  709. if err != nil {
  710. t.Fatal(err)
  711. }
  712. if len(rresp.Kvs) != 0 {
  713. t.Fatalf("lease removed but key remains")
  714. }
  715. }
  716. // acquireLeaseAndKey creates a new lease and creates an attached key.
  717. func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
  718. // create lease
  719. lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
  720. context.TODO(),
  721. &pb.LeaseGrantRequest{TTL: 1})
  722. if err != nil {
  723. return 0, err
  724. }
  725. if lresp.Error != "" {
  726. return 0, fmt.Errorf(lresp.Error)
  727. }
  728. // attach to key
  729. put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
  730. if _, err := toGRPC(clus.RandClient()).KV.Put(context.TODO(), put); err != nil {
  731. return 0, err
  732. }
  733. return lresp.ID, nil
  734. }
  735. // testLeaseRemoveLeasedKey performs some action while holding a lease with an
  736. // attached key "foo", then confirms the key is gone.
  737. func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
  738. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  739. defer clus.Terminate(t)
  740. leaseID, err := acquireLeaseAndKey(clus, "foo")
  741. if err != nil {
  742. t.Fatal(err)
  743. }
  744. if err = act(clus, leaseID); err != nil {
  745. t.Fatal(err)
  746. }
  747. // confirm no key
  748. rreq := &pb.RangeRequest{Key: []byte("foo")}
  749. rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
  750. if err != nil {
  751. t.Fatal(err)
  752. }
  753. if len(rresp.Kvs) != 0 {
  754. t.Fatalf("lease removed but key remains")
  755. }
  756. }
  757. func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool {
  758. l := toGRPC(clus.RandClient()).Lease
  759. _, err := l.LeaseGrant(context.Background(), &pb.LeaseGrantRequest{ID: leaseID, TTL: 5})
  760. if err == nil {
  761. _, err = l.LeaseRevoke(context.Background(), &pb.LeaseRevokeRequest{ID: leaseID})
  762. if err != nil {
  763. t.Fatalf("failed to check lease %v", err)
  764. }
  765. return false
  766. }
  767. if eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
  768. return true
  769. }
  770. t.Fatalf("unexpecter error %v", err)
  771. return true
  772. }