// rpcserver.go (2.0 KB)
  1. package internal
  2. import (
  3. "net"
  4. "github.com/tal-tech/go-zero/core/proc"
  5. "github.com/tal-tech/go-zero/core/stat"
  6. "github.com/tal-tech/go-zero/zrpc/internal/serverinterceptors"
  7. "google.golang.org/grpc"
  8. )
  9. type (
  10. ServerOption func(options *rpcServerOptions)
  11. rpcServerOptions struct {
  12. metrics *stat.Metrics
  13. }
  14. rpcServer struct {
  15. name string
  16. *baseRpcServer
  17. }
  18. )
  19. func init() {
  20. InitLogger()
  21. }
  22. func NewRpcServer(address string, opts ...ServerOption) Server {
  23. var options rpcServerOptions
  24. for _, opt := range opts {
  25. opt(&options)
  26. }
  27. if options.metrics == nil {
  28. options.metrics = stat.NewMetrics(address)
  29. }
  30. return &rpcServer{
  31. baseRpcServer: newBaseRpcServer(address, options.metrics),
  32. }
  33. }
  34. func (s *rpcServer) SetName(name string) {
  35. s.name = name
  36. s.baseRpcServer.SetName(name)
  37. }
  38. func (s *rpcServer) Start(register RegisterFn) error {
  39. lis, err := net.Listen("tcp", s.address)
  40. if err != nil {
  41. return err
  42. }
  43. unaryInterceptors := []grpc.UnaryServerInterceptor{
  44. serverinterceptors.UnaryTracingInterceptor(s.name),
  45. serverinterceptors.UnaryCrashInterceptor(),
  46. serverinterceptors.UnaryStatInterceptor(s.metrics),
  47. serverinterceptors.UnaryPrometheusInterceptor(),
  48. }
  49. unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
  50. streamInterceptors := []grpc.StreamServerInterceptor{
  51. serverinterceptors.StreamCrashInterceptor,
  52. }
  53. streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
  54. options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
  55. WithStreamServerInterceptors(streamInterceptors...))
  56. server := grpc.NewServer(options...)
  57. register(server)
  58. // we need to make sure all others are wrapped up
  59. // so we do graceful stop at shutdown phase instead of wrap up phase
  60. waitForCalled := proc.AddWrapUpListener(func() {
  61. server.GracefulStop()
  62. })
  63. defer waitForCalled()
  64. return server.Serve(lis)
  65. }
  66. func WithMetrics(metrics *stat.Metrics) ServerOption {
  67. return func(options *rpcServerOptions) {
  68. options.metrics = metrics
  69. }
  70. }