Browse Source

Merge pull request #3528 from xiang90/compact

*: support v3 compaction
Xiang Li 10 years ago
parent
commit
b1c2d7e526

+ 55 - 0
etcdctlv3/command/compaction.go

@@ -0,0 +1,55 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"strconv"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// NewCompactionCommand returns the CLI command for "compaction".
+func NewCompactionCommand() cli.Command {
+	return cli.Command{
+		Name: "compaction",
+		Action: func(c *cli.Context) {
+			compactionCommandFunc(c)
+		},
+	}
+}
+
+// compactionCommandFunc executes the "compaction" command.
+func compactionCommandFunc(c *cli.Context) {
+	if len(c.Args()) != 1 {
+		panic("bad arg")
+	}
+
+	rev, err := strconv.ParseInt(c.Args()[0], 10, 64)
+	if err != nil {
+		panic("bad arg")
+	}
+
+	conn, err := grpc.Dial(c.GlobalString("endpoint"))
+	if err != nil {
+		panic(err)
+	}
+	etcd := pb.NewEtcdClient(conn)
+	req := &pb.CompactionRequest{Revision: rev}
+
+	etcd.Compact(context.Background(), req)
+}

+ 1 - 0
etcdctlv3/main.go

@@ -35,6 +35,7 @@ func main() {
 		command.NewPutCommand(),
 		command.NewDeleteRangeCommand(),
 		command.NewTxnCommand(),
+		command.NewCompactionCommand(),
 	}
 
 	app.Run(os.Args)

+ 6 - 1
etcdserver/api/v3rpc/key.go

@@ -84,7 +84,12 @@ func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e
 }
 
 func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
-	panic("not implemented")
+	resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Compaction: r})
+	if err != nil {
+		err = togRPCError(err)
+	}
+
+	return resp.(*pb.CompactionResponse), nil
 }
 
 func checkRangeRequest(r *pb.RangeRequest) error {

+ 45 - 0
etcdserver/etcdserverpb/raft_internal.pb.go

@@ -23,6 +23,7 @@ type InternalRaftRequest struct {
 	Put         *PutRequest         `protobuf:"bytes,4,opt,name=put" json:"put,omitempty"`
 	DeleteRange *DeleteRangeRequest `protobuf:"bytes,5,opt,name=delete_range" json:"delete_range,omitempty"`
 	Txn         *TxnRequest         `protobuf:"bytes,6,opt,name=txn" json:"txn,omitempty"`
+	Compaction  *CompactionRequest  `protobuf:"bytes,7,opt,name=compaction" json:"compaction,omitempty"`
 }
 
 func (m *InternalRaftRequest) Reset()         { *m = InternalRaftRequest{} }
@@ -106,6 +107,16 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) {
 		}
 		i += n5
 	}
+	if m.Compaction != nil {
+		data[i] = 0x3a
+		i++
+		i = encodeVarintRaftInternal(data, i, uint64(m.Compaction.Size()))
+		n6, err := m.Compaction.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n6
+	}
 	return i, nil
 }
 
@@ -180,6 +191,10 @@ func (m *InternalRaftRequest) Size() (n int) {
 		l = m.Txn.Size()
 		n += 1 + l + sovRaftInternal(uint64(l))
 	}
+	if m.Compaction != nil {
+		l = m.Compaction.Size()
+		n += 1 + l + sovRaftInternal(uint64(l))
+	}
 	return n
 }
 
@@ -387,6 +402,36 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
 				return err
 			}
 			iNdEx = postIndex
+		case 7:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Compaction", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthRaftInternal
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.Compaction == nil {
+				m.Compaction = &CompactionRequest{}
+			}
+			if err := m.Compaction.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
 		default:
 			var sizeOfWire int
 			for {

+ 1 - 0
etcdserver/etcdserverpb/raft_internal.proto

@@ -19,6 +19,7 @@ message InternalRaftRequest {
   PutRequest put = 4;
   DeleteRangeRequest delete_range = 5;
   TxnRequest txn = 6;
+  CompactionRequest compaction = 7;
 }
 
 message EmptyResponse {

+ 14 - 0
etcdserver/v3demo_server.go

@@ -60,6 +60,8 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
 		return applyDeleteRange(s.kv, r.DeleteRange)
 	case r.Txn != nil:
 		return applyTxn(s.kv, r.Txn)
+	case r.Compaction != nil:
+		return applyCompaction(s.kv, r.Compaction)
 	default:
 		panic("not implemented")
 	}
@@ -128,6 +130,18 @@ func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse {
 	return txnResp
 }
 
+func applyCompaction(kv dstorage.KV, compaction *pb.CompactionRequest) *pb.CompactionResponse {
+	resp := &pb.CompactionResponse{}
+	resp.Header = &pb.ResponseHeader{}
+	err := kv.Compact(compaction.Revision)
+	if err != nil {
+		panic("handle error")
+	}
+	// get the current revision. which key to get is not important.
+	_, resp.Header.Revision, _ = kv.Range([]byte("compaction"), nil, 1, 0)
+	return resp
+}
+
 func applyUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion {
 	switch {
 	case union.RequestRange != nil: