v3_grpc_test.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io/ioutil"
  19. "math/rand"
  20. "os"
  21. "reflect"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/pkg/testutil"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "golang.org/x/net/context"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/metadata"
  32. )
  33. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  34. // overwrites it, then checks that the change was applied.
  35. func TestV3PutOverwrite(t *testing.T) {
  36. defer testutil.AfterTest(t)
  37. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  38. defer clus.Terminate(t)
  39. kvc := toGRPC(clus.RandClient()).KV
  40. key := []byte("foo")
  41. reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
  42. respput, err := kvc.Put(context.TODO(), reqput)
  43. if err != nil {
  44. t.Fatalf("couldn't put key (%v)", err)
  45. }
  46. // overwrite
  47. reqput.Value = []byte("baz")
  48. respput2, err := kvc.Put(context.TODO(), reqput)
  49. if err != nil {
  50. t.Fatalf("couldn't put key (%v)", err)
  51. }
  52. if respput2.Header.Revision <= respput.Header.Revision {
  53. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  54. respput2.Header.Revision, respput.Header.Revision)
  55. }
  56. reqrange := &pb.RangeRequest{Key: key}
  57. resprange, err := kvc.Range(context.TODO(), reqrange)
  58. if err != nil {
  59. t.Fatalf("couldn't get key (%v)", err)
  60. }
  61. if len(resprange.Kvs) != 1 {
  62. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  63. }
  64. kv := resprange.Kvs[0]
  65. if kv.ModRevision <= kv.CreateRevision {
  66. t.Errorf("expected modRev > createRev, got %d <= %d",
  67. kv.ModRevision, kv.CreateRevision)
  68. }
  69. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  70. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  71. }
  72. }
  73. // TestPutRestart checks if a put after an unrelated member restart succeeds
  74. func TestV3PutRestart(t *testing.T) {
  75. defer testutil.AfterTest(t)
  76. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  77. defer clus.Terminate(t)
  78. kvIdx := rand.Intn(3)
  79. kvc := toGRPC(clus.Client(kvIdx)).KV
  80. stopIdx := kvIdx
  81. for stopIdx == kvIdx {
  82. stopIdx = rand.Intn(3)
  83. }
  84. clus.clients[stopIdx].Close()
  85. clus.Members[stopIdx].Stop(t)
  86. clus.Members[stopIdx].Restart(t)
  87. c, cerr := NewClientV3(clus.Members[stopIdx])
  88. if cerr != nil {
  89. t.Fatalf("cannot create client: %v", cerr)
  90. }
  91. clus.clients[stopIdx] = c
  92. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  93. defer cancel()
  94. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  95. _, err := kvc.Put(ctx, reqput)
  96. if err != nil && err == ctx.Err() {
  97. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  98. }
  99. }
  100. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  101. func TestV3CompactCurrentRev(t *testing.T) {
  102. defer testutil.AfterTest(t)
  103. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  104. defer clus.Terminate(t)
  105. kvc := toGRPC(clus.RandClient()).KV
  106. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  107. for i := 0; i < 3; i++ {
  108. if _, err := kvc.Put(context.Background(), preq); err != nil {
  109. t.Fatalf("couldn't put key (%v)", err)
  110. }
  111. }
  112. // get key to add to proxy cache, if any
  113. if _, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}); err != nil {
  114. t.Fatal(err)
  115. }
  116. // compact on current revision
  117. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  118. if err != nil {
  119. t.Fatalf("couldn't compact kv space (%v)", err)
  120. }
  121. // key still exists when linearized?
  122. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  123. if err != nil {
  124. t.Fatalf("couldn't get key after compaction (%v)", err)
  125. }
  126. // key still exists when serialized?
  127. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo"), Serializable: true})
  128. if err != nil {
  129. t.Fatalf("couldn't get serialized key after compaction (%v)", err)
  130. }
  131. }
  132. // TestV3HashKV ensures that multiple calls of HashKV on same node return same hash and compact rev.
  133. func TestV3HashKV(t *testing.T) {
  134. defer testutil.AfterTest(t)
  135. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  136. defer clus.Terminate(t)
  137. kvc := toGRPC(clus.RandClient()).KV
  138. mvc := toGRPC(clus.RandClient()).Maintenance
  139. for i := 0; i < 10; i++ {
  140. resp, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte(fmt.Sprintf("bar%d", i))})
  141. if err != nil {
  142. t.Fatal(err)
  143. }
  144. rev := resp.Header.Revision
  145. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{0})
  146. if err != nil {
  147. t.Fatal(err)
  148. }
  149. if rev != hresp.Header.Revision {
  150. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  151. }
  152. prevHash := hresp.Hash
  153. prevCompactRev := hresp.CompactRevision
  154. for i := 0; i < 10; i++ {
  155. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{0})
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. if rev != hresp.Header.Revision {
  160. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  161. }
  162. if prevHash != hresp.Hash {
  163. t.Fatalf("prevHash %v != Hash %v", prevHash, hresp.Hash)
  164. }
  165. if prevCompactRev != hresp.CompactRevision {
  166. t.Fatalf("prevCompactRev %v != CompactRevision %v", prevHash, hresp.Hash)
  167. }
  168. prevHash = hresp.Hash
  169. prevCompactRev = hresp.CompactRevision
  170. }
  171. }
  172. }
  173. func TestV3TxnTooManyOps(t *testing.T) {
  174. defer testutil.AfterTest(t)
  175. maxTxnOps := uint(128)
  176. clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps})
  177. defer clus.Terminate(t)
  178. kvc := toGRPC(clus.RandClient()).KV
  179. // unique keys
  180. i := new(int)
  181. keyf := func() []byte {
  182. *i++
  183. return []byte(fmt.Sprintf("key-%d", i))
  184. }
  185. addCompareOps := func(txn *pb.TxnRequest) {
  186. txn.Compare = append(txn.Compare,
  187. &pb.Compare{
  188. Result: pb.Compare_GREATER,
  189. Target: pb.Compare_CREATE,
  190. Key: keyf(),
  191. })
  192. }
  193. addSuccessOps := func(txn *pb.TxnRequest) {
  194. txn.Success = append(txn.Success,
  195. &pb.RequestOp{
  196. Request: &pb.RequestOp_RequestPut{
  197. RequestPut: &pb.PutRequest{
  198. Key: keyf(),
  199. Value: []byte("bar"),
  200. },
  201. },
  202. })
  203. }
  204. addFailureOps := func(txn *pb.TxnRequest) {
  205. txn.Failure = append(txn.Failure,
  206. &pb.RequestOp{
  207. Request: &pb.RequestOp_RequestPut{
  208. RequestPut: &pb.PutRequest{
  209. Key: keyf(),
  210. Value: []byte("bar"),
  211. },
  212. },
  213. })
  214. }
  215. addTxnOps := func(txn *pb.TxnRequest) {
  216. newTxn := &pb.TxnRequest{}
  217. addSuccessOps(newTxn)
  218. txn.Success = append(txn.Success,
  219. &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  220. RequestTxn: newTxn,
  221. },
  222. },
  223. )
  224. }
  225. tests := []func(txn *pb.TxnRequest){
  226. addCompareOps,
  227. addSuccessOps,
  228. addFailureOps,
  229. addTxnOps,
  230. }
  231. for i, tt := range tests {
  232. txn := &pb.TxnRequest{}
  233. for j := 0; j < int(maxTxnOps+1); j++ {
  234. tt(txn)
  235. }
  236. _, err := kvc.Txn(context.Background(), txn)
  237. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  238. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  239. }
  240. }
  241. }
  242. func TestV3TxnDuplicateKeys(t *testing.T) {
  243. defer testutil.AfterTest(t)
  244. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  245. defer clus.Terminate(t)
  246. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  247. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  248. RequestDeleteRange: &pb.DeleteRangeRequest{
  249. Key: []byte("abc"),
  250. },
  251. },
  252. }
  253. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  254. RequestDeleteRange: &pb.DeleteRangeRequest{
  255. Key: []byte("a"), RangeEnd: []byte("b"),
  256. },
  257. },
  258. }
  259. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  260. RequestDeleteRange: &pb.DeleteRangeRequest{
  261. Key: []byte("abb"), RangeEnd: []byte("abc"),
  262. },
  263. },
  264. }
  265. txnDelReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  266. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{delInRangeReq}},
  267. },
  268. }
  269. txnDelReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  270. RequestTxn: &pb.TxnRequest{
  271. Success: []*pb.RequestOp{delInRangeReq},
  272. Failure: []*pb.RequestOp{delInRangeReq}},
  273. },
  274. }
  275. txnPutReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  276. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{putreq}},
  277. },
  278. }
  279. txnPutReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  280. RequestTxn: &pb.TxnRequest{
  281. Success: []*pb.RequestOp{putreq},
  282. Failure: []*pb.RequestOp{putreq}},
  283. },
  284. }
  285. kvc := toGRPC(clus.RandClient()).KV
  286. tests := []struct {
  287. txnSuccess []*pb.RequestOp
  288. werr error
  289. }{
  290. {
  291. txnSuccess: []*pb.RequestOp{putreq, putreq},
  292. werr: rpctypes.ErrGRPCDuplicateKey,
  293. },
  294. {
  295. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  296. werr: rpctypes.ErrGRPCDuplicateKey,
  297. },
  298. {
  299. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  300. werr: rpctypes.ErrGRPCDuplicateKey,
  301. },
  302. // Then(Put(a), Then(Del(a)))
  303. {
  304. txnSuccess: []*pb.RequestOp{putreq, txnDelReq},
  305. werr: rpctypes.ErrGRPCDuplicateKey,
  306. },
  307. // Then(Del(a), Then(Put(a)))
  308. {
  309. txnSuccess: []*pb.RequestOp{delInRangeReq, txnPutReq},
  310. werr: rpctypes.ErrGRPCDuplicateKey,
  311. },
  312. // Then((Then(Put(a)), Else(Put(a))), (Then(Put(a)), Else(Put(a)))
  313. {
  314. txnSuccess: []*pb.RequestOp{txnPutReqTwoSide, txnPutReqTwoSide},
  315. werr: rpctypes.ErrGRPCDuplicateKey,
  316. },
  317. // Then(Del(x), (Then(Put(a)), Else(Put(a))))
  318. {
  319. txnSuccess: []*pb.RequestOp{delOutOfRangeReq, txnPutReqTwoSide},
  320. werr: nil,
  321. },
  322. // Then(Then(Del(a)), (Then(Del(a)), Else(Del(a))))
  323. {
  324. txnSuccess: []*pb.RequestOp{txnDelReq, txnDelReqTwoSide},
  325. werr: nil,
  326. },
  327. {
  328. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  329. werr: nil,
  330. },
  331. {
  332. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  333. werr: nil,
  334. },
  335. }
  336. for i, tt := range tests {
  337. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  338. _, err := kvc.Txn(context.Background(), txn)
  339. if !eqErrGRPC(err, tt.werr) {
  340. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  341. }
  342. }
  343. }
  344. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  345. func TestV3TxnRevision(t *testing.T) {
  346. defer testutil.AfterTest(t)
  347. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  348. defer clus.Terminate(t)
  349. kvc := toGRPC(clus.RandClient()).KV
  350. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  351. presp, err := kvc.Put(context.TODO(), pr)
  352. if err != nil {
  353. t.Fatal(err)
  354. }
  355. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  356. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  357. tresp, err := kvc.Txn(context.TODO(), txn)
  358. if err != nil {
  359. t.Fatal(err)
  360. }
  361. // did not update revision
  362. if presp.Header.Revision != tresp.Header.Revision {
  363. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  364. }
  365. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  366. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  367. tresp, err = kvc.Txn(context.TODO(), txn)
  368. if err != nil {
  369. t.Fatal(err)
  370. }
  371. // did not update revision
  372. if presp.Header.Revision != tresp.Header.Revision {
  373. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  374. }
  375. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  376. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  377. tresp, err = kvc.Txn(context.TODO(), txn)
  378. if err != nil {
  379. t.Fatal(err)
  380. }
  381. // updated revision
  382. if tresp.Header.Revision != presp.Header.Revision+1 {
  383. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  384. }
  385. }
  386. // Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
  387. // when compared to the Succeeded field in the txn response.
  388. func TestV3TxnCmpHeaderRev(t *testing.T) {
  389. defer testutil.AfterTest(t)
  390. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  391. defer clus.Terminate(t)
  392. kvc := toGRPC(clus.RandClient()).KV
  393. for i := 0; i < 10; i++ {
  394. // Concurrently put a key with a txn comparing on it.
  395. revc := make(chan int64, 1)
  396. go func() {
  397. defer close(revc)
  398. pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
  399. presp, err := kvc.Put(context.TODO(), pr)
  400. if err != nil {
  401. t.Fatal(err)
  402. }
  403. revc <- presp.Header.Revision
  404. }()
  405. // The read-only txn uses the optimized readindex server path.
  406. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
  407. RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
  408. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  409. // i = 0 /\ Succeeded => put followed txn
  410. cmp := &pb.Compare{
  411. Result: pb.Compare_EQUAL,
  412. Target: pb.Compare_VERSION,
  413. Key: []byte("k"),
  414. TargetUnion: &pb.Compare_Version{Version: int64(i)},
  415. }
  416. txn.Compare = append(txn.Compare, cmp)
  417. tresp, err := kvc.Txn(context.TODO(), txn)
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. prev := <-revc
  422. // put followed txn; should eval to false
  423. if prev > tresp.Header.Revision && !tresp.Succeeded {
  424. t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
  425. }
  426. // txn follows put; should eval to true
  427. if tresp.Header.Revision >= prev && tresp.Succeeded {
  428. t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
  429. }
  430. }
  431. }
  432. // TestV3TxnRangeCompare tests range comparisons in txns
  433. func TestV3TxnRangeCompare(t *testing.T) {
  434. defer testutil.AfterTest(t)
  435. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  436. defer clus.Terminate(t)
  437. // put keys, named by expected revision
  438. for _, k := range []string{"/a/2", "/a/3", "/a/4", "/f/5"} {
  439. if _, err := clus.Client(0).Put(context.TODO(), k, "x"); err != nil {
  440. t.Fatal(err)
  441. }
  442. }
  443. tests := []struct {
  444. cmp pb.Compare
  445. wSuccess bool
  446. }{
  447. {
  448. // >= /a/; all create revs fit
  449. pb.Compare{
  450. Key: []byte("/a/"),
  451. RangeEnd: []byte{0},
  452. Target: pb.Compare_CREATE,
  453. Result: pb.Compare_LESS,
  454. TargetUnion: &pb.Compare_CreateRevision{6},
  455. },
  456. true,
  457. },
  458. {
  459. // >= /a/; one create rev doesn't fit
  460. pb.Compare{
  461. Key: []byte("/a/"),
  462. RangeEnd: []byte{0},
  463. Target: pb.Compare_CREATE,
  464. Result: pb.Compare_LESS,
  465. TargetUnion: &pb.Compare_CreateRevision{5},
  466. },
  467. false,
  468. },
  469. {
  470. // prefix /a/*; all create revs fit
  471. pb.Compare{
  472. Key: []byte("/a/"),
  473. RangeEnd: []byte("/a0"),
  474. Target: pb.Compare_CREATE,
  475. Result: pb.Compare_LESS,
  476. TargetUnion: &pb.Compare_CreateRevision{5},
  477. },
  478. true,
  479. },
  480. {
  481. // prefix /a/*; one create rev doesn't fit
  482. pb.Compare{
  483. Key: []byte("/a/"),
  484. RangeEnd: []byte("/a0"),
  485. Target: pb.Compare_CREATE,
  486. Result: pb.Compare_LESS,
  487. TargetUnion: &pb.Compare_CreateRevision{4},
  488. },
  489. false,
  490. },
  491. {
  492. // does not exist, does not succeed
  493. pb.Compare{
  494. Key: []byte("/b/"),
  495. RangeEnd: []byte("/b0"),
  496. Target: pb.Compare_VALUE,
  497. Result: pb.Compare_EQUAL,
  498. TargetUnion: &pb.Compare_Value{[]byte("x")},
  499. },
  500. false,
  501. },
  502. {
  503. // all keys are leased
  504. pb.Compare{
  505. Key: []byte("/a/"),
  506. RangeEnd: []byte("/a0"),
  507. Target: pb.Compare_LEASE,
  508. Result: pb.Compare_GREATER,
  509. TargetUnion: &pb.Compare_Lease{0},
  510. },
  511. false,
  512. },
  513. {
  514. // no keys are leased
  515. pb.Compare{
  516. Key: []byte("/a/"),
  517. RangeEnd: []byte("/a0"),
  518. Target: pb.Compare_LEASE,
  519. Result: pb.Compare_EQUAL,
  520. TargetUnion: &pb.Compare_Lease{0},
  521. },
  522. true,
  523. },
  524. }
  525. kvc := toGRPC(clus.Client(0)).KV
  526. for i, tt := range tests {
  527. txn := &pb.TxnRequest{}
  528. txn.Compare = append(txn.Compare, &tt.cmp)
  529. tresp, err := kvc.Txn(context.TODO(), txn)
  530. if err != nil {
  531. t.Fatal(err)
  532. }
  533. if tt.wSuccess != tresp.Succeeded {
  534. t.Errorf("#%d: expected %v, got %v", i, tt.wSuccess, tresp.Succeeded)
  535. }
  536. }
  537. }
  538. // TestV3TxnNested tests nested txns follow paths as expected.
  539. func TestV3TxnNestedPath(t *testing.T) {
  540. defer testutil.AfterTest(t)
  541. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  542. defer clus.Terminate(t)
  543. kvc := toGRPC(clus.RandClient()).KV
  544. cmpTrue := &pb.Compare{
  545. Result: pb.Compare_EQUAL,
  546. Target: pb.Compare_VERSION,
  547. Key: []byte("k"),
  548. TargetUnion: &pb.Compare_Version{Version: int64(0)},
  549. }
  550. cmpFalse := &pb.Compare{
  551. Result: pb.Compare_EQUAL,
  552. Target: pb.Compare_VERSION,
  553. Key: []byte("k"),
  554. TargetUnion: &pb.Compare_Version{Version: int64(1)},
  555. }
  556. // generate random path to eval txns
  557. topTxn := &pb.TxnRequest{}
  558. txn := topTxn
  559. txnPath := make([]bool, 10)
  560. for i := range txnPath {
  561. nextTxn := &pb.TxnRequest{}
  562. op := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: nextTxn}}
  563. txnPath[i] = rand.Intn(2) == 0
  564. if txnPath[i] {
  565. txn.Compare = append(txn.Compare, cmpTrue)
  566. txn.Success = append(txn.Success, op)
  567. } else {
  568. txn.Compare = append(txn.Compare, cmpFalse)
  569. txn.Failure = append(txn.Failure, op)
  570. }
  571. txn = nextTxn
  572. }
  573. tresp, err := kvc.Txn(context.TODO(), topTxn)
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. curTxnResp := tresp
  578. for i := range txnPath {
  579. if curTxnResp.Succeeded != txnPath[i] {
  580. t.Fatalf("expected path %+v, got response %+v", txnPath, *tresp)
  581. }
  582. curTxnResp = curTxnResp.Responses[0].Response.(*pb.ResponseOp_ResponseTxn).ResponseTxn
  583. }
  584. }
  585. // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
  586. func TestV3PutIgnoreValue(t *testing.T) {
  587. defer testutil.AfterTest(t)
  588. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  589. defer clus.Terminate(t)
  590. kvc := toGRPC(clus.RandClient()).KV
  591. key, val := []byte("foo"), []byte("bar")
  592. putReq := pb.PutRequest{Key: key, Value: val}
  593. // create lease
  594. lc := toGRPC(clus.RandClient()).Lease
  595. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  596. if err != nil {
  597. t.Fatal(err)
  598. }
  599. if lresp.Error != "" {
  600. t.Fatal(lresp.Error)
  601. }
  602. tests := []struct {
  603. putFunc func() error
  604. putErr error
  605. wleaseID int64
  606. }{
  607. { // put failure for non-existent key
  608. func() error {
  609. preq := putReq
  610. preq.IgnoreValue = true
  611. _, err := kvc.Put(context.TODO(), &preq)
  612. return err
  613. },
  614. rpctypes.ErrGRPCKeyNotFound,
  615. 0,
  616. },
  617. { // txn failure for non-existent key
  618. func() error {
  619. preq := putReq
  620. preq.Value = nil
  621. preq.IgnoreValue = true
  622. txn := &pb.TxnRequest{}
  623. txn.Success = append(txn.Success, &pb.RequestOp{
  624. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  625. _, err := kvc.Txn(context.TODO(), txn)
  626. return err
  627. },
  628. rpctypes.ErrGRPCKeyNotFound,
  629. 0,
  630. },
  631. { // put success
  632. func() error {
  633. _, err := kvc.Put(context.TODO(), &putReq)
  634. return err
  635. },
  636. nil,
  637. 0,
  638. },
  639. { // txn success, attach lease
  640. func() error {
  641. preq := putReq
  642. preq.Value = nil
  643. preq.Lease = lresp.ID
  644. preq.IgnoreValue = true
  645. txn := &pb.TxnRequest{}
  646. txn.Success = append(txn.Success, &pb.RequestOp{
  647. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  648. _, err := kvc.Txn(context.TODO(), txn)
  649. return err
  650. },
  651. nil,
  652. lresp.ID,
  653. },
  654. { // non-empty value with ignore_value should error
  655. func() error {
  656. preq := putReq
  657. preq.IgnoreValue = true
  658. _, err := kvc.Put(context.TODO(), &preq)
  659. return err
  660. },
  661. rpctypes.ErrGRPCValueProvided,
  662. 0,
  663. },
  664. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  665. func() error {
  666. preq := putReq
  667. preq.Value = nil
  668. preq.IgnoreValue = true
  669. presp, err := kvc.Put(context.TODO(), &preq)
  670. if err != nil {
  671. return err
  672. }
  673. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  674. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  675. }
  676. return nil
  677. },
  678. nil,
  679. 0,
  680. },
  681. { // revoke lease, ensure detached key doesn't get deleted
  682. func() error {
  683. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  684. return err
  685. },
  686. nil,
  687. 0,
  688. },
  689. }
  690. for i, tt := range tests {
  691. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  692. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  693. }
  694. if tt.putErr != nil {
  695. continue
  696. }
  697. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  698. if err != nil {
  699. t.Fatalf("#%d: %v", i, err)
  700. }
  701. if len(rr.Kvs) != 1 {
  702. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  703. }
  704. if !bytes.Equal(rr.Kvs[0].Value, val) {
  705. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  706. }
  707. if rr.Kvs[0].Lease != tt.wleaseID {
  708. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  709. }
  710. }
  711. }
  712. // TestV3PutIgnoreLease ensures that writes with ignore_lease uses previous lease for the key overwrites.
  713. func TestV3PutIgnoreLease(t *testing.T) {
  714. defer testutil.AfterTest(t)
  715. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  716. defer clus.Terminate(t)
  717. kvc := toGRPC(clus.RandClient()).KV
  718. // create lease
  719. lc := toGRPC(clus.RandClient()).Lease
  720. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  721. if err != nil {
  722. t.Fatal(err)
  723. }
  724. if lresp.Error != "" {
  725. t.Fatal(lresp.Error)
  726. }
  727. key, val, val1 := []byte("zoo"), []byte("bar"), []byte("bar1")
  728. putReq := pb.PutRequest{Key: key, Value: val}
  729. tests := []struct {
  730. putFunc func() error
  731. putErr error
  732. wleaseID int64
  733. wvalue []byte
  734. }{
  735. { // put failure for non-existent key
  736. func() error {
  737. preq := putReq
  738. preq.IgnoreLease = true
  739. _, err := kvc.Put(context.TODO(), &preq)
  740. return err
  741. },
  742. rpctypes.ErrGRPCKeyNotFound,
  743. 0,
  744. nil,
  745. },
  746. { // txn failure for non-existent key
  747. func() error {
  748. preq := putReq
  749. preq.IgnoreLease = true
  750. txn := &pb.TxnRequest{}
  751. txn.Success = append(txn.Success, &pb.RequestOp{
  752. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  753. _, err := kvc.Txn(context.TODO(), txn)
  754. return err
  755. },
  756. rpctypes.ErrGRPCKeyNotFound,
  757. 0,
  758. nil,
  759. },
  760. { // put success
  761. func() error {
  762. preq := putReq
  763. preq.Lease = lresp.ID
  764. _, err := kvc.Put(context.TODO(), &preq)
  765. return err
  766. },
  767. nil,
  768. lresp.ID,
  769. val,
  770. },
  771. { // txn success, modify value using 'ignore_lease' and ensure lease is not detached
  772. func() error {
  773. preq := putReq
  774. preq.Value = val1
  775. preq.IgnoreLease = true
  776. txn := &pb.TxnRequest{}
  777. txn.Success = append(txn.Success, &pb.RequestOp{
  778. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  779. _, err := kvc.Txn(context.TODO(), txn)
  780. return err
  781. },
  782. nil,
  783. lresp.ID,
  784. val1,
  785. },
  786. { // non-empty lease with ignore_lease should error
  787. func() error {
  788. preq := putReq
  789. preq.Lease = lresp.ID
  790. preq.IgnoreLease = true
  791. _, err := kvc.Put(context.TODO(), &preq)
  792. return err
  793. },
  794. rpctypes.ErrGRPCLeaseProvided,
  795. 0,
  796. nil,
  797. },
  798. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  799. func() error {
  800. presp, err := kvc.Put(context.TODO(), &putReq)
  801. if err != nil {
  802. return err
  803. }
  804. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  805. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  806. }
  807. return nil
  808. },
  809. nil,
  810. 0,
  811. val,
  812. },
  813. { // revoke lease, ensure detached key doesn't get deleted
  814. func() error {
  815. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  816. return err
  817. },
  818. nil,
  819. 0,
  820. val,
  821. },
  822. }
  823. for i, tt := range tests {
  824. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  825. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  826. }
  827. if tt.putErr != nil {
  828. continue
  829. }
  830. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  831. if err != nil {
  832. t.Fatalf("#%d: %v", i, err)
  833. }
  834. if len(rr.Kvs) != 1 {
  835. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  836. }
  837. if !bytes.Equal(rr.Kvs[0].Value, tt.wvalue) {
  838. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  839. }
  840. if rr.Kvs[0].Lease != tt.wleaseID {
  841. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  842. }
  843. }
  844. }
  845. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  846. func TestV3PutMissingLease(t *testing.T) {
  847. defer testutil.AfterTest(t)
  848. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  849. defer clus.Terminate(t)
  850. kvc := toGRPC(clus.RandClient()).KV
  851. key := []byte("foo")
  852. preq := &pb.PutRequest{Key: key, Lease: 123456}
  853. tests := []func(){
  854. // put case
  855. func() {
  856. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  857. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  858. }
  859. },
  860. // txn success case
  861. func() {
  862. txn := &pb.TxnRequest{}
  863. txn.Success = append(txn.Success, &pb.RequestOp{
  864. Request: &pb.RequestOp_RequestPut{
  865. RequestPut: preq}})
  866. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  867. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  868. }
  869. },
  870. // txn failure case
  871. func() {
  872. txn := &pb.TxnRequest{}
  873. txn.Failure = append(txn.Failure, &pb.RequestOp{
  874. Request: &pb.RequestOp_RequestPut{
  875. RequestPut: preq}})
  876. cmp := &pb.Compare{
  877. Result: pb.Compare_GREATER,
  878. Target: pb.Compare_CREATE,
  879. Key: []byte("bar"),
  880. }
  881. txn.Compare = append(txn.Compare, cmp)
  882. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  883. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  884. }
  885. },
  886. // ignore bad lease in failure on success txn
  887. func() {
  888. txn := &pb.TxnRequest{}
  889. rreq := &pb.RangeRequest{Key: []byte("bar")}
  890. txn.Success = append(txn.Success, &pb.RequestOp{
  891. Request: &pb.RequestOp_RequestRange{
  892. RequestRange: rreq}})
  893. txn.Failure = append(txn.Failure, &pb.RequestOp{
  894. Request: &pb.RequestOp_RequestPut{
  895. RequestPut: preq}})
  896. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  897. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  898. }
  899. },
  900. }
  901. for i, f := range tests {
  902. f()
  903. // key shouldn't have been stored
  904. rreq := &pb.RangeRequest{Key: key}
  905. rresp, err := kvc.Range(context.TODO(), rreq)
  906. if err != nil {
  907. t.Errorf("#%d. could not rangereq (%v)", i, err)
  908. } else if len(rresp.Kvs) != 0 {
  909. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  910. }
  911. }
  912. }
  913. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  914. func TestV3DeleteRange(t *testing.T) {
  915. defer testutil.AfterTest(t)
  916. tests := []struct {
  917. keySet []string
  918. begin string
  919. end string
  920. prevKV bool
  921. wantSet [][]byte
  922. deleted int64
  923. }{
  924. // delete middle
  925. {
  926. []string{"foo", "foo/abc", "fop"},
  927. "foo/", "fop", false,
  928. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  929. },
  930. // no delete
  931. {
  932. []string{"foo", "foo/abc", "fop"},
  933. "foo/", "foo/", false,
  934. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  935. },
  936. // delete first
  937. {
  938. []string{"foo", "foo/abc", "fop"},
  939. "fo", "fop", false,
  940. [][]byte{[]byte("fop")}, 2,
  941. },
  942. // delete tail
  943. {
  944. []string{"foo", "foo/abc", "fop"},
  945. "foo/", "fos", false,
  946. [][]byte{[]byte("foo")}, 2,
  947. },
  948. // delete exact
  949. {
  950. []string{"foo", "foo/abc", "fop"},
  951. "foo/abc", "", false,
  952. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  953. },
  954. // delete none, [x,x)
  955. {
  956. []string{"foo"},
  957. "foo", "foo", false,
  958. [][]byte{[]byte("foo")}, 0,
  959. },
  960. // delete middle with preserveKVs set
  961. {
  962. []string{"foo", "foo/abc", "fop"},
  963. "foo/", "fop", true,
  964. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  965. },
  966. }
  967. for i, tt := range tests {
  968. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  969. kvc := toGRPC(clus.RandClient()).KV
  970. ks := tt.keySet
  971. for j := range ks {
  972. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  973. _, err := kvc.Put(context.TODO(), reqput)
  974. if err != nil {
  975. t.Fatalf("couldn't put key (%v)", err)
  976. }
  977. }
  978. dreq := &pb.DeleteRangeRequest{
  979. Key: []byte(tt.begin),
  980. RangeEnd: []byte(tt.end),
  981. PrevKv: tt.prevKV,
  982. }
  983. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  984. if err != nil {
  985. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  986. }
  987. if tt.deleted != dresp.Deleted {
  988. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  989. }
  990. if tt.prevKV {
  991. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  992. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  993. }
  994. }
  995. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  996. rresp, err := kvc.Range(context.TODO(), rreq)
  997. if err != nil {
  998. t.Errorf("couldn't get range on test %v (%v)", i, err)
  999. }
  1000. if dresp.Header.Revision != rresp.Header.Revision {
  1001. t.Errorf("expected revision %v, got %v",
  1002. dresp.Header.Revision, rresp.Header.Revision)
  1003. }
  1004. keys := [][]byte{}
  1005. for j := range rresp.Kvs {
  1006. keys = append(keys, rresp.Kvs[j].Key)
  1007. }
  1008. if !reflect.DeepEqual(tt.wantSet, keys) {
  1009. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  1010. }
  1011. // can't defer because tcp ports will be in use
  1012. clus.Terminate(t)
  1013. }
  1014. }
  1015. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  1016. func TestV3TxnInvalidRange(t *testing.T) {
  1017. defer testutil.AfterTest(t)
  1018. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1019. defer clus.Terminate(t)
  1020. kvc := toGRPC(clus.RandClient()).KV
  1021. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1022. for i := 0; i < 3; i++ {
  1023. _, err := kvc.Put(context.Background(), preq)
  1024. if err != nil {
  1025. t.Fatalf("couldn't put key (%v)", err)
  1026. }
  1027. }
  1028. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  1029. if err != nil {
  1030. t.Fatalf("couldn't compact kv space (%v)", err)
  1031. }
  1032. // future rev
  1033. txn := &pb.TxnRequest{}
  1034. txn.Success = append(txn.Success, &pb.RequestOp{
  1035. Request: &pb.RequestOp_RequestPut{
  1036. RequestPut: preq}})
  1037. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  1038. txn.Success = append(txn.Success, &pb.RequestOp{
  1039. Request: &pb.RequestOp_RequestRange{
  1040. RequestRange: rreq}})
  1041. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  1042. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  1043. }
  1044. // compacted rev
  1045. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  1046. tv.RequestRange.Revision = 1
  1047. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  1048. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  1049. }
  1050. }
  1051. func TestV3TooLargeRequest(t *testing.T) {
  1052. defer testutil.AfterTest(t)
  1053. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1054. defer clus.Terminate(t)
  1055. kvc := toGRPC(clus.RandClient()).KV
  1056. // 2MB request value
  1057. largeV := make([]byte, 2*1024*1024)
  1058. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  1059. _, err := kvc.Put(context.Background(), preq)
  1060. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  1061. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  1062. }
  1063. }
  1064. // TestV3Hash tests hash.
  1065. func TestV3Hash(t *testing.T) {
  1066. defer testutil.AfterTest(t)
  1067. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1068. defer clus.Terminate(t)
  1069. cli := clus.RandClient()
  1070. kvc := toGRPC(cli).KV
  1071. m := toGRPC(cli).Maintenance
  1072. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1073. for i := 0; i < 3; i++ {
  1074. _, err := kvc.Put(context.Background(), preq)
  1075. if err != nil {
  1076. t.Fatalf("couldn't put key (%v)", err)
  1077. }
  1078. }
  1079. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  1080. if err != nil || resp.Hash == 0 {
  1081. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1082. }
  1083. }
  1084. // TestV3HashRestart ensures that hash stays the same after restart.
  1085. func TestV3HashRestart(t *testing.T) {
  1086. defer testutil.AfterTest(t)
  1087. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  1088. defer clus.Terminate(t)
  1089. cli := clus.RandClient()
  1090. resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1091. if err != nil || resp.Hash == 0 {
  1092. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1093. }
  1094. hash1 := resp.Hash
  1095. clus.Members[0].Stop(t)
  1096. clus.Members[0].Restart(t)
  1097. clus.waitLeader(t, clus.Members)
  1098. kvc := toGRPC(clus.Client(0)).KV
  1099. waitForRestart(t, kvc)
  1100. cli = clus.RandClient()
  1101. resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1102. if err != nil || resp.Hash == 0 {
  1103. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1104. }
  1105. hash2 := resp.Hash
  1106. if hash1 != hash2 {
  1107. t.Fatalf("hash expected %d, got %d", hash1, hash2)
  1108. }
  1109. }
  1110. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  1111. func TestV3StorageQuotaAPI(t *testing.T) {
  1112. defer testutil.AfterTest(t)
  1113. quotasize := int64(16 * os.Getpagesize())
  1114. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1115. // Set a quota on one node
  1116. clus.Members[0].QuotaBackendBytes = quotasize
  1117. clus.Members[0].Stop(t)
  1118. clus.Members[0].Restart(t)
  1119. defer clus.Terminate(t)
  1120. kvc := toGRPC(clus.Client(0)).KV
  1121. waitForRestart(t, kvc)
  1122. key := []byte("abc")
  1123. // test small put that fits in quota
  1124. smallbuf := make([]byte, 512)
  1125. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  1126. t.Fatal(err)
  1127. }
  1128. // test big put
  1129. bigbuf := make([]byte, quotasize)
  1130. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  1131. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  1132. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1133. }
  1134. // test big txn
  1135. puttxn := &pb.RequestOp{
  1136. Request: &pb.RequestOp_RequestPut{
  1137. RequestPut: &pb.PutRequest{
  1138. Key: key,
  1139. Value: bigbuf,
  1140. },
  1141. },
  1142. }
  1143. txnreq := &pb.TxnRequest{}
  1144. txnreq.Success = append(txnreq.Success, puttxn)
  1145. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  1146. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  1147. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1148. }
  1149. }
  1150. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  1151. func TestV3StorageQuotaApply(t *testing.T) {
  1152. testutil.AfterTest(t)
  1153. quotasize := int64(16 * os.Getpagesize())
  1154. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  1155. defer clus.Terminate(t)
  1156. kvc0 := toGRPC(clus.Client(0)).KV
  1157. kvc1 := toGRPC(clus.Client(1)).KV
  1158. // Set a quota on one node
  1159. clus.Members[0].QuotaBackendBytes = quotasize
  1160. clus.Members[0].Stop(t)
  1161. clus.Members[0].Restart(t)
  1162. clus.waitLeader(t, clus.Members)
  1163. waitForRestart(t, kvc0)
  1164. key := []byte("abc")
  1165. // test small put still works
  1166. smallbuf := make([]byte, 1024)
  1167. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  1168. if serr != nil {
  1169. t.Fatal(serr)
  1170. }
  1171. // test big put
  1172. bigbuf := make([]byte, quotasize)
  1173. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  1174. if err != nil {
  1175. t.Fatal(err)
  1176. }
  1177. // quorum get should work regardless of whether alarm is raised
  1178. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  1179. if err != nil {
  1180. t.Fatal(err)
  1181. }
  1182. // wait until alarm is raised for sure-- poll the alarms
  1183. stopc := time.After(5 * time.Second)
  1184. for {
  1185. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  1186. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  1187. if aerr != nil {
  1188. t.Fatal(aerr)
  1189. }
  1190. if len(resp.Alarms) != 0 {
  1191. break
  1192. }
  1193. select {
  1194. case <-stopc:
  1195. t.Fatalf("timed out waiting for alarm")
  1196. case <-time.After(10 * time.Millisecond):
  1197. }
  1198. }
  1199. // small quota machine should reject put
  1200. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1201. t.Fatalf("past-quota instance should reject put")
  1202. }
  1203. // large quota machine should reject put
  1204. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1205. t.Fatalf("past-quota instance should reject put")
  1206. }
  1207. // reset large quota node to ensure alarm persisted
  1208. clus.Members[1].Stop(t)
  1209. clus.Members[1].Restart(t)
  1210. clus.waitLeader(t, clus.Members)
  1211. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1212. t.Fatalf("alarmed instance should reject put after reset")
  1213. }
  1214. }
  1215. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  1216. func TestV3AlarmDeactivate(t *testing.T) {
  1217. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1218. defer clus.Terminate(t)
  1219. kvc := toGRPC(clus.RandClient()).KV
  1220. mt := toGRPC(clus.RandClient()).Maintenance
  1221. alarmReq := &pb.AlarmRequest{
  1222. MemberID: 123,
  1223. Action: pb.AlarmRequest_ACTIVATE,
  1224. Alarm: pb.AlarmType_NOSPACE,
  1225. }
  1226. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  1227. t.Fatal(err)
  1228. }
  1229. key := []byte("abc")
  1230. smallbuf := make([]byte, 512)
  1231. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  1232. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  1233. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1234. }
  1235. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  1236. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  1237. t.Fatal(err)
  1238. }
  1239. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  1240. t.Fatal(err)
  1241. }
  1242. }
  1243. func TestV3RangeRequest(t *testing.T) {
  1244. defer testutil.AfterTest(t)
  1245. tests := []struct {
  1246. putKeys []string
  1247. reqs []pb.RangeRequest
  1248. wresps [][]string
  1249. wmores []bool
  1250. }{
  1251. // single key
  1252. {
  1253. []string{"foo", "bar"},
  1254. []pb.RangeRequest{
  1255. // exists
  1256. {Key: []byte("foo")},
  1257. // doesn't exist
  1258. {Key: []byte("baz")},
  1259. },
  1260. [][]string{
  1261. {"foo"},
  1262. {},
  1263. },
  1264. []bool{false, false},
  1265. },
  1266. // multi-key
  1267. {
  1268. []string{"a", "b", "c", "d", "e"},
  1269. []pb.RangeRequest{
  1270. // all in range
  1271. {Key: []byte("a"), RangeEnd: []byte("z")},
  1272. // [b, d)
  1273. {Key: []byte("b"), RangeEnd: []byte("d")},
  1274. // out of range
  1275. {Key: []byte("f"), RangeEnd: []byte("z")},
  1276. // [c,c) = empty
  1277. {Key: []byte("c"), RangeEnd: []byte("c")},
  1278. // [d, b) = empty
  1279. {Key: []byte("d"), RangeEnd: []byte("b")},
  1280. // ["\0", "\0") => all in range
  1281. {Key: []byte{0}, RangeEnd: []byte{0}},
  1282. },
  1283. [][]string{
  1284. {"a", "b", "c", "d", "e"},
  1285. {"b", "c"},
  1286. {},
  1287. {},
  1288. {},
  1289. {"a", "b", "c", "d", "e"},
  1290. },
  1291. []bool{false, false, false, false, false, false},
  1292. },
  1293. // revision
  1294. {
  1295. []string{"a", "b", "c", "d", "e"},
  1296. []pb.RangeRequest{
  1297. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  1298. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  1299. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  1300. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  1301. },
  1302. [][]string{
  1303. {"a", "b", "c", "d", "e"},
  1304. {},
  1305. {"a"},
  1306. {"a", "b"},
  1307. },
  1308. []bool{false, false, false, false},
  1309. },
  1310. // limit
  1311. {
  1312. []string{"foo", "bar"},
  1313. []pb.RangeRequest{
  1314. // more
  1315. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  1316. // no more
  1317. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  1318. },
  1319. [][]string{
  1320. {"bar"},
  1321. {"bar", "foo"},
  1322. },
  1323. []bool{true, false},
  1324. },
  1325. // sort
  1326. {
  1327. []string{"b", "a", "c", "d", "c"},
  1328. []pb.RangeRequest{
  1329. {
  1330. Key: []byte("a"), RangeEnd: []byte("z"),
  1331. Limit: 1,
  1332. SortOrder: pb.RangeRequest_ASCEND,
  1333. SortTarget: pb.RangeRequest_KEY,
  1334. },
  1335. {
  1336. Key: []byte("a"), RangeEnd: []byte("z"),
  1337. Limit: 1,
  1338. SortOrder: pb.RangeRequest_DESCEND,
  1339. SortTarget: pb.RangeRequest_KEY,
  1340. },
  1341. {
  1342. Key: []byte("a"), RangeEnd: []byte("z"),
  1343. Limit: 1,
  1344. SortOrder: pb.RangeRequest_ASCEND,
  1345. SortTarget: pb.RangeRequest_CREATE,
  1346. },
  1347. {
  1348. Key: []byte("a"), RangeEnd: []byte("z"),
  1349. Limit: 1,
  1350. SortOrder: pb.RangeRequest_DESCEND,
  1351. SortTarget: pb.RangeRequest_MOD,
  1352. },
  1353. {
  1354. Key: []byte("z"), RangeEnd: []byte("z"),
  1355. Limit: 1,
  1356. SortOrder: pb.RangeRequest_DESCEND,
  1357. SortTarget: pb.RangeRequest_CREATE,
  1358. },
  1359. { // sort ASCEND by default
  1360. Key: []byte("a"), RangeEnd: []byte("z"),
  1361. Limit: 10,
  1362. SortOrder: pb.RangeRequest_NONE,
  1363. SortTarget: pb.RangeRequest_CREATE,
  1364. },
  1365. },
  1366. [][]string{
  1367. {"a"},
  1368. {"d"},
  1369. {"b"},
  1370. {"c"},
  1371. {},
  1372. {"b", "a", "c", "d"},
  1373. },
  1374. []bool{true, true, true, true, false, false},
  1375. },
  1376. // min/max mod rev
  1377. {
  1378. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  1379. []pb.RangeRequest{
  1380. {
  1381. Key: []byte{0}, RangeEnd: []byte{0},
  1382. MinModRevision: 3,
  1383. },
  1384. {
  1385. Key: []byte{0}, RangeEnd: []byte{0},
  1386. MaxModRevision: 3,
  1387. },
  1388. {
  1389. Key: []byte{0}, RangeEnd: []byte{0},
  1390. MinModRevision: 3,
  1391. MaxModRevision: 5,
  1392. },
  1393. {
  1394. Key: []byte{0}, RangeEnd: []byte{0},
  1395. MaxModRevision: 10,
  1396. },
  1397. },
  1398. [][]string{
  1399. {"rev3", "rev4", "rev5", "rev6"},
  1400. {"rev2", "rev3"},
  1401. {"rev3", "rev4", "rev5"},
  1402. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  1403. },
  1404. []bool{false, false, false, false},
  1405. },
  1406. // min/max create rev
  1407. {
  1408. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  1409. []pb.RangeRequest{
  1410. {
  1411. Key: []byte{0}, RangeEnd: []byte{0},
  1412. MinCreateRevision: 3,
  1413. },
  1414. {
  1415. Key: []byte{0}, RangeEnd: []byte{0},
  1416. MaxCreateRevision: 3,
  1417. },
  1418. {
  1419. Key: []byte{0}, RangeEnd: []byte{0},
  1420. MinCreateRevision: 3,
  1421. MaxCreateRevision: 5,
  1422. },
  1423. {
  1424. Key: []byte{0}, RangeEnd: []byte{0},
  1425. MaxCreateRevision: 10,
  1426. },
  1427. },
  1428. [][]string{
  1429. {"rev3", "rev6"},
  1430. {"rev2", "rev3"},
  1431. {"rev3"},
  1432. {"rev2", "rev3", "rev6"},
  1433. },
  1434. []bool{false, false, false, false},
  1435. },
  1436. }
  1437. for i, tt := range tests {
  1438. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1439. for _, k := range tt.putKeys {
  1440. kvc := toGRPC(clus.RandClient()).KV
  1441. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  1442. if _, err := kvc.Put(context.TODO(), req); err != nil {
  1443. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  1444. }
  1445. }
  1446. for j, req := range tt.reqs {
  1447. kvc := toGRPC(clus.RandClient()).KV
  1448. resp, err := kvc.Range(context.TODO(), &req)
  1449. if err != nil {
  1450. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  1451. continue
  1452. }
  1453. if len(resp.Kvs) != len(tt.wresps[j]) {
  1454. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  1455. continue
  1456. }
  1457. for k, wKey := range tt.wresps[j] {
  1458. respKey := string(resp.Kvs[k].Key)
  1459. if respKey != wKey {
  1460. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  1461. }
  1462. }
  1463. if resp.More != tt.wmores[j] {
  1464. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  1465. }
  1466. wrev := int64(len(tt.putKeys) + 1)
  1467. if resp.Header.Revision != wrev {
  1468. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  1469. }
  1470. }
  1471. clus.Terminate(t)
  1472. }
  1473. }
  1474. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1475. cfg.UseGRPC = true
  1476. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  1477. clus.Launch(t)
  1478. return clus
  1479. }
  1480. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  1481. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  1482. defer testutil.AfterTest(t)
  1483. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1484. clus := newClusterV3NoClients(t, &cfg)
  1485. defer clus.Terminate(t)
  1486. // nil out TLS field so client will use an insecure connection
  1487. clus.Members[0].ClientTLSInfo = nil
  1488. client, err := NewClientV3(clus.Members[0])
  1489. if err != nil && err != grpc.ErrClientConnTimeout {
  1490. t.Fatalf("unexpected error (%v)", err)
  1491. } else if client == nil {
  1492. // Ideally, no client would be returned. However, grpc will
  1493. // return a connection without trying to handshake first so
  1494. // the connection appears OK.
  1495. return
  1496. }
  1497. defer client.Close()
  1498. donec := make(chan error, 1)
  1499. go func() {
  1500. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  1501. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1502. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  1503. cancel()
  1504. donec <- perr
  1505. }()
  1506. if perr := <-donec; perr == nil {
  1507. t.Fatalf("expected client error on put")
  1508. }
  1509. }
  1510. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  1511. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  1512. defer testutil.AfterTest(t)
  1513. cfg := ClusterConfig{Size: 3}
  1514. clus := newClusterV3NoClients(t, &cfg)
  1515. defer clus.Terminate(t)
  1516. clus.Members[0].ClientTLSInfo = &testTLSInfo
  1517. client, err := NewClientV3(clus.Members[0])
  1518. if client != nil || err == nil {
  1519. t.Fatalf("expected no client")
  1520. } else if err != grpc.ErrClientConnTimeout {
  1521. t.Fatalf("unexpected error (%v)", err)
  1522. }
  1523. }
  1524. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  1525. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  1526. defer testutil.AfterTest(t)
  1527. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1528. clus := newClusterV3NoClients(t, &cfg)
  1529. defer clus.Terminate(t)
  1530. client, err := NewClientV3(clus.Members[0])
  1531. if err != nil {
  1532. t.Fatalf("expected tls client (%v)", err)
  1533. }
  1534. defer client.Close()
  1535. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1536. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  1537. t.Fatalf("unexpected error on put over tls (%v)", err)
  1538. }
  1539. }
  1540. // TestTLSReloadAtomicReplace ensures server reloads expired/valid certs
  1541. // when all certs are atomically replaced by directory renaming.
  1542. // And expects server to reject client requests, and vice versa.
  1543. func TestTLSReloadAtomicReplace(t *testing.T) {
  1544. tmpDir, err := ioutil.TempDir(os.TempDir(), "fixtures-tmp")
  1545. if err != nil {
  1546. t.Fatal(err)
  1547. }
  1548. os.RemoveAll(tmpDir)
  1549. defer os.RemoveAll(tmpDir)
  1550. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1551. if err != nil {
  1552. t.Fatal(err)
  1553. }
  1554. defer os.RemoveAll(certsDir)
  1555. certsDirExp, err := ioutil.TempDir(os.TempDir(), "fixtures-expired")
  1556. if err != nil {
  1557. t.Fatal(err)
  1558. }
  1559. defer os.RemoveAll(certsDirExp)
  1560. cloneFunc := func() transport.TLSInfo {
  1561. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1562. if terr != nil {
  1563. t.Fatal(terr)
  1564. }
  1565. if _, err = copyTLSFiles(testTLSInfoExpired, certsDirExp); err != nil {
  1566. t.Fatal(err)
  1567. }
  1568. return tlsInfo
  1569. }
  1570. replaceFunc := func() {
  1571. if err = os.Rename(certsDir, tmpDir); err != nil {
  1572. t.Fatal(err)
  1573. }
  1574. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1575. t.Fatal(err)
  1576. }
  1577. // after rename,
  1578. // 'certsDir' contains expired certs
  1579. // 'tmpDir' contains valid certs
  1580. // 'certsDirExp' does not exist
  1581. }
  1582. revertFunc := func() {
  1583. if err = os.Rename(tmpDir, certsDirExp); err != nil {
  1584. t.Fatal(err)
  1585. }
  1586. if err = os.Rename(certsDir, tmpDir); err != nil {
  1587. t.Fatal(err)
  1588. }
  1589. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1590. t.Fatal(err)
  1591. }
  1592. }
  1593. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1594. }
  1595. // TestTLSReloadCopy ensures server reloads expired/valid certs
  1596. // when new certs are copied over, one by one. And expects server
  1597. // to reject client requests, and vice versa.
  1598. func TestTLSReloadCopy(t *testing.T) {
  1599. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1600. if err != nil {
  1601. t.Fatal(err)
  1602. }
  1603. defer os.RemoveAll(certsDir)
  1604. cloneFunc := func() transport.TLSInfo {
  1605. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1606. if terr != nil {
  1607. t.Fatal(terr)
  1608. }
  1609. return tlsInfo
  1610. }
  1611. replaceFunc := func() {
  1612. if _, err = copyTLSFiles(testTLSInfoExpired, certsDir); err != nil {
  1613. t.Fatal(err)
  1614. }
  1615. }
  1616. revertFunc := func() {
  1617. if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
  1618. t.Fatal(err)
  1619. }
  1620. }
  1621. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1622. }
  1623. func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc func(), revertFunc func()) {
  1624. defer testutil.AfterTest(t)
  1625. // 1. separate copies for TLS assets modification
  1626. tlsInfo := cloneFunc()
  1627. // 2. start cluster with valid certs
  1628. clus := NewClusterV3(t, &ClusterConfig{Size: 1, PeerTLS: &tlsInfo, ClientTLS: &tlsInfo})
  1629. defer clus.Terminate(t)
  1630. // 3. concurrent client dialing while certs become expired
  1631. errc := make(chan error, 1)
  1632. go func() {
  1633. for {
  1634. cc, err := tlsInfo.ClientConfig()
  1635. if err != nil {
  1636. // errors in 'go/src/crypto/tls/tls.go'
  1637. // tls: private key does not match public key
  1638. // tls: failed to find any PEM data in key input
  1639. // tls: failed to find any PEM data in certificate input
  1640. // Or 'does not exist', 'not found', etc
  1641. t.Log(err)
  1642. continue
  1643. }
  1644. cli, cerr := clientv3.New(clientv3.Config{
  1645. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1646. DialTimeout: time.Second,
  1647. TLS: cc,
  1648. })
  1649. if cerr != nil {
  1650. errc <- cerr
  1651. return
  1652. }
  1653. cli.Close()
  1654. }
  1655. }()
  1656. // 4. replace certs with expired ones
  1657. replaceFunc()
  1658. // 5. expect dial time-out when loading expired certs
  1659. select {
  1660. case gerr := <-errc:
  1661. if gerr != grpc.ErrClientConnTimeout {
  1662. t.Fatalf("expected %v, got %v", grpc.ErrClientConnTimeout, gerr)
  1663. }
  1664. case <-time.After(5 * time.Second):
  1665. t.Fatal("failed to receive dial timeout error")
  1666. }
  1667. // 6. replace expired certs back with valid ones
  1668. revertFunc()
  1669. // 7. new requests should trigger listener to reload valid certs
  1670. tls, terr := tlsInfo.ClientConfig()
  1671. if terr != nil {
  1672. t.Fatal(terr)
  1673. }
  1674. cl, cerr := clientv3.New(clientv3.Config{
  1675. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1676. DialTimeout: 5 * time.Second,
  1677. TLS: tls,
  1678. })
  1679. if cerr != nil {
  1680. t.Fatalf("expected no error, got %v", cerr)
  1681. }
  1682. cl.Close()
  1683. }
  1684. func TestGRPCRequireLeader(t *testing.T) {
  1685. defer testutil.AfterTest(t)
  1686. cfg := ClusterConfig{Size: 3}
  1687. clus := newClusterV3NoClients(t, &cfg)
  1688. defer clus.Terminate(t)
  1689. clus.Members[1].Stop(t)
  1690. clus.Members[2].Stop(t)
  1691. client, err := NewClientV3(clus.Members[0])
  1692. if err != nil {
  1693. t.Fatalf("cannot create client: %v", err)
  1694. }
  1695. defer client.Close()
  1696. // wait for election timeout, then member[0] will not have a leader.
  1697. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  1698. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1699. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1700. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1701. if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1702. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1703. }
  1704. }
  1705. func TestGRPCStreamRequireLeader(t *testing.T) {
  1706. defer testutil.AfterTest(t)
  1707. cfg := ClusterConfig{Size: 3}
  1708. clus := newClusterV3NoClients(t, &cfg)
  1709. defer clus.Terminate(t)
  1710. client, err := NewClientV3(clus.Members[0])
  1711. if err != nil {
  1712. t.Fatalf("failed to create client (%v)", err)
  1713. }
  1714. defer client.Close()
  1715. wAPI := toGRPC(client).Watch
  1716. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1717. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1718. wStream, err := wAPI.Watch(ctx)
  1719. if err != nil {
  1720. t.Fatalf("wAPI.Watch error: %v", err)
  1721. }
  1722. clus.Members[1].Stop(t)
  1723. clus.Members[2].Stop(t)
  1724. // existing stream should be rejected
  1725. _, err = wStream.Recv()
  1726. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1727. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1728. }
  1729. // new stream should also be rejected
  1730. wStream, err = wAPI.Watch(ctx)
  1731. if err != nil {
  1732. t.Fatalf("wAPI.Watch error: %v", err)
  1733. }
  1734. _, err = wStream.Recv()
  1735. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1736. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1737. }
  1738. clus.Members[1].Restart(t)
  1739. clus.Members[2].Restart(t)
  1740. clus.waitLeader(t, clus.Members)
  1741. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1742. // new stream should also be OK now after we restarted the other members
  1743. wStream, err = wAPI.Watch(ctx)
  1744. if err != nil {
  1745. t.Fatalf("wAPI.Watch error: %v", err)
  1746. }
  1747. wreq := &pb.WatchRequest{
  1748. RequestUnion: &pb.WatchRequest_CreateRequest{
  1749. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1750. },
  1751. }
  1752. err = wStream.Send(wreq)
  1753. if err != nil {
  1754. t.Errorf("err = %v, want nil", err)
  1755. }
  1756. }
  1757. // TestV3PutLargeRequests ensures that configurable MaxRequestBytes works as intended.
  1758. func TestV3PutLargeRequests(t *testing.T) {
  1759. defer testutil.AfterTest(t)
  1760. tests := []struct {
  1761. key string
  1762. maxRequestBytes uint
  1763. valueSize int
  1764. expectError error
  1765. }{
  1766. // don't set to 0. use 0 as the default.
  1767. {"foo", 1, 1024, rpctypes.ErrGRPCRequestTooLarge},
  1768. {"foo", 10 * 1024 * 1024, 9 * 1024 * 1024, nil},
  1769. {"foo", 10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
  1770. {"foo", 10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
  1771. }
  1772. for i, test := range tests {
  1773. clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
  1774. kvcli := toGRPC(clus.Client(0)).KV
  1775. reqput := &pb.PutRequest{Key: []byte(test.key), Value: make([]byte, test.valueSize)}
  1776. _, err := kvcli.Put(context.TODO(), reqput)
  1777. if !eqErrGRPC(err, test.expectError) {
  1778. t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
  1779. }
  1780. clus.Terminate(t)
  1781. }
  1782. }
  1783. func eqErrGRPC(err1 error, err2 error) bool {
  1784. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  1785. }
  1786. // waitForRestart tries a range request until the client's server responds.
  1787. // This is mainly a stop-gap function until grpcproxy's KVClient adapter
  1788. // (and by extension, clientv3) supports grpc.CallOption pass-through so
  1789. // FailFast=false works with Put.
  1790. func waitForRestart(t *testing.T, kvc pb.KVClient) {
  1791. req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
  1792. if _, err := kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
  1793. t.Fatal(err)
  1794. }
  1795. }