Browse Source

Merge pull request #2797 from yichengq/stream-2.0

rafthttp: try stream msgappV1 handler if msgappV2 is unsupported
Yicheng Qin 10 years ago
parent
commit
5d741e4945
7 changed files with 277 additions and 26 deletions
  1. 3 0
      rafthttp/http.go
  2. 8 2
      rafthttp/http_test.go
  3. 9 5
      rafthttp/peer.go
  4. 4 3
      rafthttp/pipeline_test.go
  5. 94 11
      rafthttp/stream.go
  6. 144 5
      rafthttp/stream_test.go
  7. 15 0
      rafthttp/transport.go

+ 3 - 0
rafthttp/http.go

@@ -24,6 +24,7 @@ import (
 	pioutil "github.com/coreos/etcd/pkg/ioutil"
 	pioutil "github.com/coreos/etcd/pkg/ioutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/version"
 )
 )
 
 
 const (
 const (
@@ -125,6 +126,8 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 
 
+	w.Header().Add("X-Server-Version", version.Version)
+
 	var t streamType
 	var t streamType
 	switch path.Dir(r.URL.Path) {
 	switch path.Dir(r.URL.Path) {
 	// backward compatibility
 	// backward compatibility

+ 8 - 2
rafthttp/http_test.go

@@ -28,6 +28,7 @@ import (
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/version"
 )
 )
 
 
 func TestServeRaftPrefix(t *testing.T) {
 func TestServeRaftPrefix(t *testing.T) {
@@ -197,11 +198,14 @@ func TestServeRaftStreamPrefix(t *testing.T) {
 		case <-time.After(time.Second):
 		case <-time.After(time.Second):
 			t.Fatalf("#%d: failed to attach outgoingConn", i)
 			t.Fatalf("#%d: failed to attach outgoingConn", i)
 		}
 		}
+		if g := rw.Header().Get("X-Server-Version"); g != version.Version {
+			t.Errorf("#%d: X-Server-Version = %s, want %s", i, g, version.Version)
+		}
 		if conn.t != tt.wtype {
 		if conn.t != tt.wtype {
-			t.Errorf("$%d: type = %s, want %s", i, conn.t, tt.wtype)
+			t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype)
 		}
 		}
 		if conn.termStr != wterm {
 		if conn.termStr != wterm {
-			t.Errorf("$%d: term = %s, want %s", i, conn.termStr, wterm)
+			t.Errorf("#%d: term = %s, want %s", i, conn.termStr, wterm)
 		}
 		}
 		conn.Close()
 		conn.Close()
 	}
 	}
@@ -345,6 +349,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
 type fakePeer struct {
 type fakePeer struct {
 	msgs  []raftpb.Message
 	msgs  []raftpb.Message
 	urls  types.URLs
 	urls  types.URLs
+	term  uint64
 	connc chan *outgoingConn
 	connc chan *outgoingConn
 }
 }
 
 
@@ -356,5 +361,6 @@ func newFakePeer() *fakePeer {
 
 
 func (pr *fakePeer) Send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
 func (pr *fakePeer) Send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
 func (pr *fakePeer) Update(urls types.URLs)                { pr.urls = urls }
 func (pr *fakePeer) Update(urls types.URLs)                { pr.urls = urls }
+func (pr *fakePeer) setTerm(term uint64)                   { pr.term = term }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
 func (pr *fakePeer) Stop()                                 {}
 func (pr *fakePeer) Stop()                                 {}

+ 9 - 5
rafthttp/peer.go

@@ -70,6 +70,8 @@ type Peer interface {
 	Send(m raftpb.Message)
 	Send(m raftpb.Message)
 	// Update updates the urls of remote peer.
 	// Update updates the urls of remote peer.
 	Update(urls types.URLs)
 	Update(urls types.URLs)
+	// setTerm sets the term of ongoing communication.
+	setTerm(term uint64)
 	// attachOutgoingConn attachs the outgoing connection to the peer for
 	// attachOutgoingConn attachs the outgoing connection to the peer for
 	// stream usage. After the call, the ownership of the outgoing
 	// stream usage. After the call, the ownership of the outgoing
 	// connection hands over to the peer. The peer will close the connection
 	// connection hands over to the peer. The peer will close the connection
@@ -99,11 +101,13 @@ type peer struct {
 	msgAppWriter *streamWriter
 	msgAppWriter *streamWriter
 	writer       *streamWriter
 	writer       *streamWriter
 	pipeline     *pipeline
 	pipeline     *pipeline
+	msgAppReader *streamReader
 
 
 	sendc    chan raftpb.Message
 	sendc    chan raftpb.Message
 	recvc    chan raftpb.Message
 	recvc    chan raftpb.Message
 	propc    chan raftpb.Message
 	propc    chan raftpb.Message
 	newURLsC chan types.URLs
 	newURLsC chan types.URLs
+	termc    chan uint64
 
 
 	// for testing
 	// for testing
 	pausec  chan struct{}
 	pausec  chan struct{}
@@ -125,6 +129,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		propc:        make(chan raftpb.Message, maxPendingProposals),
 		propc:        make(chan raftpb.Message, maxPendingProposals),
 		newURLsC:     make(chan types.URLs),
 		newURLsC:     make(chan types.URLs),
+		termc:        make(chan uint64),
 		pausec:       make(chan struct{}),
 		pausec:       make(chan struct{}),
 		resumec:      make(chan struct{}),
 		resumec:      make(chan struct{}),
 		stopc:        make(chan struct{}),
 		stopc:        make(chan struct{}),
@@ -149,7 +154,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 
 
 	go func() {
 	go func() {
 		var paused bool
 		var paused bool
-		msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc)
+		p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc)
 		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc)
 		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc)
 		for {
 		for {
 			select {
 			select {
@@ -169,9 +174,6 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 						m.Type, p.id, name, bufSizeMap[name])
 						m.Type, p.id, name, bufSizeMap[name])
 				}
 				}
 			case mm := <-p.recvc:
 			case mm := <-p.recvc:
-				if mm.Type == raftpb.MsgApp {
-					msgAppReader.updateMsgAppTerm(mm.Term)
-				}
 				if err := r.Process(context.TODO(), mm); err != nil {
 				if err := r.Process(context.TODO(), mm); err != nil {
 					log.Printf("peer: process raft message error: %v", err)
 					log.Printf("peer: process raft message error: %v", err)
 				}
 				}
@@ -186,7 +188,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 				p.msgAppWriter.stop()
 				p.msgAppWriter.stop()
 				p.writer.stop()
 				p.writer.stop()
 				p.pipeline.stop()
 				p.pipeline.stop()
-				msgAppReader.stop()
+				p.msgAppReader.stop()
 				reader.stop()
 				reader.stop()
 				close(p.done)
 				close(p.done)
 				return
 				return
@@ -211,6 +213,8 @@ func (p *peer) Update(urls types.URLs) {
 	}
 	}
 }
 }
 
 
