Browse Source

Merge pull request #3759 from yichengq/rafthttp-unreachable

rafthttp: mark unreachable on unexpected response
Yicheng Qin 10 years ago
parent
commit
1b3d9130c9
3 changed files with 15 additions and 6 deletions
  1. 9 6
      rafthttp/pipeline.go
  2. 5 0
      rafthttp/stream.go
  3. 1 0
      rafthttp/transport_test.go

+ 9 - 6
rafthttp/pipeline.go

@@ -153,14 +153,17 @@ func (p *pipeline) post(data []byte) (err error) {
 	resp.Body.Close()
 
 	err = checkPostResponse(resp, b, req, p.to)
-	// errMemberRemoved is a critical error since a removed member should
-	// always be stopped. So we use reportCriticalError to report it to errorc.
-	if err == errMemberRemoved {
-		reportCriticalError(err, p.errorc)
-		return nil
+	if err != nil {
+		p.picker.unreachable(u)
+		// errMemberRemoved is a critical error since a removed member should
+		// always be stopped. So we use reportCriticalError to report it to errorc.
+		if err == errMemberRemoved {
+			reportCriticalError(err, p.errorc)
+		}
+		return err
 	}
 
-	return err
+	return nil
 }
 
 // waitSchedule waits other goroutines to be scheduled for a while

+ 5 - 0
rafthttp/stream.go

@@ -392,12 +392,14 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	lv := semver.Must(semver.NewVersion(version.Version))
 	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		return nil, errUnsupportedStreamType
 	}
 
 	switch resp.StatusCode {
 	case http.StatusGone:
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		err := fmt.Errorf("the member has been permanently removed from the cluster")
 		select {
 		case cr.errorc <- err:
@@ -408,6 +410,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 		return resp.Body, nil
 	case http.StatusNotFound:
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote)
 	case http.StatusPreconditionFailed:
 		b, err := ioutil.ReadAll(resp.Body)
@@ -416,6 +419,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 			return nil, err
 		}
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 
 		switch strings.TrimSuffix(string(b), "\n") {
 		case errIncompatibleVersion.Error():
@@ -430,6 +434,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 		}
 	default:
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
 	}
 }

+ 1 - 0
rafthttp/transport_test.go

@@ -128,6 +128,7 @@ func TestTransportUpdate(t *testing.T) {
 func TestTransportErrorc(t *testing.T) {
 	errorc := make(chan error, 1)
 	tr := &Transport{
+		Raft:        &fakeRaft{},
 		LeaderStats: stats.NewLeaderStats(""),
 		ErrorC:      errorc,
 		streamRt:    newRespRoundTripper(http.StatusForbidden, nil),