Browse Source

Merge pull request #7525 from heyitsanthony/big-backend

etcdserver, backend: configure mmap size based on quota
Anthony Romano 8 years ago
parent
commit
7ef75e373a

+ 1 - 1
etcdctl/ctlv3/command/migrate_command.go

@@ -106,7 +106,7 @@ func prepareBackend() backend.Backend {
 	dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
 	dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
 	go func() {
 	go func() {
 		defer close(bch)
 		defer close(bch)
-		be = backend.New(dbpath, time.Second, 10000)
+		be = backend.NewDefaultBackend(dbpath)
 
 
 	}()
 	}()
 	select {
 	select {

+ 12 - 5
etcdserver/quota.go

@@ -16,7 +16,15 @@ package etcdserver
 
 
 import (
 import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-	"github.com/coreos/etcd/mvcc/backend"
+)
+
+const (
+	// DefaultQuotaBytes is the number of bytes the backend Size may
+	// consume before exceeding the space quota.
+	DefaultQuotaBytes = int64(2 * 1024 * 1024 * 1024) // 2GB
+	// MaxQuotaBytes is the maximum number of bytes suggested for a backend
+	// quota. A larger quota may lead to degraded performance.
+	MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB
 )
 )
 
 
 // Quota represents an arbitrary quota against arbitrary requests. Each request
 // Quota represents an arbitrary quota against arbitrary requests. Each request
@@ -57,11 +65,10 @@ func NewBackendQuota(s *EtcdServer) Quota {
 	}
 	}
 	if s.Cfg.QuotaBackendBytes == 0 {
 	if s.Cfg.QuotaBackendBytes == 0 {
 		// use default size if no quota size given
 		// use default size if no quota size given
-		return &backendQuota{s, backend.DefaultQuotaBytes}
+		return &backendQuota{s, DefaultQuotaBytes}
 	}
 	}
-	if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
-		plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
-		return &backendQuota{s, backend.MaxQuotaBytes}
+	if s.Cfg.QuotaBackendBytes > MaxQuotaBytes {
+		plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
 	}
 	}
 	return &backendQuota{s, s.Cfg.QuotaBackendBytes}
 	return &backendQuota{s, s.Cfg.QuotaBackendBytes}
 }
 }

+ 12 - 2
etcdserver/server.go

@@ -270,7 +270,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	var be backend.Backend
 	var be backend.Backend
 	beOpened := make(chan struct{})
 	beOpened := make(chan struct{})
 	go func() {
 	go func() {
-		be = backend.NewDefaultBackend(bepath)
+		be = newBackend(bepath, cfg.QuotaBackendBytes)
 		beOpened <- struct{}{}
 		beOpened <- struct{}{}
 	}()
 	}()
 
 
@@ -809,7 +809,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		plog.Panicf("rename snapshot file error: %v", err)
 		plog.Panicf("rename snapshot file error: %v", err)
 	}
 	}
 
 
-	newbe := backend.NewDefaultBackend(fn)
+	newbe := newBackend(fn, s.Cfg.QuotaBackendBytes)
 
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
@@ -1653,3 +1653,13 @@ func (s *EtcdServer) goAttach(f func()) {
 		f()
 		f()
 	}()
 	}()
 }
 }
+
+func newBackend(path string, quotaBytes int64) backend.Backend {
+	bcfg := backend.DefaultBackendConfig()
+	bcfg.Path = path
+	if quotaBytes > 0 && quotaBytes != DefaultQuotaBytes {
+		// permit 10% excess over quota for disarm
+		bcfg.MmapSize = uint64(quotaBytes + quotaBytes/10)
+	}
+	return backend.New(bcfg)
+}

+ 3 - 2
lease/lessor_test.go

@@ -390,6 +390,7 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
 	if err != nil {
 	if err != nil {
 		t.Fatalf("failed to create tmpdir (%v)", err)
 		t.Fatalf("failed to create tmpdir (%v)", err)
 	}
 	}
-
-	return tmpPath, backend.New(filepath.Join(tmpPath, "be"), time.Second, 10000)
+	bcfg := backend.DefaultBackendConfig()
+	bcfg.Path = filepath.Join(tmpPath, "be")
+	return tmpPath, backend.New(bcfg)
 }
 }

+ 40 - 20
mvcc/backend/backend.go

@@ -35,23 +35,14 @@ var (
 
 
 	defragLimit = 10000
 	defragLimit = 10000
 
 
-	// InitialMmapSize is the initial size of the mmapped region. Setting this larger than
+	// initialMmapSize is the initial size of the mmapped region. Setting this larger than
 	// the potential max db size can prevent writer from blocking reader.
 	// the potential max db size can prevent writer from blocking reader.
 	// This only works for linux.
 	// This only works for linux.
-	InitialMmapSize = int64(10 * 1024 * 1024 * 1024)
+	initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
 
 
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc/backend")
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc/backend")
 )
 )
 
 
