balancer.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package balancer
  15. import (
  16. "fmt"
  17. "strconv"
  18. "sync"
  19. "time"
  20. "go.etcd.io/etcd/clientv3/balancer/picker"
  21. "go.uber.org/zap"
  22. "google.golang.org/grpc/balancer"
  23. "google.golang.org/grpc/connectivity"
  24. "google.golang.org/grpc/resolver"
  25. _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
  26. _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
  27. )
  28. // RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
  29. // must be invoked at initialization time.
  30. func RegisterBuilder(cfg Config) {
  31. bb := &builder{cfg}
  32. balancer.Register(bb)
  33. bb.cfg.Logger.Debug(
  34. "registered balancer",
  35. zap.String("policy", bb.cfg.Policy.String()),
  36. zap.String("name", bb.cfg.Name),
  37. )
  38. }
  39. type builder struct {
  40. cfg Config
  41. }
  42. // Build is called initially when creating "ccBalancerWrapper".
  43. // "grpc.Dial" is called to this client connection.
  44. // Then, resolved addresses will be handled via "HandleResolvedAddrs".
  45. func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  46. bb := &baseBalancer{
  47. id: strconv.FormatInt(time.Now().UnixNano(), 36),
  48. policy: b.cfg.Policy,
  49. name: b.cfg.Policy.String(),
  50. lg: b.cfg.Logger,
  51. addrToSc: make(map[resolver.Address]balancer.SubConn),
  52. scToAddr: make(map[balancer.SubConn]resolver.Address),
  53. scToSt: make(map[balancer.SubConn]connectivity.State),
  54. currentConn: nil,
  55. csEvltr: &connectivityStateEvaluator{},
  56. // initialize picker always returns "ErrNoSubConnAvailable"
  57. Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
  58. }
  59. if b.cfg.Name != "" {
  60. bb.name = b.cfg.Name
  61. }
  62. if bb.lg == nil {
  63. bb.lg = zap.NewNop()
  64. }
  65. // TODO: support multiple connections
  66. bb.mu.Lock()
  67. bb.currentConn = cc
  68. bb.mu.Unlock()
  69. bb.lg.Info(
  70. "built balancer",
  71. zap.String("balancer-id", bb.id),
  72. zap.String("policy", bb.policy.String()),
  73. zap.String("resolver-target", cc.Target()),
  74. )
  75. return bb
  76. }
  77. // Name implements "grpc/balancer.Builder" interface.
  78. func (b *builder) Name() string { return b.cfg.Name }
  79. // Balancer defines client balancer interface.
  80. type Balancer interface {
  81. // Balancer is called on specified client connection. Client initiates gRPC
  82. // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
  83. // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
  84. // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
  85. // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
  86. // changes, thus requires failover logic in this method.
  87. balancer.Balancer
  88. // Picker calls "Pick" for every client request.
  89. picker.Picker
  90. }
  91. type baseBalancer struct {
  92. id string
  93. policy picker.Policy
  94. name string
  95. lg *zap.Logger
  96. mu sync.RWMutex
  97. addrToSc map[resolver.Address]balancer.SubConn
  98. scToAddr map[balancer.SubConn]resolver.Address
  99. scToSt map[balancer.SubConn]connectivity.State
  100. currentConn balancer.ClientConn
  101. currentState connectivity.State
  102. csEvltr *connectivityStateEvaluator
  103. picker.Picker
  104. }
  105. // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
  106. // gRPC sends initial or updated resolved addresses from "Build".
  107. func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  108. if err != nil {
  109. bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
  110. return
  111. }
  112. bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
  113. bb.mu.Lock()
  114. defer bb.mu.Unlock()
  115. resolved := make(map[resolver.Address]struct{})
  116. for _, addr := range addrs {
  117. resolved[addr] = struct{}{}
  118. if _, ok := bb.addrToSc[addr]; !ok {
  119. sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
  120. if err != nil {
  121. bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
  122. continue
  123. }
  124. bb.addrToSc[addr] = sc
  125. bb.scToAddr[sc] = addr
  126. bb.scToSt[sc] = connectivity.Idle
  127. sc.Connect()
  128. }
  129. }
  130. for addr, sc := range bb.addrToSc {
  131. if _, ok := resolved[addr]; !ok {
  132. // was removed by resolver or failed to create subconn
  133. bb.currentConn.RemoveSubConn(sc)
  134. delete(bb.addrToSc, addr)
  135. bb.lg.Info(
  136. "removed subconn",
  137. zap.String("balancer-id", bb.id),
  138. zap.String("address", addr.Addr),
  139. zap.String("subconn", scToString(sc)),
  140. )
  141. // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
  142. // The entry will be deleted in HandleSubConnStateChange.
  143. // (DO NOT) delete(bb.scToAddr, sc)
  144. // (DO NOT) delete(bb.scToSt, sc)
  145. }
  146. }
  147. }
  148. // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
  149. func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  150. bb.mu.Lock()
  151. defer bb.mu.Unlock()
  152. old, ok := bb.scToSt[sc]
  153. if !ok {
  154. bb.lg.Warn(
  155. "state change for an unknown subconn",
  156. zap.String("balancer-id", bb.id),
  157. zap.String("subconn", scToString(sc)),
  158. zap.String("state", s.String()),
  159. )
  160. return
  161. }
  162. bb.lg.Info(
  163. "state changed",
  164. zap.String("balancer-id", bb.id),
  165. zap.Bool("connected", s == connectivity.Ready),
  166. zap.String("subconn", scToString(sc)),
  167. zap.String("address", bb.scToAddr[sc].Addr),
  168. zap.String("old-state", old.String()),
  169. zap.String("new-state", s.String()),
  170. )
  171. bb.scToSt[sc] = s
  172. switch s {
  173. case connectivity.Idle:
  174. sc.Connect()
  175. case connectivity.Shutdown:
  176. // When an address was removed by resolver, b called RemoveSubConn but
  177. // kept the sc's state in scToSt. Remove state for this sc here.
  178. delete(bb.scToAddr, sc)
  179. delete(bb.scToSt, sc)
  180. }
  181. oldAggrState := bb.currentState
  182. bb.currentState = bb.csEvltr.recordTransition(old, s)
  183. // Regenerate picker when one of the following happens:
  184. // - this sc became ready from not-ready
  185. // - this sc became not-ready from ready
  186. // - the aggregated state of balancer became TransientFailure from non-TransientFailure
  187. // - the aggregated state of balancer became non-TransientFailure from TransientFailure
  188. if (s == connectivity.Ready) != (old == connectivity.Ready) ||
  189. (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
  190. bb.regeneratePicker()
  191. }
  192. bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
  193. return
  194. }
  195. func (bb *baseBalancer) regeneratePicker() {
  196. if bb.currentState == connectivity.TransientFailure {
  197. bb.lg.Info(
  198. "generated transient error picker",
  199. zap.String("balancer-id", bb.id),
  200. zap.String("policy", bb.policy.String()),
  201. )
  202. bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
  203. return
  204. }
  205. // only pass ready subconns to picker
  206. scs := make([]balancer.SubConn, 0)
  207. addrToSc := make(map[resolver.Address]balancer.SubConn)
  208. scToAddr := make(map[balancer.SubConn]resolver.Address)
  209. for addr, sc := range bb.addrToSc {
  210. if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
  211. scs = append(scs, sc)
  212. addrToSc[addr] = sc
  213. scToAddr[sc] = addr
  214. }
  215. }
  216. switch bb.policy {
  217. case picker.RoundrobinBalanced:
  218. bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
  219. default:
  220. panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
  221. }
  222. bb.lg.Info(
  223. "generated picker",
  224. zap.String("balancer-id", bb.id),
  225. zap.String("policy", bb.policy.String()),
  226. zap.Strings("subconn-ready", scsToStrings(addrToSc)),
  227. zap.Int("subconn-size", len(addrToSc)),
  228. )
  229. }
  230. // Close implements "grpc/balancer.Balancer" interface.
  231. // Close is a nop because base balancer doesn't have internal state to clean up,
  232. // and it doesn't need to call RemoveSubConn for the SubConns.
  233. func (bb *baseBalancer) Close() {
  234. // TODO
  235. }