balancer.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package balancer
  15. import (
  16. "fmt"
  17. "strconv"
  18. "sync"
  19. "time"
  20. "go.etcd.io/etcd/clientv3/balancer/picker"
  21. "go.uber.org/zap"
  22. "google.golang.org/grpc/balancer"
  23. "google.golang.org/grpc/connectivity"
  24. "google.golang.org/grpc/resolver"
  25. _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
  26. _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
  27. )
  28. // RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
  29. // must be invoked at initialization time.
  30. func RegisterBuilder(cfg Config) {
  31. bb := &builder{cfg}
  32. balancer.Register(bb)
  33. bb.cfg.Logger.Debug(
  34. "registered balancer",
  35. zap.String("policy", bb.cfg.Policy.String()),
  36. zap.String("name", bb.cfg.Name),
  37. )
  38. }
  39. type builder struct {
  40. cfg Config
  41. }
  42. // Build is called initially when creating "ccBalancerWrapper".
  43. // "grpc.Dial" is called to this client connection.
  44. // Then, resolved addresses will be handled via "HandleResolvedAddrs".
  45. func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  46. bb := &baseBalancer{
  47. id: strconv.FormatInt(time.Now().UnixNano(), 36),
  48. policy: b.cfg.Policy,
  49. name: b.cfg.Name,
  50. lg: b.cfg.Logger,
  51. addrToSc: make(map[resolver.Address]balancer.SubConn),
  52. scToAddr: make(map[balancer.SubConn]resolver.Address),
  53. scToSt: make(map[balancer.SubConn]connectivity.State),
  54. currentConn: nil,
  55. csEvltr: &connectivityStateEvaluator{},
  56. // initialize picker always returns "ErrNoSubConnAvailable"
  57. Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
  58. }
  59. if bb.lg == nil {
  60. bb.lg = zap.NewNop()
  61. }
  62. // TODO: support multiple connections
  63. bb.mu.Lock()
  64. bb.currentConn = cc
  65. bb.mu.Unlock()
  66. bb.lg.Info(
  67. "built balancer",
  68. zap.String("balancer-id", bb.id),
  69. zap.String("policy", bb.policy.String()),
  70. zap.String("resolver-target", cc.Target()),
  71. )
  72. return bb
  73. }
  74. // Name implements "grpc/balancer.Builder" interface.
  75. func (b *builder) Name() string { return b.cfg.Name }
  76. // Balancer defines client balancer interface.
  77. type Balancer interface {
  78. // Balancer is called on specified client connection. Client initiates gRPC
  79. // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
  80. // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
  81. // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
  82. // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
  83. // changes, thus requires failover logic in this method.
  84. balancer.Balancer
  85. // Picker calls "Pick" for every client request.
  86. picker.Picker
  87. }
  88. type baseBalancer struct {
  89. id string
  90. policy picker.Policy
  91. name string
  92. lg *zap.Logger
  93. mu sync.RWMutex
  94. addrToSc map[resolver.Address]balancer.SubConn
  95. scToAddr map[balancer.SubConn]resolver.Address
  96. scToSt map[balancer.SubConn]connectivity.State
  97. currentConn balancer.ClientConn
  98. currentState connectivity.State
  99. csEvltr *connectivityStateEvaluator
  100. picker.Picker
  101. }
  102. // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
  103. // gRPC sends initial or updated resolved addresses from "Build".
  104. func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  105. if err != nil {
  106. bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
  107. return
  108. }
  109. bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
  110. bb.mu.Lock()
  111. defer bb.mu.Unlock()
  112. resolved := make(map[resolver.Address]struct{})
  113. for _, addr := range addrs {
  114. resolved[addr] = struct{}{}
  115. if _, ok := bb.addrToSc[addr]; !ok {
  116. sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
  117. if err != nil {
  118. bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
  119. continue
  120. }
  121. bb.addrToSc[addr] = sc
  122. bb.scToAddr[sc] = addr
  123. bb.scToSt[sc] = connectivity.Idle
  124. sc.Connect()
  125. }
  126. }
  127. for addr, sc := range bb.addrToSc {
  128. if _, ok := resolved[addr]; !ok {
  129. // was removed by resolver or failed to create subconn
  130. bb.currentConn.RemoveSubConn(sc)
  131. delete(bb.addrToSc, addr)
  132. bb.lg.Info(
  133. "removed subconn",
  134. zap.String("balancer-id", bb.id),
  135. zap.String("address", addr.Addr),
  136. zap.String("subconn", scToString(sc)),
  137. )
  138. // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
  139. // The entry will be deleted in HandleSubConnStateChange.
  140. // (DO NOT) delete(bb.scToAddr, sc)
  141. // (DO NOT) delete(bb.scToSt, sc)
  142. }
  143. }
  144. }
  145. // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
  146. func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  147. bb.mu.Lock()
  148. defer bb.mu.Unlock()
  149. old, ok := bb.scToSt[sc]
  150. if !ok {
  151. bb.lg.Warn(
  152. "state change for an unknown subconn",
  153. zap.String("balancer-id", bb.id),
  154. zap.String("subconn", scToString(sc)),
  155. zap.String("state", s.String()),
  156. )
  157. return
  158. }
  159. bb.lg.Info(
  160. "state changed",
  161. zap.String("balancer-id", bb.id),
  162. zap.Bool("connected", s == connectivity.Ready),
  163. zap.String("subconn", scToString(sc)),
  164. zap.String("address", bb.scToAddr[sc].Addr),
  165. zap.String("old-state", old.String()),
  166. zap.String("new-state", s.String()),
  167. )
  168. bb.scToSt[sc] = s
  169. switch s {
  170. case connectivity.Idle:
  171. sc.Connect()
  172. case connectivity.Shutdown:
  173. // When an address was removed by resolver, b called RemoveSubConn but
  174. // kept the sc's state in scToSt. Remove state for this sc here.
  175. delete(bb.scToAddr, sc)
  176. delete(bb.scToSt, sc)
  177. }
  178. oldAggrState := bb.currentState
  179. bb.currentState = bb.csEvltr.recordTransition(old, s)
  180. // Regenerate picker when one of the following happens:
  181. // - this sc became ready from not-ready
  182. // - this sc became not-ready from ready
  183. // - the aggregated state of balancer became TransientFailure from non-TransientFailure
  184. // - the aggregated state of balancer became non-TransientFailure from TransientFailure
  185. if (s == connectivity.Ready) != (old == connectivity.Ready) ||
  186. (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
  187. bb.regeneratePicker()
  188. }
  189. bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
  190. return
  191. }
  192. func (bb *baseBalancer) regeneratePicker() {
  193. if bb.currentState == connectivity.TransientFailure {
  194. bb.lg.Info(
  195. "generated transient error picker",
  196. zap.String("balancer-id", bb.id),
  197. zap.String("policy", bb.policy.String()),
  198. )
  199. bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
  200. return
  201. }
  202. // only pass ready subconns to picker
  203. scs := make([]balancer.SubConn, 0)
  204. addrToSc := make(map[resolver.Address]balancer.SubConn)
  205. scToAddr := make(map[balancer.SubConn]resolver.Address)
  206. for addr, sc := range bb.addrToSc {
  207. if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
  208. scs = append(scs, sc)
  209. addrToSc[addr] = sc
  210. scToAddr[sc] = addr
  211. }
  212. }
  213. switch bb.policy {
  214. case picker.RoundrobinBalanced:
  215. bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
  216. default:
  217. panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
  218. }
  219. bb.lg.Info(
  220. "generated picker",
  221. zap.String("balancer-id", bb.id),
  222. zap.String("policy", bb.policy.String()),
  223. zap.Strings("subconn-ready", scsToStrings(addrToSc)),
  224. zap.Int("subconn-size", len(addrToSc)),
  225. )
  226. }
  227. // Close implements "grpc/balancer.Balancer" interface.
  228. // Close is a nop because base balancer doesn't have internal state to clean up,
  229. // and it doesn't need to call RemoveSubConn for the SubConns.
  230. func (bb *baseBalancer) Close() {
  231. // TODO
  232. }