|
|
@@ -22,6 +22,7 @@ import (
|
|
|
mrand "math/rand"
|
|
|
"os"
|
|
|
"reflect"
|
|
|
+ "sync"
|
|
|
"testing"
|
|
|
"time"
|
|
|
|
|
|
@@ -510,6 +511,78 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
|
|
|
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
|
|
|
}
|
|
|
|
|
|
+type hashKVResult struct {
|
|
|
+ hash uint32
|
|
|
+ compactRev int64
|
|
|
+}
|
|
|
+
|
|
|
+// TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
|
|
|
+func TestHashKVWhenCompacting(t *testing.T) {
|
|
|
+ b, tmpPath := backend.NewDefaultTmpBackend()
|
|
|
+ s := NewStore(b, &lease.FakeLessor{}, nil)
|
|
|
+ defer os.Remove(tmpPath)
|
|
|
+
|
|
|
+ rev := 1000
|
|
|
+ for i := 2; i <= rev; i++ {
|
|
|
+ s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
|
|
|
+ }
|
|
|
+
|
|
|
+ hashCompactc := make(chan hashKVResult, 1)
|
|
|
+
|
|
|
+ donec := make(chan struct{})
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ for {
|
|
|
+ hash, _, compactRev, err := s.HashByRev(int64(rev))
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ select {
|
|
|
+ case <-donec:
|
|
|
+ return
|
|
|
+ case hashCompactc <- hashKVResult{hash, compactRev}:
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ defer close(donec)
|
|
|
+ revHash := make(map[int64]uint32)
|
|
|
+ for round := 0; round < 1000; round++ {
|
|
|
+ r := <-hashCompactc
|
|
|
+ if revHash[r.compactRev] == 0 {
|
|
|
+ revHash[r.compactRev] = r.hash
|
|
|
+ }
|
|
|
+ if r.hash != revHash[r.compactRev] {
|
|
|
+ t.Fatalf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ for i := 100; i >= 0; i-- {
|
|
|
+ _, err := s.Compact(int64(rev - 1 - i))
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ time.Sleep(10 * time.Millisecond)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-donec:
|
|
|
+ wg.Wait()
|
|
|
+ case <-time.After(10 * time.Second):
|
|
|
+ testutil.FatalStack(t, "timeout")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestTxnPut(t *testing.T) {
|
|
|
// assign arbitrary size
|
|
|
bytesN := 30
|