Browse Source

*: refactor kv rpc implementation

Xiang Li 10 years ago
parent
commit
c37bd2385a
3 changed files with 71 additions and 28 deletions
  1. 1 1
      etcdmain/etcd.go
  2. 19 19
      etcdserver/api/v3rpc/key.go
  3. 51 8
      etcdserver/v3demo_server.go

+ 1 - 1
etcdmain/etcd.go

@@ -321,7 +321,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 	if cfg.v3demo {
 	if cfg.v3demo {
 		// set up v3 demo rpc
 		// set up v3 demo rpc
 		grpcServer := grpc.NewServer()
 		grpcServer := grpc.NewServer()
-		etcdserverpb.RegisterKVServer(grpcServer, v3rpc.New(s))
+		etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
 		etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
 		etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
 		go plog.Fatal(grpcServer.Serve(v3l))
 		go plog.Fatal(grpcServer.Serve(v3l))
 	}
 	}

+ 19 - 19
etcdserver/api/v3rpc/key.go

@@ -23,73 +23,73 @@ import (
 	"github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/storage"
 )
 )
 
 
-type handler struct {
-	server etcdserver.V3DemoServer
+type kvServer struct {
+	kv etcdserver.RaftKV
 }
 }
 
 
-func New(s etcdserver.V3DemoServer) pb.KVServer {
-	return &handler{s}
+func NewKVServer(s etcdserver.RaftKV) pb.KVServer {
+	return &kvServer{s}
 }
 }
 
 
-func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	if err := checkRangeRequest(r); err != nil {
 	if err := checkRangeRequest(r); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r})
+	resp, err := s.kv.Range(ctx, r)
 	if err != nil {
 	if err != nil {
 		return nil, togRPCError(err)
 		return nil, togRPCError(err)
 	}
 	}
 
 
-	return resp.(*pb.RangeResponse), err
+	return resp, err
 }
 }
 
 
-func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
 	if err := checkPutRequest(r); err != nil {
 	if err := checkPutRequest(r); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r})
+	resp, err := s.kv.Put(ctx, r)
 	if err != nil {
 	if err != nil {
 		return nil, togRPCError(err)
 		return nil, togRPCError(err)
 	}
 	}
 
 
-	return resp.(*pb.PutResponse), err
+	return resp, err
 }
 }
 
 
-func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 	if err := checkDeleteRequest(r); err != nil {
 	if err := checkDeleteRequest(r); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r})
+	resp, err := s.kv.DeleteRange(ctx, r)
 	if err != nil {
 	if err != nil {
 		return nil, togRPCError(err)
 		return nil, togRPCError(err)
 	}
 	}
 
 
-	return resp.(*pb.DeleteRangeResponse), err
+	return resp, err
 }
 }
 
 
-func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
+func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
 	if err := checkTxnRequest(r); err != nil {
 	if err := checkTxnRequest(r); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r})
+	resp, err := s.kv.Txn(ctx, r)
 	if err != nil {
 	if err != nil {
 		return nil, togRPCError(err)
 		return nil, togRPCError(err)
 	}
 	}
 
 
-	return resp.(*pb.TxnResponse), err
+	return resp, err
 }
 }
 
 
-func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
-	resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Compaction: r})
+func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
+	resp, err := s.kv.Compact(ctx, r)
 	if err != nil {
 	if err != nil {
 		return nil, togRPCError(err)
 		return nil, togRPCError(err)
 	}
 	}
 
 
-	return resp.(*pb.CompactionResponse), nil
+	return resp, nil
 }
 }
 
 
 func checkRangeRequest(r *pb.RangeRequest) error {
 func checkRangeRequest(r *pb.RangeRequest) error {

+ 51 - 8
etcdserver/v3demo_server.go

@@ -23,8 +23,52 @@ import (
 	dstorage "github.com/coreos/etcd/storage"
 	dstorage "github.com/coreos/etcd/storage"
 )
 )
 
 
-type V3DemoServer interface {
-	V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error)
+type RaftKV interface {
+	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
+	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
+	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
+	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
+	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
+}
+
+func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Range: r})
+	if err != nil {
+		return nil, err
+	}
+	return result.resp.(*pb.RangeResponse), result.err
+}
+
+func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+	result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Put: r})
+	if err != nil {
+		return nil, err
+	}
+	return result.resp.(*pb.PutResponse), result.err
+}
+
+func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+	result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{DeleteRange: r})
+	if err != nil {
+		return nil, err
+	}
+	return result.resp.(*pb.DeleteRangeResponse), result.err
+}
+
+func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Txn: r})
+	if err != nil {
+		return nil, err
+	}
+	return result.resp.(*pb.TxnResponse), result.err
+}
+
+func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
+	result, err := s.processInternalRaftReq(ctx, pb.InternalRaftRequest{Compaction: r})
+	if err != nil {
+		return nil, err
+	}
+	return result.resp.(*pb.CompactionResponse), result.err
 }
 }
 
 
 type applyResult struct {
 type applyResult struct {
@@ -32,12 +76,12 @@ type applyResult struct {
 	err  error
 	err  error
 }
 }
 
 
-func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
+func (s *EtcdServer) processInternalRaftReq(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
 	r.ID = s.reqIDGen.Next()
 	r.ID = s.reqIDGen.Next()
 
 
 	data, err := r.Marshal()
 	data, err := r.Marshal()
 	if err != nil {
 	if err != nil {
-		return &pb.EmptyResponse{}, err
+		return nil, err
 	}
 	}
 	ch := s.w.Register(r.ID)
 	ch := s.w.Register(r.ID)
 
 
@@ -45,13 +89,12 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (pr
 
 
 	select {
 	select {
 	case x := <-ch:
 	case x := <-ch:
-		result := x.(*applyResult)
-		return result.resp, result.err
+		return x.(*applyResult), nil
 	case <-ctx.Done():
 	case <-ctx.Done():
 		s.w.Trigger(r.ID, nil) // GC wait
 		s.w.Trigger(r.ID, nil) // GC wait
-		return &pb.EmptyResponse{}, ctx.Err()
+		return nil, ctx.Err()
 	case <-s.done:
 	case <-s.done:
-		return &pb.EmptyResponse{}, ErrStopped
+		return nil, ErrStopped
 	}
 	}
 }
 }