director.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "log"
  17. "math/rand"
  18. "net/url"
  19. "sync"
  20. "time"
  21. )
  22. const (
  23. // amount of time an endpoint will be held in a failed
  24. // state before being reconsidered for proxied requests
  25. endpointFailureWait = 5 * time.Second
  26. // how often the proxy will attempt to refresh its set of endpoints
  27. refreshEndpoints = 30 * time.Second
  28. )
  29. func newDirector(urlsFunc GetProxyURLs) *director {
  30. d := &director{
  31. uf: urlsFunc,
  32. }
  33. d.refresh()
  34. go func() {
  35. for {
  36. select {
  37. case <-time.After(refreshEndpoints):
  38. d.refresh()
  39. }
  40. }
  41. }()
  42. return d
  43. }
  44. type director struct {
  45. sync.Mutex
  46. ep []*endpoint
  47. uf GetProxyURLs
  48. }
  49. func (d *director) refresh() {
  50. urls := d.uf()
  51. d.Lock()
  52. defer d.Unlock()
  53. var endpoints []*endpoint
  54. for _, u := range urls {
  55. uu, err := url.Parse(u)
  56. if err != nil {
  57. log.Printf("proxy: upstream URL invalid: %v", err)
  58. continue
  59. }
  60. endpoints = append(endpoints, newEndpoint(*uu))
  61. }
  62. // shuffle array to avoid connections being "stuck" to a single endpoint
  63. for i := range endpoints {
  64. j := rand.Intn(i + 1)
  65. endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
  66. }
  67. d.ep = endpoints
  68. }
  69. func (d *director) endpoints() []*endpoint {
  70. d.Lock()
  71. defer d.Unlock()
  72. filtered := make([]*endpoint, 0)
  73. for _, ep := range d.ep {
  74. if ep.Available {
  75. filtered = append(filtered, ep)
  76. }
  77. }
  78. return filtered
  79. }
  80. func newEndpoint(u url.URL) *endpoint {
  81. ep := endpoint{
  82. URL: u,
  83. Available: true,
  84. failFunc: timedUnavailabilityFunc(endpointFailureWait),
  85. }
  86. return &ep
  87. }
  88. type endpoint struct {
  89. sync.Mutex
  90. URL url.URL
  91. Available bool
  92. failFunc func(ep *endpoint)
  93. }
  94. func (ep *endpoint) Failed() {
  95. ep.Lock()
  96. if !ep.Available {
  97. ep.Unlock()
  98. return
  99. }
  100. ep.Available = false
  101. ep.Unlock()
  102. log.Printf("proxy: marked endpoint %s unavailable", ep.URL.String())
  103. if ep.failFunc == nil {
  104. log.Printf("proxy: no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
  105. return
  106. }
  107. ep.failFunc(ep)
  108. }
  109. func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
  110. return func(ep *endpoint) {
  111. time.AfterFunc(wait, func() {
  112. ep.Available = true
  113. log.Printf("proxy: marked endpoint %s available", ep.URL.String())
  114. })
  115. }
  116. }