Browse Source

Merge pull request #2484 from yichengq/336

rafthttp: drop messages in channel when disconnection
Yicheng Qin 10 years ago
parent
commit
32105e6ed0
4 changed files with 47 additions and 33 deletions
  1. 2 2
      rafthttp/functional_test.go
  2. 10 12
      rafthttp/peer.go
  3. 12 5
      rafthttp/stream.go
  4. 23 14
      rafthttp/stream_test.go

+ 2 - 2
rafthttp/functional_test.go

@@ -122,10 +122,10 @@ func newServerStats() *stats.ServerStats {
 func waitStreamWorking(p *peer) bool {
 func waitStreamWorking(p *peer) bool {
 	for i := 0; i < 1000; i++ {
 	for i := 0; i < 1000; i++ {
 		time.Sleep(time.Millisecond)
 		time.Sleep(time.Millisecond)
-		if !p.msgAppWriter.isWorking() {
+		if _, ok := p.msgAppWriter.writec(); !ok {
 			continue
 			continue
 		}
 		}
-		if !p.writer.isWorking() {
+		if _, ok := p.writer.writec(); !ok {
 			continue
 			continue
 		}
 		}
 		return true
 		return true

+ 10 - 12
rafthttp/peer.go

@@ -115,8 +115,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 	p := &peer{
 	p := &peer{
 		id:           to,
 		id:           to,
 		r:            r,
 		r:            r,
-		msgAppWriter: startStreamWriter(fs, r),
-		writer:       startStreamWriter(fs, r),
+		msgAppWriter: startStreamWriter(to, fs, r),
+		writer:       startStreamWriter(to, fs, r),
 		pipeline:     newPipeline(tr, picker, to, cid, fs, r, errorc),
 		pipeline:     newPipeline(tr, picker, to, cid, fs, r, errorc),
 		sendc:        make(chan raftpb.Message),
 		sendc:        make(chan raftpb.Message),
 		recvc:        make(chan raftpb.Message, recvBufSize),
 		recvc:        make(chan raftpb.Message, recvBufSize),
@@ -244,20 +244,18 @@ func (p *peer) Stop() {
 
 
 // pick picks a chan for sending the given message. The picked chan and the picked chan
 // pick picks a chan for sending the given message. The picked chan and the picked chan
 // string name are returned.
 // string name are returned.
-func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, picked string) {
-	switch {
+func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
+	var ok bool
 	// Considering MsgSnap may have a big size, e.g., 1G, and will block
 	// Considering MsgSnap may have a big size, e.g., 1G, and will block
 	// stream for a long time, only use one of the N pipelines to send MsgSnap.
 	// stream for a long time, only use one of the N pipelines to send MsgSnap.
-	case isMsgSnap(m):
-		return p.pipeline.msgc, pipelineMsg
-	case p.msgAppWriter.isWorking() && canUseMsgAppStream(m):
-		return p.msgAppWriter.msgc, streamApp
-	case p.writer.isWorking():
-		return p.writer.msgc, streamMsg
-	default:
+	if isMsgSnap(m) {
 		return p.pipeline.msgc, pipelineMsg
 		return p.pipeline.msgc, pipelineMsg
+	} else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) {
+		return writec, streamApp
+	} else if writec, ok = p.writer.writec(); ok {
+		return writec, streamMsg
 	}
 	}
-	return
+	return p.pipeline.msgc, pipelineMsg
 }
 }
 
 
 func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }
 func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

+ 12 - 5
rafthttp/stream.go

@@ -61,6 +61,7 @@ type outgoingConn struct {
 // streamWriter is a long-running go-routine that writes messages into the
 // streamWriter is a long-running go-routine that writes messages into the
 // attached outgoingConn.
 // attached outgoingConn.
 type streamWriter struct {
 type streamWriter struct {
+	id types.ID
 	fs *stats.FollowerStats
 	fs *stats.FollowerStats
 	r  Raft
 	r  Raft
 
 
@@ -74,8 +75,9 @@ type streamWriter struct {
 	done  chan struct{}
 	done  chan struct{}
 }
 }
 
 
-func startStreamWriter(fs *stats.FollowerStats, r Raft) *streamWriter {
+func startStreamWriter(id types.ID, fs *stats.FollowerStats, r Raft) *streamWriter {
 	w := &streamWriter{
 	w := &streamWriter{
+		id:    id,
 		fs:    fs,
 		fs:    fs,
 		r:     r,
 		r:     r,
 		msgc:  make(chan raftpb.Message, streamBufSize),
 		msgc:  make(chan raftpb.Message, streamBufSize),
@@ -163,18 +165,23 @@ func (cw *streamWriter) run() {
 	}
 	}
 }
 }
 
 
-func (cw *streamWriter) isWorking() bool {
+func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
 	cw.mu.Lock()
 	cw.mu.Lock()
 	defer cw.mu.Unlock()
 	defer cw.mu.Unlock()
-	return cw.working
+	return cw.msgc, cw.working
 }
 }
 
 
 func (cw *streamWriter) resetCloser() {
 func (cw *streamWriter) resetCloser() {
 	cw.mu.Lock()
 	cw.mu.Lock()
 	defer cw.mu.Unlock()
 	defer cw.mu.Unlock()
-	if cw.working {
-		cw.closer.Close()
+	if !cw.working {
+		return
+	}
+	cw.closer.Close()
+	if len(cw.msgc) > 0 {
+		cw.r.ReportUnreachable(uint64(cw.id))
 	}
 	}
+	cw.msgc = make(chan raftpb.Message, streamBufSize)
 	cw.working = false
 	cw.working = false
 }
 }
 
 

+ 23 - 14
rafthttp/stream_test.go

@@ -18,10 +18,10 @@ import (
 // to streamWriter. After that, streamWriter can use it to send messages
 // to streamWriter. After that, streamWriter can use it to send messages
 // continuously, and closes it when stopped.
 // continuously, and closes it when stopped.
 func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 func TestStreamWriterAttachOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
 	// the expected initial state of streamWrite is not working
 	// the expected initial state of streamWrite is not working
-	if g := sw.isWorking(); g != false {
-		t.Errorf("initial working status = %v, want false", g)
+	if _, ok := sw.writec(); ok != false {
+		t.Errorf("initial working status = %v, want false", ok)
 	}
 	}
 
 
 	// repeatitive tests to ensure it can use latest connection
 	// repeatitive tests to ensure it can use latest connection
@@ -36,15 +36,15 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 			t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed)
 			t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed)
 		}
 		}
 		// starts working
 		// starts working
-		if g := sw.isWorking(); g != true {
-			t.Errorf("#%d: working status = %v, want true", i, g)
+		if _, ok := sw.writec(); ok != true {
+			t.Errorf("#%d: working status = %v, want true", i, ok)
 		}
 		}
 
 
 		sw.msgc <- raftpb.Message{}
 		sw.msgc <- raftpb.Message{}
 		testutil.ForceGosched()
 		testutil.ForceGosched()
 		// still working
 		// still working
-		if g := sw.isWorking(); g != true {
-			t.Errorf("#%d: working status = %v, want true", i, g)
+		if _, ok := sw.writec(); ok != true {
+			t.Errorf("#%d: working status = %v, want true", i, ok)
 		}
 		}
 		if wfc.written == 0 {
 		if wfc.written == 0 {
 			t.Errorf("#%d: failed to write to the underlying connection", i)
 			t.Errorf("#%d: failed to write to the underlying connection", i)
@@ -53,8 +53,8 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 
 
 	sw.stop()
 	sw.stop()
 	// no longer in working status now
 	// no longer in working status now
-	if g := sw.isWorking(); g != false {
-		t.Errorf("working status after stop = %v, want false", g)
+	if _, ok := sw.writec(); ok != false {
+		t.Errorf("working status after stop = %v, want false", ok)
 	}
 	}
 	if wfc.closed != true {
 	if wfc.closed != true {
 		t.Errorf("failed to close the underlying connection")
 		t.Errorf("failed to close the underlying connection")
@@ -64,7 +64,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
 // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
 // outgoingConn will close the outgoingConn and fall back to non-working status.
 // outgoingConn will close the outgoingConn and fall back to non-working status.
 func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
 func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
-	sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{})
+	sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
 	defer sw.stop()
 	defer sw.stop()
 	wfc := &fakeWriteFlushCloser{err: errors.New("blah")}
 	wfc := &fakeWriteFlushCloser{err: errors.New("blah")}
 	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
 	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@@ -72,8 +72,8 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
 	sw.msgc <- raftpb.Message{}
 	sw.msgc <- raftpb.Message{}
 	testutil.ForceGosched()
 	testutil.ForceGosched()
 	// no longer working
 	// no longer working
-	if g := sw.isWorking(); g != false {
-		t.Errorf("working = %v, want false", g)
+	if _, ok := sw.writec(); ok != false {
+		t.Errorf("working = %v, want false", ok)
 	}
 	}
 	if wfc.closed != true {
 	if wfc.closed != true {
 		t.Errorf("failed to close the underlying connection")
 		t.Errorf("failed to close the underlying connection")
@@ -197,7 +197,7 @@ func TestStream(t *testing.T) {
 		srv := httptest.NewServer(h)
 		srv := httptest.NewServer(h)
 		defer srv.Close()
 		defer srv.Close()
 
 
-		sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{})
+		sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{})
 		defer sw.stop()
 		defer sw.stop()
 		h.sw = sw
 		h.sw = sw
 
 
@@ -207,8 +207,17 @@ func TestStream(t *testing.T) {
 		if tt.t == streamTypeMsgApp {
 		if tt.t == streamTypeMsgApp {
 			sr.updateMsgAppTerm(tt.term)
 			sr.updateMsgAppTerm(tt.term)
 		}
 		}
+		// wait for stream to work
+		var writec chan<- raftpb.Message
+		for {
+			var ok bool
+			if writec, ok = sw.writec(); ok {
+				break
+			}
+			time.Sleep(time.Millisecond)
+		}
 
 
-		sw.msgc <- tt.m
+		writec <- tt.m
 		var m raftpb.Message
 		var m raftpb.Message
 		select {
 		select {
 		case m = <-tt.wc:
 		case m = <-tt.wc: