123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package httpproxy
- import (
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "strings"
- "sync/atomic"
- "time"
- "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
- "github.com/coreos/etcd/pkg/httputil"
- "github.com/coreos/pkg/capnslog"
- )
- var (
- plog = capnslog.NewPackageLogger("github.com/coreos/etcd/proxy", "httpproxy")
- // Hop-by-hop headers. These are removed when sent to the backend.
- // http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
- // This list of headers borrowed from stdlib httputil.ReverseProxy
- singleHopHeaders = []string{
- "Connection",
- "Keep-Alive",
- "Proxy-Authenticate",
- "Proxy-Authorization",
- "Te", // canonicalized version of "TE"
- "Trailers",
- "Transfer-Encoding",
- "Upgrade",
- }
- )
- func removeSingleHopHeaders(hdrs *http.Header) {
- for _, h := range singleHopHeaders {
- hdrs.Del(h)
- }
- }
- type reverseProxy struct {
- director *director
- transport http.RoundTripper
- }
- func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
- reportIncomingRequest(clientreq)
- proxyreq := new(http.Request)
- *proxyreq = *clientreq
- startTime := time.Now()
- var (
- proxybody []byte
- err error
- )
- if clientreq.Body != nil {
- proxybody, err = ioutil.ReadAll(clientreq.Body)
- if err != nil {
- msg := fmt.Sprintf("failed to read request body: %v", err)
- plog.Println(msg)
- e := httptypes.NewHTTPError(http.StatusInternalServerError, "httpproxy: "+msg)
- if we := e.WriteTo(rw); we != nil {
- plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
- }
- return
- }
- }
- // deep-copy the headers, as these will be modified below
- proxyreq.Header = make(http.Header)
- copyHeader(proxyreq.Header, clientreq.Header)
- normalizeRequest(proxyreq)
- removeSingleHopHeaders(&proxyreq.Header)
- maybeSetForwardedFor(proxyreq)
- endpoints := p.director.endpoints()
- if len(endpoints) == 0 {
- msg := "zero endpoints currently available"
- reportRequestDropped(clientreq, zeroEndpoints)
- // TODO: limit the rate of the error logging.
- plog.Println(msg)
- e := httptypes.NewHTTPError(http.StatusServiceUnavailable, "httpproxy: "+msg)
- if we := e.WriteTo(rw); we != nil {
- plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
- }
- return
- }
- var requestClosed int32
- completeCh := make(chan bool, 1)
- closeNotifier, ok := rw.(http.CloseNotifier)
- cancel := httputil.RequestCanceler(p.transport, proxyreq)
- if ok {
- closeCh := closeNotifier.CloseNotify()
- go func() {
- select {
- case <-closeCh:
- atomic.StoreInt32(&requestClosed, 1)
- plog.Printf("client %v closed request prematurely", clientreq.RemoteAddr)
- cancel()
- case <-completeCh:
- }
- }()
- defer func() {
- completeCh <- true
- }()
- }
- var res *http.Response
- for _, ep := range endpoints {
- if proxybody != nil {
- proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
- }
- redirectRequest(proxyreq, ep.URL)
- res, err = p.transport.RoundTrip(proxyreq)
- if atomic.LoadInt32(&requestClosed) == 1 {
- return
- }
- if err != nil {
- reportRequestDropped(clientreq, failedSendingRequest)
- plog.Printf("failed to direct request to %s: %v", ep.URL.String(), err)
- ep.Failed()
- continue
- }
- break
- }
- if res == nil {
- // TODO: limit the rate of the error logging.
- msg := fmt.Sprintf("unable to get response from %d endpoint(s)", len(endpoints))
- reportRequestDropped(clientreq, failedGettingResponse)
- plog.Println(msg)
- e := httptypes.NewHTTPError(http.StatusBadGateway, "httpproxy: "+msg)
- if we := e.WriteTo(rw); we != nil {
- plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
- }
- return
- }
- defer res.Body.Close()
- reportRequestHandled(clientreq, res, startTime)
- removeSingleHopHeaders(&res.Header)
- copyHeader(rw.Header(), res.Header)
- rw.WriteHeader(res.StatusCode)
- io.Copy(rw, res.Body)
- }
// copyHeader appends every value in src to dst under the canonical form of its
// key. Values already present in dst are kept, and keys in src that carry no
// values are not created in dst.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if len(values) == 0 {
			continue
		}
		canonical := http.CanonicalHeaderKey(name)
		dst[canonical] = append(dst[canonical], values...)
	}
}
// redirectRequest points req at the server described by loc by overwriting the
// scheme and host of req.URL. All other URL components are left as they are.
func redirectRequest(req *http.Request, loc url.URL) {
	target := req.URL
	target.Scheme, target.Host = loc.Scheme, loc.Host
}
// normalizeRequest marks req as a persistent HTTP/1.1 request: the protocol
// fields are forced to 1.1 and the Close flag is cleared.
func normalizeRequest(req *http.Request) {
	req.Close = false
	req.ProtoMajor, req.ProtoMinor = 1, 1
	req.Proto = "HTTP/1.1"
}
// maybeSetForwardedFor records the client's IP, taken from req.RemoteAddr, in
// the X-Forwarded-For header of req. If RemoteAddr cannot be split into host
// and port the request is left untouched.
func maybeSetForwardedFor(req *http.Request) {
	host, _, splitErr := net.SplitHostPort(req.RemoteAddr)
	if splitErr != nil {
		return
	}

	forwarded := host
	// When an earlier proxy already set the header, fold all of its values
	// into one comma+space separated list and put this client at the end.
	if earlier, present := req.Header["X-Forwarded-For"]; present {
		forwarded = strings.Join([]string{strings.Join(earlier, ", "), host}, ", ")
	}
	req.Header.Set("X-Forwarded-For", forwarded)
}
|