Browse Source

etcdserver, v3rpc: space quotas

Anthony Romano 9 years ago
parent
commit
9c8253c543

+ 2 - 2
etcdserver/api/v3rpc/grpc.go

@@ -30,9 +30,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 	}
 	}
 
 
 	grpcServer := grpc.NewServer(opts...)
 	grpcServer := grpc.NewServer(opts...)
-	pb.RegisterKVServer(grpcServer, NewKVServer(s))
+	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
 	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
 	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
-	pb.RegisterLeaseServer(grpcServer, NewLeaseServer(s))
+	pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
 	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
 	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
 	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
 	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

+ 2 - 0
etcdserver/api/v3rpc/key.go

@@ -297,6 +297,8 @@ func togRPCError(err error) error {
 	// TODO: handle error from raft and timeout
 	// TODO: handle error from raft and timeout
 	case etcdserver.ErrRequestTooLarge:
 	case etcdserver.ErrRequestTooLarge:
 		return rpctypes.ErrRequestTooLarge
 		return rpctypes.ErrRequestTooLarge
+	case etcdserver.ErrNoSpace:
+		return rpctypes.ErrNoSpace
 	default:
 	default:
 		return grpc.Errorf(codes.Internal, err.Error())
 		return grpc.Errorf(codes.Internal, err.Error())
 	}
 	}

+ 61 - 0
etcdserver/api/v3rpc/quota.go

@@ -0,0 +1,61 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3rpc
+
+import (
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"golang.org/x/net/context"
+)
+
+type quotaKVServer struct {
+	pb.KVServer
+	q etcdserver.Quota
+}
+
+func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
+	return &quotaKVServer{NewKVServer(s), etcdserver.NewBackendQuota(s)}
+}
+
+func (s *quotaKVServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+	if !s.q.Available(r) {
+		return nil, rpctypes.ErrNoSpace
+	}
+	return s.KVServer.Put(ctx, r)
+}
+
+func (s *quotaKVServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	if !s.q.Available(r) {
+		return nil, rpctypes.ErrNoSpace
+	}
+	return s.KVServer.Txn(ctx, r)
+}
+
+type quotaLeaseServer struct {
+	pb.LeaseServer
+	q etcdserver.Quota
+}
+
+func (s *quotaLeaseServer) LeaseCreate(ctx context.Context, cr *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	if !s.q.Available(cr) {
+		return nil, rpctypes.ErrNoSpace
+	}
+	return s.LeaseServer.LeaseCreate(ctx, cr)
+}
+
+func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
+	return &quotaLeaseServer{NewLeaseServer(s), etcdserver.NewBackendQuota(s)}
+}

+ 1 - 0
etcdserver/api/v3rpc/rpctypes/error.go

@@ -25,6 +25,7 @@ var (
 	ErrDuplicateKey = grpc.Errorf(codes.InvalidArgument, "etcdserver: duplicate key given in txn request")
 	ErrDuplicateKey = grpc.Errorf(codes.InvalidArgument, "etcdserver: duplicate key given in txn request")
 	ErrCompacted    = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision has been compacted")
 	ErrCompacted    = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision has been compacted")
 	ErrFutureRev    = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision is a future revision")
 	ErrFutureRev    = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision is a future revision")
+	ErrNoSpace      = grpc.Errorf(codes.ResourceExhausted, "etcdserver: storage: database space exceeded")
 
 
 	ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "etcdserver: requested lease not found")
 	ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "etcdserver: requested lease not found")
 	ErrLeaseExist    = grpc.Errorf(codes.FailedPrecondition, "etcdserver: lease already exists")
 	ErrLeaseExist    = grpc.Errorf(codes.FailedPrecondition, "etcdserver: lease already exists")

+ 36 - 0
etcdserver/apply.go

@@ -394,6 +394,42 @@ func (a *applierV3backend) UserAdd(r *pb.UserAddRequest) (*pb.UserAddResponse, e
 	return a.s.AuthStore().UserAdd(r)
 	return a.s.AuthStore().UserAdd(r)
 }
 }
 
 
