grpc.go 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3rpc
  15. import (
  16. "crypto/tls"
  17. "io/ioutil"
  18. "math"
  19. "os"
  20. "sync"
  21. "github.com/coreos/etcd/etcdserver"
  22. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  23. "github.com/grpc-ecosystem/go-grpc-middleware"
  24. "github.com/grpc-ecosystem/go-grpc-prometheus"
  25. "google.golang.org/grpc"
  26. "google.golang.org/grpc/credentials"
  27. "google.golang.org/grpc/grpclog"
  28. "google.golang.org/grpc/health"
  29. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  30. )
  31. const (
  32. grpcOverheadBytes = 512 * 1024
  33. maxStreams = math.MaxUint32
  34. maxSendBytes = math.MaxInt32
  35. )
  36. // integration tests call this multiple times, which is racey in gRPC side
  37. var grpclogOnce sync.Once
  38. func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
  39. var opts []grpc.ServerOption
  40. opts = append(opts, grpc.CustomCodec(&codec{}))
  41. if tls != nil {
  42. opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
  43. }
  44. opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
  45. newLogUnaryInterceptor(s),
  46. newUnaryInterceptor(s),
  47. grpc_prometheus.UnaryServerInterceptor,
  48. )))
  49. opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
  50. newStreamInterceptor(s),
  51. grpc_prometheus.StreamServerInterceptor,
  52. )))
  53. opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
  54. opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
  55. opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
  56. grpcServer := grpc.NewServer(append(opts, gopts...)...)
  57. pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
  58. pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
  59. pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
  60. pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
  61. pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
  62. pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
  63. // server should register all the services manually
  64. // use empty service name for all etcd services' health status,
  65. // see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
  66. hsrv := health.NewServer()
  67. hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
  68. healthpb.RegisterHealthServer(grpcServer, hsrv)
  69. // set zero values for metrics registered for this grpc server
  70. grpc_prometheus.Register(grpcServer)
  71. grpclogOnce.Do(func() {
  72. if s.Cfg.Debug {
  73. grpc.EnableTracing = true
  74. // enable info, warning, error
  75. grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
  76. } else {
  77. // only discard info
  78. grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
  79. }
  80. })
  81. return grpcServer
  82. }