v3_grpc_test.go 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.package recipe
  14. package integration
  15. import (
  16. "bytes"
  17. "fmt"
  18. "reflect"
  19. "sort"
  20. "sync"
  21. "testing"
  22. "time"
  23. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
  25. "github.com/coreos/etcd/etcdserver/api/v3rpc"
  26. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/lease"
  28. "github.com/coreos/etcd/pkg/testutil"
  29. "github.com/coreos/etcd/storage/storagepb"
  30. )
  31. // TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
  32. // overwrites it, then checks that the change was applied.
  33. func TestV3PutOverwrite(t *testing.T) {
  34. defer testutil.AfterTest(t)
  35. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  36. defer clus.Terminate(t)
  37. kvc := clus.RandClient().KV
  38. key := []byte("foo")
  39. reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
  40. respput, err := kvc.Put(context.TODO(), reqput)
  41. if err != nil {
  42. t.Fatalf("couldn't put key (%v)", err)
  43. }
  44. // overwrite
  45. reqput.Value = []byte("baz")
  46. respput2, err := kvc.Put(context.TODO(), reqput)
  47. if err != nil {
  48. t.Fatalf("couldn't put key (%v)", err)
  49. }
  50. if respput2.Header.Revision <= respput.Header.Revision {
  51. t.Fatalf("expected newer revision on overwrite, got %v <= %v",
  52. respput2.Header.Revision, respput.Header.Revision)
  53. }
  54. reqrange := &pb.RangeRequest{Key: key}
  55. resprange, err := kvc.Range(context.TODO(), reqrange)
  56. if err != nil {
  57. t.Fatalf("couldn't get key (%v)", err)
  58. }
  59. if len(resprange.Kvs) != 1 {
  60. t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
  61. }
  62. kv := resprange.Kvs[0]
  63. if kv.ModRevision <= kv.CreateRevision {
  64. t.Errorf("expected modRev > createRev, got %d <= %d",
  65. kv.ModRevision, kv.CreateRevision)
  66. }
  67. if !reflect.DeepEqual(reqput.Value, kv.Value) {
  68. t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
  69. }
  70. }
  71. func TestV3TxnTooManyOps(t *testing.T) {
  72. defer testutil.AfterTest(t)
  73. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  74. defer clus.Terminate(t)
  75. kvc := clus.RandClient().KV
  76. // unique keys
  77. i := new(int)
  78. keyf := func() []byte {
  79. *i++
  80. return []byte(fmt.Sprintf("key-%d", i))
  81. }
  82. addCompareOps := func(txn *pb.TxnRequest) {
  83. txn.Compare = append(txn.Compare,
  84. &pb.Compare{
  85. Result: pb.Compare_GREATER,
  86. Target: pb.Compare_CREATE,
  87. Key: keyf(),
  88. })
  89. }
  90. addSuccessOps := func(txn *pb.TxnRequest) {
  91. txn.Success = append(txn.Success,
  92. &pb.RequestUnion{
  93. Request: &pb.RequestUnion_RequestPut{
  94. RequestPut: &pb.PutRequest{
  95. Key: keyf(),
  96. Value: []byte("bar"),
  97. },
  98. },
  99. })
  100. }
  101. addFailureOps := func(txn *pb.TxnRequest) {
  102. txn.Failure = append(txn.Failure,
  103. &pb.RequestUnion{
  104. Request: &pb.RequestUnion_RequestPut{
  105. RequestPut: &pb.PutRequest{
  106. Key: keyf(),
  107. Value: []byte("bar"),
  108. },
  109. },
  110. })
  111. }
  112. tests := []func(txn *pb.TxnRequest){
  113. addCompareOps,
  114. addSuccessOps,
  115. addFailureOps,
  116. }
  117. for i, tt := range tests {
  118. txn := &pb.TxnRequest{}
  119. for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ {
  120. tt(txn)
  121. }
  122. _, err := kvc.Txn(context.Background(), txn)
  123. if err != v3rpc.ErrTooManyOps {
  124. t.Errorf("#%d: err = %v, want %v", i, err, v3rpc.ErrTooManyOps)
  125. }
  126. }
  127. }
  128. func TestV3TxnDuplicateKeys(t *testing.T) {
  129. defer testutil.AfterTest(t)
  130. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  131. defer clus.Terminate(t)
  132. putreq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
  133. delKeyReq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
  134. RequestDeleteRange: &pb.DeleteRangeRequest{
  135. Key: []byte("abc"),
  136. },
  137. },
  138. }
  139. delInRangeReq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
  140. RequestDeleteRange: &pb.DeleteRangeRequest{
  141. Key: []byte("a"), RangeEnd: []byte("b"),
  142. },
  143. },
  144. }
  145. delOutOfRangeReq := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
  146. RequestDeleteRange: &pb.DeleteRangeRequest{
  147. Key: []byte("abb"), RangeEnd: []byte("abc"),
  148. },
  149. },
  150. }
  151. kvc := clus.RandClient().KV
  152. tests := []struct {
  153. txnSuccess []*pb.RequestUnion
  154. werr error
  155. }{
  156. {
  157. txnSuccess: []*pb.RequestUnion{putreq, putreq},
  158. werr: v3rpc.ErrDuplicateKey,
  159. },
  160. {
  161. txnSuccess: []*pb.RequestUnion{putreq, delKeyReq},
  162. werr: v3rpc.ErrDuplicateKey,
  163. },
  164. {
  165. txnSuccess: []*pb.RequestUnion{putreq, delInRangeReq},
  166. werr: v3rpc.ErrDuplicateKey,
  167. },
  168. {
  169. txnSuccess: []*pb.RequestUnion{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
  170. werr: nil,
  171. },
  172. {
  173. txnSuccess: []*pb.RequestUnion{putreq, delOutOfRangeReq},
  174. werr: nil,
  175. },
  176. }
  177. for i, tt := range tests {
  178. txn := &pb.TxnRequest{Success: tt.txnSuccess}
  179. _, err := kvc.Txn(context.Background(), txn)
  180. if err != tt.werr {
  181. t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
  182. }
  183. }
  184. }
  185. // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
  186. func TestV3PutMissingLease(t *testing.T) {
  187. defer testutil.AfterTest(t)
  188. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  189. defer clus.Terminate(t)
  190. kvc := clus.RandClient().KV
  191. key := []byte("foo")
  192. preq := &pb.PutRequest{Key: key, Lease: 123456}
  193. tests := []func(){
  194. // put case
  195. func() {
  196. if presp, err := kvc.Put(context.TODO(), preq); err == nil {
  197. t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
  198. }
  199. },
  200. // txn success case
  201. func() {
  202. txn := &pb.TxnRequest{}
  203. txn.Success = append(txn.Success, &pb.RequestUnion{
  204. Request: &pb.RequestUnion_RequestPut{
  205. RequestPut: preq}})
  206. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  207. t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
  208. }
  209. },
  210. // txn failure case
  211. func() {
  212. txn := &pb.TxnRequest{}
  213. txn.Failure = append(txn.Failure, &pb.RequestUnion{
  214. Request: &pb.RequestUnion_RequestPut{
  215. RequestPut: preq}})
  216. cmp := &pb.Compare{
  217. Result: pb.Compare_GREATER,
  218. Target: pb.Compare_CREATE,
  219. Key: []byte("bar"),
  220. }
  221. txn.Compare = append(txn.Compare, cmp)
  222. if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
  223. t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
  224. }
  225. },
  226. // ignore bad lease in failure on success txn
  227. func() {
  228. txn := &pb.TxnRequest{}
  229. rreq := &pb.RangeRequest{Key: []byte("bar")}
  230. txn.Success = append(txn.Success, &pb.RequestUnion{
  231. Request: &pb.RequestUnion_RequestRange{
  232. RequestRange: rreq}})
  233. txn.Failure = append(txn.Failure, &pb.RequestUnion{
  234. Request: &pb.RequestUnion_RequestPut{
  235. RequestPut: preq}})
  236. if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
  237. t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
  238. }
  239. },
  240. }
  241. for i, f := range tests {
  242. f()
  243. // key shouldn't have been stored
  244. rreq := &pb.RangeRequest{Key: key}
  245. rresp, err := kvc.Range(context.TODO(), rreq)
  246. if err != nil {
  247. t.Errorf("#%d. could not rangereq (%v)", i, err)
  248. } else if len(rresp.Kvs) != 0 {
  249. t.Errorf("#%d. expected no keys, got %v", i, rresp)
  250. }
  251. }
  252. }
  253. // TestV3DeleteRange tests various edge cases in the DeleteRange API.
  254. func TestV3DeleteRange(t *testing.T) {
  255. defer testutil.AfterTest(t)
  256. tests := []struct {
  257. keySet []string
  258. begin string
  259. end string
  260. wantSet [][]byte
  261. }{
  262. // delete middle
  263. {
  264. []string{"foo", "foo/abc", "fop"},
  265. "foo/", "fop",
  266. [][]byte{[]byte("foo"), []byte("fop")},
  267. },
  268. // no delete
  269. {
  270. []string{"foo", "foo/abc", "fop"},
  271. "foo/", "foo/",
  272. [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")},
  273. },
  274. // delete first
  275. {
  276. []string{"foo", "foo/abc", "fop"},
  277. "fo", "fop",
  278. [][]byte{[]byte("fop")},
  279. },
  280. // delete tail
  281. {
  282. []string{"foo", "foo/abc", "fop"},
  283. "foo/", "fos",
  284. [][]byte{[]byte("foo")},
  285. },
  286. // delete exact
  287. {
  288. []string{"foo", "foo/abc", "fop"},
  289. "foo/abc", "",
  290. [][]byte{[]byte("foo"), []byte("fop")},
  291. },
  292. // delete none, [x,x)
  293. {
  294. []string{"foo"},
  295. "foo", "foo",
  296. [][]byte{[]byte("foo")},
  297. },
  298. }
  299. for i, tt := range tests {
  300. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  301. kvc := clus.RandClient().KV
  302. ks := tt.keySet
  303. for j := range ks {
  304. reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
  305. _, err := kvc.Put(context.TODO(), reqput)
  306. if err != nil {
  307. t.Fatalf("couldn't put key (%v)", err)
  308. }
  309. }
  310. dreq := &pb.DeleteRangeRequest{
  311. Key: []byte(tt.begin),
  312. RangeEnd: []byte(tt.end)}
  313. dresp, err := kvc.DeleteRange(context.TODO(), dreq)
  314. if err != nil {
  315. t.Fatalf("couldn't delete range on test %d (%v)", i, err)
  316. }
  317. rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
  318. rresp, err := kvc.Range(context.TODO(), rreq)
  319. if err != nil {
  320. t.Errorf("couldn't get range on test %v (%v)", i, err)
  321. }
  322. if dresp.Header.Revision != rresp.Header.Revision {
  323. t.Errorf("expected revision %v, got %v",
  324. dresp.Header.Revision, rresp.Header.Revision)
  325. }
  326. keys := [][]byte{}
  327. for j := range rresp.Kvs {
  328. keys = append(keys, rresp.Kvs[j].Key)
  329. }
  330. if reflect.DeepEqual(tt.wantSet, keys) == false {
  331. t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
  332. }
  333. // can't defer because tcp ports will be in use
  334. clus.Terminate(t)
  335. }
  336. }
  337. // TestV3TxnInvaildRange tests txn
  338. func TestV3TxnInvaildRange(t *testing.T) {
  339. defer testutil.AfterTest(t)
  340. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  341. defer clus.Terminate(t)
  342. kvc := clus.RandClient().KV
  343. preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  344. for i := 0; i < 3; i++ {
  345. _, err := kvc.Put(context.Background(), preq)
  346. if err != nil {
  347. t.Fatalf("couldn't put key (%v)", err)
  348. }
  349. }
  350. _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
  351. if err != nil {
  352. t.Fatalf("couldn't compact kv space (%v)", err)
  353. }
  354. // future rev
  355. txn := &pb.TxnRequest{}
  356. txn.Success = append(txn.Success, &pb.RequestUnion{
  357. Request: &pb.RequestUnion_RequestPut{
  358. RequestPut: preq}})
  359. rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
  360. txn.Success = append(txn.Success, &pb.RequestUnion{
  361. Request: &pb.RequestUnion_RequestRange{
  362. RequestRange: rreq}})
  363. if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrFutureRev {
  364. t.Errorf("err = %v, want %v", err, v3rpc.ErrFutureRev)
  365. }
  366. // compacted rev
  367. tv, _ := txn.Success[1].Request.(*pb.RequestUnion_RequestRange)
  368. tv.RequestRange.Revision = 1
  369. if _, err := kvc.Txn(context.TODO(), txn); err != v3rpc.ErrCompacted {
  370. t.Errorf("err = %v, want %v", err, v3rpc.ErrCompacted)
  371. }
  372. }
  373. // TestV3WatchFromCurrentRevision tests Watch APIs from current revision.
  374. func TestV3WatchFromCurrentRevision(t *testing.T) {
  375. defer testutil.AfterTest(t)
  376. tests := []struct {
  377. putKeys []string
  378. watchRequest *pb.WatchRequest
  379. wresps []*pb.WatchResponse
  380. }{
  381. // watch the key, matching
  382. {
  383. []string{"foo"},
  384. &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  385. CreateRequest: &pb.WatchCreateRequest{
  386. Key: []byte("foo")}}},
  387. []*pb.WatchResponse{
  388. {
  389. Header: &pb.ResponseHeader{Revision: 1},
  390. Created: true,
  391. },
  392. {
  393. Header: &pb.ResponseHeader{Revision: 2},
  394. Created: false,
  395. Events: []*storagepb.Event{
  396. {
  397. Type: storagepb.PUT,
  398. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  399. },
  400. },
  401. },
  402. },
  403. },
  404. // watch the key, non-matching
  405. {
  406. []string{"foo"},
  407. &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  408. CreateRequest: &pb.WatchCreateRequest{
  409. Key: []byte("helloworld")}}},
  410. []*pb.WatchResponse{
  411. {
  412. Header: &pb.ResponseHeader{Revision: 1},
  413. Created: true,
  414. },
  415. },
  416. },
  417. // watch the prefix, matching
  418. {
  419. []string{"fooLong"},
  420. &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  421. CreateRequest: &pb.WatchCreateRequest{
  422. Prefix: []byte("foo")}}},
  423. []*pb.WatchResponse{
  424. {
  425. Header: &pb.ResponseHeader{Revision: 1},
  426. Created: true,
  427. },
  428. {
  429. Header: &pb.ResponseHeader{Revision: 2},
  430. Created: false,
  431. Events: []*storagepb.Event{
  432. {
  433. Type: storagepb.PUT,
  434. Kv: &storagepb.KeyValue{Key: []byte("fooLong"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  435. },
  436. },
  437. },
  438. },
  439. },
  440. // watch the prefix, non-matching
  441. {
  442. []string{"foo"},
  443. &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  444. CreateRequest: &pb.WatchCreateRequest{
  445. Prefix: []byte("helloworld")}}},
  446. []*pb.WatchResponse{
  447. {
  448. Header: &pb.ResponseHeader{Revision: 1},
  449. Created: true,
  450. },
  451. },
  452. },
  453. // multiple puts, one watcher with matching key
  454. {
  455. []string{"foo", "foo", "foo"},
  456. &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  457. CreateRequest: &pb.WatchCreateRequest{
  458. Key: []byte("foo")}}},
  459. []*pb.WatchResponse{
  460. {
  461. Header: &pb.ResponseHeader{Revision: 1},
  462. Created: true,
  463. },
  464. {
  465. Header: &pb.ResponseHeader{Revision: 2},
  466. Created: false,
  467. Events: []*storagepb.Event{
  468. {
  469. Type: storagepb.PUT,
  470. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  471. },
  472. },
  473. },
  474. {
  475. Header: &pb.ResponseHeader{Revision: 3},
  476. Created: false,
  477. Events: []*storagepb.Event{
  478. {
  479. Type: storagepb.PUT,
  480. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 3, Version: 2},
  481. },
  482. },
  483. },
  484. {
  485. Header: &pb.ResponseHeader{Revision: 4},
  486. Created: false,
  487. Events: []*storagepb.Event{
  488. {
  489. Type: storagepb.PUT,
  490. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 3},
  491. },
  492. },
  493. },
  494. },
  495. },
  496. // multiple puts, one watcher with matching prefix
  497. {
  498. []string{"foo", "foo", "foo"},
  499. &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  500. CreateRequest: &pb.WatchCreateRequest{
  501. Prefix: []byte("foo")}}},
  502. []*pb.WatchResponse{
  503. {
  504. Header: &pb.ResponseHeader{Revision: 1},
  505. Created: true,
  506. },
  507. {
  508. Header: &pb.ResponseHeader{Revision: 2},
  509. Created: false,
  510. Events: []*storagepb.Event{
  511. {
  512. Type: storagepb.PUT,
  513. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  514. },
  515. },
  516. },
  517. {
  518. Header: &pb.ResponseHeader{Revision: 3},
  519. Created: false,
  520. Events: []*storagepb.Event{
  521. {
  522. Type: storagepb.PUT,
  523. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 3, Version: 2},
  524. },
  525. },
  526. },
  527. {
  528. Header: &pb.ResponseHeader{Revision: 4},
  529. Created: false,
  530. Events: []*storagepb.Event{
  531. {
  532. Type: storagepb.PUT,
  533. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 3},
  534. },
  535. },
  536. },
  537. },
  538. },
  539. }
  540. for i, tt := range tests {
  541. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  542. wAPI := clus.RandClient().Watch
  543. ctx, cancel := context.WithCancel(context.Background())
  544. defer cancel()
  545. wStream, err := wAPI.Watch(ctx)
  546. if err != nil {
  547. t.Fatalf("#%d: wAPI.Watch error: %v", i, err)
  548. }
  549. if err := wStream.Send(tt.watchRequest); err != nil {
  550. t.Fatalf("#%d: wStream.Send error: %v", i, err)
  551. }
  552. go func() {
  553. for _, k := range tt.putKeys {
  554. kvc := clus.RandClient().KV
  555. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  556. if _, err := kvc.Put(context.TODO(), req); err != nil {
  557. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  558. }
  559. }
  560. }()
  561. var createdWatchId int64
  562. for j, wresp := range tt.wresps {
  563. resp, err := wStream.Recv()
  564. if err != nil {
  565. t.Errorf("#%d.%d: wStream.Recv error: %v", i, j, err)
  566. }
  567. if resp.Header == nil {
  568. t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j)
  569. }
  570. if resp.Header.Revision != wresp.Header.Revision {
  571. t.Errorf("#%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision)
  572. }
  573. if wresp.Created != resp.Created {
  574. t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created)
  575. }
  576. if resp.Created {
  577. createdWatchId = resp.WatchId
  578. }
  579. if resp.WatchId != createdWatchId {
  580. t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId)
  581. }
  582. if !reflect.DeepEqual(resp.Events, wresp.Events) {
  583. t.Errorf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events)
  584. }
  585. }
  586. rok, nr := WaitResponse(wStream, 1*time.Second)
  587. if !rok {
  588. t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
  589. }
  590. // can't defer because tcp ports will be in use
  591. clus.Terminate(t)
  592. }
  593. }
  594. // TestV3WatchCancelSynced tests Watch APIs cancellation from synced map.
  595. func TestV3WatchCancelSynced(t *testing.T) {
  596. defer testutil.AfterTest(t)
  597. testV3WatchCancel(t, 0)
  598. }
  599. // TestV3WatchCancelUnsynced tests Watch APIs cancellation from unsynced map.
  600. func TestV3WatchCancelUnsynced(t *testing.T) {
  601. defer testutil.AfterTest(t)
  602. testV3WatchCancel(t, 1)
  603. }
  604. func testV3WatchCancel(t *testing.T, startRev int64) {
  605. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  606. ctx, cancel := context.WithCancel(context.Background())
  607. defer cancel()
  608. wStream, errW := clus.RandClient().Watch.Watch(ctx)
  609. if errW != nil {
  610. t.Fatalf("wAPI.Watch error: %v", errW)
  611. }
  612. wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  613. CreateRequest: &pb.WatchCreateRequest{
  614. Key: []byte("foo"), StartRevision: startRev}}}
  615. if err := wStream.Send(wreq); err != nil {
  616. t.Fatalf("wStream.Send error: %v", err)
  617. }
  618. wresp, errR := wStream.Recv()
  619. if errR != nil {
  620. t.Errorf("wStream.Recv error: %v", errR)
  621. }
  622. if !wresp.Created {
  623. t.Errorf("wresp.Created got = %v, want = true", wresp.Created)
  624. }
  625. creq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CancelRequest{
  626. CancelRequest: &pb.WatchCancelRequest{
  627. WatchId: wresp.WatchId}}}
  628. if err := wStream.Send(creq); err != nil {
  629. t.Fatalf("wStream.Send error: %v", err)
  630. }
  631. cresp, err := wStream.Recv()
  632. if err != nil {
  633. t.Errorf("wStream.Recv error: %v", err)
  634. }
  635. if !cresp.Canceled {
  636. t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled)
  637. }
  638. kvc := clus.RandClient().KV
  639. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
  640. t.Errorf("couldn't put key (%v)", err)
  641. }
  642. // watch got canceled, so this should block
  643. rok, nr := WaitResponse(wStream, 1*time.Second)
  644. if !rok {
  645. t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
  646. }
  647. clus.Terminate(t)
  648. }
  649. func TestV3WatchMultipleWatchersSynced(t *testing.T) {
  650. defer testutil.AfterTest(t)
  651. testV3WatchMultipleWatchers(t, 0)
  652. }
  653. func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
  654. defer testutil.AfterTest(t)
  655. testV3WatchMultipleWatchers(t, 1)
  656. }
  657. // testV3WatchMultipleWatchers tests multiple watchers on the same key
  658. // and one watcher with matching prefix. It first puts the key
  659. // that matches all watchers, and another key that matches only
  660. // one watcher to test if it receives expected events.
  661. func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
  662. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  663. kvc := clus.RandClient().KV
  664. ctx, cancel := context.WithCancel(context.Background())
  665. defer cancel()
  666. wStream, errW := clus.RandClient().Watch.Watch(ctx)
  667. if errW != nil {
  668. t.Fatalf("wAPI.Watch error: %v", errW)
  669. }
  670. watchKeyN := 4
  671. for i := 0; i < watchKeyN+1; i++ {
  672. var wreq *pb.WatchRequest
  673. if i < watchKeyN {
  674. wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  675. CreateRequest: &pb.WatchCreateRequest{
  676. Key: []byte("foo"), StartRevision: startRev}}}
  677. } else {
  678. wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  679. CreateRequest: &pb.WatchCreateRequest{
  680. Prefix: []byte("fo"), StartRevision: startRev}}}
  681. }
  682. if err := wStream.Send(wreq); err != nil {
  683. t.Fatalf("wStream.Send error: %v", err)
  684. }
  685. }
  686. ids := make(map[int64]struct{})
  687. for i := 0; i < watchKeyN+1; i++ {
  688. wresp, err := wStream.Recv()
  689. if err != nil {
  690. t.Fatalf("wStream.Recv error: %v", err)
  691. }
  692. if !wresp.Created {
  693. t.Fatalf("wresp.Created got = %v, want = true", wresp.Created)
  694. }
  695. ids[wresp.WatchId] = struct{}{}
  696. }
  697. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
  698. t.Fatalf("couldn't put key (%v)", err)
  699. }
  700. for i := 0; i < watchKeyN+1; i++ {
  701. wresp, err := wStream.Recv()
  702. if err != nil {
  703. t.Fatalf("wStream.Recv error: %v", err)
  704. }
  705. if _, ok := ids[wresp.WatchId]; !ok {
  706. t.Errorf("watchId %d is not created!", wresp.WatchId)
  707. } else {
  708. delete(ids, wresp.WatchId)
  709. }
  710. if len(wresp.Events) == 0 {
  711. t.Errorf("#%d: no events received", i)
  712. }
  713. for _, ev := range wresp.Events {
  714. if string(ev.Kv.Key) != "foo" {
  715. t.Errorf("ev.Kv.Key got = %s, want = foo", ev.Kv.Key)
  716. }
  717. if string(ev.Kv.Value) != "bar" {
  718. t.Errorf("ev.Kv.Value got = %s, want = bar", ev.Kv.Value)
  719. }
  720. }
  721. }
  722. // now put one key that has only one matching watcher
  723. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("fo"), Value: []byte("bar")}); err != nil {
  724. t.Fatalf("couldn't put key (%v)", err)
  725. }
  726. wresp, err := wStream.Recv()
  727. if err != nil {
  728. t.Errorf("wStream.Recv error: %v", err)
  729. }
  730. if len(wresp.Events) != 1 {
  731. t.Fatalf("len(wresp.Events) got = %d, want = 1", len(wresp.Events))
  732. }
  733. if string(wresp.Events[0].Kv.Key) != "fo" {
  734. t.Errorf("wresp.Events[0].Kv.Key got = %s, want = fo", wresp.Events[0].Kv.Key)
  735. }
  736. // now Recv should block because there is no more events coming
  737. rok, nr := WaitResponse(wStream, 1*time.Second)
  738. if !rok {
  739. t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
  740. }
  741. clus.Terminate(t)
  742. }
  743. func TestV3WatchMultipleEventsTxnSynced(t *testing.T) {
  744. defer testutil.AfterTest(t)
  745. testV3WatchMultipleEventsTxn(t, 0)
  746. }
  747. func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) {
  748. defer testutil.AfterTest(t)
  749. testV3WatchMultipleEventsTxn(t, 1)
  750. }
  751. // testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events.
  752. func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
  753. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  754. ctx, cancel := context.WithCancel(context.Background())
  755. defer cancel()
  756. wStream, wErr := clus.RandClient().Watch.Watch(ctx)
  757. if wErr != nil {
  758. t.Fatalf("wAPI.Watch error: %v", wErr)
  759. }
  760. wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  761. CreateRequest: &pb.WatchCreateRequest{
  762. Prefix: []byte("foo"), StartRevision: startRev}}}
  763. if err := wStream.Send(wreq); err != nil {
  764. t.Fatalf("wStream.Send error: %v", err)
  765. }
  766. kvc := clus.RandClient().KV
  767. txn := pb.TxnRequest{}
  768. for i := 0; i < 3; i++ {
  769. ru := &pb.RequestUnion{}
  770. ru.Request = &pb.RequestUnion_RequestPut{
  771. RequestPut: &pb.PutRequest{
  772. Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")}}
  773. txn.Success = append(txn.Success, ru)
  774. }
  775. tresp, err := kvc.Txn(context.Background(), &txn)
  776. if err != nil {
  777. t.Fatalf("kvc.Txn error: %v", err)
  778. }
  779. if !tresp.Succeeded {
  780. t.Fatalf("kvc.Txn failed: %+v", tresp)
  781. }
  782. events := []*storagepb.Event{}
  783. for len(events) < 3 {
  784. resp, err := wStream.Recv()
  785. if err != nil {
  786. t.Errorf("wStream.Recv error: %v", err)
  787. }
  788. if resp.Created {
  789. continue
  790. }
  791. events = append(events, resp.Events...)
  792. }
  793. sort.Sort(eventsSortByKey(events))
  794. wevents := []*storagepb.Event{
  795. {
  796. Type: storagepb.PUT,
  797. Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  798. },
  799. {
  800. Type: storagepb.PUT,
  801. Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  802. },
  803. {
  804. Type: storagepb.PUT,
  805. Kv: &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  806. },
  807. }
  808. if !reflect.DeepEqual(events, wevents) {
  809. t.Errorf("events got = %+v, want = %+v", events, wevents)
  810. }
  811. rok, nr := WaitResponse(wStream, 1*time.Second)
  812. if !rok {
  813. t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
  814. }
  815. // can't defer because tcp ports will be in use
  816. clus.Terminate(t)
  817. }
  818. type eventsSortByKey []*storagepb.Event
  819. func (evs eventsSortByKey) Len() int { return len(evs) }
  820. func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
  821. func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
  822. func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
  823. defer testutil.AfterTest(t)
  824. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  825. defer clus.Terminate(t)
  826. kvc := clus.RandClient().KV
  827. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
  828. t.Fatalf("couldn't put key (%v)", err)
  829. }
  830. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil {
  831. t.Fatalf("couldn't put key (%v)", err)
  832. }
  833. ctx, cancel := context.WithCancel(context.Background())
  834. defer cancel()
  835. wStream, wErr := clus.RandClient().Watch.Watch(ctx)
  836. if wErr != nil {
  837. t.Fatalf("wAPI.Watch error: %v", wErr)
  838. }
  839. wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  840. CreateRequest: &pb.WatchCreateRequest{
  841. Prefix: []byte("foo"), StartRevision: 1}}}
  842. if err := wStream.Send(wreq); err != nil {
  843. t.Fatalf("wStream.Send error: %v", err)
  844. }
  845. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil {
  846. t.Fatalf("couldn't put key (%v)", err)
  847. }
  848. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil {
  849. t.Fatalf("couldn't put key (%v)", err)
  850. }
  851. allWevents := []*storagepb.Event{
  852. {
  853. Type: storagepb.PUT,
  854. Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  855. },
  856. {
  857. Type: storagepb.PUT,
  858. Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
  859. },
  860. {
  861. Type: storagepb.PUT,
  862. Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 2},
  863. },
  864. {
  865. Type: storagepb.PUT,
  866. Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 5, Version: 2},
  867. },
  868. }
  869. events := []*storagepb.Event{}
  870. for len(events) < 4 {
  871. resp, err := wStream.Recv()
  872. if err != nil {
  873. t.Errorf("wStream.Recv error: %v", err)
  874. }
  875. if resp.Created {
  876. continue
  877. }
  878. events = append(events, resp.Events...)
  879. // if PUT requests are committed by now, first receive would return
  880. // multiple events, but if not, it returns a single event. In SSD,
  881. // it should return 4 events at once.
  882. }
  883. if !reflect.DeepEqual(events, allWevents) {
  884. t.Errorf("events got = %+v, want = %+v", events, allWevents)
  885. }
  886. rok, nr := WaitResponse(wStream, 1*time.Second)
  887. if !rok {
  888. t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
  889. }
  890. }
  891. func TestV3WatchMultipleStreamsSynced(t *testing.T) {
  892. defer testutil.AfterTest(t)
  893. testV3WatchMultipleStreams(t, 0)
  894. }
  895. func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
  896. defer testutil.AfterTest(t)
  897. testV3WatchMultipleStreams(t, 1)
  898. }
  899. // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
  900. func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
  901. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  902. wAPI := clus.RandClient().Watch
  903. kvc := clus.RandClient().KV
  904. streams := make([]pb.Watch_WatchClient, 5)
  905. for i := range streams {
  906. ctx, cancel := context.WithCancel(context.Background())
  907. defer cancel()
  908. wStream, errW := wAPI.Watch(ctx)
  909. if errW != nil {
  910. t.Fatalf("wAPI.Watch error: %v", errW)
  911. }
  912. wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  913. CreateRequest: &pb.WatchCreateRequest{
  914. Key: []byte("foo"), StartRevision: startRev}}}
  915. if err := wStream.Send(wreq); err != nil {
  916. t.Fatalf("wStream.Send error: %v", err)
  917. }
  918. streams[i] = wStream
  919. }
  920. for _, wStream := range streams {
  921. wresp, err := wStream.Recv()
  922. if err != nil {
  923. t.Fatalf("wStream.Recv error: %v", err)
  924. }
  925. if !wresp.Created {
  926. t.Fatalf("wresp.Created got = %v, want = true", wresp.Created)
  927. }
  928. }
  929. if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
  930. t.Fatalf("couldn't put key (%v)", err)
  931. }
  932. var wg sync.WaitGroup
  933. wg.Add(len(streams))
  934. wevents := []*storagepb.Event{
  935. {
  936. Type: storagepb.PUT,
  937. Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
  938. },
  939. }
  940. for i := range streams {
  941. go func(i int) {
  942. defer wg.Done()
  943. wStream := streams[i]
  944. wresp, err := wStream.Recv()
  945. if err != nil {
  946. t.Fatalf("wStream.Recv error: %v", err)
  947. }
  948. if wresp.WatchId != 0 {
  949. t.Errorf("watchId got = %d, want = 0", wresp.WatchId)
  950. }
  951. if !reflect.DeepEqual(wresp.Events, wevents) {
  952. t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents)
  953. }
  954. // now Recv should block because there is no more events coming
  955. rok, nr := WaitResponse(wStream, 1*time.Second)
  956. if !rok {
  957. t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
  958. }
  959. }(i)
  960. }
  961. wg.Wait()
  962. clus.Terminate(t)
  963. }
  964. // WaitResponse waits on the given stream for given duration.
  965. // If there is no more events, true and a nil response will be
  966. // returned closing the WatchClient stream. Or the response will
  967. // be returned.
  968. func WaitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) {
  969. rCh := make(chan *pb.WatchResponse)
  970. go func() {
  971. resp, _ := wc.Recv()
  972. rCh <- resp
  973. }()
  974. select {
  975. case nr := <-rCh:
  976. return false, nr
  977. case <-time.After(timeout):
  978. }
  979. wc.CloseSend()
  980. rv, ok := <-rCh
  981. if rv != nil || !ok {
  982. return false, rv
  983. }
  984. return true, nil
  985. }
  986. func TestV3RangeRequest(t *testing.T) {
  987. defer testutil.AfterTest(t)
  988. tests := []struct {
  989. putKeys []string
  990. reqs []pb.RangeRequest
  991. wresps [][]string
  992. wmores []bool
  993. }{
  994. // single key
  995. {
  996. []string{"foo", "bar"},
  997. []pb.RangeRequest{
  998. // exists
  999. {Key: []byte("foo")},
  1000. // doesn't exist
  1001. {Key: []byte("baz")},
  1002. },
  1003. [][]string{
  1004. {"foo"},
  1005. {},
  1006. },
  1007. []bool{false, false},
  1008. },
  1009. // multi-key
  1010. {
  1011. []string{"a", "b", "c", "d", "e"},
  1012. []pb.RangeRequest{
  1013. // all in range
  1014. {Key: []byte("a"), RangeEnd: []byte("z")},
  1015. // [b, d)
  1016. {Key: []byte("b"), RangeEnd: []byte("d")},
  1017. // out of range
  1018. {Key: []byte("f"), RangeEnd: []byte("z")},
  1019. // [c,c) = empty
  1020. {Key: []byte("c"), RangeEnd: []byte("c")},
  1021. // [d, b) = empty
  1022. {Key: []byte("d"), RangeEnd: []byte("b")},
  1023. },
  1024. [][]string{
  1025. {"a", "b", "c", "d", "e"},
  1026. {"b", "c"},
  1027. {},
  1028. {},
  1029. {},
  1030. },
  1031. []bool{false, false, false, false, false},
  1032. },
  1033. // revision
  1034. {
  1035. []string{"a", "b", "c", "d", "e"},
  1036. []pb.RangeRequest{
  1037. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
  1038. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
  1039. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
  1040. {Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
  1041. },
  1042. [][]string{
  1043. {"a", "b", "c", "d", "e"},
  1044. {},
  1045. {"a"},
  1046. {"a", "b"},
  1047. },
  1048. []bool{false, false, false, false},
  1049. },
  1050. // limit
  1051. {
  1052. []string{"foo", "bar"},
  1053. []pb.RangeRequest{
  1054. // more
  1055. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
  1056. // no more
  1057. {Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
  1058. },
  1059. [][]string{
  1060. {"bar"},
  1061. {"bar", "foo"},
  1062. },
  1063. []bool{true, false},
  1064. },
  1065. // sort
  1066. {
  1067. []string{"b", "a", "c", "d", "c"},
  1068. []pb.RangeRequest{
  1069. {
  1070. Key: []byte("a"), RangeEnd: []byte("z"),
  1071. Limit: 1,
  1072. SortOrder: pb.RangeRequest_ASCEND,
  1073. SortTarget: pb.RangeRequest_KEY,
  1074. },
  1075. {
  1076. Key: []byte("a"), RangeEnd: []byte("z"),
  1077. Limit: 1,
  1078. SortOrder: pb.RangeRequest_DESCEND,
  1079. SortTarget: pb.RangeRequest_KEY,
  1080. },
  1081. {
  1082. Key: []byte("a"), RangeEnd: []byte("z"),
  1083. Limit: 1,
  1084. SortOrder: pb.RangeRequest_ASCEND,
  1085. SortTarget: pb.RangeRequest_CREATE,
  1086. },
  1087. {
  1088. Key: []byte("a"), RangeEnd: []byte("z"),
  1089. Limit: 1,
  1090. SortOrder: pb.RangeRequest_DESCEND,
  1091. SortTarget: pb.RangeRequest_MOD,
  1092. },
  1093. {
  1094. Key: []byte("z"), RangeEnd: []byte("z"),
  1095. Limit: 1,
  1096. SortOrder: pb.RangeRequest_DESCEND,
  1097. SortTarget: pb.RangeRequest_CREATE,
  1098. },
  1099. },
  1100. [][]string{
  1101. {"a"},
  1102. {"d"},
  1103. {"b"},
  1104. {"c"},
  1105. {},
  1106. },
  1107. []bool{true, true, true, true, false},
  1108. },
  1109. }
  1110. for i, tt := range tests {
  1111. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1112. for _, k := range tt.putKeys {
  1113. kvc := clus.RandClient().KV
  1114. req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
  1115. if _, err := kvc.Put(context.TODO(), req); err != nil {
  1116. t.Fatalf("#%d: couldn't put key (%v)", i, err)
  1117. }
  1118. }
  1119. for j, req := range tt.reqs {
  1120. kvc := clus.RandClient().KV
  1121. resp, err := kvc.Range(context.TODO(), &req)
  1122. if err != nil {
  1123. t.Errorf("#%d.%d: Range error: %v", i, j, err)
  1124. continue
  1125. }
  1126. if len(resp.Kvs) != len(tt.wresps[j]) {
  1127. t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
  1128. continue
  1129. }
  1130. for k, wKey := range tt.wresps[j] {
  1131. respKey := string(resp.Kvs[k].Key)
  1132. if respKey != wKey {
  1133. t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
  1134. }
  1135. }
  1136. if resp.More != tt.wmores[j] {
  1137. t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
  1138. }
  1139. wrev := int64(len(tt.putKeys) + 1)
  1140. if resp.Header.Revision != wrev {
  1141. t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
  1142. }
  1143. }
  1144. clus.Terminate(t)
  1145. }
  1146. }
  1147. // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
  1148. func TestV3LeaseRevoke(t *testing.T) {
  1149. defer testutil.AfterTest(t)
  1150. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  1151. lc := clus.RandClient().Lease
  1152. _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
  1153. return err
  1154. })
  1155. }
  1156. // TestV3LeaseCreateById ensures leases may be created by a given id.
  1157. func TestV3LeaseCreateByID(t *testing.T) {
  1158. defer testutil.AfterTest(t)
  1159. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1160. defer clus.Terminate(t)
  1161. // create fixed lease
  1162. lresp, err := clus.RandClient().Lease.LeaseCreate(
  1163. context.TODO(),
  1164. &pb.LeaseCreateRequest{ID: 1, TTL: 1})
  1165. if err != nil {
  1166. t.Errorf("could not create lease 1 (%v)", err)
  1167. }
  1168. if lresp.ID != 1 {
  1169. t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
  1170. }
  1171. // create duplicate fixed lease
  1172. lresp, err = clus.RandClient().Lease.LeaseCreate(
  1173. context.TODO(),
  1174. &pb.LeaseCreateRequest{ID: 1, TTL: 1})
  1175. if err != nil {
  1176. t.Error(err)
  1177. }
  1178. if lresp.ID != 0 || lresp.Error != lease.ErrLeaseExists.Error() {
  1179. t.Errorf("got id %v, wanted id 0 (%v)", lresp.ID, lresp.Error)
  1180. }
  1181. // create fresh fixed lease
  1182. lresp, err = clus.RandClient().Lease.LeaseCreate(
  1183. context.TODO(),
  1184. &pb.LeaseCreateRequest{ID: 2, TTL: 1})
  1185. if err != nil {
  1186. t.Errorf("could not create lease 2 (%v)", err)
  1187. }
  1188. if lresp.ID != 2 {
  1189. t.Errorf("got id %v, wanted id %v", lresp.ID, 2)
  1190. }
  1191. }
  1192. // TestV3LeaseExpire ensures a key is deleted once a key expires.
  1193. func TestV3LeaseExpire(t *testing.T) {
  1194. defer testutil.AfterTest(t)
  1195. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  1196. // let lease lapse; wait for deleted key
  1197. ctx, cancel := context.WithCancel(context.Background())
  1198. defer cancel()
  1199. wStream, err := clus.RandClient().Watch.Watch(ctx)
  1200. if err != nil {
  1201. return err
  1202. }
  1203. wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
  1204. CreateRequest: &pb.WatchCreateRequest{
  1205. Key: []byte("foo"), StartRevision: 1}}}
  1206. if err := wStream.Send(wreq); err != nil {
  1207. return err
  1208. }
  1209. if _, err := wStream.Recv(); err != nil {
  1210. // the 'created' message
  1211. return err
  1212. }
  1213. if _, err := wStream.Recv(); err != nil {
  1214. // the 'put' message
  1215. return err
  1216. }
  1217. errc := make(chan error, 1)
  1218. go func() {
  1219. resp, err := wStream.Recv()
  1220. switch {
  1221. case err != nil:
  1222. errc <- err
  1223. case len(resp.Events) != 1:
  1224. fallthrough
  1225. case resp.Events[0].Type != storagepb.DELETE:
  1226. errc <- fmt.Errorf("expected key delete, got %v", resp)
  1227. default:
  1228. errc <- nil
  1229. }
  1230. }()
  1231. select {
  1232. case <-time.After(15 * time.Second):
  1233. return fmt.Errorf("lease expiration too slow")
  1234. case err := <-errc:
  1235. return err
  1236. }
  1237. })
  1238. }
  1239. // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
  1240. func TestV3LeaseKeepAlive(t *testing.T) {
  1241. defer testutil.AfterTest(t)
  1242. testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
  1243. lc := clus.RandClient().Lease
  1244. lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
  1245. ctx, cancel := context.WithCancel(context.Background())
  1246. defer cancel()
  1247. lac, err := lc.LeaseKeepAlive(ctx)
  1248. if err != nil {
  1249. return err
  1250. }
  1251. defer lac.CloseSend()
  1252. // renew long enough so lease would've expired otherwise
  1253. for i := 0; i < 3; i++ {
  1254. if err = lac.Send(lreq); err != nil {
  1255. return err
  1256. }
  1257. lresp, rxerr := lac.Recv()
  1258. if rxerr != nil {
  1259. return rxerr
  1260. }
  1261. if lresp.ID != leaseID {
  1262. return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID)
  1263. }
  1264. time.Sleep(time.Duration(lresp.TTL/2) * time.Second)
  1265. }
  1266. _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
  1267. return err
  1268. })
  1269. }
  1270. // TestV3LeaseExists creates a lease on a random client, then sends a keepalive on another
  1271. // client to confirm it's visible to the whole cluster.
  1272. func TestV3LeaseExists(t *testing.T) {
  1273. defer testutil.AfterTest(t)
  1274. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1275. defer clus.Terminate(t)
  1276. // create lease
  1277. ctx0, cancel0 := context.WithCancel(context.Background())
  1278. defer cancel0()
  1279. lresp, err := clus.RandClient().Lease.LeaseCreate(
  1280. ctx0,
  1281. &pb.LeaseCreateRequest{TTL: 30})
  1282. if err != nil {
  1283. t.Fatal(err)
  1284. }
  1285. if lresp.Error != "" {
  1286. t.Fatal(lresp.Error)
  1287. }
  1288. // confirm keepalive
  1289. ctx1, cancel1 := context.WithCancel(context.Background())
  1290. defer cancel1()
  1291. lac, err := clus.RandClient().Lease.LeaseKeepAlive(ctx1)
  1292. if err != nil {
  1293. t.Fatal(err)
  1294. }
  1295. defer lac.CloseSend()
  1296. if err = lac.Send(&pb.LeaseKeepAliveRequest{ID: lresp.ID}); err != nil {
  1297. t.Fatal(err)
  1298. }
  1299. if _, err = lac.Recv(); err != nil {
  1300. t.Fatal(err)
  1301. }
  1302. }
  1303. // acquireLeaseAndKey creates a new lease and creates an attached key.
  1304. func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
  1305. // create lease
  1306. lresp, err := clus.RandClient().Lease.LeaseCreate(
  1307. context.TODO(),
  1308. &pb.LeaseCreateRequest{TTL: 1})
  1309. if err != nil {
  1310. return 0, err
  1311. }
  1312. if lresp.Error != "" {
  1313. return 0, fmt.Errorf(lresp.Error)
  1314. }
  1315. // attach to key
  1316. put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
  1317. if _, err := clus.RandClient().KV.Put(context.TODO(), put); err != nil {
  1318. return 0, err
  1319. }
  1320. return lresp.ID, nil
  1321. }
  1322. // testLeaseRemoveLeasedKey performs some action while holding a lease with an
  1323. // attached key "foo", then confirms the key is gone.
  1324. func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
  1325. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  1326. defer clus.Terminate(t)
  1327. leaseID, err := acquireLeaseAndKey(clus, "foo")
  1328. if err != nil {
  1329. t.Fatal(err)
  1330. }
  1331. if err = act(clus, leaseID); err != nil {
  1332. t.Fatal(err)
  1333. }
  1334. // confirm no key
  1335. rreq := &pb.RangeRequest{Key: []byte("foo")}
  1336. rresp, err := clus.RandClient().KV.Range(context.TODO(), rreq)
  1337. if err != nil {
  1338. t.Fatal(err)
  1339. }
  1340. if len(rresp.Kvs) != 0 {
  1341. t.Fatalf("lease removed but key remains")
  1342. }
  1343. }
  1344. func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
  1345. cfg.UseV3 = true
  1346. cfg.UseGRPC = true
  1347. clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
  1348. clus.Launch(t)
  1349. return clus
  1350. }
  1351. // TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
  1352. func TestTLSGRPCRejectInsecureClient(t *testing.T) {
  1353. defer testutil.AfterTest(t)
  1354. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1355. clus := newClusterV3NoClients(t, &cfg)
  1356. defer clus.Terminate(t)
  1357. // nil out TLS field so client will use an insecure connection
  1358. clus.Members[0].ClientTLSInfo = nil
  1359. client, err := NewClientV3(clus.Members[0])
  1360. if err != nil && err != grpc.ErrClientConnTimeout {
  1361. t.Fatalf("unexpected error (%v)", err)
  1362. } else if client == nil {
  1363. // Ideally, no client would be returned. However, grpc will
  1364. // return a connection without trying to handshake first so
  1365. // the connection appears OK.
  1366. return
  1367. }
  1368. defer client.Close()
  1369. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  1370. conn := client.ActiveConnection()
  1371. st, err := conn.State()
  1372. if err != nil {
  1373. t.Fatal(err)
  1374. } else if st != grpc.Ready {
  1375. t.Fatalf("expected Ready, got %v", st)
  1376. }
  1377. // rpc will fail to handshake, triggering a connection state change
  1378. donec := make(chan error, 1)
  1379. go func() {
  1380. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1381. _, perr := client.KV.Put(ctx, reqput)
  1382. donec <- perr
  1383. }()
  1384. st, err = conn.WaitForStateChange(ctx, st)
  1385. if err != nil {
  1386. t.Fatalf("unexpected error waiting for change (%v)", err)
  1387. } else if st != grpc.Connecting && st != grpc.TransientFailure {
  1388. t.Fatalf("expected connecting or transient failure state, got %v", st)
  1389. }
  1390. cancel()
  1391. if perr := <-donec; perr == nil {
  1392. t.Fatalf("expected client error on put")
  1393. }
  1394. }
  1395. // TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
  1396. func TestTLSGRPCRejectSecureClient(t *testing.T) {
  1397. defer testutil.AfterTest(t)
  1398. cfg := ClusterConfig{Size: 3}
  1399. clus := newClusterV3NoClients(t, &cfg)
  1400. defer clus.Terminate(t)
  1401. clus.Members[0].ClientTLSInfo = &testTLSInfo
  1402. client, err := NewClientV3(clus.Members[0])
  1403. if client != nil || err == nil {
  1404. t.Fatalf("expected no client")
  1405. } else if err != grpc.ErrClientConnTimeout {
  1406. t.Fatalf("unexpected error (%v)", err)
  1407. }
  1408. }
  1409. // TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
  1410. func TestTLSGRPCAcceptSecureAll(t *testing.T) {
  1411. defer testutil.AfterTest(t)
  1412. cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
  1413. clus := newClusterV3NoClients(t, &cfg)
  1414. defer clus.Terminate(t)
  1415. client, err := NewClientV3(clus.Members[0])
  1416. if err != nil {
  1417. t.Fatalf("expected tls client (%v)", err)
  1418. }
  1419. defer client.Close()
  1420. reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
  1421. if _, err := client.KV.Put(context.TODO(), reqput); err != nil {
  1422. t.Fatalf("unexpected error on put over tls (%v)", err)
  1423. }
  1424. }