kvstore_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. package storage
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "math"
  6. "os"
  7. "reflect"
  8. "testing"
  9. "time"
  10. "github.com/coreos/etcd/storage/storagepb"
  11. )
  12. func TestRange(t *testing.T) {
  13. s := newStore("test")
  14. defer os.Remove("test")
  15. s.Put([]byte("foo"), []byte("bar"))
  16. s.Put([]byte("foo1"), []byte("bar1"))
  17. s.Put([]byte("foo2"), []byte("bar2"))
  18. kvs := []storagepb.KeyValue{
  19. {Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1},
  20. {Key: []byte("foo1"), Value: []byte("bar1"), CreateIndex: 2, ModIndex: 2, Version: 1},
  21. {Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1},
  22. }
  23. tests := []struct {
  24. key, end []byte
  25. rev int64
  26. wrev int64
  27. wkvs []storagepb.KeyValue
  28. }{
  29. {
  30. []byte("foo"), []byte("foo3"), 0,
  31. 3, kvs,
  32. },
  33. {
  34. []byte("foo"), []byte("foo1"), 0,
  35. 3, kvs[:1],
  36. },
  37. {
  38. []byte("foo"), []byte("foo3"), 1,
  39. 1, kvs[:1],
  40. },
  41. {
  42. []byte("foo"), []byte("foo3"), 2,
  43. 2, kvs[:2],
  44. },
  45. }
  46. for i, tt := range tests {
  47. kvs, rev, err := s.Range(tt.key, tt.end, 0, tt.rev)
  48. if err != nil {
  49. t.Fatal(err)
  50. }
  51. if rev != tt.wrev {
  52. t.Errorf("#%d: rev = %d, want %d", i, tt.rev, tt.wrev)
  53. }
  54. if !reflect.DeepEqual(kvs, tt.wkvs) {
  55. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
  56. }
  57. }
  58. }
  59. func TestRangeBadRev(t *testing.T) {
  60. s := newStore("test")
  61. defer os.Remove("test")
  62. s.Put([]byte("foo"), []byte("bar"))
  63. s.Put([]byte("foo1"), []byte("bar1"))
  64. s.Put([]byte("foo2"), []byte("bar2"))
  65. if err := s.Compact(3); err != nil {
  66. t.Fatalf("compact error (%v)", err)
  67. }
  68. tests := []struct {
  69. rev int64
  70. werr error
  71. }{
  72. {2, ErrCompacted},
  73. {3, ErrCompacted},
  74. {4, ErrFutureRev},
  75. }
  76. for i, tt := range tests {
  77. _, _, err := s.Range([]byte("foo"), []byte("foo3"), 0, tt.rev)
  78. if err != tt.werr {
  79. t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
  80. }
  81. }
  82. }
  83. func TestRangeLimit(t *testing.T) {
  84. s := newStore("test")
  85. defer os.Remove("test")
  86. s.Put([]byte("foo"), []byte("bar"))
  87. s.Put([]byte("foo1"), []byte("bar1"))
  88. s.Put([]byte("foo2"), []byte("bar2"))
  89. s.DeleteRange([]byte("foo1"), nil)
  90. kvs := []storagepb.KeyValue{
  91. {Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1},
  92. {Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1},
  93. }
  94. tests := []struct {
  95. limit int64
  96. wkvs []storagepb.KeyValue
  97. }{
  98. // no limit
  99. {0, kvs},
  100. {1, kvs[:1]},
  101. {2, kvs},
  102. {3, kvs},
  103. }
  104. for i, tt := range tests {
  105. kvs, _, err := s.Range([]byte("foo"), []byte("foo3"), tt.limit, 0)
  106. if err != nil {
  107. t.Fatalf("#%d: range error (%v)", i, err)
  108. }
  109. if !reflect.DeepEqual(kvs, tt.wkvs) {
  110. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
  111. }
  112. }
  113. }
  114. func TestSimpleDeleteRange(t *testing.T) {
  115. tests := []struct {
  116. key, end []byte
  117. wrev int64
  118. wN int64
  119. }{
  120. {
  121. []byte("foo"), []byte("foo1"),
  122. 4, 1,
  123. },
  124. {
  125. []byte("foo"), []byte("foo2"),
  126. 4, 2,
  127. },
  128. {
  129. []byte("foo"), []byte("foo3"),
  130. 4, 3,
  131. },
  132. {
  133. []byte("foo3"), []byte("foo8"),
  134. 3, 0,
  135. },
  136. }
  137. for i, tt := range tests {
  138. s := newStore("test")
  139. s.Put([]byte("foo"), []byte("bar"))
  140. s.Put([]byte("foo1"), []byte("bar1"))
  141. s.Put([]byte("foo2"), []byte("bar2"))
  142. n, rev := s.DeleteRange(tt.key, tt.end)
  143. if n != tt.wN {
  144. t.Errorf("#%d: n = %d, want %d", i, n, tt.wN)
  145. }
  146. if rev != tt.wrev {
  147. t.Errorf("#%d: rev = %d, wang %d", i, rev, tt.wrev)
  148. }
  149. os.Remove("test")
  150. }
  151. }
  152. func TestRangeInSequence(t *testing.T) {
  153. s := newStore("test")
  154. defer os.Remove("test")
  155. s.Put([]byte("foo"), []byte("bar"))
  156. s.Put([]byte("foo1"), []byte("bar1"))
  157. s.Put([]byte("foo2"), []byte("bar2"))
  158. // remove foo
  159. n, rev := s.DeleteRange([]byte("foo"), nil)
  160. if n != 1 || rev != 4 {
  161. t.Fatalf("n = %d, index = %d, want (%d, %d)", n, rev, 1, 4)
  162. }
  163. // before removal foo
  164. kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 3)
  165. if err != nil {
  166. t.Fatal(err)
  167. }
  168. if len(kvs) != 3 {
  169. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3)
  170. }
  171. // after removal foo
  172. kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 4)
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. if len(kvs) != 2 {
  177. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
  178. }
  179. // remove again -> expect nothing
  180. n, rev = s.DeleteRange([]byte("foo"), nil)
  181. if n != 0 || rev != 4 {
  182. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 0, 4)
  183. }
  184. // remove foo1
  185. n, rev = s.DeleteRange([]byte("foo"), []byte("foo2"))
  186. if n != 1 || rev != 5 {
  187. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 5)
  188. }
  189. // after removal foo1
  190. kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 5)
  191. if err != nil {
  192. t.Fatal(err)
  193. }
  194. if len(kvs) != 1 {
  195. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
  196. }
  197. // remove foo2
  198. n, rev = s.DeleteRange([]byte("foo2"), []byte("foo3"))
  199. if n != 1 || rev != 6 {
  200. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 6)
  201. }
  202. // after removal foo2
  203. kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 6)
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. if len(kvs) != 0 {
  208. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
  209. }
  210. }
  211. func TestOneTnx(t *testing.T) {
  212. s := newStore("test")
  213. defer os.Remove("test")
  214. id := s.TnxBegin()
  215. for i := 0; i < 3; i++ {
  216. s.TnxPut(id, []byte("foo"), []byte("bar"))
  217. s.TnxPut(id, []byte("foo1"), []byte("bar1"))
  218. s.TnxPut(id, []byte("foo2"), []byte("bar2"))
  219. // remove foo
  220. n, rev, err := s.TnxDeleteRange(id, []byte("foo"), nil)
  221. if err != nil {
  222. t.Fatal(err)
  223. }
  224. if n != 1 || rev != 1 {
  225. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1)
  226. }
  227. kvs, rev, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
  228. if err != nil {
  229. t.Fatal(err)
  230. }
  231. if len(kvs) != 2 {
  232. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
  233. }
  234. // remove again -> expect nothing
  235. n, rev, err = s.TnxDeleteRange(id, []byte("foo"), nil)
  236. if err != nil {
  237. t.Fatal(err)
  238. }
  239. if n != 0 || rev != 1 {
  240. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 0, 1)
  241. }
  242. // remove foo1
  243. n, rev, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2"))
  244. if err != nil {
  245. t.Fatal(err)
  246. }
  247. if n != 1 || rev != 1 {
  248. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1)
  249. }
  250. // after removal foo1
  251. kvs, rev, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
  252. if err != nil {
  253. t.Fatal(err)
  254. }
  255. if len(kvs) != 1 {
  256. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
  257. }
  258. // remove foo2
  259. n, rev, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3"))
  260. if err != nil {
  261. t.Fatal(err)
  262. }
  263. if n != 1 || rev != 1 {
  264. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1)
  265. }
  266. // after removal foo2
  267. kvs, rev, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. if len(kvs) != 0 {
  272. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
  273. }
  274. }
  275. err := s.TnxEnd(id)
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. // After tnx
  280. kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 1)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. if len(kvs) != 0 {
  285. t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
  286. }
  287. if rev != 1 {
  288. t.Fatalf("rev = %d, want %d", rev, 1)
  289. }
  290. }
  291. func TestCompaction(t *testing.T) {
  292. s := newStore("test")
  293. defer os.Remove("test")
  294. s.Put([]byte("foo"), []byte("bar"))
  295. s.Put([]byte("foo1"), []byte("bar1"))
  296. s.Put([]byte("foo2"), []byte("bar2"))
  297. s.Put([]byte("foo"), []byte("bar11"))
  298. s.Put([]byte("foo1"), []byte("bar12"))
  299. s.Put([]byte("foo2"), []byte("bar13"))
  300. s.Put([]byte("foo1"), []byte("bar14"))
  301. s.DeleteRange([]byte("foo"), []byte("foo200"))
  302. s.Put([]byte("foo4"), []byte("bar4"))
  303. err := s.Compact(4)
  304. if err != nil {
  305. t.Errorf("unexpect compact error %v", err)
  306. }
  307. err = s.Compact(4)
  308. if err != ErrCompacted {
  309. t.Errorf("err = %v, want %v", err, ErrCompacted)
  310. }
  311. _, _, err = s.Range([]byte("foo"), nil, 0, 4)
  312. if err != ErrCompacted {
  313. t.Errorf("err = %v, want %v", err, ErrCompacted)
  314. }
  315. // compact should not compact the last value of foo
  316. kvs, rev, err := s.Range([]byte("foo"), nil, 0, 5)
  317. if err != nil {
  318. t.Errorf("unexpected range error %v", err)
  319. }
  320. if !bytes.Equal(kvs[0].Value, []byte("bar11")) {
  321. t.Errorf("value = %s, want %s", string(kvs[0].Value), "bar11")
  322. }
  323. if rev != 5 {
  324. t.Errorf("rev = %d, want %d", rev, 5)
  325. }
  326. // compact everything
  327. err = s.Compact(8)
  328. if err != nil {
  329. t.Errorf("unexpect compact error %v", err)
  330. }
  331. kvs, rev, err = s.Range([]byte("foo"), []byte("fop"), 0, 0)
  332. if err != nil {
  333. t.Errorf("unexpected range error %v", err)
  334. }
  335. if len(kvs) != 1 {
  336. t.Errorf("len(kvs) = %d, want %d", len(kvs), 1)
  337. }
  338. if !bytes.Equal(kvs[0].Value, []byte("bar4")) {
  339. t.Errorf("value = %s, want %s", string(kvs[0].Value), "bar4")
  340. }
  341. if rev != 9 {
  342. t.Errorf("rev = %d, want %d", rev, 9)
  343. }
  344. }
  345. func TestRestore(t *testing.T) {
  346. s0 := newStore("test")
  347. defer os.Remove("test")
  348. s0.Put([]byte("foo"), []byte("bar"))
  349. s0.Put([]byte("foo1"), []byte("bar1"))
  350. s0.Put([]byte("foo2"), []byte("bar2"))
  351. s0.Put([]byte("foo"), []byte("bar11"))
  352. s0.Put([]byte("foo1"), []byte("bar12"))
  353. s0.Put([]byte("foo2"), []byte("bar13"))
  354. s0.Put([]byte("foo1"), []byte("bar14"))
  355. s0.Put([]byte("foo3"), []byte("bar3"))
  356. s0.DeleteRange([]byte("foo3"), nil)
  357. s0.Put([]byte("foo3"), []byte("bar31"))
  358. s0.DeleteRange([]byte("foo3"), nil)
  359. mink := newRevBytes()
  360. revToBytes(reversion{main: 0, sub: 0}, mink)
  361. maxk := newRevBytes()
  362. revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, maxk)
  363. s0kvs, _, err := s0.rangeKeys(mink, maxk, 0, 0)
  364. if err != nil {
  365. t.Fatalf("rangeKeys on s0 error (%v)", err)
  366. }
  367. s0.Close()
  368. s1 := newStore("test")
  369. s1.Restore()
  370. if !s0.Equal(s1) {
  371. t.Errorf("not equal!")
  372. }
  373. s1kvs, _, err := s1.rangeKeys(mink, maxk, 0, 0)
  374. if err != nil {
  375. t.Fatalf("rangeKeys on s1 error (%v)", err)
  376. }
  377. if !reflect.DeepEqual(s1kvs, s0kvs) {
  378. t.Errorf("s1kvs = %+v, want %+v", s1kvs, s0kvs)
  379. }
  380. }
  381. func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
  382. s0 := newStore("test")
  383. defer os.Remove("test")
  384. s0.Put([]byte("foo"), []byte("bar"))
  385. s0.Put([]byte("foo"), []byte("bar1"))
  386. s0.Put([]byte("foo"), []byte("bar2"))
  387. // write scheduled compaction, but not do compaction
  388. rbytes := newRevBytes()
  389. revToBytes(reversion{main: 2}, rbytes)
  390. tx := s0.b.BatchTx()
  391. tx.Lock()
  392. tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
  393. tx.Unlock()
  394. s0.Close()
  395. s1 := newStore("test")
  396. s1.Restore()
  397. // wait for scheduled compaction to be finished
  398. time.Sleep(100 * time.Millisecond)
  399. if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted {
  400. t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
  401. }
  402. // check the key in backend is deleted
  403. revbytes := newRevBytes()
  404. // TODO: compact should delete main=2 key too
  405. revToBytes(reversion{main: 1}, revbytes)
  406. tx = s1.b.BatchTx()
  407. tx.Lock()
  408. ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
  409. if len(ks) != 0 {
  410. t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
  411. }
  412. tx.Unlock()
  413. }
  414. func BenchmarkStorePut(b *testing.B) {
  415. s := newStore("test")
  416. defer os.Remove("test")
  417. // prepare keys
  418. keys := make([][]byte, b.N)
  419. for i := 0; i < b.N; i++ {
  420. keys[i] = make([]byte, 64)
  421. rand.Read(keys[i])
  422. }
  423. b.ResetTimer()
  424. for i := 0; i < b.N; i++ {
  425. s.Put(keys[i], []byte("foo"))
  426. }
  427. }