Преглед изворни кода

backend: readtx

ReadTxs are designed for read-only accesses to the backend using a
read-only boltDB transaction. Since BatchTx's are long-running
transactions, all writes to BatchTx will writeback to ReadTx, overlaying
the base read-only transaction.
Anthony Romano пре 9 година
родитељ
комит
8d438c2939
5 измењених фајлова са 477 додато и 42 уклоњено
  1. 25 2
      mvcc/backend/backend.go
  2. 75 0
      mvcc/backend/backend_test.go
  3. 104 40
      mvcc/backend/batch_tx.go
  4. 92 0
      mvcc/backend/read_tx.go
  5. 181 0
      mvcc/backend/tx_buffer.go

+ 25 - 2
mvcc/backend/backend.go

@@ -53,7 +53,9 @@ const (
 )
 
 type Backend interface {
+	ReadTx() ReadTx
 	BatchTx() BatchTx
+
 	Snapshot() Snapshot
 	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
 	// Size returns the current size of the backend.
@@ -86,7 +88,9 @@ type backend struct {
 
 	batchInterval time.Duration
 	batchLimit    int
-	batchTx       *batchTx
+	batchTx       *batchTxBuffered
+
+	readTx *readTx
 
 	stopc chan struct{}
 	donec chan struct{}
@@ -106,16 +110,22 @@ func newBackend(path string, d time.Duration, limit int) *backend {
 		plog.Panicf("cannot open database at %s (%v)", path, err)
 	}
 
+	// In future, may want to make buffering optional for low-concurrency systems
+	// or dynamically swap between buffered/non-buffered depending on workload.
 	b := &backend{
 		db: db,
 
 		batchInterval: d,
 		batchLimit:    limit,
 
+		readTx: &readTx{buf: txReadBuffer{
+			txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
+		},
+
 		stopc: make(chan struct{}),
 		donec: make(chan struct{}),
 	}
-	b.batchTx = newBatchTx(b)
+	b.batchTx = newBatchTxBuffered(b)
 	go b.run()
 	return b
 }
@@ -127,6 +137,8 @@ func (b *backend) BatchTx() BatchTx {
 	return b.batchTx
 }
 
+func (b *backend) ReadTx() ReadTx { return b.readTx }
+
 // ForceCommit forces the current batching tx to commit.
 func (b *backend) ForceCommit() {
 	b.batchTx.Commit()
@@ -328,6 +340,17 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 	return tmptx.Commit()
 }
 
+func (b *backend) begin(write bool) *bolt.Tx {
+	b.mu.RLock()
+	tx, err := b.db.Begin(write)
+	if err != nil {
+		plog.Fatalf("cannot begin tx (%s)", err)
+	}
+	b.mu.RUnlock()
+	atomic.StoreInt64(&b.size, tx.Size())
+	return tx
+}
+
 // NewTmpBackend creates a backend implementation for testing.
 func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
 	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")

+ 75 - 0
mvcc/backend/backend_test.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"reflect"
 	"testing"
 	"time"
 
@@ -173,6 +174,80 @@ func TestBackendDefrag(t *testing.T) {
 	b.ForceCommit()
 }
 
+// TestBackendWriteback ensures writes are stored to the read txn on write txn unlock.
+func TestBackendWriteback(t *testing.T) {
+	b, tmpPath := NewDefaultTmpBackend()
+	defer cleanup(b, tmpPath)
+
+	tx := b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket([]byte("key"))
+	tx.UnsafePut([]byte("key"), []byte("abc"), []byte("bar"))
+	tx.UnsafePut([]byte("key"), []byte("def"), []byte("baz"))
+	tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
+	tx.Unlock()
+
+	// overwrites should be propagated too
+	tx.Lock()
+	tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
+	tx.Unlock()
+
+	keys := []struct {
+		key   []byte
+		end   []byte
+		limit int64
+
+		wkey [][]byte
+		wval [][]byte
+	}{
+		{
+			key: []byte("abc"),
+			end: nil,
+
+			wkey: [][]byte{[]byte("abc")},
+			wval: [][]byte{[]byte("bar")},
+		},
+		{
+			key: []byte("abc"),
+			end: []byte("def"),
+
+			wkey: [][]byte{[]byte("abc")},
+			wval: [][]byte{[]byte("bar")},
+		},
+		{
+			key: []byte("abc"),
+			end: []byte("deg"),
+
+			wkey: [][]byte{[]byte("abc"), []byte("def")},
+			wval: [][]byte{[]byte("bar"), []byte("baz")},
+		},
+		{
+			key:   []byte("abc"),
+			end:   []byte("\xff"),
+			limit: 1,
+
+			wkey: [][]byte{[]byte("abc")},
+			wval: [][]byte{[]byte("bar")},
+		},
+		{
+			key: []byte("abc"),
+			end: []byte("\xff"),
+
+			wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")},
+			wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")},
+		},
+	}
+	rtx := b.ReadTx()
+	for i, tt := range keys {
+		rtx.Lock()
+		k, v := rtx.UnsafeRange([]byte("key"), tt.key, tt.end, tt.limit)
+		rtx.Unlock()
+		if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
+			t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
+		}
+	}
+}
+
 func cleanup(b Backend, path string) {
 	b.Close()
 	os.Remove(path)

+ 104 - 40
mvcc/backend/batch_tx.go

@@ -16,6 +16,8 @@ package backend
 
 import (
 	"bytes"
+	"fmt"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -24,15 +26,14 @@ import (
 )
 
 type BatchTx interface {
-	Lock()
-	Unlock()
+	ReadTx
 	UnsafeCreateBucket(name []byte)
 	UnsafePut(bucketName []byte, key []byte, value []byte)
 	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
-	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
 	UnsafeDelete(bucketName []byte, key []byte)
-	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
+	// Commit commits a previous tx and begins a new writable one.
 	Commit()
+	// CommitAndStop commits the previous tx and does not create a new one.
 	CommitAndStop()
 }
 
@@ -40,13 +41,8 @@ type batchTx struct {
 	sync.Mutex
 	tx      *bolt.Tx
 	backend *backend
-	pending int
-}
 
-func newBatchTx(backend *backend) *batchTx {
-	tx := &batchTx{backend: backend}
-	tx.Commit()
-	return tx
+	pending int
 }
 
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
@@ -84,30 +80,37 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
 }
 
 // UnsafeRange must be called holding the lock on the tx.
-func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
-	bucket := t.tx.Bucket(bucketName)
-	if bucket == nil {
-		plog.Fatalf("bucket %s does not exist", bucketName)
+func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
+	if err != nil {
+		plog.Fatal(err)
 	}
+	return k, v
+}
 
+func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
+	bucket := tx.Bucket(bucketName)
+	if bucket == nil {
+		return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
+	}
 	if len(endKey) == 0 {
-		if v := bucket.Get(key); v == nil {
-			return keys, vs
-		} else {
-			return append(keys, key), append(vs, v)
+		if v := bucket.Get(key); v != nil {
+			return append(keys, key), append(vs, v), nil
 		}
+		return nil, nil, nil
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64
 	}
-
 	c := bucket.Cursor()
 	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)
-		if limit > 0 && limit == int64(len(keys)) {
+		if limit == int64(len(keys)) {
 			break
 		}
 	}
-
-	return keys, vs
+	return keys, vs, nil
 }
 
 // UnsafeDelete must be called holding the lock on the tx.
@@ -125,12 +128,14 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 
 // UnsafeForEach must be called holding the lock on the tx.
 func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
-	b := t.tx.Bucket(bucketName)
-	if b == nil {
-		// bucket does not exist
-		return nil
+	return unsafeForEach(t.tx, bucketName, visitor)
+}
+
+func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
+	if b := tx.Bucket(bucket); b != nil {
+		return b.ForEach(visitor)
 	}
-	return b.ForEach(visitor)
+	return nil
 }
 
 // Commit commits a previous tx and begins a new writable one.
