resolver_conn_wrapper.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "strings"
  21. "google.golang.org/grpc/grpclog"
  22. "google.golang.org/grpc/resolver"
  23. )
  24. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  25. // It implements resolver.ClientConnection interface.
  26. type ccResolverWrapper struct {
  27. cc *ClientConn
  28. resolver resolver.Resolver
  29. addrCh chan []resolver.Address
  30. scCh chan string
  31. done chan struct{}
  32. }
  33. // split2 returns the values from strings.SplitN(s, sep, 2).
  34. // If sep is not found, it returns "", s instead.
  35. func split2(s, sep string) (string, string) {
  36. spl := strings.SplitN(s, sep, 2)
  37. if len(spl) < 2 {
  38. return "", s
  39. }
  40. return spl[0], spl[1]
  41. }
  42. // parseTarget splits target into a struct containing scheme, authority and
  43. // endpoint.
  44. func parseTarget(target string) (ret resolver.Target) {
  45. ret.Scheme, ret.Endpoint = split2(target, "://")
  46. ret.Authority, ret.Endpoint = split2(ret.Endpoint, "/")
  47. return ret
  48. }
  49. // newCCResolverWrapper parses cc.target for scheme and gets the resolver
  50. // builder for this scheme. It then builds the resolver and starts the
  51. // monitoring goroutine for it.
  52. //
  53. // This function could return nil, nil, in tests for old behaviors.
  54. // TODO(bar) never return nil, nil when DNS becomes the default resolver.
  55. func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
  56. target := parseTarget(cc.target)
  57. grpclog.Infof("dialing to target with scheme: %q", target.Scheme)
  58. rb := resolver.Get(target.Scheme)
  59. if rb == nil {
  60. // TODO(bar) return error when DNS becomes the default (implemented and
  61. // registered by DNS package).
  62. grpclog.Infof("could not get resolver for scheme: %q", target.Scheme)
  63. return nil, nil
  64. }
  65. ccr := &ccResolverWrapper{
  66. cc: cc,
  67. addrCh: make(chan []resolver.Address, 1),
  68. scCh: make(chan string, 1),
  69. done: make(chan struct{}),
  70. }
  71. var err error
  72. ccr.resolver, err = rb.Build(target, ccr, resolver.BuildOption{})
  73. if err != nil {
  74. return nil, err
  75. }
  76. go ccr.watcher()
  77. return ccr, nil
  78. }
  79. // watcher processes address updates and service config updates sequencially.
  80. // Otherwise, we need to resolve possible races between address and service
  81. // config (e.g. they specify different balancer types).
  82. func (ccr *ccResolverWrapper) watcher() {
  83. for {
  84. select {
  85. case <-ccr.done:
  86. return
  87. default:
  88. }
  89. select {
  90. case addrs := <-ccr.addrCh:
  91. grpclog.Infof("ccResolverWrapper: sending new addresses to balancer wrapper: %v", addrs)
  92. // TODO(bar switching) this should never be nil. Pickfirst should be default.
  93. if ccr.cc.balancerWrapper != nil {
  94. // TODO(bar switching) create balancer if it's nil?
  95. ccr.cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
  96. }
  97. case sc := <-ccr.scCh:
  98. grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
  99. case <-ccr.done:
  100. return
  101. }
  102. }
  103. }
  104. func (ccr *ccResolverWrapper) close() {
  105. ccr.resolver.Close()
  106. close(ccr.done)
  107. }
  108. // NewAddress is called by the resolver implemenetion to send addresses to gRPC.
  109. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  110. select {
  111. case <-ccr.addrCh:
  112. default:
  113. }
  114. ccr.addrCh <- addrs
  115. }
  116. // NewServiceConfig is called by the resolver implemenetion to send service
  117. // configs to gPRC.
  118. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
  119. select {
  120. case <-ccr.scCh:
  121. default:
  122. }
  123. ccr.scCh <- sc
  124. }