rpcserver.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package internal
  2. import (
  3. "net"
  4. "github.com/tal-tech/go-zero/core/proc"
  5. "github.com/tal-tech/go-zero/core/stat"
  6. "github.com/tal-tech/go-zero/zrpc/internal/serverinterceptors"
  7. "google.golang.org/grpc"
  8. )
  9. type (
  10. ServerOption func(options *rpcServerOptions)
  11. rpcServerOptions struct {
  12. metrics *stat.Metrics
  13. }
  14. rpcServer struct {
  15. name string
  16. *baseRpcServer
  17. }
  18. )
  19. func init() {
  20. InitLogger()
  21. }
  22. func NewRpcServer(address string, opts ...ServerOption) Server {
  23. var options rpcServerOptions
  24. for _, opt := range opts {
  25. opt(&options)
  26. }
  27. if options.metrics == nil {
  28. options.metrics = stat.NewMetrics(address)
  29. }
  30. return &rpcServer{
  31. baseRpcServer: newBaseRpcServer(address, options.metrics),
  32. }
  33. }
  34. func (s *rpcServer) SetName(name string) {
  35. s.name = name
  36. s.baseRpcServer.SetName(name)
  37. }
  38. func (s *rpcServer) Start(register RegisterFn) error {
  39. lis, err := net.Listen("tcp", s.address)
  40. if err != nil {
  41. return err
  42. }
  43. unaryInterceptors := []grpc.UnaryServerInterceptor{
  44. serverinterceptors.UnaryTracingInterceptor(s.name),
  45. serverinterceptors.UnaryCrashInterceptor(),
  46. serverinterceptors.UnaryStatInterceptor(s.metrics),
  47. serverinterceptors.UnaryPrometheusInterceptor(),
  48. }
  49. unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
  50. streamInterceptors := []grpc.StreamServerInterceptor{
  51. serverinterceptors.StreamCrashInterceptor,
  52. }
  53. streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
  54. options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
  55. WithStreamServerInterceptors(streamInterceptors...))
  56. server := grpc.NewServer(options...)
  57. register(server)
  58. // we need to make sure all others are wrapped up
  59. // so we do graceful stop at shutdown phase instead of wrap up phase
  60. shutdownCalled := proc.AddShutdownListener(func() {
  61. server.GracefulStop()
  62. })
  63. err = server.Serve(lis)
  64. shutdownCalled()
  65. return err
  66. }
  67. func WithMetrics(metrics *stat.Metrics) ServerOption {
  68. return func(options *rpcServerOptions) {
  69. options.metrics = metrics
  70. }
  71. }