proxy.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package zrpc
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/syncx"
  6. "github.com/tal-tech/go-zero/zrpc/internal"
  7. "github.com/tal-tech/go-zero/zrpc/internal/auth"
  8. "google.golang.org/grpc"
  9. )
  10. type RpcProxy struct {
  11. backend string
  12. clients map[string]Client
  13. options []internal.ClientOption
  14. sharedCalls syncx.SharedCalls
  15. lock sync.Mutex
  16. }
  17. func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
  18. return &RpcProxy{
  19. backend: backend,
  20. clients: make(map[string]Client),
  21. options: opts,
  22. sharedCalls: syncx.NewSharedCalls(),
  23. }
  24. }
  25. func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) {
  26. cred := auth.ParseCredential(ctx)
  27. key := cred.App + "/" + cred.Token
  28. val, err := p.sharedCalls.Do(key, func() (interface{}, error) {
  29. p.lock.Lock()
  30. client, ok := p.clients[key]
  31. p.lock.Unlock()
  32. if ok {
  33. return client, nil
  34. }
  35. opts := append(p.options, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
  36. App: cred.App,
  37. Token: cred.Token,
  38. })))
  39. client, err := NewClientWithTarget(p.backend, opts...)
  40. if err != nil {
  41. return nil, err
  42. }
  43. p.lock.Lock()
  44. p.clients[key] = client
  45. p.lock.Unlock()
  46. return client, nil
  47. })
  48. if err != nil {
  49. return nil, err
  50. }
  51. return val.(Client).Conn(), nil
  52. }