balancer.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package balancer implements client balancer.
  15. package balancer
  16. import (
  17. "strconv"
  18. "sync"
  19. "time"
  20. "go.etcd.io/etcd/clientv3/balancer/connectivity"
  21. "go.etcd.io/etcd/clientv3/balancer/picker"
  22. "go.uber.org/zap"
  23. "google.golang.org/grpc/balancer"
  24. grpcconnectivity "google.golang.org/grpc/connectivity"
  25. "google.golang.org/grpc/resolver"
  26. _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
  27. _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
  28. )
  29. // Config defines balancer configurations.
  30. type Config struct {
  31. // Policy configures balancer policy.
  32. Policy picker.Policy
  33. // Picker implements gRPC picker.
  34. // Leave empty if "Policy" field is not custom.
  35. // TODO: currently custom policy is not supported.
  36. // Picker picker.Picker
  37. // Name defines an additional name for balancer.
  38. // Useful for balancer testing to avoid register conflicts.
  39. // If empty, defaults to policy name.
  40. Name string
  41. // Logger configures balancer logging.
  42. // If nil, logs are discarded.
  43. Logger *zap.Logger
  44. }
  45. // RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
  46. // must be invoked at initialization time.
  47. func RegisterBuilder(cfg Config) {
  48. bb := &builder{cfg}
  49. balancer.Register(bb)
  50. bb.cfg.Logger.Debug(
  51. "registered balancer",
  52. zap.String("policy", bb.cfg.Policy.String()),
  53. zap.String("name", bb.cfg.Name),
  54. )
  55. }
  56. type builder struct {
  57. cfg Config
  58. }
  59. // Build is called initially when creating "ccBalancerWrapper".
  60. // "grpc.Dial" is called to this client connection.
  61. // Then, resolved addresses will be handled via "HandleResolvedAddrs".
  62. func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  63. bb := &baseBalancer{
  64. id: strconv.FormatInt(time.Now().UnixNano(), 36),
  65. policy: b.cfg.Policy,
  66. name: b.cfg.Name,
  67. lg: b.cfg.Logger,
  68. addrToSc: make(map[resolver.Address]balancer.SubConn),
  69. scToAddr: make(map[balancer.SubConn]resolver.Address),
  70. scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
  71. currentConn: nil,
  72. connectivityRecorder: connectivity.New(b.cfg.Logger),
  73. // initialize picker always returns "ErrNoSubConnAvailable"
  74. picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
  75. }
  76. // TODO: support multiple connections
  77. bb.mu.Lock()
  78. bb.currentConn = cc
  79. bb.mu.Unlock()
  80. bb.lg.Info(
  81. "built balancer",
  82. zap.String("balancer-id", bb.id),
  83. zap.String("policy", bb.policy.String()),
  84. zap.String("resolver-target", cc.Target()),
  85. )
  86. return bb
  87. }
  88. // Name implements "grpc/balancer.Builder" interface.
  89. func (b *builder) Name() string { return b.cfg.Name }
  90. // Balancer defines client balancer interface.
  91. type Balancer interface {
  92. // Balancer is called on specified client connection. Client initiates gRPC
  93. // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
  94. // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
  95. // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
  96. // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
  97. // changes, thus requires failover logic in this method.
  98. balancer.Balancer
  99. // Picker calls "Pick" for every client request.
  100. picker.Picker
  101. }
  102. type baseBalancer struct {
  103. id string
  104. policy picker.Policy
  105. name string
  106. lg *zap.Logger
  107. mu sync.RWMutex
  108. addrToSc map[resolver.Address]balancer.SubConn
  109. scToAddr map[balancer.SubConn]resolver.Address
  110. scToSt map[balancer.SubConn]grpcconnectivity.State
  111. currentConn balancer.ClientConn
  112. connectivityRecorder connectivity.Recorder
  113. picker picker.Picker
  114. }
  115. // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
  116. // gRPC sends initial or updated resolved addresses from "Build".
  117. func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  118. if err != nil {
  119. bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
  120. return
  121. }
  122. bb.lg.Info("resolved",
  123. zap.String("picker", bb.picker.String()),
  124. zap.String("balancer-id", bb.id),
  125. zap.Strings("addresses", addrsToStrings(addrs)),
  126. )
  127. bb.mu.Lock()
  128. defer bb.mu.Unlock()
  129. resolved := make(map[resolver.Address]struct{})
  130. for _, addr := range addrs {
  131. resolved[addr] = struct{}{}
  132. if _, ok := bb.addrToSc[addr]; !ok {
  133. sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
  134. if err != nil {
  135. bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
  136. continue
  137. }
  138. bb.lg.Info("created subconn", zap.String("address", addr.Addr))
  139. bb.addrToSc[addr] = sc
  140. bb.scToAddr[sc] = addr
  141. bb.scToSt[sc] = grpcconnectivity.Idle
  142. sc.Connect()
  143. }
  144. }
  145. for addr, sc := range bb.addrToSc {
  146. if _, ok := resolved[addr]; !ok {
  147. // was removed by resolver or failed to create subconn
  148. bb.currentConn.RemoveSubConn(sc)
  149. delete(bb.addrToSc, addr)
  150. bb.lg.Info(
  151. "removed subconn",
  152. zap.String("picker", bb.picker.String()),
  153. zap.String("balancer-id", bb.id),
  154. zap.String("address", addr.Addr),
  155. zap.String("subconn", scToString(sc)),
  156. )
  157. // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
  158. // The entry will be deleted in HandleSubConnStateChange.
  159. // (DO NOT) delete(bb.scToAddr, sc)
  160. // (DO NOT) delete(bb.scToSt, sc)
  161. }
  162. }
  163. }
  164. // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
  165. func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
  166. bb.mu.Lock()
  167. defer bb.mu.Unlock()
  168. old, ok := bb.scToSt[sc]
  169. if !ok {
  170. bb.lg.Warn(
  171. "state change for an unknown subconn",
  172. zap.String("picker", bb.picker.String()),
  173. zap.String("balancer-id", bb.id),
  174. zap.String("subconn", scToString(sc)),
  175. zap.Int("subconn-size", len(bb.scToAddr)),
  176. zap.String("state", s.String()),
  177. )
  178. return
  179. }
  180. bb.lg.Info(
  181. "state changed",
  182. zap.String("picker", bb.picker.String()),
  183. zap.String("balancer-id", bb.id),
  184. zap.Bool("connected", s == grpcconnectivity.Ready),
  185. zap.String("subconn", scToString(sc)),
  186. zap.Int("subconn-size", len(bb.scToAddr)),
  187. zap.String("address", bb.scToAddr[sc].Addr),
  188. zap.String("old-state", old.String()),
  189. zap.String("new-state", s.String()),
  190. )
  191. bb.scToSt[sc] = s
  192. switch s {
  193. case grpcconnectivity.Idle:
  194. sc.Connect()
  195. case grpcconnectivity.Shutdown:
  196. // When an address was removed by resolver, b called RemoveSubConn but
  197. // kept the sc's state in scToSt. Remove state for this sc here.
  198. delete(bb.scToAddr, sc)
  199. delete(bb.scToSt, sc)
  200. }
  201. oldAggrState := bb.connectivityRecorder.GetCurrentState()
  202. bb.connectivityRecorder.RecordTransition(old, s)
  203. // Update balancer picker when one of the following happens:
  204. // - this sc became ready from not-ready
  205. // - this sc became not-ready from ready
  206. // - the aggregated state of balancer became TransientFailure from non-TransientFailure
  207. // - the aggregated state of balancer became non-TransientFailure from TransientFailure
  208. if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
  209. (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
  210. bb.updatePicker()
  211. }
  212. bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
  213. }
  214. func (bb *baseBalancer) updatePicker() {
  215. if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
  216. bb.picker = picker.NewErr(balancer.ErrTransientFailure)
  217. bb.lg.Info(
  218. "updated picker to transient error picker",
  219. zap.String("picker", bb.picker.String()),
  220. zap.String("balancer-id", bb.id),
  221. zap.String("policy", bb.policy.String()),
  222. )
  223. return
  224. }
  225. // only pass ready subconns to picker
  226. scToAddr := make(map[balancer.SubConn]resolver.Address)
  227. for addr, sc := range bb.addrToSc {
  228. if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
  229. scToAddr[sc] = addr
  230. }
  231. }
  232. bb.picker = picker.New(picker.Config{
  233. Policy: bb.policy,
  234. Logger: bb.lg,
  235. SubConnToResolverAddress: scToAddr,
  236. })
  237. bb.lg.Info(
  238. "updated picker",
  239. zap.String("picker", bb.picker.String()),
  240. zap.String("balancer-id", bb.id),
  241. zap.String("policy", bb.policy.String()),
  242. zap.Strings("subconn-ready", scsToStrings(scToAddr)),
  243. zap.Int("subconn-size", len(scToAddr)),
  244. )
  245. }
  246. // Close implements "grpc/balancer.Balancer" interface.
  247. // Close is a nop because base balancer doesn't have internal state to clean up,
  248. // and it doesn't need to call RemoveSubConn for the SubConns.
  249. func (bb *baseBalancer) Close() {
  250. // TODO
  251. }