Browse Source

rafthttp: fix TestStopBlockedPipeline

Refactor the fake cancel implementation.

The old one may cancel other in-flight message in random, which leaves
the original target message blocked forever.
Yicheng Qin 10 years ago
parent
commit
c21cc5b39b
1 changed files with 20 additions and 15 deletions
  1. 20 15
      rafthttp/pipeline_test.go

+ 20 - 15
rafthttp/pipeline_test.go

@@ -212,34 +212,39 @@ func TestStopBlockedPipeline(t *testing.T) {
 }
 
 type roundTripperBlocker struct {
-	c         chan error
-	mu        sync.Mutex
-	unblocked bool
+	unblockc chan struct{}
+	mu       sync.Mutex
+	cancel   map[*http.Request]chan struct{}
 }
 
 func newRoundTripperBlocker() *roundTripperBlocker {
-	return &roundTripperBlocker{c: make(chan error)}
+	return &roundTripperBlocker{
+		unblockc: make(chan struct{}),
+		cancel:   make(map[*http.Request]chan struct{}),
+	}
 }
 func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
-	err := <-t.c
-	if err != nil {
-		return nil, err
+	c := make(chan struct{}, 1)
+	t.mu.Lock()
+	t.cancel[req] = c
+	t.mu.Unlock()
+	select {
+	case <-t.unblockc:
+		return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
+	case <-c:
+		return nil, errors.New("request canceled")
 	}
-	return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
 }
 func (t *roundTripperBlocker) unblock() {
-	t.mu.Lock()
-	t.unblocked = true
-	t.mu.Unlock()
-	close(t.c)
+	close(t.unblockc)
 }
 func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
-	if t.unblocked {
-		return
+	if c, ok := t.cancel[req]; ok {
+		c <- struct{}{}
+		delete(t.cancel, req)
 	}
-	t.c <- errors.New("request canceled")
 }
 
 type respRoundTripper struct {