@@ -140,7 +145,7 @@ func (t *batchTx) Commit() {
 	t.commit(false)
 }
 
-// CommitAndStop commits the previous tx and do not create a new one.
+// CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
 	t.Lock()
 	defer t.Unlock()
@@ -150,13 +155,11 @@ func (t *batchTx) CommitAndStop() {
 func (t *batchTx) Unlock() {
 	if t.pending >= t.backend.batchLimit {
 		t.commit(false)
-		t.pending = 0
 	}
 	t.Mutex.Unlock()
 }
 
 func (t *batchTx) commit(stop bool) {
-	var err error
 	// commit the last tx
 	if t.tx != nil {
 		if t.pending == 0 && !stop {
@@ -178,9 +181,10 @@ func (t *batchTx) commit(stop bool) {
 			}
 			return
 		}
+
 		start := time.Now()
 		// gofail: var beforeCommit struct{}
-		err = t.tx.Commit()
+		err := t.tx.Commit()
 		// gofail: var afterCommit struct{}
 		commitDurations.Observe(time.Since(start).Seconds())
 		atomic.AddInt64(&t.backend.commits, 1)
@@ -190,17 +194,77 @@ func (t *batchTx) commit(stop bool) {
 			plog.Fatalf("cannot commit tx (%s)", err)
 		}
 	}
+	if !stop {
+		t.tx = t.backend.begin(true)
+	}
+}
+
+type batchTxBuffered struct {
+	batchTx
+	buf txWriteBuffer
+}
 
-	if stop {
-		return
+func newBatchTxBuffered(backend *backend) *batchTxBuffered {
+	tx := &batchTxBuffered{
+		batchTx: batchTx{backend: backend},
+		buf: txWriteBuffer{
+			txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+			seq:      true,
+		},
 	}
+	tx.Commit()
+	return tx
+}
 
-	t.backend.mu.RLock()
-	defer t.backend.mu.RUnlock()
-	// begin a new tx
-	t.tx, err = t.backend.db.Begin(true)
-	if err != nil {
-		plog.Fatalf("cannot begin tx (%s)", err)
+func (t *batchTxBuffered) Unlock() {
+	if t.pending != 0 {
+		t.backend.readTx.mu.Lock()
+		t.buf.writeback(&t.backend.readTx.buf)
+		t.backend.readTx.mu.Unlock()
+		if t.pending >= t.backend.batchLimit {
+			t.commit(false)
+		}
+	}
+	t.batchTx.Unlock()
+}
+
+func (t *batchTxBuffered) Commit() {
+	t.Lock()
+	defer t.Unlock()
+	t.commit(false)
+}
+
+func (t *batchTxBuffered) CommitAndStop() {
+	t.Lock()
+	defer t.Unlock()
+	t.commit(true)
+}
+
+func (t *batchTxBuffered) commit(stop bool) {
+	// all read txs must be closed to acquire boltdb commit rwlock
+	t.backend.readTx.mu.Lock()
+	defer t.backend.readTx.mu.Unlock()
+	if t.backend.readTx.tx != nil {
+		if err := t.backend.readTx.tx.Rollback(); err != nil {
+			plog.Fatalf("cannot rollback tx (%s)", err)
+		}
+		t.backend.readTx.buf.reset()
+		t.backend.readTx.tx = nil
+	}
+
+	t.batchTx.commit(stop)
+
+	if !stop {
+		t.backend.readTx.tx = t.backend.begin(false)
 	}
-	atomic.StoreInt64(&t.backend.size, t.tx.Size())
+}
+
+func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafePut(bucketName, key, value)
+	t.buf.put(bucketName, key, value)
+}
+
+func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafeSeqPut(bucketName, key, value)
+	t.buf.putSeq(bucketName, key, value)
 }

+ 92 - 0
mvcc/backend/read_tx.go

@@ -0,0 +1,92 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+	"bytes"
+	"math"
+	"sync"
+
+	"github.com/boltdb/bolt"
+)
+
+// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
+// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
+// is known to never overwrite any key so range is safe.
+var safeRangeBucket = []byte("key")
+
+type ReadTx interface {
+	Lock()
+	Unlock()
+
+	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
+	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
+}
+
+type readTx struct {
+	// mu protects accesses to the txReadBuffer
+	mu  sync.RWMutex
+	buf txReadBuffer
+
+	// txmu protects accesses to the Tx on Range requests
+	txmu sync.Mutex
+	tx   *bolt.Tx
+}
+
+func (rt *readTx) Lock()   { rt.mu.RLock() }
+func (rt *readTx) Unlock() { rt.mu.RUnlock() }
+
+func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	if endKey == nil {
+		// forbid duplicates for single keys
+		limit = 1
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64
+	}
+	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+		panic("do not use unsafeRange on non-keys bucket")
+	}
+	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+	if int64(len(keys)) == limit {
+		return keys, vals
+	}
+	rt.txmu.Lock()
+	// ignore error since bucket may have been created in this batch
+	k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)))
+	rt.txmu.Unlock()
+	return append(k2, keys...), append(v2, vals...)
+}
+
+func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	dups := make(map[string]struct{})
+	f1 := func(k, v []byte) error {
+		dups[string(k)] = struct{}{}
+		return visitor(k, v)
+	}
+	f2 := func(k, v []byte) error {
+		if _, ok := dups[string(k)]; ok {
+			return nil
+		}
+		return visitor(k, v)
+	}
+	if err := rt.buf.ForEach(bucketName, f1); err != nil {
+		return err
+	}
+	rt.txmu.Lock()
+	err := unsafeForEach(rt.tx, bucketName, f2)
+	rt.txmu.Unlock()
+	return err
+}

