reverse.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package httpproxy
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "net"
  22. "net/http"
  23. "net/url"
  24. "strings"
  25. "sync/atomic"
  26. "time"
  27. "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
  28. "github.com/coreos/pkg/capnslog"
  29. )
  30. var (
  31. plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/httpproxy")
  32. // Hop-by-hop headers. These are removed when sent to the backend.
  33. // http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
  34. // This list of headers borrowed from stdlib httputil.ReverseProxy
  35. singleHopHeaders = []string{
  36. "Connection",
  37. "Keep-Alive",
  38. "Proxy-Authenticate",
  39. "Proxy-Authorization",
  40. "Te", // canonicalized version of "TE"
  41. "Trailers",
  42. "Transfer-Encoding",
  43. "Upgrade",
  44. }
  45. )
  46. func removeSingleHopHeaders(hdrs *http.Header) {
  47. for _, h := range singleHopHeaders {
  48. hdrs.Del(h)
  49. }
  50. }
  51. type reverseProxy struct {
  52. director *director
  53. transport http.RoundTripper
  54. }
  55. func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
  56. reportIncomingRequest(clientreq)
  57. proxyreq := new(http.Request)
  58. *proxyreq = *clientreq
  59. startTime := time.Now()
  60. var (
  61. proxybody []byte
  62. err error
  63. )
  64. if clientreq.Body != nil {
  65. proxybody, err = ioutil.ReadAll(clientreq.Body)
  66. if err != nil {
  67. msg := fmt.Sprintf("failed to read request body: %v", err)
  68. plog.Println(msg)
  69. e := httptypes.NewHTTPError(http.StatusInternalServerError, "httpproxy: "+msg)
  70. if we := e.WriteTo(rw); we != nil {
  71. plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
  72. }
  73. return
  74. }
  75. }
  76. // deep-copy the headers, as these will be modified below
  77. proxyreq.Header = make(http.Header)
  78. copyHeader(proxyreq.Header, clientreq.Header)
  79. normalizeRequest(proxyreq)
  80. removeSingleHopHeaders(&proxyreq.Header)
  81. maybeSetForwardedFor(proxyreq)
  82. endpoints := p.director.endpoints()
  83. if len(endpoints) == 0 {
  84. msg := "zero endpoints currently available"
  85. reportRequestDropped(clientreq, zeroEndpoints)
  86. // TODO: limit the rate of the error logging.
  87. plog.Println(msg)
  88. e := httptypes.NewHTTPError(http.StatusServiceUnavailable, "httpproxy: "+msg)
  89. if we := e.WriteTo(rw); we != nil {
  90. plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
  91. }
  92. return
  93. }
  94. var requestClosed int32
  95. completeCh := make(chan bool, 1)
  96. closeNotifier, ok := rw.(http.CloseNotifier)
  97. ctx, cancel := context.WithCancel(context.Background())
  98. proxyreq = proxyreq.WithContext(ctx)
  99. defer cancel()
  100. if ok {
  101. closeCh := closeNotifier.CloseNotify()
  102. go func() {
  103. select {
  104. case <-closeCh:
  105. atomic.StoreInt32(&requestClosed, 1)
  106. plog.Printf("client %v closed request prematurely", clientreq.RemoteAddr)
  107. cancel()
  108. case <-completeCh:
  109. }
  110. }()
  111. defer func() {
  112. completeCh <- true
  113. }()
  114. }
  115. var res *http.Response
  116. for _, ep := range endpoints {
  117. if proxybody != nil {
  118. proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
  119. }
  120. redirectRequest(proxyreq, ep.URL)
  121. res, err = p.transport.RoundTrip(proxyreq)
  122. if atomic.LoadInt32(&requestClosed) == 1 {
  123. return
  124. }
  125. if err != nil {
  126. reportRequestDropped(clientreq, failedSendingRequest)
  127. plog.Printf("failed to direct request to %s: %v", ep.URL.String(), err)
  128. ep.Failed()
  129. continue
  130. }
  131. break
  132. }
  133. if res == nil {
  134. // TODO: limit the rate of the error logging.
  135. msg := fmt.Sprintf("unable to get response from %d endpoint(s)", len(endpoints))
  136. reportRequestDropped(clientreq, failedGettingResponse)
  137. plog.Println(msg)
  138. e := httptypes.NewHTTPError(http.StatusBadGateway, "httpproxy: "+msg)
  139. if we := e.WriteTo(rw); we != nil {
  140. plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
  141. }
  142. return
  143. }
  144. defer res.Body.Close()
  145. reportRequestHandled(clientreq, res, startTime)
  146. removeSingleHopHeaders(&res.Header)
  147. copyHeader(rw.Header(), res.Header)
  148. rw.WriteHeader(res.StatusCode)
  149. io.Copy(rw, res.Body)
  150. }
  151. func copyHeader(dst, src http.Header) {
  152. for k, vv := range src {
  153. for _, v := range vv {
  154. dst.Add(k, v)
  155. }
  156. }
  157. }
  158. func redirectRequest(req *http.Request, loc url.URL) {
  159. req.URL.Scheme = loc.Scheme
  160. req.URL.Host = loc.Host
  161. }
  162. func normalizeRequest(req *http.Request) {
  163. req.Proto = "HTTP/1.1"
  164. req.ProtoMajor = 1
  165. req.ProtoMinor = 1
  166. req.Close = false
  167. }
  168. func maybeSetForwardedFor(req *http.Request) {
  169. clientIP, _, err := net.SplitHostPort(req.RemoteAddr)
  170. if err != nil {
  171. return
  172. }
  173. // If we aren't the first proxy retain prior
  174. // X-Forwarded-For information as a comma+space
  175. // separated list and fold multiple headers into one.
  176. if prior, ok := req.Header["X-Forwarded-For"]; ok {
  177. clientIP = strings.Join(prior, ", ") + ", " + clientIP
  178. }
  179. req.Header.Set("X-Forwarded-For", clientIP)
  180. }