|
|
@@ -121,7 +121,7 @@ func (cw *streamWriter) run() {
|
|
|
reportSentFailure(string(t), linkHeartbeatMessage)
|
|
|
|
|
|
log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
|
|
|
- cw.resetCloser()
|
|
|
+ cw.close()
|
|
|
heartbeatc, msgc = nil, nil
|
|
|
continue
|
|
|
}
|
|
|
@@ -131,7 +131,7 @@ func (cw *streamWriter) run() {
|
|
|
if t == streamTypeMsgApp && m.Term != msgAppTerm {
|
|
|
// TODO: reasonable retry logic
|
|
|
if m.Term > msgAppTerm {
|
|
|
- cw.resetCloser()
|
|
|
+ cw.close()
|
|
|
heartbeatc, msgc = nil, nil
|
|
|
// TODO: report to raft at peer level
|
|
|
cw.r.ReportUnreachable(m.To)
|
|
|
@@ -143,7 +143,7 @@ func (cw *streamWriter) run() {
|
|
|
reportSentFailure(string(t), m)
|
|
|
|
|
|
log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
|
|
|
- cw.resetCloser()
|
|
|
+ cw.close()
|
|
|
heartbeatc, msgc = nil, nil
|
|
|
cw.r.ReportUnreachable(m.To)
|
|
|
continue
|
|
|
@@ -151,7 +151,7 @@ func (cw *streamWriter) run() {
|
|
|
flusher.Flush()
|
|
|
reportSentDuration(string(t), m, time.Since(start))
|
|
|
case conn := <-cw.connc:
|
|
|
- cw.resetCloser()
|
|
|
+ cw.close()
|
|
|
t = conn.t
|
|
|
switch conn.t {
|
|
|
case streamTypeMsgApp:
|
|
|
@@ -175,7 +175,7 @@ func (cw *streamWriter) run() {
|
|
|
cw.mu.Unlock()
|
|
|
heartbeatc, msgc = tickc, cw.msgc
|
|
|
case <-cw.stopc:
|
|
|
- cw.resetCloser()
|
|
|
+ cw.close()
|
|
|
close(cw.done)
|
|
|
return
|
|
|
}
|
|
|
@@ -188,7 +188,7 @@ func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
|
|
|
return cw.msgc, cw.working
|
|
|
}
|
|
|
|
|
|
-func (cw *streamWriter) resetCloser() {
|
|
|
+func (cw *streamWriter) close() {
|
|
|
cw.mu.Lock()
|
|
|
defer cw.mu.Unlock()
|
|
|
if !cw.working {
|
|
|
@@ -297,7 +297,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
|
|
|
switch {
|
|
|
case err != nil:
|
|
|
cr.mu.Lock()
|
|
|
- cr.resetCloser()
|
|
|
+ cr.close()
|
|
|
cr.mu.Unlock()
|
|
|
return err
|
|
|
case isLinkHeartbeatMessage(m):
|
|
|
@@ -324,7 +324,7 @@ func (cr *streamReader) updateMsgAppTerm(term uint64) {
|
|
|
return
|
|
|
}
|
|
|
cr.msgAppTerm = term
|
|
|
- cr.resetCloser()
|
|
|
+ cr.close()
|
|
|
}
|
|
|
|
|
|
// TODO: always cancel in-flight dial and decode
|
|
|
@@ -332,7 +332,7 @@ func (cr *streamReader) stop() {
|
|
|
close(cr.stopc)
|
|
|
cr.mu.Lock()
|
|
|
cr.cancelRequest()
|
|
|
- cr.resetCloser()
|
|
|
+ cr.close()
|
|
|
cr.mu.Unlock()
|
|
|
<-cr.done
|
|
|
}
|
|
|
@@ -392,7 +392,7 @@ func (cr *streamReader) cancelRequest() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (cr *streamReader) resetCloser() {
|
|
|
+func (cr *streamReader) close() {
|
|
|
if cr.closer != nil {
|
|
|
cr.closer.Close()
|
|
|
}
|