kvstore_test.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mvcc
  15. import (
  16. "bytes"
  17. "crypto/rand"
  18. "encoding/binary"
  19. "fmt"
  20. "math"
  21. mrand "math/rand"
  22. "os"
  23. "reflect"
  24. "sort"
  25. "strconv"
  26. "sync"
  27. "testing"
  28. "time"
  29. "go.etcd.io/etcd/lease"
  30. "go.etcd.io/etcd/mvcc/backend"
  31. "go.etcd.io/etcd/mvcc/mvccpb"
  32. "go.etcd.io/etcd/pkg/schedule"
  33. "go.etcd.io/etcd/pkg/testutil"
  34. "go.uber.org/zap"
  35. )
  36. func TestStoreRev(t *testing.T) {
  37. b, tmpPath := backend.NewDefaultTmpBackend()
  38. s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  39. defer s.Close()
  40. defer os.Remove(tmpPath)
  41. for i := 1; i <= 3; i++ {
  42. s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  43. if r := s.Rev(); r != int64(i+1) {
  44. t.Errorf("#%d: rev = %d, want %d", i, r, i+1)
  45. }
  46. }
  47. }
  48. func TestStorePut(t *testing.T) {
  49. kv := mvccpb.KeyValue{
  50. Key: []byte("foo"),
  51. Value: []byte("bar"),
  52. CreateRevision: 1,
  53. ModRevision: 2,
  54. Version: 1,
  55. }
  56. kvb, err := kv.Marshal()
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. tests := []struct {
  61. rev revision
  62. r indexGetResp
  63. rr *rangeResp
  64. wrev revision
  65. wkey []byte
  66. wkv mvccpb.KeyValue
  67. wputrev revision
  68. }{
  69. {
  70. revision{1, 0},
  71. indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
  72. nil,
  73. revision{2, 0},
  74. newTestKeyBytes(revision{2, 0}, false),
  75. mvccpb.KeyValue{
  76. Key: []byte("foo"),
  77. Value: []byte("bar"),
  78. CreateRevision: 2,
  79. ModRevision: 2,
  80. Version: 1,
  81. Lease: 1,
  82. },
  83. revision{2, 0},
  84. },
  85. {
  86. revision{1, 1},
  87. indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
  88. &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
  89. revision{2, 0},
  90. newTestKeyBytes(revision{2, 0}, false),
  91. mvccpb.KeyValue{
  92. Key: []byte("foo"),
  93. Value: []byte("bar"),
  94. CreateRevision: 2,
  95. ModRevision: 2,
  96. Version: 2,
  97. Lease: 2,
  98. },
  99. revision{2, 0},
  100. },
  101. {
  102. revision{2, 0},
  103. indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
  104. &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
  105. revision{3, 0},
  106. newTestKeyBytes(revision{3, 0}, false),
  107. mvccpb.KeyValue{
  108. Key: []byte("foo"),
  109. Value: []byte("bar"),
  110. CreateRevision: 2,
  111. ModRevision: 3,
  112. Version: 3,
  113. Lease: 3,
  114. },
  115. revision{3, 0},
  116. },
  117. }
  118. for i, tt := range tests {
  119. s := newFakeStore()
  120. b := s.b.(*fakeBackend)
  121. fi := s.kvindex.(*fakeIndex)
  122. s.currentRev = tt.rev.main
  123. fi.indexGetRespc <- tt.r
  124. if tt.rr != nil {
  125. b.tx.rangeRespc <- *tt.rr
  126. }
  127. s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
  128. data, err := tt.wkv.Marshal()
  129. if err != nil {
  130. t.Errorf("#%d: marshal err = %v, want nil", i, err)
  131. }
  132. wact := []testutil.Action{
  133. {Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}},
  134. }
  135. if tt.rr != nil {
  136. wact = []testutil.Action{
  137. {Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}},
  138. }
  139. }
  140. if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
  141. t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
  142. }
  143. wact = []testutil.Action{
  144. {Name: "get", Params: []interface{}{[]byte("foo"), tt.wputrev.main}},
  145. {Name: "put", Params: []interface{}{[]byte("foo"), tt.wputrev}},
  146. }
  147. if g := fi.Action(); !reflect.DeepEqual(g, wact) {
  148. t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
  149. }
  150. if s.currentRev != tt.wrev.main {
  151. t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
  152. }
  153. s.Close()
  154. }
  155. }
  156. func TestStoreRange(t *testing.T) {
  157. key := newTestKeyBytes(revision{2, 0}, false)
  158. kv := mvccpb.KeyValue{
  159. Key: []byte("foo"),
  160. Value: []byte("bar"),
  161. CreateRevision: 1,
  162. ModRevision: 2,
  163. Version: 1,
  164. }
  165. kvb, err := kv.Marshal()
  166. if err != nil {
  167. t.Fatal(err)
  168. }
  169. wrev := int64(2)
  170. tests := []struct {
  171. idxr indexRangeResp
  172. r rangeResp
  173. }{
  174. {
  175. indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
  176. rangeResp{[][]byte{key}, [][]byte{kvb}},
  177. },
  178. {
  179. indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
  180. rangeResp{[][]byte{key}, [][]byte{kvb}},
  181. },
  182. }
  183. ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
  184. for i, tt := range tests {
  185. s := newFakeStore()
  186. b := s.b.(*fakeBackend)
  187. fi := s.kvindex.(*fakeIndex)
  188. s.currentRev = 2
  189. b.tx.rangeRespc <- tt.r
  190. fi.indexRangeRespc <- tt.idxr
  191. ret, err := s.Range([]byte("foo"), []byte("goo"), ro)
  192. if err != nil {
  193. t.Errorf("#%d: err = %v, want nil", i, err)
  194. }
  195. if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) {
  196. t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w)
  197. }
  198. if ret.Rev != wrev {
  199. t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
  200. }
  201. wstart := newRevBytes()
  202. revToBytes(tt.idxr.revs[0], wstart)
  203. wact := []testutil.Action{
  204. {Name: "range", Params: []interface{}{keyBucketName, wstart, []byte(nil), int64(0)}},
  205. }
  206. if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
  207. t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
  208. }
  209. wact = []testutil.Action{
  210. {Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), wrev}},
  211. }
  212. if g := fi.Action(); !reflect.DeepEqual(g, wact) {
  213. t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
  214. }
  215. if s.currentRev != 2 {
  216. t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2)
  217. }
  218. s.Close()
  219. }
  220. }
  221. func TestStoreDeleteRange(t *testing.T) {
  222. key := newTestKeyBytes(revision{2, 0}, false)
  223. kv := mvccpb.KeyValue{
  224. Key: []byte("foo"),
  225. Value: []byte("bar"),
  226. CreateRevision: 1,
  227. ModRevision: 2,
  228. Version: 1,
  229. }
  230. kvb, err := kv.Marshal()
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. tests := []struct {
  235. rev revision
  236. r indexRangeResp
  237. rr rangeResp
  238. wkey []byte
  239. wrev revision
  240. wrrev int64
  241. wdelrev revision
  242. }{
  243. {
  244. revision{2, 0},
  245. indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
  246. rangeResp{[][]byte{key}, [][]byte{kvb}},
  247. newTestKeyBytes(revision{3, 0}, true),
  248. revision{3, 0},
  249. 2,
  250. revision{3, 0},
  251. },
  252. }
  253. for i, tt := range tests {
  254. s := newFakeStore()
  255. b := s.b.(*fakeBackend)
  256. fi := s.kvindex.(*fakeIndex)
  257. s.currentRev = tt.rev.main
  258. fi.indexRangeRespc <- tt.r
  259. b.tx.rangeRespc <- tt.rr
  260. n, _ := s.DeleteRange([]byte("foo"), []byte("goo"))
  261. if n != 1 {
  262. t.Errorf("#%d: n = %d, want 1", i, n)
  263. }
  264. data, err := (&mvccpb.KeyValue{
  265. Key: []byte("foo"),
  266. }).Marshal()
  267. if err != nil {
  268. t.Errorf("#%d: marshal err = %v, want nil", i, err)
  269. }
  270. wact := []testutil.Action{
  271. {Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}},
  272. }
  273. if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
  274. t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
  275. }
  276. wact = []testutil.Action{
  277. {Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), tt.wrrev}},
  278. {Name: "tombstone", Params: []interface{}{[]byte("foo"), tt.wdelrev}},
  279. }
  280. if g := fi.Action(); !reflect.DeepEqual(g, wact) {
  281. t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
  282. }
  283. if s.currentRev != tt.wrev.main {
  284. t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
  285. }
  286. }
  287. }
  288. func TestStoreCompact(t *testing.T) {
  289. s := newFakeStore()
  290. defer s.Close()
  291. b := s.b.(*fakeBackend)
  292. fi := s.kvindex.(*fakeIndex)
  293. s.currentRev = 3
  294. fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
  295. key1 := newTestKeyBytes(revision{1, 0}, false)
  296. key2 := newTestKeyBytes(revision{2, 0}, false)
  297. b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
  298. s.Compact(3)
  299. s.fifoSched.WaitFinish(1)
  300. if s.compactMainRev != 3 {
  301. t.Errorf("compact main rev = %d, want 3", s.compactMainRev)
  302. }
  303. end := make([]byte, 8)
  304. binary.BigEndian.PutUint64(end, uint64(4))
  305. wact := []testutil.Action{
  306. {Name: "put", Params: []interface{}{metaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
  307. {Name: "range", Params: []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}},
  308. {Name: "delete", Params: []interface{}{keyBucketName, key2}},
  309. {Name: "put", Params: []interface{}{metaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
  310. }
  311. if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
  312. t.Errorf("tx actions = %+v, want %+v", g, wact)
  313. }
  314. wact = []testutil.Action{
  315. {Name: "compact", Params: []interface{}{int64(3)}},
  316. }
  317. if g := fi.Action(); !reflect.DeepEqual(g, wact) {
  318. t.Errorf("index action = %+v, want %+v", g, wact)
  319. }
  320. }
  321. func TestStoreRestore(t *testing.T) {
  322. s := newFakeStore()
  323. b := s.b.(*fakeBackend)
  324. fi := s.kvindex.(*fakeIndex)
  325. putkey := newTestKeyBytes(revision{3, 0}, false)
  326. putkv := mvccpb.KeyValue{
  327. Key: []byte("foo"),
  328. Value: []byte("bar"),
  329. CreateRevision: 4,
  330. ModRevision: 4,
  331. Version: 1,
  332. }
  333. putkvb, err := putkv.Marshal()
  334. if err != nil {
  335. t.Fatal(err)
  336. }
  337. delkey := newTestKeyBytes(revision{5, 0}, true)
  338. delkv := mvccpb.KeyValue{
  339. Key: []byte("foo"),
  340. }
  341. delkvb, err := delkv.Marshal()
  342. if err != nil {
  343. t.Fatal(err)
  344. }
  345. b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
  346. b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
  347. b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
  348. b.tx.rangeRespc <- rangeResp{nil, nil}
  349. s.restore()
  350. if s.compactMainRev != 3 {
  351. t.Errorf("compact rev = %d, want 5", s.compactMainRev)
  352. }
  353. if s.currentRev != 5 {
  354. t.Errorf("current rev = %v, want 5", s.currentRev)
  355. }
  356. wact := []testutil.Action{
  357. {Name: "range", Params: []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
  358. {Name: "range", Params: []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
  359. {Name: "range", Params: []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
  360. }
  361. if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
  362. t.Errorf("tx actions = %+v, want %+v", g, wact)
  363. }
  364. gens := []generation{
  365. {created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}},
  366. {created: revision{0, 0}, ver: 0, revs: nil},
  367. }
  368. ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
  369. wact = []testutil.Action{
  370. {Name: "keyIndex", Params: []interface{}{ki}},
  371. {Name: "insert", Params: []interface{}{ki}},
  372. }
  373. if g := fi.Action(); !reflect.DeepEqual(g, wact) {
  374. t.Errorf("index action = %+v, want %+v", g, wact)
  375. }
  376. }
  377. func TestRestoreDelete(t *testing.T) {
  378. oldChunk := restoreChunkKeys
  379. restoreChunkKeys = mrand.Intn(3) + 2
  380. defer func() { restoreChunkKeys = oldChunk }()
  381. b, tmpPath := backend.NewDefaultTmpBackend()
  382. s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  383. defer os.Remove(tmpPath)
  384. keys := make(map[string]struct{})
  385. for i := 0; i < 20; i++ {
  386. ks := fmt.Sprintf("foo-%d", i)
  387. k := []byte(ks)
  388. s.Put(k, []byte("bar"), lease.NoLease)
  389. keys[ks] = struct{}{}
  390. switch mrand.Intn(3) {
  391. case 0:
  392. // put random key from past via random range on map
  393. ks = fmt.Sprintf("foo-%d", mrand.Intn(i+1))
  394. s.Put([]byte(ks), []byte("baz"), lease.NoLease)
  395. keys[ks] = struct{}{}
  396. case 1:
  397. // delete random key via random range on map
  398. for k := range keys {
  399. s.DeleteRange([]byte(k), nil)
  400. delete(keys, k)
  401. break
  402. }
  403. }
  404. }
  405. s.Close()
  406. s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  407. defer s.Close()
  408. for i := 0; i < 20; i++ {
  409. ks := fmt.Sprintf("foo-%d", i)
  410. r, err := s.Range([]byte(ks), nil, RangeOptions{})
  411. if err != nil {
  412. t.Fatal(err)
  413. }
  414. if _, ok := keys[ks]; ok {
  415. if len(r.KVs) == 0 {
  416. t.Errorf("#%d: expected %q, got deleted", i, ks)
  417. }
  418. } else if len(r.KVs) != 0 {
  419. t.Errorf("#%d: expected deleted, got %q", i, ks)
  420. }
  421. }
  422. }
  423. func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
  424. tests := []string{"recreate", "restore"}
  425. for _, test := range tests {
  426. b, tmpPath := backend.NewDefaultTmpBackend()
  427. s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  428. defer os.Remove(tmpPath)
  429. s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  430. s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
  431. s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
  432. // write scheduled compaction, but not do compaction
  433. rbytes := newRevBytes()
  434. revToBytes(revision{main: 2}, rbytes)
  435. tx := s0.b.BatchTx()
  436. tx.Lock()
  437. tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
  438. tx.Unlock()
  439. s0.Close()
  440. var s *store
  441. switch test {
  442. case "recreate":
  443. s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  444. case "restore":
  445. s0.Restore(b)
  446. s = s0
  447. }
  448. // wait for scheduled compaction to be finished
  449. time.Sleep(100 * time.Millisecond)
  450. if _, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
  451. t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
  452. }
  453. // check the key in backend is deleted
  454. revbytes := newRevBytes()
  455. revToBytes(revision{main: 1}, revbytes)
  456. // The disk compaction is done asynchronously and requires more time on slow disk.
  457. // try 5 times for CI with slow IO.
  458. for i := 0; i < 5; i++ {
  459. tx = s.b.BatchTx()
  460. tx.Lock()
  461. ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
  462. tx.Unlock()
  463. if len(ks) != 0 {
  464. time.Sleep(100 * time.Millisecond)
  465. continue
  466. }
  467. return
  468. }
  469. t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
  470. }
  471. }
  // hashKVResult pairs a hash with the compact revision that was reported
  // together with it; TestHashKVWhenCompacting passes these over a channel.
  type hashKVResult struct {
  	// hash is the hash value returned by HashByRev.
  	hash uint32
  	// compactRev is the compact revision returned alongside hash.
  	compactRev int64
  }
  476. // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
  477. func TestHashKVWhenCompacting(t *testing.T) {
  478. b, tmpPath := backend.NewDefaultTmpBackend()
  479. s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  480. defer os.Remove(tmpPath)
  481. rev := 10000
  482. for i := 2; i <= rev; i++ {
  483. s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
  484. }
  485. hashCompactc := make(chan hashKVResult, 1)
  486. donec := make(chan struct{})
  487. var wg sync.WaitGroup
  488. for i := 0; i < 10; i++ {
  489. wg.Add(1)
  490. go func() {
  491. defer wg.Done()
  492. for {
  493. hash, _, compactRev, err := s.HashByRev(int64(rev))
  494. if err != nil {
  495. t.Error(err)
  496. }
  497. select {
  498. case <-donec:
  499. return
  500. case hashCompactc <- hashKVResult{hash, compactRev}:
  501. }
  502. }
  503. }()
  504. }
  505. go func() {
  506. defer close(donec)
  507. revHash := make(map[int64]uint32)
  508. for round := 0; round < 1000; round++ {
  509. r := <-hashCompactc
  510. if revHash[r.compactRev] == 0 {
  511. revHash[r.compactRev] = r.hash
  512. }
  513. if r.hash != revHash[r.compactRev] {
  514. t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
  515. }
  516. }
  517. }()
  518. wg.Add(1)
  519. go func() {
  520. defer wg.Done()
  521. for i := 100; i >= 0; i-- {
  522. _, err := s.Compact(int64(rev - 1 - i))
  523. if err != nil {
  524. t.Error(err)
  525. }
  526. time.Sleep(10 * time.Millisecond)
  527. }
  528. }()
  529. select {
  530. case <-donec:
  531. wg.Wait()
  532. case <-time.After(10 * time.Second):
  533. testutil.FatalStack(t, "timeout")
  534. }
  535. }
  536. // TestHashKVZeroRevision ensures that "HashByRev(0)" computes
  537. // correct hash value with latest revision.
  538. func TestHashKVZeroRevision(t *testing.T) {
  539. b, tmpPath := backend.NewDefaultTmpBackend()
  540. s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  541. defer os.Remove(tmpPath)
  542. rev := 1000
  543. for i := 2; i <= rev; i++ {
  544. s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
  545. }
  546. if _, err := s.Compact(int64(rev / 2)); err != nil {
  547. t.Fatal(err)
  548. }
  549. hash1, _, _, err := s.HashByRev(int64(rev))
  550. if err != nil {
  551. t.Fatal(err)
  552. }
  553. var hash2 uint32
  554. hash2, _, _, err = s.HashByRev(0)
  555. if err != nil {
  556. t.Fatal(err)
  557. }
  558. if hash1 != hash2 {
  559. t.Errorf("hash %d (rev %d) != hash %d (rev 0)", hash1, rev, hash2)
  560. }
  561. }
  562. func TestTxnPut(t *testing.T) {
  563. // assign arbitrary size
  564. bytesN := 30
  565. sliceN := 100
  566. keys := createBytesSlice(bytesN, sliceN)
  567. vals := createBytesSlice(bytesN, sliceN)
  568. b, tmpPath := backend.NewDefaultTmpBackend()
  569. s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  570. defer cleanup(s, b, tmpPath)
  571. for i := 0; i < sliceN; i++ {
  572. txn := s.Write()
  573. base := int64(i + 2)
  574. if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
  575. t.Errorf("#%d: rev = %d, want %d", i, rev, base)
  576. }
  577. txn.End()
  578. }
  579. }
  580. // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
  581. func TestConcurrentReadNotBlockingWrite(t *testing.T) {
  582. b, tmpPath := backend.NewDefaultTmpBackend()
  583. s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  584. defer os.Remove(tmpPath)
  585. // write something to read later
  586. s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  587. // readTx simulates a long read request
  588. readTx1 := s.Read()
  589. // write should not be blocked by reads
  590. done := make(chan struct{})
  591. go func() {
  592. s.Put([]byte("foo"), []byte("newBar"), lease.NoLease) // this is a write Txn
  593. done <- struct{}{}
  594. }()
  595. select {
  596. case <-done:
  597. case <-time.After(1 * time.Second):
  598. t.Fatalf("write should not be blocked by read")
  599. }
  600. // readTx2 simulates a short read request
  601. readTx2 := s.Read()
  602. ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
  603. ret, err := readTx2.Range([]byte("foo"), nil, ro)
  604. if err != nil {
  605. t.Fatalf("failed to range: %v", err)
  606. }
  607. // readTx2 should see the result of new write
  608. w := mvccpb.KeyValue{
  609. Key: []byte("foo"),
  610. Value: []byte("newBar"),
  611. CreateRevision: 2,
  612. ModRevision: 3,
  613. Version: 2,
  614. }
  615. if !reflect.DeepEqual(ret.KVs[0], w) {
  616. t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
  617. }
  618. readTx2.End()
  619. ret, err = readTx1.Range([]byte("foo"), nil, ro)
  620. if err != nil {
  621. t.Fatalf("failed to range: %v", err)
  622. }
  623. // readTx1 should not see the result of new write
  624. w = mvccpb.KeyValue{
  625. Key: []byte("foo"),
  626. Value: []byte("bar"),
  627. CreateRevision: 2,
  628. ModRevision: 2,
  629. Version: 1,
  630. }
  631. if !reflect.DeepEqual(ret.KVs[0], w) {
  632. t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
  633. }
  634. readTx1.End()
  635. }
  636. // TestConcurrentReadTxAndWrite creates random concurrent Reads and Writes, and ensures Reads always see latest Writes
  637. func TestConcurrentReadTxAndWrite(t *testing.T) {
  638. var (
  639. numOfReads = 100
  640. numOfWrites = 100
  641. maxNumOfPutsPerWrite = 10
  642. committedKVs kvs // committedKVs records the key-value pairs written by the finished Write Txns
  643. mu sync.Mutex // mu protectes committedKVs
  644. )
  645. b, tmpPath := backend.NewDefaultTmpBackend()
  646. s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
  647. defer os.Remove(tmpPath)
  648. var wg sync.WaitGroup
  649. wg.Add(numOfWrites)
  650. for i := 0; i < numOfWrites; i++ {
  651. go func() {
  652. defer wg.Done()
  653. time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
  654. tx := s.Write()
  655. numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
  656. var pendingKvs kvs
  657. for j := 0; j < numOfPuts; j++ {
  658. k := []byte(strconv.Itoa(mrand.Int()))
  659. v := []byte(strconv.Itoa(mrand.Int()))
  660. tx.Put(k, v, lease.NoLease)
  661. pendingKvs = append(pendingKvs, kv{k, v})
  662. }
  663. // reads should not see above Puts until write is finished
  664. mu.Lock()
  665. committedKVs = merge(committedKVs, pendingKvs) // update shared data structure
  666. tx.End()
  667. mu.Unlock()
  668. }()
  669. }
  670. wg.Add(numOfReads)
  671. for i := 0; i < numOfReads; i++ {
  672. go func() {
  673. defer wg.Done()
  674. time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
  675. mu.Lock()
  676. wKVs := make(kvs, len(committedKVs))
  677. copy(wKVs, committedKVs)
  678. tx := s.Read()
  679. mu.Unlock()
  680. // get all keys in backend store, and compare with wKVs
  681. ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
  682. tx.End()
  683. if err != nil {
  684. t.Errorf("failed to range keys: %v", err)
  685. return
  686. }
  687. if len(wKVs) == 0 && len(ret.KVs) == 0 { // no committed KVs yet
  688. return
  689. }
  690. var result kvs
  691. for _, keyValue := range ret.KVs {
  692. result = append(result, kv{keyValue.Key, keyValue.Value})
  693. }
  694. if !reflect.DeepEqual(wKVs, result) {
  695. t.Errorf("unexpected range result") // too many key value pairs, skip printing them
  696. }
  697. }()
  698. }
  699. // wait until go routines finish or timeout
  700. doneC := make(chan struct{})
  701. go func() {
  702. wg.Wait()
  703. close(doneC)
  704. }()
  705. select {
  706. case <-doneC:
  707. case <-time.After(5 * time.Minute):
  708. testutil.FatalStack(t, "timeout")
  709. }
  710. }
  // kv is a plain key/value pair used by the tests to track expected contents.
  type kv struct {
  	key []byte
  	val []byte
  }

  // kvs is a list of kv pairs. It implements sort.Interface, ordering entries
  // by key bytes.
  type kvs []kv

  // Len returns the number of pairs.
  func (p kvs) Len() int { return len(p) }

  // Less reports whether the key at i sorts before the key at j.
  func (p kvs) Less(i, j int) bool { return bytes.Compare(p[i].key, p[j].key) < 0 }

  // Swap exchanges the pairs at i and j.
  func (p kvs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

  // merge appends src to dst, sorts the result by key and collapses entries
  // with equal keys so that only the last one in append order survives. It
  // returns the merged slice, which may share dst's backing array.
  //
  // Merging two empty lists returns an empty list; previously that case
  // panicked because the final reslice assumed at least one element.
  func merge(dst, src kvs) kvs {
  	dst = append(dst, src...)
  	if len(dst) == 0 {
  		return dst
  	}
  	// A stable sort keeps equal keys in append order, so the newest value
  	// for a key is the last one in its run.
  	sort.Stable(dst)

  	// remove duplicates, using only the newest value
  	// ref: tx_buffer.go
  	widx := 0
  	for ridx := 1; ridx < len(dst); ridx++ {
  		if !bytes.Equal(dst[widx].key, dst[ridx].key) {
  			widx++
  		}
  		dst[widx] = dst[ridx]
  	}
  	return dst[:widx+1]
  }
  733. // TODO: test attach key to lessor
  734. func newTestRevBytes(rev revision) []byte {
  735. bytes := newRevBytes()
  736. revToBytes(rev, bytes)
  737. return bytes
  738. }
  739. func newTestKeyBytes(rev revision, tombstone bool) []byte {
  740. bytes := newRevBytes()
  741. revToBytes(rev, bytes)
  742. if tombstone {
  743. bytes = appendMarkTombstone(zap.NewExample(), bytes)
  744. }
  745. return bytes
  746. }
  747. func newFakeStore() *store {
  748. b := &fakeBackend{&fakeBatchTx{
  749. Recorder: &testutil.RecorderBuffered{},
  750. rangeRespc: make(chan rangeResp, 5)}}
  751. fi := &fakeIndex{
  752. Recorder: &testutil.RecorderBuffered{},
  753. indexGetRespc: make(chan indexGetResp, 1),
  754. indexRangeRespc: make(chan indexRangeResp, 1),
  755. indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
  756. indexCompactRespc: make(chan map[revision]struct{}, 1),
  757. }
  758. s := &store{
  759. b: b,
  760. le: &lease.FakeLessor{},
  761. kvindex: fi,
  762. currentRev: 0,
  763. compactMainRev: -1,
  764. fifoSched: schedule.NewFIFOScheduler(),
  765. stopc: make(chan struct{}),
  766. lg: zap.NewExample(),
  767. }
  768. s.ReadView, s.WriteView = &readView{s}, &writeView{s}
  769. return s
  770. }
  // rangeResp is a canned result for fakeBatchTx.UnsafeRange: the keys and
  // values it hands back to the caller.
  type rangeResp struct {
  	// keys is returned as the first result of UnsafeRange.
  	keys [][]byte
  	// vals is returned as the second result of UnsafeRange.
  	vals [][]byte
  }
  775. type fakeBatchTx struct {
  776. testutil.Recorder
  777. rangeRespc chan rangeResp
  778. }
  779. func (b *fakeBatchTx) Lock() {}
  780. func (b *fakeBatchTx) Unlock() {}
  781. func (b *fakeBatchTx) RLock() {}
  782. func (b *fakeBatchTx) RUnlock() {}
  783. func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
  784. func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
  785. b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
  786. }
  787. func (b *fakeBatchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
  788. b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucketName, key, value}})
  789. }
  790. func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
  791. b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}})
  792. r := <-b.rangeRespc
  793. return r.keys, r.vals
  794. }
  795. func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) {
  796. b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}})
  797. }
  798. func (b *fakeBatchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
  799. return nil
  800. }
  801. func (b *fakeBatchTx) Commit() {}
  802. func (b *fakeBatchTx) CommitAndStop() {}
  803. type fakeBackend struct {
  804. tx *fakeBatchTx
  805. }
  806. func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
  807. func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
  808. func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx }
  809. func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
  810. func (b *fakeBackend) Size() int64 { return 0 }
  811. func (b *fakeBackend) SizeInUse() int64 { return 0 }
  812. func (b *fakeBackend) OpenReadTxN() int64 { return 0 }
  813. func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
  814. func (b *fakeBackend) ForceCommit() {}
  815. func (b *fakeBackend) Defrag() error { return nil }
  816. func (b *fakeBackend) Close() error { return nil }
  817. type indexGetResp struct {
  818. rev revision
  819. created revision
  820. ver int64
  821. err error
  822. }
  823. type indexRangeResp struct {
  824. keys [][]byte
  825. revs []revision
  826. }
  827. type indexRangeEventsResp struct {
  828. revs []revision
  829. }
  830. type fakeIndex struct {
  831. testutil.Recorder
  832. indexGetRespc chan indexGetResp
  833. indexRangeRespc chan indexRangeResp
  834. indexRangeEventsRespc chan indexRangeEventsResp
  835. indexCompactRespc chan map[revision]struct{}
  836. }
  837. func (i *fakeIndex) Revisions(key, end []byte, atRev int64) []revision {
  838. _, rev := i.Range(key, end, atRev)
  839. return rev
  840. }
  841. func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
  842. i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}})
  843. r := <-i.indexGetRespc
  844. return r.rev, r.created, r.ver, r.err
  845. }
  846. func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) {
  847. i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}})
  848. r := <-i.indexRangeRespc
  849. return r.keys, r.revs
  850. }
  851. func (i *fakeIndex) Put(key []byte, rev revision) {
  852. i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}})
  853. }
  854. func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
  855. i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
  856. return nil
  857. }
  858. func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision {
  859. i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}})
  860. r := <-i.indexRangeEventsRespc
  861. return r.revs
  862. }
  863. func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
  864. i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
  865. return <-i.indexCompactRespc
  866. }
  867. func (i *fakeIndex) Keep(rev int64) map[revision]struct{} {
  868. i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}})
  869. return <-i.indexCompactRespc
  870. }
  871. func (i *fakeIndex) Equal(b index) bool { return false }
  872. func (i *fakeIndex) Insert(ki *keyIndex) {
  873. i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
  874. }
  875. func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex {
  876. i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}})
  877. return nil
  878. }
  // createBytesSlice returns sliceN byte slices, each bytesN bytes long and
  // filled from crypto/rand. It panics if the random source fails.
  //
  // A sliceN of zero or less yields an empty, non-nil result; previously a
  // negative sliceN made the loop condition unreachable and the function never
  // returned.
  func createBytesSlice(bytesN, sliceN int) [][]byte {
  	if sliceN <= 0 {
  		return [][]byte{}
  	}
  	// The final length is known, so allocate the outer slice once.
  	rs := make([][]byte, 0, sliceN)
  	for i := 0; i < sliceN; i++ {
  		v := make([]byte, bytesN)
  		if _, err := rand.Read(v); err != nil {
  			panic(err)
  		}
  		rs = append(rs, v)
  	}
  	return rs
  }