@@ -250,6 +250,42 @@ func TestBackendWriteback(t *testing.T) {
 	}
 }
 
+// TestConcurrentReadTx ensures that the current read transaction can see all prior writes stored in the read buffer.
+func TestConcurrentReadTx(t *testing.T) {
+	b, tmpPath := NewTmpBackend(time.Hour, 10000)
+	defer cleanup(b, tmpPath)
+
+	wtx1 := b.BatchTx()
+	wtx1.Lock()
+	wtx1.UnsafeCreateBucket([]byte("key"))
+	wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC"))
+	wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
+	wtx1.Unlock()
+
+	wtx2 := b.BatchTx()
+	wtx2.Lock()
+	wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF"))
+	wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
+	wtx2.Unlock()
+
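+	// Neither batch transaction has been committed yet: with a one-hour
+	// commit interval and a 10000-op batch limit, the puts above are
+	// visible to readers only through the read buffer.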
+	rtx := b.ConcurrentReadTx()
+	rtx.RLock() // no-op
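+	// RLock is a no-op here: the concurrent read transaction works on its
+	// own copy of the read buffer, so reads do not block writes.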
+	k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0)
+	rtx.RUnlock()
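+	// Keys come back in sorted order; "overwrite" must carry the value
+	// from the later of the two buffered writes.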
+	wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
+	wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
+	if !reflect.DeepEqual(wKey, k) || !reflect.DeepEqual(wVal, v) {
+		t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wKey, wVal, k, v)
+	}
+}
+
 // TestBackendWritebackForEach checks that partially written / buffered
 // data is visited in the same order as fully committed data.
 func TestBackendWritebackForEach(t *testing.T) {
@@ -300,8 +336,6 @@ func TestBackendWritebackForEach(t *testing.T) {
 	}
 }
 
-// TODO: add a unit test for concurrentReadTx
-
 func cleanup(b Backend, path string) {
 	b.Close()
 	os.Remove(path)