v3_grpc_test.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io/ioutil"
  19. "math/rand"
  20. "os"
  21. "reflect"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/pkg/testutil"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "golang.org/x/net/context"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/metadata"
  32. )
  33. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  34. // overwrites it, then checks that the change was applied.
  35. func TestV3PutOverwrite(t *testing.T) {
  36. defer testutil.AfterTest(t)
  37. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  38. defer clus.Terminate(t)
  39. kvc := toGRPC(clus.RandClient()).KV
  40. key := []byte("foo")
  41. reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
  42. respput, err := kvc.Put(context.TODO(), reqput)
  43. if err != nil {
  44. t.Fatalf("couldn't put key (%v)", err)
  45. }
  46. // overwrite
  47. reqput.Value = []byte("baz")
  48. respput2, err := kvc.Put(context.TODO(), reqput)
  49. if err != nil {
  50. t.Fatalf("couldn't put key (%v)", err)
  51. }
  52. if respput2.Header.Revision <= respput.Header.Revision {
  53. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  54. respput2.Header.Revision, respput.Header.Revision)
  55. }
  56. reqrange := &pb.RangeRequest{Key: key}
  57. resprange, err := kvc.Range(context.TODO(), reqrange)
  58. if err != nil {
  59. t.Fatalf("couldn't get key (%v)", err)
  60. }
  61. if len(resprange.Kvs) != 1 {
  62. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  63. }
  64. kv := resprange.Kvs[0]
  65. if kv.ModRevision <= kv.CreateRevision {
  66. t.Errorf("expected modRev > createRev, got %d <= %d",
  67. kv.ModRevision, kv.CreateRevision)
  68. }
  69. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  70. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  71. }
  72. }
  73. // TestPutRestart checks if a put after an unrelated member restart succeeds
  74. func TestV3PutRestart(t *testing.T) {
  75. defer testutil.AfterTest(t)
  76. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  77. defer clus.Terminate(t)
  78. kvIdx := rand.Intn(3)
  79. kvc := toGRPC(clus.Client(kvIdx)).KV
  80. stopIdx := kvIdx
  81. for stopIdx == kvIdx {
  82. stopIdx = rand.Intn(3)
  83. }
  84. clus.clients[stopIdx].Close()
  85. clus.Members[stopIdx].Stop(t)
  86. clus.Members[stopIdx].Restart(t)
  87. c, cerr := NewClientV3(clus.Members[stopIdx])
  88. if cerr != nil {
  89. t.Fatalf("cannot create client: %v", cerr)
  90. }
  91. clus.clients[stopIdx] = c
  92. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  93. defer cancel()
  94. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  95. _, err := kvc.Put(ctx, reqput)
  96. if err != nil && err == ctx.Err() {
  97. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  98. }
  99. }
  100. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  101. func TestV3CompactCurrentRev(t *testing.T) {
  102. defer testutil.AfterTest(t)
  103. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  104. defer clus.Terminate(t)
  105. kvc := toGRPC(clus.RandClient()).KV
  106. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  107. for i := 0; i < 3; i++ {
  108. if _, err := kvc.Put(context.Background(), preq); err != nil {
  109. t.Fatalf("couldn't put key (%v)", err)
  110. }
  111. }
  112. // get key to add to proxy cache, if any
  113. if _, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}); err != nil {
  114. t.Fatal(err)
  115. }
  116. // compact on current revision
  117. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  118. if err != nil {
  119. t.Fatalf("couldn't compact kv space (%v)", err)
  120. }
  121. // key still exists when linearized?
  122. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  123. if err != nil {
  124. t.Fatalf("couldn't get key after compaction (%v)", err)
  125. }
  126. // key still exists when serialized?
  127. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo"), Serializable: true})
  128. if err != nil {
  129. t.Fatalf("couldn't get serialized key after compaction (%v)", err)
  130. }
  131. }
  132. func TestV3TxnTooManyOps(t *testing.T) {
  133. defer testutil.AfterTest(t)
  134. maxTxnOps := uint(128)
  135. clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps})
  136. defer clus.Terminate(t)
  137. kvc := toGRPC(clus.RandClient()).KV
  138. // unique keys
  139. i := new(int)
  140. keyf := func() []byte {
  141. *i++
  142. return []byte(fmt.Sprintf("key-%d", i))
  143. }
  144. addCompareOps := func(txn *pb.TxnRequest) {
  145. txn.Compare = append(txn.Compare,
  146. &pb.Compare{
  147. Result: pb.Compare_GREATER,
  148. Target: pb.Compare_CREATE,
  149. Key: keyf(),
  150. })
  151. }
  152. addSuccessOps := func(txn *pb.TxnRequest) {
  153. txn.Success = append(txn.Success,
  154. &pb.RequestOp{
  155. Request: &pb.RequestOp_RequestPut{
  156. RequestPut: &pb.PutRequest{
  157. Key: keyf(),
  158. Value: []byte("bar"),
  159. },
  160. },
  161. })
  162. }
  163. addFailureOps := func(txn *pb.TxnRequest) {
  164. txn.Failure = append(txn.Failure,
  165. &pb.RequestOp{
  166. Request: &pb.RequestOp_RequestPut{
  167. RequestPut: &pb.PutRequest{
  168. Key: keyf(),
  169. Value: []byte("bar"),
  170. },
  171. },
  172. })
  173. }
  174. tests := []func(txn *pb.TxnRequest){
  175. addCompareOps,
  176. addSuccessOps,
  177. addFailureOps,
  178. }
  179. for i, tt := range tests {
  180. txn := &pb.TxnRequest{}
  181. for j := 0; j < int(maxTxnOps+1); j++ {
  182. tt(txn)
  183. }
  184. _, err := kvc.Txn(context.Background(), txn)
  185. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  186. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  187. }
  188. }
  189. }
  190. func TestV3TxnDuplicateKeys(t *testing.T) {
  191. defer testutil.AfterTest(t)
  192. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  193. defer clus.Terminate(t)
  194. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  195. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  196. RequestDeleteRange: &pb.DeleteRangeRequest{
  197. Key: []byte("abc"),
  198. },
  199. },
  200. }
  201. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  202. RequestDeleteRange: &pb.DeleteRangeRequest{
  203. Key: []byte("a"), RangeEnd: []byte("b"),
  204. },
  205. },
  206. }
  207. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  208. RequestDeleteRange: &pb.DeleteRangeRequest{
  209. Key: []byte("abb"), RangeEnd: []byte("abc"),
  210. },
  211. },
  212. }
  213. kvc := toGRPC(clus.RandClient()).KV
  214. tests := []struct {
  215. txnSuccess []*pb.RequestOp
  216. werr error
  217. }{
  218. {
  219. txnSuccess: []*pb.RequestOp{putreq, putreq},
  220. werr: rpctypes.ErrGRPCDuplicateKey,
  221. },
  222. {
  223. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  224. werr: rpctypes.ErrGRPCDuplicateKey,
  225. },
  226. {
  227. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  228. werr: rpctypes.ErrGRPCDuplicateKey,
  229. },
  230. {
  231. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  232. werr: nil,
  233. },
  234. {
  235. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  236. werr: nil,
  237. },
  238. }
  239. for i, tt := range tests {
  240. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  241. _, err := kvc.Txn(context.Background(), txn)
  242. if !eqErrGRPC(err, tt.werr) {
  243. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  244. }
  245. }
  246. }
  247. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  248. func TestV3TxnRevision(t *testing.T) {
  249. defer testutil.AfterTest(t)
  250. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  251. defer clus.Terminate(t)
  252. kvc := toGRPC(clus.RandClient()).KV
  253. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  254. presp, err := kvc.Put(context.TODO(), pr)
  255. if err != nil {
  256. t.Fatal(err)
  257. }
  258. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  259. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  260. tresp, err := kvc.Txn(context.TODO(), txn)
  261. if err != nil {
  262. t.Fatal(err)
  263. }
  264. // did not update revision
  265. if presp.Header.Revision != tresp.Header.Revision {
  266. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  267. }
  268. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  269. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  270. tresp, err = kvc.Txn(context.TODO(), txn)
  271. if err != nil {
  272. t.Fatal(err)
  273. }
  274. // did not update revision
  275. if presp.Header.Revision != tresp.Header.Revision {
  276. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  277. }
  278. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  279. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  280. tresp, err = kvc.Txn(context.TODO(), txn)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. // updated revision
  285. if tresp.Header.Revision != presp.Header.Revision+1 {
  286. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  287. }
  288. }
  289. // Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
  290. // when compared to the Succeeded field in the txn response.
  291. func TestV3TxnCmpHeaderRev(t *testing.T) {
  292. defer testutil.AfterTest(t)
  293. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  294. defer clus.Terminate(t)
  295. kvc := toGRPC(clus.RandClient()).KV
  296. for i := 0; i < 10; i++ {
  297. // Concurrently put a key with a txn comparing on it.
  298. revc := make(chan int64, 1)
  299. go func() {
  300. defer close(revc)
  301. pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
  302. presp, err := kvc.Put(context.TODO(), pr)
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. revc <- presp.Header.Revision
  307. }()
  308. // The read-only txn uses the optimized readindex server path.
  309. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
  310. RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
  311. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  312. // i = 0 /\ Succeeded => put followed txn
  313. cmp := &pb.Compare{
  314. Result: pb.Compare_EQUAL,
  315. Target: pb.Compare_VERSION,
  316. Key: []byte("k"),
  317. TargetUnion: &pb.Compare_Version{Version: int64(i)},
  318. }
  319. txn.Compare = append(txn.Compare, cmp)
  320. tresp, err := kvc.Txn(context.TODO(), txn)
  321. if err != nil {
  322. t.Fatal(err)
  323. }
  324. prev := <-revc
  325. // put followed txn; should eval to false
  326. if prev > tresp.Header.Revision && !tresp.Succeeded {
  327. t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
  328. }
  329. // txn follows put; should eval to true
  330. if tresp.Header.Revision >= prev && tresp.Succeeded {
  331. t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
  332. }
  333. }
  334. }
  335. // TestV3TxnRangeCompare tests range comparisons in txns
  336. func TestV3TxnRangeCompare(t *testing.T) {
  337. defer testutil.AfterTest(t)
  338. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  339. defer clus.Terminate(t)
  340. // put keys, named by expected revision
  341. for _, k := range []string{"/a/2", "/a/3", "/a/4", "/f/5"} {
  342. if _, err := clus.Client(0).Put(context.TODO(), k, "x"); err != nil {
  343. t.Fatal(err)
  344. }
  345. }
  346. tests := []struct {
  347. cmp pb.Compare
  348. wSuccess bool
  349. }{
  350. {
  351. // >= /a/; all create revs fit
  352. pb.Compare{
  353. Key: []byte("/a/"),
  354. RangeEnd: []byte{0},
  355. Target: pb.Compare_CREATE,
  356. Result: pb.Compare_LESS,
  357. TargetUnion: &pb.Compare_CreateRevision{6},
  358. },
  359. true,
  360. },
  361. {
  362. // >= /a/; one create rev doesn't fit
  363. pb.Compare{
  364. Key: []byte("/a/"),
  365. RangeEnd: []byte{0},
  366. Target: pb.Compare_CREATE,
  367. Result: pb.Compare_LESS,
  368. TargetUnion: &pb.Compare_CreateRevision{5},
  369. },
  370. false,
  371. },
  372. {
  373. // prefix /a/*; all create revs fit
  374. pb.Compare{
  375. Key: []byte("/a/"),
  376. RangeEnd: []byte("/a0"),
  377. Target: pb.Compare_CREATE,
  378. Result: pb.Compare_LESS,
  379. TargetUnion: &pb.Compare_CreateRevision{5},
  380. },
  381. true,
  382. },
  383. {
  384. // prefix /a/*; one create rev doesn't fit
  385. pb.Compare{
  386. Key: []byte("/a/"),
  387. RangeEnd: []byte("/a0"),
  388. Target: pb.Compare_CREATE,
  389. Result: pb.Compare_LESS,
  390. TargetUnion: &pb.Compare_CreateRevision{4},
  391. },
  392. false,
  393. },
  394. {
  395. // does not exist, does not succeed
  396. pb.Compare{
  397. Key: []byte("/b/"),
  398. RangeEnd: []byte("/b0"),
  399. Target: pb.Compare_VALUE,
  400. Result: pb.Compare_EQUAL,
  401. TargetUnion: &pb.Compare_Value{[]byte("x")},
  402. },
  403. false,
  404. },
  405. }
  406. kvc := toGRPC(clus.Client(0)).KV
  407. for i, tt := range tests {
  408. txn := &pb.TxnRequest{}
  409. txn.Compare = append(txn.Compare, &tt.cmp)
  410. tresp, err := kvc.Txn(context.TODO(), txn)
  411. if err != nil {
  412. t.Fatal(err)
  413. }
  414. if tt.wSuccess != tresp.Succeeded {
  415. t.Errorf("#%d: expected %v, got %v", i, tt.wSuccess, tresp.Succeeded)
  416. }
  417. }
  418. }
  419. // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
  420. func TestV3PutIgnoreValue(t *testing.T) {
  421. defer testutil.AfterTest(t)
  422. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  423. defer clus.Terminate(t)
  424. kvc := toGRPC(clus.RandClient()).KV
  425. key, val := []byte("foo"), []byte("bar")
  426. putReq := pb.PutRequest{Key: key, Value: val}
  427. // create lease
  428. lc := toGRPC(clus.RandClient()).Lease
  429. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  430. if err != nil {
  431. t.Fatal(err)
  432. }
  433. if lresp.Error != "" {
  434. t.Fatal(lresp.Error)
  435. }
  436. tests := []struct {
  437. putFunc func() error
  438. putErr error
  439. wleaseID int64
  440. }{
  441. { // put failure for non-existent key
  442. func() error {
  443. preq := putReq
  444. preq.IgnoreValue = true
  445. _, err := kvc.Put(context.TODO(), &preq)
  446. return err
  447. },
  448. rpctypes.ErrGRPCKeyNotFound,
  449. 0,
  450. },
  451. { // txn failure for non-existent key
  452. func() error {
  453. preq := putReq
  454. preq.Value = nil
  455. preq.IgnoreValue = true
  456. txn := &pb.TxnRequest{}
  457. txn.Success = append(txn.Success, &pb.RequestOp{
  458. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  459. _, err := kvc.Txn(context.TODO(), txn)
  460. return err
  461. },
  462. rpctypes.ErrGRPCKeyNotFound,
  463. 0,
  464. },
  465. { // put success
  466. func() error {
  467. _, err := kvc.Put(context.TODO(), &putReq)
  468. return err
  469. },
  470. nil,
  471. 0,
  472. },
  473. { // txn success, attach lease
  474. func() error {
  475. preq := putReq
  476. preq.Value = nil
  477. preq.Lease = lresp.ID
  478. preq.IgnoreValue = true
  479. txn := &pb.TxnRequest{}
  480. txn.Success = append(txn.Success, &pb.RequestOp{
  481. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  482. _, err := kvc.Txn(context.TODO(), txn)
  483. return err
  484. },
  485. nil,
  486. lresp.ID,
  487. },
  488. { // non-empty value with ignore_value should error
  489. func() error {
  490. preq := putReq
  491. preq.IgnoreValue = true
  492. _, err := kvc.Put(context.TODO(), &preq)
  493. return err
  494. },
  495. rpctypes.ErrGRPCValueProvided,
  496. 0,
  497. },
  498. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  499. func() error {
  500. preq := putReq
  501. preq.Value = nil
  502. preq.IgnoreValue = true
  503. presp, err := kvc.Put(context.TODO(), &preq)
  504. if err != nil {
  505. return err
  506. }
  507. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  508. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  509. }
  510. return nil
  511. },
  512. nil,
  513. 0,
  514. },
  515. { // revoke lease, ensure detached key doesn't get deleted
  516. func() error {
  517. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  518. return err
  519. },
  520. nil,
  521. 0,
  522. },
  523. }
  524. for i, tt := range tests {
  525. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  526. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  527. }
  528. if tt.putErr != nil {
  529. continue
  530. }
  531. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  532. if err != nil {
  533. t.Fatalf("#%d: %v", i, err)
  534. }
  535. if len(rr.Kvs) != 1 {
  536. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  537. }
  538. if !bytes.Equal(rr.Kvs[0].Value, val) {
  539. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  540. }
  541. if rr.Kvs[0].Lease != tt.wleaseID {
  542. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  543. }
  544. }
  545. }
  546. // TestV3PutIgnoreLease ensures that writes with ignore_lease uses previous lease for the key overwrites.
  547. func TestV3PutIgnoreLease(t *testing.T) {
  548. defer testutil.AfterTest(t)
  549. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  550. defer clus.Terminate(t)
  551. kvc := toGRPC(clus.RandClient()).KV
  552. // create lease
  553. lc := toGRPC(clus.RandClient()).Lease
  554. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  555. if err != nil {
  556. t.Fatal(err)
  557. }
  558. if lresp.Error != "" {
  559. t.Fatal(lresp.Error)
  560. }
  561. key, val, val1 := []byte("zoo"), []byte("bar"), []byte("bar1")
  562. putReq := pb.PutRequest{Key: key, Value: val}
  563. tests := []struct {
  564. putFunc func() error
  565. putErr error
  566. wleaseID int64
  567. wvalue []byte
  568. }{
  569. { // put failure for non-existent key
  570. func() error {
  571. preq := putReq
  572. preq.IgnoreLease = true
  573. _, err := kvc.Put(context.TODO(), &preq)
  574. return err
  575. },
  576. rpctypes.ErrGRPCKeyNotFound,
  577. 0,
  578. nil,
  579. },
  580. { // txn failure for non-existent key
  581. func() error {
  582. preq := putReq
  583. preq.IgnoreLease = true
  584. txn := &pb.TxnRequest{}
  585. txn.Success = append(txn.Success, &pb.RequestOp{
  586. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  587. _, err := kvc.Txn(context.TODO(), txn)
  588. return err
  589. },
  590. rpctypes.ErrGRPCKeyNotFound,
  591. 0,
  592. nil,
  593. },
  594. { // put success
  595. func() error {
  596. preq := putReq
  597. preq.Lease = lresp.ID
  598. _, err := kvc.Put(context.TODO(), &preq)
  599. return err
  600. },
  601. nil,
  602. lresp.ID,
  603. val,
  604. },
  605. { // txn success, modify value using 'ignore_lease' and ensure lease is not detached
  606. func() error {
  607. preq := putReq
  608. preq.Value = val1
  609. preq.IgnoreLease = true
  610. txn := &pb.TxnRequest{}
  611. txn.Success = append(txn.Success, &pb.RequestOp{
  612. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  613. _, err := kvc.Txn(context.TODO(), txn)
  614. return err
  615. },
  616. nil,
  617. lresp.ID,
  618. val1,
  619. },
  620. { // non-empty lease with ignore_lease should error
  621. func() error {
  622. preq := putReq
  623. preq.Lease = lresp.ID
  624. preq.IgnoreLease = true
  625. _, err := kvc.Put(context.TODO(), &preq)
  626. return err
  627. },
  628. rpctypes.ErrGRPCLeaseProvided,
  629. 0,
  630. nil,
  631. },
  632. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  633. func() error {
  634. presp, err := kvc.Put(context.TODO(), &putReq)
  635. if err != nil {
  636. return err
  637. }
  638. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  639. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  640. }
  641. return nil
  642. },
  643. nil,
  644. 0,
  645. val,
  646. },
  647. { // revoke lease, ensure detached key doesn't get deleted
  648. func() error {
  649. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  650. return err
  651. },
  652. nil,
  653. 0,
  654. val,
  655. },
  656. }
  657. for i, tt := range tests {
  658. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  659. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  660. }
  661. if tt.putErr != nil {
  662. continue
  663. }
  664. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  665. if err != nil {
  666. t.Fatalf("#%d: %v", i, err)
  667. }
  668. if len(rr.Kvs) != 1 {
  669. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  670. }
  671. if !bytes.Equal(rr.Kvs[0].Value, tt.wvalue) {
  672. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  673. }
  674. if rr.Kvs[0].Lease != tt.wleaseID {
  675. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  676. }
  677. }
  678. }
  679. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  680. func TestV3PutMissingLease(t *testing.T) {
  681. defer testutil.AfterTest(t)
  682. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  683. defer clus.Terminate(t)
  684. kvc := toGRPC(clus.RandClient()).KV
  685. key := []byte("foo")
  686. preq := &pb.PutRequest{Key: key, Lease: 123456}
  687. tests := []func(){
  688. // put case
  689. func() {
  690. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  691. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  692. }
  693. },
  694. // txn success case
  695. func() {
  696. txn := &pb.TxnRequest{}
  697. txn.Success = append(txn.Success, &pb.RequestOp{
  698. Request: &pb.RequestOp_RequestPut{
  699. RequestPut: preq}})
  700. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  701. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  702. }
  703. },
  704. // txn failure case
  705. func() {
  706. txn := &pb.TxnRequest{}
  707. txn.Failure = append(txn.Failure, &pb.RequestOp{
  708. Request: &pb.RequestOp_RequestPut{
  709. RequestPut: preq}})
  710. cmp := &pb.Compare{
  711. Result: pb.Compare_GREATER,
  712. Target: pb.Compare_CREATE,
  713. Key: []byte("bar"),
  714. }
  715. txn.Compare = append(txn.Compare, cmp)
  716. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  717. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  718. }
  719. },
  720. // ignore bad lease in failure on success txn
  721. func() {
  722. txn := &pb.TxnRequest{}
  723. rreq := &pb.RangeRequest{Key: []byte("bar")}
  724. txn.Success = append(txn.Success, &pb.RequestOp{
  725. Request: &pb.RequestOp_RequestRange{
  726. RequestRange: rreq}})
  727. txn.Failure = append(txn.Failure, &pb.RequestOp{
  728. Request: &pb.RequestOp_RequestPut{
  729. RequestPut: preq}})
  730. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  731. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  732. }
  733. },
  734. }
  735. for i, f := range tests {
  736. f()
  737. // key shouldn't have been stored
  738. rreq := &pb.RangeRequest{Key: key}
  739. rresp, err := kvc.Range(context.TODO(), rreq)
  740. if err != nil {
  741. t.Errorf("#%d. could not rangereq (%v)", i, err)
  742. } else if len(rresp.Kvs) != 0 {
  743. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  744. }
  745. }
  746. }
  747. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  748. func TestV3DeleteRange(t *testing.T) {
  749. defer testutil.AfterTest(t)
  750. tests := []struct {
  751. keySet []string
  752. begin string
  753. end string
  754. prevKV bool
  755. wantSet [][]byte
  756. deleted int64
  757. }{
  758. // delete middle
  759. {
  760. []string{"foo", "foo/abc", "fop"},
  761. "foo/", "fop", false,
  762. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  763. },
  764. // no delete
  765. {
  766. []string{"foo", "foo/abc", "fop"},
  767. "foo/", "foo/", false,
  768. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  769. },
  770. // delete first
  771. {
  772. []string{"foo", "foo/abc", "fop"},
  773. "fo", "fop", false,
  774. [][]byte{[]byte("fop")}, 2,
  775. },
  776. // delete tail
  777. {
  778. []string{"foo", "foo/abc", "fop"},
  779. "foo/", "fos", false,
  780. [][]byte{[]byte("foo")}, 2,
  781. },
  782. // delete exact
  783. {
  784. []string{"foo", "foo/abc", "fop"},
  785. "foo/abc", "", false,
  786. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  787. },
  788. // delete none, [x,x)
  789. {
  790. []string{"foo"},
  791. "foo", "foo", false,
  792. [][]byte{[]byte("foo")}, 0,
  793. },
  794. // delete middle with preserveKVs set
  795. {
  796. []string{"foo", "foo/abc", "fop"},
  797. "foo/", "fop", true,
  798. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  799. },
  800. }
  801. for i, tt := range tests {
  802. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  803. kvc := toGRPC(clus.RandClient()).KV
  804. ks := tt.keySet
  805. for j := range ks {
  806. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  807. _, err := kvc.Put(context.TODO(), reqput)
  808. if err != nil {
  809. t.Fatalf("couldn't put key (%v)", err)
  810. }
  811. }
  812. dreq := &pb.DeleteRangeRequest{
  813. Key: []byte(tt.begin),
  814. RangeEnd: []byte(tt.end),
  815. PrevKv: tt.prevKV,
  816. }
  817. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  818. if err != nil {
  819. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  820. }
  821. if tt.deleted != dresp.Deleted {
  822. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  823. }
  824. if tt.prevKV {
  825. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  826. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  827. }
  828. }
  829. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  830. rresp, err := kvc.Range(context.TODO(), rreq)
  831. if err != nil {
  832. t.Errorf("couldn't get range on test %v (%v)", i, err)
  833. }
  834. if dresp.Header.Revision != rresp.Header.Revision {
  835. t.Errorf("expected revision %v, got %v",
  836. dresp.Header.Revision, rresp.Header.Revision)
  837. }
  838. keys := [][]byte{}
  839. for j := range rresp.Kvs {
  840. keys = append(keys, rresp.Kvs[j].Key)
  841. }
  842. if !reflect.DeepEqual(tt.wantSet, keys) {
  843. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  844. }
  845. // can't defer because tcp ports will be in use
  846. clus.Terminate(t)
  847. }
  848. }
  849. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  850. func TestV3TxnInvalidRange(t *testing.T) {
  851. defer testutil.AfterTest(t)
  852. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  853. defer clus.Terminate(t)
  854. kvc := toGRPC(clus.RandClient()).KV
  855. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  856. for i := 0; i < 3; i++ {
  857. _, err := kvc.Put(context.Background(), preq)
  858. if err != nil {
  859. t.Fatalf("couldn't put key (%v)", err)
  860. }
  861. }
  862. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  863. if err != nil {
  864. t.Fatalf("couldn't compact kv space (%v)", err)
  865. }
  866. // future rev
  867. txn := &pb.TxnRequest{}
  868. txn.Success = append(txn.Success, &pb.RequestOp{
  869. Request: &pb.RequestOp_RequestPut{
  870. RequestPut: preq}})
  871. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  872. txn.Success = append(txn.Success, &pb.RequestOp{
  873. Request: &pb.RequestOp_RequestRange{
  874. RequestRange: rreq}})
  875. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  876. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  877. }
  878. // compacted rev
  879. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  880. tv.RequestRange.Revision = 1
  881. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  882. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  883. }
  884. }
  885. func TestV3TooLargeRequest(t *testing.T) {
  886. defer testutil.AfterTest(t)
  887. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  888. defer clus.Terminate(t)
  889. kvc := toGRPC(clus.RandClient()).KV
  890. // 2MB request value
  891. largeV := make([]byte, 2*1024*1024)
  892. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  893. _, err := kvc.Put(context.Background(), preq)
  894. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  895. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  896. }
  897. }
  898. // TestV3Hash tests hash.
  899. func TestV3Hash(t *testing.T) {
  900. defer testutil.AfterTest(t)
  901. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  902. defer clus.Terminate(t)
  903. cli := clus.RandClient()
  904. kvc := toGRPC(cli).KV
  905. m := toGRPC(cli).Maintenance
  906. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  907. for i := 0; i < 3; i++ {
  908. _, err := kvc.Put(context.Background(), preq)
  909. if err != nil {
  910. t.Fatalf("couldn't put key (%v)", err)
  911. }
  912. }
  913. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  914. if err != nil || resp.Hash == 0 {
  915. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  916. }
  917. }
  918. // TestV3HashRestart ensures that hash stays the same after restart.
  919. func TestV3HashRestart(t *testing.T) {
  920. defer testutil.AfterTest(t)
  921. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  922. defer clus.Terminate(t)
  923. cli := clus.RandClient()
  924. resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  925. if err != nil || resp.Hash == 0 {
  926. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  927. }
  928. hash1 := resp.Hash
  929. clus.Members[0].Stop(t)
  930. clus.Members[0].Restart(t)
  931. clus.waitLeader(t, clus.Members)
  932. kvc := toGRPC(clus.Client(0)).KV
  933. waitForRestart(t, kvc)
  934. cli = clus.RandClient()
  935. resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  936. if err != nil || resp.Hash == 0 {
  937. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  938. }
  939. hash2 := resp.Hash
  940. if hash1 != hash2 {
  941. t.Fatalf("hash expected %d, got %d", hash1, hash2)
  942. }
  943. }
  944. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  945. func TestV3StorageQuotaAPI(t *testing.T) {
  946. defer testutil.AfterTest(t)
  947. quotasize := int64(16 * os.Getpagesize())
  948. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  949. // Set a quota on one node
  950. clus.Members[0].QuotaBackendBytes = quotasize
  951. clus.Members[0].Stop(t)
  952. clus.Members[0].Restart(t)
  953. defer clus.Terminate(t)
  954. kvc := toGRPC(clus.Client(0)).KV
  955. waitForRestart(t, kvc)
  956. key := []byte("abc")
  957. // test small put that fits in quota
  958. smallbuf := make([]byte, 512)
  959. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  960. t.Fatal(err)
  961. }
  962. // test big put
  963. bigbuf := make([]byte, quotasize)
  964. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  965. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  966. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  967. }
  968. // test big txn
  969. puttxn := &pb.RequestOp{
  970. Request: &pb.RequestOp_RequestPut{
  971. RequestPut: &pb.PutRequest{
  972. Key: key,
  973. Value: bigbuf,
  974. },
  975. },
  976. }
  977. txnreq := &pb.TxnRequest{}
  978. txnreq.Success = append(txnreq.Success, puttxn)
  979. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  980. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  981. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  982. }
  983. }
  984. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  985. func TestV3StorageQuotaApply(t *testing.T) {
  986. testutil.AfterTest(t)
  987. quotasize := int64(16 * os.Getpagesize())
  988. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  989. defer clus.Terminate(t)
  990. kvc0 := toGRPC(clus.Client(0)).KV
  991. kvc1 := toGRPC(clus.Client(1)).KV
  992. // Set a quota on one node
  993. clus.Members[0].QuotaBackendBytes = quotasize
  994. clus.Members[0].Stop(t)
  995. clus.Members[0].Restart(t)
  996. clus.waitLeader(t, clus.Members)
  997. waitForRestart(t, kvc0)
  998. key := []byte("abc")
  999. // test small put still works
  1000. smallbuf := make([]byte, 1024)
  1001. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  1002. if serr != nil {
  1003. t.Fatal(serr)
  1004. }
  1005. // test big put
  1006. bigbuf := make([]byte, quotasize)
  1007. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  1008. if err != nil {
  1009. t.Fatal(err)
  1010. }
  1011. // quorum get should work regardless of whether alarm is raised
  1012. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  1013. if err != nil {
  1014. t.Fatal(err)
  1015. }
  1016. // wait until alarm is raised for sure-- poll the alarms
  1017. stopc := time.After(5 * time.Second)
  1018. for {
  1019. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  1020. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  1021. if aerr != nil {
  1022. t.Fatal(aerr)
  1023. }
  1024. if len(resp.Alarms) != 0 {
  1025. break
  1026. }
  1027. select {
  1028. case <-stopc:
  1029. t.Fatalf("timed out waiting for alarm")
  1030. case <-time.After(10 * time.Millisecond):
  1031. }
  1032. }
  1033. // small quota machine should reject put
  1034. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1035. t.Fatalf("past-quota instance should reject put")
  1036. }
  1037. // large quota machine should reject put
  1038. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1039. t.Fatalf("past-quota instance should reject put")
  1040. }
  1041. // reset large quota node to ensure alarm persisted
  1042. clus.Members[1].Stop(t)
  1043. clus.Members[1].Restart(t)
  1044. clus.waitLeader(t, clus.Members)
  1045. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1046. t.Fatalf("alarmed instance should reject put after reset")
  1047. }
  1048. }
  1049. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  1050. func TestV3AlarmDeactivate(t *testing.T) {
  1051. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1052. defer clus.Terminate(t)
  1053. kvc := toGRPC(clus.RandClient()).KV
  1054. mt := toGRPC(clus.RandClient()).Maintenance
  1055. alarmReq := &pb.AlarmRequest{
  1056. MemberID: 123,
  1057. Action: pb.AlarmRequest_ACTIVATE,
  1058. Alarm: pb.AlarmType_NOSPACE,
  1059. }
  1060. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  1061. t.Fatal(err)
  1062. }
  1063. key := []byte("abc")
  1064. smallbuf := make([]byte, 512)
  1065. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  1066. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  1067. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1068. }
  1069. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  1070. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  1071. t.Fatal(err)
  1072. }
  1073. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  1074. t.Fatal(err)
  1075. }
  1076. }
  1077. func TestV3RangeRequest(t *testing.T) {
  1078. defer testutil.AfterTest(t)
  1079. tests := []struct {
  1080. putKeys []string
  1081. reqs []pb.RangeRequest
  1082. wresps [][]string
  1083. wmores []bool
  1084. }{
  1085. // single key
  1086. {
  1087. []string{"foo", "bar"},
  1088. []pb.RangeRequest{
  1089. // exists
  1090. {Key: []byte("foo")},
  1091. // doesn't exist
  1092. {Key: []byte("baz")},
  1093. },
  1094. [][]string{
  1095. {"foo"},
  1096. {},
  1097. },
  1098. []bool{false, false},
  1099. },
  1100. // multi-key
  1101. {
  1102. []string{"a", "b", "c", "d", "e"},
  1103. []pb.RangeRequest{
  1104. // all in range
  1105. {Key: []byte("a"), RangeEnd: []byte("z")},
  1106. // [b, d)
  1107. {Key: []byte("b"), RangeEnd: []byte("d")},
  1108. // out of range
  1109. {Key: []byte("f"), RangeEnd: []byte("z")},
  1110. // [c,c) = empty
  1111. {Key: []byte("c"), RangeEnd: []byte("c")},
  1112. // [d, b) = empty
  1113. {Key: []byte("d"), RangeEnd: []byte("b")},
  1114. // ["\0", "\0") => all in range
  1115. {Key: []byte{0}, RangeEnd: []byte{0}},
  1116. },
  1117. [][]string{
  1118. {"a", "b", "c", "d", "e"},
  1119. {"b", "c"},
  1120. {},
  1121. {},
  1122. {},
  1123. {"a", "b", "c", "d", "e"},
  1124. },
  1125. []bool{false, false, false, false, false, false},
  1126. },
  1127. // revision
  1128. {
  1129. []string{"a", "b", "c", "d", "e"},
  1130. []pb.RangeRequest{
  1131. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  1132. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  1133. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  1134. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  1135. },
  1136. [][]string{
  1137. {"a", "b", "c", "d", "e"},
  1138. {},
  1139. {"a"},
  1140. {"a", "b"},
  1141. },
  1142. []bool{false, false, false, false},
  1143. },
  1144. // limit
  1145. {
  1146. []string{"foo", "bar"},
  1147. []pb.RangeRequest{
  1148. // more
  1149. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  1150. // no more
  1151. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  1152. },
  1153. [][]string{
  1154. {"bar"},
  1155. {"bar", "foo"},
  1156. },
  1157. []bool{true, false},
  1158. },
  1159. // sort
  1160. {
  1161. []string{"b", "a", "c", "d", "c"},
  1162. []pb.RangeRequest{
  1163. {
  1164. Key: []byte("a"), RangeEnd: []byte("z"),
  1165. Limit: 1,
  1166. SortOrder: pb.RangeRequest_ASCEND,
  1167. SortTarget: pb.RangeRequest_KEY,
  1168. },
  1169. {
  1170. Key: []byte("a"), RangeEnd: []byte("z"),
  1171. Limit: 1,
  1172. SortOrder: pb.RangeRequest_DESCEND,
  1173. SortTarget: pb.RangeRequest_KEY,
  1174. },
  1175. {
  1176. Key: []byte("a"), RangeEnd: []byte("z"),
  1177. Limit: 1,
  1178. SortOrder: pb.RangeRequest_ASCEND,
  1179. SortTarget: pb.RangeRequest_CREATE,
  1180. },
  1181. {
  1182. Key: []byte("a"), RangeEnd: []byte("z"),
  1183. Limit: 1,
  1184. SortOrder: pb.RangeRequest_DESCEND,
  1185. SortTarget: pb.RangeRequest_MOD,
  1186. },
  1187. {
  1188. Key: []byte("z"), RangeEnd: []byte("z"),
  1189. Limit: 1,
  1190. SortOrder: pb.RangeRequest_DESCEND,
  1191. SortTarget: pb.RangeRequest_CREATE,
  1192. },
  1193. { // sort ASCEND by default
  1194. Key: []byte("a"), RangeEnd: []byte("z"),
  1195. Limit: 10,
  1196. SortOrder: pb.RangeRequest_NONE,
  1197. SortTarget: pb.RangeRequest_CREATE,
  1198. },
  1199. },
  1200. [][]string{
  1201. {"a"},
  1202. {"d"},
  1203. {"b"},
  1204. {"c"},
  1205. {},
  1206. {"b", "a", "c", "d"},
  1207. },
  1208. []bool{true, true, true, true, false, false},
  1209. },
  1210. // min/max mod rev
  1211. {
  1212. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  1213. []pb.RangeRequest{
  1214. {
  1215. Key: []byte{0}, RangeEnd: []byte{0},
  1216. MinModRevision: 3,
  1217. },
  1218. {
  1219. Key: []byte{0}, RangeEnd: []byte{0},
  1220. MaxModRevision: 3,
  1221. },
  1222. {
  1223. Key: []byte{0}, RangeEnd: []byte{0},
  1224. MinModRevision: 3,
  1225. MaxModRevision: 5,
  1226. },
  1227. {
  1228. Key: []byte{0}, RangeEnd: []byte{0},
  1229. MaxModRevision: 10,
  1230. },
  1231. },
  1232. [][]string{
  1233. {"rev3", "rev4", "rev5", "rev6"},
  1234. {"rev2", "rev3"},
  1235. {"rev3", "rev4", "rev5"},
  1236. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  1237. },
  1238. []bool{false, false, false, false},
  1239. },
  1240. // min/max create rev
  1241. {
  1242. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  1243. []pb.RangeRequest{
  1244. {
  1245. Key: []byte{0}, RangeEnd: []byte{0},
  1246. MinCreateRevision: 3,
  1247. },
  1248. {
  1249. Key: []byte{0}, RangeEnd: []byte{0},
  1250. MaxCreateRevision: 3,
  1251. },
  1252. {
  1253. Key: []byte{0}, RangeEnd: []byte{0},
  1254. MinCreateRevision: 3,
  1255. MaxCreateRevision: 5,
  1256. },
  1257. {
  1258. Key: []byte{0}, RangeEnd: []byte{0},
  1259. MaxCreateRevision: 10,
  1260. },
  1261. },
  1262. [][]string{
  1263. {"rev3", "rev6"},
  1264. {"rev2", "rev3"},
  1265. {"rev3"},
  1266. {"rev2", "rev3", "rev6"},
  1267. },
  1268. []bool{false, false, false, false},
  1269. },
  1270. }
  1271. for i, tt := range tests {
  1272. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1273. for _, k := range tt.putKeys {
  1274. kvc := toGRPC(clus.RandClient()).KV
  1275. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  1276. if _, err := kvc.Put(context.TODO(), req); err != nil {
  1277. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  1278. }
  1279. }
  1280. for j, req := range tt.reqs {
  1281. kvc := toGRPC(clus.RandClient()).KV
  1282. resp, err := kvc.Range(context.TODO(), &req)
  1283. if err != nil {
  1284. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  1285. continue
  1286. }
  1287. if len(resp.Kvs) != len(tt.wresps[j]) {
  1288. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  1289. continue
  1290. }
  1291. for k, wKey := range tt.wresps[j] {
  1292. respKey := string(resp.Kvs[k].Key)
  1293. if respKey != wKey {
  1294. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  1295. }
  1296. }
  1297. if resp.More != tt.wmores[j] {
  1298. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  1299. }
  1300. wrev := int64(len(tt.putKeys) + 1)
  1301. if resp.Header.Revision != wrev {
  1302. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  1303. }
  1304. }
  1305. clus.Terminate(t)
  1306. }
  1307. }
  1308. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1309. cfg.UseGRPC = true
  1310. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  1311. clus.Launch(t)
  1312. return clus
  1313. }
  1314. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  1315. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  1316. defer testutil.AfterTest(t)
  1317. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1318. clus := newClusterV3NoClients(t, &cfg)
  1319. defer clus.Terminate(t)
  1320. // nil out TLS field so client will use an insecure connection
  1321. clus.Members[0].ClientTLSInfo = nil
  1322. client, err := NewClientV3(clus.Members[0])
  1323. if err != nil && err != grpc.ErrClientConnTimeout {
  1324. t.Fatalf("unexpected error (%v)", err)
  1325. } else if client == nil {
  1326. // Ideally, no client would be returned. However, grpc will
  1327. // return a connection without trying to handshake first so
  1328. // the connection appears OK.
  1329. return
  1330. }
  1331. defer client.Close()
  1332. donec := make(chan error, 1)
  1333. go func() {
  1334. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  1335. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1336. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  1337. cancel()
  1338. donec <- perr
  1339. }()
  1340. if perr := <-donec; perr == nil {
  1341. t.Fatalf("expected client error on put")
  1342. }
  1343. }
  1344. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  1345. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  1346. defer testutil.AfterTest(t)
  1347. cfg := ClusterConfig{Size: 3}
  1348. clus := newClusterV3NoClients(t, &cfg)
  1349. defer clus.Terminate(t)
  1350. clus.Members[0].ClientTLSInfo = &testTLSInfo
  1351. client, err := NewClientV3(clus.Members[0])
  1352. if client != nil || err == nil {
  1353. t.Fatalf("expected no client")
  1354. } else if err != grpc.ErrClientConnTimeout {
  1355. t.Fatalf("unexpected error (%v)", err)
  1356. }
  1357. }
  1358. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  1359. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  1360. defer testutil.AfterTest(t)
  1361. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1362. clus := newClusterV3NoClients(t, &cfg)
  1363. defer clus.Terminate(t)
  1364. client, err := NewClientV3(clus.Members[0])
  1365. if err != nil {
  1366. t.Fatalf("expected tls client (%v)", err)
  1367. }
  1368. defer client.Close()
  1369. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1370. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  1371. t.Fatalf("unexpected error on put over tls (%v)", err)
  1372. }
  1373. }
  1374. // TestTLSReloadAtomicReplace ensures server reloads expired/valid certs
  1375. // when all certs are atomically replaced by directory renaming.
  1376. // And expects server to reject client requests, and vice versa.
  1377. func TestTLSReloadAtomicReplace(t *testing.T) {
  1378. tmpDir, err := ioutil.TempDir(os.TempDir(), "fixtures-tmp")
  1379. if err != nil {
  1380. t.Fatal(err)
  1381. }
  1382. os.RemoveAll(tmpDir)
  1383. defer os.RemoveAll(tmpDir)
  1384. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1385. if err != nil {
  1386. t.Fatal(err)
  1387. }
  1388. defer os.RemoveAll(certsDir)
  1389. certsDirExp, err := ioutil.TempDir(os.TempDir(), "fixtures-expired")
  1390. if err != nil {
  1391. t.Fatal(err)
  1392. }
  1393. defer os.RemoveAll(certsDirExp)
  1394. cloneFunc := func() transport.TLSInfo {
  1395. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1396. if terr != nil {
  1397. t.Fatal(terr)
  1398. }
  1399. if _, err = copyTLSFiles(testTLSInfoExpired, certsDirExp); err != nil {
  1400. t.Fatal(err)
  1401. }
  1402. return tlsInfo
  1403. }
  1404. replaceFunc := func() {
  1405. if err = os.Rename(certsDir, tmpDir); err != nil {
  1406. t.Fatal(err)
  1407. }
  1408. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1409. t.Fatal(err)
  1410. }
  1411. // after rename,
  1412. // 'certsDir' contains expired certs
  1413. // 'tmpDir' contains valid certs
  1414. // 'certsDirExp' does not exist
  1415. }
  1416. revertFunc := func() {
  1417. if err = os.Rename(tmpDir, certsDirExp); err != nil {
  1418. t.Fatal(err)
  1419. }
  1420. if err = os.Rename(certsDir, tmpDir); err != nil {
  1421. t.Fatal(err)
  1422. }
  1423. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1424. t.Fatal(err)
  1425. }
  1426. }
  1427. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1428. }
  1429. // TestTLSReloadCopy ensures server reloads expired/valid certs
  1430. // when new certs are copied over, one by one. And expects server
  1431. // to reject client requests, and vice versa.
  1432. func TestTLSReloadCopy(t *testing.T) {
  1433. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1434. if err != nil {
  1435. t.Fatal(err)
  1436. }
  1437. defer os.RemoveAll(certsDir)
  1438. cloneFunc := func() transport.TLSInfo {
  1439. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1440. if terr != nil {
  1441. t.Fatal(terr)
  1442. }
  1443. return tlsInfo
  1444. }
  1445. replaceFunc := func() {
  1446. if _, err = copyTLSFiles(testTLSInfoExpired, certsDir); err != nil {
  1447. t.Fatal(err)
  1448. }
  1449. }
  1450. revertFunc := func() {
  1451. if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
  1452. t.Fatal(err)
  1453. }
  1454. }
  1455. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1456. }
  1457. func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc func(), revertFunc func()) {
  1458. defer testutil.AfterTest(t)
  1459. // 1. separate copies for TLS assets modification
  1460. tlsInfo := cloneFunc()
  1461. // 2. start cluster with valid certs
  1462. clus := NewClusterV3(t, &ClusterConfig{Size: 1, PeerTLS: &tlsInfo, ClientTLS: &tlsInfo})
  1463. defer clus.Terminate(t)
  1464. // 3. concurrent client dialing while certs become expired
  1465. errc := make(chan error, 1)
  1466. go func() {
  1467. for {
  1468. cc, err := tlsInfo.ClientConfig()
  1469. if err != nil {
  1470. // errors in 'go/src/crypto/tls/tls.go'
  1471. // tls: private key does not match public key
  1472. // tls: failed to find any PEM data in key input
  1473. // tls: failed to find any PEM data in certificate input
  1474. // Or 'does not exist', 'not found', etc
  1475. t.Log(err)
  1476. continue
  1477. }
  1478. cli, cerr := clientv3.New(clientv3.Config{
  1479. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1480. DialTimeout: time.Second,
  1481. TLS: cc,
  1482. })
  1483. if cerr != nil {
  1484. errc <- cerr
  1485. return
  1486. }
  1487. cli.Close()
  1488. }
  1489. }()
  1490. // 4. replace certs with expired ones
  1491. replaceFunc()
  1492. // 5. expect dial time-out when loading expired certs
  1493. select {
  1494. case gerr := <-errc:
  1495. if gerr != grpc.ErrClientConnTimeout {
  1496. t.Fatalf("expected %v, got %v", grpc.ErrClientConnTimeout, gerr)
  1497. }
  1498. case <-time.After(5 * time.Second):
  1499. t.Fatal("failed to receive dial timeout error")
  1500. }
  1501. // 6. replace expired certs back with valid ones
  1502. revertFunc()
  1503. // 7. new requests should trigger listener to reload valid certs
  1504. tls, terr := tlsInfo.ClientConfig()
  1505. if terr != nil {
  1506. t.Fatal(terr)
  1507. }
  1508. cl, cerr := clientv3.New(clientv3.Config{
  1509. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1510. DialTimeout: time.Second,
  1511. TLS: tls,
  1512. })
  1513. if cerr != nil {
  1514. t.Fatalf("expected no error, got %v", cerr)
  1515. }
  1516. cl.Close()
  1517. }
  1518. func TestGRPCRequireLeader(t *testing.T) {
  1519. defer testutil.AfterTest(t)
  1520. cfg := ClusterConfig{Size: 3}
  1521. clus := newClusterV3NoClients(t, &cfg)
  1522. defer clus.Terminate(t)
  1523. clus.Members[1].Stop(t)
  1524. clus.Members[2].Stop(t)
  1525. client, err := NewClientV3(clus.Members[0])
  1526. if err != nil {
  1527. t.Fatalf("cannot create client: %v", err)
  1528. }
  1529. defer client.Close()
  1530. // wait for election timeout, then member[0] will not have a leader.
  1531. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  1532. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1533. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1534. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1535. if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1536. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1537. }
  1538. }
  1539. func TestGRPCStreamRequireLeader(t *testing.T) {
  1540. defer testutil.AfterTest(t)
  1541. cfg := ClusterConfig{Size: 3}
  1542. clus := newClusterV3NoClients(t, &cfg)
  1543. defer clus.Terminate(t)
  1544. client, err := NewClientV3(clus.Members[0])
  1545. if err != nil {
  1546. t.Fatalf("failed to create client (%v)", err)
  1547. }
  1548. defer client.Close()
  1549. wAPI := toGRPC(client).Watch
  1550. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1551. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1552. wStream, err := wAPI.Watch(ctx)
  1553. if err != nil {
  1554. t.Fatalf("wAPI.Watch error: %v", err)
  1555. }
  1556. clus.Members[1].Stop(t)
  1557. clus.Members[2].Stop(t)
  1558. // existing stream should be rejected
  1559. _, err = wStream.Recv()
  1560. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1561. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1562. }
  1563. // new stream should also be rejected
  1564. wStream, err = wAPI.Watch(ctx)
  1565. if err != nil {
  1566. t.Fatalf("wAPI.Watch error: %v", err)
  1567. }
  1568. _, err = wStream.Recv()
  1569. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1570. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1571. }
  1572. clus.Members[1].Restart(t)
  1573. clus.Members[2].Restart(t)
  1574. clus.waitLeader(t, clus.Members)
  1575. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1576. // new stream should also be OK now after we restarted the other members
  1577. wStream, err = wAPI.Watch(ctx)
  1578. if err != nil {
  1579. t.Fatalf("wAPI.Watch error: %v", err)
  1580. }
  1581. wreq := &pb.WatchRequest{
  1582. RequestUnion: &pb.WatchRequest_CreateRequest{
  1583. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1584. },
  1585. }
  1586. err = wStream.Send(wreq)
  1587. if err != nil {
  1588. t.Errorf("err = %v, want nil", err)
  1589. }
  1590. }
  1591. // TestV3PutLargeRequests ensures that configurable MaxRequestBytes works as intended.
  1592. func TestV3PutLargeRequests(t *testing.T) {
  1593. defer testutil.AfterTest(t)
  1594. tests := []struct {
  1595. key string
  1596. maxRequestBytes uint
  1597. valueSize int
  1598. expectError error
  1599. }{
  1600. // don't set to 0. use 0 as the default.
  1601. {"foo", 1, 1024, rpctypes.ErrGRPCRequestTooLarge},
  1602. {"foo", 10 * 1024 * 1024, 9 * 1024 * 1024, nil},
  1603. {"foo", 10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
  1604. {"foo", 10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
  1605. }
  1606. for i, test := range tests {
  1607. clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
  1608. kvcli := toGRPC(clus.Client(0)).KV
  1609. reqput := &pb.PutRequest{Key: []byte(test.key), Value: make([]byte, test.valueSize)}
  1610. _, err := kvcli.Put(context.TODO(), reqput)
  1611. if !eqErrGRPC(err, test.expectError) {
  1612. t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
  1613. }
  1614. clus.Terminate(t)
  1615. }
  1616. }
  // eqErrGRPC reports whether err1 and err2 describe the same error.
//
// Two nil errors are equal, a nil error never equals a non-nil one, and two
// non-nil errors are equal when their Error() messages are identical.
func eqErrGRPC(err1 error, err2 error) bool {
	// Handle nil explicitly so Error() is never called on a nil error.
	if err1 == nil || err2 == nil {
		return err1 == nil && err2 == nil
	}
	return err1.Error() == err2.Error()
}
  1620. // waitForRestart tries a range request until the client's server responds.
  1621. // This is mainly a stop-gap function until grpcproxy's KVClient adapter
  1622. // (and by extension, clientv3) supports grpc.CallOption pass-through so
  1623. // FailFast=false works with Put.
  1624. func waitForRestart(t *testing.T, kvc pb.KVClient) {
  1625. req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
  1626. if _, err := kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
  1627. t.Fatal(err)
  1628. }
  1629. }