userspace.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tcpproxy
  15. import (
  16. "fmt"
  17. "io"
  18. "math/rand"
  19. "net"
  20. "sync"
  21. "time"
  22. "github.com/coreos/pkg/capnslog"
  23. "go.uber.org/zap"
  24. )
  25. var plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "proxy/tcpproxy")
  // remote is one backend endpoint of the proxy together with its health
// state. The inactive flag is only read or written while holding mu, so the
// methods below can be called from several goroutines at once.
type remote struct {
	mu       sync.Mutex
	srv      *net.SRV // SRV record this endpoint came from (priority/weight)
	addr     string   // dialable address derived from srv
	inactive bool     // set after a failed dial, cleared by tryReactivate
}

// inactivate marks the endpoint as unusable; isActive reports false until a
// later tryReactivate succeeds.
func (r *remote) inactivate() {
	r.mu.Lock()
	r.inactive = true
	r.mu.Unlock()
}

// tryReactivate probes the endpoint with a plain TCP dial. On success the
// probe connection is closed right away and the endpoint is marked active
// again; on failure the dial error is returned and the state is unchanged.
func (r *remote) tryReactivate() error {
	c, err := net.Dial("tcp", r.addr)
	if err == nil {
		c.Close()
		r.mu.Lock()
		r.inactive = false
		r.mu.Unlock()
	}
	return err
}

// isActive reports whether the endpoint is currently eligible to be picked.
func (r *remote) isActive() bool {
	r.mu.Lock()
	active := !r.inactive
	r.mu.Unlock()
	return active
}
  53. type TCPProxy struct {
  54. Logger *zap.Logger
  55. Listener net.Listener
  56. Endpoints []*net.SRV
  57. MonitorInterval time.Duration
  58. donec chan struct{}
  59. mu sync.Mutex // guards the following fields
  60. remotes []*remote
  61. pickCount int // for round robin
  62. }
  63. func (tp *TCPProxy) Run() error {
  64. tp.donec = make(chan struct{})
  65. if tp.MonitorInterval == 0 {
  66. tp.MonitorInterval = 5 * time.Minute
  67. }
  68. for _, srv := range tp.Endpoints {
  69. addr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
  70. tp.remotes = append(tp.remotes, &remote{srv: srv, addr: addr})
  71. }
  72. eps := []string{}
  73. for _, ep := range tp.Endpoints {
  74. eps = append(eps, fmt.Sprintf("%s:%d", ep.Target, ep.Port))
  75. }
  76. if tp.Logger != nil {
  77. tp.Logger.Info("ready to proxy client requests", zap.Strings("endpoints", eps))
  78. } else {
  79. plog.Printf("ready to proxy client requests to %+v", eps)
  80. }
  81. go tp.runMonitor()
  82. for {
  83. in, err := tp.Listener.Accept()
  84. if err != nil {
  85. return err
  86. }
  87. go tp.serve(in)
  88. }
  89. }
  90. func (tp *TCPProxy) pick() *remote {
  91. var weighted []*remote
  92. var unweighted []*remote
  93. bestPr := uint16(65535)
  94. w := 0
  95. // find best priority class
  96. for _, r := range tp.remotes {
  97. switch {
  98. case !r.isActive():
  99. case r.srv.Priority < bestPr:
  100. bestPr = r.srv.Priority
  101. w = 0
  102. weighted = nil
  103. unweighted = []*remote{r}
  104. fallthrough
  105. case r.srv.Priority == bestPr:
  106. if r.srv.Weight > 0 {
  107. weighted = append(weighted, r)
  108. w += int(r.srv.Weight)
  109. } else {
  110. unweighted = append(unweighted, r)
  111. }
  112. }
  113. }
  114. if weighted != nil {
  115. if len(unweighted) > 0 && rand.Intn(100) == 1 {
  116. // In the presence of records containing weights greater
  117. // than 0, records with weight 0 should have a very small
  118. // chance of being selected.
  119. r := unweighted[tp.pickCount%len(unweighted)]
  120. tp.pickCount++
  121. return r
  122. }
  123. // choose a uniform random number between 0 and the sum computed
  124. // (inclusive), and select the RR whose running sum value is the
  125. // first in the selected order
  126. choose := rand.Intn(w)
  127. for i := 0; i < len(weighted); i++ {
  128. choose -= int(weighted[i].srv.Weight)
  129. if choose <= 0 {
  130. return weighted[i]
  131. }
  132. }
  133. }
  134. if unweighted != nil {
  135. for i := 0; i < len(tp.remotes); i++ {
  136. picked := tp.remotes[tp.pickCount%len(tp.remotes)]
  137. tp.pickCount++
  138. if picked.isActive() {
  139. return picked
  140. }
  141. }
  142. }
  143. return nil
  144. }
  145. func (tp *TCPProxy) serve(in net.Conn) {
  146. var (
  147. err error
  148. out net.Conn
  149. )
  150. for {
  151. tp.mu.Lock()
  152. remote := tp.pick()
  153. tp.mu.Unlock()
  154. if remote == nil {
  155. break
  156. }
  157. // TODO: add timeout
  158. out, err = net.Dial("tcp", remote.addr)
  159. if err == nil {
  160. break
  161. }
  162. remote.inactivate()
  163. if tp.Logger != nil {
  164. tp.Logger.Warn("deactivated endpoint", zap.String("address", remote.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
  165. } else {
  166. plog.Warningf("deactivated endpoint [%s] due to %v for %v", remote.addr, err, tp.MonitorInterval)
  167. }
  168. }
  169. if out == nil {
  170. in.Close()
  171. return
  172. }
  173. go func() {
  174. io.Copy(in, out)
  175. in.Close()
  176. out.Close()
  177. }()
  178. io.Copy(out, in)
  179. out.Close()
  180. in.Close()
  181. }
  182. func (tp *TCPProxy) runMonitor() {
  183. for {
  184. select {
  185. case <-time.After(tp.MonitorInterval):
  186. tp.mu.Lock()
  187. for _, rem := range tp.remotes {
  188. if rem.isActive() {
  189. continue
  190. }
  191. go func(r *remote) {
  192. if err := r.tryReactivate(); err != nil {
  193. if tp.Logger != nil {
  194. tp.Logger.Warn("failed to activate endpoint (stay inactive for another interval)", zap.String("address", r.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
  195. } else {
  196. plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
  197. }
  198. } else {
  199. if tp.Logger != nil {
  200. tp.Logger.Info("activated", zap.String("address", r.addr))
  201. } else {
  202. plog.Printf("activated %s", r.addr)
  203. }
  204. }
  205. }(rem)
  206. }
  207. tp.mu.Unlock()
  208. case <-tp.donec:
  209. return
  210. }
  211. }
  212. }
  213. func (tp *TCPProxy) Stop() {
  214. // graceful shutdown?
  215. // shutdown current connections?
  216. tp.Listener.Close()
  217. close(tp.donec)
  218. }