Browse Source

rafthttp: always cancel in-flight request when stop streamReader

This problem is totally fixed at 1.5.

go1.5 adds a Request.Cancel channel, which allows for "race free"
cancellation
(https://github.com/golang/go/commit/8b4278ffb75e79c277bfa90c5e473bfad9f7c1bd).
Our implementation relies on it to always cancel in-flight request.
Yicheng Qin 10 years ago
parent
commit
fc95ec0cc6
3 changed files with 51 additions and 10 deletions
  1. 20 0
      pkg/httputil/cancelreq.go
  2. 25 0
      pkg/httputil/cancelreq_go14.go
  3. 6 10
      rafthttp/stream.go

+ 20 - 0
pkg/httputil/cancelreq.go

@@ -0,0 +1,20 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq.go
+
+// +build go1.5
+
+package httputil
+
+import "net/http"
+
+func RequestCanceler(rt http.RoundTripper, req *http.Request) func() {
+	ch := make(chan struct{})
+	req.Cancel = ch
+
+	return func() {
+		close(ch)
+	}
+}

+ 25 - 0
pkg/httputil/cancelreq_go14.go

@@ -0,0 +1,25 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq_go14.go
+
+// +build !go1.5
+
+package httputil
+
+import "net/http"
+
+type requestCanceler interface {
+	CancelRequest(req *http.Request)
+}
+
+func RequestCanceler(rt http.RoundTripper, req *http.Request) func() {
+	c, ok := rt.(requestCanceler)
+	if !ok {
+		return func() {}
+	}
+	return func() {
+		c.CancelRequest(req)
+	}
+}

+ 6 - 10
rafthttp/stream.go

@@ -28,6 +28,7 @@ import (
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/etcd/version"
@@ -261,7 +262,7 @@ type streamReader struct {
 
 
 	mu         sync.Mutex
 	mu         sync.Mutex
 	msgAppTerm uint64
 	msgAppTerm uint64
-	req        *http.Request
+	cancel     func()
 	closer     io.Closer
 	closer     io.Closer
 	stopc      chan struct{}
 	stopc      chan struct{}
 	done       chan struct{}
 	done       chan struct{}
@@ -385,11 +386,12 @@ func (cr *streamReader) updateMsgAppTerm(term uint64) {
 	}
 	}
 }
 }
 
 
-// TODO: always cancel in-flight dial and decode
 func (cr *streamReader) stop() {
 func (cr *streamReader) stop() {
 	close(cr.stopc)
 	close(cr.stopc)
 	cr.mu.Lock()
 	cr.mu.Lock()
-	cr.cancelRequest()
+	if cr.cancel != nil {
+		cr.cancel()
+	}
 	cr.close()
 	cr.close()
 	cr.mu.Unlock()
 	cr.mu.Unlock()
 	<-cr.done
 	<-cr.done
@@ -425,7 +427,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	}
 	}
 
 
 	cr.mu.Lock()
 	cr.mu.Lock()
-	cr.req = req
+	cr.cancel = httputil.RequestCanceler(cr.tr, req)
 	cr.mu.Unlock()
 	cr.mu.Unlock()
 
 
 	resp, err := cr.tr.RoundTrip(req)
 	resp, err := cr.tr.RoundTrip(req)
@@ -480,12 +482,6 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	}
 	}
 }
 }
 
 
-func (cr *streamReader) cancelRequest() {
-	if canceller, ok := cr.tr.(*http.Transport); ok {
-		canceller.CancelRequest(cr.req)
-	}
-}
-
 func (cr *streamReader) close() {
 func (cr *streamReader) close() {
 	if cr.closer != nil {
 	if cr.closer != nil {
 		cr.closer.Close()
 		cr.closer.Close()