v3_grpc_test.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "os"
  19. "reflect"
  20. "testing"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  23. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  24. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. "github.com/coreos/etcd/pkg/testutil"
  26. "golang.org/x/net/context"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/metadata"
  29. )
  30. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  31. // overwrites it, then checks that the change was applied.
  32. func TestV3PutOverwrite(t *testing.T) {
  33. defer testutil.AfterTest(t)
  34. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  35. defer clus.Terminate(t)
  36. kvc := toGRPC(clus.RandClient()).KV
  37. key := []byte("foo")
  38. reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
  39. respput, err := kvc.Put(context.TODO(), reqput)
  40. if err != nil {
  41. t.Fatalf("couldn't put key (%v)", err)
  42. }
  43. // overwrite
  44. reqput.Value = []byte("baz")
  45. respput2, err := kvc.Put(context.TODO(), reqput)
  46. if err != nil {
  47. t.Fatalf("couldn't put key (%v)", err)
  48. }
  49. if respput2.Header.Revision <= respput.Header.Revision {
  50. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  51. respput2.Header.Revision, respput.Header.Revision)
  52. }
  53. reqrange := &pb.RangeRequest{Key: key}
  54. resprange, err := kvc.Range(context.TODO(), reqrange)
  55. if err != nil {
  56. t.Fatalf("couldn't get key (%v)", err)
  57. }
  58. if len(resprange.Kvs) != 1 {
  59. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  60. }
  61. kv := resprange.Kvs[0]
  62. if kv.ModRevision <= kv.CreateRevision {
  63. t.Errorf("expected modRev > createRev, got %d <= %d",
  64. kv.ModRevision, kv.CreateRevision)
  65. }
  66. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  67. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  68. }
  69. }
  70. // TestPutRestart checks if a put after an unrelated member restart succeeds
  71. func TestV3PutRestart(t *testing.T) {
  72. defer testutil.AfterTest(t)
  73. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  74. defer clus.Terminate(t)
  75. kvIdx := rand.Intn(3)
  76. kvc := toGRPC(clus.Client(kvIdx)).KV
  77. stopIdx := kvIdx
  78. for stopIdx == kvIdx {
  79. stopIdx = rand.Intn(3)
  80. }
  81. clus.clients[stopIdx].Close()
  82. clus.Members[stopIdx].Stop(t)
  83. clus.Members[stopIdx].Restart(t)
  84. c, cerr := NewClientV3(clus.Members[stopIdx])
  85. if cerr != nil {
  86. t.Fatalf("cannot create client: %v", cerr)
  87. }
  88. clus.clients[stopIdx] = c
  89. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  90. defer cancel()
  91. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  92. _, err := kvc.Put(ctx, reqput)
  93. if err != nil && err == ctx.Err() {
  94. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  95. }
  96. }
  97. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  98. func TestV3CompactCurrentRev(t *testing.T) {
  99. defer testutil.AfterTest(t)
  100. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  101. defer clus.Terminate(t)
  102. kvc := toGRPC(clus.RandClient()).KV
  103. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  104. for i := 0; i < 3; i++ {
  105. if _, err := kvc.Put(context.Background(), preq); err != nil {
  106. t.Fatalf("couldn't put key (%v)", err)
  107. }
  108. }
  109. // compact on current revision
  110. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  111. if err != nil {
  112. t.Fatalf("couldn't compact kv space (%v)", err)
  113. }
  114. // key still exists?
  115. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  116. if err != nil {
  117. t.Fatalf("couldn't get key after compaction (%v)", err)
  118. }
  119. }
  120. func TestV3TxnTooManyOps(t *testing.T) {
  121. defer testutil.AfterTest(t)
  122. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  123. defer clus.Terminate(t)
  124. kvc := toGRPC(clus.RandClient()).KV
  125. // unique keys
  126. i := new(int)
  127. keyf := func() []byte {
  128. *i++
  129. return []byte(fmt.Sprintf("key-%d", i))
  130. }
  131. addCompareOps := func(txn *pb.TxnRequest) {
  132. txn.Compare = append(txn.Compare,
  133. &pb.Compare{
  134. Result: pb.Compare_GREATER,
  135. Target: pb.Compare_CREATE,
  136. Key: keyf(),
  137. })
  138. }
  139. addSuccessOps := func(txn *pb.TxnRequest) {
  140. txn.Success = append(txn.Success,
  141. &pb.RequestOp{
  142. Request: &pb.RequestOp_RequestPut{
  143. RequestPut: &pb.PutRequest{
  144. Key: keyf(),
  145. Value: []byte("bar"),
  146. },
  147. },
  148. })
  149. }
  150. addFailureOps := func(txn *pb.TxnRequest) {
  151. txn.Failure = append(txn.Failure,
  152. &pb.RequestOp{
  153. Request: &pb.RequestOp_RequestPut{
  154. RequestPut: &pb.PutRequest{
  155. Key: keyf(),
  156. Value: []byte("bar"),
  157. },
  158. },
  159. })
  160. }
  161. tests := []func(txn *pb.TxnRequest){
  162. addCompareOps,
  163. addSuccessOps,
  164. addFailureOps,
  165. }
  166. for i, tt := range tests {
  167. txn := &pb.TxnRequest{}
  168. for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ {
  169. tt(txn)
  170. }
  171. _, err := kvc.Txn(context.Background(), txn)
  172. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  173. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  174. }
  175. }
  176. }
  177. func TestV3TxnDuplicateKeys(t *testing.T) {
  178. defer testutil.AfterTest(t)
  179. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  180. defer clus.Terminate(t)
  181. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  182. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  183. RequestDeleteRange: &pb.DeleteRangeRequest{
  184. Key: []byte("abc"),
  185. },
  186. },
  187. }
  188. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  189. RequestDeleteRange: &pb.DeleteRangeRequest{
  190. Key: []byte("a"), RangeEnd: []byte("b"),
  191. },
  192. },
  193. }
  194. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  195. RequestDeleteRange: &pb.DeleteRangeRequest{
  196. Key: []byte("abb"), RangeEnd: []byte("abc"),
  197. },
  198. },
  199. }
  200. kvc := toGRPC(clus.RandClient()).KV
  201. tests := []struct {
  202. txnSuccess []*pb.RequestOp
  203. werr error
  204. }{
  205. {
  206. txnSuccess: []*pb.RequestOp{putreq, putreq},
  207. werr: rpctypes.ErrGRPCDuplicateKey,
  208. },
  209. {
  210. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  211. werr: rpctypes.ErrGRPCDuplicateKey,
  212. },
  213. {
  214. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  215. werr: rpctypes.ErrGRPCDuplicateKey,
  216. },
  217. {
  218. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  219. werr: nil,
  220. },
  221. {
  222. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  223. werr: nil,
  224. },
  225. }
  226. for i, tt := range tests {
  227. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  228. _, err := kvc.Txn(context.Background(), txn)
  229. if !eqErrGRPC(err, tt.werr) {
  230. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  231. }
  232. }
  233. }
  234. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  235. func TestV3TxnRevision(t *testing.T) {
  236. defer testutil.AfterTest(t)
  237. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  238. defer clus.Terminate(t)
  239. kvc := toGRPC(clus.RandClient()).KV
  240. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  241. presp, err := kvc.Put(context.TODO(), pr)
  242. if err != nil {
  243. t.Fatal(err)
  244. }
  245. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  246. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  247. tresp, err := kvc.Txn(context.TODO(), txn)
  248. if err != nil {
  249. t.Fatal(err)
  250. }
  251. // did not update revision
  252. if presp.Header.Revision != tresp.Header.Revision {
  253. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  254. }
  255. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  256. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  257. tresp, err = kvc.Txn(context.TODO(), txn)
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. // did not update revision
  262. if presp.Header.Revision != tresp.Header.Revision {
  263. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  264. }
  265. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  266. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  267. tresp, err = kvc.Txn(context.TODO(), txn)
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. // updated revision
  272. if tresp.Header.Revision != presp.Header.Revision+1 {
  273. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  274. }
  275. }
  276. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  277. func TestV3PutMissingLease(t *testing.T) {
  278. defer testutil.AfterTest(t)
  279. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  280. defer clus.Terminate(t)
  281. kvc := toGRPC(clus.RandClient()).KV
  282. key := []byte("foo")
  283. preq := &pb.PutRequest{Key: key, Lease: 123456}
  284. tests := []func(){
  285. // put case
  286. func() {
  287. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  288. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  289. }
  290. },
  291. // txn success case
  292. func() {
  293. txn := &pb.TxnRequest{}
  294. txn.Success = append(txn.Success, &pb.RequestOp{
  295. Request: &pb.RequestOp_RequestPut{
  296. RequestPut: preq}})
  297. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  298. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  299. }
  300. },
  301. // txn failure case
  302. func() {
  303. txn := &pb.TxnRequest{}
  304. txn.Failure = append(txn.Failure, &pb.RequestOp{
  305. Request: &pb.RequestOp_RequestPut{
  306. RequestPut: preq}})
  307. cmp := &pb.Compare{
  308. Result: pb.Compare_GREATER,
  309. Target: pb.Compare_CREATE,
  310. Key: []byte("bar"),
  311. }
  312. txn.Compare = append(txn.Compare, cmp)
  313. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  314. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  315. }
  316. },
  317. // ignore bad lease in failure on success txn
  318. func() {
  319. txn := &pb.TxnRequest{}
  320. rreq := &pb.RangeRequest{Key: []byte("bar")}
  321. txn.Success = append(txn.Success, &pb.RequestOp{
  322. Request: &pb.RequestOp_RequestRange{
  323. RequestRange: rreq}})
  324. txn.Failure = append(txn.Failure, &pb.RequestOp{
  325. Request: &pb.RequestOp_RequestPut{
  326. RequestPut: preq}})
  327. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  328. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  329. }
  330. },
  331. }
  332. for i, f := range tests {
  333. f()
  334. // key shouldn't have been stored
  335. rreq := &pb.RangeRequest{Key: key}
  336. rresp, err := kvc.Range(context.TODO(), rreq)
  337. if err != nil {
  338. t.Errorf("#%d. could not rangereq (%v)", i, err)
  339. } else if len(rresp.Kvs) != 0 {
  340. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  341. }
  342. }
  343. }
  344. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  345. func TestV3DeleteRange(t *testing.T) {
  346. defer testutil.AfterTest(t)
  347. tests := []struct {
  348. keySet []string
  349. begin string
  350. end string
  351. prevKV bool
  352. wantSet [][]byte
  353. deleted int64
  354. }{
  355. // delete middle
  356. {
  357. []string{"foo", "foo/abc", "fop"},
  358. "foo/", "fop", false,
  359. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  360. },
  361. // no delete
  362. {
  363. []string{"foo", "foo/abc", "fop"},
  364. "foo/", "foo/", false,
  365. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  366. },
  367. // delete first
  368. {
  369. []string{"foo", "foo/abc", "fop"},
  370. "fo", "fop", false,
  371. [][]byte{[]byte("fop")}, 2,
  372. },
  373. // delete tail
  374. {
  375. []string{"foo", "foo/abc", "fop"},
  376. "foo/", "fos", false,
  377. [][]byte{[]byte("foo")}, 2,
  378. },
  379. // delete exact
  380. {
  381. []string{"foo", "foo/abc", "fop"},
  382. "foo/abc", "", false,
  383. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  384. },
  385. // delete none, [x,x)
  386. {
  387. []string{"foo"},
  388. "foo", "foo", false,
  389. [][]byte{[]byte("foo")}, 0,
  390. },
  391. // delete middle with preserveKVs set
  392. {
  393. []string{"foo", "foo/abc", "fop"},
  394. "foo/", "fop", true,
  395. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  396. },
  397. }
  398. for i, tt := range tests {
  399. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  400. kvc := toGRPC(clus.RandClient()).KV
  401. ks := tt.keySet
  402. for j := range ks {
  403. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  404. _, err := kvc.Put(context.TODO(), reqput)
  405. if err != nil {
  406. t.Fatalf("couldn't put key (%v)", err)
  407. }
  408. }
  409. dreq := &pb.DeleteRangeRequest{
  410. Key: []byte(tt.begin),
  411. RangeEnd: []byte(tt.end),
  412. PrevKv: tt.prevKV,
  413. }
  414. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  415. if err != nil {
  416. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  417. }
  418. if tt.deleted != dresp.Deleted {
  419. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  420. }
  421. if tt.prevKV {
  422. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  423. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  424. }
  425. }
  426. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  427. rresp, err := kvc.Range(context.TODO(), rreq)
  428. if err != nil {
  429. t.Errorf("couldn't get range on test %v (%v)", i, err)
  430. }
  431. if dresp.Header.Revision != rresp.Header.Revision {
  432. t.Errorf("expected revision %v, got %v",
  433. dresp.Header.Revision, rresp.Header.Revision)
  434. }
  435. keys := [][]byte{}
  436. for j := range rresp.Kvs {
  437. keys = append(keys, rresp.Kvs[j].Key)
  438. }
  439. if !reflect.DeepEqual(tt.wantSet, keys) {
  440. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  441. }
  442. // can't defer because tcp ports will be in use
  443. clus.Terminate(t)
  444. }
  445. }
  446. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  447. func TestV3TxnInvalidRange(t *testing.T) {
  448. defer testutil.AfterTest(t)
  449. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  450. defer clus.Terminate(t)
  451. kvc := toGRPC(clus.RandClient()).KV
  452. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  453. for i := 0; i < 3; i++ {
  454. _, err := kvc.Put(context.Background(), preq)
  455. if err != nil {
  456. t.Fatalf("couldn't put key (%v)", err)
  457. }
  458. }
  459. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  460. if err != nil {
  461. t.Fatalf("couldn't compact kv space (%v)", err)
  462. }
  463. // future rev
  464. txn := &pb.TxnRequest{}
  465. txn.Success = append(txn.Success, &pb.RequestOp{
  466. Request: &pb.RequestOp_RequestPut{
  467. RequestPut: preq}})
  468. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  469. txn.Success = append(txn.Success, &pb.RequestOp{
  470. Request: &pb.RequestOp_RequestRange{
  471. RequestRange: rreq}})
  472. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  473. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  474. }
  475. // compacted rev
  476. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  477. tv.RequestRange.Revision = 1
  478. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  479. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  480. }
  481. }
  482. func TestV3TooLargeRequest(t *testing.T) {
  483. defer testutil.AfterTest(t)
  484. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  485. defer clus.Terminate(t)
  486. kvc := toGRPC(clus.RandClient()).KV
  487. // 2MB request value
  488. largeV := make([]byte, 2*1024*1024)
  489. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  490. _, err := kvc.Put(context.Background(), preq)
  491. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  492. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  493. }
  494. }
  495. // TestV3Hash tests hash.
  496. func TestV3Hash(t *testing.T) {
  497. defer testutil.AfterTest(t)
  498. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  499. defer clus.Terminate(t)
  500. cli := clus.RandClient()
  501. kvc := toGRPC(cli).KV
  502. m := toGRPC(cli).Maintenance
  503. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  504. for i := 0; i < 3; i++ {
  505. _, err := kvc.Put(context.Background(), preq)
  506. if err != nil {
  507. t.Fatalf("couldn't put key (%v)", err)
  508. }
  509. }
  510. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  511. if err != nil || resp.Hash == 0 {
  512. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  513. }
  514. }
  515. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  516. func TestV3StorageQuotaAPI(t *testing.T) {
  517. defer testutil.AfterTest(t)
  518. quotasize := int64(16 * os.Getpagesize())
  519. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  520. // Set a quota on one node
  521. clus.Members[0].QuotaBackendBytes = quotasize
  522. clus.Members[0].Stop(t)
  523. clus.Members[0].Restart(t)
  524. defer clus.Terminate(t)
  525. kvc := toGRPC(clus.Client(0)).KV
  526. key := []byte("abc")
  527. // test small put that fits in quota
  528. smallbuf := make([]byte, 512)
  529. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}, grpc.FailFast(false)); err != nil {
  530. t.Fatal(err)
  531. }
  532. // test big put
  533. bigbuf := make([]byte, quotasize)
  534. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  535. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  536. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  537. }
  538. // test big txn
  539. puttxn := &pb.RequestOp{
  540. Request: &pb.RequestOp_RequestPut{
  541. RequestPut: &pb.PutRequest{
  542. Key: key,
  543. Value: bigbuf,
  544. },
  545. },
  546. }
  547. txnreq := &pb.TxnRequest{}
  548. txnreq.Success = append(txnreq.Success, puttxn)
  549. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  550. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  551. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  552. }
  553. }
  554. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  555. func TestV3StorageQuotaApply(t *testing.T) {
  556. testutil.AfterTest(t)
  557. quotasize := int64(16 * os.Getpagesize())
  558. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  559. defer clus.Terminate(t)
  560. kvc0 := toGRPC(clus.Client(0)).KV
  561. kvc1 := toGRPC(clus.Client(1)).KV
  562. // Set a quota on one node
  563. clus.Members[0].QuotaBackendBytes = quotasize
  564. clus.Members[0].Stop(t)
  565. clus.Members[0].Restart(t)
  566. clus.waitLeader(t, clus.Members)
  567. key := []byte("abc")
  568. // test small put still works
  569. smallbuf := make([]byte, 1024)
  570. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}, grpc.FailFast(false))
  571. if serr != nil {
  572. t.Fatal(serr)
  573. }
  574. // test big put
  575. bigbuf := make([]byte, quotasize)
  576. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  577. if err != nil {
  578. t.Fatal(err)
  579. }
  580. // quorum get should work regardless of whether alarm is raised
  581. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  582. if err != nil {
  583. t.Fatal(err)
  584. }
  585. // wait until alarm is raised for sure-- poll the alarms
  586. stopc := time.After(5 * time.Second)
  587. for {
  588. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  589. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  590. if aerr != nil {
  591. t.Fatal(aerr)
  592. }
  593. if len(resp.Alarms) != 0 {
  594. break
  595. }
  596. select {
  597. case <-stopc:
  598. t.Fatalf("timed out waiting for alarm")
  599. case <-time.After(10 * time.Millisecond):
  600. }
  601. }
  602. // small quota machine should reject put
  603. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  604. t.Fatalf("past-quota instance should reject put")
  605. }
  606. // large quota machine should reject put
  607. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  608. t.Fatalf("past-quota instance should reject put")
  609. }
  610. // reset large quota node to ensure alarm persisted
  611. clus.Members[1].Stop(t)
  612. clus.Members[1].Restart(t)
  613. clus.waitLeader(t, clus.Members)
  614. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  615. t.Fatalf("alarmed instance should reject put after reset")
  616. }
  617. }
  618. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  619. func TestV3AlarmDeactivate(t *testing.T) {
  620. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  621. defer clus.Terminate(t)
  622. kvc := toGRPC(clus.RandClient()).KV
  623. mt := toGRPC(clus.RandClient()).Maintenance
  624. alarmReq := &pb.AlarmRequest{
  625. MemberID: 123,
  626. Action: pb.AlarmRequest_ACTIVATE,
  627. Alarm: pb.AlarmType_NOSPACE,
  628. }
  629. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  630. t.Fatal(err)
  631. }
  632. key := []byte("abc")
  633. smallbuf := make([]byte, 512)
  634. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  635. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  636. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  637. }
  638. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  639. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  640. t.Fatal(err)
  641. }
  642. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  643. t.Fatal(err)
  644. }
  645. }
  646. func TestV3RangeRequest(t *testing.T) {
  647. defer testutil.AfterTest(t)
  648. tests := []struct {
  649. putKeys []string
  650. reqs []pb.RangeRequest
  651. wresps [][]string
  652. wmores []bool
  653. }{
  654. // single key
  655. {
  656. []string{"foo", "bar"},
  657. []pb.RangeRequest{
  658. // exists
  659. {Key: []byte("foo")},
  660. // doesn't exist
  661. {Key: []byte("baz")},
  662. },
  663. [][]string{
  664. {"foo"},
  665. {},
  666. },
  667. []bool{false, false},
  668. },
  669. // multi-key
  670. {
  671. []string{"a", "b", "c", "d", "e"},
  672. []pb.RangeRequest{
  673. // all in range
  674. {Key: []byte("a"), RangeEnd: []byte("z")},
  675. // [b, d)
  676. {Key: []byte("b"), RangeEnd: []byte("d")},
  677. // out of range
  678. {Key: []byte("f"), RangeEnd: []byte("z")},
  679. // [c,c) = empty
  680. {Key: []byte("c"), RangeEnd: []byte("c")},
  681. // [d, b) = empty
  682. {Key: []byte("d"), RangeEnd: []byte("b")},
  683. // ["\0", "\0") => all in range
  684. {Key: []byte{0}, RangeEnd: []byte{0}},
  685. },
  686. [][]string{
  687. {"a", "b", "c", "d", "e"},
  688. {"b", "c"},
  689. {},
  690. {},
  691. {},
  692. {"a", "b", "c", "d", "e"},
  693. },
  694. []bool{false, false, false, false, false, false},
  695. },
  696. // revision
  697. {
  698. []string{"a", "b", "c", "d", "e"},
  699. []pb.RangeRequest{
  700. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  701. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  702. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  703. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  704. },
  705. [][]string{
  706. {"a", "b", "c", "d", "e"},
  707. {},
  708. {"a"},
  709. {"a", "b"},
  710. },
  711. []bool{false, false, false, false},
  712. },
  713. // limit
  714. {
  715. []string{"foo", "bar"},
  716. []pb.RangeRequest{
  717. // more
  718. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  719. // no more
  720. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  721. },
  722. [][]string{
  723. {"bar"},
  724. {"bar", "foo"},
  725. },
  726. []bool{true, false},
  727. },
  728. // sort
  729. {
  730. []string{"b", "a", "c", "d", "c"},
  731. []pb.RangeRequest{
  732. {
  733. Key: []byte("a"), RangeEnd: []byte("z"),
  734. Limit: 1,
  735. SortOrder: pb.RangeRequest_ASCEND,
  736. SortTarget: pb.RangeRequest_KEY,
  737. },
  738. {
  739. Key: []byte("a"), RangeEnd: []byte("z"),
  740. Limit: 1,
  741. SortOrder: pb.RangeRequest_DESCEND,
  742. SortTarget: pb.RangeRequest_KEY,
  743. },
  744. {
  745. Key: []byte("a"), RangeEnd: []byte("z"),
  746. Limit: 1,
  747. SortOrder: pb.RangeRequest_ASCEND,
  748. SortTarget: pb.RangeRequest_CREATE,
  749. },
  750. {
  751. Key: []byte("a"), RangeEnd: []byte("z"),
  752. Limit: 1,
  753. SortOrder: pb.RangeRequest_DESCEND,
  754. SortTarget: pb.RangeRequest_MOD,
  755. },
  756. {
  757. Key: []byte("z"), RangeEnd: []byte("z"),
  758. Limit: 1,
  759. SortOrder: pb.RangeRequest_DESCEND,
  760. SortTarget: pb.RangeRequest_CREATE,
  761. },
  762. { // sort ASCEND by default
  763. Key: []byte("a"), RangeEnd: []byte("z"),
  764. Limit: 10,
  765. SortOrder: pb.RangeRequest_NONE,
  766. SortTarget: pb.RangeRequest_CREATE,
  767. },
  768. },
  769. [][]string{
  770. {"a"},
  771. {"d"},
  772. {"b"},
  773. {"c"},
  774. {},
  775. {"b", "a", "c", "d"},
  776. },
  777. []bool{true, true, true, true, false, false},
  778. },
  779. // min/max mod rev
  780. {
  781. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  782. []pb.RangeRequest{
  783. {
  784. Key: []byte{0}, RangeEnd: []byte{0},
  785. MinModRevision: 3,
  786. },
  787. {
  788. Key: []byte{0}, RangeEnd: []byte{0},
  789. MaxModRevision: 3,
  790. },
  791. {
  792. Key: []byte{0}, RangeEnd: []byte{0},
  793. MinModRevision: 3,
  794. MaxModRevision: 5,
  795. },
  796. {
  797. Key: []byte{0}, RangeEnd: []byte{0},
  798. MaxModRevision: 10,
  799. },
  800. },
  801. [][]string{
  802. {"rev3", "rev4", "rev5", "rev6"},
  803. {"rev2", "rev3"},
  804. {"rev3", "rev4", "rev5"},
  805. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  806. },
  807. []bool{false, false, false, false},
  808. },
  809. // min/max create rev
  810. {
  811. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  812. []pb.RangeRequest{
  813. {
  814. Key: []byte{0}, RangeEnd: []byte{0},
  815. MinCreateRevision: 3,
  816. },
  817. {
  818. Key: []byte{0}, RangeEnd: []byte{0},
  819. MaxCreateRevision: 3,
  820. },
  821. {
  822. Key: []byte{0}, RangeEnd: []byte{0},
  823. MinCreateRevision: 3,
  824. MaxCreateRevision: 5,
  825. },
  826. {
  827. Key: []byte{0}, RangeEnd: []byte{0},
  828. MaxCreateRevision: 10,
  829. },
  830. },
  831. [][]string{
  832. {"rev3", "rev6"},
  833. {"rev2", "rev3"},
  834. {"rev3"},
  835. {"rev2", "rev3", "rev6"},
  836. },
  837. []bool{false, false, false, false},
  838. },
  839. }
  840. for i, tt := range tests {
  841. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  842. for _, k := range tt.putKeys {
  843. kvc := toGRPC(clus.RandClient()).KV
  844. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  845. if _, err := kvc.Put(context.TODO(), req); err != nil {
  846. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  847. }
  848. }
  849. for j, req := range tt.reqs {
  850. kvc := toGRPC(clus.RandClient()).KV
  851. resp, err := kvc.Range(context.TODO(), &req)
  852. if err != nil {
  853. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  854. continue
  855. }
  856. if len(resp.Kvs) != len(tt.wresps[j]) {
  857. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  858. continue
  859. }
  860. for k, wKey := range tt.wresps[j] {
  861. respKey := string(resp.Kvs[k].Key)
  862. if respKey != wKey {
  863. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  864. }
  865. }
  866. if resp.More != tt.wmores[j] {
  867. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  868. }
  869. wrev := int64(len(tt.putKeys) + 1)
  870. if resp.Header.Revision != wrev {
  871. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  872. }
  873. }
  874. clus.Terminate(t)
  875. }
  876. }
  877. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  878. cfg.UseGRPC = true
  879. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  880. clus.Launch(t)
  881. return clus
  882. }
  883. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  884. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  885. defer testutil.AfterTest(t)
  886. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  887. clus := newClusterV3NoClients(t, &cfg)
  888. defer clus.Terminate(t)
  889. // nil out TLS field so client will use an insecure connection
  890. clus.Members[0].ClientTLSInfo = nil
  891. client, err := NewClientV3(clus.Members[0])
  892. if err != nil && err != grpc.ErrClientConnTimeout {
  893. t.Fatalf("unexpected error (%v)", err)
  894. } else if client == nil {
  895. // Ideally, no client would be returned. However, grpc will
  896. // return a connection without trying to handshake first so
  897. // the connection appears OK.
  898. return
  899. }
  900. defer client.Close()
  901. donec := make(chan error, 1)
  902. go func() {
  903. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  904. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  905. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  906. cancel()
  907. donec <- perr
  908. }()
  909. if perr := <-donec; perr == nil {
  910. t.Fatalf("expected client error on put")
  911. }
  912. }
  913. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  914. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  915. defer testutil.AfterTest(t)
  916. cfg := ClusterConfig{Size: 3}
  917. clus := newClusterV3NoClients(t, &cfg)
  918. defer clus.Terminate(t)
  919. clus.Members[0].ClientTLSInfo = &testTLSInfo
  920. client, err := NewClientV3(clus.Members[0])
  921. if client != nil || err == nil {
  922. t.Fatalf("expected no client")
  923. } else if err != grpc.ErrClientConnTimeout {
  924. t.Fatalf("unexpected error (%v)", err)
  925. }
  926. }
  927. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  928. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  929. defer testutil.AfterTest(t)
  930. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  931. clus := newClusterV3NoClients(t, &cfg)
  932. defer clus.Terminate(t)
  933. client, err := NewClientV3(clus.Members[0])
  934. if err != nil {
  935. t.Fatalf("expected tls client (%v)", err)
  936. }
  937. defer client.Close()
  938. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  939. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  940. t.Fatalf("unexpected error on put over tls (%v)", err)
  941. }
  942. }
  943. func TestGRPCRequireLeader(t *testing.T) {
  944. defer testutil.AfterTest(t)
  945. cfg := ClusterConfig{Size: 3}
  946. clus := newClusterV3NoClients(t, &cfg)
  947. defer clus.Terminate(t)
  948. clus.Members[1].Stop(t)
  949. clus.Members[2].Stop(t)
  950. client, err := NewClientV3(clus.Members[0])
  951. if err != nil {
  952. t.Fatalf("cannot create client: %v", err)
  953. }
  954. defer client.Close()
  955. // wait for election timeout, then member[0] will not have a leader.
  956. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  957. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  958. ctx := metadata.NewContext(context.Background(), md)
  959. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  960. if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  961. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  962. }
  963. }
  964. func TestGRPCStreamRequireLeader(t *testing.T) {
  965. defer testutil.AfterTest(t)
  966. cfg := ClusterConfig{Size: 3}
  967. clus := newClusterV3NoClients(t, &cfg)
  968. defer clus.Terminate(t)
  969. client, err := NewClientV3(clus.Members[0])
  970. if err != nil {
  971. t.Fatalf("failed to create client (%v)", err)
  972. }
  973. defer client.Close()
  974. wAPI := toGRPC(client).Watch
  975. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  976. ctx := metadata.NewContext(context.Background(), md)
  977. wStream, err := wAPI.Watch(ctx)
  978. if err != nil {
  979. t.Fatalf("wAPI.Watch error: %v", err)
  980. }
  981. clus.Members[1].Stop(t)
  982. clus.Members[2].Stop(t)
  983. // existing stream should be rejected
  984. _, err = wStream.Recv()
  985. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  986. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  987. }
  988. // new stream should also be rejected
  989. wStream, err = wAPI.Watch(ctx)
  990. if err != nil {
  991. t.Fatalf("wAPI.Watch error: %v", err)
  992. }
  993. _, err = wStream.Recv()
  994. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  995. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  996. }
  997. clus.Members[1].Restart(t)
  998. clus.Members[2].Restart(t)
  999. clus.waitLeader(t, clus.Members)
  1000. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1001. // new stream should also be OK now after we restarted the other members
  1002. wStream, err = wAPI.Watch(ctx)
  1003. if err != nil {
  1004. t.Fatalf("wAPI.Watch error: %v", err)
  1005. }
  1006. wreq := &pb.WatchRequest{
  1007. RequestUnion: &pb.WatchRequest_CreateRequest{
  1008. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1009. },
  1010. }
  1011. err = wStream.Send(wreq)
  1012. if err != nil {
  1013. t.Errorf("err = %v, want nil", err)
  1014. }
  1015. }
  1016. func eqErrGRPC(err1 error, err2 error) bool {
  1017. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  1018. }