Browse Source

rafthttp: add test for streamReader.updateMsgAppTerm

Yicheng Qin 10 years ago
parent
commit
2e43ac8463
2 changed files with 38 additions and 1 deletions
  1. 3 1
      rafthttp/stream.go
  2. 35 0
      rafthttp/stream_test.go

+ 3 - 1
rafthttp/stream.go

@@ -317,10 +317,12 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
 	}
 	}
 }
 }
 
 
+// updateMsgAppTerm updates the term for MsgApp stream, and closes
+// the existing MsgApp stream if term is updated.
 func (cr *streamReader) updateMsgAppTerm(term uint64) {
 func (cr *streamReader) updateMsgAppTerm(term uint64) {
 	cr.mu.Lock()
 	cr.mu.Lock()
 	defer cr.mu.Unlock()
 	defer cr.mu.Unlock()
-	if cr.msgAppTerm == term {
+	if cr.msgAppTerm >= term {
 		return
 		return
 	}
 	}
 	cr.msgAppTerm = term
 	cr.msgAppTerm = term

+ 35 - 0
rafthttp/stream_test.go

@@ -153,6 +153,41 @@ func TestStreamReaderDialResult(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestStreamReaderUpdateMsgAppTerm(t *testing.T) {
+	term := uint64(2)
+	tests := []struct {
+		term   uint64
+		typ    streamType
+		wterm  uint64
+		wclose bool
+	}{
+		// lower term
+		{1, streamTypeMsgApp, 2, false},
+		// unchanged term
+		{2, streamTypeMsgApp, 2, false},
+		// higher term
+		{3, streamTypeMessage, 3, false},
+		{3, streamTypeMsgAppV2, 3, false},
+		// higher term, reset closer
+		{3, streamTypeMsgApp, 3, true},
+	}
+	for i, tt := range tests {
+		closer := &fakeWriteFlushCloser{}
+		cr := &streamReader{
+			msgAppTerm: term,
+			t:          tt.typ,
+			closer:     closer,
+		}
+		cr.updateMsgAppTerm(tt.term)
+		if cr.msgAppTerm != tt.wterm {
+			t.Errorf("#%d: term = %d, want %d", i, cr.msgAppTerm, tt.wterm)
+		}
+		if closer.closed != tt.wclose {
+			t.Errorf("#%d: closed = %v, want %v", i, closer.closed, tt.wclose)
+		}
+	}
+}
+
 // TestStream tests that streamReader and streamWriter can build stream to
 // TestStream tests that streamReader and streamWriter can build stream to
 // send messages between each other.
 // send messages between each other.
 func TestStream(t *testing.T) {
 func TestStream(t *testing.T) {