12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package v3rpc
- import (
- "crypto/tls"
- "math"
- "github.com/coreos/etcd/etcdserver"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "github.com/grpc-ecosystem/go-grpc-middleware"
- "github.com/grpc-ecosystem/go-grpc-prometheus"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/health"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
- )
- const (
- grpcOverheadBytes = 512 * 1024
- maxStreams = math.MaxUint32
- maxSendBytes = math.MaxInt32
- )
- func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
- var opts []grpc.ServerOption
- opts = append(opts, grpc.CustomCodec(&codec{}))
- if tls != nil {
- opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
- }
- opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
- newLogUnaryInterceptor(s),
- newUnaryInterceptor(s),
- grpc_prometheus.UnaryServerInterceptor,
- )))
- opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
- newStreamInterceptor(s),
- grpc_prometheus.StreamServerInterceptor,
- )))
- opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
- opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
- opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
- grpcServer := grpc.NewServer(append(opts, gopts...)...)
- pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
- pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
- pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
- pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
- pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
- pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
- // server should register all the services manually
- // use empty service name for all etcd services' health status,
- // see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
- hsrv := health.NewServer()
- hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
- healthpb.RegisterHealthServer(grpcServer, hsrv)
- // set zero values for metrics registered for this grpc server
- grpc_prometheus.Register(grpcServer)
- return grpcServer
- }
|