v3_grpc_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "reflect"
  19. "testing"
  20. "time"
  21. "github.com/coreos/etcd/clientv3"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  23. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  24. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. "github.com/coreos/etcd/pkg/testutil"
  26. "golang.org/x/net/context"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/metadata"
  29. )
  30. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  31. // overwrites it, then checks that the change was applied.
  32. func TestV3PutOverwrite(t *testing.T) {
  33. defer testutil.AfterTest(t)
  34. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  35. defer clus.Terminate(t)
  36. kvc := toGRPC(clus.RandClient()).KV
  37. key := []byte("foo")
  38. reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
  39. respput, err := kvc.Put(context.TODO(), reqput)
  40. if err != nil {
  41. t.Fatalf("couldn't put key (%v)", err)
  42. }
  43. // overwrite
  44. reqput.Value = []byte("baz")
  45. respput2, err := kvc.Put(context.TODO(), reqput)
  46. if err != nil {
  47. t.Fatalf("couldn't put key (%v)", err)
  48. }
  49. if respput2.Header.Revision <= respput.Header.Revision {
  50. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  51. respput2.Header.Revision, respput.Header.Revision)
  52. }
  53. reqrange := &pb.RangeRequest{Key: key}
  54. resprange, err := kvc.Range(context.TODO(), reqrange)
  55. if err != nil {
  56. t.Fatalf("couldn't get key (%v)", err)
  57. }
  58. if len(resprange.Kvs) != 1 {
  59. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  60. }
  61. kv := resprange.Kvs[0]
  62. if kv.ModRevision <= kv.CreateRevision {
  63. t.Errorf("expected modRev > createRev, got %d <= %d",
  64. kv.ModRevision, kv.CreateRevision)
  65. }
  66. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  67. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  68. }
  69. }
  70. // TestPutRestart checks if a put after an unrelated member restart succeeds
  71. func TestV3PutRestart(t *testing.T) {
  72. defer testutil.AfterTest(t)
  73. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  74. defer clus.Terminate(t)
  75. kvIdx := rand.Intn(3)
  76. kvc := toGRPC(clus.Client(kvIdx)).KV
  77. stopIdx := kvIdx
  78. for stopIdx == kvIdx {
  79. stopIdx = rand.Intn(3)
  80. }
  81. clus.clients[stopIdx].Close()
  82. clus.Members[stopIdx].Stop(t)
  83. clus.Members[stopIdx].Restart(t)
  84. c, cerr := NewClientV3(clus.Members[stopIdx])
  85. if cerr != nil {
  86. t.Fatalf("cannot create client: %v", cerr)
  87. }
  88. clus.clients[stopIdx] = c
  89. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  90. defer cancel()
  91. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  92. _, err := kvc.Put(ctx, reqput)
  93. if err != nil && err == ctx.Err() {
  94. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  95. }
  96. }
  97. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  98. func TestV3CompactCurrentRev(t *testing.T) {
  99. defer testutil.AfterTest(t)
  100. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  101. defer clus.Terminate(t)
  102. kvc := toGRPC(clus.RandClient()).KV
  103. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  104. for i := 0; i < 3; i++ {
  105. if _, err := kvc.Put(context.Background(), preq); err != nil {
  106. t.Fatalf("couldn't put key (%v)", err)
  107. }
  108. }
  109. // compact on current revision
  110. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  111. if err != nil {
  112. t.Fatalf("couldn't compact kv space (%v)", err)
  113. }
  114. // key still exists?
  115. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  116. if err != nil {
  117. t.Fatalf("couldn't get key after compaction (%v)", err)
  118. }
  119. }
  120. func TestV3TxnTooManyOps(t *testing.T) {
  121. defer testutil.AfterTest(t)
  122. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  123. defer clus.Terminate(t)
  124. kvc := toGRPC(clus.RandClient()).KV
  125. // unique keys
  126. i := new(int)
  127. keyf := func() []byte {
  128. *i++
  129. return []byte(fmt.Sprintf("key-%d", i))
  130. }
  131. addCompareOps := func(txn *pb.TxnRequest) {
  132. txn.Compare = append(txn.Compare,
  133. &pb.Compare{
  134. Result: pb.Compare_GREATER,
  135. Target: pb.Compare_CREATE,
  136. Key: keyf(),
  137. })
  138. }
  139. addSuccessOps := func(txn *pb.TxnRequest) {
  140. txn.Success = append(txn.Success,
  141. &pb.RequestOp{
  142. Request: &pb.RequestOp_RequestPut{
  143. RequestPut: &pb.PutRequest{
  144. Key: keyf(),
  145. Value: []byte("bar"),
  146. },
  147. },
  148. })
  149. }
  150. addFailureOps := func(txn *pb.TxnRequest) {
  151. txn.Failure = append(txn.Failure,
  152. &pb.RequestOp{
  153. Request: &pb.RequestOp_RequestPut{
  154. RequestPut: &pb.PutRequest{
  155. Key: keyf(),
  156. Value: []byte("bar"),
  157. },
  158. },
  159. })
  160. }
  161. tests := []func(txn *pb.TxnRequest){
  162. addCompareOps,
  163. addSuccessOps,
  164. addFailureOps,
  165. }
  166. for i, tt := range tests {
  167. txn := &pb.TxnRequest{}
  168. for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ {
  169. tt(txn)
  170. }
  171. _, err := kvc.Txn(context.Background(), txn)
  172. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  173. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  174. }
  175. }
  176. }
  177. func TestV3TxnDuplicateKeys(t *testing.T) {
  178. defer testutil.AfterTest(t)
  179. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  180. defer clus.Terminate(t)
  181. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  182. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  183. RequestDeleteRange: &pb.DeleteRangeRequest{
  184. Key: []byte("abc"),
  185. },
  186. },
  187. }
  188. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  189. RequestDeleteRange: &pb.DeleteRangeRequest{
  190. Key: []byte("a"), RangeEnd: []byte("b"),
  191. },
  192. },
  193. }
  194. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  195. RequestDeleteRange: &pb.DeleteRangeRequest{
  196. Key: []byte("abb"), RangeEnd: []byte("abc"),
  197. },
  198. },
  199. }
  200. kvc := toGRPC(clus.RandClient()).KV
  201. tests := []struct {
  202. txnSuccess []*pb.RequestOp
  203. werr error
  204. }{
  205. {
  206. txnSuccess: []*pb.RequestOp{putreq, putreq},
  207. werr: rpctypes.ErrGRPCDuplicateKey,
  208. },
  209. {
  210. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  211. werr: rpctypes.ErrGRPCDuplicateKey,
  212. },
  213. {
  214. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  215. werr: rpctypes.ErrGRPCDuplicateKey,
  216. },
  217. {
  218. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  219. werr: nil,
  220. },
  221. {
  222. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  223. werr: nil,
  224. },
  225. }
  226. for i, tt := range tests {
  227. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  228. _, err := kvc.Txn(context.Background(), txn)
  229. if !eqErrGRPC(err, tt.werr) {
  230. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  231. }
  232. }
  233. }
  234. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  235. func TestV3TxnRevision(t *testing.T) {
  236. defer testutil.AfterTest(t)
  237. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  238. defer clus.Terminate(t)
  239. kvc := toGRPC(clus.RandClient()).KV
  240. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  241. presp, err := kvc.Put(context.TODO(), pr)
  242. if err != nil {
  243. t.Fatal(err)
  244. }
  245. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  246. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  247. tresp, err := kvc.Txn(context.TODO(), txn)
  248. if err != nil {
  249. t.Fatal(err)
  250. }
  251. // did not update revision
  252. if presp.Header.Revision != tresp.Header.Revision {
  253. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  254. }
  255. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  256. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  257. tresp, err = kvc.Txn(context.TODO(), txn)
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. // did not update revision
  262. if presp.Header.Revision != tresp.Header.Revision {
  263. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  264. }
  265. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  266. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  267. tresp, err = kvc.Txn(context.TODO(), txn)
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. // updated revision
  272. if tresp.Header.Revision != presp.Header.Revision+1 {
  273. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  274. }
  275. }
  276. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  277. func TestV3PutMissingLease(t *testing.T) {
  278. defer testutil.AfterTest(t)
  279. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  280. defer clus.Terminate(t)
  281. kvc := toGRPC(clus.RandClient()).KV
  282. key := []byte("foo")
  283. preq := &pb.PutRequest{Key: key, Lease: 123456}
  284. tests := []func(){
  285. // put case
  286. func() {
  287. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  288. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  289. }
  290. },
  291. // txn success case
  292. func() {
  293. txn := &pb.TxnRequest{}
  294. txn.Success = append(txn.Success, &pb.RequestOp{
  295. Request: &pb.RequestOp_RequestPut{
  296. RequestPut: preq}})
  297. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  298. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  299. }
  300. },
  301. // txn failure case
  302. func() {
  303. txn := &pb.TxnRequest{}
  304. txn.Failure = append(txn.Failure, &pb.RequestOp{
  305. Request: &pb.RequestOp_RequestPut{
  306. RequestPut: preq}})
  307. cmp := &pb.Compare{
  308. Result: pb.Compare_GREATER,
  309. Target: pb.Compare_CREATE,
  310. Key: []byte("bar"),
  311. }
  312. txn.Compare = append(txn.Compare, cmp)
  313. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  314. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  315. }
  316. },
  317. // ignore bad lease in failure on success txn
  318. func() {
  319. txn := &pb.TxnRequest{}
  320. rreq := &pb.RangeRequest{Key: []byte("bar")}
  321. txn.Success = append(txn.Success, &pb.RequestOp{
  322. Request: &pb.RequestOp_RequestRange{
  323. RequestRange: rreq}})
  324. txn.Failure = append(txn.Failure, &pb.RequestOp{
  325. Request: &pb.RequestOp_RequestPut{
  326. RequestPut: preq}})
  327. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  328. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  329. }
  330. },
  331. }
  332. for i, f := range tests {
  333. f()
  334. // key shouldn't have been stored
  335. rreq := &pb.RangeRequest{Key: key}
  336. rresp, err := kvc.Range(context.TODO(), rreq)
  337. if err != nil {
  338. t.Errorf("#%d. could not rangereq (%v)", i, err)
  339. } else if len(rresp.Kvs) != 0 {
  340. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  341. }
  342. }
  343. }
  344. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  345. func TestV3DeleteRange(t *testing.T) {
  346. defer testutil.AfterTest(t)
  347. tests := []struct {
  348. keySet []string
  349. begin string
  350. end string
  351. prevKV bool
  352. wantSet [][]byte
  353. deleted int64
  354. }{
  355. // delete middle
  356. {
  357. []string{"foo", "foo/abc", "fop"},
  358. "foo/", "fop", false,
  359. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  360. },
  361. // no delete
  362. {
  363. []string{"foo", "foo/abc", "fop"},
  364. "foo/", "foo/", false,
  365. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  366. },
  367. // delete first
  368. {
  369. []string{"foo", "foo/abc", "fop"},
  370. "fo", "fop", false,
  371. [][]byte{[]byte("fop")}, 2,
  372. },
  373. // delete tail
  374. {
  375. []string{"foo", "foo/abc", "fop"},
  376. "foo/", "fos", false,
  377. [][]byte{[]byte("foo")}, 2,
  378. },
  379. // delete exact
  380. {
  381. []string{"foo", "foo/abc", "fop"},
  382. "foo/abc", "", false,
  383. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  384. },
  385. // delete none, [x,x)
  386. {
  387. []string{"foo"},
  388. "foo", "foo", false,
  389. [][]byte{[]byte("foo")}, 0,
  390. },
  391. // delete middle with preserveKVs set
  392. {
  393. []string{"foo", "foo/abc", "fop"},
  394. "foo/", "fop", true,
  395. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  396. },
  397. }
  398. for i, tt := range tests {
  399. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  400. kvc := toGRPC(clus.RandClient()).KV
  401. ks := tt.keySet
  402. for j := range ks {
  403. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  404. _, err := kvc.Put(context.TODO(), reqput)
  405. if err != nil {
  406. t.Fatalf("couldn't put key (%v)", err)
  407. }
  408. }
  409. dreq := &pb.DeleteRangeRequest{
  410. Key: []byte(tt.begin),
  411. RangeEnd: []byte(tt.end),
  412. PrevKv: tt.prevKV,
  413. }
  414. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  415. if err != nil {
  416. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  417. }
  418. if tt.deleted != dresp.Deleted {
  419. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  420. }
  421. if tt.prevKV {
  422. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  423. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  424. }
  425. }
  426. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  427. rresp, err := kvc.Range(context.TODO(), rreq)
  428. if err != nil {
  429. t.Errorf("couldn't get range on test %v (%v)", i, err)
  430. }
  431. if dresp.Header.Revision != rresp.Header.Revision {
  432. t.Errorf("expected revision %v, got %v",
  433. dresp.Header.Revision, rresp.Header.Revision)
  434. }
  435. keys := [][]byte{}
  436. for j := range rresp.Kvs {
  437. keys = append(keys, rresp.Kvs[j].Key)
  438. }
  439. if !reflect.DeepEqual(tt.wantSet, keys) {
  440. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  441. }
  442. // can't defer because tcp ports will be in use
  443. clus.Terminate(t)
  444. }
  445. }
  446. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  447. func TestV3TxnInvalidRange(t *testing.T) {
  448. defer testutil.AfterTest(t)
  449. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  450. defer clus.Terminate(t)
  451. kvc := toGRPC(clus.RandClient()).KV
  452. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  453. for i := 0; i < 3; i++ {
  454. _, err := kvc.Put(context.Background(), preq)
  455. if err != nil {
  456. t.Fatalf("couldn't put key (%v)", err)
  457. }
  458. }
  459. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  460. if err != nil {
  461. t.Fatalf("couldn't compact kv space (%v)", err)
  462. }
  463. // future rev
  464. txn := &pb.TxnRequest{}
  465. txn.Success = append(txn.Success, &pb.RequestOp{
  466. Request: &pb.RequestOp_RequestPut{
  467. RequestPut: preq}})
  468. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  469. txn.Success = append(txn.Success, &pb.RequestOp{
  470. Request: &pb.RequestOp_RequestRange{
  471. RequestRange: rreq}})
  472. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  473. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  474. }
  475. // compacted rev
  476. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  477. tv.RequestRange.Revision = 1
  478. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  479. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  480. }
  481. }
  482. func TestV3TooLargeRequest(t *testing.T) {
  483. defer testutil.AfterTest(t)
  484. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  485. defer clus.Terminate(t)
  486. kvc := toGRPC(clus.RandClient()).KV
  487. // 2MB request value
  488. largeV := make([]byte, 2*1024*1024)
  489. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  490. _, err := kvc.Put(context.Background(), preq)
  491. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  492. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  493. }
  494. }
  495. // TestV3Hash tests hash.
  496. func TestV3Hash(t *testing.T) {
  497. defer testutil.AfterTest(t)
  498. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  499. defer clus.Terminate(t)
  500. cli := clus.RandClient()
  501. kvc := toGRPC(cli).KV
  502. m := toGRPC(cli).Maintenance
  503. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  504. for i := 0; i < 3; i++ {
  505. _, err := kvc.Put(context.Background(), preq)
  506. if err != nil {
  507. t.Fatalf("couldn't put key (%v)", err)
  508. }
  509. }
  510. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  511. if err != nil || resp.Hash == 0 {
  512. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  513. }
  514. }
  515. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  516. func TestV3StorageQuotaAPI(t *testing.T) {
  517. defer testutil.AfterTest(t)
  518. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  519. clus.Members[0].QuotaBackendBytes = 64 * 1024
  520. clus.Members[0].Stop(t)
  521. clus.Members[0].Restart(t)
  522. defer clus.Terminate(t)
  523. kvc := toGRPC(clus.Client(0)).KV
  524. key := []byte("abc")
  525. // test small put that fits in quota
  526. smallbuf := make([]byte, 512)
  527. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}, grpc.FailFast(false)); err != nil {
  528. t.Fatal(err)
  529. }
  530. // test big put
  531. bigbuf := make([]byte, 64*1024)
  532. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  533. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  534. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  535. }
  536. // test big txn
  537. puttxn := &pb.RequestOp{
  538. Request: &pb.RequestOp_RequestPut{
  539. RequestPut: &pb.PutRequest{
  540. Key: key,
  541. Value: bigbuf,
  542. },
  543. },
  544. }
  545. txnreq := &pb.TxnRequest{}
  546. txnreq.Success = append(txnreq.Success, puttxn)
  547. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  548. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  549. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  550. }
  551. }
  552. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  553. func TestV3StorageQuotaApply(t *testing.T) {
  554. testutil.AfterTest(t)
  555. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  556. defer clus.Terminate(t)
  557. kvc0 := toGRPC(clus.Client(0)).KV
  558. kvc1 := toGRPC(clus.Client(1)).KV
  559. // force a node to have a different quota
  560. clus.Members[0].QuotaBackendBytes = 64 * 1024
  561. clus.Members[0].Stop(t)
  562. clus.Members[0].Restart(t)
  563. clus.waitLeader(t, clus.Members)
  564. key := []byte("abc")
  565. // test small put still works
  566. smallbuf := make([]byte, 1024)
  567. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}, grpc.FailFast(false))
  568. if serr != nil {
  569. t.Fatal(serr)
  570. }
  571. // test big put
  572. bigbuf := make([]byte, 64*1024)
  573. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. // quorum get should work regardless of whether alarm is raised
  578. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  579. if err != nil {
  580. t.Fatal(err)
  581. }
  582. // wait until alarm is raised for sure-- poll the alarms
  583. stopc := time.After(5 * time.Second)
  584. for {
  585. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  586. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  587. if aerr != nil {
  588. t.Fatal(aerr)
  589. }
  590. if len(resp.Alarms) != 0 {
  591. break
  592. }
  593. select {
  594. case <-stopc:
  595. t.Fatalf("timed out waiting for alarm")
  596. case <-time.After(10 * time.Millisecond):
  597. }
  598. }
  599. // small quota machine should reject put
  600. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  601. t.Fatalf("past-quota instance should reject put")
  602. }
  603. // large quota machine should reject put
  604. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  605. t.Fatalf("past-quota instance should reject put")
  606. }
  607. // reset large quota node to ensure alarm persisted
  608. clus.Members[1].Stop(t)
  609. clus.Members[1].Restart(t)
  610. clus.waitLeader(t, clus.Members)
  611. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  612. t.Fatalf("alarmed instance should reject put after reset")
  613. }
  614. }
  615. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  616. func TestV3AlarmDeactivate(t *testing.T) {
  617. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  618. defer clus.Terminate(t)
  619. kvc := toGRPC(clus.RandClient()).KV
  620. mt := toGRPC(clus.RandClient()).Maintenance
  621. alarmReq := &pb.AlarmRequest{
  622. MemberID: 123,
  623. Action: pb.AlarmRequest_ACTIVATE,
  624. Alarm: pb.AlarmType_NOSPACE,
  625. }
  626. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  627. t.Fatal(err)
  628. }
  629. key := []byte("abc")
  630. smallbuf := make([]byte, 512)
  631. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  632. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  633. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  634. }
  635. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  636. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  637. t.Fatal(err)
  638. }
  639. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  640. t.Fatal(err)
  641. }
  642. }
  643. func TestV3RangeRequest(t *testing.T) {
  644. defer testutil.AfterTest(t)
  645. tests := []struct {
  646. putKeys []string
  647. reqs []pb.RangeRequest
  648. wresps [][]string
  649. wmores []bool
  650. }{
  651. // single key
  652. {
  653. []string{"foo", "bar"},
  654. []pb.RangeRequest{
  655. // exists
  656. {Key: []byte("foo")},
  657. // doesn't exist
  658. {Key: []byte("baz")},
  659. },
  660. [][]string{
  661. {"foo"},
  662. {},
  663. },
  664. []bool{false, false},
  665. },
  666. // multi-key
  667. {
  668. []string{"a", "b", "c", "d", "e"},
  669. []pb.RangeRequest{
  670. // all in range
  671. {Key: []byte("a"), RangeEnd: []byte("z")},
  672. // [b, d)
  673. {Key: []byte("b"), RangeEnd: []byte("d")},
  674. // out of range
  675. {Key: []byte("f"), RangeEnd: []byte("z")},
  676. // [c,c) = empty
  677. {Key: []byte("c"), RangeEnd: []byte("c")},
  678. // [d, b) = empty
  679. {Key: []byte("d"), RangeEnd: []byte("b")},
  680. // ["\0", "\0") => all in range
  681. {Key: []byte{0}, RangeEnd: []byte{0}},
  682. },
  683. [][]string{
  684. {"a", "b", "c", "d", "e"},
  685. {"b", "c"},
  686. {},
  687. {},
  688. {},
  689. {"a", "b", "c", "d", "e"},
  690. },
  691. []bool{false, false, false, false, false, false},
  692. },
  693. // revision
  694. {
  695. []string{"a", "b", "c", "d", "e"},
  696. []pb.RangeRequest{
  697. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  698. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  699. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  700. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  701. },
  702. [][]string{
  703. {"a", "b", "c", "d", "e"},
  704. {},
  705. {"a"},
  706. {"a", "b"},
  707. },
  708. []bool{false, false, false, false},
  709. },
  710. // limit
  711. {
  712. []string{"foo", "bar"},
  713. []pb.RangeRequest{
  714. // more
  715. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  716. // no more
  717. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  718. },
  719. [][]string{
  720. {"bar"},
  721. {"bar", "foo"},
  722. },
  723. []bool{true, false},
  724. },
  725. // sort
  726. {
  727. []string{"b", "a", "c", "d", "c"},
  728. []pb.RangeRequest{
  729. {
  730. Key: []byte("a"), RangeEnd: []byte("z"),
  731. Limit: 1,
  732. SortOrder: pb.RangeRequest_ASCEND,
  733. SortTarget: pb.RangeRequest_KEY,
  734. },
  735. {
  736. Key: []byte("a"), RangeEnd: []byte("z"),
  737. Limit: 1,
  738. SortOrder: pb.RangeRequest_DESCEND,
  739. SortTarget: pb.RangeRequest_KEY,
  740. },
  741. {
  742. Key: []byte("a"), RangeEnd: []byte("z"),
  743. Limit: 1,
  744. SortOrder: pb.RangeRequest_ASCEND,
  745. SortTarget: pb.RangeRequest_CREATE,
  746. },
  747. {
  748. Key: []byte("a"), RangeEnd: []byte("z"),
  749. Limit: 1,
  750. SortOrder: pb.RangeRequest_DESCEND,
  751. SortTarget: pb.RangeRequest_MOD,
  752. },
  753. {
  754. Key: []byte("z"), RangeEnd: []byte("z"),
  755. Limit: 1,
  756. SortOrder: pb.RangeRequest_DESCEND,
  757. SortTarget: pb.RangeRequest_CREATE,
  758. },
  759. },
  760. [][]string{
  761. {"a"},
  762. {"d"},
  763. {"b"},
  764. {"c"},
  765. {},
  766. },
  767. []bool{true, true, true, true, false},
  768. },
  769. }
  770. for i, tt := range tests {
  771. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  772. for _, k := range tt.putKeys {
  773. kvc := toGRPC(clus.RandClient()).KV
  774. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  775. if _, err := kvc.Put(context.TODO(), req); err != nil {
  776. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  777. }
  778. }
  779. for j, req := range tt.reqs {
  780. kvc := toGRPC(clus.RandClient()).KV
  781. resp, err := kvc.Range(context.TODO(), &req)
  782. if err != nil {
  783. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  784. continue
  785. }
  786. if len(resp.Kvs) != len(tt.wresps[j]) {
  787. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  788. continue
  789. }
  790. for k, wKey := range tt.wresps[j] {
  791. respKey := string(resp.Kvs[k].Key)
  792. if respKey != wKey {
  793. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  794. }
  795. }
  796. if resp.More != tt.wmores[j] {
  797. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  798. }
  799. wrev := int64(len(tt.putKeys) + 1)
  800. if resp.Header.Revision != wrev {
  801. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  802. }
  803. }
  804. clus.Terminate(t)
  805. }
  806. }
  807. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  808. cfg.UseGRPC = true
  809. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  810. clus.Launch(t)
  811. return clus
  812. }
  813. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  814. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  815. defer testutil.AfterTest(t)
  816. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  817. clus := newClusterV3NoClients(t, &cfg)
  818. defer clus.Terminate(t)
  819. // nil out TLS field so client will use an insecure connection
  820. clus.Members[0].ClientTLSInfo = nil
  821. client, err := NewClientV3(clus.Members[0])
  822. if err != nil && err != grpc.ErrClientConnTimeout {
  823. t.Fatalf("unexpected error (%v)", err)
  824. } else if client == nil {
  825. // Ideally, no client would be returned. However, grpc will
  826. // return a connection without trying to handshake first so
  827. // the connection appears OK.
  828. return
  829. }
  830. defer client.Close()
  831. donec := make(chan error, 1)
  832. go func() {
  833. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  834. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  835. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  836. cancel()
  837. donec <- perr
  838. }()
  839. if perr := <-donec; perr == nil {
  840. t.Fatalf("expected client error on put")
  841. }
  842. }
  843. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  844. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  845. defer testutil.AfterTest(t)
  846. cfg := ClusterConfig{Size: 3}
  847. clus := newClusterV3NoClients(t, &cfg)
  848. defer clus.Terminate(t)
  849. clus.Members[0].ClientTLSInfo = &testTLSInfo
  850. client, err := NewClientV3(clus.Members[0])
  851. if client != nil || err == nil {
  852. t.Fatalf("expected no client")
  853. } else if err != clientv3.ErrNoAvailableEndpoints {
  854. t.Fatalf("unexpected error (%v)", err)
  855. }
  856. }
  857. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  858. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  859. defer testutil.AfterTest(t)
  860. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  861. clus := newClusterV3NoClients(t, &cfg)
  862. defer clus.Terminate(t)
  863. client, err := NewClientV3(clus.Members[0])
  864. if err != nil {
  865. t.Fatalf("expected tls client (%v)", err)
  866. }
  867. defer client.Close()
  868. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  869. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  870. t.Fatalf("unexpected error on put over tls (%v)", err)
  871. }
  872. }
  873. func TestGRPCRequireLeader(t *testing.T) {
  874. defer testutil.AfterTest(t)
  875. cfg := ClusterConfig{Size: 3}
  876. clus := newClusterV3NoClients(t, &cfg)
  877. defer clus.Terminate(t)
  878. clus.Members[1].Stop(t)
  879. clus.Members[2].Stop(t)
  880. client, err := NewClientV3(clus.Members[0])
  881. if err != nil {
  882. t.Fatalf("cannot create client: %v", err)
  883. }
  884. defer client.Close()
  885. // wait for election timeout, then member[0] will not have a leader.
  886. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  887. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  888. ctx := metadata.NewContext(context.Background(), md)
  889. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  890. if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  891. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  892. }
  893. }
  894. func TestGRPCStreamRequireLeader(t *testing.T) {
  895. defer testutil.AfterTest(t)
  896. cfg := ClusterConfig{Size: 3}
  897. clus := newClusterV3NoClients(t, &cfg)
  898. defer clus.Terminate(t)
  899. client, err := NewClientV3(clus.Members[0])
  900. if err != nil {
  901. t.Fatalf("failed to create client (%v)", err)
  902. }
  903. defer client.Close()
  904. wAPI := toGRPC(client).Watch
  905. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  906. ctx := metadata.NewContext(context.Background(), md)
  907. wStream, err := wAPI.Watch(ctx)
  908. if err != nil {
  909. t.Fatalf("wAPI.Watch error: %v", err)
  910. }
  911. clus.Members[1].Stop(t)
  912. clus.Members[2].Stop(t)
  913. // existing stream should be rejected
  914. _, err = wStream.Recv()
  915. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  916. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  917. }
  918. // new stream should also be rejected
  919. wStream, err = wAPI.Watch(ctx)
  920. if err != nil {
  921. t.Fatalf("wAPI.Watch error: %v", err)
  922. }
  923. _, err = wStream.Recv()
  924. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  925. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  926. }
  927. clus.Members[1].Restart(t)
  928. clus.Members[2].Restart(t)
  929. clus.waitLeader(t, clus.Members)
  930. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  931. // new stream should also be OK now after we restarted the other members
  932. wStream, err = wAPI.Watch(ctx)
  933. if err != nil {
  934. t.Fatalf("wAPI.Watch error: %v", err)
  935. }
  936. wreq := &pb.WatchRequest{
  937. RequestUnion: &pb.WatchRequest_CreateRequest{
  938. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  939. },
  940. }
  941. err = wStream.Send(wreq)
  942. if err != nil {
  943. t.Errorf("err = %v, want nil", err)
  944. }
  945. }
  946. func eqErrGRPC(err1 error, err2 error) bool {
  947. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  948. }