Browse Source

*: drain http.Response.Body before closing

Gyu-Ho Lee 9 years ago
parent
commit
a42d1dc1fe
4 changed files with 21 additions and 8 deletions
  1. 2 1
      etcdserver/cluster_util.go
  2. 13 1
      pkg/httputil/httputil.go
  3. 1 1
      rafthttp/snapshot_sender.go
  4. 5 5
      rafthttp/stream.go

+ 2 - 1
etcdserver/cluster_util.go

@@ -22,6 +22,7 @@ import (
 	"sort"
 	"time"
 
+	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/go-semver/semver"
@@ -231,7 +232,7 @@ func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) {
 		}
 		// etcd 2.0 does not have version endpoint on peer url.
 		if resp.StatusCode == http.StatusNotFound {
-			resp.Body.Close()
+			httputil.GracefulClose(resp)
 			return &version.Versions{
 				Server:  "2.0.0",
 				Cluster: "2.0.0",

+ 13 - 1
pkg/httputil/cancelreq.go → pkg/httputil/httputil.go

@@ -7,7 +7,11 @@
 // Package httputil provides HTTP utility functions.
 package httputil
 
-import "net/http"
+import (
+	"io"
+	"io/ioutil"
+	"net/http"
+)
 
 func RequestCanceler(rt http.RoundTripper, req *http.Request) func() {
 	ch := make(chan struct{})
@@ -17,3 +21,11 @@ func RequestCanceler(rt http.RoundTripper, req *http.Request) func() {
 		close(ch)
 	}
 }
+
+// GracefulClose drains http.Response.Body until it hits EOF
+// and closes it. This prevents TCP/TLS connections from closing,
+// therefore available for reuse.
+func GracefulClose(resp *http.Response) {
+	io.Copy(ioutil.Discard, resp.Body)
+	resp.Body.Close()
+}

+ 1 - 1
rafthttp/snapshot_sender.go

@@ -124,7 +124,7 @@ func (s *snapshotSender) post(req *http.Request) (err error) {
 		// close the response body when timeouts.
 		// prevents from reading the body forever when the other side dies right after
 		// successfully receives the request body.
-		time.AfterFunc(snapResponseReadTimeout, func() { resp.Body.Close() })
+		time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) })
 		body, err := ioutil.ReadAll(resp.Body)
 		result <- responseAndError{resp, body, err}
 	}()

+ 5 - 5
rafthttp/stream.go

@@ -417,14 +417,14 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	rv := serverVersion(resp.Header)
 	lv := semver.Must(semver.NewVersion(version.Version))
 	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
-		resp.Body.Close()
+		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
 		return nil, errUnsupportedStreamType
 	}
 
 	switch resp.StatusCode {
 	case http.StatusGone:
-		resp.Body.Close()
+		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
 		err := fmt.Errorf("the member has been permanently removed from the cluster")
 		select {
@@ -435,7 +435,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	case http.StatusOK:
 		return resp.Body, nil
 	case http.StatusNotFound:
-		resp.Body.Close()
+		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote)
 	case http.StatusPreconditionFailed:
@@ -444,7 +444,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 			cr.picker.unreachable(u)
 			return nil, err
 		}
-		resp.Body.Close()
+		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
 
 		switch strings.TrimSuffix(string(b), "\n") {
@@ -459,7 +459,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
 		}
 	default:
-		resp.Body.Close()
+		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
 	}