Browse Source

Merge pull request #7612 from gyuho/adapter-maintenance-API

*: adapter maintenance api
Gyu-Ho Lee 8 years ago
parent
commit
eafab47f05

+ 26 - 8
clientv3/maintenance.go

@@ -18,6 +18,7 @@ import (
 	"io"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 )
@@ -53,12 +54,31 @@ type Maintenance interface {
 }
 
 type maintenance struct {
-	c      *Client
+	dial   func(endpoint string) (pb.MaintenanceClient, func(), error)
 	remote pb.MaintenanceClient
 }
 
 func NewMaintenance(c *Client) Maintenance {
-	return &maintenance{c: c, remote: pb.NewMaintenanceClient(c.conn)}
+	return &maintenance{
+		dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
+			conn, err := c.dial(endpoint)
+			if err != nil {
+				return nil, nil, err
+			}
+			cancel := func() { conn.Close() }
+			return pb.NewMaintenanceClient(conn), cancel, nil
+		},
+		remote: pb.NewMaintenanceClient(c.conn),
+	}
+}
+
+func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance {
+	return &maintenance{
+		dial: func(string) (pb.MaintenanceClient, func(), error) {
+			return remote, func() {}, nil
+		},
+		remote: remote,
+	}
 }
 
 func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
@@ -109,12 +129,11 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
 }
 
 func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
-	conn, err := m.c.Dial(endpoint)
+	remote, cancel, err := m.dial(endpoint)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
-	defer conn.Close()
-	remote := pb.NewMaintenanceClient(conn)
+	defer cancel()
 	resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false))
 	if err != nil {
 		return nil, toErr(ctx, err)
@@ -123,12 +142,11 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
 }
 
 func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
-	conn, err := m.c.Dial(endpoint)
+	remote, cancel, err := m.dial(endpoint)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
-	defer conn.Close()
-	remote := pb.NewMaintenanceClient(conn)
+	defer cancel()
 	resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false))
 	if err != nil {
 		return nil, toErr(ctx, err)

+ 3 - 0
etcdserver/api/v3client/v3client.go

@@ -39,5 +39,8 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
 	wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
 	c.Watcher = clientv3.NewWatchFromWatchClient(wc)
 
+	mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
+	c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc)
+
 	return c
 }

+ 2 - 1
integration/cluster_proxy.go

@@ -56,13 +56,14 @@ func toGRPC(c *clientv3.Client) grpcAPI {
 	kvp, kvpch := grpcproxy.NewKvProxy(c)
 	wp, wpch := grpcproxy.NewWatchProxy(c)
 	lp, lpch := grpcproxy.NewLeaseProxy(c)
+	mp := grpcproxy.NewMaintenanceProxy(c)
 
 	grpc := grpcAPI{
 		pb.NewClusterClient(c.ActiveConnection()),
 		adapter.KvServerToKvClient(kvp),
 		adapter.LeaseServerToLeaseClient(lp),
 		adapter.WatchServerToWatchClient(wp),
-		pb.NewMaintenanceClient(c.ActiveConnection()),
+		adapter.MaintenanceServerToMaintenanceClient(mp),
 		pb.NewAuthClient(c.ActiveConnection()),
 	}
 	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch}

+ 79 - 0
proxy/grpcproxy/adapter/maintenance_client_adapter.go

@@ -0,0 +1,79 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+type mts2mtc struct{ mts pb.MaintenanceServer }
+
+func MaintenanceServerToMaintenanceClient(mts pb.MaintenanceServer) pb.MaintenanceClient {
+	return &mts2mtc{mts}
+}
+
+func (s *mts2mtc) Alarm(ctx context.Context, r *pb.AlarmRequest, opts ...grpc.CallOption) (*pb.AlarmResponse, error) {
+	return s.mts.Alarm(ctx, r)
+}
+
+func (s *mts2mtc) Status(ctx context.Context, r *pb.StatusRequest, opts ...grpc.CallOption) (*pb.StatusResponse, error) {
+	return s.mts.Status(ctx, r)
+}
+
+func (s *mts2mtc) Defragment(ctx context.Context, dr *pb.DefragmentRequest, opts ...grpc.CallOption) (*pb.DefragmentResponse, error) {
+	return s.mts.Defragment(ctx, dr)
+}
+
+func (s *mts2mtc) Hash(ctx context.Context, r *pb.HashRequest, opts ...grpc.CallOption) (*pb.HashResponse, error) {
+	return s.mts.Hash(ctx, r)
+}
+
+func (s *mts2mtc) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (pb.Maintenance_SnapshotClient, error) {
+	cs := newPipeStream(ctx, func(ss chanServerStream) error {
+		return s.mts.Snapshot(in, &ss2scServerStream{ss})
+	})
+	return &ss2scClientStream{cs}, nil
+}
+
+// ss2scClientStream implements Maintenance_SnapshotClient
+type ss2scClientStream struct{ chanClientStream }
+
+// ss2scServerStream implements Maintenance_SnapshotServer
+type ss2scServerStream struct{ chanServerStream }
+
+func (s *ss2scClientStream) Send(rr *pb.SnapshotRequest) error {
+	return s.SendMsg(rr)
+}
+func (s *ss2scClientStream) Recv() (*pb.SnapshotResponse, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.SnapshotResponse), nil
+}
+
+func (s *ss2scServerStream) Send(rr *pb.SnapshotResponse) error {
+	return s.SendMsg(rr)
+}
+func (s *ss2scServerStream) Recv() (*pb.SnapshotRequest, error) {
+	var v interface{}
+	if err := s.RecvMsg(&v); err != nil {
+		return nil, err
+	}
+	return v.(*pb.SnapshotRequest), nil
+}