proxy.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package zrpc
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/syncx"
  6. "github.com/tal-tech/go-zero/zrpc/internal"
  7. "github.com/tal-tech/go-zero/zrpc/internal/auth"
  8. "google.golang.org/grpc"
  9. )
  10. // A RpcProxy is a rpc proxy.
  11. type RpcProxy struct {
  12. backend string
  13. clients map[string]Client
  14. options []internal.ClientOption
  15. sharedCalls syncx.SharedCalls
  16. lock sync.Mutex
  17. }
  18. // NewProxy returns a RpcProxy.
  19. func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
  20. return &RpcProxy{
  21. backend: backend,
  22. clients: make(map[string]Client),
  23. options: opts,
  24. sharedCalls: syncx.NewSharedCalls(),
  25. }
  26. }
  27. // TakeConn returns a grpc.ClientConn.
  28. func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) {
  29. cred := auth.ParseCredential(ctx)
  30. key := cred.App + "/" + cred.Token
  31. val, err := p.sharedCalls.Do(key, func() (interface{}, error) {
  32. p.lock.Lock()
  33. client, ok := p.clients[key]
  34. p.lock.Unlock()
  35. if ok {
  36. return client, nil
  37. }
  38. opts := append(p.options, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
  39. App: cred.App,
  40. Token: cred.Token,
  41. })))
  42. client, err := NewClientWithTarget(p.backend, opts...)
  43. if err != nil {
  44. return nil, err
  45. }
  46. p.lock.Lock()
  47. p.clients[key] = client
  48. p.lock.Unlock()
  49. return client, nil
  50. })
  51. if err != nil {
  52. return nil, err
  53. }
  54. return val.(Client).Conn(), nil
  55. }