v3_grpc_test.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io/ioutil"
  19. "math/rand"
  20. "os"
  21. "reflect"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  26. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  27. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  28. "github.com/coreos/etcd/pkg/testutil"
  29. "github.com/coreos/etcd/pkg/transport"
  30. "golang.org/x/net/context"
  31. "google.golang.org/grpc"
  32. "google.golang.org/grpc/metadata"
  33. )
  34. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  35. // overwrites it, then checks that the change was applied.
  36. func TestV3PutOverwrite(t *testing.T) {
  37. defer testutil.AfterTest(t)
  38. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  39. defer clus.Terminate(t)
  40. kvc := toGRPC(clus.RandClient()).KV
  41. key := []byte("foo")
  42. reqput := &pb.PutRequest{Key: key, Value: []byte("bar"), PrevKv: true}
  43. respput, err := kvc.Put(context.TODO(), reqput)
  44. if err != nil {
  45. t.Fatalf("couldn't put key (%v)", err)
  46. }
  47. // overwrite
  48. reqput.Value = []byte("baz")
  49. respput2, err := kvc.Put(context.TODO(), reqput)
  50. if err != nil {
  51. t.Fatalf("couldn't put key (%v)", err)
  52. }
  53. if respput2.Header.Revision <= respput.Header.Revision {
  54. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  55. respput2.Header.Revision, respput.Header.Revision)
  56. }
  57. if pkv := respput2.PrevKv; pkv == nil || string(pkv.Value) != "bar" {
  58. t.Fatalf("expected PrevKv=bar, got response %+v", respput2)
  59. }
  60. reqrange := &pb.RangeRequest{Key: key}
  61. resprange, err := kvc.Range(context.TODO(), reqrange)
  62. if err != nil {
  63. t.Fatalf("couldn't get key (%v)", err)
  64. }
  65. if len(resprange.Kvs) != 1 {
  66. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  67. }
  68. kv := resprange.Kvs[0]
  69. if kv.ModRevision <= kv.CreateRevision {
  70. t.Errorf("expected modRev > createRev, got %d <= %d",
  71. kv.ModRevision, kv.CreateRevision)
  72. }
  73. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  74. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  75. }
  76. }
  77. // TestPutRestart checks if a put after an unrelated member restart succeeds
  78. func TestV3PutRestart(t *testing.T) {
  79. defer testutil.AfterTest(t)
  80. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  81. defer clus.Terminate(t)
  82. kvIdx := rand.Intn(3)
  83. kvc := toGRPC(clus.Client(kvIdx)).KV
  84. stopIdx := kvIdx
  85. for stopIdx == kvIdx {
  86. stopIdx = rand.Intn(3)
  87. }
  88. clus.clients[stopIdx].Close()
  89. clus.Members[stopIdx].Stop(t)
  90. clus.Members[stopIdx].Restart(t)
  91. c, cerr := NewClientV3(clus.Members[stopIdx])
  92. if cerr != nil {
  93. t.Fatalf("cannot create client: %v", cerr)
  94. }
  95. clus.clients[stopIdx] = c
  96. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
  97. defer cancel()
  98. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  99. _, err := kvc.Put(ctx, reqput)
  100. if err != nil && err == ctx.Err() {
  101. t.Fatalf("expected grpc error, got local ctx error (%v)", err)
  102. }
  103. }
  104. // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
  105. func TestV3CompactCurrentRev(t *testing.T) {
  106. defer testutil.AfterTest(t)
  107. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  108. defer clus.Terminate(t)
  109. kvc := toGRPC(clus.RandClient()).KV
  110. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  111. for i := 0; i < 3; i++ {
  112. if _, err := kvc.Put(context.Background(), preq); err != nil {
  113. t.Fatalf("couldn't put key (%v)", err)
  114. }
  115. }
  116. // get key to add to proxy cache, if any
  117. if _, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}); err != nil {
  118. t.Fatal(err)
  119. }
  120. // compact on current revision
  121. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
  122. if err != nil {
  123. t.Fatalf("couldn't compact kv space (%v)", err)
  124. }
  125. // key still exists when linearized?
  126. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
  127. if err != nil {
  128. t.Fatalf("couldn't get key after compaction (%v)", err)
  129. }
  130. // key still exists when serialized?
  131. _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo"), Serializable: true})
  132. if err != nil {
  133. t.Fatalf("couldn't get serialized key after compaction (%v)", err)
  134. }
  135. }
  136. func TestV3TxnTooManyOps(t *testing.T) {
  137. defer testutil.AfterTest(t)
  138. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  139. defer clus.Terminate(t)
  140. kvc := toGRPC(clus.RandClient()).KV
  141. // unique keys
  142. i := new(int)
  143. keyf := func() []byte {
  144. *i++
  145. return []byte(fmt.Sprintf("key-%d", i))
  146. }
  147. addCompareOps := func(txn *pb.TxnRequest) {
  148. txn.Compare = append(txn.Compare,
  149. &pb.Compare{
  150. Result: pb.Compare_GREATER,
  151. Target: pb.Compare_CREATE,
  152. Key: keyf(),
  153. })
  154. }
  155. addSuccessOps := func(txn *pb.TxnRequest) {
  156. txn.Success = append(txn.Success,
  157. &pb.RequestOp{
  158. Request: &pb.RequestOp_RequestPut{
  159. RequestPut: &pb.PutRequest{
  160. Key: keyf(),
  161. Value: []byte("bar"),
  162. },
  163. },
  164. })
  165. }
  166. addFailureOps := func(txn *pb.TxnRequest) {
  167. txn.Failure = append(txn.Failure,
  168. &pb.RequestOp{
  169. Request: &pb.RequestOp_RequestPut{
  170. RequestPut: &pb.PutRequest{
  171. Key: keyf(),
  172. Value: []byte("bar"),
  173. },
  174. },
  175. })
  176. }
  177. tests := []func(txn *pb.TxnRequest){
  178. addCompareOps,
  179. addSuccessOps,
  180. addFailureOps,
  181. }
  182. for i, tt := range tests {
  183. txn := &pb.TxnRequest{}
  184. for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ {
  185. tt(txn)
  186. }
  187. _, err := kvc.Txn(context.Background(), txn)
  188. if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
  189. t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
  190. }
  191. }
  192. }
  193. func TestV3TxnDuplicateKeys(t *testing.T) {
  194. defer testutil.AfterTest(t)
  195. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  196. defer clus.Terminate(t)
  197. putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  198. delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  199. RequestDeleteRange: &pb.DeleteRangeRequest{
  200. Key: []byte("abc"),
  201. },
  202. },
  203. }
  204. delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  205. RequestDeleteRange: &pb.DeleteRangeRequest{
  206. Key: []byte("a"), RangeEnd: []byte("b"),
  207. },
  208. },
  209. }
  210. delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
  211. RequestDeleteRange: &pb.DeleteRangeRequest{
  212. Key: []byte("abb"), RangeEnd: []byte("abc"),
  213. },
  214. },
  215. }
  216. kvc := toGRPC(clus.RandClient()).KV
  217. tests := []struct {
  218. txnSuccess []*pb.RequestOp
  219. werr error
  220. }{
  221. {
  222. txnSuccess: []*pb.RequestOp{putreq, putreq},
  223. werr: rpctypes.ErrGRPCDuplicateKey,
  224. },
  225. {
  226. txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
  227. werr: rpctypes.ErrGRPCDuplicateKey,
  228. },
  229. {
  230. txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
  231. werr: rpctypes.ErrGRPCDuplicateKey,
  232. },
  233. {
  234. txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  235. werr: nil,
  236. },
  237. {
  238. txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
  239. werr: nil,
  240. },
  241. }
  242. for i, tt := range tests {
  243. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  244. _, err := kvc.Txn(context.Background(), txn)
  245. if !eqErrGRPC(err, tt.werr) {
  246. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  247. }
  248. }
  249. }
  250. // Testv3TxnRevision tests that the transaction header revision is set as expected.
  251. func TestV3TxnRevision(t *testing.T) {
  252. defer testutil.AfterTest(t)
  253. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  254. defer clus.Terminate(t)
  255. kvc := toGRPC(clus.RandClient()).KV
  256. pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
  257. presp, err := kvc.Put(context.TODO(), pr)
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
  262. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  263. tresp, err := kvc.Txn(context.TODO(), txn)
  264. if err != nil {
  265. t.Fatal(err)
  266. }
  267. // did not update revision
  268. if presp.Header.Revision != tresp.Header.Revision {
  269. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  270. }
  271. txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
  272. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
  273. tresp, err = kvc.Txn(context.TODO(), txn)
  274. if err != nil {
  275. t.Fatal(err)
  276. }
  277. // did not update revision
  278. if presp.Header.Revision != tresp.Header.Revision {
  279. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
  280. }
  281. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
  282. txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
  283. tresp, err = kvc.Txn(context.TODO(), txn)
  284. if err != nil {
  285. t.Fatal(err)
  286. }
  287. // updated revision
  288. if tresp.Header.Revision != presp.Header.Revision+1 {
  289. t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
  290. }
  291. }
  292. // Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
  293. // when compared to the Succeeded field in the txn response.
  294. func TestV3TxnCmpHeaderRev(t *testing.T) {
  295. defer testutil.AfterTest(t)
  296. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  297. defer clus.Terminate(t)
  298. kvc := toGRPC(clus.RandClient()).KV
  299. for i := 0; i < 10; i++ {
  300. // Concurrently put a key with a txn comparing on it.
  301. revc := make(chan int64, 1)
  302. go func() {
  303. defer close(revc)
  304. pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
  305. presp, err := kvc.Put(context.TODO(), pr)
  306. if err != nil {
  307. t.Fatal(err)
  308. }
  309. revc <- presp.Header.Revision
  310. }()
  311. // The read-only txn uses the optimized readindex server path.
  312. txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
  313. RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
  314. txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
  315. // i = 0 /\ Succeeded => put followed txn
  316. cmp := &pb.Compare{
  317. Result: pb.Compare_EQUAL,
  318. Target: pb.Compare_VERSION,
  319. Key: []byte("k"),
  320. TargetUnion: &pb.Compare_Version{Version: int64(i)},
  321. }
  322. txn.Compare = append(txn.Compare, cmp)
  323. tresp, err := kvc.Txn(context.TODO(), txn)
  324. if err != nil {
  325. t.Fatal(err)
  326. }
  327. prev := <-revc
  328. // put followed txn; should eval to false
  329. if prev > tresp.Header.Revision && !tresp.Succeeded {
  330. t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
  331. }
  332. // txn follows put; should eval to true
  333. if tresp.Header.Revision >= prev && tresp.Succeeded {
  334. t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
  335. }
  336. }
  337. }
  338. // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
  339. func TestV3PutIgnoreValue(t *testing.T) {
  340. defer testutil.AfterTest(t)
  341. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  342. defer clus.Terminate(t)
  343. kvc := toGRPC(clus.RandClient()).KV
  344. key, val := []byte("foo"), []byte("bar")
  345. putReq := pb.PutRequest{Key: key, Value: val}
  346. // create lease
  347. lc := toGRPC(clus.RandClient()).Lease
  348. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  349. if err != nil {
  350. t.Fatal(err)
  351. }
  352. if lresp.Error != "" {
  353. t.Fatal(lresp.Error)
  354. }
  355. tests := []struct {
  356. putFunc func() error
  357. putErr error
  358. wleaseID int64
  359. }{
  360. { // put failure for non-existent key
  361. func() error {
  362. preq := putReq
  363. preq.IgnoreValue = true
  364. _, err := kvc.Put(context.TODO(), &preq)
  365. return err
  366. },
  367. rpctypes.ErrGRPCKeyNotFound,
  368. 0,
  369. },
  370. { // txn failure for non-existent key
  371. func() error {
  372. preq := putReq
  373. preq.Value = nil
  374. preq.IgnoreValue = true
  375. txn := &pb.TxnRequest{}
  376. txn.Success = append(txn.Success, &pb.RequestOp{
  377. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  378. _, err := kvc.Txn(context.TODO(), txn)
  379. return err
  380. },
  381. rpctypes.ErrGRPCKeyNotFound,
  382. 0,
  383. },
  384. { // put success
  385. func() error {
  386. _, err := kvc.Put(context.TODO(), &putReq)
  387. return err
  388. },
  389. nil,
  390. 0,
  391. },
  392. { // txn success, attach lease
  393. func() error {
  394. preq := putReq
  395. preq.Value = nil
  396. preq.Lease = lresp.ID
  397. preq.IgnoreValue = true
  398. txn := &pb.TxnRequest{}
  399. txn.Success = append(txn.Success, &pb.RequestOp{
  400. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  401. _, err := kvc.Txn(context.TODO(), txn)
  402. return err
  403. },
  404. nil,
  405. lresp.ID,
  406. },
  407. { // non-empty value with ignore_value should error
  408. func() error {
  409. preq := putReq
  410. preq.IgnoreValue = true
  411. _, err := kvc.Put(context.TODO(), &preq)
  412. return err
  413. },
  414. rpctypes.ErrGRPCValueProvided,
  415. 0,
  416. },
  417. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  418. func() error {
  419. preq := putReq
  420. preq.Value = nil
  421. preq.IgnoreValue = true
  422. presp, err := kvc.Put(context.TODO(), &preq)
  423. if err != nil {
  424. return err
  425. }
  426. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  427. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  428. }
  429. return nil
  430. },
  431. nil,
  432. 0,
  433. },
  434. { // revoke lease, ensure detached key doesn't get deleted
  435. func() error {
  436. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  437. return err
  438. },
  439. nil,
  440. 0,
  441. },
  442. }
  443. for i, tt := range tests {
  444. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  445. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  446. }
  447. if tt.putErr != nil {
  448. continue
  449. }
  450. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  451. if err != nil {
  452. t.Fatalf("#%d: %v", i, err)
  453. }
  454. if len(rr.Kvs) != 1 {
  455. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  456. }
  457. if !bytes.Equal(rr.Kvs[0].Value, val) {
  458. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  459. }
  460. if rr.Kvs[0].Lease != tt.wleaseID {
  461. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  462. }
  463. }
  464. }
  465. // TestV3PutIgnoreLease ensures that writes with ignore_lease uses previous lease for the key overwrites.
  466. func TestV3PutIgnoreLease(t *testing.T) {
  467. defer testutil.AfterTest(t)
  468. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  469. defer clus.Terminate(t)
  470. kvc := toGRPC(clus.RandClient()).KV
  471. // create lease
  472. lc := toGRPC(clus.RandClient()).Lease
  473. lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
  474. if err != nil {
  475. t.Fatal(err)
  476. }
  477. if lresp.Error != "" {
  478. t.Fatal(lresp.Error)
  479. }
  480. key, val, val1 := []byte("zoo"), []byte("bar"), []byte("bar1")
  481. putReq := pb.PutRequest{Key: key, Value: val}
  482. tests := []struct {
  483. putFunc func() error
  484. putErr error
  485. wleaseID int64
  486. wvalue []byte
  487. }{
  488. { // put failure for non-existent key
  489. func() error {
  490. preq := putReq
  491. preq.IgnoreLease = true
  492. _, err := kvc.Put(context.TODO(), &preq)
  493. return err
  494. },
  495. rpctypes.ErrGRPCKeyNotFound,
  496. 0,
  497. nil,
  498. },
  499. { // txn failure for non-existent key
  500. func() error {
  501. preq := putReq
  502. preq.IgnoreLease = true
  503. txn := &pb.TxnRequest{}
  504. txn.Success = append(txn.Success, &pb.RequestOp{
  505. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  506. _, err := kvc.Txn(context.TODO(), txn)
  507. return err
  508. },
  509. rpctypes.ErrGRPCKeyNotFound,
  510. 0,
  511. nil,
  512. },
  513. { // put success
  514. func() error {
  515. preq := putReq
  516. preq.Lease = lresp.ID
  517. _, err := kvc.Put(context.TODO(), &preq)
  518. return err
  519. },
  520. nil,
  521. lresp.ID,
  522. val,
  523. },
  524. { // txn success, modify value using 'ignore_lease' and ensure lease is not detached
  525. func() error {
  526. preq := putReq
  527. preq.Value = val1
  528. preq.IgnoreLease = true
  529. txn := &pb.TxnRequest{}
  530. txn.Success = append(txn.Success, &pb.RequestOp{
  531. Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
  532. _, err := kvc.Txn(context.TODO(), txn)
  533. return err
  534. },
  535. nil,
  536. lresp.ID,
  537. val1,
  538. },
  539. { // non-empty lease with ignore_lease should error
  540. func() error {
  541. preq := putReq
  542. preq.Lease = lresp.ID
  543. preq.IgnoreLease = true
  544. _, err := kvc.Put(context.TODO(), &preq)
  545. return err
  546. },
  547. rpctypes.ErrGRPCLeaseProvided,
  548. 0,
  549. nil,
  550. },
  551. { // overwrite with previous value, ensure no prev-kv is returned and lease is detached
  552. func() error {
  553. presp, err := kvc.Put(context.TODO(), &putReq)
  554. if err != nil {
  555. return err
  556. }
  557. if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
  558. return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
  559. }
  560. return nil
  561. },
  562. nil,
  563. 0,
  564. val,
  565. },
  566. { // revoke lease, ensure detached key doesn't get deleted
  567. func() error {
  568. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
  569. return err
  570. },
  571. nil,
  572. 0,
  573. val,
  574. },
  575. }
  576. for i, tt := range tests {
  577. if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
  578. t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
  579. }
  580. if tt.putErr != nil {
  581. continue
  582. }
  583. rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
  584. if err != nil {
  585. t.Fatalf("#%d: %v", i, err)
  586. }
  587. if len(rr.Kvs) != 1 {
  588. t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
  589. }
  590. if !bytes.Equal(rr.Kvs[0].Value, tt.wvalue) {
  591. t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
  592. }
  593. if rr.Kvs[0].Lease != tt.wleaseID {
  594. t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
  595. }
  596. }
  597. }
  598. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  599. func TestV3PutMissingLease(t *testing.T) {
  600. defer testutil.AfterTest(t)
  601. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  602. defer clus.Terminate(t)
  603. kvc := toGRPC(clus.RandClient()).KV
  604. key := []byte("foo")
  605. preq := &pb.PutRequest{Key: key, Lease: 123456}
  606. tests := []func(){
  607. // put case
  608. func() {
  609. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  610. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  611. }
  612. },
  613. // txn success case
  614. func() {
  615. txn := &pb.TxnRequest{}
  616. txn.Success = append(txn.Success, &pb.RequestOp{
  617. Request: &pb.RequestOp_RequestPut{
  618. RequestPut: preq}})
  619. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  620. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  621. }
  622. },
  623. // txn failure case
  624. func() {
  625. txn := &pb.TxnRequest{}
  626. txn.Failure = append(txn.Failure, &pb.RequestOp{
  627. Request: &pb.RequestOp_RequestPut{
  628. RequestPut: preq}})
  629. cmp := &pb.Compare{
  630. Result: pb.Compare_GREATER,
  631. Target: pb.Compare_CREATE,
  632. Key: []byte("bar"),
  633. }
  634. txn.Compare = append(txn.Compare, cmp)
  635. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  636. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  637. }
  638. },
  639. // ignore bad lease in failure on success txn
  640. func() {
  641. txn := &pb.TxnRequest{}
  642. rreq := &pb.RangeRequest{Key: []byte("bar")}
  643. txn.Success = append(txn.Success, &pb.RequestOp{
  644. Request: &pb.RequestOp_RequestRange{
  645. RequestRange: rreq}})
  646. txn.Failure = append(txn.Failure, &pb.RequestOp{
  647. Request: &pb.RequestOp_RequestPut{
  648. RequestPut: preq}})
  649. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  650. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  651. }
  652. },
  653. }
  654. for i, f := range tests {
  655. f()
  656. // key shouldn't have been stored
  657. rreq := &pb.RangeRequest{Key: key}
  658. rresp, err := kvc.Range(context.TODO(), rreq)
  659. if err != nil {
  660. t.Errorf("#%d. could not rangereq (%v)", i, err)
  661. } else if len(rresp.Kvs) != 0 {
  662. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  663. }
  664. }
  665. }
  666. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  667. func TestV3DeleteRange(t *testing.T) {
  668. defer testutil.AfterTest(t)
  669. tests := []struct {
  670. keySet []string
  671. begin string
  672. end string
  673. prevKV bool
  674. wantSet [][]byte
  675. deleted int64
  676. }{
  677. // delete middle
  678. {
  679. []string{"foo", "foo/abc", "fop"},
  680. "foo/", "fop", false,
  681. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  682. },
  683. // no delete
  684. {
  685. []string{"foo", "foo/abc", "fop"},
  686. "foo/", "foo/", false,
  687. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
  688. },
  689. // delete first
  690. {
  691. []string{"foo", "foo/abc", "fop"},
  692. "fo", "fop", false,
  693. [][]byte{[]byte("fop")}, 2,
  694. },
  695. // delete tail
  696. {
  697. []string{"foo", "foo/abc", "fop"},
  698. "foo/", "fos", false,
  699. [][]byte{[]byte("foo")}, 2,
  700. },
  701. // delete exact
  702. {
  703. []string{"foo", "foo/abc", "fop"},
  704. "foo/abc", "", false,
  705. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  706. },
  707. // delete none, [x,x)
  708. {
  709. []string{"foo"},
  710. "foo", "foo", false,
  711. [][]byte{[]byte("foo")}, 0,
  712. },
  713. // delete middle with preserveKVs set
  714. {
  715. []string{"foo", "foo/abc", "fop"},
  716. "foo/", "fop", true,
  717. [][]byte{[]byte("foo"), []byte("fop")}, 1,
  718. },
  719. }
  720. for i, tt := range tests {
  721. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  722. kvc := toGRPC(clus.RandClient()).KV
  723. ks := tt.keySet
  724. for j := range ks {
  725. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  726. _, err := kvc.Put(context.TODO(), reqput)
  727. if err != nil {
  728. t.Fatalf("couldn't put key (%v)", err)
  729. }
  730. }
  731. dreq := &pb.DeleteRangeRequest{
  732. Key: []byte(tt.begin),
  733. RangeEnd: []byte(tt.end),
  734. PrevKv: tt.prevKV,
  735. }
  736. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  737. if err != nil {
  738. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  739. }
  740. if tt.deleted != dresp.Deleted {
  741. t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
  742. }
  743. if tt.prevKV {
  744. if len(dresp.PrevKvs) != int(dresp.Deleted) {
  745. t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
  746. }
  747. }
  748. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  749. rresp, err := kvc.Range(context.TODO(), rreq)
  750. if err != nil {
  751. t.Errorf("couldn't get range on test %v (%v)", i, err)
  752. }
  753. if dresp.Header.Revision != rresp.Header.Revision {
  754. t.Errorf("expected revision %v, got %v",
  755. dresp.Header.Revision, rresp.Header.Revision)
  756. }
  757. keys := [][]byte{}
  758. for j := range rresp.Kvs {
  759. keys = append(keys, rresp.Kvs[j].Key)
  760. }
  761. if !reflect.DeepEqual(tt.wantSet, keys) {
  762. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  763. }
  764. // can't defer because tcp ports will be in use
  765. clus.Terminate(t)
  766. }
  767. }
  768. // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
  769. func TestV3TxnInvalidRange(t *testing.T) {
  770. defer testutil.AfterTest(t)
  771. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  772. defer clus.Terminate(t)
  773. kvc := toGRPC(clus.RandClient()).KV
  774. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  775. for i := 0; i < 3; i++ {
  776. _, err := kvc.Put(context.Background(), preq)
  777. if err != nil {
  778. t.Fatalf("couldn't put key (%v)", err)
  779. }
  780. }
  781. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  782. if err != nil {
  783. t.Fatalf("couldn't compact kv space (%v)", err)
  784. }
  785. // future rev
  786. txn := &pb.TxnRequest{}
  787. txn.Success = append(txn.Success, &pb.RequestOp{
  788. Request: &pb.RequestOp_RequestPut{
  789. RequestPut: preq}})
  790. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  791. txn.Success = append(txn.Success, &pb.RequestOp{
  792. Request: &pb.RequestOp_RequestRange{
  793. RequestRange: rreq}})
  794. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
  795. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
  796. }
  797. // compacted rev
  798. tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
  799. tv.RequestRange.Revision = 1
  800. if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
  801. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
  802. }
  803. }
  804. func TestV3TooLargeRequest(t *testing.T) {
  805. defer testutil.AfterTest(t)
  806. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  807. defer clus.Terminate(t)
  808. kvc := toGRPC(clus.RandClient()).KV
  809. // 2MB request value
  810. largeV := make([]byte, 2*1024*1024)
  811. preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
  812. _, err := kvc.Put(context.Background(), preq)
  813. if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
  814. t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
  815. }
  816. }
  817. // TestV3Hash tests hash.
  818. func TestV3Hash(t *testing.T) {
  819. defer testutil.AfterTest(t)
  820. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  821. defer clus.Terminate(t)
  822. cli := clus.RandClient()
  823. kvc := toGRPC(cli).KV
  824. m := toGRPC(cli).Maintenance
  825. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  826. for i := 0; i < 3; i++ {
  827. _, err := kvc.Put(context.Background(), preq)
  828. if err != nil {
  829. t.Fatalf("couldn't put key (%v)", err)
  830. }
  831. }
  832. resp, err := m.Hash(context.Background(), &pb.HashRequest{})
  833. if err != nil || resp.Hash == 0 {
  834. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  835. }
  836. }
  837. // TestV3HashRestart ensures that hash stays the same after restart.
  838. func TestV3HashRestart(t *testing.T) {
  839. defer testutil.AfterTest(t)
  840. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  841. defer clus.Terminate(t)
  842. cli := clus.RandClient()
  843. resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  844. if err != nil || resp.Hash == 0 {
  845. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  846. }
  847. hash1 := resp.Hash
  848. clus.Members[0].Stop(t)
  849. clus.Members[0].Restart(t)
  850. clus.waitLeader(t, clus.Members)
  851. kvc := toGRPC(clus.Client(0)).KV
  852. waitForRestart(t, kvc)
  853. cli = clus.RandClient()
  854. resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
  855. if err != nil || resp.Hash == 0 {
  856. t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
  857. }
  858. hash2 := resp.Hash
  859. if hash1 != hash2 {
  860. t.Fatalf("hash expected %d, got %d", hash1, hash2)
  861. }
  862. }
  863. // TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
  864. func TestV3StorageQuotaAPI(t *testing.T) {
  865. defer testutil.AfterTest(t)
  866. quotasize := int64(16 * os.Getpagesize())
  867. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  868. // Set a quota on one node
  869. clus.Members[0].QuotaBackendBytes = quotasize
  870. clus.Members[0].Stop(t)
  871. clus.Members[0].Restart(t)
  872. defer clus.Terminate(t)
  873. kvc := toGRPC(clus.Client(0)).KV
  874. waitForRestart(t, kvc)
  875. key := []byte("abc")
  876. // test small put that fits in quota
  877. smallbuf := make([]byte, 512)
  878. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  879. t.Fatal(err)
  880. }
  881. // test big put
  882. bigbuf := make([]byte, quotasize)
  883. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  884. if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  885. t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  886. }
  887. // test big txn
  888. puttxn := &pb.RequestOp{
  889. Request: &pb.RequestOp_RequestPut{
  890. RequestPut: &pb.PutRequest{
  891. Key: key,
  892. Value: bigbuf,
  893. },
  894. },
  895. }
  896. txnreq := &pb.TxnRequest{}
  897. txnreq.Success = append(txnreq.Success, puttxn)
  898. _, txnerr := kvc.Txn(context.TODO(), txnreq)
  899. if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
  900. t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  901. }
  902. }
  903. // TestV3StorageQuotaApply tests the V3 server respects quotas during apply
  904. func TestV3StorageQuotaApply(t *testing.T) {
  905. testutil.AfterTest(t)
  906. quotasize := int64(16 * os.Getpagesize())
  907. clus := NewClusterV3(t, &ClusterConfig{Size: 2})
  908. defer clus.Terminate(t)
  909. kvc0 := toGRPC(clus.Client(0)).KV
  910. kvc1 := toGRPC(clus.Client(1)).KV
  911. // Set a quota on one node
  912. clus.Members[0].QuotaBackendBytes = quotasize
  913. clus.Members[0].Stop(t)
  914. clus.Members[0].Restart(t)
  915. clus.waitLeader(t, clus.Members)
  916. waitForRestart(t, kvc0)
  917. key := []byte("abc")
  918. // test small put still works
  919. smallbuf := make([]byte, 1024)
  920. _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  921. if serr != nil {
  922. t.Fatal(serr)
  923. }
  924. // test big put
  925. bigbuf := make([]byte, quotasize)
  926. _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
  927. if err != nil {
  928. t.Fatal(err)
  929. }
  930. // quorum get should work regardless of whether alarm is raised
  931. _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
  932. if err != nil {
  933. t.Fatal(err)
  934. }
  935. // wait until alarm is raised for sure-- poll the alarms
  936. stopc := time.After(5 * time.Second)
  937. for {
  938. req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
  939. resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
  940. if aerr != nil {
  941. t.Fatal(aerr)
  942. }
  943. if len(resp.Alarms) != 0 {
  944. break
  945. }
  946. select {
  947. case <-stopc:
  948. t.Fatalf("timed out waiting for alarm")
  949. case <-time.After(10 * time.Millisecond):
  950. }
  951. }
  952. // small quota machine should reject put
  953. if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  954. t.Fatalf("past-quota instance should reject put")
  955. }
  956. // large quota machine should reject put
  957. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  958. t.Fatalf("past-quota instance should reject put")
  959. }
  960. // reset large quota node to ensure alarm persisted
  961. clus.Members[1].Stop(t)
  962. clus.Members[1].Restart(t)
  963. clus.waitLeader(t, clus.Members)
  964. if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
  965. t.Fatalf("alarmed instance should reject put after reset")
  966. }
  967. }
  968. // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
  969. func TestV3AlarmDeactivate(t *testing.T) {
  970. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  971. defer clus.Terminate(t)
  972. kvc := toGRPC(clus.RandClient()).KV
  973. mt := toGRPC(clus.RandClient()).Maintenance
  974. alarmReq := &pb.AlarmRequest{
  975. MemberID: 123,
  976. Action: pb.AlarmRequest_ACTIVATE,
  977. Alarm: pb.AlarmType_NOSPACE,
  978. }
  979. if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
  980. t.Fatal(err)
  981. }
  982. key := []byte("abc")
  983. smallbuf := make([]byte, 512)
  984. _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
  985. if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
  986. t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
  987. }
  988. alarmReq.Action = pb.AlarmRequest_DEACTIVATE
  989. if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
  990. t.Fatal(err)
  991. }
  992. if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
  993. t.Fatal(err)
  994. }
  995. }
  996. func TestV3RangeRequest(t *testing.T) {
  997. defer testutil.AfterTest(t)
  998. tests := []struct {
  999. putKeys []string
  1000. reqs []pb.RangeRequest
  1001. wresps [][]string
  1002. wmores []bool
  1003. }{
  1004. // single key
  1005. {
  1006. []string{"foo", "bar"},
  1007. []pb.RangeRequest{
  1008. // exists
  1009. {Key: []byte("foo")},
  1010. // doesn't exist
  1011. {Key: []byte("baz")},
  1012. },
  1013. [][]string{
  1014. {"foo"},
  1015. {},
  1016. },
  1017. []bool{false, false},
  1018. },
  1019. // multi-key
  1020. {
  1021. []string{"a", "b", "c", "d", "e"},
  1022. []pb.RangeRequest{
  1023. // all in range
  1024. {Key: []byte("a"), RangeEnd: []byte("z")},
  1025. // [b, d)
  1026. {Key: []byte("b"), RangeEnd: []byte("d")},
  1027. // out of range
  1028. {Key: []byte("f"), RangeEnd: []byte("z")},
  1029. // [c,c) = empty
  1030. {Key: []byte("c"), RangeEnd: []byte("c")},
  1031. // [d, b) = empty
  1032. {Key: []byte("d"), RangeEnd: []byte("b")},
  1033. // ["\0", "\0") => all in range
  1034. {Key: []byte{0}, RangeEnd: []byte{0}},
  1035. },
  1036. [][]string{
  1037. {"a", "b", "c", "d", "e"},
  1038. {"b", "c"},
  1039. {},
  1040. {},
  1041. {},
  1042. {"a", "b", "c", "d", "e"},
  1043. },
  1044. []bool{false, false, false, false, false, false},
  1045. },
  1046. // revision
  1047. {
  1048. []string{"a", "b", "c", "d", "e"},
  1049. []pb.RangeRequest{
  1050. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  1051. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  1052. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  1053. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  1054. },
  1055. [][]string{
  1056. {"a", "b", "c", "d", "e"},
  1057. {},
  1058. {"a"},
  1059. {"a", "b"},
  1060. },
  1061. []bool{false, false, false, false},
  1062. },
  1063. // limit
  1064. {
  1065. []string{"foo", "bar"},
  1066. []pb.RangeRequest{
  1067. // more
  1068. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  1069. // no more
  1070. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  1071. },
  1072. [][]string{
  1073. {"bar"},
  1074. {"bar", "foo"},
  1075. },
  1076. []bool{true, false},
  1077. },
  1078. // sort
  1079. {
  1080. []string{"b", "a", "c", "d", "c"},
  1081. []pb.RangeRequest{
  1082. {
  1083. Key: []byte("a"), RangeEnd: []byte("z"),
  1084. Limit: 1,
  1085. SortOrder: pb.RangeRequest_ASCEND,
  1086. SortTarget: pb.RangeRequest_KEY,
  1087. },
  1088. {
  1089. Key: []byte("a"), RangeEnd: []byte("z"),
  1090. Limit: 1,
  1091. SortOrder: pb.RangeRequest_DESCEND,
  1092. SortTarget: pb.RangeRequest_KEY,
  1093. },
  1094. {
  1095. Key: []byte("a"), RangeEnd: []byte("z"),
  1096. Limit: 1,
  1097. SortOrder: pb.RangeRequest_ASCEND,
  1098. SortTarget: pb.RangeRequest_CREATE,
  1099. },
  1100. {
  1101. Key: []byte("a"), RangeEnd: []byte("z"),
  1102. Limit: 1,
  1103. SortOrder: pb.RangeRequest_DESCEND,
  1104. SortTarget: pb.RangeRequest_MOD,
  1105. },
  1106. {
  1107. Key: []byte("z"), RangeEnd: []byte("z"),
  1108. Limit: 1,
  1109. SortOrder: pb.RangeRequest_DESCEND,
  1110. SortTarget: pb.RangeRequest_CREATE,
  1111. },
  1112. { // sort ASCEND by default
  1113. Key: []byte("a"), RangeEnd: []byte("z"),
  1114. Limit: 10,
  1115. SortOrder: pb.RangeRequest_NONE,
  1116. SortTarget: pb.RangeRequest_CREATE,
  1117. },
  1118. },
  1119. [][]string{
  1120. {"a"},
  1121. {"d"},
  1122. {"b"},
  1123. {"c"},
  1124. {},
  1125. {"b", "a", "c", "d"},
  1126. },
  1127. []bool{true, true, true, true, false, false},
  1128. },
  1129. // min/max mod rev
  1130. {
  1131. []string{"rev2", "rev3", "rev4", "rev5", "rev6"},
  1132. []pb.RangeRequest{
  1133. {
  1134. Key: []byte{0}, RangeEnd: []byte{0},
  1135. MinModRevision: 3,
  1136. },
  1137. {
  1138. Key: []byte{0}, RangeEnd: []byte{0},
  1139. MaxModRevision: 3,
  1140. },
  1141. {
  1142. Key: []byte{0}, RangeEnd: []byte{0},
  1143. MinModRevision: 3,
  1144. MaxModRevision: 5,
  1145. },
  1146. {
  1147. Key: []byte{0}, RangeEnd: []byte{0},
  1148. MaxModRevision: 10,
  1149. },
  1150. },
  1151. [][]string{
  1152. {"rev3", "rev4", "rev5", "rev6"},
  1153. {"rev2", "rev3"},
  1154. {"rev3", "rev4", "rev5"},
  1155. {"rev2", "rev3", "rev4", "rev5", "rev6"},
  1156. },
  1157. []bool{false, false, false, false},
  1158. },
  1159. // min/max create rev
  1160. {
  1161. []string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
  1162. []pb.RangeRequest{
  1163. {
  1164. Key: []byte{0}, RangeEnd: []byte{0},
  1165. MinCreateRevision: 3,
  1166. },
  1167. {
  1168. Key: []byte{0}, RangeEnd: []byte{0},
  1169. MaxCreateRevision: 3,
  1170. },
  1171. {
  1172. Key: []byte{0}, RangeEnd: []byte{0},
  1173. MinCreateRevision: 3,
  1174. MaxCreateRevision: 5,
  1175. },
  1176. {
  1177. Key: []byte{0}, RangeEnd: []byte{0},
  1178. MaxCreateRevision: 10,
  1179. },
  1180. },
  1181. [][]string{
  1182. {"rev3", "rev6"},
  1183. {"rev2", "rev3"},
  1184. {"rev3"},
  1185. {"rev2", "rev3", "rev6"},
  1186. },
  1187. []bool{false, false, false, false},
  1188. },
  1189. }
  1190. for i, tt := range tests {
  1191. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1192. for _, k := range tt.putKeys {
  1193. kvc := toGRPC(clus.RandClient()).KV
  1194. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  1195. if _, err := kvc.Put(context.TODO(), req); err != nil {
  1196. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  1197. }
  1198. }
  1199. for j, req := range tt.reqs {
  1200. kvc := toGRPC(clus.RandClient()).KV
  1201. resp, err := kvc.Range(context.TODO(), &req)
  1202. if err != nil {
  1203. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  1204. continue
  1205. }
  1206. if len(resp.Kvs) != len(tt.wresps[j]) {
  1207. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  1208. continue
  1209. }
  1210. for k, wKey := range tt.wresps[j] {
  1211. respKey := string(resp.Kvs[k].Key)
  1212. if respKey != wKey {
  1213. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  1214. }
  1215. }
  1216. if resp.More != tt.wmores[j] {
  1217. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  1218. }
  1219. wrev := int64(len(tt.putKeys) + 1)
  1220. if resp.Header.Revision != wrev {
  1221. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  1222. }
  1223. }
  1224. clus.Terminate(t)
  1225. }
  1226. }
  1227. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1228. cfg.UseGRPC = true
  1229. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  1230. clus.Launch(t)
  1231. return clus
  1232. }
  1233. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  1234. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  1235. defer testutil.AfterTest(t)
  1236. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1237. clus := newClusterV3NoClients(t, &cfg)
  1238. defer clus.Terminate(t)
  1239. // nil out TLS field so client will use an insecure connection
  1240. clus.Members[0].ClientTLSInfo = nil
  1241. client, err := NewClientV3(clus.Members[0])
  1242. if err != nil && err != context.DeadlineExceeded {
  1243. t.Fatalf("unexpected error (%v)", err)
  1244. } else if client == nil {
  1245. // Ideally, no client would be returned. However, grpc will
  1246. // return a connection without trying to handshake first so
  1247. // the connection appears OK.
  1248. return
  1249. }
  1250. defer client.Close()
  1251. donec := make(chan error, 1)
  1252. go func() {
  1253. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  1254. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1255. _, perr := toGRPC(client).KV.Put(ctx, reqput)
  1256. cancel()
  1257. donec <- perr
  1258. }()
  1259. if perr := <-donec; perr == nil {
  1260. t.Fatalf("expected client error on put")
  1261. }
  1262. }
  1263. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  1264. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  1265. defer testutil.AfterTest(t)
  1266. cfg := ClusterConfig{Size: 3}
  1267. clus := newClusterV3NoClients(t, &cfg)
  1268. defer clus.Terminate(t)
  1269. clus.Members[0].ClientTLSInfo = &testTLSInfo
  1270. client, err := NewClientV3(clus.Members[0])
  1271. if client != nil || err == nil {
  1272. t.Fatalf("expected no client")
  1273. } else if err != context.DeadlineExceeded {
  1274. t.Fatalf("unexpected error (%v)", err)
  1275. }
  1276. }
  1277. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  1278. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  1279. defer testutil.AfterTest(t)
  1280. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1281. clus := newClusterV3NoClients(t, &cfg)
  1282. defer clus.Terminate(t)
  1283. client, err := NewClientV3(clus.Members[0])
  1284. if err != nil {
  1285. t.Fatalf("expected tls client (%v)", err)
  1286. }
  1287. defer client.Close()
  1288. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1289. if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
  1290. t.Fatalf("unexpected error on put over tls (%v)", err)
  1291. }
  1292. }
  1293. // TestTLSReloadAtomicReplace ensures server reloads expired/valid certs
  1294. // when all certs are atomically replaced by directory renaming.
  1295. // And expects server to reject client requests, and vice versa.
  1296. func TestTLSReloadAtomicReplace(t *testing.T) {
  1297. tmpDir, err := ioutil.TempDir(os.TempDir(), "fixtures-tmp")
  1298. if err != nil {
  1299. t.Fatal(err)
  1300. }
  1301. os.RemoveAll(tmpDir)
  1302. defer os.RemoveAll(tmpDir)
  1303. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1304. if err != nil {
  1305. t.Fatal(err)
  1306. }
  1307. defer os.RemoveAll(certsDir)
  1308. certsDirExp, err := ioutil.TempDir(os.TempDir(), "fixtures-expired")
  1309. if err != nil {
  1310. t.Fatal(err)
  1311. }
  1312. defer os.RemoveAll(certsDirExp)
  1313. cloneFunc := func() transport.TLSInfo {
  1314. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1315. if terr != nil {
  1316. t.Fatal(terr)
  1317. }
  1318. if _, err = copyTLSFiles(testTLSInfoExpired, certsDirExp); err != nil {
  1319. t.Fatal(err)
  1320. }
  1321. return tlsInfo
  1322. }
  1323. replaceFunc := func() {
  1324. if err = os.Rename(certsDir, tmpDir); err != nil {
  1325. t.Fatal(err)
  1326. }
  1327. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1328. t.Fatal(err)
  1329. }
  1330. // after rename,
  1331. // 'certsDir' contains expired certs
  1332. // 'tmpDir' contains valid certs
  1333. // 'certsDirExp' does not exist
  1334. }
  1335. revertFunc := func() {
  1336. if err = os.Rename(tmpDir, certsDirExp); err != nil {
  1337. t.Fatal(err)
  1338. }
  1339. if err = os.Rename(certsDir, tmpDir); err != nil {
  1340. t.Fatal(err)
  1341. }
  1342. if err = os.Rename(certsDirExp, certsDir); err != nil {
  1343. t.Fatal(err)
  1344. }
  1345. }
  1346. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1347. }
  1348. // TestTLSReloadCopy ensures server reloads expired/valid certs
  1349. // when new certs are copied over, one by one. And expects server
  1350. // to reject client requests, and vice versa.
  1351. func TestTLSReloadCopy(t *testing.T) {
  1352. certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
  1353. if err != nil {
  1354. t.Fatal(err)
  1355. }
  1356. defer os.RemoveAll(certsDir)
  1357. cloneFunc := func() transport.TLSInfo {
  1358. tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
  1359. if terr != nil {
  1360. t.Fatal(terr)
  1361. }
  1362. return tlsInfo
  1363. }
  1364. replaceFunc := func() {
  1365. if _, err = copyTLSFiles(testTLSInfoExpired, certsDir); err != nil {
  1366. t.Fatal(err)
  1367. }
  1368. }
  1369. revertFunc := func() {
  1370. if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
  1371. t.Fatal(err)
  1372. }
  1373. }
  1374. testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
  1375. }
  1376. func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc func(), revertFunc func()) {
  1377. defer testutil.AfterTest(t)
  1378. // 1. separate copies for TLS assets modification
  1379. tlsInfo := cloneFunc()
  1380. // 2. start cluster with valid certs
  1381. clus := NewClusterV3(t, &ClusterConfig{Size: 1, PeerTLS: &tlsInfo, ClientTLS: &tlsInfo})
  1382. defer clus.Terminate(t)
  1383. // 3. concurrent client dialing while certs become expired
  1384. errc := make(chan error, 1)
  1385. go func() {
  1386. for {
  1387. cc, err := tlsInfo.ClientConfig()
  1388. if err != nil {
  1389. // errors in 'go/src/crypto/tls/tls.go'
  1390. // tls: private key does not match public key
  1391. // tls: failed to find any PEM data in key input
  1392. // tls: failed to find any PEM data in certificate input
  1393. // Or 'does not exist', 'not found', etc
  1394. t.Log(err)
  1395. continue
  1396. }
  1397. cli, cerr := clientv3.New(clientv3.Config{
  1398. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1399. DialTimeout: time.Second,
  1400. TLS: cc,
  1401. })
  1402. if cerr != nil {
  1403. errc <- cerr
  1404. return
  1405. }
  1406. cli.Close()
  1407. }
  1408. }()
  1409. // 4. replace certs with expired ones
  1410. replaceFunc()
  1411. // 5. expect dial time-out when loading expired certs
  1412. select {
  1413. case gerr := <-errc:
  1414. if gerr != context.DeadlineExceeded {
  1415. t.Fatalf("expected %v, got %v", context.DeadlineExceeded, gerr)
  1416. }
  1417. case <-time.After(5 * time.Second):
  1418. t.Fatal("failed to receive dial timeout error")
  1419. }
  1420. // 6. replace expired certs back with valid ones
  1421. revertFunc()
  1422. // 7. new requests should trigger listener to reload valid certs
  1423. tls, terr := tlsInfo.ClientConfig()
  1424. if terr != nil {
  1425. t.Fatal(terr)
  1426. }
  1427. cl, cerr := clientv3.New(clientv3.Config{
  1428. Endpoints: []string{clus.Members[0].GRPCAddr()},
  1429. DialTimeout: time.Second,
  1430. TLS: tls,
  1431. })
  1432. if cerr != nil {
  1433. t.Fatalf("expected no error, got %v", cerr)
  1434. }
  1435. cl.Close()
  1436. }
  1437. func TestGRPCRequireLeader(t *testing.T) {
  1438. defer testutil.AfterTest(t)
  1439. cfg := ClusterConfig{Size: 3}
  1440. clus := newClusterV3NoClients(t, &cfg)
  1441. defer clus.Terminate(t)
  1442. clus.Members[1].Stop(t)
  1443. clus.Members[2].Stop(t)
  1444. client, err := NewClientV3(clus.Members[0])
  1445. if err != nil {
  1446. t.Fatalf("cannot create client: %v", err)
  1447. }
  1448. defer client.Close()
  1449. // wait for election timeout, then member[0] will not have a leader.
  1450. time.Sleep(time.Duration(3*electionTicks) * tickDuration)
  1451. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1452. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1453. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1454. if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1455. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1456. }
  1457. }
  1458. func TestGRPCStreamRequireLeader(t *testing.T) {
  1459. defer testutil.AfterTest(t)
  1460. cfg := ClusterConfig{Size: 3}
  1461. clus := newClusterV3NoClients(t, &cfg)
  1462. defer clus.Terminate(t)
  1463. client, err := NewClientV3(clus.Members[0])
  1464. if err != nil {
  1465. t.Fatalf("failed to create client (%v)", err)
  1466. }
  1467. defer client.Close()
  1468. wAPI := toGRPC(client).Watch
  1469. md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
  1470. ctx := metadata.NewOutgoingContext(context.Background(), md)
  1471. wStream, err := wAPI.Watch(ctx)
  1472. if err != nil {
  1473. t.Fatalf("wAPI.Watch error: %v", err)
  1474. }
  1475. clus.Members[1].Stop(t)
  1476. clus.Members[2].Stop(t)
  1477. // existing stream should be rejected
  1478. _, err = wStream.Recv()
  1479. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1480. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1481. }
  1482. // new stream should also be rejected
  1483. wStream, err = wAPI.Watch(ctx)
  1484. if err != nil {
  1485. t.Fatalf("wAPI.Watch error: %v", err)
  1486. }
  1487. _, err = wStream.Recv()
  1488. if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
  1489. t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
  1490. }
  1491. clus.Members[1].Restart(t)
  1492. clus.Members[2].Restart(t)
  1493. clus.waitLeader(t, clus.Members)
  1494. time.Sleep(time.Duration(2*electionTicks) * tickDuration)
  1495. // new stream should also be OK now after we restarted the other members
  1496. wStream, err = wAPI.Watch(ctx)
  1497. if err != nil {
  1498. t.Fatalf("wAPI.Watch error: %v", err)
  1499. }
  1500. wreq := &pb.WatchRequest{
  1501. RequestUnion: &pb.WatchRequest_CreateRequest{
  1502. CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
  1503. },
  1504. }
  1505. err = wStream.Send(wreq)
  1506. if err != nil {
  1507. t.Errorf("err = %v, want nil", err)
  1508. }
  1509. }
  1510. // TestV3LargeRequests ensures that configurable MaxRequestBytes works as intended.
  1511. func TestV3LargeRequests(t *testing.T) {
  1512. defer testutil.AfterTest(t)
  1513. tests := []struct {
  1514. maxRequestBytes uint
  1515. valueSize int
  1516. expectError error
  1517. }{
  1518. // don't set to 0. use 0 as the default.
  1519. {1, 1024, rpctypes.ErrGRPCRequestTooLarge},
  1520. {10 * 1024 * 1024, 9 * 1024 * 1024, nil},
  1521. {10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
  1522. {10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
  1523. }
  1524. for i, test := range tests {
  1525. clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
  1526. kvcli := toGRPC(clus.Client(0)).KV
  1527. reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
  1528. _, err := kvcli.Put(context.TODO(), reqput)
  1529. if !eqErrGRPC(err, test.expectError) {
  1530. t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
  1531. }
  1532. // request went through, expect large response back from server
  1533. if test.expectError == nil {
  1534. reqget := &pb.RangeRequest{Key: []byte("foo")}
  1535. // limit receive call size with original value + gRPC overhead bytes
  1536. _, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
  1537. if err != nil {
  1538. t.Errorf("#%d: range expected no error, got %v", i, err)
  1539. }
  1540. }
  1541. clus.Terminate(t)
  1542. }
  1543. }
  1544. func eqErrGRPC(err1 error, err2 error) bool {
  1545. return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
  1546. }
  1547. // waitForRestart tries a range request until the client's server responds.
  1548. // This is mainly a stop-gap function until grpcproxy's KVClient adapter
  1549. // (and by extension, clientv3) supports grpc.CallOption pass-through so
  1550. // FailFast=false works with Put.
  1551. func waitForRestart(t *testing.T, kvc pb.KVClient) {
  1552. req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
  1553. if _, err := kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
  1554. t.Fatal(err)
  1555. }
  1556. }