reverse.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "fmt"
  17. "io"
  18. "log"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "strings"
  23. "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
  24. )
  // Hop-by-hop headers. These apply to a single transport-level connection
  // only, so ServeHTTP strips them (via removeSingleHopHeaders) from the
  // request sent to the backend and from the response relayed to the client.
  // http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
  // This list of headers borrowed from stdlib httputil.ReverseProxy
  var singleHopHeaders = []string{
  	"Connection",
  	"Keep-Alive",
  	"Proxy-Authenticate",
  	"Proxy-Authorization",
  	"Te",       // canonicalized version of "TE"
  	"Trailer",  // the actual header name; RFC 2616 section 13.5.1 misspells it (erratum 4522)
  	"Trailers", // spelling printed in RFC 2616 section 13.5.1, kept for compatibility
  	"Transfer-Encoding",
  	"Upgrade",
  }
  38. func removeSingleHopHeaders(hdrs *http.Header) {
  39. for _, h := range singleHopHeaders {
  40. hdrs.Del(h)
  41. }
  42. }
  43. type reverseProxy struct {
  44. director *director
  45. transport http.RoundTripper
  46. }
  47. func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
  48. proxyreq := new(http.Request)
  49. *proxyreq = *clientreq
  50. // deep-copy the headers, as these will be modified below
  51. proxyreq.Header = make(http.Header)
  52. copyHeader(proxyreq.Header, clientreq.Header)
  53. normalizeRequest(proxyreq)
  54. removeSingleHopHeaders(&proxyreq.Header)
  55. maybeSetForwardedFor(proxyreq)
  56. endpoints := p.director.endpoints()
  57. if len(endpoints) == 0 {
  58. msg := "proxy: zero endpoints currently available"
  59. // TODO: limit the rate of the error logging.
  60. log.Printf(msg)
  61. e := httptypes.NewHTTPError(http.StatusServiceUnavailable, msg)
  62. e.WriteTo(rw)
  63. return
  64. }
  65. completeCh := make(chan bool, 1)
  66. closeNotifier, ok := rw.(http.CloseNotifier)
  67. if ok {
  68. go func() {
  69. select {
  70. case <-closeNotifier.CloseNotify():
  71. tp, ok := p.transport.(*http.Transport)
  72. if ok {
  73. tp.CancelRequest(proxyreq)
  74. }
  75. case <-completeCh:
  76. }
  77. }()
  78. defer func() {
  79. completeCh <- true
  80. }()
  81. }
  82. var res *http.Response
  83. var err error
  84. for _, ep := range endpoints {
  85. redirectRequest(proxyreq, ep.URL)
  86. res, err = p.transport.RoundTrip(proxyreq)
  87. if err != nil {
  88. log.Printf("proxy: failed to direct request to %s: %v", ep.URL.String(), err)
  89. ep.Failed()
  90. continue
  91. }
  92. break
  93. }
  94. if res == nil {
  95. // TODO: limit the rate of the error logging.
  96. msg := fmt.Sprintf("proxy: unable to get response from %d endpoint(s)", len(endpoints))
  97. log.Printf(msg)
  98. e := httptypes.NewHTTPError(http.StatusBadGateway, msg)
  99. e.WriteTo(rw)
  100. return
  101. }
  102. defer res.Body.Close()
  103. removeSingleHopHeaders(&res.Header)
  104. copyHeader(rw.Header(), res.Header)
  105. rw.WriteHeader(res.StatusCode)
  106. io.Copy(rw, res.Body)
  107. }
  // copyHeader appends every value of every header in src to dst using
  // dst.Add. Values already present in dst are kept, and a key whose value
  // list in src is empty adds nothing to dst.
  func copyHeader(dst, src http.Header) {
  	for name := range src {
  		values := src[name]
  		for i := 0; i < len(values); i++ {
  			dst.Add(name, values[i])
  		}
  	}
  }
  // redirectRequest points req at the server described by loc by overwriting
  // the scheme and host of req.URL. Every other part of the request URL, and
  // every other part of loc, is left untouched.
  func redirectRequest(req *http.Request, loc url.URL) {
  	target := req.URL
  	target.Scheme, target.Host = loc.Scheme, loc.Host
  }
  // normalizeRequest rewrites req's protocol fields to HTTP/1.1 and clears
  // its Close flag. No other field of req is touched.
  func normalizeRequest(req *http.Request) {
  	req.Proto = "HTTP/1.1"
  	req.ProtoMajor, req.ProtoMinor = 1, 1
  	req.Close = false
  }
  // maybeSetForwardedFor records the client's IP address, taken from
  // req.RemoteAddr, in the X-Forwarded-For header of req. When RemoteAddr
  // cannot be split into host and port the request is left unchanged.
  func maybeSetForwardedFor(req *http.Request) {
  	const forwardedFor = "X-Forwarded-For"

  	host, _, err := net.SplitHostPort(req.RemoteAddr)
  	if err != nil {
  		return
  	}

  	// An earlier proxy may already have set the header, possibly several
  	// times. Fold those values into a single comma+space separated list
  	// and put this hop's client address at the end.
  	forwarded := host
  	if earlier, present := req.Header[forwardedFor]; present {
  		forwarded = strings.Join([]string{strings.Join(earlier, ", "), host}, ", ")
  	}
  	req.Header.Set(forwardedFor, forwarded)
  }