+ 181 - 0
mvcc/backend/tx_buffer.go

@@ -0,0 +1,181 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+	"bytes"
+	"sort"
+)
+
+// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
+type txBuffer struct {
+	buckets map[string]*bucketBuffer
+}
+
+func (txb *txBuffer) reset() {
+	for k, v := range txb.buckets {
+		if v.used == 0 {
+			// demote
+			delete(txb.buckets, k)
+		}
+		v.used = 0
+	}
+}
+
+// txWriteBuffer buffers writes of pending updates that have not yet committed.
+type txWriteBuffer struct {
+	txBuffer
+	seq bool
+}
+
+func (txw *txWriteBuffer) put(bucket, k, v []byte) {
+	txw.seq = false
+	txw.putSeq(bucket, k, v)
+}
+
+func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
+	b, ok := txw.buckets[string(bucket)]
+	if !ok {
+		b = newBucketBuffer()
+		txw.buckets[string(bucket)] = b
+	}
+	b.add(k, v)
+}
+
+func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
+	for k, wb := range txw.buckets {
+		rb, ok := txr.buckets[k]
+		if !ok {
+			delete(txw.buckets, k)
+			txr.buckets[k] = wb
+			continue
+		}
+		if !txw.seq && wb.used > 1 {
+			// assume no duplicate keys
+			sort.Sort(wb)
+		}
+		rb.merge(wb)
+	}
+	txw.reset()
+}
+
+// txReadBuffer accesses buffered updates.
+type txReadBuffer struct{ txBuffer }
+
+func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	if b := txr.buckets[string(bucketName)]; b != nil {
+		return b.Range(key, endKey, limit)
+	}
+	return nil, nil
+}
+
+func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	if b := txr.buckets[string(bucketName)]; b != nil {
+		return b.ForEach(visitor)
+	}
+	return nil
+}
+
+type kv struct {
+	key []byte
+	val []byte
+}
+
+// bucketBuffer buffers key-value pairs that are pending commit.
+type bucketBuffer struct {
+	buf []kv
+	// used tracks number of elements in use so buf can be reused without reallocation.
+	used int
+}
+
+func newBucketBuffer() *bucketBuffer {
+	return &bucketBuffer{buf: make([]kv, 512), used: 0}
+}
+
+func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
+	f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
+	idx := sort.Search(bb.used, f)
+	if idx < 0 {
+		return nil, nil
+	}
+	if len(endKey) == 0 {
+		if bytes.Equal(key, bb.buf[idx].key) {
+			keys = append(keys, bb.buf[idx].key)
+			vals = append(vals, bb.buf[idx].val)
+		}
+		return keys, vals
+	}
+	if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
+		return nil, nil
+	}
+	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
+		if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
+			break
+		}
+		keys = append(keys, bb.buf[i].key)
+		vals = append(vals, bb.buf[i].val)
+	}
+	return keys, vals
+}
+
+func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
+	for i := 0; i < bb.used; i++ {
+		if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (bb *bucketBuffer) add(k, v []byte) {
+	bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
+	bb.used++
+	if bb.used == len(bb.buf) {
+		buf := make([]kv, (3*len(bb.buf))/2)
+		copy(buf, bb.buf)
+		bb.buf = buf
+	}
+}
+
+// merge merges data from bb into bbsrc.
+func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
+	for i := 0; i < bbsrc.used; i++ {
+		bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
+	}
+	if bb.used == bbsrc.used {
+		return
+	}
+	if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
+		return
+	}
+
+	sort.Stable(bb)
+
+	// remove duplicates, using only newest update
+	widx := 0
+	for ridx := 1; ridx < bb.used; ridx++ {
+		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
+			widx++
+		}
+		bb.buf[widx] = bb.buf[ridx]
+	}
+	bb.used = widx + 1
+}
+
+func (bb *bucketBuffer) Len() int { return bb.used }
+func (bb *bucketBuffer) Less(i, j int) bool {
+	return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
+}
+func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }