Browse Source

etcdserver: add maintain service to support defrag

Xiang Li 9 years ago
parent
commit
2f12ea893b

+ 1 - 0
etcdserver/api/v3rpc/grpc.go

@@ -37,5 +37,6 @@ func Server(s *etcdserver.EtcdServer, tls *transport.TLSInfo) (*grpc.Server, err
 	pb.RegisterLeaseServer(grpcServer, NewLeaseServer(s))
 	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
+	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
 	return grpcServer, nil
 }

+ 45 - 0
etcdserver/api/v3rpc/maintenance.go

@@ -0,0 +1,45 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3rpc
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/etcdserver"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/storage/backend"
+)
+
+type BackendGetter interface {
+	Backend() backend.Backend
+}
+
+type maintenanceServer struct {
+	bg BackendGetter
+}
+
+func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
+	return &maintenanceServer{bg: s}
+}
+
+func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
+	plog.Noticef("starting to defragment the storage backend...")
+	err := ms.bg.Backend().Defrag()
+	if err != nil {
+		plog.Errorf("failed to deframent the storage backend (%v)", err)
+		return nil, err
+	}
+	plog.Noticef("finished defragmenting the storage backend")
+	return &pb.DefragmentResponse{}, nil
+}

+ 2 - 0
etcdserver/etcdserverpb/etcdserver.pb.go

@@ -50,6 +50,8 @@
 		MemberUpdateResponse
 		MemberListRequest
 		MemberListResponse
+		DefragmentRequest
+		DefragmentResponse
 		AuthEnableRequest
 		AuthDisableRequest
 		AuthenticateRequest

+ 306 - 28
etcdserver/etcdserverpb/rpc.pb.go

@@ -1137,6 +1137,28 @@ func (m *MemberListResponse) GetMembers() []*Member {
 	return nil
 }
 
+type DefragmentRequest struct {
+}
+
+func (m *DefragmentRequest) Reset()         { *m = DefragmentRequest{} }
+func (m *DefragmentRequest) String() string { return proto.CompactTextString(m) }
+func (*DefragmentRequest) ProtoMessage()    {}
+
+type DefragmentResponse struct {
+	Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
+}
+
+func (m *DefragmentResponse) Reset()         { *m = DefragmentResponse{} }
+func (m *DefragmentResponse) String() string { return proto.CompactTextString(m) }
+func (*DefragmentResponse) ProtoMessage()    {}
+
+func (m *DefragmentResponse) GetHeader() *ResponseHeader {
+	if m != nil {
+		return m.Header
+	}
+	return nil
+}
+
 type AuthEnableRequest struct {
 }
 
@@ -1481,6 +1503,8 @@ func init() {
 	proto.RegisterType((*MemberUpdateResponse)(nil), "etcdserverpb.MemberUpdateResponse")
 	proto.RegisterType((*MemberListRequest)(nil), "etcdserverpb.MemberListRequest")
 	proto.RegisterType((*MemberListResponse)(nil), "etcdserverpb.MemberListResponse")
+	proto.RegisterType((*DefragmentRequest)(nil), "etcdserverpb.DefragmentRequest")
+	proto.RegisterType((*DefragmentResponse)(nil), "etcdserverpb.DefragmentResponse")
 	proto.RegisterType((*AuthEnableRequest)(nil), "etcdserverpb.AuthEnableRequest")
 	proto.RegisterType((*AuthDisableRequest)(nil), "etcdserverpb.AuthDisableRequest")
 	proto.RegisterType((*AuthenticateRequest)(nil), "etcdserverpb.AuthenticateRequest")
@@ -2154,6 +2178,65 @@ var _Cluster_serviceDesc = grpc.ServiceDesc{
 	Streams: []grpc.StreamDesc{},
 }
 
+// Client API for Maintenance service
+
+type MaintenanceClient interface {
+	// TODO: move Hash from kv to Maintenance
+	Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error)
+}
+
+type maintenanceClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewMaintenanceClient(cc *grpc.ClientConn) MaintenanceClient {
+	return &maintenanceClient{cc}
+}
+
+func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) {
+	out := new(DefragmentResponse)
+	err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// Server API for Maintenance service
+
+type MaintenanceServer interface {
+	// TODO: move Hash from kv to Maintenance
+	Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error)
+}
+
+func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) {
+	s.RegisterService(&_Maintenance_serviceDesc, srv)
+}
+
+func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
+	in := new(DefragmentRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	out, err := srv.(MaintenanceServer).Defragment(ctx, in)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+var _Maintenance_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "etcdserverpb.Maintenance",
+	HandlerType: (*MaintenanceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Defragment",
+			Handler:    _Maintenance_Defragment_Handler,
+		},
+	},
+	Streams: []grpc.StreamDesc{},
+}
+
 // Client API for Auth service
 
 type AuthClient interface {
@@ -3984,6 +4067,52 @@ func (m *MemberListResponse) MarshalTo(data []byte) (int, error) {
 	return i, nil
 }
 
+func (m *DefragmentRequest) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *DefragmentRequest) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	return i, nil
+}
+
+func (m *DefragmentResponse) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *DefragmentResponse) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.Header != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
+		n28, err := m.Header.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n28
+	}
+	return i, nil
+}
+
 func (m *AuthEnableRequest) Marshal() (data []byte, err error) {
 	size := m.Size()
 	data = make([]byte, size)
@@ -4255,11 +4384,11 @@ func (m *AuthEnableResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n28, err := m.Header.MarshalTo(data[i:])
+		n29, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n28
+		i += n29
 	}
 	return i, nil
 }
@@ -4283,11 +4412,11 @@ func (m *AuthDisableResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n29, err := m.Header.MarshalTo(data[i:])
+		n30, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n29
+		i += n30
 	}
 	return i, nil
 }
@@ -4311,11 +4440,11 @@ func (m *AuthenticateResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n30, err := m.Header.MarshalTo(data[i:])
+		n31, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n30
+		i += n31
 	}
 	return i, nil
 }
@@ -4339,11 +4468,11 @@ func (m *UserAddResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n31, err := m.Header.MarshalTo(data[i:])
+		n32, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n31
+		i += n32
 	}
 	return i, nil
 }
@@ -4367,11 +4496,11 @@ func (m *UserGetResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n32, err := m.Header.MarshalTo(data[i:])
+		n33, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n32
+		i += n33
 	}
 	return i, nil
 }
@@ -4395,11 +4524,11 @@ func (m *UserDeleteResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n33, err := m.Header.MarshalTo(data[i:])
+		n34, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n33
+		i += n34
 	}
 	return i, nil
 }
@@ -4423,11 +4552,11 @@ func (m *UserChangePasswordResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n34, err := m.Header.MarshalTo(data[i:])
+		n35, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n34
+		i += n35
 	}
 	return i, nil
 }
@@ -4451,11 +4580,11 @@ func (m *UserGrantResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n35, err := m.Header.MarshalTo(data[i:])
+		n36, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n35
+		i += n36
 	}
 	return i, nil
 }
@@ -4479,11 +4608,11 @@ func (m *UserRevokeResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n36, err := m.Header.MarshalTo(data[i:])
+		n37, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n36
+		i += n37
 	}
 	return i, nil
 }
@@ -4507,11 +4636,11 @@ func (m *RoleAddResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n37, err := m.Header.MarshalTo(data[i:])
+		n38, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n37
+		i += n38
 	}
 	return i, nil
 }
@@ -4535,11 +4664,11 @@ func (m *RoleGetResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n38, err := m.Header.MarshalTo(data[i:])
+		n39, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n38
+		i += n39
 	}
 	return i, nil
 }
@@ -4563,11 +4692,11 @@ func (m *RoleDeleteResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n39, err := m.Header.MarshalTo(data[i:])
+		n40, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n39
+		i += n40
 	}
 	return i, nil
 }
@@ -4591,11 +4720,11 @@ func (m *RoleGrantResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n40, err := m.Header.MarshalTo(data[i:])
+		n41, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n40
+		i += n41
 	}
 	return i, nil
 }
@@ -4619,11 +4748,11 @@ func (m *RoleRevokeResponse) MarshalTo(data []byte) (int, error) {
 		data[i] = 0xa
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
-		n41, err := m.Header.MarshalTo(data[i:])
+		n42, err := m.Header.MarshalTo(data[i:])
 		if err != nil {
 			return 0, err
 		}
-		i += n41
+		i += n42
 	}
 	return i, nil
 }
@@ -5272,6 +5401,22 @@ func (m *MemberListResponse) Size() (n int) {
 	return n
 }
 
+func (m *DefragmentRequest) Size() (n int) {
+	var l int
+	_ = l
+	return n
+}
+
+func (m *DefragmentResponse) Size() (n int) {
+	var l int
+	_ = l
+	if m.Header != nil {
+		l = m.Header.Size()
+		n += 1 + l + sovRpc(uint64(l))
+	}
+	return n
+}
+
 func (m *AuthEnableRequest) Size() (n int) {
 	var l int
 	_ = l
@@ -9461,6 +9606,139 @@ func (m *MemberListResponse) Unmarshal(data []byte) error {
 	}
 	return nil
 }
+func (m *DefragmentRequest) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowRpc
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: DefragmentRequest: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: DefragmentRequest: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		default:
+			iNdEx = preIndex
+			skippy, err := skipRpc(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthRpc
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *DefragmentResponse) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowRpc
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: DefragmentResponse: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: DefragmentResponse: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRpc
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthRpc
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Header == nil {
+				m.Header = &ResponseHeader{}
+			}
+			if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipRpc(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthRpc
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
 func (m *AuthEnableRequest) Unmarshal(data []byte) error {
 	l := len(data)
 	iNdEx := 0

+ 13 - 0
etcdserver/etcdserverpb/rpc.proto

@@ -76,6 +76,11 @@ service Cluster {
   rpc MemberList(MemberListRequest) returns (MemberListResponse) {}
 }
 
+service Maintenance {
+  // TODO: move Hash from kv to Maintenance
+  rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {}
+}
+
 service Auth {
   // AuthEnable enables authentication.
   rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse) {}
@@ -425,6 +430,14 @@ message MemberListResponse {
   repeated Member members = 2;
 }
 
+message DefragmentRequest {
+
+}
+
+message DefragmentResponse {
+  ResponseHeader header = 1;
+}
+
 message AuthEnableRequest {
 }
 

+ 9 - 0
etcdserver/server.go

@@ -24,6 +24,7 @@ import (
 	"os"
 	"path"
 	"regexp"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -173,6 +174,7 @@ type EtcdServer struct {
 
 	kv     dstorage.ConsistentWatchableKV
 	lessor lease.Lessor
+	bemu   sync.Mutex
 	be     backend.Backend
 
 	stats  *stats.ServerStats
@@ -604,6 +606,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		// Closing old backend might block until all the txns
 		// on the backend are finished.
 		// We do not want to wait on closing the old backend.
+		s.bemu.Lock()
 		oldbe := s.be
 		go func() {
 			if err := oldbe.Close(); err != nil {
@@ -612,6 +615,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		}()
 
 		s.be = newbe
+		s.bemu.Unlock()
 	}
 	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 		plog.Panicf("recovery store error: %v", err)
@@ -1313,3 +1317,8 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 }
 
 func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv }
+func (s *EtcdServer) Backend() backend.Backend {
+	s.bemu.Lock()
+	defer s.bemu.Unlock()
+	return s.be
+}