v3_grpc_test.go 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "os"
  19. "reflect"
  20. "testing"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  23. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  24. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. "github.com/coreos/etcd/pkg/testutil"
  26. "golang.org/x/net/context"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/metadata"
  29. )
  30. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  31. // overwrites it, then checks that the change was applied.
  32. func TestV3PutOverwrite(t *testing.T) {
  33. defer testutil.AfterTest(t)
  34. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  35. defer clus.Terminate(t)
  36. kvc := toGRPC(clus.RandClient()).KV
  37. key := []byte("foo")
  38. reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
  39. respput, err := kvc.Put(context.TODO(), reqput)
  40. if err != nil {
  41. t.Fatalf("couldn't put key (%v)", err)
  42. }
  43. // overwrite
  44. reqput.Value = []byte("baz")
  45. respput2, err := kvc.Put(context.TODO(), reqput)
  46. if err != nil {
  47. t.Fatalf("couldn't put key (%v)", err)
  48. }
  49. if respput2.Header.Revision <= respput.Header.Revision {
  50. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  51. respput2.Header.Revision, respput.Header.Revision)
  52. }
  53. reqrange := &pb.RangeRequest{Key: key}
  54. resprange, err := kvc.Range(context.TODO(), reqrange)
  55. if err != nil {
  56. t.Fatalf("couldn't get key (%v)", err)
  57. }
  58. if len(resprange.Kvs) != 1 {
  59. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  60. }
  61. kv := resprange.Kvs[0]
  62. if kv.ModRevision <= kv.CreateRevision {
  63. t.Errorf("expected modRev > createRev, got %d <= %d",
  64. kv.ModRevision, kv.CreateRevision)
  65. }
  66. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  67. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  68. }
  69. }
  70. // TestPutRestart checks if a put after an unrelated member restart succeeds
  71. func TestV3PutRestart(t *testing.T) {
  72. defer testutil.AfterTest(t)
  73. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  74. defer clus.Terminate(t)
  75. kvIdx := rand.Intn(3)
  76. kvc := toGRPC(clus.Client(kvIdx)).KV
  77. stopIdx := kvIdx
  78. for stopIdx == kvIdx {
  79. stopIdx = rand.Intn(3)
  80. }
  81. clus.clients[stopIdx].Close()
  82. clus.Members[stopIdx].Stop(t)
  83. clus.Members[stopIdx].Restart(t)
  84. c, cerr := NewClientV3(clus.Members[stopIdx])
  85. if cerr != nil {
  86. t.Fatalf("cannot create client: %v", cerr)
  87. }
  88. clus.clients[stopIdx] = c
  89. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  90. defer cancel()
  91. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  92. _, err := kvc.Put(ctx, reqput)
  93. if err != nil && err == ctx.Err() {
  94. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  95. }
  96. }
  97. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  98. func TestV3CompactCurrentRev(t *testing.T) {
  99. defer testutil.AfterTest(t)
  100. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  101. defer clus.Terminate(t)
  102. kvc := toGRPC(clus.RandClient()).KV
  103. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  104. for i := 0; i < 3; i++ {
  105. if _, err := kvc.Put(context.Background(), preq); err != nil {
  106. t.Fatalf("couldn't put key (%v)", err)
  107. }
  108. }
  109. // compact on current revision
  110. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  111. if err != nil {
  112. t.Fatalf("couldn't compact kv space (%v)", err)
  113. }
  114. // key still exists?
  115. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  116. if err != nil {
  117. t.Fatalf("couldn't get key after compaction (%v)", err)
  118. }
  119. }
  120. func TestV3TxnTooManyOps(t *testing.T) {
  121. defer testutil.AfterTest(t)
  122. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  123. defer clus.Terminate(t)
  124. kvc := toGRPC(clus.RandClient()).KV
  125. // unique keys
  126. i := new(int)
  127. keyf := func() []byte {
  128. *i++
  129. return []byte(fmt.Sprintf("key-%d", i))
  130. }
  131. addCompareOps := func(txn *pb.TxnRequest) {
  132. txn.Compare = append(txn.Compare,
  133. &pb.Compare{
  134. Result: pb.Compare_GREATER,
  135. Target: pb.Compare_CREATE,
  136. Key: keyf(),
  137. })
  138. }
  139. addSuccessOps := func(txn *pb.TxnRequest) {
  140. txn.Success = append(txn.Success,
  141. &pb.RequestOp{
  142. Request: &pb.RequestOp_RequestPut{
  143. RequestPut: &pb.PutRequest{
  144. Key: keyf(),
  145. Value: []byte("bar"),
  146. },
  147. },
  148. })
  149. }
  150. addFailureOps := func(txn *pb.TxnRequest) {
  151. txn.Failure = append(txn.Failure,
  152. &pb.RequestOp{
  153. Request: &pb.RequestOp_RequestPut{
  154. RequestPut: &pb.PutRequest{
  155. Key: keyf(),
  156. Value: []byte("bar"),
  157. },
  158. },
  159. })
  160. }
  161. tests := []func(txn *pb.TxnRequest){
  162. addCompareOps,
  163. addSuccessOps,
  164. addFailureOps,
  165. }
  166. for i, tt := range tests {
  167. txn := &pb.TxnRequest{}
  168. for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ {
  169. tt(txn)
  170. }
  171. _, err := kvc.Txn(context.Background(), txn)
  172. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  173. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  174. }
  175. }
  176. }
  177. func TestV3TxnDuplicateKeys(t *testing.T) {
  178. defer testutil.AfterTest(t)
  179. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  180. defer clus.Terminate(t)
  181. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  182. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  183. RequestDeleteRange: &pb.DeleteRangeRequest{
  184. Key: []byte("abc"),
  185. },
  186. },
  187. }
  188. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  189. RequestDeleteRange: &pb.DeleteRangeRequest{
  190. Key: []byte("a"), RangeEnd: []byte("b"),
  191. },
  192. },
  193. }
  194. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  195. RequestDeleteRange: &pb.DeleteRangeRequest{
  196. Key: []byte("abb"), RangeEnd: []byte("abc"),
  197. },
  198. },
  199. }
  200. kvc := toGRPC(clus.RandClient()).KV
  201. tests := []struct {
  202. txnSuccess []*pb.RequestOp
  203. werr error
  204. }{
  205. {
  206. txnSuccess: []*pb.RequestOp{putreq, putreq},
  207. werr: rpctypes.ErrGRPCDuplicateKey,
  208. },
  209. {
  210. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  211. werr: rpctypes.ErrGRPCDuplicateKey,
  212. },
  213. {
  214. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  215. werr: rpctypes.ErrGRPCDuplicateKey,
  216. },
  217. {
  218. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  219. werr: nil,
  220. },
  221. {
  222. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  223. werr: nil,
  224. },
  225. }
  226. for i, tt := range tests {
  227. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  228. _, err := kvc.Txn(context.Background(), txn)
  229. if !eqErrGRPC(err, tt.werr) {
  230. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  231. }
  232. }
  233. }
  234. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  235. func TestV3TxnRevision(t *testing.T) {
  236. defer testutil.AfterTest(t)
  237. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  238. defer clus.Terminate(t)
  239. kvc := toGRPC(clus.RandClient()).KV
  240. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  241. presp, err := kvc.Put(context.TODO(), pr)
  242. if err != nil {
  243. t.Fatal(err)
  244. }
  245. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  246. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  247. tresp, err := kvc.Txn(context.TODO(), txn)
  248. if err != nil {
  249. t.Fatal(err)
  250. }
  251. // did not update revision
  252. if presp.Header.Revision != tresp.Header.Revision {
  253. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  254. }
  255. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  256. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  257. tresp, err = kvc.Txn(context.TODO(), txn)
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. // did not update revision
  262. if presp.Header.Revision != tresp.Header.Revision {
  263. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  264. }
  265. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  266. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  267. tresp, err = kvc.Txn(context.TODO(), txn)
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. // updated revision
  272. if tresp.Header.Revision != presp.Header.Revision+1 {
  273. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  274. }
  275. }
  276. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  277. func TestV3PutMissingLease(t *testing.T) {
  278. defer testutil.AfterTest(t)
  279. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  280. defer clus.Terminate(t)
  281. kvc := toGRPC(clus.RandClient()).KV
  282. key := []byte("foo")
  283. preq := &pb.PutRequest{Key: key, Lease: 123456}
  284. tests := []func(){
  285. // put case
  286. func() {
  287. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  288. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  289. }
  290. },
  291. // txn success case
  292. func() {
  293. txn := &pb.TxnRequest{}
  294. txn.Success = append(txn.Success, &pb.RequestOp{
  295. Request: &pb.RequestOp_RequestPut{
  296. RequestPut: preq}})
  297. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  298. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  299. }
  300. },
  301. // txn failure case
  302. func() {
  303. txn := &pb.TxnRequest{}
  304. txn.Failure = append(txn.Failure, &pb.RequestOp{
  305. Request: &pb.RequestOp_RequestPut{
  306. RequestPut: preq}})
  307. cmp := &pb.Compare{
  308. Result: pb.Compare_GREATER,
  309. Target: pb.Compare_CREATE,
  310. Key: []byte("bar"),
  311. }
  312. txn.Compare = append(txn.Compare, cmp)
  313. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  314. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  315. }
  316. },
  317. // ignore bad lease in failure on success txn
  318. func() {
  319. txn := &pb.TxnRequest{}
  320. rreq := &pb.RangeRequest{Key: []byte("bar")}
  321. txn.Success = append(txn.Success, &pb.RequestOp{
  322. Request: &pb.RequestOp_RequestRange{
  323. RequestRange: rreq}})
  324. txn.Failure = append(txn.Failure, &pb.RequestOp{
  325. Request: &pb.RequestOp_RequestPut{
  326. RequestPut: preq}})
  327. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  328. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  329. }
  330. },
  331. }
  332. for i, f := range tests {
  333. f()
  334. // key shouldn't have been stored
  335. rreq := &pb.RangeRequest{Key: key}
  336. rresp, err := kvc.Range(context.TODO(), rreq)
  337. if err != nil {
  338. t.Errorf("#%d. could not rangereq (%v)", i, err)
  339. } else if len(rresp.Kvs) != 0 {
  340. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  341. }
  342. }
  343. }
  344. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  345. func TestV3DeleteRange(t *testing.T) {
  346. defer testutil.AfterTest(t)
  347. tests := []struct {
  348. keySet []string
  349. begin string
  350. end string
  351. prevKV bool
  352. wantSet [][]byte
  353. deleted int64
  354. }{
  355. // delete middle
  356. {
  357. []string{"foo", "foo/abc", "fop"},
  358. "foo/", "fop", false,
  359. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  360. },
  361. // no delete
  362. {
  363. []string{"foo", "foo/abc", "fop"},
  364. "foo/", "foo/", false,
  365. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  366. },
  367. // delete first
  368. {
  369. []string{"foo", "foo/abc", "fop"},
  370. "fo", "fop", false,
  371. [][]byte{[]byte("fop")}, 2,
  372. },
  373. // delete tail
  374. {
  375. []string{"foo", "foo/abc", "fop"},
  376. "foo/", "fos", false,
  377. [][]byte{[]byte("foo")}, 2,
  378. },
  379. // delete exact
  380. {
  381. []string{"foo", "foo/abc", "fop"},
  382. "foo/abc", "", false,
  383. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  384. },
  385. // delete none, [x,x)
  386. {
  387. []string{"foo"},
  388. "foo", "foo", false,
  389. [][]byte{[]byte("foo")}, 0,
  390. },
  391. // delete middle with preserveKVs set
  392. {
  393. []string{"foo", "foo/abc", "fop"},
  394. "foo/", "fop", true,
  395. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  396. },
  397. }
  398. for i, tt := range tests {
  399. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  400. kvc := toGRPC(clus.RandClient()).KV
  401. ks := tt.keySet
  402. for j := range ks {
  403. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  404. _, err := kvc.Put(context.TODO(), reqput)
  405. if err != nil {
  406. t.Fatalf("couldn't put key (%v)", err)
  407. }
  408. }
  409. dreq := &pb.DeleteRangeRequest{
  410. Key: []byte(tt.begin),
  411. RangeEnd: []byte(tt.end),
  412. PrevKv: tt.prevKV,
  413. }
  414. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  415. if err != nil {
  416. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  417. }
  418. if tt.deleted != dresp.Deleted {
  419. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  420. }
  421. if tt.prevKV {
  422. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  423. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  424. }
  425. }
  426. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  427. rresp, err := kvc.Range(context.TODO(), rreq)
  428. if err != nil {
  429. t.Errorf("couldn't get range on test %v (%v)", i, err)
  430. }
  431. if dresp.Header.Revision != rresp.Header.Revision {
  432. t.Errorf("expected revision %v, got %v",
  433. dresp.Header.Revision, rresp.Header.Revision)
  434. }
  435. keys := [][]byte{}
  436. for j := range rresp.Kvs {
  437. keys = append(keys, rresp.Kvs[j].Key)
  438. }
  439. if !reflect.DeepEqual(tt.wantSet, keys) {
  440. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  441. }
  442. // can't defer because tcp ports will be in use
  443. clus.Terminate(t)
  444. }
  445. }
  446. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  447. func TestV3TxnInvalidRange(t *testing.T) {
  448. defer testutil.AfterTest(t)
  449. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  450. defer clus.Terminate(t)
  451. kvc := toGRPC(clus.RandClient()).KV
  452. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  453. for i := 0; i < 3; i++ {
  454. _, err := kvc.Put(context.Background(), preq)
  455. if err != nil {
  456. t.Fatalf("couldn't put key (%v)", err)
  457. }
  458. }
  459. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  460. if err != nil {
  461. t.Fatalf("couldn't compact kv space (%v)", err)
  462. }
  463. // future rev
  464. txn := &pb.TxnRequest{}
  465. txn.Success = append(txn.Success, &pb.RequestOp{
  466. Request: &pb.RequestOp_RequestPut{
  467. RequestPut: preq}})
  468. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  469. txn.Success = append(txn.Success, &pb.RequestOp{
  470. Request: &pb.RequestOp_RequestRange{
  471. RequestRange: rreq}})
  472. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  473. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  474. }
  475. // compacted rev
  476. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  477. tv.RequestRange.Revision = 1
  478. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  479. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  480. }
  481. }
  482. func TestV3TooLargeRequest(t *testing.T) {
  483. defer testutil.AfterTest(t)
  484. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  485. defer clus.Terminate(t)
  486. kvc := toGRPC(clus.RandClient()).KV
  487. // 2MB request value
  488. largeV := make([]byte, 2*1024*1024)
  489. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  490. _, err := kvc.Put(context.Background(), preq)
  491. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  492. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  493. }
  494. }
  495. // TestV3Hash tests hash.
  496. func TestV3Hash(t *testing.T) {
  497. defer testutil.AfterTest(t)
  498. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  499. defer clus.Terminate(t)
  500. cli := clus.RandClient()
  501. kvc := toGRPC(cli).KV
  502. m := toGRPC(cli).Maintenance
  503. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  504. for i := 0; i < 3; i++ {
  505. _, err := kvc.Put(context.Background(), preq)
  506. if err != nil {
  507. t.Fatalf("couldn't put key (%v)", err)
  508. }
  509. }
  510. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  511. if err != nil || resp.Hash == 0 {
  512. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  513. }
  514. }
  515. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  516. func TestV3StorageQuotaAPI(t *testing.T) {
  517. defer testutil.AfterTest(t)
  518. quotasize := int64(16 * os.Getpagesize())
  519. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  520. // Set a quota on one node
  521. clus.Members[0].QuotaBackendBytes = quotasize
  522. clus.Members[0].Stop(t)
  523. clus.Members[0].Restart(t)
  524. defer clus.Terminate(t)
  525. kvc := toGRPC(clus.Client(0)).KV
  526. waitForRestart(t, kvc)
  527. key := []byte("abc")
  528. // test small put that fits in quota
  529. smallbuf := make([]byte, 512)
  530. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  531. t.Fatal(err)
  532. }
  533. // test big put
  534. bigbuf := make([]byte, quotasize)
  535. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  536. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  537. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  538. }
  539. // test big txn
  540. puttxn := &pb.RequestOp{
  541. Request: &pb.RequestOp_RequestPut{
  542. RequestPut: &pb.PutRequest{
  543. Key: key,
  544. Value: bigbuf,
  545. },
  546. },
  547. }
  548. txnreq := &pb.TxnRequest{}
  549. txnreq.Success = append(txnreq.Success, puttxn)
  550. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  551. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  552. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  553. }
  554. }
  555. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  556. func TestV3StorageQuotaApply(t *testing.T) {
  557. testutil.AfterTest(t)
  558. quotasize := int64(16 * os.Getpagesize())
  559. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  560. defer clus.Terminate(t)
  561. kvc0 := toGRPC(clus.Client(0)).KV
  562. kvc1 := toGRPC(clus.Client(1)).KV
  563. // Set a quota on one node
  564. clus.Members[0].QuotaBackendBytes = quotasize
  565. clus.Members[0].Stop(t)
  566. clus.Members[0].Restart(t)
  567. clus.waitLeader(t, clus.Members)
  568. waitForRestart(t, kvc0)
  569. key := []byte("abc")
  570. // test small put still works
  571. smallbuf := make([]byte, 1024)
  572. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  573. if serr != nil {
  574. t.Fatal(serr)
  575. }
  576. // test big put
  577. bigbuf := make([]byte, quotasize)
  578. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  579. if err != nil {
  580. t.Fatal(err)
  581. }
  582. // quorum get should work regardless of whether alarm is raised
  583. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  584. if err != nil {
  585. t.Fatal(err)
  586. }
  587. // wait until alarm is raised for sure-- poll the alarms
  588. stopc := time.After(5 * time.Second)
  589. for {
  590. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  591. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  592. if aerr != nil {
  593. t.Fatal(aerr)
  594. }
  595. if len(resp.Alarms) != 0 {
  596. break
  597. }
  598. select {
  599. case <-stopc:
  600. t.Fatalf("timed out waiting for alarm")
  601. case <-time.After(10 * time.Millisecond):
  602. }
  603. }
  604. // small quota machine should reject put
  605. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  606. t.Fatalf("past-quota instance should reject put")
  607. }
  608. // large quota machine should reject put
  609. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  610. t.Fatalf("past-quota instance should reject put")
  611. }
  612. // reset large quota node to ensure alarm persisted
  613. clus.Members[1].Stop(t)
  614. clus.Members[1].Restart(t)
  615. clus.waitLeader(t, clus.Members)
  616. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  617. t.Fatalf("alarmed instance should reject put after reset")
  618. }
  619. }
  620. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  621. func TestV3AlarmDeactivate(t *testing.T) {
  622. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  623. defer clus.Terminate(t)
  624. kvc := toGRPC(clus.RandClient()).KV
  625. mt := toGRPC(clus.RandClient()).Maintenance
  626. alarmReq := &pb.AlarmRequest{
  627. MemberID: 123,
  628. Action: pb.AlarmRequest_ACTIVATE,
  629. Alarm: pb.AlarmType_NOSPACE,
  630. }
  631. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  632. t.Fatal(err)
  633. }
  634. key := []byte("abc")
  635. smallbuf := make([]byte, 512)
  636. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  637. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  638. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  639. }
  640. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  641. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  642. t.Fatal(err)
  643. }
  644. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  645. t.Fatal(err)
  646. }
  647. }
  648. func TestV3RangeRequest(t *testing.T) {
  649. defer testutil.AfterTest(t)
  650. tests := []struct {
  651. putKeys []string
  652. reqs []pb.RangeRequest
  653. wresps [][]string
  654. wmores []bool
  655. }{
  656. // single key
  657. {
  658. []string{"foo", "bar"},
  659. []pb.RangeRequest{
  660. // exists
  661. {Key: []byte("foo")},
  662. // doesn't exist
  663. {Key: []byte("baz")},
  664. },
  665. [][]string{
  666. {"foo"},
  667. {},
  668. },
  669. []bool{false, false},
  670. },
  671. // multi-key
  672. {
  673. []string{"a", "b", "c", "d", "e"},
  674. []pb.RangeRequest{
  675. // all in range
  676. {Key: []byte("a"), RangeEnd: []byte("z")},
  677. // [b, d)
  678. {Key: []byte("b"), RangeEnd: []byte("d")},
  679. // out of range
  680. {Key: []byte("f"), RangeEnd: []byte("z")},
  681. // [c,c) = empty
  682. {Key: []byte("c"), RangeEnd: []byte("c")},
  683. // [d, b) = empty
  684. {Key: []byte("d"), RangeEnd: []byte("b")},
  685. // ["\0", "\0") => all in range
  686. {Key: []byte{0}, RangeEnd: []byte{0}},
  687. },
  688. [][]string{
  689. {"a", "b", "c", "d", "e"},
  690. {"b", "c"},
  691. {},
  692. {},
  693. {},
  694. {"a", "b", "c", "d", "e"},
  695. },
  696. []bool{false, false, false, false, false, false},
  697. },
  698. // revision
  699. {
  700. []string{"a", "b", "c", "d", "e"},
  701. []pb.RangeRequest{
  702. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  703. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  704. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  705. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  706. },
  707. [][]string{
  708. {"a", "b", "c", "d", "e"},
  709. {},
  710. {"a"},
  711. {"a", "b"},
  712. },
  713. []bool{false, false, false, false},
  714. },
  715. // limit
  716. {
  717. []string{"foo", "bar"},
  718. []pb.RangeRequest{
  719. // more
  720. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  721. // no more
  722. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  723. },
  724. [][]string{
  725. {"bar"},
  726. {"bar", "foo"},
  727. },
  728. []bool{true, false},
  729. },
  730. // sort
  731. {
  732. []string{"b", "a", "c", "d", "c"},
  733. []pb.RangeRequest{
  734. {
  735. Key: []byte("a"), RangeEnd: []byte("z"),
  736. Limit: 1,
  737. SortOrder: pb.RangeRequest_ASCEND,
  738. SortTarget: pb.RangeRequest_KEY,
  739. },
  740. {
  741. Key: []byte("a"), RangeEnd: []byte("z"),
  742. Limit: 1,
  743. SortOrder: pb.RangeRequest_DESCEND,
  744. SortTarget: pb.RangeRequest_KEY,
  745. },
  746. {
  747. Key: []byte("a"), RangeEnd: []byte("z"),
  748. Limit: 1,
  749. SortOrder: pb.RangeRequest_ASCEND,
  750. SortTarget: pb.RangeRequest_CREATE,
  751. },
  752. {
  753. Key: []byte("a"), RangeEnd: []byte("z"),
  754. Limit: 1,
  755. SortOrder: pb.RangeRequest_DESCEND,
  756. SortTarget: pb.RangeRequest_MOD,
  757. },
  758. {
  759. Key: []byte("z"), RangeEnd: []byte("z"),
  760. Limit: 1,
  761. SortOrder: pb.RangeRequest_DESCEND,
  762. SortTarget: pb.RangeRequest_CREATE,
  763. },
  764. { // sort ASCEND by default
  765. Key: []byte("a"), RangeEnd: []byte("z"),
  766. Limit: 10,
  767. SortOrder: pb.RangeRequest_NONE,
  768. SortTarget: pb.RangeRequest_CREATE,
  769. },
  770. },
  771. [][]string{
  772. {"a"},
  773. {"d"},
  774. {"b"},
  775. {"c"},
  776. {},
  777. {"b", "a", "c", "d"},
  778. },
  779. []bool{true, true, true, true, false, false},
  780. },
  781. // min/max mod rev
  782. {
  783. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  784. []pb.RangeRequest{
  785. {
  786. Key: []byte{0}, RangeEnd: []byte{0},
  787. MinModRevision: 3,
  788. },
  789. {
  790. Key: []byte{0}, RangeEnd: []byte{0},
  791. MaxModRevision: 3,
  792. },
  793. {
  794. Key: []byte{0}, RangeEnd: []byte{0},
  795. MinModRevision: 3,
  796. MaxModRevision: 5,
  797. },
  798. {
  799. Key: []byte{0}, RangeEnd: []byte{0},
  800. MaxModRevision: 10,
  801. },
  802. },
  803. [][]string{
  804. {"rev3", "rev4", "rev5", "rev6"},
  805. {"rev2", "rev3"},
  806. {"rev3", "rev4", "rev5"},
  807. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  808. },
  809. []bool{false, false, false, false},
  810. },
  811. // min/max create rev
  812. {
  813. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  814. []pb.RangeRequest{
  815. {
  816. Key: []byte{0}, RangeEnd: []byte{0},
  817. MinCreateRevision: 3,
  818. },
  819. {
  820. Key: []byte{0}, RangeEnd: []byte{0},
  821. MaxCreateRevision: 3,
  822. },
  823. {
  824. Key: []byte{0}, RangeEnd: []byte{0},
  825. MinCreateRevision: 3,
  826. MaxCreateRevision: 5,
  827. },
  828. {
  829. Key: []byte{0}, RangeEnd: []byte{0},
  830. MaxCreateRevision: 10,
  831. },
  832. },
  833. [][]string{
  834. {"rev3", "rev6"},
  835. {"rev2", "rev3"},
  836. {"rev3"},
  837. {"rev2", "rev3", "rev6"},
  838. },
  839. []bool{false, false, false, false},
  840. },
  841. }
  842. for i, tt := range tests {
  843. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  844. for _, k := range tt.putKeys {
  845. kvc := toGRPC(clus.RandClient()).KV
  846. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  847. if _, err := kvc.Put(context.TODO(), req); err != nil {
  848. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  849. }
  850. }
  851. for j, req := range tt.reqs {
  852. kvc := toGRPC(clus.RandClient()).KV
  853. resp, err := kvc.Range(context.TODO(), &req)
  854. if err != nil {
  855. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  856. continue
  857. }
  858. if len(resp.Kvs) != len(tt.wresps[j]) {
  859. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  860. continue
  861. }
  862. for k, wKey := range tt.wresps[j] {
  863. respKey := string(resp.Kvs[k].Key)
  864. if respKey != wKey {
  865. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  866. }
  867. }
  868. if resp.More != tt.wmores[j] {
  869. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  870. }
  871. wrev := int64(len(tt.putKeys) + 1)
  872. if resp.Header.Revision != wrev {
  873. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  874. }
  875. }
  876. clus.Terminate(t)
  877. }
  878. }
  879. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  880. cfg.UseGRPC = true
  881. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  882. clus.Launch(t)
  883. return clus
  884. }
  885. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  886. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  887. defer testutil.AfterTest(t)
  888. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  889. clus := newClusterV3NoClients(t, &cfg)
  890. defer clus.Terminate(t)
  891. // nil out TLS field so client will use an insecure connection
  892. clus.Members[0].ClientTLSInfo = nil
  893. client, err := NewClientV3(clus.Members[0])
  894. if err != nil && err != grpc.ErrClientConnTimeout {
  895. t.Fatalf("unexpected error (%v)", err)
  896. } else if client == nil {
  897. // Ideally, no client would be returned. However, grpc will
  898. // return a connection without trying to handshake first so
  899. // the connection appears OK.
  900. return
  901. }
  902. defer client.Close()
  903. donec := make(chan error, 1)
  904. go func() {
  905. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  906. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  907. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  908. cancel()
  909. donec <- perr
  910. }()
  911. if perr := <-donec; perr == nil {
  912. t.Fatalf("expected client error on put")
  913. }
  914. }
  915. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  916. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  917. defer testutil.AfterTest(t)
  918. cfg := ClusterConfig{Size: 3}
  919. clus := newClusterV3NoClients(t, &cfg)
  920. defer clus.Terminate(t)
  921. clus.Members[0].ClientTLSInfo = &testTLSInfo
  922. client, err := NewClientV3(clus.Members[0])
  923. if client != nil || err == nil {
  924. t.Fatalf("expected no client")
  925. } else if err != grpc.ErrClientConnTimeout {
  926. t.Fatalf("unexpected error (%v)", err)
  927. }
  928. }
  929. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  930. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  931. defer testutil.AfterTest(t)
  932. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  933. clus := newClusterV3NoClients(t, &cfg)
  934. defer clus.Terminate(t)
  935. client, err := NewClientV3(clus.Members[0])
  936. if err != nil {
  937. t.Fatalf("expected tls client (%v)", err)
  938. }
  939. defer client.Close()
  940. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  941. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  942. t.Fatalf("unexpected error on put over tls (%v)", err)
  943. }
  944. }
  945. func TestGRPCRequireLeader(t *testing.T) {
  946. defer testutil.AfterTest(t)
  947. cfg := ClusterConfig{Size: 3}
  948. clus := newClusterV3NoClients(t, &cfg)
  949. defer clus.Terminate(t)
  950. clus.Members[1].Stop(t)
  951. clus.Members[2].Stop(t)
  952. client, err := NewClientV3(clus.Members[0])
  953. if err != nil {
  954. t.Fatalf("cannot create client: %v", err)
  955. }
  956. defer client.Close()
  957. // wait for election timeout, then member[0] will not have a leader.
  958. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  959. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  960. ctx := metadata.NewContext(context.Background(), md)
  961. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  962. if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  963. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  964. }
  965. }
  966. func TestGRPCStreamRequireLeader(t *testing.T) {
  967. defer testutil.AfterTest(t)
  968. cfg := ClusterConfig{Size: 3}
  969. clus := newClusterV3NoClients(t, &cfg)
  970. defer clus.Terminate(t)
  971. client, err := NewClientV3(clus.Members[0])
  972. if err != nil {
  973. t.Fatalf("failed to create client (%v)", err)
  974. }
  975. defer client.Close()
  976. wAPI := toGRPC(client).Watch
  977. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  978. ctx := metadata.NewContext(context.Background(), md)
  979. wStream, err := wAPI.Watch(ctx)
  980. if err != nil {
  981. t.Fatalf("wAPI.Watch error: %v", err)
  982. }
  983. clus.Members[1].Stop(t)
  984. clus.Members[2].Stop(t)
  985. // existing stream should be rejected
  986. _, err = wStream.Recv()
  987. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  988. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  989. }
  990. // new stream should also be rejected
  991. wStream, err = wAPI.Watch(ctx)
  992. if err != nil {
  993. t.Fatalf("wAPI.Watch error: %v", err)
  994. }
  995. _, err = wStream.Recv()
  996. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  997. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  998. }
  999. clus.Members[1].Restart(t)
  1000. clus.Members[2].Restart(t)
  1001. clus.waitLeader(t, clus.Members)
  1002. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1003. // new stream should also be OK now after we restarted the other members
  1004. wStream, err = wAPI.Watch(ctx)
  1005. if err != nil {
  1006. t.Fatalf("wAPI.Watch error: %v", err)
  1007. }
  1008. wreq := &pb.WatchRequest{
  1009. RequestUnion: &pb.WatchRequest_CreateRequest{
  1010. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1011. },
  1012. }
  1013. err = wStream.Send(wreq)
  1014. if err != nil {
  1015. t.Errorf("err = %v, want nil", err)
  1016. }
  1017. }
  1018. func eqErrGRPC(err1 error, err2 error) bool {
  1019. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  1020. }
  1021. // waitForRestart tries a range request until the client's server responds.
  1022. // This is mainly a stop-gap function until grpcproxy's KVClient adapter
  1023. // (and by extension, clientv3) supports grpc.CallOption pass-through so
  1024. // FailFast=false works with Put.
  1025. func waitForRestart(t *testing.T, kvc pb.KVClient) {
  1026. req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
  1027. if _, err := kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
  1028. t.Fatal(err)
  1029. }
  1030. }