balancer.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /*
  2. *
  3. * Copyright 2016, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "fmt"
  36. "sync"
  37. "golang.org/x/net/context"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/naming"
  40. "google.golang.org/grpc/transport"
  41. )
  type (
      // Address represents a server the client connects to.
      // This is the EXPERIMENTAL API and may be changed or extended in the future.
      Address struct {
          // Addr is the server address on which a connection will be established.
          Addr string
          // Metadata is optional information associated with Addr that may be
          // used when making load-balancing decisions.
          Metadata interface{}
      }

      // BalancerGetOptions configures a Balancer.Get call.
      // This is the EXPERIMENTAL API and may be changed or extended in the future.
      BalancerGetOptions struct {
          // BlockingWait specifies whether Get should block when there is no
          // connected address.
          BlockingWait bool
      }
  )
  58. // Balancer chooses network addresses for RPCs.
  59. // This is the EXPERIMENTAL API and may be changed or extended in the future.
  60. type Balancer interface {
  61. // Start does the initialization work to bootstrap a Balancer. For example,
  62. // this function may start the name resolution and watch the updates. It will
  63. // be called when dialing.
  64. Start(target string) error
  65. // Up informs the Balancer that gRPC has a connection to the server at
  66. // addr. It returns down which is called once the connection to addr gets
  67. // lost or closed.
  68. // TODO: It is not clear how to construct and take advantage the meaningful error
  69. // parameter for down. Need realistic demands to guide.
  70. Up(addr Address) (down func(error))
  71. // Get gets the address of a server for the RPC corresponding to ctx.
  72. // i) If it returns a connected address, gRPC internals issues the RPC on the
  73. // connection to this address;
  74. // ii) If it returns an address on which the connection is under construction
  75. // (initiated by Notify(...)) but not connected, gRPC internals
  76. // * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or
  77. // Shutdown state;
  78. // or
  79. // * issues RPC on the connection otherwise.
  80. // iii) If it returns an address on which the connection does not exist, gRPC
  81. // internals treats it as an error and will fail the corresponding RPC.
  82. //
  83. // Therefore, the following is the recommended rule when writing a custom Balancer.
  84. // If opts.BlockingWait is true, it should return a connected address or
  85. // block if there is no connected address. It should respect the timeout or
  86. // cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast
  87. // RPCs), it should return an address it has notified via Notify(...) immediately
  88. // instead of blocking.
  89. //
  90. // The function returns put which is called once the rpc has completed or failed.
  91. // put can collect and report RPC stats to a remote load balancer. gRPC internals
  92. // will try to call this again if err is non-nil (unless err is ErrClientConnClosing).
  93. //
  94. // TODO: Add other non-recoverable errors?
  95. Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
  96. // Notify returns a channel that is used by gRPC internals to watch the addresses
  97. // gRPC needs to connect. The addresses might be from a name resolver or remote
  98. // load balancer. gRPC internals will compare it with the existing connected
  99. // addresses. If the address Balancer notified is not in the existing connected
  100. // addresses, gRPC starts to connect the address. If an address in the existing
  101. // connected addresses is not in the notification list, the corresponding connection
  102. // is shutdown gracefully. Otherwise, there are no operations to take. Note that
  103. // the Address slice must be the full list of the Addresses which should be connected.
  104. // It is NOT delta.
  105. Notify() <-chan []Address
  106. // Close shuts down the balancer.
  107. Close() error
  108. }
  // downErr is the error that gRPC internals construct and pass to the down
  // function returned by Balancer.Up. Its Error, Timeout and Temporary methods
  // give it the method set of net.Error.
  type downErr struct {
      timeout   bool
      temporary bool
      desc      string
  }

  // Error returns the description that downErrorf formatted.
  func (de downErr) Error() string { return de.desc }

  // Timeout reports the timeout flag supplied to downErrorf.
  func (de downErr) Timeout() bool { return de.timeout }

  // Temporary reports the temporary flag supplied to downErrorf.
  func (de downErr) Temporary() bool { return de.temporary }

  // downErrorf builds a downErr with the given flags and a description
  // produced by fmt.Sprintf(format, a...).
  func downErrorf(timeout, temporary bool, format string, a ...interface{}) downErr {
      var de downErr
      de.timeout = timeout
      de.temporary = temporary
      de.desc = fmt.Sprintf(format, a...)
      return de
  }
  126. // RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
  127. // the name resolution updates and updates the addresses available correspondingly.
  128. func RoundRobin(r naming.Resolver) Balancer {
  129. return &roundRobin{r: r}
  130. }
  131. type roundRobin struct {
  132. r naming.Resolver
  133. w naming.Watcher
  134. open []Address // all the addresses the client should potentially connect
  135. mu sync.Mutex
  136. addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
  137. connected []Address // all the connected addresses
  138. next int // index of the next address to return for Get()
  139. waitCh chan struct{} // the channel to block when there is no connected address available
  140. done bool // The Balancer is closed.
  141. }
  142. func (rr *roundRobin) watchAddrUpdates() error {
  143. updates, err := rr.w.Next()
  144. if err != nil {
  145. grpclog.Println("grpc: the naming watcher stops working due to %v.", err)
  146. return err
  147. }
  148. rr.mu.Lock()
  149. defer rr.mu.Unlock()
  150. for _, update := range updates {
  151. addr := Address{
  152. Addr: update.Addr,
  153. }
  154. switch update.Op {
  155. case naming.Add:
  156. var exist bool
  157. for _, v := range rr.open {
  158. if addr == v {
  159. exist = true
  160. grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
  161. break
  162. }
  163. }
  164. if exist {
  165. continue
  166. }
  167. rr.open = append(rr.open, addr)
  168. case naming.Delete:
  169. for i, v := range rr.open {
  170. if v == addr {
  171. copy(rr.open[i:], rr.open[i+1:])
  172. rr.open = rr.open[:len(rr.open)-1]
  173. break
  174. }
  175. }
  176. default:
  177. grpclog.Println("Unknown update.Op ", update.Op)
  178. }
  179. }
  180. // Make a copy of rr.open and write it onto rr.addrCh so that gRPC internals gets notified.
  181. open := make([]Address, len(rr.open), len(rr.open))
  182. copy(open, rr.open)
  183. if rr.done {
  184. return ErrClientConnClosing
  185. }
  186. rr.addrCh <- open
  187. return nil
  188. }
  189. func (rr *roundRobin) Start(target string) error {
  190. if rr.r == nil {
  191. // If there is no name resolver installed, it is not needed to
  192. // do name resolution. In this case, rr.addrCh stays nil.
  193. return nil
  194. }
  195. w, err := rr.r.Resolve(target)
  196. if err != nil {
  197. return err
  198. }
  199. rr.w = w
  200. rr.addrCh = make(chan []Address)
  201. go func() {
  202. for {
  203. if err := rr.watchAddrUpdates(); err != nil {
  204. return
  205. }
  206. }
  207. }()
  208. return nil
  209. }
  210. // Up appends addr to the end of rr.connected and sends notification if there
  211. // are pending Get() calls.
  212. func (rr *roundRobin) Up(addr Address) func(error) {
  213. rr.mu.Lock()
  214. defer rr.mu.Unlock()
  215. for _, a := range rr.connected {
  216. if a == addr {
  217. return nil
  218. }
  219. }
  220. rr.connected = append(rr.connected, addr)
  221. if len(rr.connected) == 1 {
  222. // addr is only one available. Notify the Get() callers who are blocking.
  223. if rr.waitCh != nil {
  224. close(rr.waitCh)
  225. rr.waitCh = nil
  226. }
  227. }
  228. return func(err error) {
  229. rr.down(addr, err)
  230. }
  231. }
  232. // down removes addr from rr.connected and moves the remaining addrs forward.
  233. func (rr *roundRobin) down(addr Address, err error) {
  234. rr.mu.Lock()
  235. defer rr.mu.Unlock()
  236. for i, a := range rr.connected {
  237. if a == addr {
  238. copy(rr.connected[i:], rr.connected[i+1:])
  239. rr.connected = rr.connected[:len(rr.connected)-1]
  240. return
  241. }
  242. }
  243. }
  244. // Get returns the next addr in the rotation.
  245. func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
  246. var ch chan struct{}
  247. rr.mu.Lock()
  248. if rr.done {
  249. rr.mu.Unlock()
  250. err = ErrClientConnClosing
  251. return
  252. }
  253. if rr.next >= len(rr.connected) {
  254. rr.next = 0
  255. }
  256. if len(rr.connected) > 0 {
  257. addr = rr.connected[rr.next]
  258. rr.next++
  259. rr.mu.Unlock()
  260. return
  261. }
  262. // There is no address available. Wait on rr.waitCh.
  263. // TODO(zhaoq): Handle the case when opts.BlockingWait is false.
  264. if rr.waitCh == nil {
  265. ch = make(chan struct{})
  266. rr.waitCh = ch
  267. } else {
  268. ch = rr.waitCh
  269. }
  270. rr.mu.Unlock()
  271. for {
  272. select {
  273. case <-ctx.Done():
  274. err = transport.ContextErr(ctx.Err())
  275. return
  276. case <-ch:
  277. rr.mu.Lock()
  278. if rr.done {
  279. rr.mu.Unlock()
  280. err = ErrClientConnClosing
  281. return
  282. }
  283. if len(rr.connected) == 0 {
  284. // The newly added addr got removed by Down() again.
  285. if rr.waitCh == nil {
  286. ch = make(chan struct{})
  287. rr.waitCh = ch
  288. } else {
  289. ch = rr.waitCh
  290. }
  291. rr.mu.Unlock()
  292. continue
  293. }
  294. if rr.next >= len(rr.connected) {
  295. rr.next = 0
  296. }
  297. addr = rr.connected[rr.next]
  298. rr.next++
  299. rr.mu.Unlock()
  300. return
  301. }
  302. }
  303. }
  304. func (rr *roundRobin) Notify() <-chan []Address {
  305. return rr.addrCh
  306. }
  307. func (rr *roundRobin) Close() error {
  308. rr.mu.Lock()
  309. defer rr.mu.Unlock()
  310. rr.done = true
  311. if rr.w != nil {
  312. rr.w.Close()
  313. }
  314. if rr.waitCh != nil {
  315. close(rr.waitCh)
  316. rr.waitCh = nil
  317. }
  318. if rr.addrCh != nil {
  319. close(rr.addrCh)
  320. }
  321. return nil
  322. }