Browse Source

mvcc: add TestConcurrentReadTxAndWrite

Add TestConcurrentReadTxAndWrite which creates random reads and writes,
and ensures reads always see latest writes.
Jingyi Hu 6 years ago
parent
commit
2a9320e944
1 changed files with 112 additions and 1 deletions
  1. 112 1
      mvcc/kvstore_test.go

+ 112 - 1
mvcc/kvstore_test.go

@@ -15,6 +15,7 @@
 package mvcc
 
 import (
+	"bytes"
 	"crypto/rand"
 	"encoding/binary"
 	"fmt"
@@ -22,6 +23,8 @@ import (
 	mrand "math/rand"
 	"os"
 	"reflect"
+	"sort"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -645,7 +648,8 @@ func TestTxnPut(t *testing.T) {
 	}
 }
 
-func TestConcurrentReadAndWrite(t *testing.T) {
+// TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
+func TestConcurrentReadNotBlockingWrite(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
@@ -706,6 +710,113 @@ func TestConcurrentReadAndWrite(t *testing.T) {
 	readTx1.End()
 }
 
+// TestConcurrentReadTxAndWrite creates random concurrent Reads and Writes, and ensures Reads always see latest Writes
+func TestConcurrentReadTxAndWrite(t *testing.T) {
+	var (
+		numOfReads           = 100
+		numOfWrites          = 100
+		maxNumOfPutsPerWrite = 10
+		committedKVs         kvs        // committedKVs records the key-value pairs written by the finished Write Txns
+		mu                   sync.Mutex // mu protectes committedKVs
+	)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	defer os.Remove(tmpPath)
+
+	var wg sync.WaitGroup
+	wg.Add(numOfWrites)
+	for i := 0; i < numOfWrites; i++ {
+		go func() {
+			defer wg.Done()
+			time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
+
+			tx := s.Write()
+			numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
+			var pendingKvs kvs
+			for j := 0; j < numOfPuts; j++ {
+				k := []byte(strconv.Itoa(mrand.Int()))
+				v := []byte(strconv.Itoa(mrand.Int()))
+				tx.Put(k, v, lease.NoLease)
+				pendingKvs = append(pendingKvs, kv{k, v})
+			}
+			// reads should not see above Puts until write is finished
+			mu.Lock()
+			committedKVs = merge(committedKVs, pendingKvs) // update shared data structure
+			tx.End()
+			mu.Unlock()
+		}()
+	}
+
+	wg.Add(numOfReads)
+	for i := 0; i < numOfReads; i++ {
+		go func() {
+			defer wg.Done()
+			time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
+
+			mu.Lock()
+			wKVs := make(kvs, len(committedKVs))
+			copy(wKVs, committedKVs)
+			tx := s.Read()
+			mu.Unlock()
+			// get all keys in backend store, and compare with wKVs
+			ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
+			tx.End()
+			if err != nil {
+				t.Errorf("failed to range keys: %v", err)
+				return
+			}
+			if len(wKVs) == 0 && len(ret.KVs) == 0 { // no committed KVs yet
+				return
+			}
+			var result kvs
+			for _, keyValue := range ret.KVs {
+				result = append(result, kv{keyValue.Key, keyValue.Value})
+			}
+			if !reflect.DeepEqual(wKVs, result) {
+				t.Errorf("unexpected range result") // too many key value pairs, skip printing them
+			}
+		}()
+	}
+
+	// wait until go routines finish or timeout
+	doneC := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(doneC)
+	}()
+	select {
+	case <-doneC:
+	case <-time.After(5 * time.Minute):
+		testutil.FatalStack(t, "timeout")
+	}
+}
+
+type kv struct {
+	key []byte
+	val []byte
+}
+
+type kvs []kv
+
+func (kvs kvs) Len() int           { return len(kvs) }
+func (kvs kvs) Less(i, j int) bool { return bytes.Compare(kvs[i].key, kvs[j].key) < 0 }
+func (kvs kvs) Swap(i, j int)      { kvs[i], kvs[j] = kvs[j], kvs[i] }
+
+func merge(dst, src kvs) kvs {
+	dst = append(dst, src...)
+	sort.Stable(dst)
+	// remove duplicates, using only the newest value
+	// ref: tx_buffer.go
+	widx := 0
+	for ridx := 1; ridx < len(dst); ridx++ {
+		if !bytes.Equal(dst[widx].key, dst[ridx].key) {
+			widx++
+		}
+		dst[widx] = dst[ridx]
+	}
+	return dst[:widx+1]
+}
+
 // TODO: test attach key to lessor
 
 func newTestRevBytes(rev revision) []byte {