v3_grpc_test.go 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "os"
  22. "reflect"
  23. "testing"
  24. "time"
  25. "github.com/coreos/etcd/clientv3"
  26. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  27. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  28. "github.com/coreos/etcd/pkg/testutil"
  29. "github.com/coreos/etcd/pkg/transport"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/metadata"
  33. "google.golang.org/grpc/status"
  34. )
  35. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  36. // overwrites it, then checks that the change was applied.
  37. func TestV3PutOverwrite(t *testing.T) {
  38. defer testutil.AfterTest(t)
  39. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  40. defer clus.Terminate(t)
  41. kvc := toGRPC(clus.RandClient()).KV
  42. key := []byte("foo")
  43. reqput := &pb.PutRequest{Key: key, Value: []byte("bar"), PrevKv: true}
  44. respput, err := kvc.Put(context.TODO(), reqput)
  45. if err != nil {
  46. t.Fatalf("couldn't put key (%v)", err)
  47. }
  48. // overwrite
  49. reqput.Value = []byte("baz")
  50. respput2, err := kvc.Put(context.TODO(), reqput)
  51. if err != nil {
  52. t.Fatalf("couldn't put key (%v)", err)
  53. }
  54. if respput2.Header.Revision <= respput.Header.Revision {
  55. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  56. respput2.Header.Revision, respput.Header.Revision)
  57. }
  58. if pkv := respput2.PrevKv; pkv == nil || string(pkv.Value) != "bar" {
  59. t.Fatalf("expected PrevKv=bar, got response %+v", respput2)
  60. }
  61. reqrange := &pb.RangeRequest{Key: key}
  62. resprange, err := kvc.Range(context.TODO(), reqrange)
  63. if err != nil {
  64. t.Fatalf("couldn't get key (%v)", err)
  65. }
  66. if len(resprange.Kvs) != 1 {
  67. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  68. }
  69. kv := resprange.Kvs[0]
  70. if kv.ModRevision <= kv.CreateRevision {
  71. t.Errorf("expected modRev > createRev, got %d <= %d",
  72. kv.ModRevision, kv.CreateRevision)
  73. }
  74. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  75. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  76. }
  77. }
  78. // TestPutRestart checks if a put after an unrelated member restart succeeds
  79. func TestV3PutRestart(t *testing.T) {
  80. defer testutil.AfterTest(t)
  81. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  82. defer clus.Terminate(t)
  83. kvIdx := rand.Intn(3)
  84. kvc := toGRPC(clus.Client(kvIdx)).KV
  85. stopIdx := kvIdx
  86. for stopIdx == kvIdx {
  87. stopIdx = rand.Intn(3)
  88. }
  89. clus.clients[stopIdx].Close()
  90. clus.Members[stopIdx].Stop(t)
  91. clus.Members[stopIdx].Restart(t)
  92. c, cerr := NewClientV3(clus.Members[stopIdx])
  93. if cerr != nil {
  94. t.Fatalf("cannot create client: %v", cerr)
  95. }
  96. clus.clients[stopIdx] = c
  97. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  98. defer cancel()
  99. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  100. _, err := kvc.Put(ctx, reqput)
  101. if err != nil && err == ctx.Err() {
  102. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  103. }
  104. }
  105. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  106. func TestV3CompactCurrentRev(t *testing.T) {
  107. defer testutil.AfterTest(t)
  108. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  109. defer clus.Terminate(t)
  110. kvc := toGRPC(clus.RandClient()).KV
  111. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  112. for i := 0; i < 3; i++ {
  113. if _, err := kvc.Put(context.Background(), preq); err != nil {
  114. t.Fatalf("couldn't put key (%v)", err)
  115. }
  116. }
  117. // get key to add to proxy cache, if any
  118. if _, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}); err != nil {
  119. t.Fatal(err)
  120. }
  121. // compact on current revision
  122. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  123. if err != nil {
  124. t.Fatalf("couldn't compact kv space (%v)", err)
  125. }
  126. // key still exists when linearized?
  127. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  128. if err != nil {
  129. t.Fatalf("couldn't get key after compaction (%v)", err)
  130. }
  131. // key still exists when serialized?
  132. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo"), Serializable: true})
  133. if err != nil {
  134. t.Fatalf("couldn't get serialized key after compaction (%v)", err)
  135. }
  136. }
  137. // TestV3HashKV ensures that multiple calls of HashKV on same node return same hash and compact rev.
  138. func TestV3HashKV(t *testing.T) {
  139. defer testutil.AfterTest(t)
  140. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  141. defer clus.Terminate(t)
  142. kvc := toGRPC(clus.RandClient()).KV
  143. mvc := toGRPC(clus.RandClient()).Maintenance
  144. for i := 0; i < 10; i++ {
  145. resp, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte(fmt.Sprintf("bar%d", i))})
  146. if err != nil {
  147. t.Fatal(err)
  148. }
  149. rev := resp.Header.Revision
  150. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{})
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. if rev != hresp.Header.Revision {
  155. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  156. }
  157. prevHash := hresp.Hash
  158. prevCompactRev := hresp.CompactRevision
  159. for i := 0; i < 10; i++ {
  160. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{})
  161. if err != nil {
  162. t.Fatal(err)
  163. }
  164. if rev != hresp.Header.Revision {
  165. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  166. }
  167. if prevHash != hresp.Hash {
  168. t.Fatalf("prevHash %v != Hash %v", prevHash, hresp.Hash)
  169. }
  170. if prevCompactRev != hresp.CompactRevision {
  171. t.Fatalf("prevCompactRev %v != CompactRevision %v", prevHash, hresp.Hash)
  172. }
  173. prevHash = hresp.Hash
  174. prevCompactRev = hresp.CompactRevision
  175. }
  176. }
  177. }
  178. func TestV3TxnTooManyOps(t *testing.T) {
  179. defer testutil.AfterTest(t)
  180. maxTxnOps := uint(128)
  181. clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps})
  182. defer clus.Terminate(t)
  183. kvc := toGRPC(clus.RandClient()).KV
  184. // unique keys
  185. i := new(int)
  186. keyf := func() []byte {
  187. *i++
  188. return []byte(fmt.Sprintf("key-%d", i))
  189. }
  190. addCompareOps := func(txn *pb.TxnRequest) {
  191. txn.Compare = append(txn.Compare,
  192. &pb.Compare{
  193. Result: pb.Compare_GREATER,
  194. Target: pb.Compare_CREATE,
  195. Key: keyf(),
  196. })
  197. }
  198. addSuccessOps := func(txn *pb.TxnRequest) {
  199. txn.Success = append(txn.Success,
  200. &pb.RequestOp{
  201. Request: &pb.RequestOp_RequestPut{
  202. RequestPut: &pb.PutRequest{
  203. Key: keyf(),
  204. Value: []byte("bar"),
  205. },
  206. },
  207. })
  208. }
  209. addFailureOps := func(txn *pb.TxnRequest) {
  210. txn.Failure = append(txn.Failure,
  211. &pb.RequestOp{
  212. Request: &pb.RequestOp_RequestPut{
  213. RequestPut: &pb.PutRequest{
  214. Key: keyf(),
  215. Value: []byte("bar"),
  216. },
  217. },
  218. })
  219. }
  220. addTxnOps := func(txn *pb.TxnRequest) {
  221. newTxn := &pb.TxnRequest{}
  222. addSuccessOps(newTxn)
  223. txn.Success = append(txn.Success,
  224. &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  225. RequestTxn: newTxn,
  226. },
  227. },
  228. )
  229. }
  230. tests := []func(txn *pb.TxnRequest){
  231. addCompareOps,
  232. addSuccessOps,
  233. addFailureOps,
  234. addTxnOps,
  235. }
  236. for i, tt := range tests {
  237. txn := &pb.TxnRequest{}
  238. for j := 0; j < int(maxTxnOps+1); j++ {
  239. tt(txn)
  240. }
  241. _, err := kvc.Txn(context.Background(), txn)
  242. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  243. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  244. }
  245. }
  246. }
  247. func TestV3TxnDuplicateKeys(t *testing.T) {
  248. defer testutil.AfterTest(t)
  249. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  250. defer clus.Terminate(t)
  251. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  252. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  253. RequestDeleteRange: &pb.DeleteRangeRequest{
  254. Key: []byte("abc"),
  255. },
  256. },
  257. }
  258. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  259. RequestDeleteRange: &pb.DeleteRangeRequest{
  260. Key: []byte("a"), RangeEnd: []byte("b"),
  261. },
  262. },
  263. }
  264. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  265. RequestDeleteRange: &pb.DeleteRangeRequest{
  266. Key: []byte("abb"), RangeEnd: []byte("abc"),
  267. },
  268. },
  269. }
  270. txnDelReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  271. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{delInRangeReq}},
  272. },
  273. }
  274. txnDelReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  275. RequestTxn: &pb.TxnRequest{
  276. Success: []*pb.RequestOp{delInRangeReq},
  277. Failure: []*pb.RequestOp{delInRangeReq}},
  278. },
  279. }
  280. txnPutReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  281. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{putreq}},
  282. },
  283. }
  284. txnPutReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  285. RequestTxn: &pb.TxnRequest{
  286. Success: []*pb.RequestOp{putreq},
  287. Failure: []*pb.RequestOp{putreq}},
  288. },
  289. }
  290. kvc := toGRPC(clus.RandClient()).KV
  291. tests := []struct {
  292. txnSuccess []*pb.RequestOp
  293. werr error
  294. }{
  295. {
  296. txnSuccess: []*pb.RequestOp{putreq, putreq},
  297. werr: rpctypes.ErrGRPCDuplicateKey,
  298. },
  299. {
  300. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  301. werr: rpctypes.ErrGRPCDuplicateKey,
  302. },
  303. {
  304. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  305. werr: rpctypes.ErrGRPCDuplicateKey,
  306. },
  307. // Then(Put(a), Then(Del(a)))
  308. {
  309. txnSuccess: []*pb.RequestOp{putreq, txnDelReq},
  310. werr: rpctypes.ErrGRPCDuplicateKey,
  311. },
  312. // Then(Del(a), Then(Put(a)))
  313. {
  314. txnSuccess: []*pb.RequestOp{delInRangeReq, txnPutReq},
  315. werr: rpctypes.ErrGRPCDuplicateKey,
  316. },
  317. // Then((Then(Put(a)), Else(Put(a))), (Then(Put(a)), Else(Put(a)))
  318. {
  319. txnSuccess: []*pb.RequestOp{txnPutReqTwoSide, txnPutReqTwoSide},
  320. werr: rpctypes.ErrGRPCDuplicateKey,
  321. },
  322. // Then(Del(x), (Then(Put(a)), Else(Put(a))))
  323. {
  324. txnSuccess: []*pb.RequestOp{delOutOfRangeReq, txnPutReqTwoSide},
  325. werr: nil,
  326. },
  327. // Then(Then(Del(a)), (Then(Del(a)), Else(Del(a))))
  328. {
  329. txnSuccess: []*pb.RequestOp{txnDelReq, txnDelReqTwoSide},
  330. werr: nil,
  331. },
  332. {
  333. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  334. werr: nil,
  335. },
  336. {
  337. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  338. werr: nil,
  339. },
  340. }
  341. for i, tt := range tests {
  342. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  343. _, err := kvc.Txn(context.Background(), txn)
  344. if !eqErrGRPC(err, tt.werr) {
  345. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  346. }
  347. }
  348. }
  349. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  350. func TestV3TxnRevision(t *testing.T) {
  351. defer testutil.AfterTest(t)
  352. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  353. defer clus.Terminate(t)
  354. kvc := toGRPC(clus.RandClient()).KV
  355. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  356. presp, err := kvc.Put(context.TODO(), pr)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  361. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  362. tresp, err := kvc.Txn(context.TODO(), txn)
  363. if err != nil {
  364. t.Fatal(err)
  365. }
  366. // did not update revision
  367. if presp.Header.Revision != tresp.Header.Revision {
  368. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  369. }
  370. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  371. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  372. tresp, err = kvc.Txn(context.TODO(), txn)
  373. if err != nil {
  374. t.Fatal(err)
  375. }
  376. // did not update revision
  377. if presp.Header.Revision != tresp.Header.Revision {
  378. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  379. }
  380. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  381. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  382. tresp, err = kvc.Txn(context.TODO(), txn)
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. // updated revision
  387. if tresp.Header.Revision != presp.Header.Revision+1 {
  388. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  389. }
  390. }
  391. // Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
  392. // when compared to the Succeeded field in the txn response.
  393. func TestV3TxnCmpHeaderRev(t *testing.T) {
  394. defer testutil.AfterTest(t)
  395. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  396. defer clus.Terminate(t)
  397. kvc := toGRPC(clus.RandClient()).KV
  398. for i := 0; i < 10; i++ {
  399. // Concurrently put a key with a txn comparing on it.
  400. revc := make(chan int64, 1)
  401. go func() {
  402. defer close(revc)
  403. pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
  404. presp, err := kvc.Put(context.TODO(), pr)
  405. if err != nil {
  406. t.Fatal(err)
  407. }
  408. revc <- presp.Header.Revision
  409. }()
  410. // The read-only txn uses the optimized readindex server path.
  411. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
  412. RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
  413. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  414. // i = 0 /\ Succeeded => put followed txn
  415. cmp := &pb.Compare{
  416. Result: pb.Compare_EQUAL,
  417. Target: pb.Compare_VERSION,
  418. Key: []byte("k"),
  419. TargetUnion: &pb.Compare_Version{Version: int64(i)},
  420. }
  421. txn.Compare = append(txn.Compare, cmp)
  422. tresp, err := kvc.Txn(context.TODO(), txn)
  423. if err != nil {
  424. t.Fatal(err)
  425. }
  426. prev := <-revc
  427. // put followed txn; should eval to false
  428. if prev > tresp.Header.Revision && !tresp.Succeeded {
  429. t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
  430. }
  431. // txn follows put; should eval to true
  432. if tresp.Header.Revision >= prev && tresp.Succeeded {
  433. t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
  434. }
  435. }
  436. }
  437. // TestV3TxnRangeCompare tests range comparisons in txns
  438. func TestV3TxnRangeCompare(t *testing.T) {
  439. defer testutil.AfterTest(t)
  440. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  441. defer clus.Terminate(t)
  442. // put keys, named by expected revision
  443. for _, k := range []string{"/a/2", "/a/3", "/a/4", "/f/5"} {
  444. if _, err := clus.Client(0).Put(context.TODO(), k, "x"); err != nil {
  445. t.Fatal(err)
  446. }
  447. }
  448. tests := []struct {
  449. cmp pb.Compare
  450. wSuccess bool
  451. }{
  452. {
  453. // >= /a/; all create revs fit
  454. pb.Compare{
  455. Key: []byte("/a/"),
  456. RangeEnd: []byte{0},
  457. Target: pb.Compare_CREATE,
  458. Result: pb.Compare_LESS,
  459. TargetUnion: &pb.Compare_CreateRevision{6},
  460. },
  461. true,
  462. },
  463. {
  464. // >= /a/; one create rev doesn't fit
  465. pb.Compare{
  466. Key: []byte("/a/"),
  467. RangeEnd: []byte{0},
  468. Target: pb.Compare_CREATE,
  469. Result: pb.Compare_LESS,
  470. TargetUnion: &pb.Compare_CreateRevision{5},
  471. },
  472. false,
  473. },
  474. {
  475. // prefix /a/*; all create revs fit
  476. pb.Compare{
  477. Key: []byte("/a/"),
  478. RangeEnd: []byte("/a0"),
  479. Target: pb.Compare_CREATE,
  480. Result: pb.Compare_LESS,
  481. TargetUnion: &pb.Compare_CreateRevision{5},
  482. },
  483. true,
  484. },
  485. {
  486. // prefix /a/*; one create rev doesn't fit
  487. pb.Compare{
  488. Key: []byte("/a/"),
  489. RangeEnd: []byte("/a0"),
  490. Target: pb.Compare_CREATE,
  491. Result: pb.Compare_LESS,
  492. TargetUnion: &pb.Compare_CreateRevision{4},
  493. },
  494. false,
  495. },
  496. {
  497. // does not exist, does not succeed
  498. pb.Compare{
  499. Key: []byte("/b/"),
  500. RangeEnd: []byte("/b0"),
  501. Target: pb.Compare_VALUE,
  502. Result: pb.Compare_EQUAL,
  503. TargetUnion: &pb.Compare_Value{[]byte("x")},
  504. },
  505. false,
  506. },
  507. {
  508. // all keys are leased
  509. pb.Compare{
  510. Key: []byte("/a/"),
  511. RangeEnd: []byte("/a0"),
  512. Target: pb.Compare_LEASE,
  513. Result: pb.Compare_GREATER,
  514. TargetUnion: &pb.Compare_Lease{0},
  515. },
  516. false,
  517. },
  518. {
  519. // no keys are leased
  520. pb.Compare{
  521. Key: []byte("/a/"),
  522. RangeEnd: []byte("/a0"),
  523. Target: pb.Compare_LEASE,
  524. Result: pb.Compare_EQUAL,
  525. TargetUnion: &pb.Compare_Lease{0},
  526. },
  527. true,
  528. },
  529. }
  530. kvc := toGRPC(clus.Client(0)).KV
  531. for i, tt := range tests {
  532. txn := &pb.TxnRequest{}
  533. txn.Compare = append(txn.Compare, &tt.cmp)
  534. tresp, err := kvc.Txn(context.TODO(), txn)
  535. if err != nil {
  536. t.Fatal(err)
  537. }
  538. if tt.wSuccess != tresp.Succeeded {
  539. t.Errorf("#%d: expected %v, got %v", i, tt.wSuccess, tresp.Succeeded)
  540. }
  541. }
  542. }
  543. // TestV3TxnNested tests nested txns follow paths as expected.
  544. func TestV3TxnNestedPath(t *testing.T) {
  545. defer testutil.AfterTest(t)
  546. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  547. defer clus.Terminate(t)
  548. kvc := toGRPC(clus.RandClient()).KV
  549. cmpTrue := &pb.Compare{
  550. Result: pb.Compare_EQUAL,
  551. Target: pb.Compare_VERSION,
  552. Key: []byte("k"),
  553. TargetUnion: &pb.Compare_Version{Version: int64(0)},
  554. }
  555. cmpFalse := &pb.Compare{
  556. Result: pb.Compare_EQUAL,
  557. Target: pb.Compare_VERSION,
  558. Key: []byte("k"),
  559. TargetUnion: &pb.Compare_Version{Version: int64(1)},
  560. }
  561. // generate random path to eval txns
  562. topTxn := &pb.TxnRequest{}
  563. txn := topTxn
  564. txnPath := make([]bool, 10)
  565. for i := range txnPath {
  566. nextTxn := &pb.TxnRequest{}
  567. op := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: nextTxn}}
  568. txnPath[i] = rand.Intn(2) == 0
  569. if txnPath[i] {
  570. txn.Compare = append(txn.Compare, cmpTrue)
  571. txn.Success = append(txn.Success, op)
  572. } else {
  573. txn.Compare = append(txn.Compare, cmpFalse)
  574. txn.Failure = append(txn.Failure, op)
  575. }
  576. txn = nextTxn
  577. }
  578. tresp, err := kvc.Txn(context.TODO(), topTxn)
  579. if err != nil {
  580. t.Fatal(err)
  581. }
  582. curTxnResp := tresp
  583. for i := range txnPath {
  584. if curTxnResp.Succeeded != txnPath[i] {
  585. t.Fatalf("expected path %+v, got response %+v", txnPath, *tresp)
  586. }
  587. curTxnResp = curTxnResp.Responses[0].Response.(*pb.ResponseOp_ResponseTxn).ResponseTxn
  588. }
  589. }
  590. // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
  591. func TestV3PutIgnoreValue(t *testing.T) {
  592. defer testutil.AfterTest(t)
  593. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  594. defer clus.Terminate(t)
  595. kvc := toGRPC(clus.RandClient()).KV
  596. key, val := []byte("foo"), []byte("bar")
  597. putReq := pb.PutRequest{Key: key, Value: val}
  598. // create lease
  599. lc := toGRPC(clus.RandClient()).Lease
  600. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  601. if err != nil {
  602. t.Fatal(err)
  603. }
  604. if lresp.Error != "" {
  605. t.Fatal(lresp.Error)
  606. }
  607. tests := []struct {
  608. putFunc func() error
  609. putErr error
  610. wleaseID int64
  611. }{
  612. { // put failure for non-existent key
  613. func() error {
  614. preq := putReq
  615. preq.IgnoreValue = true
  616. _, err := kvc.Put(context.TODO(), &preq)
  617. return err
  618. },
  619. rpctypes.ErrGRPCKeyNotFound,
  620. 0,
  621. },
  622. { // txn failure for non-existent key
  623. func() error {
  624. preq := putReq
  625. preq.Value = nil
  626. preq.IgnoreValue = true
  627. txn := &pb.TxnRequest{}
  628. txn.Success = append(txn.Success, &pb.RequestOp{
  629. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  630. _, err := kvc.Txn(context.TODO(), txn)
  631. return err
  632. },
  633. rpctypes.ErrGRPCKeyNotFound,
  634. 0,
  635. },
  636. { // put success
  637. func() error {
  638. _, err := kvc.Put(context.TODO(), &putReq)
  639. return err
  640. },
  641. nil,
  642. 0,
  643. },
  644. { // txn success, attach lease
  645. func() error {
  646. preq := putReq
  647. preq.Value = nil
  648. preq.Lease = lresp.ID
  649. preq.IgnoreValue = true
  650. txn := &pb.TxnRequest{}
  651. txn.Success = append(txn.Success, &pb.RequestOp{
  652. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  653. _, err := kvc.Txn(context.TODO(), txn)
  654. return err
  655. },
  656. nil,
  657. lresp.ID,
  658. },
  659. { // non-empty value with ignore_value should error
  660. func() error {
  661. preq := putReq
  662. preq.IgnoreValue = true
  663. _, err := kvc.Put(context.TODO(), &preq)
  664. return err
  665. },
  666. rpctypes.ErrGRPCValueProvided,
  667. 0,
  668. },
  669. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  670. func() error {
  671. preq := putReq
  672. preq.Value = nil
  673. preq.IgnoreValue = true
  674. presp, err := kvc.Put(context.TODO(), &preq)
  675. if err != nil {
  676. return err
  677. }
  678. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  679. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  680. }
  681. return nil
  682. },
  683. nil,
  684. 0,
  685. },
  686. { // revoke lease, ensure detached key doesn't get deleted
  687. func() error {
  688. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  689. return err
  690. },
  691. nil,
  692. 0,
  693. },
  694. }
  695. for i, tt := range tests {
  696. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  697. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  698. }
  699. if tt.putErr != nil {
  700. continue
  701. }
  702. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  703. if err != nil {
  704. t.Fatalf("#%d: %v", i, err)
  705. }
  706. if len(rr.Kvs) != 1 {
  707. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  708. }
  709. if !bytes.Equal(rr.Kvs[0].Value, val) {
  710. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  711. }
  712. if rr.Kvs[0].Lease != tt.wleaseID {
  713. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  714. }
  715. }
  716. }
  717. // TestV3PutIgnoreLease ensures that writes with ignore_lease uses previous lease for the key overwrites.
  718. func TestV3PutIgnoreLease(t *testing.T) {
  719. defer testutil.AfterTest(t)
  720. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  721. defer clus.Terminate(t)
  722. kvc := toGRPC(clus.RandClient()).KV
  723. // create lease
  724. lc := toGRPC(clus.RandClient()).Lease
  725. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  726. if err != nil {
  727. t.Fatal(err)
  728. }
  729. if lresp.Error != "" {
  730. t.Fatal(lresp.Error)
  731. }
  732. key, val, val1 := []byte("zoo"), []byte("bar"), []byte("bar1")
  733. putReq := pb.PutRequest{Key: key, Value: val}
  734. tests := []struct {
  735. putFunc func() error
  736. putErr error
  737. wleaseID int64
  738. wvalue []byte
  739. }{
  740. { // put failure for non-existent key
  741. func() error {
  742. preq := putReq
  743. preq.IgnoreLease = true
  744. _, err := kvc.Put(context.TODO(), &preq)
  745. return err
  746. },
  747. rpctypes.ErrGRPCKeyNotFound,
  748. 0,
  749. nil,
  750. },
  751. { // txn failure for non-existent key
  752. func() error {
  753. preq := putReq
  754. preq.IgnoreLease = true
  755. txn := &pb.TxnRequest{}
  756. txn.Success = append(txn.Success, &pb.RequestOp{
  757. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  758. _, err := kvc.Txn(context.TODO(), txn)
  759. return err
  760. },
  761. rpctypes.ErrGRPCKeyNotFound,
  762. 0,
  763. nil,
  764. },
  765. { // put success
  766. func() error {
  767. preq := putReq
  768. preq.Lease = lresp.ID
  769. _, err := kvc.Put(context.TODO(), &preq)
  770. return err
  771. },
  772. nil,
  773. lresp.ID,
  774. val,
  775. },
  776. { // txn success, modify value using 'ignore_lease' and ensure lease is not detached
  777. func() error {
  778. preq := putReq
  779. preq.Value = val1
  780. preq.IgnoreLease = true
  781. txn := &pb.TxnRequest{}
  782. txn.Success = append(txn.Success, &pb.RequestOp{
  783. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  784. _, err := kvc.Txn(context.TODO(), txn)
  785. return err
  786. },
  787. nil,
  788. lresp.ID,
  789. val1,
  790. },
  791. { // non-empty lease with ignore_lease should error
  792. func() error {
  793. preq := putReq
  794. preq.Lease = lresp.ID
  795. preq.IgnoreLease = true
  796. _, err := kvc.Put(context.TODO(), &preq)
  797. return err
  798. },
  799. rpctypes.ErrGRPCLeaseProvided,
  800. 0,
  801. nil,
  802. },
  803. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  804. func() error {
  805. presp, err := kvc.Put(context.TODO(), &putReq)
  806. if err != nil {
  807. return err
  808. }
  809. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  810. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  811. }
  812. return nil
  813. },
  814. nil,
  815. 0,
  816. val,
  817. },
  818. { // revoke lease, ensure detached key doesn't get deleted
  819. func() error {
  820. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  821. return err
  822. },
  823. nil,
  824. 0,
  825. val,
  826. },
  827. }
  828. for i, tt := range tests {
  829. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  830. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  831. }
  832. if tt.putErr != nil {
  833. continue
  834. }
  835. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  836. if err != nil {
  837. t.Fatalf("#%d: %v", i, err)
  838. }
  839. if len(rr.Kvs) != 1 {
  840. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  841. }
  842. if !bytes.Equal(rr.Kvs[0].Value, tt.wvalue) {
  843. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  844. }
  845. if rr.Kvs[0].Lease != tt.wleaseID {
  846. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  847. }
  848. }
  849. }
  850. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  851. func TestV3PutMissingLease(t *testing.T) {
  852. defer testutil.AfterTest(t)
  853. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  854. defer clus.Terminate(t)
  855. kvc := toGRPC(clus.RandClient()).KV
  856. key := []byte("foo")
  857. preq := &pb.PutRequest{Key: key, Lease: 123456}
  858. tests := []func(){
  859. // put case
  860. func() {
  861. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  862. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  863. }
  864. },
  865. // txn success case
  866. func() {
  867. txn := &pb.TxnRequest{}
  868. txn.Success = append(txn.Success, &pb.RequestOp{
  869. Request: &pb.RequestOp_RequestPut{
  870. RequestPut: preq}})
  871. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  872. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  873. }
  874. },
  875. // txn failure case
  876. func() {
  877. txn := &pb.TxnRequest{}
  878. txn.Failure = append(txn.Failure, &pb.RequestOp{
  879. Request: &pb.RequestOp_RequestPut{
  880. RequestPut: preq}})
  881. cmp := &pb.Compare{
  882. Result: pb.Compare_GREATER,
  883. Target: pb.Compare_CREATE,
  884. Key: []byte("bar"),
  885. }
  886. txn.Compare = append(txn.Compare, cmp)
  887. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  888. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  889. }
  890. },
  891. // ignore bad lease in failure on success txn
  892. func() {
  893. txn := &pb.TxnRequest{}
  894. rreq := &pb.RangeRequest{Key: []byte("bar")}
  895. txn.Success = append(txn.Success, &pb.RequestOp{
  896. Request: &pb.RequestOp_RequestRange{
  897. RequestRange: rreq}})
  898. txn.Failure = append(txn.Failure, &pb.RequestOp{
  899. Request: &pb.RequestOp_RequestPut{
  900. RequestPut: preq}})
  901. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  902. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  903. }
  904. },
  905. }
  906. for i, f := range tests {
  907. f()
  908. // key shouldn't have been stored
  909. rreq := &pb.RangeRequest{Key: key}
  910. rresp, err := kvc.Range(context.TODO(), rreq)
  911. if err != nil {
  912. t.Errorf("#%d. could not rangereq (%v)", i, err)
  913. } else if len(rresp.Kvs) != 0 {
  914. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  915. }
  916. }
  917. }
  918. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  919. func TestV3DeleteRange(t *testing.T) {
  920. defer testutil.AfterTest(t)
  921. tests := []struct {
  922. keySet []string
  923. begin string
  924. end string
  925. prevKV bool
  926. wantSet [][]byte
  927. deleted int64
  928. }{
  929. // delete middle
  930. {
  931. []string{"foo", "foo/abc", "fop"},
  932. "foo/", "fop", false,
  933. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  934. },
  935. // no delete
  936. {
  937. []string{"foo", "foo/abc", "fop"},
  938. "foo/", "foo/", false,
  939. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  940. },
  941. // delete first
  942. {
  943. []string{"foo", "foo/abc", "fop"},
  944. "fo", "fop", false,
  945. [][]byte{[]byte("fop")}, 2,
  946. },
  947. // delete tail
  948. {
  949. []string{"foo", "foo/abc", "fop"},
  950. "foo/", "fos", false,
  951. [][]byte{[]byte("foo")}, 2,
  952. },
  953. // delete exact
  954. {
  955. []string{"foo", "foo/abc", "fop"},
  956. "foo/abc", "", false,
  957. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  958. },
  959. // delete none, [x,x)
  960. {
  961. []string{"foo"},
  962. "foo", "foo", false,
  963. [][]byte{[]byte("foo")}, 0,
  964. },
  965. // delete middle with preserveKVs set
  966. {
  967. []string{"foo", "foo/abc", "fop"},
  968. "foo/", "fop", true,
  969. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  970. },
  971. }
  972. for i, tt := range tests {
  973. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  974. kvc := toGRPC(clus.RandClient()).KV
  975. ks := tt.keySet
  976. for j := range ks {
  977. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  978. _, err := kvc.Put(context.TODO(), reqput)
  979. if err != nil {
  980. t.Fatalf("couldn't put key (%v)", err)
  981. }
  982. }
  983. dreq := &pb.DeleteRangeRequest{
  984. Key: []byte(tt.begin),
  985. RangeEnd: []byte(tt.end),
  986. PrevKv: tt.prevKV,
  987. }
  988. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  989. if err != nil {
  990. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  991. }
  992. if tt.deleted != dresp.Deleted {
  993. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  994. }
  995. if tt.prevKV {
  996. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  997. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  998. }
  999. }
  1000. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  1001. rresp, err := kvc.Range(context.TODO(), rreq)
  1002. if err != nil {
  1003. t.Errorf("couldn't get range on test %v (%v)", i, err)
  1004. }
  1005. if dresp.Header.Revision != rresp.Header.Revision {
  1006. t.Errorf("expected revision %v, got %v",
  1007. dresp.Header.Revision, rresp.Header.Revision)
  1008. }
  1009. keys := [][]byte{}
  1010. for j := range rresp.Kvs {
  1011. keys = append(keys, rresp.Kvs[j].Key)
  1012. }
  1013. if !reflect.DeepEqual(tt.wantSet, keys) {
  1014. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  1015. }
  1016. // can't defer because tcp ports will be in use
  1017. clus.Terminate(t)
  1018. }
  1019. }
  1020. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  1021. func TestV3TxnInvalidRange(t *testing.T) {
  1022. defer testutil.AfterTest(t)
  1023. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1024. defer clus.Terminate(t)
  1025. kvc := toGRPC(clus.RandClient()).KV
  1026. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1027. for i := 0; i < 3; i++ {
  1028. _, err := kvc.Put(context.Background(), preq)
  1029. if err != nil {
  1030. t.Fatalf("couldn't put key (%v)", err)
  1031. }
  1032. }
  1033. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  1034. if err != nil {
  1035. t.Fatalf("couldn't compact kv space (%v)", err)
  1036. }
  1037. // future rev
  1038. txn := &pb.TxnRequest{}
  1039. txn.Success = append(txn.Success, &pb.RequestOp{
  1040. Request: &pb.RequestOp_RequestPut{
  1041. RequestPut: preq}})
  1042. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  1043. txn.Success = append(txn.Success, &pb.RequestOp{
  1044. Request: &pb.RequestOp_RequestRange{
  1045. RequestRange: rreq}})
  1046. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  1047. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  1048. }
  1049. // compacted rev
  1050. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  1051. tv.RequestRange.Revision = 1
  1052. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  1053. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  1054. }
  1055. }
  1056. func TestV3TooLargeRequest(t *testing.T) {
  1057. defer testutil.AfterTest(t)
  1058. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1059. defer clus.Terminate(t)
  1060. kvc := toGRPC(clus.RandClient()).KV
  1061. // 2MB request value
  1062. largeV := make([]byte, 2*1024*1024)
  1063. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  1064. _, err := kvc.Put(context.Background(), preq)
  1065. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  1066. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  1067. }
  1068. }
  1069. // TestV3Hash tests hash.
  1070. func TestV3Hash(t *testing.T) {
  1071. defer testutil.AfterTest(t)
  1072. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1073. defer clus.Terminate(t)
  1074. cli := clus.RandClient()
  1075. kvc := toGRPC(cli).KV
  1076. m := toGRPC(cli).Maintenance
  1077. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1078. for i := 0; i < 3; i++ {
  1079. _, err := kvc.Put(context.Background(), preq)
  1080. if err != nil {
  1081. t.Fatalf("couldn't put key (%v)", err)
  1082. }
  1083. }
  1084. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  1085. if err != nil || resp.Hash == 0 {
  1086. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1087. }
  1088. }
  1089. // TestV3HashRestart ensures that hash stays the same after restart.
  1090. func TestV3HashRestart(t *testing.T) {
  1091. defer testutil.AfterTest(t)
  1092. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  1093. defer clus.Terminate(t)
  1094. cli := clus.RandClient()
  1095. resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1096. if err != nil || resp.Hash == 0 {
  1097. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1098. }
  1099. hash1 := resp.Hash
  1100. clus.Members[0].Stop(t)
  1101. clus.Members[0].Restart(t)
  1102. clus.waitLeader(t, clus.Members)
  1103. kvc := toGRPC(clus.Client(0)).KV
  1104. waitForRestart(t, kvc)
  1105. cli = clus.RandClient()
  1106. resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1107. if err != nil || resp.Hash == 0 {
  1108. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1109. }
  1110. hash2 := resp.Hash
  1111. if hash1 != hash2 {
  1112. t.Fatalf("hash expected %d, got %d", hash1, hash2)
  1113. }
  1114. }
  1115. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  1116. func TestV3StorageQuotaAPI(t *testing.T) {
  1117. defer testutil.AfterTest(t)
  1118. quotasize := int64(16 * os.Getpagesize())
  1119. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1120. // Set a quota on one node
  1121. clus.Members[0].QuotaBackendBytes = quotasize
  1122. clus.Members[0].Stop(t)
  1123. clus.Members[0].Restart(t)
  1124. defer clus.Terminate(t)
  1125. kvc := toGRPC(clus.Client(0)).KV
  1126. waitForRestart(t, kvc)
  1127. key := []byte("abc")
  1128. // test small put that fits in quota
  1129. smallbuf := make([]byte, 512)
  1130. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  1131. t.Fatal(err)
  1132. }
  1133. // test big put
  1134. bigbuf := make([]byte, quotasize)
  1135. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  1136. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  1137. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1138. }
  1139. // test big txn
  1140. puttxn := &pb.RequestOp{
  1141. Request: &pb.RequestOp_RequestPut{
  1142. RequestPut: &pb.PutRequest{
  1143. Key: key,
  1144. Value: bigbuf,
  1145. },
  1146. },
  1147. }
  1148. txnreq := &pb.TxnRequest{}
  1149. txnreq.Success = append(txnreq.Success, puttxn)
  1150. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  1151. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  1152. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1153. }
  1154. }
  1155. func TestV3RangeRequest(t *testing.T) {
  1156. defer testutil.AfterTest(t)
  1157. tests := []struct {
  1158. putKeys []string
  1159. reqs []pb.RangeRequest
  1160. wresps [][]string
  1161. wmores []bool
  1162. }{
  1163. // single key
  1164. {
  1165. []string{"foo", "bar"},
  1166. []pb.RangeRequest{
  1167. // exists
  1168. {Key: []byte("foo")},
  1169. // doesn't exist
  1170. {Key: []byte("baz")},
  1171. },
  1172. [][]string{
  1173. {"foo"},
  1174. {},
  1175. },
  1176. []bool{false, false},
  1177. },
  1178. // multi-key
  1179. {
  1180. []string{"a", "b", "c", "d", "e"},
  1181. []pb.RangeRequest{
  1182. // all in range
  1183. {Key: []byte("a"), RangeEnd: []byte("z")},
  1184. // [b, d)
  1185. {Key: []byte("b"), RangeEnd: []byte("d")},
  1186. // out of range
  1187. {Key: []byte("f"), RangeEnd: []byte("z")},
  1188. // [c,c) = empty
  1189. {Key: []byte("c"), RangeEnd: []byte("c")},
  1190. // [d, b) = empty
  1191. {Key: []byte("d"), RangeEnd: []byte("b")},
  1192. // ["\0", "\0") => all in range
  1193. {Key: []byte{0}, RangeEnd: []byte{0}},
  1194. },
  1195. [][]string{
  1196. {"a", "b", "c", "d", "e"},
  1197. {"b", "c"},
  1198. {},
  1199. {},
  1200. {},
  1201. {"a", "b", "c", "d", "e"},
  1202. },
  1203. []bool{false, false, false, false, false, false},
  1204. },
  1205. // revision
  1206. {
  1207. []string{"a", "b", "c", "d", "e"},
  1208. []pb.RangeRequest{
  1209. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  1210. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  1211. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  1212. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  1213. },
  1214. [][]string{
  1215. {"a", "b", "c", "d", "e"},
  1216. {},
  1217. {"a"},
  1218. {"a", "b"},
  1219. },
  1220. []bool{false, false, false, false},
  1221. },
  1222. // limit
  1223. {
  1224. []string{"foo", "bar"},
  1225. []pb.RangeRequest{
  1226. // more
  1227. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  1228. // no more
  1229. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  1230. },
  1231. [][]string{
  1232. {"bar"},
  1233. {"bar", "foo"},
  1234. },
  1235. []bool{true, false},
  1236. },
  1237. // sort
  1238. {
  1239. []string{"b", "a", "c", "d", "c"},
  1240. []pb.RangeRequest{
  1241. {
  1242. Key: []byte("a"), RangeEnd: []byte("z"),
  1243. Limit: 1,
  1244. SortOrder: pb.RangeRequest_ASCEND,
  1245. SortTarget: pb.RangeRequest_KEY,
  1246. },
  1247. {
  1248. Key: []byte("a"), RangeEnd: []byte("z"),
  1249. Limit: 1,
  1250. SortOrder: pb.RangeRequest_DESCEND,
  1251. SortTarget: pb.RangeRequest_KEY,
  1252. },
  1253. {
  1254. Key: []byte("a"), RangeEnd: []byte("z"),
  1255. Limit: 1,
  1256. SortOrder: pb.RangeRequest_ASCEND,
  1257. SortTarget: pb.RangeRequest_CREATE,
  1258. },
  1259. {
  1260. Key: []byte("a"), RangeEnd: []byte("z"),
  1261. Limit: 1,
  1262. SortOrder: pb.RangeRequest_DESCEND,
  1263. SortTarget: pb.RangeRequest_MOD,
  1264. },
  1265. {
  1266. Key: []byte("z"), RangeEnd: []byte("z"),
  1267. Limit: 1,
  1268. SortOrder: pb.RangeRequest_DESCEND,
  1269. SortTarget: pb.RangeRequest_CREATE,
  1270. },
  1271. { // sort ASCEND by default
  1272. Key: []byte("a"), RangeEnd: []byte("z"),
  1273. Limit: 10,
  1274. SortOrder: pb.RangeRequest_NONE,
  1275. SortTarget: pb.RangeRequest_CREATE,
  1276. },
  1277. },
  1278. [][]string{
  1279. {"a"},
  1280. {"d"},
  1281. {"b"},
  1282. {"c"},
  1283. {},
  1284. {"b", "a", "c", "d"},
  1285. },
  1286. []bool{true, true, true, true, false, false},
  1287. },
  1288. // min/max mod rev
  1289. {
  1290. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  1291. []pb.RangeRequest{
  1292. {
  1293. Key: []byte{0}, RangeEnd: []byte{0},
  1294. MinModRevision: 3,
  1295. },
  1296. {
  1297. Key: []byte{0}, RangeEnd: []byte{0},
  1298. MaxModRevision: 3,
  1299. },
  1300. {
  1301. Key: []byte{0}, RangeEnd: []byte{0},
  1302. MinModRevision: 3,
  1303. MaxModRevision: 5,
  1304. },
  1305. {
  1306. Key: []byte{0}, RangeEnd: []byte{0},
  1307. MaxModRevision: 10,
  1308. },
  1309. },
  1310. [][]string{
  1311. {"rev3", "rev4", "rev5", "rev6"},
  1312. {"rev2", "rev3"},
  1313. {"rev3", "rev4", "rev5"},
  1314. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  1315. },
  1316. []bool{false, false, false, false},
  1317. },
  1318. // min/max create rev
  1319. {
  1320. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  1321. []pb.RangeRequest{
  1322. {
  1323. Key: []byte{0}, RangeEnd: []byte{0},
  1324. MinCreateRevision: 3,
  1325. },
  1326. {
  1327. Key: []byte{0}, RangeEnd: []byte{0},
  1328. MaxCreateRevision: 3,
  1329. },
  1330. {
  1331. Key: []byte{0}, RangeEnd: []byte{0},
  1332. MinCreateRevision: 3,
  1333. MaxCreateRevision: 5,
  1334. },
  1335. {
  1336. Key: []byte{0}, RangeEnd: []byte{0},
  1337. MaxCreateRevision: 10,
  1338. },
  1339. },
  1340. [][]string{
  1341. {"rev3", "rev6"},
  1342. {"rev2", "rev3"},
  1343. {"rev3"},
  1344. {"rev2", "rev3", "rev6"},
  1345. },
  1346. []bool{false, false, false, false},
  1347. },
  1348. }
  1349. for i, tt := range tests {
  1350. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1351. for _, k := range tt.putKeys {
  1352. kvc := toGRPC(clus.RandClient()).KV
  1353. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  1354. if _, err := kvc.Put(context.TODO(), req); err != nil {
  1355. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  1356. }
  1357. }
  1358. for j, req := range tt.reqs {
  1359. kvc := toGRPC(clus.RandClient()).KV
  1360. resp, err := kvc.Range(context.TODO(), &req)
  1361. if err != nil {
  1362. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  1363. continue
  1364. }
  1365. if len(resp.Kvs) != len(tt.wresps[j]) {
  1366. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  1367. continue
  1368. }
  1369. for k, wKey := range tt.wresps[j] {
  1370. respKey := string(resp.Kvs[k].Key)
  1371. if respKey != wKey {
  1372. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  1373. }
  1374. }
  1375. if resp.More != tt.wmores[j] {
  1376. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  1377. }
  1378. wrev := int64(len(tt.putKeys) + 1)
  1379. if resp.Header.Revision != wrev {
  1380. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  1381. }
  1382. }
  1383. clus.Terminate(t)
  1384. }
  1385. }
  1386. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1387. cfg.UseGRPC = true
  1388. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  1389. clus.Launch(t)
  1390. return clus
  1391. }
  1392. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  1393. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  1394. defer testutil.AfterTest(t)
  1395. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1396. clus := newClusterV3NoClients(t, &cfg)
  1397. defer clus.Terminate(t)
  1398. // nil out TLS field so client will use an insecure connection
  1399. clus.Members[0].ClientTLSInfo = nil
  1400. client, err := NewClientV3(clus.Members[0])
  1401. if err != nil && err != context.DeadlineExceeded {
  1402. t.Fatalf("unexpected error (%v)", err)
  1403. } else if client == nil {
  1404. // Ideally, no client would be returned. However, grpc will
  1405. // return a connection without trying to handshake first so
  1406. // the connection appears OK.
  1407. return
  1408. }
  1409. defer client.Close()
  1410. donec := make(chan error, 1)
  1411. go func() {
  1412. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  1413. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1414. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  1415. cancel()
  1416. donec <- perr
  1417. }()
  1418. if perr := <-donec; perr == nil {
  1419. t.Fatalf("expected client error on put")
  1420. }
  1421. }
  1422. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  1423. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  1424. defer testutil.AfterTest(t)
  1425. cfg := ClusterConfig{Size: 3}
  1426. clus := newClusterV3NoClients(t, &cfg)
  1427. defer clus.Terminate(t)
  1428. clus.Members[0].ClientTLSInfo = &testTLSInfo
  1429. client, err := NewClientV3(clus.Members[0])
  1430. if client != nil || err == nil {
  1431. t.Fatalf("expected no client")
  1432. } else if err != context.DeadlineExceeded {
  1433. t.Fatalf("unexpected error (%v)", err)
  1434. }
  1435. }
  1436. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  1437. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  1438. defer testutil.AfterTest(t)
  1439. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1440. clus := newClusterV3NoClients(t, &cfg)
  1441. defer clus.Terminate(t)
  1442. client, err := NewClientV3(clus.Members[0])
  1443. if err != nil {
  1444. t.Fatalf("expected tls client (%v)", err)
  1445. }
  1446. defer client.Close()
  1447. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1448. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  1449. t.Fatalf("unexpected error on put over tls (%v)", err)
  1450. }
  1451. }
  1452. // TestTLSReloadAtomicReplace ensures server reloads expired/valid certs
  1453. // when all certs are atomically replaced by directory renaming.
  1454. // And expects server to reject client requests, and vice versa.
  1455. func TestTLSReloadAtomicReplace(t *testing.T) {
  1456. tmpDir, err := ioutil.TempDir(os.TempDir(), "fixtures-tmp")
  1457. if err != nil {
  1458. t.Fatal(err)
  1459. }
  1460. os.RemoveAll(tmpDir)
  1461. defer os.RemoveAll(tmpDir)
  1462. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1463. if err != nil {
  1464. t.Fatal(err)
  1465. }
  1466. defer os.RemoveAll(certsDir)
  1467. certsDirExp, err := ioutil.TempDir(os.TempDir(), "fixtures-expired")
  1468. if err != nil {
  1469. t.Fatal(err)
  1470. }
  1471. defer os.RemoveAll(certsDirExp)
  1472. cloneFunc := func() transport.TLSInfo {
  1473. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1474. if terr != nil {
  1475. t.Fatal(terr)
  1476. }
  1477. if _, err = copyTLSFiles(testTLSInfoExpired, certsDirExp); err != nil {
  1478. t.Fatal(err)
  1479. }
  1480. return tlsInfo
  1481. }
  1482. replaceFunc := func() {
  1483. if err = os.Rename(certsDir, tmpDir); err != nil {
  1484. t.Fatal(err)
  1485. }
  1486. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1487. t.Fatal(err)
  1488. }
  1489. // after rename,
  1490. // 'certsDir' contains expired certs
  1491. // 'tmpDir' contains valid certs
  1492. // 'certsDirExp' does not exist
  1493. }
  1494. revertFunc := func() {
  1495. if err = os.Rename(tmpDir, certsDirExp); err != nil {
  1496. t.Fatal(err)
  1497. }
  1498. if err = os.Rename(certsDir, tmpDir); err != nil {
  1499. t.Fatal(err)
  1500. }
  1501. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1502. t.Fatal(err)
  1503. }
  1504. }
  1505. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1506. }
  1507. // TestTLSReloadCopy ensures server reloads expired/valid certs
  1508. // when new certs are copied over, one by one. And expects server
  1509. // to reject client requests, and vice versa.
  1510. func TestTLSReloadCopy(t *testing.T) {
  1511. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1512. if err != nil {
  1513. t.Fatal(err)
  1514. }
  1515. defer os.RemoveAll(certsDir)
  1516. cloneFunc := func() transport.TLSInfo {
  1517. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1518. if terr != nil {
  1519. t.Fatal(terr)
  1520. }
  1521. return tlsInfo
  1522. }
  1523. replaceFunc := func() {
  1524. if _, err = copyTLSFiles(testTLSInfoExpired, certsDir); err != nil {
  1525. t.Fatal(err)
  1526. }
  1527. }
  1528. revertFunc := func() {
  1529. if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
  1530. t.Fatal(err)
  1531. }
  1532. }
  1533. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1534. }
  1535. func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc func(), revertFunc func()) {
  1536. defer testutil.AfterTest(t)
  1537. // 1. separate copies for TLS assets modification
  1538. tlsInfo := cloneFunc()
  1539. // 2. start cluster with valid certs
  1540. clus := NewClusterV3(t, &ClusterConfig{Size: 1, PeerTLS: &tlsInfo, ClientTLS: &tlsInfo})
  1541. defer clus.Terminate(t)
  1542. // 3. concurrent client dialing while certs become expired
  1543. errc := make(chan error, 1)
  1544. go func() {
  1545. for {
  1546. cc, err := tlsInfo.ClientConfig()
  1547. if err != nil {
  1548. // errors in 'go/src/crypto/tls/tls.go'
  1549. // tls: private key does not match public key
  1550. // tls: failed to find any PEM data in key input
  1551. // tls: failed to find any PEM data in certificate input
  1552. // Or 'does not exist', 'not found', etc
  1553. t.Log(err)
  1554. continue
  1555. }
  1556. cli, cerr := clientv3.New(clientv3.Config{
  1557. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1558. DialTimeout: time.Second,
  1559. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  1560. TLS: cc,
  1561. })
  1562. if cerr != nil {
  1563. errc <- cerr
  1564. return
  1565. }
  1566. cli.Close()
  1567. }
  1568. }()
  1569. // 4. replace certs with expired ones
  1570. replaceFunc()
  1571. // 5. expect dial time-out when loading expired certs
  1572. select {
  1573. case gerr := <-errc:
  1574. if gerr != context.DeadlineExceeded {
  1575. t.Fatalf("expected %v, got %v", context.DeadlineExceeded, gerr)
  1576. }
  1577. case <-time.After(5 * time.Second):
  1578. t.Fatal("failed to receive dial timeout error")
  1579. }
  1580. // 6. replace expired certs back with valid ones
  1581. revertFunc()
  1582. // 7. new requests should trigger listener to reload valid certs
  1583. tls, terr := tlsInfo.ClientConfig()
  1584. if terr != nil {
  1585. t.Fatal(terr)
  1586. }
  1587. cl, cerr := clientv3.New(clientv3.Config{
  1588. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1589. DialTimeout: 5 * time.Second,
  1590. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  1591. TLS: tls,
  1592. })
  1593. if cerr != nil {
  1594. t.Fatalf("expected no error, got %v", cerr)
  1595. }
  1596. cl.Close()
  1597. }
  1598. func TestGRPCRequireLeader(t *testing.T) {
  1599. defer testutil.AfterTest(t)
  1600. cfg := ClusterConfig{Size: 3}
  1601. clus := newClusterV3NoClients(t, &cfg)
  1602. defer clus.Terminate(t)
  1603. clus.Members[1].Stop(t)
  1604. clus.Members[2].Stop(t)
  1605. client, err := NewClientV3(clus.Members[0])
  1606. if err != nil {
  1607. t.Fatalf("cannot create client: %v", err)
  1608. }
  1609. defer client.Close()
  1610. // wait for election timeout, then member[0] will not have a leader.
  1611. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  1612. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1613. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1614. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1615. if _, err := toGRPC(client).KV.Put(ctx, reqput); rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1616. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1617. }
  1618. }
  1619. func TestGRPCStreamRequireLeader(t *testing.T) {
  1620. defer testutil.AfterTest(t)
  1621. cfg := ClusterConfig{Size: 3}
  1622. clus := newClusterV3NoClients(t, &cfg)
  1623. defer clus.Terminate(t)
  1624. client, err := NewClientV3(clus.Members[0])
  1625. if err != nil {
  1626. t.Fatalf("failed to create client (%v)", err)
  1627. }
  1628. defer client.Close()
  1629. wAPI := toGRPC(client).Watch
  1630. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1631. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1632. wStream, err := wAPI.Watch(ctx)
  1633. if err != nil {
  1634. t.Fatalf("wAPI.Watch error: %v", err)
  1635. }
  1636. clus.Members[1].Stop(t)
  1637. clus.Members[2].Stop(t)
  1638. // existing stream should be rejected
  1639. _, err = wStream.Recv()
  1640. if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1641. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1642. }
  1643. // new stream should also be rejected
  1644. wStream, err = wAPI.Watch(ctx)
  1645. if err != nil {
  1646. t.Fatalf("wAPI.Watch error: %v", err)
  1647. }
  1648. _, err = wStream.Recv()
  1649. if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1650. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1651. }
  1652. clus.Members[1].Restart(t)
  1653. clus.Members[2].Restart(t)
  1654. clus.waitLeader(t, clus.Members)
  1655. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1656. // new stream should also be OK now after we restarted the other members
  1657. wStream, err = wAPI.Watch(ctx)
  1658. if err != nil {
  1659. t.Fatalf("wAPI.Watch error: %v", err)
  1660. }
  1661. wreq := &pb.WatchRequest{
  1662. RequestUnion: &pb.WatchRequest_CreateRequest{
  1663. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1664. },
  1665. }
  1666. err = wStream.Send(wreq)
  1667. if err != nil {
  1668. t.Errorf("err = %v, want nil", err)
  1669. }
  1670. }
  1671. // TestV3LargeRequests ensures that configurable MaxRequestBytes works as intended.
  1672. func TestV3LargeRequests(t *testing.T) {
  1673. defer testutil.AfterTest(t)
  1674. tests := []struct {
  1675. maxRequestBytes uint
  1676. valueSize int
  1677. expectError error
  1678. }{
  1679. // don't set to 0. use 0 as the default.
  1680. {1, 1024, rpctypes.ErrGRPCRequestTooLarge},
  1681. {10 * 1024 * 1024, 9 * 1024 * 1024, nil},
  1682. {10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
  1683. {10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
  1684. }
  1685. for i, test := range tests {
  1686. clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
  1687. kvcli := toGRPC(clus.Client(0)).KV
  1688. reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
  1689. _, err := kvcli.Put(context.TODO(), reqput)
  1690. if !eqErrGRPC(err, test.expectError) {
  1691. t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
  1692. }
  1693. // request went through, expect large response back from server
  1694. if test.expectError == nil {
  1695. reqget := &pb.RangeRequest{Key: []byte("foo")}
  1696. // limit receive call size with original value + gRPC overhead bytes
  1697. _, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
  1698. if err != nil {
  1699. t.Errorf("#%d: range expected no error, got %v", i, err)
  1700. }
  1701. }
  1702. clus.Terminate(t)
  1703. }
  1704. }
  1705. func eqErrGRPC(err1 error, err2 error) bool {
  1706. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  1707. }
  1708. // waitForRestart tries a range request until the client's server responds.
  1709. // This is mainly a stop-gap function until grpcproxy's KVClient adapter
  1710. // (and by extension, clientv3) supports grpc.CallOption pass-through so
  1711. // FailFast=false works with Put.
  1712. func waitForRestart(t *testing.T, kvc pb.KVClient) {
  1713. req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
  1714. // TODO: Remove retry loop once the new grpc load balancer provides retry.
  1715. var err error
  1716. for i := 0; i < 10; i++ {
  1717. if _, err = kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
  1718. if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
  1719. time.Sleep(time.Millisecond * 250)
  1720. } else {
  1721. t.Fatal(err)
  1722. }
  1723. }
  1724. }
  1725. if err != nil {
  1726. t.Fatalf("timed out waiting for restart: %v", err)
  1727. }
  1728. }