v3_grpc_test.go 51 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "os"
  22. "reflect"
  23. "testing"
  24. "time"
  25. "go.etcd.io/etcd/clientv3"
  26. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  27. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  28. "go.etcd.io/etcd/pkg/testutil"
  29. "go.etcd.io/etcd/pkg/transport"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/metadata"
  33. "google.golang.org/grpc/status"
  34. )
  35. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  36. // overwrites it, then checks that the change was applied.
  37. func TestV3PutOverwrite(t *testing.T) {
  38. defer testutil.AfterTest(t)
  39. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  40. defer clus.Terminate(t)
  41. kvc := toGRPC(clus.RandClient()).KV
  42. key := []byte("foo")
  43. reqput := &pb.PutRequest{Key: key, Value: []byte("bar"), PrevKv: true}
  44. respput, err := kvc.Put(context.TODO(), reqput)
  45. if err != nil {
  46. t.Fatalf("couldn't put key (%v)", err)
  47. }
  48. // overwrite
  49. reqput.Value = []byte("baz")
  50. respput2, err := kvc.Put(context.TODO(), reqput)
  51. if err != nil {
  52. t.Fatalf("couldn't put key (%v)", err)
  53. }
  54. if respput2.Header.Revision <= respput.Header.Revision {
  55. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  56. respput2.Header.Revision, respput.Header.Revision)
  57. }
  58. if pkv := respput2.PrevKv; pkv == nil || string(pkv.Value) != "bar" {
  59. t.Fatalf("expected PrevKv=bar, got response %+v", respput2)
  60. }
  61. reqrange := &pb.RangeRequest{Key: key}
  62. resprange, err := kvc.Range(context.TODO(), reqrange)
  63. if err != nil {
  64. t.Fatalf("couldn't get key (%v)", err)
  65. }
  66. if len(resprange.Kvs) != 1 {
  67. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  68. }
  69. kv := resprange.Kvs[0]
  70. if kv.ModRevision <= kv.CreateRevision {
  71. t.Errorf("expected modRev > createRev, got %d <= %d",
  72. kv.ModRevision, kv.CreateRevision)
  73. }
  74. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  75. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  76. }
  77. }
  78. // TestPutRestart checks if a put after an unrelated member restart succeeds
  79. func TestV3PutRestart(t *testing.T) {
  80. defer testutil.AfterTest(t)
  81. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  82. defer clus.Terminate(t)
  83. kvIdx := rand.Intn(3)
  84. kvc := toGRPC(clus.Client(kvIdx)).KV
  85. stopIdx := kvIdx
  86. for stopIdx == kvIdx {
  87. stopIdx = rand.Intn(3)
  88. }
  89. clus.clients[stopIdx].Close()
  90. clus.Members[stopIdx].Stop(t)
  91. clus.Members[stopIdx].Restart(t)
  92. c, cerr := NewClientV3(clus.Members[stopIdx])
  93. if cerr != nil {
  94. t.Fatalf("cannot create client: %v", cerr)
  95. }
  96. clus.clients[stopIdx] = c
  97. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  98. defer cancel()
  99. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  100. _, err := kvc.Put(ctx, reqput)
  101. if err != nil && err == ctx.Err() {
  102. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  103. }
  104. }
  105. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  106. func TestV3CompactCurrentRev(t *testing.T) {
  107. defer testutil.AfterTest(t)
  108. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  109. defer clus.Terminate(t)
  110. kvc := toGRPC(clus.RandClient()).KV
  111. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  112. for i := 0; i < 3; i++ {
  113. if _, err := kvc.Put(context.Background(), preq); err != nil {
  114. t.Fatalf("couldn't put key (%v)", err)
  115. }
  116. }
  117. // get key to add to proxy cache, if any
  118. if _, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}); err != nil {
  119. t.Fatal(err)
  120. }
  121. // compact on current revision
  122. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  123. if err != nil {
  124. t.Fatalf("couldn't compact kv space (%v)", err)
  125. }
  126. // key still exists when linearized?
  127. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  128. if err != nil {
  129. t.Fatalf("couldn't get key after compaction (%v)", err)
  130. }
  131. // key still exists when serialized?
  132. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo"), Serializable: true})
  133. if err != nil {
  134. t.Fatalf("couldn't get serialized key after compaction (%v)", err)
  135. }
  136. }
  137. // TestV3HashKV ensures that multiple calls of HashKV on same node return same hash and compact rev.
  138. func TestV3HashKV(t *testing.T) {
  139. defer testutil.AfterTest(t)
  140. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  141. defer clus.Terminate(t)
  142. kvc := toGRPC(clus.RandClient()).KV
  143. mvc := toGRPC(clus.RandClient()).Maintenance
  144. for i := 0; i < 10; i++ {
  145. resp, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte(fmt.Sprintf("bar%d", i))})
  146. if err != nil {
  147. t.Fatal(err)
  148. }
  149. rev := resp.Header.Revision
  150. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{Revision: 0})
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. if rev != hresp.Header.Revision {
  155. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  156. }
  157. prevHash := hresp.Hash
  158. prevCompactRev := hresp.CompactRevision
  159. for i := 0; i < 10; i++ {
  160. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{Revision: 0})
  161. if err != nil {
  162. t.Fatal(err)
  163. }
  164. if rev != hresp.Header.Revision {
  165. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  166. }
  167. if prevHash != hresp.Hash {
  168. t.Fatalf("prevHash %v != Hash %v", prevHash, hresp.Hash)
  169. }
  170. if prevCompactRev != hresp.CompactRevision {
  171. t.Fatalf("prevCompactRev %v != CompactRevision %v", prevHash, hresp.Hash)
  172. }
  173. prevHash = hresp.Hash
  174. prevCompactRev = hresp.CompactRevision
  175. }
  176. }
  177. }
  178. func TestV3TxnTooManyOps(t *testing.T) {
  179. defer testutil.AfterTest(t)
  180. maxTxnOps := uint(128)
  181. clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps})
  182. defer clus.Terminate(t)
  183. kvc := toGRPC(clus.RandClient()).KV
  184. // unique keys
  185. i := new(int)
  186. keyf := func() []byte {
  187. *i++
  188. return []byte(fmt.Sprintf("key-%d", i))
  189. }
  190. addCompareOps := func(txn *pb.TxnRequest) {
  191. txn.Compare = append(txn.Compare,
  192. &pb.Compare{
  193. Result: pb.Compare_GREATER,
  194. Target: pb.Compare_CREATE,
  195. Key: keyf(),
  196. })
  197. }
  198. addSuccessOps := func(txn *pb.TxnRequest) {
  199. txn.Success = append(txn.Success,
  200. &pb.RequestOp{
  201. Request: &pb.RequestOp_RequestPut{
  202. RequestPut: &pb.PutRequest{
  203. Key: keyf(),
  204. Value: []byte("bar"),
  205. },
  206. },
  207. })
  208. }
  209. addFailureOps := func(txn *pb.TxnRequest) {
  210. txn.Failure = append(txn.Failure,
  211. &pb.RequestOp{
  212. Request: &pb.RequestOp_RequestPut{
  213. RequestPut: &pb.PutRequest{
  214. Key: keyf(),
  215. Value: []byte("bar"),
  216. },
  217. },
  218. })
  219. }
  220. addTxnOps := func(txn *pb.TxnRequest) {
  221. newTxn := &pb.TxnRequest{}
  222. addSuccessOps(newTxn)
  223. txn.Success = append(txn.Success,
  224. &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  225. RequestTxn: newTxn,
  226. },
  227. },
  228. )
  229. }
  230. tests := []func(txn *pb.TxnRequest){
  231. addCompareOps,
  232. addSuccessOps,
  233. addFailureOps,
  234. addTxnOps,
  235. }
  236. for i, tt := range tests {
  237. txn := &pb.TxnRequest{}
  238. for j := 0; j < int(maxTxnOps+1); j++ {
  239. tt(txn)
  240. }
  241. _, err := kvc.Txn(context.Background(), txn)
  242. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  243. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  244. }
  245. }
  246. }
  247. func TestV3TxnDuplicateKeys(t *testing.T) {
  248. defer testutil.AfterTest(t)
  249. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  250. defer clus.Terminate(t)
  251. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  252. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  253. RequestDeleteRange: &pb.DeleteRangeRequest{
  254. Key: []byte("abc"),
  255. },
  256. },
  257. }
  258. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  259. RequestDeleteRange: &pb.DeleteRangeRequest{
  260. Key: []byte("a"), RangeEnd: []byte("b"),
  261. },
  262. },
  263. }
  264. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  265. RequestDeleteRange: &pb.DeleteRangeRequest{
  266. Key: []byte("abb"), RangeEnd: []byte("abc"),
  267. },
  268. },
  269. }
  270. txnDelReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  271. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{delInRangeReq}},
  272. },
  273. }
  274. txnDelReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  275. RequestTxn: &pb.TxnRequest{
  276. Success: []*pb.RequestOp{delInRangeReq},
  277. Failure: []*pb.RequestOp{delInRangeReq}},
  278. },
  279. }
  280. txnPutReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  281. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{putreq}},
  282. },
  283. }
  284. txnPutReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  285. RequestTxn: &pb.TxnRequest{
  286. Success: []*pb.RequestOp{putreq},
  287. Failure: []*pb.RequestOp{putreq}},
  288. },
  289. }
  290. kvc := toGRPC(clus.RandClient()).KV
  291. tests := []struct {
  292. txnSuccess []*pb.RequestOp
  293. werr error
  294. }{
  295. {
  296. txnSuccess: []*pb.RequestOp{putreq, putreq},
  297. werr: rpctypes.ErrGRPCDuplicateKey,
  298. },
  299. {
  300. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  301. werr: rpctypes.ErrGRPCDuplicateKey,
  302. },
  303. {
  304. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  305. werr: rpctypes.ErrGRPCDuplicateKey,
  306. },
  307. // Then(Put(a), Then(Del(a)))
  308. {
  309. txnSuccess: []*pb.RequestOp{putreq, txnDelReq},
  310. werr: rpctypes.ErrGRPCDuplicateKey,
  311. },
  312. // Then(Del(a), Then(Put(a)))
  313. {
  314. txnSuccess: []*pb.RequestOp{delInRangeReq, txnPutReq},
  315. werr: rpctypes.ErrGRPCDuplicateKey,
  316. },
  317. // Then((Then(Put(a)), Else(Put(a))), (Then(Put(a)), Else(Put(a)))
  318. {
  319. txnSuccess: []*pb.RequestOp{txnPutReqTwoSide, txnPutReqTwoSide},
  320. werr: rpctypes.ErrGRPCDuplicateKey,
  321. },
  322. // Then(Del(x), (Then(Put(a)), Else(Put(a))))
  323. {
  324. txnSuccess: []*pb.RequestOp{delOutOfRangeReq, txnPutReqTwoSide},
  325. werr: nil,
  326. },
  327. // Then(Then(Del(a)), (Then(Del(a)), Else(Del(a))))
  328. {
  329. txnSuccess: []*pb.RequestOp{txnDelReq, txnDelReqTwoSide},
  330. werr: nil,
  331. },
  332. {
  333. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  334. werr: nil,
  335. },
  336. {
  337. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  338. werr: nil,
  339. },
  340. }
  341. for i, tt := range tests {
  342. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  343. _, err := kvc.Txn(context.Background(), txn)
  344. if !eqErrGRPC(err, tt.werr) {
  345. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  346. }
  347. }
  348. }
  349. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  350. func TestV3TxnRevision(t *testing.T) {
  351. defer testutil.AfterTest(t)
  352. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  353. defer clus.Terminate(t)
  354. kvc := toGRPC(clus.RandClient()).KV
  355. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  356. presp, err := kvc.Put(context.TODO(), pr)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  361. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  362. tresp, err := kvc.Txn(context.TODO(), txn)
  363. if err != nil {
  364. t.Fatal(err)
  365. }
  366. // did not update revision
  367. if presp.Header.Revision != tresp.Header.Revision {
  368. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  369. }
  370. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  371. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  372. tresp, err = kvc.Txn(context.TODO(), txn)
  373. if err != nil {
  374. t.Fatal(err)
  375. }
  376. // did not update revision
  377. if presp.Header.Revision != tresp.Header.Revision {
  378. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  379. }
  380. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  381. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  382. tresp, err = kvc.Txn(context.TODO(), txn)
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. // updated revision
  387. if tresp.Header.Revision != presp.Header.Revision+1 {
  388. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  389. }
  390. }
  391. // Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
  392. // when compared to the Succeeded field in the txn response.
  393. func TestV3TxnCmpHeaderRev(t *testing.T) {
  394. defer testutil.AfterTest(t)
  395. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  396. defer clus.Terminate(t)
  397. kvc := toGRPC(clus.RandClient()).KV
  398. for i := 0; i < 10; i++ {
  399. // Concurrently put a key with a txn comparing on it.
  400. revc := make(chan int64, 1)
  401. go func() {
  402. defer close(revc)
  403. pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
  404. presp, err := kvc.Put(context.TODO(), pr)
  405. if err != nil {
  406. t.Fatal(err)
  407. }
  408. revc <- presp.Header.Revision
  409. }()
  410. // The read-only txn uses the optimized readindex server path.
  411. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
  412. RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
  413. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  414. // i = 0 /\ Succeeded => put followed txn
  415. cmp := &pb.Compare{
  416. Result: pb.Compare_EQUAL,
  417. Target: pb.Compare_VERSION,
  418. Key: []byte("k"),
  419. TargetUnion: &pb.Compare_Version{Version: int64(i)},
  420. }
  421. txn.Compare = append(txn.Compare, cmp)
  422. tresp, err := kvc.Txn(context.TODO(), txn)
  423. if err != nil {
  424. t.Fatal(err)
  425. }
  426. prev := <-revc
  427. // put followed txn; should eval to false
  428. if prev > tresp.Header.Revision && !tresp.Succeeded {
  429. t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
  430. }
  431. // txn follows put; should eval to true
  432. if tresp.Header.Revision >= prev && tresp.Succeeded {
  433. t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
  434. }
  435. }
  436. }
  437. // TestV3TxnRangeCompare tests range comparisons in txns
  438. func TestV3TxnRangeCompare(t *testing.T) {
  439. defer testutil.AfterTest(t)
  440. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  441. defer clus.Terminate(t)
  442. // put keys, named by expected revision
  443. for _, k := range []string{"/a/2", "/a/3", "/a/4", "/f/5"} {
  444. if _, err := clus.Client(0).Put(context.TODO(), k, "x"); err != nil {
  445. t.Fatal(err)
  446. }
  447. }
  448. tests := []struct {
  449. cmp pb.Compare
  450. wSuccess bool
  451. }{
  452. {
  453. // >= /a/; all create revs fit
  454. pb.Compare{
  455. Key: []byte("/a/"),
  456. RangeEnd: []byte{0},
  457. Target: pb.Compare_CREATE,
  458. Result: pb.Compare_LESS,
  459. TargetUnion: &pb.Compare_CreateRevision{CreateRevision: 6},
  460. },
  461. true,
  462. },
  463. {
  464. // >= /a/; one create rev doesn't fit
  465. pb.Compare{
  466. Key: []byte("/a/"),
  467. RangeEnd: []byte{0},
  468. Target: pb.Compare_CREATE,
  469. Result: pb.Compare_LESS,
  470. TargetUnion: &pb.Compare_CreateRevision{CreateRevision: 5},
  471. },
  472. false,
  473. },
  474. {
  475. // prefix /a/*; all create revs fit
  476. pb.Compare{
  477. Key: []byte("/a/"),
  478. RangeEnd: []byte("/a0"),
  479. Target: pb.Compare_CREATE,
  480. Result: pb.Compare_LESS,
  481. TargetUnion: &pb.Compare_CreateRevision{CreateRevision: 5},
  482. },
  483. true,
  484. },
  485. {
  486. // prefix /a/*; one create rev doesn't fit
  487. pb.Compare{
  488. Key: []byte("/a/"),
  489. RangeEnd: []byte("/a0"),
  490. Target: pb.Compare_CREATE,
  491. Result: pb.Compare_LESS,
  492. TargetUnion: &pb.Compare_CreateRevision{CreateRevision: 4},
  493. },
  494. false,
  495. },
  496. {
  497. // does not exist, does not succeed
  498. pb.Compare{
  499. Key: []byte("/b/"),
  500. RangeEnd: []byte("/b0"),
  501. Target: pb.Compare_VALUE,
  502. Result: pb.Compare_EQUAL,
  503. TargetUnion: &pb.Compare_Value{Value: []byte("x")},
  504. },
  505. false,
  506. },
  507. {
  508. // all keys are leased
  509. pb.Compare{
  510. Key: []byte("/a/"),
  511. RangeEnd: []byte("/a0"),
  512. Target: pb.Compare_LEASE,
  513. Result: pb.Compare_GREATER,
  514. TargetUnion: &pb.Compare_Lease{Lease: 0},
  515. },
  516. false,
  517. },
  518. {
  519. // no keys are leased
  520. pb.Compare{
  521. Key: []byte("/a/"),
  522. RangeEnd: []byte("/a0"),
  523. Target: pb.Compare_LEASE,
  524. Result: pb.Compare_EQUAL,
  525. TargetUnion: &pb.Compare_Lease{Lease: 0},
  526. },
  527. true,
  528. },
  529. }
  530. kvc := toGRPC(clus.Client(0)).KV
  531. for i, tt := range tests {
  532. txn := &pb.TxnRequest{}
  533. txn.Compare = append(txn.Compare, &tt.cmp)
  534. tresp, err := kvc.Txn(context.TODO(), txn)
  535. if err != nil {
  536. t.Fatal(err)
  537. }
  538. if tt.wSuccess != tresp.Succeeded {
  539. t.Errorf("#%d: expected %v, got %v", i, tt.wSuccess, tresp.Succeeded)
  540. }
  541. }
  542. }
  543. // TestV3TxnNested tests nested txns follow paths as expected.
  544. func TestV3TxnNestedPath(t *testing.T) {
  545. defer testutil.AfterTest(t)
  546. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  547. defer clus.Terminate(t)
  548. kvc := toGRPC(clus.RandClient()).KV
  549. cmpTrue := &pb.Compare{
  550. Result: pb.Compare_EQUAL,
  551. Target: pb.Compare_VERSION,
  552. Key: []byte("k"),
  553. TargetUnion: &pb.Compare_Version{Version: int64(0)},
  554. }
  555. cmpFalse := &pb.Compare{
  556. Result: pb.Compare_EQUAL,
  557. Target: pb.Compare_VERSION,
  558. Key: []byte("k"),
  559. TargetUnion: &pb.Compare_Version{Version: int64(1)},
  560. }
  561. // generate random path to eval txns
  562. topTxn := &pb.TxnRequest{}
  563. txn := topTxn
  564. txnPath := make([]bool, 10)
  565. for i := range txnPath {
  566. nextTxn := &pb.TxnRequest{}
  567. op := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: nextTxn}}
  568. txnPath[i] = rand.Intn(2) == 0
  569. if txnPath[i] {
  570. txn.Compare = append(txn.Compare, cmpTrue)
  571. txn.Success = append(txn.Success, op)
  572. } else {
  573. txn.Compare = append(txn.Compare, cmpFalse)
  574. txn.Failure = append(txn.Failure, op)
  575. }
  576. txn = nextTxn
  577. }
  578. tresp, err := kvc.Txn(context.TODO(), topTxn)
  579. if err != nil {
  580. t.Fatal(err)
  581. }
  582. curTxnResp := tresp
  583. for i := range txnPath {
  584. if curTxnResp.Succeeded != txnPath[i] {
  585. t.Fatalf("expected path %+v, got response %+v", txnPath, *tresp)
  586. }
  587. curTxnResp = curTxnResp.Responses[0].Response.(*pb.ResponseOp_ResponseTxn).ResponseTxn
  588. }
  589. }
  590. // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
  591. func TestV3PutIgnoreValue(t *testing.T) {
  592. defer testutil.AfterTest(t)
  593. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  594. defer clus.Terminate(t)
  595. kvc := toGRPC(clus.RandClient()).KV
  596. key, val := []byte("foo"), []byte("bar")
  597. putReq := pb.PutRequest{Key: key, Value: val}
  598. // create lease
  599. lc := toGRPC(clus.RandClient()).Lease
  600. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  601. if err != nil {
  602. t.Fatal(err)
  603. }
  604. if lresp.Error != "" {
  605. t.Fatal(lresp.Error)
  606. }
  607. tests := []struct {
  608. putFunc func() error
  609. putErr error
  610. wleaseID int64
  611. }{
  612. { // put failure for non-existent key
  613. func() error {
  614. preq := putReq
  615. preq.IgnoreValue = true
  616. _, err := kvc.Put(context.TODO(), &preq)
  617. return err
  618. },
  619. rpctypes.ErrGRPCKeyNotFound,
  620. 0,
  621. },
  622. { // txn failure for non-existent key
  623. func() error {
  624. preq := putReq
  625. preq.Value = nil
  626. preq.IgnoreValue = true
  627. txn := &pb.TxnRequest{}
  628. txn.Success = append(txn.Success, &pb.RequestOp{
  629. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  630. _, err := kvc.Txn(context.TODO(), txn)
  631. return err
  632. },
  633. rpctypes.ErrGRPCKeyNotFound,
  634. 0,
  635. },
  636. { // put success
  637. func() error {
  638. _, err := kvc.Put(context.TODO(), &putReq)
  639. return err
  640. },
  641. nil,
  642. 0,
  643. },
  644. { // txn success, attach lease
  645. func() error {
  646. preq := putReq
  647. preq.Value = nil
  648. preq.Lease = lresp.ID
  649. preq.IgnoreValue = true
  650. txn := &pb.TxnRequest{}
  651. txn.Success = append(txn.Success, &pb.RequestOp{
  652. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  653. _, err := kvc.Txn(context.TODO(), txn)
  654. return err
  655. },
  656. nil,
  657. lresp.ID,
  658. },
  659. { // non-empty value with ignore_value should error
  660. func() error {
  661. preq := putReq
  662. preq.IgnoreValue = true
  663. _, err := kvc.Put(context.TODO(), &preq)
  664. return err
  665. },
  666. rpctypes.ErrGRPCValueProvided,
  667. 0,
  668. },
  669. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  670. func() error {
  671. preq := putReq
  672. preq.Value = nil
  673. preq.IgnoreValue = true
  674. presp, err := kvc.Put(context.TODO(), &preq)
  675. if err != nil {
  676. return err
  677. }
  678. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  679. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  680. }
  681. return nil
  682. },
  683. nil,
  684. 0,
  685. },
  686. { // revoke lease, ensure detached key doesn't get deleted
  687. func() error {
  688. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  689. return err
  690. },
  691. nil,
  692. 0,
  693. },
  694. }
  695. for i, tt := range tests {
  696. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  697. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  698. }
  699. if tt.putErr != nil {
  700. continue
  701. }
  702. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  703. if err != nil {
  704. t.Fatalf("#%d: %v", i, err)
  705. }
  706. if len(rr.Kvs) != 1 {
  707. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  708. }
  709. if !bytes.Equal(rr.Kvs[0].Value, val) {
  710. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  711. }
  712. if rr.Kvs[0].Lease != tt.wleaseID {
  713. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  714. }
  715. }
  716. }
  717. // TestV3PutIgnoreLease ensures that writes with ignore_lease uses previous lease for the key overwrites.
  718. func TestV3PutIgnoreLease(t *testing.T) {
  719. defer testutil.AfterTest(t)
  720. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  721. defer clus.Terminate(t)
  722. kvc := toGRPC(clus.RandClient()).KV
  723. // create lease
  724. lc := toGRPC(clus.RandClient()).Lease
  725. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  726. if err != nil {
  727. t.Fatal(err)
  728. }
  729. if lresp.Error != "" {
  730. t.Fatal(lresp.Error)
  731. }
  732. key, val, val1 := []byte("zoo"), []byte("bar"), []byte("bar1")
  733. putReq := pb.PutRequest{Key: key, Value: val}
  734. tests := []struct {
  735. putFunc func() error
  736. putErr error
  737. wleaseID int64
  738. wvalue []byte
  739. }{
  740. { // put failure for non-existent key
  741. func() error {
  742. preq := putReq
  743. preq.IgnoreLease = true
  744. _, err := kvc.Put(context.TODO(), &preq)
  745. return err
  746. },
  747. rpctypes.ErrGRPCKeyNotFound,
  748. 0,
  749. nil,
  750. },
  751. { // txn failure for non-existent key
  752. func() error {
  753. preq := putReq
  754. preq.IgnoreLease = true
  755. txn := &pb.TxnRequest{}
  756. txn.Success = append(txn.Success, &pb.RequestOp{
  757. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  758. _, err := kvc.Txn(context.TODO(), txn)
  759. return err
  760. },
  761. rpctypes.ErrGRPCKeyNotFound,
  762. 0,
  763. nil,
  764. },
  765. { // put success
  766. func() error {
  767. preq := putReq
  768. preq.Lease = lresp.ID
  769. _, err := kvc.Put(context.TODO(), &preq)
  770. return err
  771. },
  772. nil,
  773. lresp.ID,
  774. val,
  775. },
  776. { // txn success, modify value using 'ignore_lease' and ensure lease is not detached
  777. func() error {
  778. preq := putReq
  779. preq.Value = val1
  780. preq.IgnoreLease = true
  781. txn := &pb.TxnRequest{}
  782. txn.Success = append(txn.Success, &pb.RequestOp{
  783. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  784. _, err := kvc.Txn(context.TODO(), txn)
  785. return err
  786. },
  787. nil,
  788. lresp.ID,
  789. val1,
  790. },
  791. { // non-empty lease with ignore_lease should error
  792. func() error {
  793. preq := putReq
  794. preq.Lease = lresp.ID
  795. preq.IgnoreLease = true
  796. _, err := kvc.Put(context.TODO(), &preq)
  797. return err
  798. },
  799. rpctypes.ErrGRPCLeaseProvided,
  800. 0,
  801. nil,
  802. },
  803. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  804. func() error {
  805. presp, err := kvc.Put(context.TODO(), &putReq)
  806. if err != nil {
  807. return err
  808. }
  809. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  810. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  811. }
  812. return nil
  813. },
  814. nil,
  815. 0,
  816. val,
  817. },
  818. { // revoke lease, ensure detached key doesn't get deleted
  819. func() error {
  820. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  821. return err
  822. },
  823. nil,
  824. 0,
  825. val,
  826. },
  827. }
  828. for i, tt := range tests {
  829. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  830. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  831. }
  832. if tt.putErr != nil {
  833. continue
  834. }
  835. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  836. if err != nil {
  837. t.Fatalf("#%d: %v", i, err)
  838. }
  839. if len(rr.Kvs) != 1 {
  840. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  841. }
  842. if !bytes.Equal(rr.Kvs[0].Value, tt.wvalue) {
  843. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  844. }
  845. if rr.Kvs[0].Lease != tt.wleaseID {
  846. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  847. }
  848. }
  849. }
  850. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  851. func TestV3PutMissingLease(t *testing.T) {
  852. defer testutil.AfterTest(t)
  853. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  854. defer clus.Terminate(t)
  855. kvc := toGRPC(clus.RandClient()).KV
  856. key := []byte("foo")
  857. preq := &pb.PutRequest{Key: key, Lease: 123456}
  858. tests := []func(){
  859. // put case
  860. func() {
  861. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  862. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  863. }
  864. },
  865. // txn success case
  866. func() {
  867. txn := &pb.TxnRequest{}
  868. txn.Success = append(txn.Success, &pb.RequestOp{
  869. Request: &pb.RequestOp_RequestPut{
  870. RequestPut: preq}})
  871. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  872. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  873. }
  874. },
  875. // txn failure case
  876. func() {
  877. txn := &pb.TxnRequest{}
  878. txn.Failure = append(txn.Failure, &pb.RequestOp{
  879. Request: &pb.RequestOp_RequestPut{
  880. RequestPut: preq}})
  881. cmp := &pb.Compare{
  882. Result: pb.Compare_GREATER,
  883. Target: pb.Compare_CREATE,
  884. Key: []byte("bar"),
  885. }
  886. txn.Compare = append(txn.Compare, cmp)
  887. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  888. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  889. }
  890. },
  891. // ignore bad lease in failure on success txn
  892. func() {
  893. txn := &pb.TxnRequest{}
  894. rreq := &pb.RangeRequest{Key: []byte("bar")}
  895. txn.Success = append(txn.Success, &pb.RequestOp{
  896. Request: &pb.RequestOp_RequestRange{
  897. RequestRange: rreq}})
  898. txn.Failure = append(txn.Failure, &pb.RequestOp{
  899. Request: &pb.RequestOp_RequestPut{
  900. RequestPut: preq}})
  901. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  902. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  903. }
  904. },
  905. }
  906. for i, f := range tests {
  907. f()
  908. // key shouldn't have been stored
  909. rreq := &pb.RangeRequest{Key: key}
  910. rresp, err := kvc.Range(context.TODO(), rreq)
  911. if err != nil {
  912. t.Errorf("#%d. could not rangereq (%v)", i, err)
  913. } else if len(rresp.Kvs) != 0 {
  914. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  915. }
  916. }
  917. }
  918. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  919. func TestV3DeleteRange(t *testing.T) {
  920. defer testutil.AfterTest(t)
  921. tests := []struct {
  922. keySet []string
  923. begin string
  924. end string
  925. prevKV bool
  926. wantSet [][]byte
  927. deleted int64
  928. }{
  929. // delete middle
  930. {
  931. []string{"foo", "foo/abc", "fop"},
  932. "foo/", "fop", false,
  933. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  934. },
  935. // no delete
  936. {
  937. []string{"foo", "foo/abc", "fop"},
  938. "foo/", "foo/", false,
  939. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  940. },
  941. // delete first
  942. {
  943. []string{"foo", "foo/abc", "fop"},
  944. "fo", "fop", false,
  945. [][]byte{[]byte("fop")}, 2,
  946. },
  947. // delete tail
  948. {
  949. []string{"foo", "foo/abc", "fop"},
  950. "foo/", "fos", false,
  951. [][]byte{[]byte("foo")}, 2,
  952. },
  953. // delete exact
  954. {
  955. []string{"foo", "foo/abc", "fop"},
  956. "foo/abc", "", false,
  957. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  958. },
  959. // delete none, [x,x)
  960. {
  961. []string{"foo"},
  962. "foo", "foo", false,
  963. [][]byte{[]byte("foo")}, 0,
  964. },
  965. // delete middle with preserveKVs set
  966. {
  967. []string{"foo", "foo/abc", "fop"},
  968. "foo/", "fop", true,
  969. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  970. },
  971. }
  972. for i, tt := range tests {
  973. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  974. kvc := toGRPC(clus.RandClient()).KV
  975. ks := tt.keySet
  976. for j := range ks {
  977. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  978. _, err := kvc.Put(context.TODO(), reqput)
  979. if err != nil {
  980. t.Fatalf("couldn't put key (%v)", err)
  981. }
  982. }
  983. dreq := &pb.DeleteRangeRequest{
  984. Key: []byte(tt.begin),
  985. RangeEnd: []byte(tt.end),
  986. PrevKv: tt.prevKV,
  987. }
  988. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  989. if err != nil {
  990. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  991. }
  992. if tt.deleted != dresp.Deleted {
  993. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  994. }
  995. if tt.prevKV {
  996. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  997. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  998. }
  999. }
  1000. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  1001. rresp, err := kvc.Range(context.TODO(), rreq)
  1002. if err != nil {
  1003. t.Errorf("couldn't get range on test %v (%v)", i, err)
  1004. }
  1005. if dresp.Header.Revision != rresp.Header.Revision {
  1006. t.Errorf("expected revision %v, got %v",
  1007. dresp.Header.Revision, rresp.Header.Revision)
  1008. }
  1009. keys := [][]byte{}
  1010. for j := range rresp.Kvs {
  1011. keys = append(keys, rresp.Kvs[j].Key)
  1012. }
  1013. if !reflect.DeepEqual(tt.wantSet, keys) {
  1014. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  1015. }
  1016. // can't defer because tcp ports will be in use
  1017. clus.Terminate(t)
  1018. }
  1019. }
  1020. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  1021. func TestV3TxnInvalidRange(t *testing.T) {
  1022. defer testutil.AfterTest(t)
  1023. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1024. defer clus.Terminate(t)
  1025. kvc := toGRPC(clus.RandClient()).KV
  1026. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1027. for i := 0; i < 3; i++ {
  1028. _, err := kvc.Put(context.Background(), preq)
  1029. if err != nil {
  1030. t.Fatalf("couldn't put key (%v)", err)
  1031. }
  1032. }
  1033. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  1034. if err != nil {
  1035. t.Fatalf("couldn't compact kv space (%v)", err)
  1036. }
  1037. // future rev
  1038. txn := &pb.TxnRequest{}
  1039. txn.Success = append(txn.Success, &pb.RequestOp{
  1040. Request: &pb.RequestOp_RequestPut{
  1041. RequestPut: preq}})
  1042. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  1043. txn.Success = append(txn.Success, &pb.RequestOp{
  1044. Request: &pb.RequestOp_RequestRange{
  1045. RequestRange: rreq}})
  1046. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  1047. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  1048. }
  1049. // compacted rev
  1050. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  1051. tv.RequestRange.Revision = 1
  1052. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  1053. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  1054. }
  1055. }
  1056. func TestV3TooLargeRequest(t *testing.T) {
  1057. defer testutil.AfterTest(t)
  1058. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1059. defer clus.Terminate(t)
  1060. kvc := toGRPC(clus.RandClient()).KV
  1061. // 2MB request value
  1062. largeV := make([]byte, 2*1024*1024)
  1063. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  1064. _, err := kvc.Put(context.Background(), preq)
  1065. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  1066. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  1067. }
  1068. }
  1069. // TestV3Hash tests hash.
  1070. func TestV3Hash(t *testing.T) {
  1071. defer testutil.AfterTest(t)
  1072. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1073. defer clus.Terminate(t)
  1074. cli := clus.RandClient()
  1075. kvc := toGRPC(cli).KV
  1076. m := toGRPC(cli).Maintenance
  1077. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1078. for i := 0; i < 3; i++ {
  1079. _, err := kvc.Put(context.Background(), preq)
  1080. if err != nil {
  1081. t.Fatalf("couldn't put key (%v)", err)
  1082. }
  1083. }
  1084. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  1085. if err != nil || resp.Hash == 0 {
  1086. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1087. }
  1088. }
  1089. // TestV3HashRestart ensures that hash stays the same after restart.
  1090. func TestV3HashRestart(t *testing.T) {
  1091. defer testutil.AfterTest(t)
  1092. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  1093. defer clus.Terminate(t)
  1094. cli := clus.RandClient()
  1095. resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1096. if err != nil || resp.Hash == 0 {
  1097. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1098. }
  1099. hash1 := resp.Hash
  1100. clus.Members[0].Stop(t)
  1101. clus.Members[0].Restart(t)
  1102. clus.waitLeader(t, clus.Members)
  1103. kvc := toGRPC(clus.Client(0)).KV
  1104. waitForRestart(t, kvc)
  1105. cli = clus.RandClient()
  1106. resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1107. if err != nil || resp.Hash == 0 {
  1108. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1109. }
  1110. hash2 := resp.Hash
  1111. if hash1 != hash2 {
  1112. t.Fatalf("hash expected %d, got %d", hash1, hash2)
  1113. }
  1114. }
  1115. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  1116. func TestV3StorageQuotaAPI(t *testing.T) {
  1117. defer testutil.AfterTest(t)
  1118. quotasize := int64(16 * os.Getpagesize())
  1119. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1120. // Set a quota on one node
  1121. clus.Members[0].QuotaBackendBytes = quotasize
  1122. clus.Members[0].Stop(t)
  1123. clus.Members[0].Restart(t)
  1124. defer clus.Terminate(t)
  1125. kvc := toGRPC(clus.Client(0)).KV
  1126. waitForRestart(t, kvc)
  1127. key := []byte("abc")
  1128. // test small put that fits in quota
  1129. smallbuf := make([]byte, 512)
  1130. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  1131. t.Fatal(err)
  1132. }
  1133. // test big put
  1134. bigbuf := make([]byte, quotasize)
  1135. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  1136. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  1137. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1138. }
  1139. // test big txn
  1140. puttxn := &pb.RequestOp{
  1141. Request: &pb.RequestOp_RequestPut{
  1142. RequestPut: &pb.PutRequest{
  1143. Key: key,
  1144. Value: bigbuf,
  1145. },
  1146. },
  1147. }
  1148. txnreq := &pb.TxnRequest{}
  1149. txnreq.Success = append(txnreq.Success, puttxn)
  1150. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  1151. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  1152. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1153. }
  1154. }
  1155. func TestV3RangeRequest(t *testing.T) {
  1156. defer testutil.AfterTest(t)
  1157. tests := []struct {
  1158. putKeys []string
  1159. reqs []pb.RangeRequest
  1160. wresps [][]string
  1161. wmores []bool
  1162. }{
  1163. // single key
  1164. {
  1165. []string{"foo", "bar"},
  1166. []pb.RangeRequest{
  1167. // exists
  1168. {Key: []byte("foo")},
  1169. // doesn't exist
  1170. {Key: []byte("baz")},
  1171. },
  1172. [][]string{
  1173. {"foo"},
  1174. {},
  1175. },
  1176. []bool{false, false},
  1177. },
  1178. // multi-key
  1179. {
  1180. []string{"a", "b", "c", "d", "e"},
  1181. []pb.RangeRequest{
  1182. // all in range
  1183. {Key: []byte("a"), RangeEnd: []byte("z")},
  1184. // [b, d)
  1185. {Key: []byte("b"), RangeEnd: []byte("d")},
  1186. // out of range
  1187. {Key: []byte("f"), RangeEnd: []byte("z")},
  1188. // [c,c) = empty
  1189. {Key: []byte("c"), RangeEnd: []byte("c")},
  1190. // [d, b) = empty
  1191. {Key: []byte("d"), RangeEnd: []byte("b")},
  1192. // ["\0", "\0") => all in range
  1193. {Key: []byte{0}, RangeEnd: []byte{0}},
  1194. },
  1195. [][]string{
  1196. {"a", "b", "c", "d", "e"},
  1197. {"b", "c"},
  1198. {},
  1199. {},
  1200. {},
  1201. {"a", "b", "c", "d", "e"},
  1202. },
  1203. []bool{false, false, false, false, false, false},
  1204. },
  1205. // revision
  1206. {
  1207. []string{"a", "b", "c", "d", "e"},
  1208. []pb.RangeRequest{
  1209. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  1210. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  1211. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  1212. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  1213. },
  1214. [][]string{
  1215. {"a", "b", "c", "d", "e"},
  1216. {},
  1217. {"a"},
  1218. {"a", "b"},
  1219. },
  1220. []bool{false, false, false, false},
  1221. },
  1222. // limit
  1223. {
  1224. []string{"foo", "bar"},
  1225. []pb.RangeRequest{
  1226. // more
  1227. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  1228. // no more
  1229. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  1230. },
  1231. [][]string{
  1232. {"bar"},
  1233. {"bar", "foo"},
  1234. },
  1235. []bool{true, false},
  1236. },
  1237. // sort
  1238. {
  1239. []string{"b", "a", "c", "d", "c"},
  1240. []pb.RangeRequest{
  1241. {
  1242. Key: []byte("a"), RangeEnd: []byte("z"),
  1243. Limit: 1,
  1244. SortOrder: pb.RangeRequest_ASCEND,
  1245. SortTarget: pb.RangeRequest_KEY,
  1246. },
  1247. {
  1248. Key: []byte("a"), RangeEnd: []byte("z"),
  1249. Limit: 1,
  1250. SortOrder: pb.RangeRequest_DESCEND,
  1251. SortTarget: pb.RangeRequest_KEY,
  1252. },
  1253. {
  1254. Key: []byte("a"), RangeEnd: []byte("z"),
  1255. Limit: 1,
  1256. SortOrder: pb.RangeRequest_ASCEND,
  1257. SortTarget: pb.RangeRequest_CREATE,
  1258. },
  1259. {
  1260. Key: []byte("a"), RangeEnd: []byte("z"),
  1261. Limit: 1,
  1262. SortOrder: pb.RangeRequest_DESCEND,
  1263. SortTarget: pb.RangeRequest_MOD,
  1264. },
  1265. {
  1266. Key: []byte("z"), RangeEnd: []byte("z"),
  1267. Limit: 1,
  1268. SortOrder: pb.RangeRequest_DESCEND,
  1269. SortTarget: pb.RangeRequest_CREATE,
  1270. },
  1271. { // sort ASCEND by default
  1272. Key: []byte("a"), RangeEnd: []byte("z"),
  1273. Limit: 10,
  1274. SortOrder: pb.RangeRequest_NONE,
  1275. SortTarget: pb.RangeRequest_CREATE,
  1276. },
  1277. },
  1278. [][]string{
  1279. {"a"},
  1280. {"d"},
  1281. {"b"},
  1282. {"c"},
  1283. {},
  1284. {"b", "a", "c", "d"},
  1285. },
  1286. []bool{true, true, true, true, false, false},
  1287. },
  1288. // min/max mod rev
  1289. {
  1290. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  1291. []pb.RangeRequest{
  1292. {
  1293. Key: []byte{0}, RangeEnd: []byte{0},
  1294. MinModRevision: 3,
  1295. },
  1296. {
  1297. Key: []byte{0}, RangeEnd: []byte{0},
  1298. MaxModRevision: 3,
  1299. },
  1300. {
  1301. Key: []byte{0}, RangeEnd: []byte{0},
  1302. MinModRevision: 3,
  1303. MaxModRevision: 5,
  1304. },
  1305. {
  1306. Key: []byte{0}, RangeEnd: []byte{0},
  1307. MaxModRevision: 10,
  1308. },
  1309. },
  1310. [][]string{
  1311. {"rev3", "rev4", "rev5", "rev6"},
  1312. {"rev2", "rev3"},
  1313. {"rev3", "rev4", "rev5"},
  1314. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  1315. },
  1316. []bool{false, false, false, false},
  1317. },
  1318. // min/max create rev
  1319. {
  1320. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  1321. []pb.RangeRequest{
  1322. {
  1323. Key: []byte{0}, RangeEnd: []byte{0},
  1324. MinCreateRevision: 3,
  1325. },
  1326. {
  1327. Key: []byte{0}, RangeEnd: []byte{0},
  1328. MaxCreateRevision: 3,
  1329. },
  1330. {
  1331. Key: []byte{0}, RangeEnd: []byte{0},
  1332. MinCreateRevision: 3,
  1333. MaxCreateRevision: 5,
  1334. },
  1335. {
  1336. Key: []byte{0}, RangeEnd: []byte{0},
  1337. MaxCreateRevision: 10,
  1338. },
  1339. },
  1340. [][]string{
  1341. {"rev3", "rev6"},
  1342. {"rev2", "rev3"},
  1343. {"rev3"},
  1344. {"rev2", "rev3", "rev6"},
  1345. },
  1346. []bool{false, false, false, false},
  1347. },
  1348. }
  1349. for i, tt := range tests {
  1350. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1351. for _, k := range tt.putKeys {
  1352. kvc := toGRPC(clus.RandClient()).KV
  1353. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  1354. if _, err := kvc.Put(context.TODO(), req); err != nil {
  1355. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  1356. }
  1357. }
  1358. for j, req := range tt.reqs {
  1359. kvc := toGRPC(clus.RandClient()).KV
  1360. resp, err := kvc.Range(context.TODO(), &req)
  1361. if err != nil {
  1362. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  1363. continue
  1364. }
  1365. if len(resp.Kvs) != len(tt.wresps[j]) {
  1366. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  1367. continue
  1368. }
  1369. for k, wKey := range tt.wresps[j] {
  1370. respKey := string(resp.Kvs[k].Key)
  1371. if respKey != wKey {
  1372. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  1373. }
  1374. }
  1375. if resp.More != tt.wmores[j] {
  1376. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  1377. }
  1378. wrev := int64(len(tt.putKeys) + 1)
  1379. if resp.Header.Revision != wrev {
  1380. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  1381. }
  1382. }
  1383. clus.Terminate(t)
  1384. }
  1385. }
  1386. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1387. cfg.UseGRPC = true
  1388. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  1389. clus.Launch(t)
  1390. return clus
  1391. }
  1392. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  1393. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  1394. defer testutil.AfterTest(t)
  1395. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1396. clus := newClusterV3NoClients(t, &cfg)
  1397. defer clus.Terminate(t)
  1398. // nil out TLS field so client will use an insecure connection
  1399. clus.Members[0].ClientTLSInfo = nil
  1400. client, err := NewClientV3(clus.Members[0])
  1401. if err != nil && err != context.DeadlineExceeded {
  1402. t.Fatalf("unexpected error (%v)", err)
  1403. } else if client == nil {
  1404. // Ideally, no client would be returned. However, grpc will
  1405. // return a connection without trying to handshake first so
  1406. // the connection appears OK.
  1407. return
  1408. }
  1409. defer client.Close()
  1410. donec := make(chan error, 1)
  1411. go func() {
  1412. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  1413. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1414. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  1415. cancel()
  1416. donec <- perr
  1417. }()
  1418. if perr := <-donec; perr == nil {
  1419. t.Fatalf("expected client error on put")
  1420. }
  1421. }
  1422. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  1423. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  1424. defer testutil.AfterTest(t)
  1425. cfg := ClusterConfig{Size: 3}
  1426. clus := newClusterV3NoClients(t, &cfg)
  1427. defer clus.Terminate(t)
  1428. clus.Members[0].ClientTLSInfo = &testTLSInfo
  1429. clus.Members[0].DialOptions = []grpc.DialOption{grpc.WithBlock()}
  1430. client, err := NewClientV3(clus.Members[0])
  1431. if client != nil || err == nil {
  1432. t.Fatalf("expected no client")
  1433. } else if err != context.DeadlineExceeded {
  1434. t.Fatalf("unexpected error (%v)", err)
  1435. }
  1436. }
  1437. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  1438. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  1439. defer testutil.AfterTest(t)
  1440. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1441. clus := newClusterV3NoClients(t, &cfg)
  1442. defer clus.Terminate(t)
  1443. client, err := NewClientV3(clus.Members[0])
  1444. if err != nil {
  1445. t.Fatalf("expected tls client (%v)", err)
  1446. }
  1447. defer client.Close()
  1448. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1449. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  1450. t.Fatalf("unexpected error on put over tls (%v)", err)
  1451. }
  1452. }
  1453. // TestTLSReloadAtomicReplace ensures server reloads expired/valid certs
  1454. // when all certs are atomically replaced by directory renaming.
  1455. // And expects server to reject client requests, and vice versa.
  1456. func TestTLSReloadAtomicReplace(t *testing.T) {
  1457. tmpDir, err := ioutil.TempDir(os.TempDir(), "fixtures-tmp")
  1458. if err != nil {
  1459. t.Fatal(err)
  1460. }
  1461. os.RemoveAll(tmpDir)
  1462. defer os.RemoveAll(tmpDir)
  1463. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1464. if err != nil {
  1465. t.Fatal(err)
  1466. }
  1467. defer os.RemoveAll(certsDir)
  1468. certsDirExp, err := ioutil.TempDir(os.TempDir(), "fixtures-expired")
  1469. if err != nil {
  1470. t.Fatal(err)
  1471. }
  1472. defer os.RemoveAll(certsDirExp)
  1473. cloneFunc := func() transport.TLSInfo {
  1474. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1475. if terr != nil {
  1476. t.Fatal(terr)
  1477. }
  1478. if _, err = copyTLSFiles(testTLSInfoExpired, certsDirExp); err != nil {
  1479. t.Fatal(err)
  1480. }
  1481. return tlsInfo
  1482. }
  1483. replaceFunc := func() {
  1484. if err = os.Rename(certsDir, tmpDir); err != nil {
  1485. t.Fatal(err)
  1486. }
  1487. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1488. t.Fatal(err)
  1489. }
  1490. // after rename,
  1491. // 'certsDir' contains expired certs
  1492. // 'tmpDir' contains valid certs
  1493. // 'certsDirExp' does not exist
  1494. }
  1495. revertFunc := func() {
  1496. if err = os.Rename(tmpDir, certsDirExp); err != nil {
  1497. t.Fatal(err)
  1498. }
  1499. if err = os.Rename(certsDir, tmpDir); err != nil {
  1500. t.Fatal(err)
  1501. }
  1502. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1503. t.Fatal(err)
  1504. }
  1505. }
  1506. testTLSReload(t, cloneFunc, replaceFunc, revertFunc, false)
  1507. }
  1508. // TestTLSReloadCopy ensures server reloads expired/valid certs
  1509. // when new certs are copied over, one by one. And expects server
  1510. // to reject client requests, and vice versa.
  1511. func TestTLSReloadCopy(t *testing.T) {
  1512. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1513. if err != nil {
  1514. t.Fatal(err)
  1515. }
  1516. defer os.RemoveAll(certsDir)
  1517. cloneFunc := func() transport.TLSInfo {
  1518. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1519. if terr != nil {
  1520. t.Fatal(terr)
  1521. }
  1522. return tlsInfo
  1523. }
  1524. replaceFunc := func() {
  1525. if _, err = copyTLSFiles(testTLSInfoExpired, certsDir); err != nil {
  1526. t.Fatal(err)
  1527. }
  1528. }
  1529. revertFunc := func() {
  1530. if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
  1531. t.Fatal(err)
  1532. }
  1533. }
  1534. testTLSReload(t, cloneFunc, replaceFunc, revertFunc, false)
  1535. }
  1536. // TestTLSReloadCopyIPOnly ensures server reloads expired/valid certs
  1537. // when new certs are copied over, one by one. And expects server
  1538. // to reject client requests, and vice versa.
  1539. func TestTLSReloadCopyIPOnly(t *testing.T) {
  1540. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1541. if err != nil {
  1542. t.Fatal(err)
  1543. }
  1544. defer os.RemoveAll(certsDir)
  1545. cloneFunc := func() transport.TLSInfo {
  1546. tlsInfo, terr := copyTLSFiles(testTLSInfoIP, certsDir)
  1547. if terr != nil {
  1548. t.Fatal(terr)
  1549. }
  1550. return tlsInfo
  1551. }
  1552. replaceFunc := func() {
  1553. if _, err = copyTLSFiles(testTLSInfoExpiredIP, certsDir); err != nil {
  1554. t.Fatal(err)
  1555. }
  1556. }
  1557. revertFunc := func() {
  1558. if _, err = copyTLSFiles(testTLSInfoIP, certsDir); err != nil {
  1559. t.Fatal(err)
  1560. }
  1561. }
  1562. testTLSReload(t, cloneFunc, replaceFunc, revertFunc, true)
  1563. }
  1564. func testTLSReload(
  1565. t *testing.T,
  1566. cloneFunc func() transport.TLSInfo,
  1567. replaceFunc func(),
  1568. revertFunc func(),
  1569. useIP bool) {
  1570. defer testutil.AfterTest(t)
  1571. // 1. separate copies for TLS assets modification
  1572. tlsInfo := cloneFunc()
  1573. // 2. start cluster with valid certs
  1574. clus := NewClusterV3(t, &ClusterConfig{
  1575. Size: 1,
  1576. PeerTLS: &tlsInfo,
  1577. ClientTLS: &tlsInfo,
  1578. UseIP: useIP,
  1579. })
  1580. defer clus.Terminate(t)
  1581. // 3. concurrent client dialing while certs become expired
  1582. errc := make(chan error, 1)
  1583. go func() {
  1584. for {
  1585. cc, err := tlsInfo.ClientConfig()
  1586. if err != nil {
  1587. // errors in 'go/src/crypto/tls/tls.go'
  1588. // tls: private key does not match public key
  1589. // tls: failed to find any PEM data in key input
  1590. // tls: failed to find any PEM data in certificate input
  1591. // Or 'does not exist', 'not found', etc
  1592. t.Log(err)
  1593. continue
  1594. }
  1595. cli, cerr := clientv3.New(clientv3.Config{
  1596. DialOptions: []grpc.DialOption{grpc.WithBlock()},
  1597. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1598. DialTimeout: time.Second,
  1599. TLS: cc,
  1600. })
  1601. if cerr != nil {
  1602. errc <- cerr
  1603. return
  1604. }
  1605. cli.Close()
  1606. }
  1607. }()
  1608. // 4. replace certs with expired ones
  1609. replaceFunc()
  1610. // 5. expect dial time-out when loading expired certs
  1611. select {
  1612. case gerr := <-errc:
  1613. if gerr != context.DeadlineExceeded {
  1614. t.Fatalf("expected %v, got %v", context.DeadlineExceeded, gerr)
  1615. }
  1616. case <-time.After(5 * time.Second):
  1617. t.Fatal("failed to receive dial timeout error")
  1618. }
  1619. // 6. replace expired certs back with valid ones
  1620. revertFunc()
  1621. // 7. new requests should trigger listener to reload valid certs
  1622. tls, terr := tlsInfo.ClientConfig()
  1623. if terr != nil {
  1624. t.Fatal(terr)
  1625. }
  1626. cl, cerr := clientv3.New(clientv3.Config{
  1627. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1628. DialTimeout: 5 * time.Second,
  1629. TLS: tls,
  1630. })
  1631. if cerr != nil {
  1632. t.Fatalf("expected no error, got %v", cerr)
  1633. }
  1634. cl.Close()
  1635. }
  1636. func TestGRPCRequireLeader(t *testing.T) {
  1637. defer testutil.AfterTest(t)
  1638. cfg := ClusterConfig{Size: 3}
  1639. clus := newClusterV3NoClients(t, &cfg)
  1640. defer clus.Terminate(t)
  1641. clus.Members[1].Stop(t)
  1642. clus.Members[2].Stop(t)
  1643. client, err := NewClientV3(clus.Members[0])
  1644. if err != nil {
  1645. t.Fatalf("cannot create client: %v", err)
  1646. }
  1647. defer client.Close()
  1648. // wait for election timeout, then member[0] will not have a leader.
  1649. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  1650. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1651. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1652. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1653. if _, err := toGRPC(client).KV.Put(ctx, reqput); rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1654. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1655. }
  1656. }
  1657. func TestGRPCStreamRequireLeader(t *testing.T) {
  1658. defer testutil.AfterTest(t)
  1659. cfg := ClusterConfig{Size: 3}
  1660. clus := newClusterV3NoClients(t, &cfg)
  1661. defer clus.Terminate(t)
  1662. client, err := NewClientV3(clus.Members[0])
  1663. if err != nil {
  1664. t.Fatalf("failed to create client (%v)", err)
  1665. }
  1666. defer client.Close()
  1667. wAPI := toGRPC(client).Watch
  1668. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1669. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1670. wStream, err := wAPI.Watch(ctx)
  1671. if err != nil {
  1672. t.Fatalf("wAPI.Watch error: %v", err)
  1673. }
  1674. clus.Members[1].Stop(t)
  1675. clus.Members[2].Stop(t)
  1676. // existing stream should be rejected
  1677. _, err = wStream.Recv()
  1678. if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1679. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1680. }
  1681. // new stream should also be rejected
  1682. wStream, err = wAPI.Watch(ctx)
  1683. if err != nil {
  1684. t.Fatalf("wAPI.Watch error: %v", err)
  1685. }
  1686. _, err = wStream.Recv()
  1687. if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1688. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1689. }
  1690. clus.Members[1].Restart(t)
  1691. clus.Members[2].Restart(t)
  1692. clus.waitLeader(t, clus.Members)
  1693. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1694. // new stream should also be OK now after we restarted the other members
  1695. wStream, err = wAPI.Watch(ctx)
  1696. if err != nil {
  1697. t.Fatalf("wAPI.Watch error: %v", err)
  1698. }
  1699. wreq := &pb.WatchRequest{
  1700. RequestUnion: &pb.WatchRequest_CreateRequest{
  1701. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1702. },
  1703. }
  1704. err = wStream.Send(wreq)
  1705. if err != nil {
  1706. t.Errorf("err = %v, want nil", err)
  1707. }
  1708. }
  1709. // TestV3LargeRequests ensures that configurable MaxRequestBytes works as intended.
  1710. func TestV3LargeRequests(t *testing.T) {
  1711. defer testutil.AfterTest(t)
  1712. tests := []struct {
  1713. maxRequestBytes uint
  1714. valueSize int
  1715. expectError error
  1716. }{
  1717. // don't set to 0. use 0 as the default.
  1718. {1, 1024, rpctypes.ErrGRPCRequestTooLarge},
  1719. {10 * 1024 * 1024, 9 * 1024 * 1024, nil},
  1720. {10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
  1721. {10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
  1722. }
  1723. for i, test := range tests {
  1724. clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
  1725. kvcli := toGRPC(clus.Client(0)).KV
  1726. reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
  1727. _, err := kvcli.Put(context.TODO(), reqput)
  1728. if !eqErrGRPC(err, test.expectError) {
  1729. t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
  1730. }
  1731. // request went through, expect large response back from server
  1732. if test.expectError == nil {
  1733. reqget := &pb.RangeRequest{Key: []byte("foo")}
  1734. // limit receive call size with original value + gRPC overhead bytes
  1735. _, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
  1736. if err != nil {
  1737. t.Errorf("#%d: range expected no error, got %v", i, err)
  1738. }
  1739. }
  1740. clus.Terminate(t)
  1741. }
  1742. }
  1743. func eqErrGRPC(err1 error, err2 error) bool {
  1744. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  1745. }
  1746. // waitForRestart tries a range request until the client's server responds.
  1747. // This is mainly a stop-gap function until grpcproxy's KVClient adapter
  1748. // (and by extension, clientv3) supports grpc.CallOption pass-through so
  1749. // FailFast=false works with Put.
  1750. func waitForRestart(t *testing.T, kvc pb.KVClient) {
  1751. req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
  1752. // TODO: Remove retry loop once the new grpc load balancer provides retry.
  1753. var err error
  1754. for i := 0; i < 10; i++ {
  1755. if _, err = kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
  1756. if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
  1757. time.Sleep(time.Millisecond * 250)
  1758. } else {
  1759. t.Fatal(err)
  1760. }
  1761. }
  1762. }
  1763. if err != nil {
  1764. t.Fatalf("timed out waiting for restart: %v", err)
  1765. }
  1766. }