director.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package httpproxy
  15. import (
  16. "math/rand"
  17. "net/url"
  18. "sync"
  19. "time"
  20. )
  // defaultRefreshInterval mirrors the default proxyRefreshIntervalMs value
  // as in etcdmain/config.go (30000 ms).
  const defaultRefreshInterval = 30 * time.Second

  // once guards the one-time "endpoints found" log message written by the
  // refresh goroutine started in newDirector. Because it is package-level,
  // the message is logged at most once regardless of how many directors
  // are created.
  var once sync.Once
  25. func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
  26. d := &director{
  27. uf: urlsFunc,
  28. failureWait: failureWait,
  29. }
  30. d.refresh()
  31. go func() {
  32. // In order to prevent missing proxy endpoints in the first try:
  33. // when given refresh interval of defaultRefreshInterval or greater
  34. // and whenever there is no available proxy endpoints,
  35. // give 1-second refreshInterval.
  36. for {
  37. es := d.endpoints()
  38. ri := refreshInterval
  39. if ri >= defaultRefreshInterval {
  40. if len(es) == 0 {
  41. ri = time.Second
  42. }
  43. }
  44. if len(es) > 0 {
  45. once.Do(func() {
  46. var sl []string
  47. for _, e := range es {
  48. sl = append(sl, e.URL.String())
  49. }
  50. plog.Infof("endpoints found %q", sl)
  51. })
  52. }
  53. time.Sleep(ri)
  54. d.refresh()
  55. }
  56. }()
  57. return d
  58. }
  59. type director struct {
  60. sync.Mutex
  61. ep []*endpoint
  62. uf GetProxyURLs
  63. failureWait time.Duration
  64. }
  65. func (d *director) refresh() {
  66. urls := d.uf()
  67. d.Lock()
  68. defer d.Unlock()
  69. var endpoints []*endpoint
  70. for _, u := range urls {
  71. uu, err := url.Parse(u)
  72. if err != nil {
  73. plog.Printf("upstream URL invalid: %v", err)
  74. continue
  75. }
  76. endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
  77. }
  78. // shuffle array to avoid connections being "stuck" to a single endpoint
  79. for i := range endpoints {
  80. j := rand.Intn(i + 1)
  81. endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
  82. }
  83. d.ep = endpoints
  84. }
  85. func (d *director) endpoints() []*endpoint {
  86. d.Lock()
  87. defer d.Unlock()
  88. filtered := make([]*endpoint, 0)
  89. for _, ep := range d.ep {
  90. if ep.Available {
  91. filtered = append(filtered, ep)
  92. }
  93. }
  94. return filtered
  95. }
  96. func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
  97. ep := endpoint{
  98. URL: u,
  99. Available: true,
  100. failFunc: timedUnavailabilityFunc(failureWait),
  101. }
  102. return &ep
  103. }
  // endpoint is a single upstream URL together with its availability flag.
  type endpoint struct {
  	// Mutex is held by Failed while it checks and clears Available.
  	sync.Mutex
  	// URL is the upstream address of this endpoint.
  	URL url.URL
  	// Available is true on creation and set to false by Failed.
  	Available bool
  	// failFunc, when non-nil, is invoked by Failed after the endpoint has
  	// been marked unavailable. When it is nil, Failed only logs that the
  	// endpoint will stay unavailable.
  	failFunc func(ep *endpoint)
  }
  110. func (ep *endpoint) Failed() {
  111. ep.Lock()
  112. if !ep.Available {
  113. ep.Unlock()
  114. return
  115. }
  116. ep.Available = false
  117. ep.Unlock()
  118. plog.Printf("marked endpoint %s unavailable", ep.URL.String())
  119. if ep.failFunc == nil {
  120. plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
  121. return
  122. }
  123. ep.failFunc(ep)
  124. }
  125. func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
  126. return func(ep *endpoint) {
  127. time.AfterFunc(wait, func() {
  128. ep.Available = true
  129. plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String())
  130. })
  131. }
  132. }