1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- package internal
- import (
- "net"
- "github.com/tal-tech/go-zero/core/proc"
- "github.com/tal-tech/go-zero/core/stat"
- "github.com/tal-tech/go-zero/zrpc/internal/serverinterceptors"
- "google.golang.org/grpc"
- )
- type (
- // ServerOption defines the method to customize a rpcServerOptions.
- ServerOption func(options *rpcServerOptions)
- rpcServerOptions struct {
- metrics *stat.Metrics
- }
- rpcServer struct {
- name string
- *baseRpcServer
- }
- )
- func init() {
- InitLogger()
- }
- // NewRpcServer returns a Server.
- func NewRpcServer(address string, opts ...ServerOption) Server {
- var options rpcServerOptions
- for _, opt := range opts {
- opt(&options)
- }
- if options.metrics == nil {
- options.metrics = stat.NewMetrics(address)
- }
- return &rpcServer{
- baseRpcServer: newBaseRpcServer(address, options.metrics),
- }
- }
- func (s *rpcServer) SetName(name string) {
- s.name = name
- s.baseRpcServer.SetName(name)
- }
- func (s *rpcServer) Start(register RegisterFn) error {
- lis, err := net.Listen("tcp", s.address)
- if err != nil {
- return err
- }
- unaryInterceptors := []grpc.UnaryServerInterceptor{
- serverinterceptors.UnaryTracingInterceptor(s.name),
- serverinterceptors.UnaryCrashInterceptor(),
- serverinterceptors.UnaryStatInterceptor(s.metrics),
- serverinterceptors.UnaryPrometheusInterceptor(),
- }
- unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
- streamInterceptors := []grpc.StreamServerInterceptor{
- serverinterceptors.StreamCrashInterceptor,
- }
- streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
- options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
- WithStreamServerInterceptors(streamInterceptors...))
- server := grpc.NewServer(options...)
- register(server)
- // we need to make sure all others are wrapped up
- // so we do graceful stop at shutdown phase instead of wrap up phase
- waitForCalled := proc.AddWrapUpListener(func() {
- server.GracefulStop()
- })
- defer waitForCalled()
- return server.Serve(lis)
- }
- // WithMetrics returns a func that sets metrics to a Server.
- func WithMetrics(metrics *stat.Metrics) ServerOption {
- return func(options *rpcServerOptions) {
- options.metrics = metrics
- }
- }
|