kv_test.go 22 KB


  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "fmt"
  17. "os"
  18. "reflect"
  19. "testing"
  20. "time"
  21. "github.com/coreos/etcd/lease"
  22. "github.com/coreos/etcd/pkg/testutil"
  23. "github.com/coreos/etcd/storage/backend"
  24. "github.com/coreos/etcd/storage/storagepb"
  25. )
  26. // Functional tests for features implemented in v3 store. It treats v3 store
  27. // as a black box, and tests it by feeding the input and validating the output.
  28. // TODO: add similar tests on operations in one txn/rev
  29. type (
  30. rangeFunc func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error)
  31. putFunc func(kv KV, key, value []byte, lease lease.LeaseID) int64
  32. deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64)
  33. )
  34. var (
  35. normalRangeFunc = func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error) {
  36. return kv.Range(key, end, limit, rangeRev)
  37. }
  38. txnRangeFunc = func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error) {
  39. id := kv.TxnBegin()
  40. defer kv.TxnEnd(id)
  41. return kv.TxnRange(id, key, end, limit, rangeRev)
  42. }
  43. normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
  44. return kv.Put(key, value, lease)
  45. }
  46. txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
  47. id := kv.TxnBegin()
  48. defer kv.TxnEnd(id)
  49. rev, err := kv.TxnPut(id, key, value, lease)
  50. if err != nil {
  51. panic("txn put error")
  52. }
  53. return rev
  54. }
  55. normalDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
  56. return kv.DeleteRange(key, end)
  57. }
  58. txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
  59. id := kv.TxnBegin()
  60. defer kv.TxnEnd(id)
  61. n, rev, err := kv.TxnDeleteRange(id, key, end)
  62. if err != nil {
  63. panic("txn delete error")
  64. }
  65. return n, rev
  66. }
  67. )
  68. func TestKVRange(t *testing.T) { testKVRange(t, normalRangeFunc) }
  69. func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
  70. func testKVRange(t *testing.T, f rangeFunc) {
  71. b, tmpPath := backend.NewDefaultTmpBackend()
  72. s := NewStore(b, &lease.FakeLessor{})
  73. defer cleanup(s, b, tmpPath)
  74. s.Put([]byte("foo"), []byte("bar"), 1)
  75. s.Put([]byte("foo1"), []byte("bar1"), 2)
  76. s.Put([]byte("foo2"), []byte("bar2"), 3)
  77. kvs := []storagepb.KeyValue{
  78. {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
  79. {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
  80. {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
  81. }
  82. wrev := int64(3)
  83. tests := []struct {
  84. key, end []byte
  85. wkvs []storagepb.KeyValue
  86. }{
  87. // get no keys
  88. {
  89. []byte("doo"), []byte("foo"),
  90. nil,
  91. },
  92. // get no keys when key == end
  93. {
  94. []byte("foo"), []byte("foo"),
  95. nil,
  96. },
  97. // get no keys when ranging single key
  98. {
  99. []byte("doo"), nil,
  100. nil,
  101. },
  102. // get all keys
  103. {
  104. []byte("foo"), []byte("foo3"),
  105. kvs,
  106. },
  107. // get partial keys
  108. {
  109. []byte("foo"), []byte("foo1"),
  110. kvs[:1],
  111. },
  112. // get single key
  113. {
  114. []byte("foo"), nil,
  115. kvs[:1],
  116. },
  117. }
  118. for i, tt := range tests {
  119. kvs, rev, err := f(s, tt.key, tt.end, 0, 0)
  120. if err != nil {
  121. t.Fatal(err)
  122. }
  123. if rev != wrev {
  124. t.Errorf("#%d: rev = %d, want %d", i, rev, wrev)
  125. }
  126. if !reflect.DeepEqual(kvs, tt.wkvs) {
  127. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
  128. }
  129. }
  130. }
  131. func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
  132. func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
  133. func testKVRangeRev(t *testing.T, f rangeFunc) {
  134. b, tmpPath := backend.NewDefaultTmpBackend()
  135. s := NewStore(b, &lease.FakeLessor{})
  136. defer cleanup(s, b, tmpPath)
  137. s.Put([]byte("foo"), []byte("bar"), 1)
  138. s.Put([]byte("foo1"), []byte("bar1"), 2)
  139. s.Put([]byte("foo2"), []byte("bar2"), 3)
  140. kvs := []storagepb.KeyValue{
  141. {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
  142. {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
  143. {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
  144. }
  145. tests := []struct {
  146. rev int64
  147. wrev int64
  148. wkvs []storagepb.KeyValue
  149. }{
  150. {-1, 3, kvs},
  151. {0, 3, kvs},
  152. {1, 1, kvs[:1]},
  153. {2, 2, kvs[:2]},
  154. {3, 3, kvs},
  155. }
  156. for i, tt := range tests {
  157. kvs, rev, err := f(s, []byte("foo"), []byte("foo3"), 0, tt.rev)
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. if rev != tt.wrev {
  162. t.Errorf("#%d: rev = %d, want %d", i, rev, tt.wrev)
  163. }
  164. if !reflect.DeepEqual(kvs, tt.wkvs) {
  165. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
  166. }
  167. }
  168. }
  169. func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
  170. func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
  171. func testKVRangeBadRev(t *testing.T, f rangeFunc) {
  172. b, tmpPath := backend.NewDefaultTmpBackend()
  173. s := NewStore(b, &lease.FakeLessor{})
  174. defer cleanup(s, b, tmpPath)
  175. s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  176. s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
  177. s.Put([]byte("foo2"), []byte("bar2"), lease.NoLease)
  178. if err := s.Compact(3); err != nil {
  179. t.Fatalf("compact error (%v)", err)
  180. }
  181. tests := []struct {
  182. rev int64
  183. werr error
  184. }{
  185. {-1, ErrCompacted},
  186. {2, ErrCompacted},
  187. {3, ErrCompacted},
  188. {4, ErrFutureRev},
  189. {100, ErrFutureRev},
  190. }
  191. for i, tt := range tests {
  192. _, _, err := f(s, []byte("foo"), []byte("foo3"), 0, tt.rev)
  193. if err != tt.werr {
  194. t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
  195. }
  196. }
  197. }
  198. func TestKVRangeLimit(t *testing.T) { testKVRangeLimit(t, normalRangeFunc) }
  199. func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
  200. func testKVRangeLimit(t *testing.T, f rangeFunc) {
  201. b, tmpPath := backend.NewDefaultTmpBackend()
  202. s := NewStore(b, &lease.FakeLessor{})
  203. defer cleanup(s, b, tmpPath)
  204. s.Put([]byte("foo"), []byte("bar"), 1)
  205. s.Put([]byte("foo1"), []byte("bar1"), 2)
  206. s.Put([]byte("foo2"), []byte("bar2"), 3)
  207. kvs := []storagepb.KeyValue{
  208. {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
  209. {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
  210. {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
  211. }
  212. wrev := int64(3)
  213. tests := []struct {
  214. limit int64
  215. wkvs []storagepb.KeyValue
  216. }{
  217. // no limit
  218. {-1, kvs},
  219. // no limit
  220. {0, kvs},
  221. {1, kvs[:1]},
  222. {2, kvs[:2]},
  223. {3, kvs},
  224. {100, kvs},
  225. }
  226. for i, tt := range tests {
  227. kvs, rev, err := f(s, []byte("foo"), []byte("foo3"), tt.limit, 0)
  228. if err != nil {
  229. t.Fatalf("#%d: range error (%v)", i, err)
  230. }
  231. if !reflect.DeepEqual(kvs, tt.wkvs) {
  232. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
  233. }
  234. if rev != wrev {
  235. t.Errorf("#%d: rev = %d, want %d", i, rev, wrev)
  236. }
  237. }
  238. }
  239. func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalPutFunc) }
  240. func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }
  241. func testKVPutMultipleTimes(t *testing.T, f putFunc) {
  242. b, tmpPath := backend.NewDefaultTmpBackend()
  243. s := NewStore(b, &lease.FakeLessor{})
  244. defer cleanup(s, b, tmpPath)
  245. for i := 0; i < 10; i++ {
  246. base := int64(i + 1)
  247. rev := f(s, []byte("foo"), []byte("bar"), lease.LeaseID(base))
  248. if rev != base {
  249. t.Errorf("#%d: rev = %d, want %d", i, rev, base)
  250. }
  251. kvs, _, err := s.Range([]byte("foo"), nil, 0, 0)
  252. if err != nil {
  253. t.Fatal(err)
  254. }
  255. wkvs := []storagepb.KeyValue{
  256. {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base, Lease: base},
  257. }
  258. if !reflect.DeepEqual(kvs, wkvs) {
  259. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
  260. }
  261. }
  262. }
  263. func TestKVDeleteRange(t *testing.T) { testKVDeleteRange(t, normalDeleteRangeFunc) }
  264. func TestKVTxnDeleteRange(t *testing.T) { testKVDeleteRange(t, txnDeleteRangeFunc) }
  265. func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
  266. tests := []struct {
  267. key, end []byte
  268. wrev int64
  269. wN int64
  270. }{
  271. {
  272. []byte("foo"), nil,
  273. 4, 1,
  274. },
  275. {
  276. []byte("foo"), []byte("foo1"),
  277. 4, 1,
  278. },
  279. {
  280. []byte("foo"), []byte("foo2"),
  281. 4, 2,
  282. },
  283. {
  284. []byte("foo"), []byte("foo3"),
  285. 4, 3,
  286. },
  287. {
  288. []byte("foo3"), []byte("foo8"),
  289. 3, 0,
  290. },
  291. {
  292. []byte("foo3"), nil,
  293. 3, 0,
  294. },
  295. }
  296. for i, tt := range tests {
  297. b, tmpPath := backend.NewDefaultTmpBackend()
  298. s := NewStore(b, &lease.FakeLessor{})
  299. s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  300. s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
  301. s.Put([]byte("foo2"), []byte("bar2"), lease.NoLease)
  302. n, rev := f(s, tt.key, tt.end)
  303. if n != tt.wN || rev != tt.wrev {
  304. t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, tt.wN, tt.wrev)
  305. }
  306. cleanup(s, b, tmpPath)
  307. }
  308. }
  309. func TestKVDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, normalDeleteRangeFunc) }
  310. func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }
  311. func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
  312. b, tmpPath := backend.NewDefaultTmpBackend()
  313. s := NewStore(b, &lease.FakeLessor{})
  314. defer cleanup(s, b, tmpPath)
  315. s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  316. n, rev := f(s, []byte("foo"), nil)
  317. if n != 1 || rev != 2 {
  318. t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 2)
  319. }
  320. for i := 0; i < 10; i++ {
  321. n, rev := f(s, []byte("foo"), nil)
  322. if n != 0 || rev != 2 {
  323. t.Fatalf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 0, 2)
  324. }
  325. }
  326. }
  327. // test that range, put, delete on single key in sequence repeatedly works correctly.
  328. func TestKVOperationInSequence(t *testing.T) {
  329. b, tmpPath := backend.NewDefaultTmpBackend()
  330. s := NewStore(b, &lease.FakeLessor{})
  331. defer cleanup(s, b, tmpPath)
  332. for i := 0; i < 10; i++ {
  333. base := int64(i * 2)
  334. // put foo
  335. rev := s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
  336. if rev != base+1 {
  337. t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
  338. }
  339. kvs, rev, err := s.Range([]byte("foo"), nil, 0, base+1)
  340. if err != nil {
  341. t.Fatal(err)
  342. }
  343. wkvs := []storagepb.KeyValue{
  344. {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(lease.NoLease)},
  345. }
  346. if !reflect.DeepEqual(kvs, wkvs) {
  347. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
  348. }
  349. if rev != base+1 {
  350. t.Errorf("#%d: range rev = %d, want %d", i, rev, base+1)
  351. }
  352. // delete foo
  353. n, rev := s.DeleteRange([]byte("foo"), nil)
  354. if n != 1 || rev != base+2 {
  355. t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+2)
  356. }
  357. kvs, rev, err = s.Range([]byte("foo"), nil, 0, base+2)
  358. if err != nil {
  359. t.Fatal(err)
  360. }
  361. if kvs != nil {
  362. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, nil)
  363. }
  364. if rev != base+2 {
  365. t.Errorf("#%d: range rev = %d, want %d", i, rev, base+2)
  366. }
  367. }
  368. }
  369. func TestKVTxnBlockNonTnxOperations(t *testing.T) {
  370. b, tmpPath := backend.NewDefaultTmpBackend()
  371. s := NewStore(b, &lease.FakeLessor{})
  372. tests := []func(){
  373. func() { s.Range([]byte("foo"), nil, 0, 0) },
  374. func() { s.Put([]byte("foo"), nil, lease.NoLease) },
  375. func() { s.DeleteRange([]byte("foo"), nil) },
  376. }
  377. for i, tt := range tests {
  378. id := s.TxnBegin()
  379. done := make(chan struct{}, 1)
  380. go func() {
  381. tt()
  382. done <- struct{}{}
  383. }()
  384. select {
  385. case <-done:
  386. t.Fatalf("#%d: operation failed to be blocked", i)
  387. case <-time.After(10 * time.Millisecond):
  388. }
  389. s.TxnEnd(id)
  390. select {
  391. case <-done:
  392. case <-time.After(10 * time.Second):
  393. testutil.FatalStack(t, fmt.Sprintf("#%d: operation failed to be unblocked", i))
  394. }
  395. }
  396. // only close backend when we know all the tx are finished
  397. cleanup(s, b, tmpPath)
  398. }
  399. func TestKVTxnWrongID(t *testing.T) {
  400. b, tmpPath := backend.NewDefaultTmpBackend()
  401. s := NewStore(b, &lease.FakeLessor{})
  402. defer cleanup(s, b, tmpPath)
  403. id := s.TxnBegin()
  404. wrongid := id + 1
  405. tests := []func() error{
  406. func() error {
  407. _, _, err := s.TxnRange(wrongid, []byte("foo"), nil, 0, 0)
  408. return err
  409. },
  410. func() error {
  411. _, err := s.TxnPut(wrongid, []byte("foo"), nil, lease.NoLease)
  412. return err
  413. },
  414. func() error {
  415. _, _, err := s.TxnDeleteRange(wrongid, []byte("foo"), nil)
  416. return err
  417. },
  418. func() error { return s.TxnEnd(wrongid) },
  419. }
  420. for i, tt := range tests {
  421. err := tt()
  422. if err != ErrTxnIDMismatch {
  423. t.Fatalf("#%d: err = %+v, want %+v", i, err, ErrTxnIDMismatch)
  424. }
  425. }
  426. err := s.TxnEnd(id)
  427. if err != nil {
  428. t.Fatalf("end err = %+v, want %+v", err, nil)
  429. }
  430. }
  431. // test that txn range, put, delete on single key in sequence repeatedly works correctly.
  432. func TestKVTnxOperationInSequence(t *testing.T) {
  433. b, tmpPath := backend.NewDefaultTmpBackend()
  434. s := NewStore(b, &lease.FakeLessor{})
  435. defer cleanup(s, b, tmpPath)
  436. for i := 0; i < 10; i++ {
  437. id := s.TxnBegin()
  438. base := int64(i)
  439. // put foo
  440. rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), lease.NoLease)
  441. if err != nil {
  442. t.Fatal(err)
  443. }
  444. if rev != base+1 {
  445. t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
  446. }
  447. kvs, rev, err := s.TxnRange(id, []byte("foo"), nil, 0, base+1)
  448. if err != nil {
  449. t.Fatal(err)
  450. }
  451. wkvs := []storagepb.KeyValue{
  452. {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(lease.NoLease)},
  453. }
  454. if !reflect.DeepEqual(kvs, wkvs) {
  455. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
  456. }
  457. if rev != base+1 {
  458. t.Errorf("#%d: range rev = %d, want %d", i, rev, base+1)
  459. }
  460. // delete foo
  461. n, rev, err := s.TxnDeleteRange(id, []byte("foo"), nil)
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. if n != 1 || rev != base+1 {
  466. t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1)
  467. }
  468. kvs, rev, err = s.TxnRange(id, []byte("foo"), nil, 0, base+1)
  469. if err != nil {
  470. t.Errorf("#%d: range error (%v)", i, err)
  471. }
  472. if kvs != nil {
  473. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, nil)
  474. }
  475. if rev != base+1 {
  476. t.Errorf("#%d: range rev = %d, want %d", i, rev, base+1)
  477. }
  478. s.TxnEnd(id)
  479. }
  480. }
  481. func TestKVCompactReserveLastValue(t *testing.T) {
  482. b, tmpPath := backend.NewDefaultTmpBackend()
  483. s := NewStore(b, &lease.FakeLessor{})
  484. defer cleanup(s, b, tmpPath)
  485. s.Put([]byte("foo"), []byte("bar0"), 1)
  486. s.Put([]byte("foo"), []byte("bar1"), 2)
  487. s.DeleteRange([]byte("foo"), nil)
  488. s.Put([]byte("foo"), []byte("bar2"), 3)
  489. // rev in tests will be called in Compact() one by one on the same store
  490. tests := []struct {
  491. rev int64
  492. // wanted kvs right after the compacted rev
  493. wkvs []storagepb.KeyValue
  494. }{
  495. {
  496. 0,
  497. []storagepb.KeyValue{
  498. {Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
  499. },
  500. },
  501. {
  502. 1,
  503. []storagepb.KeyValue{
  504. {Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2, Lease: 2},
  505. },
  506. },
  507. {
  508. 2,
  509. nil,
  510. },
  511. {
  512. 3,
  513. []storagepb.KeyValue{
  514. {Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1, Lease: 3},
  515. },
  516. },
  517. }
  518. for i, tt := range tests {
  519. err := s.Compact(tt.rev)
  520. if err != nil {
  521. t.Errorf("#%d: unexpect compact error %v", i, err)
  522. }
  523. kvs, _, err := s.Range([]byte("foo"), nil, 0, tt.rev+1)
  524. if err != nil {
  525. t.Errorf("#%d: unexpect range error %v", i, err)
  526. }
  527. if !reflect.DeepEqual(kvs, tt.wkvs) {
  528. t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
  529. }
  530. }
  531. }
  532. func TestKVCompactBad(t *testing.T) {
  533. b, tmpPath := backend.NewDefaultTmpBackend()
  534. s := NewStore(b, &lease.FakeLessor{})
  535. defer cleanup(s, b, tmpPath)
  536. s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
  537. s.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
  538. s.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
  539. // rev in tests will be called in Compact() one by one on the same store
  540. tests := []struct {
  541. rev int64
  542. werr error
  543. }{
  544. {0, nil},
  545. {1, nil},
  546. {1, ErrCompacted},
  547. {3, nil},
  548. {4, ErrFutureRev},
  549. {100, ErrFutureRev},
  550. }
  551. for i, tt := range tests {
  552. err := s.Compact(tt.rev)
  553. if err != tt.werr {
  554. t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr)
  555. }
  556. }
  557. }
  558. func TestKVHash(t *testing.T) {
  559. hashes := make([]uint32, 3)
  560. for i := 0; i < len(hashes); i++ {
  561. var err error
  562. b, tmpPath := backend.NewDefaultTmpBackend()
  563. kv := NewStore(b, &lease.FakeLessor{})
  564. kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
  565. kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
  566. hashes[i], err = kv.Hash()
  567. if err != nil {
  568. t.Fatalf("failed to get hash: %v", err)
  569. }
  570. cleanup(kv, b, tmpPath)
  571. }
  572. for i := 1; i < len(hashes); i++ {
  573. if hashes[i-1] != hashes[i] {
  574. t.Errorf("hash[%d](%d) != hash[%d](%d)", i-1, hashes[i-1], i, hashes[i])
  575. }
  576. }
  577. }
  578. func TestKVRestore(t *testing.T) {
  579. tests := []func(kv KV){
  580. func(kv KV) {
  581. kv.Put([]byte("foo"), []byte("bar0"), 1)
  582. kv.Put([]byte("foo"), []byte("bar1"), 2)
  583. kv.Put([]byte("foo"), []byte("bar2"), 3)
  584. },
  585. func(kv KV) {
  586. kv.Put([]byte("foo"), []byte("bar0"), 1)
  587. kv.DeleteRange([]byte("foo"), nil)
  588. kv.Put([]byte("foo"), []byte("bar1"), 2)
  589. },
  590. func(kv KV) {
  591. kv.Put([]byte("foo"), []byte("bar0"), 1)
  592. kv.Put([]byte("foo"), []byte("bar1"), 2)
  593. kv.Compact(1)
  594. },
  595. }
  596. for i, tt := range tests {
  597. b, tmpPath := backend.NewDefaultTmpBackend()
  598. s := NewStore(b, &lease.FakeLessor{})
  599. tt(s)
  600. var kvss [][]storagepb.KeyValue
  601. for k := int64(0); k < 10; k++ {
  602. kvs, _, _ := s.Range([]byte("a"), []byte("z"), 0, k)
  603. kvss = append(kvss, kvs)
  604. }
  605. s.Close()
  606. // ns should recover the the previous state from backend.
  607. ns := NewStore(b, &lease.FakeLessor{})
  608. // wait for possible compaction to finish
  609. testutil.WaitSchedule()
  610. var nkvss [][]storagepb.KeyValue
  611. for k := int64(0); k < 10; k++ {
  612. nkvs, _, _ := ns.Range([]byte("a"), []byte("z"), 0, k)
  613. nkvss = append(nkvss, nkvs)
  614. }
  615. cleanup(ns, b, tmpPath)
  616. if !reflect.DeepEqual(nkvss, kvss) {
  617. t.Errorf("#%d: kvs history = %+v, want %+v", i, nkvss, kvss)
  618. }
  619. }
  620. }
  621. func TestKVSnapshot(t *testing.T) {
  622. b, tmpPath := backend.NewDefaultTmpBackend()
  623. s := NewStore(b, &lease.FakeLessor{})
  624. defer cleanup(s, b, tmpPath)
  625. s.Put([]byte("foo"), []byte("bar"), 1)
  626. s.Put([]byte("foo1"), []byte("bar1"), 2)
  627. s.Put([]byte("foo2"), []byte("bar2"), 3)
  628. wkvs := []storagepb.KeyValue{
  629. {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
  630. {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
  631. {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
  632. }
  633. newPath := "new_test"
  634. f, err := os.Create(newPath)
  635. if err != nil {
  636. t.Fatal(err)
  637. }
  638. defer os.Remove(newPath)
  639. snap := s.b.Snapshot()
  640. defer snap.Close()
  641. _, err = snap.WriteTo(f)
  642. if err != nil {
  643. t.Fatal(err)
  644. }
  645. f.Close()
  646. ns := NewStore(b, &lease.FakeLessor{})
  647. defer ns.Close()
  648. kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
  649. if err != nil {
  650. t.Errorf("unexpect range error (%v)", err)
  651. }
  652. if !reflect.DeepEqual(kvs, wkvs) {
  653. t.Errorf("kvs = %+v, want %+v", kvs, wkvs)
  654. }
  655. if rev != 3 {
  656. t.Errorf("rev = %d, want %d", rev, 3)
  657. }
  658. }
  659. func TestWatchableKVWatch(t *testing.T) {
  660. b, tmpPath := backend.NewDefaultTmpBackend()
  661. s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
  662. defer cleanup(s, b, tmpPath)
  663. w := s.NewWatchStream()
  664. defer w.Close()
  665. wid := w.Watch([]byte("foo"), true, 0)
  666. s.Put([]byte("foo"), []byte("bar"), 1)
  667. select {
  668. case resp := <-w.Chan():
  669. wev := storagepb.Event{
  670. Type: storagepb.PUT,
  671. Kv: &storagepb.KeyValue{
  672. Key: []byte("foo"),
  673. Value: []byte("bar"),
  674. CreateRevision: 1,
  675. ModRevision: 1,
  676. Version: 1,
  677. Lease: 1,
  678. },
  679. }
  680. if resp.WatchID != wid {
  681. t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
  682. }
  683. ev := resp.Events[0]
  684. if !reflect.DeepEqual(ev, wev) {
  685. t.Errorf("watched event = %+v, want %+v", ev, wev)
  686. }
  687. case <-time.After(5 * time.Second):
  688. // CPU might be too slow, and the routine is not able to switch around
  689. testutil.FatalStack(t, "failed to watch the event")
  690. }
  691. s.Put([]byte("foo1"), []byte("bar1"), 2)
  692. select {
  693. case resp := <-w.Chan():
  694. wev := storagepb.Event{
  695. Type: storagepb.PUT,
  696. Kv: &storagepb.KeyValue{
  697. Key: []byte("foo1"),
  698. Value: []byte("bar1"),
  699. CreateRevision: 2,
  700. ModRevision: 2,
  701. Version: 1,
  702. Lease: 2,
  703. },
  704. }
  705. if resp.WatchID != wid {
  706. t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
  707. }
  708. ev := resp.Events[0]
  709. if !reflect.DeepEqual(ev, wev) {
  710. t.Errorf("watched event = %+v, want %+v", ev, wev)
  711. }
  712. case <-time.After(5 * time.Second):
  713. testutil.FatalStack(t, "failed to watch the event")
  714. }
  715. w = s.NewWatchStream()
  716. wid = w.Watch([]byte("foo1"), false, 1)
  717. select {
  718. case resp := <-w.Chan():
  719. wev := storagepb.Event{
  720. Type: storagepb.PUT,
  721. Kv: &storagepb.KeyValue{
  722. Key: []byte("foo1"),
  723. Value: []byte("bar1"),
  724. CreateRevision: 2,
  725. ModRevision: 2,
  726. Version: 1,
  727. Lease: 2,
  728. },
  729. }
  730. if resp.WatchID != wid {
  731. t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
  732. }
  733. ev := resp.Events[0]
  734. if !reflect.DeepEqual(ev, wev) {
  735. t.Errorf("watched event = %+v, want %+v", ev, wev)
  736. }
  737. case <-time.After(5 * time.Second):
  738. testutil.FatalStack(t, "failed to watch the event")
  739. }
  740. s.Put([]byte("foo1"), []byte("bar11"), 3)
  741. select {
  742. case resp := <-w.Chan():
  743. wev := storagepb.Event{
  744. Type: storagepb.PUT,
  745. Kv: &storagepb.KeyValue{
  746. Key: []byte("foo1"),
  747. Value: []byte("bar11"),
  748. CreateRevision: 2,
  749. ModRevision: 3,
  750. Version: 2,
  751. Lease: 3,
  752. },
  753. }
  754. if resp.WatchID != wid {
  755. t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
  756. }
  757. ev := resp.Events[0]
  758. if !reflect.DeepEqual(ev, wev) {
  759. t.Errorf("watched event = %+v, want %+v", ev, wev)
  760. }
  761. case <-time.After(5 * time.Second):
  762. testutil.FatalStack(t, "failed to watch the event")
  763. }
  764. }
  765. func cleanup(s KV, b backend.Backend, path string) {
  766. s.Close()
  767. b.Close()
  768. os.Remove(path)
  769. }