-const (
-	// DefaultQuotaBytes is the number of bytes the backend Size may
-	// consume before exceeding the space quota.
-	DefaultQuotaBytes = int64(2 * 1024 * 1024 * 1024) // 2GB
-	// MaxQuotaBytes is the maximum number of bytes suggested for a backend
-	// quota. A larger quota may lead to degraded performance.
-	MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB
-)
-
 type Backend interface {
 type Backend interface {
 	ReadTx() ReadTx
 	ReadTx() ReadTx
 	BatchTx() BatchTx
 	BatchTx() BatchTx
@@ -96,18 +87,45 @@ type backend struct {
 	donec chan struct{}
 	donec chan struct{}
 }
 }
 
 
-func New(path string, d time.Duration, limit int) Backend {
-	return newBackend(path, d, limit)
+type BackendConfig struct {
+	// Path is the file path to the backend file.
+	Path string
+	// BatchInterval is the maximum time before flushing the BatchTx.
+	BatchInterval time.Duration
+	// BatchLimit is the maximum puts before flushing the BatchTx.
+	BatchLimit int
+	// MmapSize is the number of bytes to mmap for the backend.
+	MmapSize uint64
+}
+
+func DefaultBackendConfig() BackendConfig {
+	return BackendConfig{
+		BatchInterval: defaultBatchInterval,
+		BatchLimit:    defaultBatchLimit,
+		MmapSize:      initialMmapSize,
+	}
+}
+
+func New(bcfg BackendConfig) Backend {
+	return newBackend(bcfg)
 }
 }
 
 
 func NewDefaultBackend(path string) Backend {
 func NewDefaultBackend(path string) Backend {
-	return newBackend(path, defaultBatchInterval, defaultBatchLimit)
+	bcfg := DefaultBackendConfig()
+	bcfg.Path = path
+	return newBackend(bcfg)
 }
 }
 
 
-func newBackend(path string, d time.Duration, limit int) *backend {
-	db, err := bolt.Open(path, 0600, boltOpenOptions)
+func newBackend(bcfg BackendConfig) *backend {
+	bopts := &bolt.Options{}
+	if boltOpenOptions != nil {
+		*bopts = *boltOpenOptions
+	}
+	bopts.InitialMmapSize = int(bcfg.MmapSize)
+
+	db, err := bolt.Open(bcfg.Path, 0600, bopts)
 	if err != nil {
 	if err != nil {
-		plog.Panicf("cannot open database at %s (%v)", path, err)
+		plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
 	}
 	}
 
 
 	// In future, may want to make buffering optional for low-concurrency systems
 	// In future, may want to make buffering optional for low-concurrency systems
@@ -115,8 +133,8 @@ func newBackend(path string, d time.Duration, limit int) *backend {
 	b := &backend{
 	b := &backend{
 		db: db,
 		db: db,
 
 
-		batchInterval: d,
-		batchLimit:    limit,
+		batchInterval: bcfg.BatchInterval,
+		batchLimit:    bcfg.BatchLimit,
 
 
 		readTx: &readTx{buf: txReadBuffer{
 		readTx: &readTx{buf: txReadBuffer{
 			txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
 			txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
@@ -361,7 +379,9 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
 		plog.Fatal(err)
 		plog.Fatal(err)
 	}
 	}
 	tmpPath := filepath.Join(dir, "database")
 	tmpPath := filepath.Join(dir, "database")
-	return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
+	bcfg := DefaultBackendConfig()
+	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
+	return newBackend(bcfg), tmpPath
 }
 }
 
 
 func NewDefaultTmpBackend() (*backend, string) {
 func NewDefaultTmpBackend() (*backend, string) {

+ 2 - 2
mvcc/backend/backend_bench_test.go

@@ -22,9 +22,9 @@ import (
 )
 )
 
 
 func BenchmarkBackendPut(b *testing.B) {
 func BenchmarkBackendPut(b *testing.B) {
-	backend := New("test", 100*time.Millisecond, 10000)
+	backend, tmppath := NewTmpBackend(100*time.Millisecond, 10000)
 	defer backend.Close()
 	defer backend.Close()
-	defer os.Remove("test")
+	defer os.Remove(tmppath)
 
 
 	// prepare keys
 	// prepare keys
 	keys := make([][]byte, b.N)
 	keys := make([][]byte, b.N)

+ 3 - 1
mvcc/backend/backend_test.go

@@ -69,7 +69,9 @@ func TestBackendSnapshot(t *testing.T) {
 	f.Close()
 	f.Close()
 
 
 	// bootstrap new backend from the snapshot
 	// bootstrap new backend from the snapshot
-	nb := New(f.Name(), time.Hour, 10000)
+	bcfg := DefaultBackendConfig()
+	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = f.Name(), time.Hour, 10000
+	nb := New(bcfg)
 	defer cleanup(nb, f.Name())
 	defer cleanup(nb, f.Name())
 
 
 	newTx := b.BatchTx()
 	newTx := b.BatchTx()

+ 1 - 2
mvcc/backend/boltoption_linux.go

@@ -27,6 +27,5 @@ import (
 // (https://github.com/torvalds/linux/releases/tag/v2.6.23), mmap might
 // (https://github.com/torvalds/linux/releases/tag/v2.6.23), mmap might
 // silently ignore this flag. Please update your kernel to prevent this.
 // silently ignore this flag. Please update your kernel to prevent this.
 var boltOpenOptions = &bolt.Options{
 var boltOpenOptions = &bolt.Options{
-	MmapFlags:       syscall.MAP_POPULATE,
-	InitialMmapSize: int(InitialMmapSize),
+	MmapFlags: syscall.MAP_POPULATE,
 }
 }

+ 3 - 1
tools/benchmark/cmd/mvcc.go

@@ -32,7 +32,9 @@ var (
 )
 )
 
 
 func initMVCC() {
 func initMVCC() {
-	be := backend.New("mvcc-bench", time.Duration(batchInterval), batchLimit)
+	bcfg := backend.DefaultBackendConfig()
+	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = "mvcc-bench", time.Duration(batchInterval)*time.Millisecond, batchLimit
+	be := backend.New(bcfg)
 	s = mvcc.NewStore(be, &lease.FakeLessor{}, nil)
 	s = mvcc.NewStore(be, &lease.FakeLessor{}, nil)
 	os.Remove("mvcc-bench") // boltDB has an opened fd, so removing the file is ok
 	os.Remove("mvcc-bench") // boltDB has an opened fd, so removing the file is ok
 }
 }