Browse Source

rafthttp: mark unreachable on unexpected response

In rafthttp, when making request to some endpoint, it may receive
response with unexpected status code and header. This indicates the endpoint
doesn't function correctly. It should mark the endpoint unreachable.
Yicheng Qin 10 years ago
parent
commit
908a011604
2 changed files with 15 additions and 6 deletions
  1. 10 6
      rafthttp/pipeline.go
  2. 5 0
      rafthttp/stream.go

+ 10 - 6
rafthttp/pipeline.go

@@ -153,14 +153,18 @@ func (p *pipeline) post(data []byte) (err error) {
 	resp.Body.Close()
 
 	err = checkPostResponse(resp, b, req, p.to)
-	// errMemberRemoved is a critical error since a removed member should
-	// always be stopped. So we use reportCriticalError to report it to errorc.
-	if err == errMemberRemoved {
-		reportCriticalError(err, p.errorc)
-		return nil
+	if err != nil {
+		p.picker.unreachable(u)
+		// errMemberRemoved is a critical error since a removed member should
+		// always be stopped. So we use reportCriticalError to report it to errorc.
+		if err == errMemberRemoved {
+			reportCriticalError(err, p.errorc)
+			return nil
+		}
+		return err
 	}
 
-	return err
+	return nil
 }
 
 // waitSchedule waits other goroutines to be scheduled for a while

+ 5 - 0
rafthttp/stream.go

@@ -392,12 +392,14 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	lv := semver.Must(semver.NewVersion(version.Version))
 	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		return nil, errUnsupportedStreamType
 	}
 
 	switch resp.StatusCode {
 	case http.StatusGone:
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		err := fmt.Errorf("the member has been permanently removed from the cluster")
 		select {
 		case cr.errorc <- err:
@@ -408,6 +410,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 		return resp.Body, nil
 	case http.StatusNotFound:
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote)
 	case http.StatusPreconditionFailed:
 		b, err := ioutil.ReadAll(resp.Body)
@@ -416,6 +419,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 			return nil, err
 		}
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 
 		switch strings.TrimSuffix(string(b), "\n") {
 		case errIncompatibleVersion.Error():
@@ -430,6 +434,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 		}
 	default:
 		resp.Body.Close()
+		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
 	}
 }