|
|
@@ -180,6 +180,11 @@ func (p *peer) Stop() {
|
|
|
|
|
|
func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string, size int) {
|
|
|
switch {
|
|
|
+ // Considering MsgSnap may have a big size, e.g., 1G, and will block
|
|
|
+ // stream for a long time, only use one of the N pipelines to send MsgSnap.
|
|
|
+ case isMsgSnap(m):
|
|
|
+ writec = p.pipeline.msgc
|
|
|
+ name, size = "pipeline", pipelineBufSize
|
|
|
case p.msgAppWriter.isWorking() && canUseMsgAppStream(m):
|
|
|
writec = p.msgAppWriter.msgc
|
|
|
name, size = "msgapp stream", streamBufSize
|
|
|
@@ -192,3 +197,5 @@ func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string,
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
+func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }
|