v3_grpc_test.go 52 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io/ioutil"
  19. "math/rand"
  20. "os"
  21. "reflect"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/pkg/testutil"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "golang.org/x/net/context"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/metadata"
  32. )
  33. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  34. // overwrites it, then checks that the change was applied.
  35. func TestV3PutOverwrite(t *testing.T) {
  36. defer testutil.AfterTest(t)
  37. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  38. defer clus.Terminate(t)
  39. kvc := toGRPC(clus.RandClient()).KV
  40. key := []byte("foo")
  41. reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
  42. respput, err := kvc.Put(context.TODO(), reqput)
  43. if err != nil {
  44. t.Fatalf("couldn't put key (%v)", err)
  45. }
  46. // overwrite
  47. reqput.Value = []byte("baz")
  48. respput2, err := kvc.Put(context.TODO(), reqput)
  49. if err != nil {
  50. t.Fatalf("couldn't put key (%v)", err)
  51. }
  52. if respput2.Header.Revision <= respput.Header.Revision {
  53. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  54. respput2.Header.Revision, respput.Header.Revision)
  55. }
  56. reqrange := &pb.RangeRequest{Key: key}
  57. resprange, err := kvc.Range(context.TODO(), reqrange)
  58. if err != nil {
  59. t.Fatalf("couldn't get key (%v)", err)
  60. }
  61. if len(resprange.Kvs) != 1 {
  62. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  63. }
  64. kv := resprange.Kvs[0]
  65. if kv.ModRevision <= kv.CreateRevision {
  66. t.Errorf("expected modRev > createRev, got %d <= %d",
  67. kv.ModRevision, kv.CreateRevision)
  68. }
  69. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  70. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  71. }
  72. }
  73. // TestPutRestart checks if a put after an unrelated member restart succeeds
  74. func TestV3PutRestart(t *testing.T) {
  75. defer testutil.AfterTest(t)
  76. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  77. defer clus.Terminate(t)
  78. kvIdx := rand.Intn(3)
  79. kvc := toGRPC(clus.Client(kvIdx)).KV
  80. stopIdx := kvIdx
  81. for stopIdx == kvIdx {
  82. stopIdx = rand.Intn(3)
  83. }
  84. clus.clients[stopIdx].Close()
  85. clus.Members[stopIdx].Stop(t)
  86. clus.Members[stopIdx].Restart(t)
  87. c, cerr := NewClientV3(clus.Members[stopIdx])
  88. if cerr != nil {
  89. t.Fatalf("cannot create client: %v", cerr)
  90. }
  91. clus.clients[stopIdx] = c
  92. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  93. defer cancel()
  94. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  95. _, err := kvc.Put(ctx, reqput)
  96. if err != nil && err == ctx.Err() {
  97. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  98. }
  99. }
  100. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  101. func TestV3CompactCurrentRev(t *testing.T) {
  102. defer testutil.AfterTest(t)
  103. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  104. defer clus.Terminate(t)
  105. kvc := toGRPC(clus.RandClient()).KV
  106. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  107. for i := 0; i < 3; i++ {
  108. if _, err := kvc.Put(context.Background(), preq); err != nil {
  109. t.Fatalf("couldn't put key (%v)", err)
  110. }
  111. }
  112. // get key to add to proxy cache, if any
  113. if _, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}); err != nil {
  114. t.Fatal(err)
  115. }
  116. // compact on current revision
  117. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  118. if err != nil {
  119. t.Fatalf("couldn't compact kv space (%v)", err)
  120. }
  121. // key still exists when linearized?
  122. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  123. if err != nil {
  124. t.Fatalf("couldn't get key after compaction (%v)", err)
  125. }
  126. // key still exists when serialized?
  127. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo"), Serializable: true})
  128. if err != nil {
  129. t.Fatalf("couldn't get serialized key after compaction (%v)", err)
  130. }
  131. }
  132. // TestV3HashKV ensures that multiple calls of HashKV on same node return same hash and compact rev.
  133. func TestV3HashKV(t *testing.T) {
  134. defer testutil.AfterTest(t)
  135. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  136. defer clus.Terminate(t)
  137. kvc := toGRPC(clus.RandClient()).KV
  138. mvc := toGRPC(clus.RandClient()).Maintenance
  139. for i := 0; i < 10; i++ {
  140. resp, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte(fmt.Sprintf("bar%d", i))})
  141. if err != nil {
  142. t.Fatal(err)
  143. }
  144. rev := resp.Header.Revision
  145. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{0})
  146. if err != nil {
  147. t.Fatal(err)
  148. }
  149. if rev != hresp.Header.Revision {
  150. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  151. }
  152. prevHash := hresp.Hash
  153. prevCompactRev := hresp.CompactRevision
  154. for i := 0; i < 10; i++ {
  155. hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{0})
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. if rev != hresp.Header.Revision {
  160. t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
  161. }
  162. if prevHash != hresp.Hash {
  163. t.Fatalf("prevHash %v != Hash %v", prevHash, hresp.Hash)
  164. }
  165. if prevCompactRev != hresp.CompactRevision {
  166. t.Fatalf("prevCompactRev %v != CompactRevision %v", prevHash, hresp.Hash)
  167. }
  168. prevHash = hresp.Hash
  169. prevCompactRev = hresp.CompactRevision
  170. }
  171. }
  172. }
  173. func TestV3TxnTooManyOps(t *testing.T) {
  174. defer testutil.AfterTest(t)
  175. maxTxnOps := uint(128)
  176. clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps})
  177. defer clus.Terminate(t)
  178. kvc := toGRPC(clus.RandClient()).KV
  179. // unique keys
  180. i := new(int)
  181. keyf := func() []byte {
  182. *i++
  183. return []byte(fmt.Sprintf("key-%d", i))
  184. }
  185. addCompareOps := func(txn *pb.TxnRequest) {
  186. txn.Compare = append(txn.Compare,
  187. &pb.Compare{
  188. Result: pb.Compare_GREATER,
  189. Target: pb.Compare_CREATE,
  190. Key: keyf(),
  191. })
  192. }
  193. addSuccessOps := func(txn *pb.TxnRequest) {
  194. txn.Success = append(txn.Success,
  195. &pb.RequestOp{
  196. Request: &pb.RequestOp_RequestPut{
  197. RequestPut: &pb.PutRequest{
  198. Key: keyf(),
  199. Value: []byte("bar"),
  200. },
  201. },
  202. })
  203. }
  204. addFailureOps := func(txn *pb.TxnRequest) {
  205. txn.Failure = append(txn.Failure,
  206. &pb.RequestOp{
  207. Request: &pb.RequestOp_RequestPut{
  208. RequestPut: &pb.PutRequest{
  209. Key: keyf(),
  210. Value: []byte("bar"),
  211. },
  212. },
  213. })
  214. }
  215. addTxnOps := func(txn *pb.TxnRequest) {
  216. newTxn := &pb.TxnRequest{}
  217. addSuccessOps(newTxn)
  218. txn.Success = append(txn.Success,
  219. &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  220. RequestTxn: newTxn,
  221. },
  222. },
  223. )
  224. }
  225. tests := []func(txn *pb.TxnRequest){
  226. addCompareOps,
  227. addSuccessOps,
  228. addFailureOps,
  229. addTxnOps,
  230. }
  231. for i, tt := range tests {
  232. txn := &pb.TxnRequest{}
  233. for j := 0; j < int(maxTxnOps+1); j++ {
  234. tt(txn)
  235. }
  236. _, err := kvc.Txn(context.Background(), txn)
  237. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  238. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  239. }
  240. }
  241. }
  242. func TestV3TxnDuplicateKeys(t *testing.T) {
  243. defer testutil.AfterTest(t)
  244. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  245. defer clus.Terminate(t)
  246. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  247. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  248. RequestDeleteRange: &pb.DeleteRangeRequest{
  249. Key: []byte("abc"),
  250. },
  251. },
  252. }
  253. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  254. RequestDeleteRange: &pb.DeleteRangeRequest{
  255. Key: []byte("a"), RangeEnd: []byte("b"),
  256. },
  257. },
  258. }
  259. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  260. RequestDeleteRange: &pb.DeleteRangeRequest{
  261. Key: []byte("abb"), RangeEnd: []byte("abc"),
  262. },
  263. },
  264. }
  265. txnDelReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  266. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{delInRangeReq}},
  267. },
  268. }
  269. txnDelReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  270. RequestTxn: &pb.TxnRequest{
  271. Success: []*pb.RequestOp{delInRangeReq},
  272. Failure: []*pb.RequestOp{delInRangeReq}},
  273. },
  274. }
  275. txnPutReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  276. RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{putreq}},
  277. },
  278. }
  279. txnPutReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
  280. RequestTxn: &pb.TxnRequest{
  281. Success: []*pb.RequestOp{putreq},
  282. Failure: []*pb.RequestOp{putreq}},
  283. },
  284. }
  285. kvc := toGRPC(clus.RandClient()).KV
  286. tests := []struct {
  287. txnSuccess []*pb.RequestOp
  288. werr error
  289. }{
  290. {
  291. txnSuccess: []*pb.RequestOp{putreq, putreq},
  292. werr: rpctypes.ErrGRPCDuplicateKey,
  293. },
  294. {
  295. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  296. werr: rpctypes.ErrGRPCDuplicateKey,
  297. },
  298. {
  299. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  300. werr: rpctypes.ErrGRPCDuplicateKey,
  301. },
  302. // Then(Put(a), Then(Del(a)))
  303. {
  304. txnSuccess: []*pb.RequestOp{putreq, txnDelReq},
  305. werr: rpctypes.ErrGRPCDuplicateKey,
  306. },
  307. // Then(Del(a), Then(Put(a)))
  308. {
  309. txnSuccess: []*pb.RequestOp{delInRangeReq, txnPutReq},
  310. werr: rpctypes.ErrGRPCDuplicateKey,
  311. },
  312. // Then((Then(Put(a)), Else(Put(a))), (Then(Put(a)), Else(Put(a)))
  313. {
  314. txnSuccess: []*pb.RequestOp{txnPutReqTwoSide, txnPutReqTwoSide},
  315. werr: rpctypes.ErrGRPCDuplicateKey,
  316. },
  317. // Then(Del(x), (Then(Put(a)), Else(Put(a))))
  318. {
  319. txnSuccess: []*pb.RequestOp{delOutOfRangeReq, txnPutReqTwoSide},
  320. werr: nil,
  321. },
  322. // Then(Then(Del(a)), (Then(Del(a)), Else(Del(a))))
  323. {
  324. txnSuccess: []*pb.RequestOp{txnDelReq, txnDelReqTwoSide},
  325. werr: nil,
  326. },
  327. {
  328. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  329. werr: nil,
  330. },
  331. {
  332. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  333. werr: nil,
  334. },
  335. }
  336. for i, tt := range tests {
  337. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  338. _, err := kvc.Txn(context.Background(), txn)
  339. if !eqErrGRPC(err, tt.werr) {
  340. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  341. }
  342. }
  343. }
  344. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  345. func TestV3TxnRevision(t *testing.T) {
  346. defer testutil.AfterTest(t)
  347. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  348. defer clus.Terminate(t)
  349. kvc := toGRPC(clus.RandClient()).KV
  350. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  351. presp, err := kvc.Put(context.TODO(), pr)
  352. if err != nil {
  353. t.Fatal(err)
  354. }
  355. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  356. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  357. tresp, err := kvc.Txn(context.TODO(), txn)
  358. if err != nil {
  359. t.Fatal(err)
  360. }
  361. // did not update revision
  362. if presp.Header.Revision != tresp.Header.Revision {
  363. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  364. }
  365. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  366. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  367. tresp, err = kvc.Txn(context.TODO(), txn)
  368. if err != nil {
  369. t.Fatal(err)
  370. }
  371. // did not update revision
  372. if presp.Header.Revision != tresp.Header.Revision {
  373. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  374. }
  375. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  376. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  377. tresp, err = kvc.Txn(context.TODO(), txn)
  378. if err != nil {
  379. t.Fatal(err)
  380. }
  381. // updated revision
  382. if tresp.Header.Revision != presp.Header.Revision+1 {
  383. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  384. }
  385. }
  386. // Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
  387. // when compared to the Succeeded field in the txn response.
  388. func TestV3TxnCmpHeaderRev(t *testing.T) {
  389. defer testutil.AfterTest(t)
  390. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  391. defer clus.Terminate(t)
  392. kvc := toGRPC(clus.RandClient()).KV
  393. for i := 0; i < 10; i++ {
  394. // Concurrently put a key with a txn comparing on it.
  395. revc := make(chan int64, 1)
  396. go func() {
  397. defer close(revc)
  398. pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
  399. presp, err := kvc.Put(context.TODO(), pr)
  400. if err != nil {
  401. t.Fatal(err)
  402. }
  403. revc <- presp.Header.Revision
  404. }()
  405. // The read-only txn uses the optimized readindex server path.
  406. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
  407. RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
  408. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  409. // i = 0 /\ Succeeded => put followed txn
  410. cmp := &pb.Compare{
  411. Result: pb.Compare_EQUAL,
  412. Target: pb.Compare_VERSION,
  413. Key: []byte("k"),
  414. TargetUnion: &pb.Compare_Version{Version: int64(i)},
  415. }
  416. txn.Compare = append(txn.Compare, cmp)
  417. tresp, err := kvc.Txn(context.TODO(), txn)
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. prev := <-revc
  422. // put followed txn; should eval to false
  423. if prev > tresp.Header.Revision && !tresp.Succeeded {
  424. t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
  425. }
  426. // txn follows put; should eval to true
  427. if tresp.Header.Revision >= prev && tresp.Succeeded {
  428. t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
  429. }
  430. }
  431. }
  432. // TestV3TxnRangeCompare tests range comparisons in txns
  433. func TestV3TxnRangeCompare(t *testing.T) {
  434. defer testutil.AfterTest(t)
  435. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  436. defer clus.Terminate(t)
  437. // put keys, named by expected revision
  438. for _, k := range []string{"/a/2", "/a/3", "/a/4", "/f/5"} {
  439. if _, err := clus.Client(0).Put(context.TODO(), k, "x"); err != nil {
  440. t.Fatal(err)
  441. }
  442. }
  443. tests := []struct {
  444. cmp pb.Compare
  445. wSuccess bool
  446. }{
  447. {
  448. // >= /a/; all create revs fit
  449. pb.Compare{
  450. Key: []byte("/a/"),
  451. RangeEnd: []byte{0},
  452. Target: pb.Compare_CREATE,
  453. Result: pb.Compare_LESS,
  454. TargetUnion: &pb.Compare_CreateRevision{6},
  455. },
  456. true,
  457. },
  458. {
  459. // >= /a/; one create rev doesn't fit
  460. pb.Compare{
  461. Key: []byte("/a/"),
  462. RangeEnd: []byte{0},
  463. Target: pb.Compare_CREATE,
  464. Result: pb.Compare_LESS,
  465. TargetUnion: &pb.Compare_CreateRevision{5},
  466. },
  467. false,
  468. },
  469. {
  470. // prefix /a/*; all create revs fit
  471. pb.Compare{
  472. Key: []byte("/a/"),
  473. RangeEnd: []byte("/a0"),
  474. Target: pb.Compare_CREATE,
  475. Result: pb.Compare_LESS,
  476. TargetUnion: &pb.Compare_CreateRevision{5},
  477. },
  478. true,
  479. },
  480. {
  481. // prefix /a/*; one create rev doesn't fit
  482. pb.Compare{
  483. Key: []byte("/a/"),
  484. RangeEnd: []byte("/a0"),
  485. Target: pb.Compare_CREATE,
  486. Result: pb.Compare_LESS,
  487. TargetUnion: &pb.Compare_CreateRevision{4},
  488. },
  489. false,
  490. },
  491. {
  492. // does not exist, does not succeed
  493. pb.Compare{
  494. Key: []byte("/b/"),
  495. RangeEnd: []byte("/b0"),
  496. Target: pb.Compare_VALUE,
  497. Result: pb.Compare_EQUAL,
  498. TargetUnion: &pb.Compare_Value{[]byte("x")},
  499. },
  500. false,
  501. },
  502. }
  503. kvc := toGRPC(clus.Client(0)).KV
  504. for i, tt := range tests {
  505. txn := &pb.TxnRequest{}
  506. txn.Compare = append(txn.Compare, &tt.cmp)
  507. tresp, err := kvc.Txn(context.TODO(), txn)
  508. if err != nil {
  509. t.Fatal(err)
  510. }
  511. if tt.wSuccess != tresp.Succeeded {
  512. t.Errorf("#%d: expected %v, got %v", i, tt.wSuccess, tresp.Succeeded)
  513. }
  514. }
  515. }
  516. // TestV3TxnNested tests nested txns follow paths as expected.
  517. func TestV3TxnNestedPath(t *testing.T) {
  518. defer testutil.AfterTest(t)
  519. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  520. defer clus.Terminate(t)
  521. kvc := toGRPC(clus.RandClient()).KV
  522. cmpTrue := &pb.Compare{
  523. Result: pb.Compare_EQUAL,
  524. Target: pb.Compare_VERSION,
  525. Key: []byte("k"),
  526. TargetUnion: &pb.Compare_Version{Version: int64(0)},
  527. }
  528. cmpFalse := &pb.Compare{
  529. Result: pb.Compare_EQUAL,
  530. Target: pb.Compare_VERSION,
  531. Key: []byte("k"),
  532. TargetUnion: &pb.Compare_Version{Version: int64(1)},
  533. }
  534. // generate random path to eval txns
  535. topTxn := &pb.TxnRequest{}
  536. txn := topTxn
  537. txnPath := make([]bool, 10)
  538. for i := range txnPath {
  539. nextTxn := &pb.TxnRequest{}
  540. op := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: nextTxn}}
  541. txnPath[i] = rand.Intn(2) == 0
  542. if txnPath[i] {
  543. txn.Compare = append(txn.Compare, cmpTrue)
  544. txn.Success = append(txn.Success, op)
  545. } else {
  546. txn.Compare = append(txn.Compare, cmpFalse)
  547. txn.Failure = append(txn.Failure, op)
  548. }
  549. txn = nextTxn
  550. }
  551. tresp, err := kvc.Txn(context.TODO(), topTxn)
  552. if err != nil {
  553. t.Fatal(err)
  554. }
  555. curTxnResp := tresp
  556. for i := range txnPath {
  557. if curTxnResp.Succeeded != txnPath[i] {
  558. t.Fatalf("expected path %+v, got response %+v", txnPath, *tresp)
  559. }
  560. curTxnResp = curTxnResp.Responses[0].Response.(*pb.ResponseOp_ResponseTxn).ResponseTxn
  561. }
  562. }
  563. // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
  564. func TestV3PutIgnoreValue(t *testing.T) {
  565. defer testutil.AfterTest(t)
  566. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  567. defer clus.Terminate(t)
  568. kvc := toGRPC(clus.RandClient()).KV
  569. key, val := []byte("foo"), []byte("bar")
  570. putReq := pb.PutRequest{Key: key, Value: val}
  571. // create lease
  572. lc := toGRPC(clus.RandClient()).Lease
  573. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. if lresp.Error != "" {
  578. t.Fatal(lresp.Error)
  579. }
  580. tests := []struct {
  581. putFunc func() error
  582. putErr error
  583. wleaseID int64
  584. }{
  585. { // put failure for non-existent key
  586. func() error {
  587. preq := putReq
  588. preq.IgnoreValue = true
  589. _, err := kvc.Put(context.TODO(), &preq)
  590. return err
  591. },
  592. rpctypes.ErrGRPCKeyNotFound,
  593. 0,
  594. },
  595. { // txn failure for non-existent key
  596. func() error {
  597. preq := putReq
  598. preq.Value = nil
  599. preq.IgnoreValue = true
  600. txn := &pb.TxnRequest{}
  601. txn.Success = append(txn.Success, &pb.RequestOp{
  602. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  603. _, err := kvc.Txn(context.TODO(), txn)
  604. return err
  605. },
  606. rpctypes.ErrGRPCKeyNotFound,
  607. 0,
  608. },
  609. { // put success
  610. func() error {
  611. _, err := kvc.Put(context.TODO(), &putReq)
  612. return err
  613. },
  614. nil,
  615. 0,
  616. },
  617. { // txn success, attach lease
  618. func() error {
  619. preq := putReq
  620. preq.Value = nil
  621. preq.Lease = lresp.ID
  622. preq.IgnoreValue = true
  623. txn := &pb.TxnRequest{}
  624. txn.Success = append(txn.Success, &pb.RequestOp{
  625. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  626. _, err := kvc.Txn(context.TODO(), txn)
  627. return err
  628. },
  629. nil,
  630. lresp.ID,
  631. },
  632. { // non-empty value with ignore_value should error
  633. func() error {
  634. preq := putReq
  635. preq.IgnoreValue = true
  636. _, err := kvc.Put(context.TODO(), &preq)
  637. return err
  638. },
  639. rpctypes.ErrGRPCValueProvided,
  640. 0,
  641. },
  642. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  643. func() error {
  644. preq := putReq
  645. preq.Value = nil
  646. preq.IgnoreValue = true
  647. presp, err := kvc.Put(context.TODO(), &preq)
  648. if err != nil {
  649. return err
  650. }
  651. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  652. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  653. }
  654. return nil
  655. },
  656. nil,
  657. 0,
  658. },
  659. { // revoke lease, ensure detached key doesn't get deleted
  660. func() error {
  661. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  662. return err
  663. },
  664. nil,
  665. 0,
  666. },
  667. }
  668. for i, tt := range tests {
  669. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  670. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  671. }
  672. if tt.putErr != nil {
  673. continue
  674. }
  675. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  676. if err != nil {
  677. t.Fatalf("#%d: %v", i, err)
  678. }
  679. if len(rr.Kvs) != 1 {
  680. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  681. }
  682. if !bytes.Equal(rr.Kvs[0].Value, val) {
  683. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  684. }
  685. if rr.Kvs[0].Lease != tt.wleaseID {
  686. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  687. }
  688. }
  689. }
  690. // TestV3PutIgnoreLease ensures that writes with ignore_lease uses previous lease for the key overwrites.
  691. func TestV3PutIgnoreLease(t *testing.T) {
  692. defer testutil.AfterTest(t)
  693. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  694. defer clus.Terminate(t)
  695. kvc := toGRPC(clus.RandClient()).KV
  696. // create lease
  697. lc := toGRPC(clus.RandClient()).Lease
  698. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  699. if err != nil {
  700. t.Fatal(err)
  701. }
  702. if lresp.Error != "" {
  703. t.Fatal(lresp.Error)
  704. }
  705. key, val, val1 := []byte("zoo"), []byte("bar"), []byte("bar1")
  706. putReq := pb.PutRequest{Key: key, Value: val}
  707. tests := []struct {
  708. putFunc func() error
  709. putErr error
  710. wleaseID int64
  711. wvalue []byte
  712. }{
  713. { // put failure for non-existent key
  714. func() error {
  715. preq := putReq
  716. preq.IgnoreLease = true
  717. _, err := kvc.Put(context.TODO(), &preq)
  718. return err
  719. },
  720. rpctypes.ErrGRPCKeyNotFound,
  721. 0,
  722. nil,
  723. },
  724. { // txn failure for non-existent key
  725. func() error {
  726. preq := putReq
  727. preq.IgnoreLease = true
  728. txn := &pb.TxnRequest{}
  729. txn.Success = append(txn.Success, &pb.RequestOp{
  730. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  731. _, err := kvc.Txn(context.TODO(), txn)
  732. return err
  733. },
  734. rpctypes.ErrGRPCKeyNotFound,
  735. 0,
  736. nil,
  737. },
  738. { // put success
  739. func() error {
  740. preq := putReq
  741. preq.Lease = lresp.ID
  742. _, err := kvc.Put(context.TODO(), &preq)
  743. return err
  744. },
  745. nil,
  746. lresp.ID,
  747. val,
  748. },
  749. { // txn success, modify value using 'ignore_lease' and ensure lease is not detached
  750. func() error {
  751. preq := putReq
  752. preq.Value = val1
  753. preq.IgnoreLease = true
  754. txn := &pb.TxnRequest{}
  755. txn.Success = append(txn.Success, &pb.RequestOp{
  756. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  757. _, err := kvc.Txn(context.TODO(), txn)
  758. return err
  759. },
  760. nil,
  761. lresp.ID,
  762. val1,
  763. },
  764. { // non-empty lease with ignore_lease should error
  765. func() error {
  766. preq := putReq
  767. preq.Lease = lresp.ID
  768. preq.IgnoreLease = true
  769. _, err := kvc.Put(context.TODO(), &preq)
  770. return err
  771. },
  772. rpctypes.ErrGRPCLeaseProvided,
  773. 0,
  774. nil,
  775. },
  776. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  777. func() error {
  778. presp, err := kvc.Put(context.TODO(), &putReq)
  779. if err != nil {
  780. return err
  781. }
  782. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  783. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  784. }
  785. return nil
  786. },
  787. nil,
  788. 0,
  789. val,
  790. },
  791. { // revoke lease, ensure detached key doesn't get deleted
  792. func() error {
  793. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  794. return err
  795. },
  796. nil,
  797. 0,
  798. val,
  799. },
  800. }
  801. for i, tt := range tests {
  802. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  803. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  804. }
  805. if tt.putErr != nil {
  806. continue
  807. }
  808. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  809. if err != nil {
  810. t.Fatalf("#%d: %v", i, err)
  811. }
  812. if len(rr.Kvs) != 1 {
  813. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  814. }
  815. if !bytes.Equal(rr.Kvs[0].Value, tt.wvalue) {
  816. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  817. }
  818. if rr.Kvs[0].Lease != tt.wleaseID {
  819. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  820. }
  821. }
  822. }
  823. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  824. func TestV3PutMissingLease(t *testing.T) {
  825. defer testutil.AfterTest(t)
  826. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  827. defer clus.Terminate(t)
  828. kvc := toGRPC(clus.RandClient()).KV
  829. key := []byte("foo")
  830. preq := &pb.PutRequest{Key: key, Lease: 123456}
  831. tests := []func(){
  832. // put case
  833. func() {
  834. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  835. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  836. }
  837. },
  838. // txn success case
  839. func() {
  840. txn := &pb.TxnRequest{}
  841. txn.Success = append(txn.Success, &pb.RequestOp{
  842. Request: &pb.RequestOp_RequestPut{
  843. RequestPut: preq}})
  844. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  845. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  846. }
  847. },
  848. // txn failure case
  849. func() {
  850. txn := &pb.TxnRequest{}
  851. txn.Failure = append(txn.Failure, &pb.RequestOp{
  852. Request: &pb.RequestOp_RequestPut{
  853. RequestPut: preq}})
  854. cmp := &pb.Compare{
  855. Result: pb.Compare_GREATER,
  856. Target: pb.Compare_CREATE,
  857. Key: []byte("bar"),
  858. }
  859. txn.Compare = append(txn.Compare, cmp)
  860. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  861. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  862. }
  863. },
  864. // ignore bad lease in failure on success txn
  865. func() {
  866. txn := &pb.TxnRequest{}
  867. rreq := &pb.RangeRequest{Key: []byte("bar")}
  868. txn.Success = append(txn.Success, &pb.RequestOp{
  869. Request: &pb.RequestOp_RequestRange{
  870. RequestRange: rreq}})
  871. txn.Failure = append(txn.Failure, &pb.RequestOp{
  872. Request: &pb.RequestOp_RequestPut{
  873. RequestPut: preq}})
  874. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  875. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  876. }
  877. },
  878. }
  879. for i, f := range tests {
  880. f()
  881. // key shouldn't have been stored
  882. rreq := &pb.RangeRequest{Key: key}
  883. rresp, err := kvc.Range(context.TODO(), rreq)
  884. if err != nil {
  885. t.Errorf("#%d. could not rangereq (%v)", i, err)
  886. } else if len(rresp.Kvs) != 0 {
  887. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  888. }
  889. }
  890. }
  891. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  892. func TestV3DeleteRange(t *testing.T) {
  893. defer testutil.AfterTest(t)
  894. tests := []struct {
  895. keySet []string
  896. begin string
  897. end string
  898. prevKV bool
  899. wantSet [][]byte
  900. deleted int64
  901. }{
  902. // delete middle
  903. {
  904. []string{"foo", "foo/abc", "fop"},
  905. "foo/", "fop", false,
  906. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  907. },
  908. // no delete
  909. {
  910. []string{"foo", "foo/abc", "fop"},
  911. "foo/", "foo/", false,
  912. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  913. },
  914. // delete first
  915. {
  916. []string{"foo", "foo/abc", "fop"},
  917. "fo", "fop", false,
  918. [][]byte{[]byte("fop")}, 2,
  919. },
  920. // delete tail
  921. {
  922. []string{"foo", "foo/abc", "fop"},
  923. "foo/", "fos", false,
  924. [][]byte{[]byte("foo")}, 2,
  925. },
  926. // delete exact
  927. {
  928. []string{"foo", "foo/abc", "fop"},
  929. "foo/abc", "", false,
  930. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  931. },
  932. // delete none, [x,x)
  933. {
  934. []string{"foo"},
  935. "foo", "foo", false,
  936. [][]byte{[]byte("foo")}, 0,
  937. },
  938. // delete middle with preserveKVs set
  939. {
  940. []string{"foo", "foo/abc", "fop"},
  941. "foo/", "fop", true,
  942. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  943. },
  944. }
  945. for i, tt := range tests {
  946. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  947. kvc := toGRPC(clus.RandClient()).KV
  948. ks := tt.keySet
  949. for j := range ks {
  950. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  951. _, err := kvc.Put(context.TODO(), reqput)
  952. if err != nil {
  953. t.Fatalf("couldn't put key (%v)", err)
  954. }
  955. }
  956. dreq := &pb.DeleteRangeRequest{
  957. Key: []byte(tt.begin),
  958. RangeEnd: []byte(tt.end),
  959. PrevKv: tt.prevKV,
  960. }
  961. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  962. if err != nil {
  963. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  964. }
  965. if tt.deleted != dresp.Deleted {
  966. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  967. }
  968. if tt.prevKV {
  969. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  970. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  971. }
  972. }
  973. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  974. rresp, err := kvc.Range(context.TODO(), rreq)
  975. if err != nil {
  976. t.Errorf("couldn't get range on test %v (%v)", i, err)
  977. }
  978. if dresp.Header.Revision != rresp.Header.Revision {
  979. t.Errorf("expected revision %v, got %v",
  980. dresp.Header.Revision, rresp.Header.Revision)
  981. }
  982. keys := [][]byte{}
  983. for j := range rresp.Kvs {
  984. keys = append(keys, rresp.Kvs[j].Key)
  985. }
  986. if !reflect.DeepEqual(tt.wantSet, keys) {
  987. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  988. }
  989. // can't defer because tcp ports will be in use
  990. clus.Terminate(t)
  991. }
  992. }
  993. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  994. func TestV3TxnInvalidRange(t *testing.T) {
  995. defer testutil.AfterTest(t)
  996. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  997. defer clus.Terminate(t)
  998. kvc := toGRPC(clus.RandClient()).KV
  999. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1000. for i := 0; i < 3; i++ {
  1001. _, err := kvc.Put(context.Background(), preq)
  1002. if err != nil {
  1003. t.Fatalf("couldn't put key (%v)", err)
  1004. }
  1005. }
  1006. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  1007. if err != nil {
  1008. t.Fatalf("couldn't compact kv space (%v)", err)
  1009. }
  1010. // future rev
  1011. txn := &pb.TxnRequest{}
  1012. txn.Success = append(txn.Success, &pb.RequestOp{
  1013. Request: &pb.RequestOp_RequestPut{
  1014. RequestPut: preq}})
  1015. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  1016. txn.Success = append(txn.Success, &pb.RequestOp{
  1017. Request: &pb.RequestOp_RequestRange{
  1018. RequestRange: rreq}})
  1019. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  1020. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  1021. }
  1022. // compacted rev
  1023. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  1024. tv.RequestRange.Revision = 1
  1025. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  1026. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  1027. }
  1028. }
  1029. func TestV3TooLargeRequest(t *testing.T) {
  1030. defer testutil.AfterTest(t)
  1031. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1032. defer clus.Terminate(t)
  1033. kvc := toGRPC(clus.RandClient()).KV
  1034. // 2MB request value
  1035. largeV := make([]byte, 2*1024*1024)
  1036. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  1037. _, err := kvc.Put(context.Background(), preq)
  1038. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  1039. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  1040. }
  1041. }
  1042. // TestV3Hash tests hash.
  1043. func TestV3Hash(t *testing.T) {
  1044. defer testutil.AfterTest(t)
  1045. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1046. defer clus.Terminate(t)
  1047. cli := clus.RandClient()
  1048. kvc := toGRPC(cli).KV
  1049. m := toGRPC(cli).Maintenance
  1050. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1051. for i := 0; i < 3; i++ {
  1052. _, err := kvc.Put(context.Background(), preq)
  1053. if err != nil {
  1054. t.Fatalf("couldn't put key (%v)", err)
  1055. }
  1056. }
  1057. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  1058. if err != nil || resp.Hash == 0 {
  1059. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1060. }
  1061. }
  1062. // TestV3HashRestart ensures that hash stays the same after restart.
  1063. func TestV3HashRestart(t *testing.T) {
  1064. defer testutil.AfterTest(t)
  1065. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  1066. defer clus.Terminate(t)
  1067. cli := clus.RandClient()
  1068. resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1069. if err != nil || resp.Hash == 0 {
  1070. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1071. }
  1072. hash1 := resp.Hash
  1073. clus.Members[0].Stop(t)
  1074. clus.Members[0].Restart(t)
  1075. clus.waitLeader(t, clus.Members)
  1076. kvc := toGRPC(clus.Client(0)).KV
  1077. waitForRestart(t, kvc)
  1078. cli = clus.RandClient()
  1079. resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  1080. if err != nil || resp.Hash == 0 {
  1081. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  1082. }
  1083. hash2 := resp.Hash
  1084. if hash1 != hash2 {
  1085. t.Fatalf("hash expected %d, got %d", hash1, hash2)
  1086. }
  1087. }
  1088. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  1089. func TestV3StorageQuotaAPI(t *testing.T) {
  1090. defer testutil.AfterTest(t)
  1091. quotasize := int64(16 * os.Getpagesize())
  1092. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1093. // Set a quota on one node
  1094. clus.Members[0].QuotaBackendBytes = quotasize
  1095. clus.Members[0].Stop(t)
  1096. clus.Members[0].Restart(t)
  1097. defer clus.Terminate(t)
  1098. kvc := toGRPC(clus.Client(0)).KV
  1099. waitForRestart(t, kvc)
  1100. key := []byte("abc")
  1101. // test small put that fits in quota
  1102. smallbuf := make([]byte, 512)
  1103. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  1104. t.Fatal(err)
  1105. }
  1106. // test big put
  1107. bigbuf := make([]byte, quotasize)
  1108. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  1109. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  1110. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1111. }
  1112. // test big txn
  1113. puttxn := &pb.RequestOp{
  1114. Request: &pb.RequestOp_RequestPut{
  1115. RequestPut: &pb.PutRequest{
  1116. Key: key,
  1117. Value: bigbuf,
  1118. },
  1119. },
  1120. }
  1121. txnreq := &pb.TxnRequest{}
  1122. txnreq.Success = append(txnreq.Success, puttxn)
  1123. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  1124. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  1125. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1126. }
  1127. }
  1128. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  1129. func TestV3StorageQuotaApply(t *testing.T) {
  1130. testutil.AfterTest(t)
  1131. quotasize := int64(16 * os.Getpagesize())
  1132. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  1133. defer clus.Terminate(t)
  1134. kvc0 := toGRPC(clus.Client(0)).KV
  1135. kvc1 := toGRPC(clus.Client(1)).KV
  1136. // Set a quota on one node
  1137. clus.Members[0].QuotaBackendBytes = quotasize
  1138. clus.Members[0].Stop(t)
  1139. clus.Members[0].Restart(t)
  1140. clus.waitLeader(t, clus.Members)
  1141. waitForRestart(t, kvc0)
  1142. key := []byte("abc")
  1143. // test small put still works
  1144. smallbuf := make([]byte, 1024)
  1145. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  1146. if serr != nil {
  1147. t.Fatal(serr)
  1148. }
  1149. // test big put
  1150. bigbuf := make([]byte, quotasize)
  1151. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  1152. if err != nil {
  1153. t.Fatal(err)
  1154. }
  1155. // quorum get should work regardless of whether alarm is raised
  1156. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  1157. if err != nil {
  1158. t.Fatal(err)
  1159. }
  1160. // wait until alarm is raised for sure-- poll the alarms
  1161. stopc := time.After(5 * time.Second)
  1162. for {
  1163. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  1164. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  1165. if aerr != nil {
  1166. t.Fatal(aerr)
  1167. }
  1168. if len(resp.Alarms) != 0 {
  1169. break
  1170. }
  1171. select {
  1172. case <-stopc:
  1173. t.Fatalf("timed out waiting for alarm")
  1174. case <-time.After(10 * time.Millisecond):
  1175. }
  1176. }
  1177. // small quota machine should reject put
  1178. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1179. t.Fatalf("past-quota instance should reject put")
  1180. }
  1181. // large quota machine should reject put
  1182. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1183. t.Fatalf("past-quota instance should reject put")
  1184. }
  1185. // reset large quota node to ensure alarm persisted
  1186. clus.Members[1].Stop(t)
  1187. clus.Members[1].Restart(t)
  1188. clus.waitLeader(t, clus.Members)
  1189. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  1190. t.Fatalf("alarmed instance should reject put after reset")
  1191. }
  1192. }
  1193. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  1194. func TestV3AlarmDeactivate(t *testing.T) {
  1195. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1196. defer clus.Terminate(t)
  1197. kvc := toGRPC(clus.RandClient()).KV
  1198. mt := toGRPC(clus.RandClient()).Maintenance
  1199. alarmReq := &pb.AlarmRequest{
  1200. MemberID: 123,
  1201. Action: pb.AlarmRequest_ACTIVATE,
  1202. Alarm: pb.AlarmType_NOSPACE,
  1203. }
  1204. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  1205. t.Fatal(err)
  1206. }
  1207. key := []byte("abc")
  1208. smallbuf := make([]byte, 512)
  1209. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  1210. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  1211. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  1212. }
  1213. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  1214. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  1215. t.Fatal(err)
  1216. }
  1217. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  1218. t.Fatal(err)
  1219. }
  1220. }
  1221. func TestV3RangeRequest(t *testing.T) {
  1222. defer testutil.AfterTest(t)
  1223. tests := []struct {
  1224. putKeys []string
  1225. reqs []pb.RangeRequest
  1226. wresps [][]string
  1227. wmores []bool
  1228. }{
  1229. // single key
  1230. {
  1231. []string{"foo", "bar"},
  1232. []pb.RangeRequest{
  1233. // exists
  1234. {Key: []byte("foo")},
  1235. // doesn't exist
  1236. {Key: []byte("baz")},
  1237. },
  1238. [][]string{
  1239. {"foo"},
  1240. {},
  1241. },
  1242. []bool{false, false},
  1243. },
  1244. // multi-key
  1245. {
  1246. []string{"a", "b", "c", "d", "e"},
  1247. []pb.RangeRequest{
  1248. // all in range
  1249. {Key: []byte("a"), RangeEnd: []byte("z")},
  1250. // [b, d)
  1251. {Key: []byte("b"), RangeEnd: []byte("d")},
  1252. // out of range
  1253. {Key: []byte("f"), RangeEnd: []byte("z")},
  1254. // [c,c) = empty
  1255. {Key: []byte("c"), RangeEnd: []byte("c")},
  1256. // [d, b) = empty
  1257. {Key: []byte("d"), RangeEnd: []byte("b")},
  1258. // ["\0", "\0") => all in range
  1259. {Key: []byte{0}, RangeEnd: []byte{0}},
  1260. },
  1261. [][]string{
  1262. {"a", "b", "c", "d", "e"},
  1263. {"b", "c"},
  1264. {},
  1265. {},
  1266. {},
  1267. {"a", "b", "c", "d", "e"},
  1268. },
  1269. []bool{false, false, false, false, false, false},
  1270. },
  1271. // revision
  1272. {
  1273. []string{"a", "b", "c", "d", "e"},
  1274. []pb.RangeRequest{
  1275. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  1276. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  1277. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  1278. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  1279. },
  1280. [][]string{
  1281. {"a", "b", "c", "d", "e"},
  1282. {},
  1283. {"a"},
  1284. {"a", "b"},
  1285. },
  1286. []bool{false, false, false, false},
  1287. },
  1288. // limit
  1289. {
  1290. []string{"foo", "bar"},
  1291. []pb.RangeRequest{
  1292. // more
  1293. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  1294. // no more
  1295. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  1296. },
  1297. [][]string{
  1298. {"bar"},
  1299. {"bar", "foo"},
  1300. },
  1301. []bool{true, false},
  1302. },
  1303. // sort
  1304. {
  1305. []string{"b", "a", "c", "d", "c"},
  1306. []pb.RangeRequest{
  1307. {
  1308. Key: []byte("a"), RangeEnd: []byte("z"),
  1309. Limit: 1,
  1310. SortOrder: pb.RangeRequest_ASCEND,
  1311. SortTarget: pb.RangeRequest_KEY,
  1312. },
  1313. {
  1314. Key: []byte("a"), RangeEnd: []byte("z"),
  1315. Limit: 1,
  1316. SortOrder: pb.RangeRequest_DESCEND,
  1317. SortTarget: pb.RangeRequest_KEY,
  1318. },
  1319. {
  1320. Key: []byte("a"), RangeEnd: []byte("z"),
  1321. Limit: 1,
  1322. SortOrder: pb.RangeRequest_ASCEND,
  1323. SortTarget: pb.RangeRequest_CREATE,
  1324. },
  1325. {
  1326. Key: []byte("a"), RangeEnd: []byte("z"),
  1327. Limit: 1,
  1328. SortOrder: pb.RangeRequest_DESCEND,
  1329. SortTarget: pb.RangeRequest_MOD,
  1330. },
  1331. {
  1332. Key: []byte("z"), RangeEnd: []byte("z"),
  1333. Limit: 1,
  1334. SortOrder: pb.RangeRequest_DESCEND,
  1335. SortTarget: pb.RangeRequest_CREATE,
  1336. },
  1337. { // sort ASCEND by default
  1338. Key: []byte("a"), RangeEnd: []byte("z"),
  1339. Limit: 10,
  1340. SortOrder: pb.RangeRequest_NONE,
  1341. SortTarget: pb.RangeRequest_CREATE,
  1342. },
  1343. },
  1344. [][]string{
  1345. {"a"},
  1346. {"d"},
  1347. {"b"},
  1348. {"c"},
  1349. {},
  1350. {"b", "a", "c", "d"},
  1351. },
  1352. []bool{true, true, true, true, false, false},
  1353. },
  1354. // min/max mod rev
  1355. {
  1356. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  1357. []pb.RangeRequest{
  1358. {
  1359. Key: []byte{0}, RangeEnd: []byte{0},
  1360. MinModRevision: 3,
  1361. },
  1362. {
  1363. Key: []byte{0}, RangeEnd: []byte{0},
  1364. MaxModRevision: 3,
  1365. },
  1366. {
  1367. Key: []byte{0}, RangeEnd: []byte{0},
  1368. MinModRevision: 3,
  1369. MaxModRevision: 5,
  1370. },
  1371. {
  1372. Key: []byte{0}, RangeEnd: []byte{0},
  1373. MaxModRevision: 10,
  1374. },
  1375. },
  1376. [][]string{
  1377. {"rev3", "rev4", "rev5", "rev6"},
  1378. {"rev2", "rev3"},
  1379. {"rev3", "rev4", "rev5"},
  1380. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  1381. },
  1382. []bool{false, false, false, false},
  1383. },
  1384. // min/max create rev
  1385. {
  1386. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  1387. []pb.RangeRequest{
  1388. {
  1389. Key: []byte{0}, RangeEnd: []byte{0},
  1390. MinCreateRevision: 3,
  1391. },
  1392. {
  1393. Key: []byte{0}, RangeEnd: []byte{0},
  1394. MaxCreateRevision: 3,
  1395. },
  1396. {
  1397. Key: []byte{0}, RangeEnd: []byte{0},
  1398. MinCreateRevision: 3,
  1399. MaxCreateRevision: 5,
  1400. },
  1401. {
  1402. Key: []byte{0}, RangeEnd: []byte{0},
  1403. MaxCreateRevision: 10,
  1404. },
  1405. },
  1406. [][]string{
  1407. {"rev3", "rev6"},
  1408. {"rev2", "rev3"},
  1409. {"rev3"},
  1410. {"rev2", "rev3", "rev6"},
  1411. },
  1412. []bool{false, false, false, false},
  1413. },
  1414. }
  1415. for i, tt := range tests {
  1416. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1417. for _, k := range tt.putKeys {
  1418. kvc := toGRPC(clus.RandClient()).KV
  1419. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  1420. if _, err := kvc.Put(context.TODO(), req); err != nil {
  1421. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  1422. }
  1423. }
  1424. for j, req := range tt.reqs {
  1425. kvc := toGRPC(clus.RandClient()).KV
  1426. resp, err := kvc.Range(context.TODO(), &req)
  1427. if err != nil {
  1428. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  1429. continue
  1430. }
  1431. if len(resp.Kvs) != len(tt.wresps[j]) {
  1432. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  1433. continue
  1434. }
  1435. for k, wKey := range tt.wresps[j] {
  1436. respKey := string(resp.Kvs[k].Key)
  1437. if respKey != wKey {
  1438. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  1439. }
  1440. }
  1441. if resp.More != tt.wmores[j] {
  1442. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  1443. }
  1444. wrev := int64(len(tt.putKeys) + 1)
  1445. if resp.Header.Revision != wrev {
  1446. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  1447. }
  1448. }
  1449. clus.Terminate(t)
  1450. }
  1451. }
  1452. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1453. cfg.UseGRPC = true
  1454. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  1455. clus.Launch(t)
  1456. return clus
  1457. }
  1458. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  1459. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  1460. defer testutil.AfterTest(t)
  1461. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1462. clus := newClusterV3NoClients(t, &cfg)
  1463. defer clus.Terminate(t)
  1464. // nil out TLS field so client will use an insecure connection
  1465. clus.Members[0].ClientTLSInfo = nil
  1466. client, err := NewClientV3(clus.Members[0])
  1467. if err != nil && err != grpc.ErrClientConnTimeout {
  1468. t.Fatalf("unexpected error (%v)", err)
  1469. } else if client == nil {
  1470. // Ideally, no client would be returned. However, grpc will
  1471. // return a connection without trying to handshake first so
  1472. // the connection appears OK.
  1473. return
  1474. }
  1475. defer client.Close()
  1476. donec := make(chan error, 1)
  1477. go func() {
  1478. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  1479. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1480. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  1481. cancel()
  1482. donec <- perr
  1483. }()
  1484. if perr := <-donec; perr == nil {
  1485. t.Fatalf("expected client error on put")
  1486. }
  1487. }
  1488. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  1489. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  1490. defer testutil.AfterTest(t)
  1491. cfg := ClusterConfig{Size: 3}
  1492. clus := newClusterV3NoClients(t, &cfg)
  1493. defer clus.Terminate(t)
  1494. clus.Members[0].ClientTLSInfo = &testTLSInfo
  1495. client, err := NewClientV3(clus.Members[0])
  1496. if client != nil || err == nil {
  1497. t.Fatalf("expected no client")
  1498. } else if err != grpc.ErrClientConnTimeout {
  1499. t.Fatalf("unexpected error (%v)", err)
  1500. }
  1501. }
  1502. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  1503. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  1504. defer testutil.AfterTest(t)
  1505. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1506. clus := newClusterV3NoClients(t, &cfg)
  1507. defer clus.Terminate(t)
  1508. client, err := NewClientV3(clus.Members[0])
  1509. if err != nil {
  1510. t.Fatalf("expected tls client (%v)", err)
  1511. }
  1512. defer client.Close()
  1513. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1514. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  1515. t.Fatalf("unexpected error on put over tls (%v)", err)
  1516. }
  1517. }
  1518. // TestTLSReloadAtomicReplace ensures server reloads expired/valid certs
  1519. // when all certs are atomically replaced by directory renaming.
  1520. // And expects server to reject client requests, and vice versa.
  1521. func TestTLSReloadAtomicReplace(t *testing.T) {
  1522. tmpDir, err := ioutil.TempDir(os.TempDir(), "fixtures-tmp")
  1523. if err != nil {
  1524. t.Fatal(err)
  1525. }
  1526. os.RemoveAll(tmpDir)
  1527. defer os.RemoveAll(tmpDir)
  1528. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1529. if err != nil {
  1530. t.Fatal(err)
  1531. }
  1532. defer os.RemoveAll(certsDir)
  1533. certsDirExp, err := ioutil.TempDir(os.TempDir(), "fixtures-expired")
  1534. if err != nil {
  1535. t.Fatal(err)
  1536. }
  1537. defer os.RemoveAll(certsDirExp)
  1538. cloneFunc := func() transport.TLSInfo {
  1539. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1540. if terr != nil {
  1541. t.Fatal(terr)
  1542. }
  1543. if _, err = copyTLSFiles(testTLSInfoExpired, certsDirExp); err != nil {
  1544. t.Fatal(err)
  1545. }
  1546. return tlsInfo
  1547. }
  1548. replaceFunc := func() {
  1549. if err = os.Rename(certsDir, tmpDir); err != nil {
  1550. t.Fatal(err)
  1551. }
  1552. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1553. t.Fatal(err)
  1554. }
  1555. // after rename,
  1556. // 'certsDir' contains expired certs
  1557. // 'tmpDir' contains valid certs
  1558. // 'certsDirExp' does not exist
  1559. }
  1560. revertFunc := func() {
  1561. if err = os.Rename(tmpDir, certsDirExp); err != nil {
  1562. t.Fatal(err)
  1563. }
  1564. if err = os.Rename(certsDir, tmpDir); err != nil {
  1565. t.Fatal(err)
  1566. }
  1567. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1568. t.Fatal(err)
  1569. }
  1570. }
  1571. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1572. }
  1573. // TestTLSReloadCopy ensures server reloads expired/valid certs
  1574. // when new certs are copied over, one by one. And expects server
  1575. // to reject client requests, and vice versa.
  1576. func TestTLSReloadCopy(t *testing.T) {
  1577. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1578. if err != nil {
  1579. t.Fatal(err)
  1580. }
  1581. defer os.RemoveAll(certsDir)
  1582. cloneFunc := func() transport.TLSInfo {
  1583. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1584. if terr != nil {
  1585. t.Fatal(terr)
  1586. }
  1587. return tlsInfo
  1588. }
  1589. replaceFunc := func() {
  1590. if _, err = copyTLSFiles(testTLSInfoExpired, certsDir); err != nil {
  1591. t.Fatal(err)
  1592. }
  1593. }
  1594. revertFunc := func() {
  1595. if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
  1596. t.Fatal(err)
  1597. }
  1598. }
  1599. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1600. }
  1601. func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc func(), revertFunc func()) {
  1602. defer testutil.AfterTest(t)
  1603. // 1. separate copies for TLS assets modification
  1604. tlsInfo := cloneFunc()
  1605. // 2. start cluster with valid certs
  1606. clus := NewClusterV3(t, &ClusterConfig{Size: 1, PeerTLS: &tlsInfo, ClientTLS: &tlsInfo})
  1607. defer clus.Terminate(t)
  1608. // 3. concurrent client dialing while certs become expired
  1609. errc := make(chan error, 1)
  1610. go func() {
  1611. for {
  1612. cc, err := tlsInfo.ClientConfig()
  1613. if err != nil {
  1614. // errors in 'go/src/crypto/tls/tls.go'
  1615. // tls: private key does not match public key
  1616. // tls: failed to find any PEM data in key input
  1617. // tls: failed to find any PEM data in certificate input
  1618. // Or 'does not exist', 'not found', etc
  1619. t.Log(err)
  1620. continue
  1621. }
  1622. cli, cerr := clientv3.New(clientv3.Config{
  1623. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1624. DialTimeout: time.Second,
  1625. TLS: cc,
  1626. })
  1627. if cerr != nil {
  1628. errc <- cerr
  1629. return
  1630. }
  1631. cli.Close()
  1632. }
  1633. }()
  1634. // 4. replace certs with expired ones
  1635. replaceFunc()
  1636. // 5. expect dial time-out when loading expired certs
  1637. select {
  1638. case gerr := <-errc:
  1639. if gerr != grpc.ErrClientConnTimeout {
  1640. t.Fatalf("expected %v, got %v", grpc.ErrClientConnTimeout, gerr)
  1641. }
  1642. case <-time.After(5 * time.Second):
  1643. t.Fatal("failed to receive dial timeout error")
  1644. }
  1645. // 6. replace expired certs back with valid ones
  1646. revertFunc()
  1647. // 7. new requests should trigger listener to reload valid certs
  1648. tls, terr := tlsInfo.ClientConfig()
  1649. if terr != nil {
  1650. t.Fatal(terr)
  1651. }
  1652. cl, cerr := clientv3.New(clientv3.Config{
  1653. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1654. DialTimeout: 5 * time.Second,
  1655. TLS: tls,
  1656. })
  1657. if cerr != nil {
  1658. t.Fatalf("expected no error, got %v", cerr)
  1659. }
  1660. cl.Close()
  1661. }
  1662. func TestGRPCRequireLeader(t *testing.T) {
  1663. defer testutil.AfterTest(t)
  1664. cfg := ClusterConfig{Size: 3}
  1665. clus := newClusterV3NoClients(t, &cfg)
  1666. defer clus.Terminate(t)
  1667. clus.Members[1].Stop(t)
  1668. clus.Members[2].Stop(t)
  1669. client, err := NewClientV3(clus.Members[0])
  1670. if err != nil {
  1671. t.Fatalf("cannot create client: %v", err)
  1672. }
  1673. defer client.Close()
  1674. // wait for election timeout, then member[0] will not have a leader.
  1675. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  1676. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1677. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1678. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1679. if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1680. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1681. }
  1682. }
  1683. func TestGRPCStreamRequireLeader(t *testing.T) {
  1684. defer testutil.AfterTest(t)
  1685. cfg := ClusterConfig{Size: 3}
  1686. clus := newClusterV3NoClients(t, &cfg)
  1687. defer clus.Terminate(t)
  1688. client, err := NewClientV3(clus.Members[0])
  1689. if err != nil {
  1690. t.Fatalf("failed to create client (%v)", err)
  1691. }
  1692. defer client.Close()
  1693. wAPI := toGRPC(client).Watch
  1694. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1695. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1696. wStream, err := wAPI.Watch(ctx)
  1697. if err != nil {
  1698. t.Fatalf("wAPI.Watch error: %v", err)
  1699. }
  1700. clus.Members[1].Stop(t)
  1701. clus.Members[2].Stop(t)
  1702. // existing stream should be rejected
  1703. _, err = wStream.Recv()
  1704. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1705. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1706. }
  1707. // new stream should also be rejected
  1708. wStream, err = wAPI.Watch(ctx)
  1709. if err != nil {
  1710. t.Fatalf("wAPI.Watch error: %v", err)
  1711. }
  1712. _, err = wStream.Recv()
  1713. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1714. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1715. }
  1716. clus.Members[1].Restart(t)
  1717. clus.Members[2].Restart(t)
  1718. clus.waitLeader(t, clus.Members)
  1719. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1720. // new stream should also be OK now after we restarted the other members
  1721. wStream, err = wAPI.Watch(ctx)
  1722. if err != nil {
  1723. t.Fatalf("wAPI.Watch error: %v", err)
  1724. }
  1725. wreq := &pb.WatchRequest{
  1726. RequestUnion: &pb.WatchRequest_CreateRequest{
  1727. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1728. },
  1729. }
  1730. err = wStream.Send(wreq)
  1731. if err != nil {
  1732. t.Errorf("err = %v, want nil", err)
  1733. }
  1734. }
  1735. // TestV3PutLargeRequests ensures that configurable MaxRequestBytes works as intended.
  1736. func TestV3PutLargeRequests(t *testing.T) {
  1737. defer testutil.AfterTest(t)
  1738. tests := []struct {
  1739. key string
  1740. maxRequestBytes uint
  1741. valueSize int
  1742. expectError error
  1743. }{
  1744. // don't set to 0. use 0 as the default.
  1745. {"foo", 1, 1024, rpctypes.ErrGRPCRequestTooLarge},
  1746. {"foo", 10 * 1024 * 1024, 9 * 1024 * 1024, nil},
  1747. {"foo", 10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
  1748. {"foo", 10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
  1749. }
  1750. for i, test := range tests {
  1751. clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
  1752. kvcli := toGRPC(clus.Client(0)).KV
  1753. reqput := &pb.PutRequest{Key: []byte(test.key), Value: make([]byte, test.valueSize)}
  1754. _, err := kvcli.Put(context.TODO(), reqput)
  1755. if !eqErrGRPC(err, test.expectError) {
  1756. t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
  1757. }
  1758. clus.Terminate(t)
  1759. }
  1760. }
  1761. func eqErrGRPC(err1 error, err2 error) bool {
  1762. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  1763. }
  1764. // waitForRestart tries a range request until the client's server responds.
  1765. // This is mainly a stop-gap function until grpcproxy's KVClient adapter
  1766. // (and by extension, clientv3) supports grpc.CallOption pass-through so
  1767. // FailFast=false works with Put.
  1768. func waitForRestart(t *testing.T, kvc pb.KVClient) {
  1769. req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
  1770. if _, err := kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
  1771. t.Fatal(err)
  1772. }
  1773. }