Browse Source

Merge pull request #3978 from xiang90/rh

rafthttp: snapshot sender cleanup
Xiang Li 10 years ago
parent
commit
8eaa8f9e0b
2 changed files with 36 additions and 14 deletions
  1. 24 0
      pkg/ioutil/readcloser.go
  2. 12 14
      rafthttp/snapshot_sender.go

+ 24 - 0
pkg/ioutil/readcloser.go

@@ -0,0 +1,24 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ioutil
+
+import "io"
+
+// ReaderAndCloser implements io.ReadCloser interface by combining
+// reader and closer together.
+type ReaderAndCloser struct {
+	io.Reader
+	io.Closer
+}

+ 12 - 14
rafthttp/snapshot_sender.go

@@ -22,11 +22,17 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/pkg/httputil"
+	pioutil "github.com/coreos/etcd/pkg/ioutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/snap"
 )
 
+var (
+	// timeout for reading snapshot response body
+	snapResponseReadTimeout = 5 * time.Second
+)
+
 type snapshotSender struct {
 	from, to types.ID
 	cid      types.ID
@@ -108,19 +114,17 @@ func (s *snapshotSender) post(req *http.Request) (err error) {
 	result := make(chan responseAndError, 1)
 
 	go func() {
-		// TODO: cancel the request if it has waited for a long time(~5s) after
-		// it has write out the full request body, which helps to avoid receiver
-		// dies when sender is waiting for response
-		// TODO: the snapshot could be large and eat up all resources when writing
-		// it out. Send it block by block and rest some time between to give the
-		// time for main loop to run.
 		resp, err := s.tr.RoundTrip(req)
 		if err != nil {
 			result <- responseAndError{resp, nil, err}
 			return
 		}
+
+		// close the response body when timeouts.
+		// prevents from reading the body forever when the other side dies right after
+		// successfully receives the request body.
+		time.AfterFunc(snapResponseReadTimeout, func() { resp.Body.Close() })
 		body, err := ioutil.ReadAll(resp.Body)
-		resp.Body.Close()
 		result <- responseAndError{resp, body, err}
 	}()
 
@@ -136,12 +140,6 @@ func (s *snapshotSender) post(req *http.Request) (err error) {
 	}
 }
 
-// readCloser implements io.ReadCloser interface.
-type readCloser struct {
-	io.Reader
-	io.Closer
-}
-
 func createSnapBody(merged snap.Message) io.ReadCloser {
 	buf := new(bytes.Buffer)
 	enc := &messageEncoder{w: buf}
@@ -150,7 +148,7 @@ func createSnapBody(merged snap.Message) io.ReadCloser {
 		plog.Panicf("encode message error (%v)", err)
 	}
 
-	return &readCloser{
+	return &pioutil.ReaderAndCloser{
 		Reader: io.MultiReader(buf, merged.ReadCloser),
 		Closer: merged.ReadCloser,
 	}