+func (p *peer) setTerm(term uint64) { p.msgAppReader.updateMsgAppTerm(term) }
+
 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 	var ok bool
 	var ok bool
 	switch conn.t {
 	switch conn.t {

+ 4 - 3
rafthttp/pipeline_test.go

@@ -197,15 +197,16 @@ func (t *roundTripperBlocker) unblock() {
 }
 }
 
 
 type respRoundTripper struct {
 type respRoundTripper struct {
-	code int
-	err  error
+	code   int
+	header http.Header
+	err    error
 }
 }
 
 
 func newRespRoundTripper(code int, err error) *respRoundTripper {
 func newRespRoundTripper(code int, err error) *respRoundTripper {
 	return &respRoundTripper{code: code, err: err}
 	return &respRoundTripper{code: code, err: err}
 }
 }
 func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
 func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
-	return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
+	return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
 }
 }
 
 
 type roundTripperRecorder struct {
 type roundTripperRecorder struct {

+ 94 - 11
rafthttp/stream.go

@@ -25,9 +25,11 @@ import (
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/version"
 )
 )
 
 
 const (
 const (
@@ -38,6 +40,16 @@ const (
 	streamBufSize = 4096
 	streamBufSize = 4096
 )
 )
 
 
+var (
+	errUnsupportedStreamType = fmt.Errorf("unsupported stream type")
+
+	// the key is in string format "major.minor.patch"
+	supportedStream = map[string][]streamType{
+		"2.0.0": []streamType{streamTypeMsgApp},
+		"2.1.0": []streamType{streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage},
+	}
+)
+
 type streamType string
 type streamType string
 
 
 func (t streamType) endpoint() string {
 func (t streamType) endpoint() string {
@@ -256,13 +268,30 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr
 
 
 func (cr *streamReader) run() {
 func (cr *streamReader) run() {
 	for {
 	for {
-		rc, err := cr.dial()
+		t := cr.t
+		rc, err := cr.dial(t)
+		// downgrade to streamTypeMsgApp if the remote doesn't support
+		// streamTypeMsgAppV2
+		if t == streamTypeMsgAppV2 && err == errUnsupportedStreamType {
+			t = streamTypeMsgApp
+			rc, err = cr.dial(t)
+		}
 		if err != nil {
 		if err != nil {
-			log.Printf("rafthttp: roundtripping error: %v", err)
+			if err != errUnsupportedStreamType {
+				log.Printf("rafthttp: roundtripping error: %v", err)
+			}
 		} else {
 		} else {
-			err := cr.decodeLoop(rc)
-			if err != io.EOF && !isClosedConnectionError(err) {
-				log.Printf("rafthttp: failed to read message on stream %s due to %v", cr.t, err)
+			err := cr.decodeLoop(rc, t)
+			switch {
+			// all data is read out
+			case err == io.EOF:
+			// connection is closed by the remote
+			case isClosedConnectionError(err):
+			// stream msgapp is only used for etcd 2.0, and etcd 2.0 doesn't
+			// heartbeat on the idle stream, so it is expected to time out.
+			case t == streamTypeMsgApp && isNetworkTimeoutError(err):
+			default:
+				log.Printf("rafthttp: failed to read message on stream %s due to %v", t, err)
 			}
 			}
 		}
 		}
 		select {
 		select {
@@ -276,10 +305,10 @@ func (cr *streamReader) run() {
 	}
 	}
 }
 }
 
 
-func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
+func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 	var dec decoder
 	var dec decoder
 	cr.mu.Lock()
 	cr.mu.Lock()
-	switch cr.t {
+	switch t {
 	case streamTypeMsgApp:
 	case streamTypeMsgApp:
 		dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm}
 		dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm}
 	case streamTypeMsgAppV2:
 	case streamTypeMsgAppV2:
@@ -287,7 +316,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
 	case streamTypeMessage:
 	case streamTypeMessage:
 		dec = &messageDecoder{r: rc}
 		dec = &messageDecoder{r: rc}
 	default:
 	default:
-		log.Panicf("rafthttp: unhandled stream type %s", cr.t)
+		log.Panicf("rafthttp: unhandled stream type %s", t)
 	}
 	}
 	cr.closer = rc
 	cr.closer = rc
 	cr.mu.Unlock()
 	cr.mu.Unlock()
@@ -347,14 +376,14 @@ func (cr *streamReader) isWorking() bool {
 	return cr.closer != nil
 	return cr.closer != nil
 }
 }
 
 
-func (cr *streamReader) dial() (io.ReadCloser, error) {
+func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	u := cr.picker.pick()
 	u := cr.picker.pick()
 	cr.mu.Lock()
 	cr.mu.Lock()
 	term := cr.msgAppTerm
 	term := cr.msgAppTerm
 	cr.mu.Unlock()
 	cr.mu.Unlock()
 
 
 	uu := u
 	uu := u
-	uu.Path = path.Join(cr.t.endpoint(), cr.from.String())
+	uu.Path = path.Join(t.endpoint(), cr.from.String())
 	req, err := http.NewRequest("GET", uu.String(), nil)
 	req, err := http.NewRequest("GET", uu.String(), nil)
 	if err != nil {
 	if err != nil {
 		cr.picker.unreachable(u)
 		cr.picker.unreachable(u)
@@ -362,7 +391,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) {
 	}
 	}
 	req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
 	req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
 	req.Header.Set("X-Raft-To", cr.to.String())
 	req.Header.Set("X-Raft-To", cr.to.String())
-	if cr.t == streamTypeMsgApp {
+	if t == streamTypeMsgApp {
 		req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
 		req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
 	}
 	}
 	cr.mu.Lock()
 	cr.mu.Lock()
@@ -373,6 +402,14 @@ func (cr *streamReader) dial() (io.ReadCloser, error) {
 		cr.picker.unreachable(u)
 		cr.picker.unreachable(u)
 		return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err)
 		return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err)
 	}
 	}
+
+	rv := serverVersion(resp.Header)
+	lv := semver.Must(semver.NewVersion(version.Version))
+	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
+		resp.Body.Close()
+		return nil, errUnsupportedStreamType
+	}
+
 	switch resp.StatusCode {
 	switch resp.StatusCode {
 	case http.StatusGone:
 	case http.StatusGone:
 		resp.Body.Close()
 		resp.Body.Close()
@@ -384,6 +421,9 @@ func (cr *streamReader) dial() (io.ReadCloser, error) {
 		return nil, err
 		return nil, err
 	case http.StatusOK:
 	case http.StatusOK:
 		return resp.Body, nil
 		return resp.Body, nil
+	case http.StatusNotFound:
+		resp.Body.Close()
+		return nil, fmt.Errorf("local member has not been added to the peer list of member %s", cr.to)
 	default:
 	default:
 		resp.Body.Close()
 		resp.Body.Close()
 		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
 		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
@@ -411,3 +451,46 @@ func isClosedConnectionError(err error) bool {
 	operr, ok := err.(*net.OpError)
 	operr, ok := err.(*net.OpError)
 	return ok && operr.Err.Error() == "use of closed network connection"
 	return ok && operr.Err.Error() == "use of closed network connection"
 }
 }
+
+// serverVersion returns the version from the given header.
+func serverVersion(h http.Header) *semver.Version {
+	verStr := h.Get("X-Server-Version")
+	// backward compatibility with etcd 2.0
+	if verStr == "" {
+		verStr = "2.0.0"
+	}
+	return semver.Must(semver.NewVersion(verStr))
+}
+
+// compareMajorMinorVersion returns an integer comparing two versions based on
+// their major and minor version. The result will be 0 if a==b, -1 if a < b,
+// and 1 if a > b.
+func compareMajorMinorVersion(a, b *semver.Version) int {
+	na := &semver.Version{Major: a.Major, Minor: a.Minor}
+	nb := &semver.Version{Major: b.Major, Minor: b.Minor}
+	switch {
+	case na.LessThan(*nb):
+		return -1
+	case nb.LessThan(*na):
+		return 1
+	default:
+		return 0
+	}
+}
+
+// checkStreamSupport checks whether the stream type is supported in the
+// given version.
+func checkStreamSupport(v *semver.Version, t streamType) bool {
+	nv := &semver.Version{Major: v.Major, Minor: v.Minor}
+	for _, s := range supportedStream[nv.String()] {
+		if s == t {
+			return true
+		}
+	}
+	return false
+}
+
+func isNetworkTimeoutError(err error) bool {
+	nerr, ok := err.(net.Error)
+	return ok && nerr.Timeout()
+}

+ 144 - 5
rafthttp/stream_test.go

@@ -9,10 +9,12 @@ import (
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/version"
 )
 )
 
 
 // TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
 // TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
@@ -87,13 +89,12 @@ func TestStreamReaderDialRequest(t *testing.T) {
 		sr := &streamReader{
 		sr := &streamReader{
 			tr:         tr,
 			tr:         tr,
 			picker:     mustNewURLPicker(t, []string{"http://localhost:2380"}),
 			picker:     mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			t:          tt,
 			from:       types.ID(1),
 			from:       types.ID(1),
 			to:         types.ID(2),
 			to:         types.ID(2),
 			cid:        types.ID(1),
 			cid:        types.ID(1),
 			msgAppTerm: 1,
 			msgAppTerm: 1,
 		}
 		}
-		sr.dial()
+		sr.dial(tt)
 
 
 		req := tr.Request()
 		req := tr.Request()
 		wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1")
 		wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1")
@@ -132,18 +133,23 @@ func TestStreamReaderDialResult(t *testing.T) {
 		{http.StatusGone, nil, false, true},
 		{http.StatusGone, nil, false, true},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		tr := newRespRoundTripper(tt.code, tt.err)
+		h := http.Header{}
+		h.Add("X-Server-Version", version.Version)
+		tr := &respRoundTripper{
+			code:   tt.code,
+			header: h,
+			err:    tt.err,
+		}
 		sr := &streamReader{
 		sr := &streamReader{
 			tr:     tr,
 			tr:     tr,
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			t:      streamTypeMessage,
 			from:   types.ID(1),
 			from:   types.ID(1),
 			to:     types.ID(2),
 			to:     types.ID(2),
 			cid:    types.ID(1),
 			cid:    types.ID(1),
 			errorc: make(chan error, 1),
 			errorc: make(chan error, 1),
 		}
 		}
 
 
-		_, err := sr.dial()
+		_, err := sr.dial(streamTypeMessage)
 		if ok := err == nil; ok != tt.wok {
 		if ok := err == nil; ok != tt.wok {
 			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
 			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
 		}
 		}
@@ -188,6 +194,30 @@ func TestStreamReaderUpdateMsgAppTerm(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestStreamReaderDialDetectUnsupport tests that dial func could find
+// out that the stream type is not supported by the remote.
+func TestStreamReaderDialDetectUnsupport(t *testing.T) {
+	for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
+		// the response from etcd 2.0
+		tr := &respRoundTripper{
+			code:   http.StatusNotFound,
+			header: http.Header{},
+		}
+		sr := &streamReader{
+			tr:     tr,
+			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
+			from:   types.ID(1),
+			to:     types.ID(2),
+			cid:    types.ID(1),
+		}
+
+		_, err := sr.dial(typ)
+		if err != errUnsupportedStreamType {
+			t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
+		}
+	}
+}
+
 // TestStream tests that streamReader and streamWriter can build stream to
 // TestStream tests that streamReader and streamWriter can build stream to
 // send messages between each other.
 // send messages between each other.
 func TestStream(t *testing.T) {
 func TestStream(t *testing.T) {
@@ -272,6 +302,114 @@ func TestStream(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestServerVersion(t *testing.T) {
+	tests := []struct {
+		h  http.Header
+		wv *semver.Version
+	}{
+		// backward compatibility with etcd 2.0
+		{
+			http.Header{},
+			semver.Must(semver.NewVersion("2.0.0")),
+		},
+		{
+			http.Header{"X-Server-Version": []string{"2.1.0"}},
+			semver.Must(semver.NewVersion("2.1.0")),
+		},
+		{
+			http.Header{"X-Server-Version": []string{"2.1.0-alpha.0+git"}},
+			semver.Must(semver.NewVersion("2.1.0-alpha.0+git")),
+		},
+	}
+	for i, tt := range tests {
+		v := serverVersion(tt.h)
+		if v.String() != tt.wv.String() {
+			t.Errorf("#%d: version = %s, want %s", i, v, tt.wv)
+		}
+	}
+}
+
+func TestCompareMajorMinorVersion(t *testing.T) {
+	tests := []struct {
+		va, vb *semver.Version
+		w      int
+	}{
+		// equal to
+		{
+			semver.Must(semver.NewVersion("2.1.0")),
+			semver.Must(semver.NewVersion("2.1.0")),
+			0,
+		},
+		// smaller than
+		{
+			semver.Must(semver.NewVersion("2.0.0")),
+			semver.Must(semver.NewVersion("2.1.0")),
+			-1,
+		},
+		// bigger than
+		{
+			semver.Must(semver.NewVersion("2.2.0")),
+			semver.Must(semver.NewVersion("2.1.0")),
+			1,
+		},
+		// ignore patch
+		{
+			semver.Must(semver.NewVersion("2.1.1")),
+			semver.Must(semver.NewVersion("2.1.0")),
+			0,
+		},
+		// ignore prerelease
+		{
+			semver.Must(semver.NewVersion("2.1.0-alpha.0")),
+			semver.Must(semver.NewVersion("2.1.0")),
+			0,
+		},
+	}
+	for i, tt := range tests {
+		if g := compareMajorMinorVersion(tt.va, tt.vb); g != tt.w {
+			t.Errorf("#%d: compare = %d, want %d", i, g, tt.w)
+		}
+	}
+}
+
+func TestCheckStreamSupport(t *testing.T) {
+	tests := []struct {
+		v *semver.Version
+		t streamType
+		w bool
+	}{
+		// support
+		{
+			semver.Must(semver.NewVersion("2.0.0")),
+			streamTypeMsgApp,
+			true,
+		},
+		// ignore patch
+		{
+			semver.Must(semver.NewVersion("2.0.9")),
+			streamTypeMsgApp,
+			true,
+		},
+		// ignore prerelease
+		{
+			semver.Must(semver.NewVersion("2.0.0-alpha")),
+			streamTypeMsgApp,
+			true,
+		},
+		// not support
+		{
+			semver.Must(semver.NewVersion("2.0.0")),
+			streamTypeMsgAppV2,
+			false,
+		},
+	}
+	for i, tt := range tests {
+		if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
+			t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
+		}
+	}
+}
+
 type fakeWriteFlushCloser struct {
 type fakeWriteFlushCloser struct {
 	err     error
 	err     error
 	written int
 	written int
@@ -294,6 +432,7 @@ type fakeStreamHandler struct {
 }
 }
 
 
 func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	w.Header().Add("X-Server-Version", version.Version)
 	w.(http.Flusher).Flush()
 	w.(http.Flusher).Flush()
 	c := newCloseNotifier()
 	c := newCloseNotifier()
 	h.sw.attach(&outgoingConn{
 	h.sw.attach(&outgoingConn{

+ 15 - 0
rafthttp/transport.go

@@ -77,6 +77,7 @@ type transport struct {
 	serverStats  *stats.ServerStats
 	serverStats  *stats.ServerStats
 	leaderStats  *stats.LeaderStats
 	leaderStats  *stats.LeaderStats
 
 
+	term    uint64               // the latest term that has been observed
 	mu      sync.RWMutex         // protect the remote and peer map
 	mu      sync.RWMutex         // protect the remote and peer map
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
 	peers   map[types.ID]Peer    // peers map
@@ -112,6 +113,16 @@ func (t *transport) Get(id types.ID) Peer {
 	return t.peers[id]
 	return t.peers[id]
 }
 }
 
 
+func (t *transport) maybeUpdatePeersTerm(term uint64) {
+	if t.term >= term {
+		return
+	}
+	t.term = term
+	for _, p := range t.peers {
+		p.setTerm(term)
+	}
+}
+
 func (t *transport) Send(msgs []raftpb.Message) {
 func (t *transport) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 	for _, m := range msgs {
 		// intentionally dropped message
 		// intentionally dropped message
@@ -120,6 +131,10 @@ func (t *transport) Send(msgs []raftpb.Message) {
 		}
 		}
 		to := types.ID(m.To)
 		to := types.ID(m.To)
 
 
+		if m.Type != raftpb.MsgProp { // proposal message does not have a valid term
+			t.maybeUpdatePeersTerm(m.Term)
+		}
+
 		p, ok := t.peers[to]
 		p, ok := t.peers[to]
 		if ok {
 		if ok {
 			if m.Type == raftpb.MsgApp {
 			if m.Type == raftpb.MsgApp {