Pārlūkot izejas kodu

rafthttp: send all MsgApp on stream msgAppV2

For stream msgAppV2, as long as the message is MsgApp type, it should be sent
through stream msgAppV2.
Yicheng Qin 10 gadi atpakaļ
vecāks
revīzija
5060b2f322
2 mainītis faili ar 3 papildinājumiem un 5 dzēšanām
  1. 3 1
      rafthttp/peer.go
  2. 0 4
      rafthttp/stream.go

+ 3 - 1
rafthttp/peer.go

@@ -264,7 +264,7 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri
 	// stream for a long time, only use one of the N pipelines to send MsgSnap.
 	if isMsgSnap(m) {
 		return p.pipeline.msgc, pipelineMsg
-	} else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) {
+	} else if writec, ok = p.msgAppWriter.writec(); ok && isMsgApp(m) {
 		return writec, streamAppV2
 	} else if writec, ok = p.writer.writec(); ok {
 		return writec, streamMsg
@@ -272,4 +272,6 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri
 	return p.pipeline.msgc, pipelineMsg
 }
 
+func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
+
 func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

+ 0 - 4
rafthttp/stream.go

@@ -441,10 +441,6 @@ func (cr *streamReader) close() {
 	cr.closer = nil
 }
 
-func canUseMsgAppStream(m raftpb.Message) bool {
-	return m.Type == raftpb.MsgApp && m.Term == m.LogTerm
-}
-
 func isClosedConnectionError(err error) bool {
 	operr, ok := err.(*net.OpError)
 	return ok && operr.Err.Error() == "use of closed network connection"