+type quotaApplierV3 struct {
+	applierV3
+	q Quota
+}
+
+func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
+	return &quotaApplierV3{app, NewBackendQuota(s)}
+}
+
+func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+	ok := a.q.Available(p)
+	resp, err := a.applierV3.Put(txnID, p)
+	if err == nil && !ok {
+		err = ErrNoSpace
+	}
+	return resp, err
+}
+
+func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
+	ok := a.q.Available(rt)
+	resp, err := a.applierV3.Txn(rt)
+	if err == nil && !ok {
+		err = ErrNoSpace
+	}
+	return resp, err
+}
+
+func (a *quotaApplierV3) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	ok := a.q.Available(lc)
+	resp, err := a.applierV3.LeaseCreate(lc)
+	if err == nil && !ok {
+		err = ErrNoSpace
+	}
+	return resp, err
+}
+
 type kvSort struct{ kvs []storagepb.KeyValue }
 type kvSort struct{ kvs []storagepb.KeyValue }
 
 
 func (s *kvSort) Swap(i, j int) {
 func (s *kvSort) Swap(i, j int) {

+ 1 - 0
etcdserver/errors.go

@@ -35,6 +35,7 @@ var (
 	ErrNotEnoughStartedMembers    = errors.New("etcdserver: re-configuration failed due to not enough started members")
 	ErrNotEnoughStartedMembers    = errors.New("etcdserver: re-configuration failed due to not enough started members")
 	ErrNoLeader                   = errors.New("etcdserver: no leader")
 	ErrNoLeader                   = errors.New("etcdserver: no leader")
 	ErrRequestTooLarge            = errors.New("etcdserver: request is too large")
 	ErrRequestTooLarge            = errors.New("etcdserver: request is too large")
+	ErrNoSpace                    = errors.New("etcdserver: no space")
 )
 )
 
 
 func isKeyNotFound(err error) bool {
 func isKeyNotFound(err error) bool {

+ 95 - 0
etcdserver/quota.go

@@ -0,0 +1,95 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/storage/backend"
+)
+
+// Quota represents an arbitrary quota against arbitrary requests. Each request
+// costs some charge; if there is not enough remaining charge, then there are
+// too few resources available within the quota to apply the request.
+type Quota interface {
+	// Available judges whether the given request fits within the quota.
+	Available(req interface{}) bool
+	// Cost computes the charge against the quota for a given request.
+	Cost(req interface{}) int
+	// Remaining is the amount of charge left for the quota.
+	Remaining() int64
+}
+
+type backendQuota struct {
+	s               *EtcdServer
+	maxBackendBytes int64
+}
+
+const (
+	// leaseOverhead is an estimate for the cost of storing a lease
+	leaseOverhead = 64
+	// kvOverhead is an estimate for the cost of storing a key's metadata
+	kvOverhead = 256
+)
+
+func NewBackendQuota(s *EtcdServer) Quota {
+	return &backendQuota{s, backend.InitialMmapSize}
+}
+
+func (b *backendQuota) Available(v interface{}) bool {
+	// TODO: maybe optimize backend.Size()
+	return b.s.Backend().Size()+int64(b.Cost(v)) < b.maxBackendBytes
+}
+
+func (b *backendQuota) Cost(v interface{}) int {
+	switch r := v.(type) {
+	case *pb.PutRequest:
+		return costPut(r)
+	case *pb.TxnRequest:
+		return costTxn(r)
+	case *pb.LeaseCreateRequest:
+		return leaseOverhead
+	default:
+		panic("unexpected cost")
+	}
+}
+
+func costPut(r *pb.PutRequest) int { return kvOverhead + len(r.Key) + len(r.Value) }
+
+func costTxnReq(u *pb.RequestUnion) int {
+	r := u.GetRequestPut()
+	if r == nil {
+		return 0
+	}
+	return costPut(r)
+}
+
+func costTxn(r *pb.TxnRequest) int {
+	sizeSuccess := 0
+	for _, u := range r.Success {
+		sizeSuccess += costTxnReq(u)
+	}
+	sizeFailure := 0
+	for _, u := range r.Failure {
+		sizeFailure += costTxnReq(u)
+	}
+	if sizeFailure > sizeSuccess {
+		return sizeFailure
+	}
+	return sizeSuccess
+}
+
+func (b *backendQuota) Remaining() int64 {
+	return b.maxBackendBytes - b.s.Backend().Size()
+}

+ 11 - 7
etcdserver/server.go

@@ -374,7 +374,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
 		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
 		srv.compactor.Run()
 		srv.compactor.Run()
 	}
 	}
-	srv.applyV3 = &applierV3backend{srv}
+	srv.applyV3 = newQuotaApplierV3(srv, &applierV3backend{srv})
 
 
 	// TODO: move transport initialization near the definition of remote
 	// TODO: move transport initialization near the definition of remote
 	tr := &rafthttp.Transport{
 	tr := &rafthttp.Transport{
@@ -1007,13 +1007,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 				var r pb.Request
 				var r pb.Request
 				pbutil.MustUnmarshal(&r, e.Data)
 				pbutil.MustUnmarshal(&r, e.Data)
 				s.w.Trigger(r.ID, s.applyRequest(r))
 				s.w.Trigger(r.ID, s.applyRequest(r))
+			} else if raftReq.V2 != nil {
+				req := raftReq.V2
+				s.w.Trigger(req.ID, s.applyRequest(*req))
 			} else {
 			} else {
-				switch {
-				case raftReq.V2 != nil:
-					req := raftReq.V2
-					s.w.Trigger(req.ID, s.applyRequest(*req))
-				default:
-					s.w.Trigger(raftReq.ID, s.applyV3Request(&raftReq))
+				ar := s.applyV3Request(&raftReq)
+				s.w.Trigger(raftReq.ID, ar)
+				if ar.err == ErrNoSpace {
+					plog.Errorf("applying raft message exceeded backend quota")
+					// TODO: send alarm
+					s.errorc <- ar.err
+					return applied, true
 				}
 				}
 			}
 			}
 		case raftpb.EntryConfChange:
 		case raftpb.EntryConfChange:

+ 89 - 0
integration/v3_grpc_test.go

@@ -24,6 +24,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/etcd/storage/backend"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
 )
 )
@@ -455,6 +456,94 @@ func TestV3Hash(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
+func TestV3StorageQuotaAPI(t *testing.T) {
+	oldSize := backend.InitialMmapSize
+	defer func() {
+		backend.InitialMmapSize = oldSize
+		testutil.AfterTest(t)
+	}()
+
+	backend.InitialMmapSize = 64 * 1024
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+	kvc := toGRPC(clus.RandClient()).KV
+
+	key := []byte("abc")
+
+	// test small put that fits in quota
+	smallbuf := make([]byte, 512)
+	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
+		t.Fatal(err)
+	}
+
+	// test big put
+	bigbuf := make([]byte, 64*1024)
+	_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
+	if err == nil || err != rpctypes.ErrNoSpace {
+		t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrNoSpace)
+	}
+
+	// test big txn
+	puttxn := &pb.RequestUnion{
+		Request: &pb.RequestUnion_RequestPut{
+			RequestPut: &pb.PutRequest{
+				Key:   key,
+				Value: bigbuf,
+			},
+		},
+	}
+	txnreq := &pb.TxnRequest{}
+	txnreq.Success = append(txnreq.Success, puttxn)
+	_, txnerr := kvc.Txn(context.TODO(), txnreq)
+	if txnerr == nil || err != rpctypes.ErrNoSpace {
+		t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrNoSpace)
+	}
+}
+
+// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
+func TestV3StorageQuotaApply(t *testing.T) {
+	oldSize := backend.InitialMmapSize
+	defer func() {
+		backend.InitialMmapSize = oldSize
+		testutil.AfterTest(t)
+	}()
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 2})
+	defer clus.Terminate(t)
+	kvc0 := toGRPC(clus.Client(0)).KV
+	kvc1 := toGRPC(clus.Client(1)).KV
+
+	// force a node to have a different quota
+	backend.InitialMmapSize = 64 * 1024
+	clus.Members[0].Stop(t)
+	clus.Members[0].Restart(t)
+	clus.waitLeader(t, clus.Members)
+
+	key := []byte("abc")
+
+	// test small put still works
+	smallbuf := make([]byte, 1024)
+	_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
+	if serr != nil {
+		t.Fatal(serr)
+	}
+
+	// test big put
+	bigbuf := make([]byte, 64*1024)
+	_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// small quota machine should reject put
+	// first, synchronize with the cluster via quorum get
+	kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
+	if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
+		t.Fatalf("past-quota instance should reject put")
+	}
+}
+
 func TestV3RangeRequest(t *testing.T) {
 func TestV3RangeRequest(t *testing.T) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
 	tests := []struct {
 	tests := []struct {