Browse Source

Merge pull request #2782 from yichengq/not-close-stream

rafthttp: only close streamMsgApp when updating term
Yicheng Qin 10 years ago
parent
commit
60c8719d08
2 changed files with 41 additions and 2 deletions
  1. 6 2
      rafthttp/stream.go
  2. 35 0
      rafthttp/stream_test.go

+ 6 - 2
rafthttp/stream.go

@@ -317,14 +317,18 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
 	}
 }
 
+// updateMsgAppTerm updates the term for MsgApp stream, and closes
+// the existing MsgApp stream if term is updated.
 func (cr *streamReader) updateMsgAppTerm(term uint64) {
 	cr.mu.Lock()
 	defer cr.mu.Unlock()
-	if cr.msgAppTerm == term {
+	if cr.msgAppTerm >= term {
 		return
 	}
 	cr.msgAppTerm = term
-	cr.close()
+	if cr.t == streamTypeMsgApp {
+		cr.close()
+	}
 }
 
 // TODO: always cancel in-flight dial and decode

+ 35 - 0
rafthttp/stream_test.go

@@ -153,6 +153,41 @@ func TestStreamReaderDialResult(t *testing.T) {
 	}
 }
 
+func TestStreamReaderUpdateMsgAppTerm(t *testing.T) {
+	term := uint64(2)
+	tests := []struct {
+		term   uint64
+		typ    streamType
+		wterm  uint64
+		wclose bool
+	}{
+		// lower term
+		{1, streamTypeMsgApp, 2, false},
+		// unchanged term
+		{2, streamTypeMsgApp, 2, false},
+		// higher term
+		{3, streamTypeMessage, 3, false},
+		{3, streamTypeMsgAppV2, 3, false},
+		// higher term, reset closer
+		{3, streamTypeMsgApp, 3, true},
+	}
+	for i, tt := range tests {
+		closer := &fakeWriteFlushCloser{}
+		cr := &streamReader{
+			msgAppTerm: term,
+			t:          tt.typ,
+			closer:     closer,
+		}
+		cr.updateMsgAppTerm(tt.term)
+		if cr.msgAppTerm != tt.wterm {
+			t.Errorf("#%d: term = %d, want %d", i, cr.msgAppTerm, tt.wterm)
+		}
+		if closer.closed != tt.wclose {
+			t.Errorf("#%d: closed = %v, want %v", i, closer.closed, tt.wclose)
+		}
+	}
+}
+
 // TestStream tests that streamReader and streamWriter can build stream to
 // send messages between each other.
 func TestStream(t *testing.T) {