userspace.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tcpproxy
  15. import (
  16. "fmt"
  17. "io"
  18. "math/rand"
  19. "net"
  20. "sync"
  21. "time"
  22. "github.com/coreos/pkg/capnslog"
  23. )
var (
	// plog is the logger used throughout this package. It is obtained from
	// capnslog with the arguments "github.com/coreos/etcd" and
	// "proxy/tcpproxy".
	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/tcpproxy")
)
// remote is one backend endpoint the proxy can forward connections to,
// together with a flag recording whether it is currently considered usable.
type remote struct {
	// mu is held by inactivate, tryReactivate and isActive while they
	// access inactive.
	mu sync.Mutex
	// srv is the SRV record this remote was created from; pick reads its
	// Priority and Weight.
	srv *net.SRV
	// addr is the "target:port" address that is dialed for this remote.
	addr string
	// inactive is set by inactivate and cleared by a successful
	// tryReactivate. The zero value means the remote is active.
	inactive bool
}
  33. func (r *remote) inactivate() {
  34. r.mu.Lock()
  35. defer r.mu.Unlock()
  36. r.inactive = true
  37. }
  38. func (r *remote) tryReactivate() error {
  39. conn, err := net.Dial("tcp", r.addr)
  40. if err != nil {
  41. return err
  42. }
  43. conn.Close()
  44. r.mu.Lock()
  45. defer r.mu.Unlock()
  46. r.inactive = false
  47. return nil
  48. }
  49. func (r *remote) isActive() bool {
  50. r.mu.Lock()
  51. defer r.mu.Unlock()
  52. return !r.inactive
  53. }
// TCPProxy accepts connections on Listener and relays each of them to one of
// the remotes that Run derives from Endpoints.
type TCPProxy struct {
	// Listener is where Run accepts client connections; Stop closes it.
	Listener net.Listener
	// Endpoints are the SRV records of the backends. Run creates one remote
	// per entry.
	Endpoints []*net.SRV
	// MonitorInterval is how long runMonitor waits between attempts to
	// reactivate inactive remotes. Run replaces a zero value with 5 minutes.
	MonitorInterval time.Duration
	// donec is created by Run and closed by Stop; runMonitor returns once
	// it is closed.
	donec chan struct{}
	mu sync.Mutex // guards the following fields
	// remotes holds one entry per element of Endpoints, in the same order.
	remotes []*remote
	pickCount int // for round robin
}
  63. func (tp *TCPProxy) Run() error {
  64. tp.donec = make(chan struct{})
  65. if tp.MonitorInterval == 0 {
  66. tp.MonitorInterval = 5 * time.Minute
  67. }
  68. for _, srv := range tp.Endpoints {
  69. addr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
  70. tp.remotes = append(tp.remotes, &remote{srv: srv, addr: addr})
  71. }
  72. plog.Printf("ready to proxy client requests to %+v", tp.Endpoints)
  73. go tp.runMonitor()
  74. for {
  75. in, err := tp.Listener.Accept()
  76. if err != nil {
  77. return err
  78. }
  79. go tp.serve(in)
  80. }
  81. }
  82. func (tp *TCPProxy) pick() *remote {
  83. var weighted []*remote
  84. var unweighted []*remote
  85. bestPr := uint16(65535)
  86. w := 0
  87. // find best priority class
  88. for _, r := range tp.remotes {
  89. switch {
  90. case !r.isActive():
  91. case r.srv.Priority < bestPr:
  92. bestPr = r.srv.Priority
  93. w = 0
  94. weighted, unweighted = nil, nil
  95. unweighted = []*remote{r}
  96. fallthrough
  97. case r.srv.Priority == bestPr:
  98. if r.srv.Weight > 0 {
  99. weighted = append(weighted, r)
  100. w += int(r.srv.Weight)
  101. } else {
  102. unweighted = append(unweighted, r)
  103. }
  104. }
  105. }
  106. if weighted != nil {
  107. if len(unweighted) > 0 && rand.Intn(100) == 1 {
  108. // In the presence of records containing weights greater
  109. // than 0, records with weight 0 should have a very small
  110. // chance of being selected.
  111. r := unweighted[tp.pickCount%len(unweighted)]
  112. tp.pickCount++
  113. return r
  114. }
  115. // choose a uniform random number between 0 and the sum computed
  116. // (inclusive), and select the RR whose running sum value is the
  117. // first in the selected order
  118. choose := rand.Intn(w)
  119. for i := 0; i < len(weighted); i++ {
  120. choose -= int(weighted[i].srv.Weight)
  121. if choose <= 0 {
  122. return weighted[i]
  123. }
  124. }
  125. }
  126. if unweighted != nil {
  127. for i := 0; i < len(tp.remotes); i++ {
  128. picked := tp.remotes[tp.pickCount%len(tp.remotes)]
  129. tp.pickCount++
  130. if picked.isActive() {
  131. return picked
  132. }
  133. }
  134. }
  135. return nil
  136. }
  137. func (tp *TCPProxy) serve(in net.Conn) {
  138. var (
  139. err error
  140. out net.Conn
  141. )
  142. for {
  143. tp.mu.Lock()
  144. remote := tp.pick()
  145. tp.mu.Unlock()
  146. if remote == nil {
  147. break
  148. }
  149. // TODO: add timeout
  150. out, err = net.Dial("tcp", remote.addr)
  151. if err == nil {
  152. break
  153. }
  154. remote.inactivate()
  155. plog.Warningf("deactivated endpoint [%s] due to %v for %v", remote.addr, err, tp.MonitorInterval)
  156. }
  157. if out == nil {
  158. in.Close()
  159. return
  160. }
  161. go func() {
  162. io.Copy(in, out)
  163. in.Close()
  164. out.Close()
  165. }()
  166. io.Copy(out, in)
  167. out.Close()
  168. in.Close()
  169. }
  170. func (tp *TCPProxy) runMonitor() {
  171. for {
  172. select {
  173. case <-time.After(tp.MonitorInterval):
  174. tp.mu.Lock()
  175. for _, rem := range tp.remotes {
  176. if rem.isActive() {
  177. continue
  178. }
  179. go func(r *remote) {
  180. if err := r.tryReactivate(); err != nil {
  181. plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
  182. } else {
  183. plog.Printf("activated %s", r.addr)
  184. }
  185. }(rem)
  186. }
  187. tp.mu.Unlock()
  188. case <-tp.donec:
  189. return
  190. }
  191. }
  192. }
  193. func (tp *TCPProxy) Stop() {
  194. // graceful shutdown?
  195. // shutdown current connections?
  196. tp.Listener.Close()
  197. close(tp.donec)
  198. }