Browse Source

rafthttp: update streamReader term in time

Because etcd 2.1 will build stream to any existing peers and etcd 2.0
requires the remote to provide most updated term, it is
necessary for streamReader to know the latest term.
Yicheng Qin 10 years ago
parent
commit
19fc1a7137
3 changed files with 26 additions and 5 deletions
  1. 2 0
      rafthttp/http_test.go
  2. 9 5
      rafthttp/peer.go
  3. 15 0
      rafthttp/transport.go

+ 2 - 0
rafthttp/http_test.go

@@ -349,6 +349,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
 type fakePeer struct {
 	msgs  []raftpb.Message
 	urls  types.URLs
+	term  uint64
 	connc chan *outgoingConn
 }
 
@@ -360,5 +361,6 @@ func newFakePeer() *fakePeer {
 
 func (pr *fakePeer) Send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
 func (pr *fakePeer) Update(urls types.URLs)                { pr.urls = urls }
+func (pr *fakePeer) setTerm(term uint64)                   { pr.term = term }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
 func (pr *fakePeer) Stop()                                 {}

+ 9 - 5
rafthttp/peer.go

@@ -70,6 +70,8 @@ type Peer interface {
 	Send(m raftpb.Message)
 	// Update updates the urls of remote peer.
 	Update(urls types.URLs)
+	// setTerm sets the term of ongoing communication.
+	setTerm(term uint64)
 	// attachOutgoingConn attachs the outgoing connection to the peer for
 	// stream usage. After the call, the ownership of the outgoing
 	// connection hands over to the peer. The peer will close the connection
@@ -99,11 +101,13 @@ type peer struct {
 	msgAppWriter *streamWriter
 	writer       *streamWriter
 	pipeline     *pipeline
+	msgAppReader *streamReader
 
 	sendc    chan raftpb.Message
 	recvc    chan raftpb.Message
 	propc    chan raftpb.Message
 	newURLsC chan types.URLs
+	termc    chan uint64
 
 	// for testing
 	pausec  chan struct{}
@@ -125,6 +129,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		propc:        make(chan raftpb.Message, maxPendingProposals),
 		newURLsC:     make(chan types.URLs),
+		termc:        make(chan uint64),
 		pausec:       make(chan struct{}),
 		resumec:      make(chan struct{}),
 		stopc:        make(chan struct{}),
@@ -149,7 +154,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 
 	go func() {
 		var paused bool
-		msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc)
+		p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc)
 		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc)
 		for {
 			select {
@@ -169,9 +174,6 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 						m.Type, p.id, name, bufSizeMap[name])
 				}
 			case mm := <-p.recvc:
-				if mm.Type == raftpb.MsgApp {
-					msgAppReader.updateMsgAppTerm(mm.Term)
-				}
 				if err := r.Process(context.TODO(), mm); err != nil {
 					log.Printf("peer: process raft message error: %v", err)
 				}
@@ -186,7 +188,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 				p.msgAppWriter.stop()
 				p.writer.stop()
 				p.pipeline.stop()
-				msgAppReader.stop()
+				p.msgAppReader.stop()
 				reader.stop()
 				close(p.done)
 				return
@@ -211,6 +213,8 @@ func (p *peer) Update(urls types.URLs) {
 	}
 }
 
+func (p *peer) setTerm(term uint64) { p.msgAppReader.updateMsgAppTerm(term) }
+
 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 	var ok bool
 	switch conn.t {

+ 15 - 0
rafthttp/transport.go

@@ -77,6 +77,7 @@ type transport struct {
 	serverStats  *stats.ServerStats
 	leaderStats  *stats.LeaderStats
 
+	term    uint64               // the latest term that has been observed
 	mu      sync.RWMutex         // protect the remote and peer map
 	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
 	peers   map[types.ID]Peer    // peers map
@@ -112,6 +113,16 @@ func (t *transport) Get(id types.ID) Peer {
 	return t.peers[id]
 }
 
+func (t *transport) maybeUpdatePeersTerm(term uint64) {
+	if t.term >= term {
+		return
+	}
+	t.term = term
+	for _, p := range t.peers {
+		p.setTerm(term)
+	}
+}
+
 func (t *transport) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 		// intentionally dropped message
@@ -120,6 +131,10 @@ func (t *transport) Send(msgs []raftpb.Message) {
 		}
 		to := types.ID(m.To)
 
+		if m.Type != raftpb.MsgProp { // proposal message does not have a valid term
+			t.maybeUpdatePeersTerm(m.Term)
+		}
+
 		p, ok := t.peers[to]
 		if ok {
 			if m.Type == raftpb.MsgApp {