lease_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "reflect"
  17. "sort"
  18. "sync"
  19. "testing"
  20. "time"
  21. "github.com/coreos/etcd/clientv3"
  22. "github.com/coreos/etcd/clientv3/concurrency"
  23. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  24. "github.com/coreos/etcd/integration"
  25. "github.com/coreos/etcd/pkg/testutil"
  26. "golang.org/x/net/context"
  27. "google.golang.org/grpc"
  28. )
  29. func TestLeaseNotFoundError(t *testing.T) {
  30. defer testutil.AfterTest(t)
  31. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  32. defer clus.Terminate(t)
  33. kv := clus.RandClient()
  34. _, err := kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(500)))
  35. if err != rpctypes.ErrLeaseNotFound {
  36. t.Fatalf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err)
  37. }
  38. }
  39. func TestLeaseGrant(t *testing.T) {
  40. defer testutil.AfterTest(t)
  41. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  42. defer clus.Terminate(t)
  43. lapi := clus.RandClient()
  44. kv := clus.RandClient()
  45. resp, err := lapi.Grant(context.Background(), 10)
  46. if err != nil {
  47. t.Errorf("failed to create lease %v", err)
  48. }
  49. _, err = kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
  50. if err != nil {
  51. t.Fatalf("failed to create key with lease %v", err)
  52. }
  53. }
  54. func TestLeaseRevoke(t *testing.T) {
  55. defer testutil.AfterTest(t)
  56. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  57. defer clus.Terminate(t)
  58. lapi := clus.RandClient()
  59. kv := clus.RandClient()
  60. resp, err := lapi.Grant(context.Background(), 10)
  61. if err != nil {
  62. t.Errorf("failed to create lease %v", err)
  63. }
  64. _, err = lapi.Revoke(context.Background(), clientv3.LeaseID(resp.ID))
  65. if err != nil {
  66. t.Errorf("failed to revoke lease %v", err)
  67. }
  68. _, err = kv.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
  69. if err != rpctypes.ErrLeaseNotFound {
  70. t.Fatalf("err = %v, want %v", err, rpctypes.ErrLeaseNotFound)
  71. }
  72. }
  73. func TestLeaseKeepAliveOnce(t *testing.T) {
  74. defer testutil.AfterTest(t)
  75. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  76. defer clus.Terminate(t)
  77. lapi := clus.RandClient()
  78. resp, err := lapi.Grant(context.Background(), 10)
  79. if err != nil {
  80. t.Errorf("failed to create lease %v", err)
  81. }
  82. _, err = lapi.KeepAliveOnce(context.Background(), resp.ID)
  83. if err != nil {
  84. t.Errorf("failed to keepalive lease %v", err)
  85. }
  86. _, err = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0))
  87. if err != rpctypes.ErrLeaseNotFound {
  88. t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err)
  89. }
  90. }
  91. func TestLeaseKeepAlive(t *testing.T) {
  92. defer testutil.AfterTest(t)
  93. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  94. defer clus.Terminate(t)
  95. lapi := clus.Client(0)
  96. clus.TakeClient(0)
  97. resp, err := lapi.Grant(context.Background(), 10)
  98. if err != nil {
  99. t.Errorf("failed to create lease %v", err)
  100. }
  101. rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
  102. if kerr != nil {
  103. t.Errorf("failed to keepalive lease %v", kerr)
  104. }
  105. kresp, ok := <-rc
  106. if !ok {
  107. t.Errorf("chan is closed, want not closed")
  108. }
  109. if kresp.ID != resp.ID {
  110. t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
  111. }
  112. lapi.Close()
  113. _, ok = <-rc
  114. if ok {
  115. t.Errorf("chan is not closed, want lease Close() closes chan")
  116. }
  117. }
  118. // TODO: add a client that can connect to all the members of cluster via unix sock.
  119. // TODO: test handle more complicated failures.
  120. func TestLeaseKeepAliveHandleFailure(t *testing.T) {
  121. t.Skip("test it when we have a cluster client")
  122. defer testutil.AfterTest(t)
  123. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  124. defer clus.Terminate(t)
  125. // TODO: change this line to get a cluster client
  126. lapi := clus.RandClient()
  127. resp, err := lapi.Grant(context.Background(), 10)
  128. if err != nil {
  129. t.Errorf("failed to create lease %v", err)
  130. }
  131. rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
  132. if kerr != nil {
  133. t.Errorf("failed to keepalive lease %v", kerr)
  134. }
  135. kresp := <-rc
  136. if kresp.ID != resp.ID {
  137. t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
  138. }
  139. // restart the connected member.
  140. clus.Members[0].Stop(t)
  141. select {
  142. case <-rc:
  143. t.Fatalf("unexpected keepalive")
  144. case <-time.After(10*time.Second/3 + 1):
  145. }
  146. // recover the member.
  147. clus.Members[0].Restart(t)
  148. kresp = <-rc
  149. if kresp.ID != resp.ID {
  150. t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
  151. }
  152. lapi.Close()
  153. _, ok := <-rc
  154. if ok {
  155. t.Errorf("chan is not closed, want lease Close() closes chan")
  156. }
  157. }
  158. type leaseCh struct {
  159. lid clientv3.LeaseID
  160. ch <-chan *clientv3.LeaseKeepAliveResponse
  161. }
  162. // TestLeaseKeepAliveNotFound ensures a revoked lease won't stop other keep alives
  163. func TestLeaseKeepAliveNotFound(t *testing.T) {
  164. defer testutil.AfterTest(t)
  165. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  166. defer clus.Terminate(t)
  167. cli := clus.RandClient()
  168. lchs := []leaseCh{}
  169. for i := 0; i < 3; i++ {
  170. resp, rerr := cli.Grant(context.TODO(), 5)
  171. if rerr != nil {
  172. t.Fatal(rerr)
  173. }
  174. kach, kaerr := cli.KeepAlive(context.Background(), resp.ID)
  175. if kaerr != nil {
  176. t.Fatal(kaerr)
  177. }
  178. lchs = append(lchs, leaseCh{resp.ID, kach})
  179. }
  180. if _, err := cli.Revoke(context.TODO(), lchs[1].lid); err != nil {
  181. t.Fatal(err)
  182. }
  183. <-lchs[0].ch
  184. if _, ok := <-lchs[0].ch; !ok {
  185. t.Fatalf("closed keepalive on wrong lease")
  186. }
  187. timec := time.After(5 * time.Second)
  188. for range lchs[1].ch {
  189. select {
  190. case <-timec:
  191. t.Fatalf("revoke did not close keep alive")
  192. default:
  193. }
  194. }
  195. }
  196. func TestLeaseGrantErrConnClosed(t *testing.T) {
  197. defer testutil.AfterTest(t)
  198. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  199. defer clus.Terminate(t)
  200. cli := clus.Client(0)
  201. clus.TakeClient(0)
  202. donec := make(chan struct{})
  203. go func() {
  204. defer close(donec)
  205. _, err := cli.Grant(context.TODO(), 5)
  206. if err != nil && err != grpc.ErrClientConnClosing {
  207. t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
  208. }
  209. }()
  210. if err := cli.Close(); err != nil {
  211. t.Fatal(err)
  212. }
  213. select {
  214. case <-time.After(3 * time.Second):
  215. t.Fatal("le.Grant took too long")
  216. case <-donec:
  217. }
  218. }
  219. func TestLeaseGrantNewAfterClose(t *testing.T) {
  220. defer testutil.AfterTest(t)
  221. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  222. defer clus.Terminate(t)
  223. cli := clus.Client(0)
  224. clus.TakeClient(0)
  225. if err := cli.Close(); err != nil {
  226. t.Fatal(err)
  227. }
  228. donec := make(chan struct{})
  229. go func() {
  230. if _, err := cli.Grant(context.TODO(), 5); err != grpc.ErrClientConnClosing {
  231. t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
  232. }
  233. close(donec)
  234. }()
  235. select {
  236. case <-time.After(3 * time.Second):
  237. t.Fatal("le.Grant took too long")
  238. case <-donec:
  239. }
  240. }
  241. func TestLeaseRevokeNewAfterClose(t *testing.T) {
  242. defer testutil.AfterTest(t)
  243. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  244. defer clus.Terminate(t)
  245. cli := clus.Client(0)
  246. resp, err := cli.Grant(context.TODO(), 5)
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. leaseID := resp.ID
  251. clus.TakeClient(0)
  252. if err := cli.Close(); err != nil {
  253. t.Fatal(err)
  254. }
  255. donec := make(chan struct{})
  256. go func() {
  257. if _, err := cli.Revoke(context.TODO(), leaseID); err != grpc.ErrClientConnClosing {
  258. t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
  259. }
  260. close(donec)
  261. }()
  262. select {
  263. case <-time.After(3 * time.Second):
  264. t.Fatal("le.Revoke took too long")
  265. case <-donec:
  266. }
  267. }
  268. // TestLeaseKeepAliveCloseAfterDisconnectExpire ensures the keep alive channel is closed
  269. // following a disconnection, lease revoke, then reconnect.
  270. func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
  271. defer testutil.AfterTest(t)
  272. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  273. defer clus.Terminate(t)
  274. cli := clus.Client(0)
  275. // setup lease and do a keepalive
  276. resp, err := cli.Grant(context.Background(), 10)
  277. if err != nil {
  278. t.Fatal(err)
  279. }
  280. rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
  281. if kerr != nil {
  282. t.Fatal(kerr)
  283. }
  284. kresp := <-rc
  285. if kresp.ID != resp.ID {
  286. t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
  287. }
  288. // keep client disconnected
  289. clus.Members[0].Stop(t)
  290. time.Sleep(time.Second)
  291. clus.WaitLeader(t)
  292. if _, err := clus.Client(1).Revoke(context.TODO(), resp.ID); err != nil {
  293. t.Fatal(err)
  294. }
  295. clus.Members[0].Restart(t)
  296. // some keep-alives may still be buffered; drain until close
  297. timer := time.After(time.Duration(kresp.TTL) * time.Second)
  298. for kresp != nil {
  299. select {
  300. case kresp = <-rc:
  301. case <-timer:
  302. t.Fatalf("keepalive channel did not close")
  303. }
  304. }
  305. }
  306. // TestLeaseKeepAliveInitTimeout ensures the keep alive channel closes if
  307. // the initial keep alive request never gets a response.
  308. func TestLeaseKeepAliveInitTimeout(t *testing.T) {
  309. defer testutil.AfterTest(t)
  310. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  311. defer clus.Terminate(t)
  312. cli := clus.Client(0)
  313. // setup lease and do a keepalive
  314. resp, err := cli.Grant(context.Background(), 5)
  315. if err != nil {
  316. t.Fatal(err)
  317. }
  318. // keep client disconnected
  319. clus.Members[0].Stop(t)
  320. rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
  321. if kerr != nil {
  322. t.Fatal(kerr)
  323. }
  324. select {
  325. case ka, ok := <-rc:
  326. if ok {
  327. t.Fatalf("unexpected keepalive %v, expected closed channel", ka)
  328. }
  329. case <-time.After(10 * time.Second):
  330. t.Fatalf("keepalive channel did not close")
  331. }
  332. clus.Members[0].Restart(t)
  333. }
  334. // TestLeaseKeepAliveInitTimeout ensures the keep alive channel closes if
  335. // a keep alive request after the first never gets a response.
  336. func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
  337. defer testutil.AfterTest(t)
  338. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  339. defer clus.Terminate(t)
  340. cli := clus.Client(0)
  341. // setup lease and do a keepalive
  342. resp, err := cli.Grant(context.Background(), 5)
  343. if err != nil {
  344. t.Fatal(err)
  345. }
  346. rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
  347. if kerr != nil {
  348. t.Fatal(kerr)
  349. }
  350. if kresp := <-rc; kresp.ID != resp.ID {
  351. t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
  352. }
  353. // keep client disconnected
  354. clus.Members[0].Stop(t)
  355. select {
  356. case ka, ok := <-rc:
  357. if ok {
  358. t.Fatalf("unexpected keepalive %v, expected closed channel", ka)
  359. }
  360. case <-time.After(10 * time.Second):
  361. t.Fatalf("keepalive channel did not close")
  362. }
  363. clus.Members[0].Restart(t)
  364. }
  365. func TestLeaseTimeToLive(t *testing.T) {
  366. defer testutil.AfterTest(t)
  367. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  368. defer clus.Terminate(t)
  369. lapi := clus.RandClient()
  370. resp, err := lapi.Grant(context.Background(), 10)
  371. if err != nil {
  372. t.Errorf("failed to create lease %v", err)
  373. }
  374. kv := clus.RandClient()
  375. keys := []string{"foo1", "foo2"}
  376. for i := range keys {
  377. if _, err = kv.Put(context.TODO(), keys[i], "bar", clientv3.WithLease(resp.ID)); err != nil {
  378. t.Fatal(err)
  379. }
  380. }
  381. lresp, lerr := lapi.TimeToLive(context.Background(), resp.ID, clientv3.WithAttachedKeys())
  382. if lerr != nil {
  383. t.Fatal(lerr)
  384. }
  385. if lresp.ID != resp.ID {
  386. t.Fatalf("leaseID expected %d, got %d", resp.ID, lresp.ID)
  387. }
  388. if lresp.GrantedTTL != int64(10) {
  389. t.Fatalf("GrantedTTL expected %d, got %d", 10, lresp.GrantedTTL)
  390. }
  391. if lresp.TTL == 0 || lresp.TTL > lresp.GrantedTTL {
  392. t.Fatalf("unexpected TTL %d (granted %d)", lresp.TTL, lresp.GrantedTTL)
  393. }
  394. ks := make([]string, len(lresp.Keys))
  395. for i := range lresp.Keys {
  396. ks[i] = string(lresp.Keys[i])
  397. }
  398. sort.Strings(ks)
  399. if !reflect.DeepEqual(ks, keys) {
  400. t.Fatalf("keys expected %v, got %v", keys, ks)
  401. }
  402. lresp, lerr = lapi.TimeToLive(context.Background(), resp.ID)
  403. if lerr != nil {
  404. t.Fatal(lerr)
  405. }
  406. if len(lresp.Keys) != 0 {
  407. t.Fatalf("unexpected keys %+v", lresp.Keys)
  408. }
  409. }
  410. func TestLeaseTimeToLiveLeaseNotFound(t *testing.T) {
  411. defer testutil.AfterTest(t)
  412. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  413. defer clus.Terminate(t)
  414. cli := clus.RandClient()
  415. resp, err := cli.Grant(context.Background(), 10)
  416. if err != nil {
  417. t.Errorf("failed to create lease %v", err)
  418. }
  419. _, err = cli.Revoke(context.Background(), resp.ID)
  420. if err != nil {
  421. t.Errorf("failed to Revoke lease %v", err)
  422. }
  423. lresp, err := cli.TimeToLive(context.Background(), resp.ID)
  424. // TimeToLive() doesn't return LeaseNotFound error
  425. // but return a response with TTL to be -1
  426. if err != nil {
  427. t.Fatalf("expected err to be nil")
  428. }
  429. if lresp == nil {
  430. t.Fatalf("expected lresp not to be nil")
  431. }
  432. if lresp.ResponseHeader == nil {
  433. t.Fatalf("expected ResponseHeader not to be nil")
  434. }
  435. if lresp.ID != resp.ID {
  436. t.Fatalf("expected Lease ID %v, but got %v", resp.ID, lresp.ID)
  437. }
  438. if lresp.TTL != -1 {
  439. t.Fatalf("expected TTL %v, but got %v", lresp.TTL, lresp.TTL)
  440. }
  441. }
  442. // TestLeaseRenewLostQuorum ensures keepalives work after losing quorum
  443. // for a while.
  444. func TestLeaseRenewLostQuorum(t *testing.T) {
  445. defer testutil.AfterTest(t)
  446. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
  447. defer clus.Terminate(t)
  448. cli := clus.Client(0)
  449. r, err := cli.Grant(context.TODO(), 4)
  450. if err != nil {
  451. t.Fatal(err)
  452. }
  453. kctx, kcancel := context.WithCancel(context.Background())
  454. defer kcancel()
  455. ka, err := cli.KeepAlive(kctx, r.ID)
  456. if err != nil {
  457. t.Fatal(err)
  458. }
  459. // consume first keepalive so next message sends when cluster is down
  460. <-ka
  461. // force keepalive stream message to timeout
  462. clus.Members[1].Stop(t)
  463. clus.Members[2].Stop(t)
  464. // Use TTL-1 since the client closes the keepalive channel if no
  465. // keepalive arrives before the lease deadline.
  466. // The cluster has 1 second to recover and reply to the keepalive.
  467. time.Sleep(time.Duration(r.TTL-1) * time.Second)
  468. clus.Members[1].Restart(t)
  469. clus.Members[2].Restart(t)
  470. select {
  471. case _, ok := <-ka:
  472. if !ok {
  473. t.Fatalf("keepalive closed")
  474. }
  475. case <-time.After(time.Duration(r.TTL) * time.Second):
  476. t.Fatalf("timed out waiting for keepalive")
  477. }
  478. }
  479. func TestLeaseKeepAliveLoopExit(t *testing.T) {
  480. defer testutil.AfterTest(t)
  481. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
  482. defer clus.Terminate(t)
  483. ctx := context.Background()
  484. cli := clus.Client(0)
  485. clus.TakeClient(0)
  486. resp, err := cli.Grant(ctx, 5)
  487. if err != nil {
  488. t.Fatal(err)
  489. }
  490. cli.Close()
  491. _, err = cli.KeepAlive(ctx, resp.ID)
  492. if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok {
  493. t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
  494. }
  495. }
  496. // TestV3LeaseFailureOverlap issues Grant and Keepalive requests to a cluster
  497. // before, during, and after quorum loss to confirm Grant/Keepalive tolerates
  498. // transient cluster failure.
  499. func TestV3LeaseFailureOverlap(t *testing.T) {
  500. clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
  501. defer clus.Terminate(t)
  502. numReqs := 5
  503. cli := clus.Client(0)
  504. // bring up a session, tear it down
  505. updown := func(i int) error {
  506. sess, err := concurrency.NewSession(cli)
  507. if err != nil {
  508. return err
  509. }
  510. ch := make(chan struct{})
  511. go func() {
  512. defer close(ch)
  513. sess.Close()
  514. }()
  515. select {
  516. case <-ch:
  517. case <-time.After(time.Minute / 4):
  518. t.Fatalf("timeout %d", i)
  519. }
  520. return nil
  521. }
  522. var wg sync.WaitGroup
  523. mkReqs := func(n int) {
  524. wg.Add(numReqs)
  525. for i := 0; i < numReqs; i++ {
  526. go func() {
  527. defer wg.Done()
  528. err := updown(n)
  529. if err == nil || err == rpctypes.ErrTimeoutDueToConnectionLost {
  530. return
  531. }
  532. t.Fatal(err)
  533. }()
  534. }
  535. }
  536. mkReqs(1)
  537. clus.Members[1].Stop(t)
  538. mkReqs(2)
  539. time.Sleep(time.Second)
  540. mkReqs(3)
  541. clus.Members[1].Restart(t)
  542. mkReqs(4)
  543. wg.Wait()
  544. }