Browse Source

Merge pull request #3362 from yichengq/rafthttp-cancel

rafthttp: always cancel in-flight request when stop streamReader
Yicheng Qin 10 years ago
parent
commit
2d06f6b371

+ 20 - 0
client/cancelreq.go

@@ -0,0 +1,20 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq.go
+
+// +build go1.5
+
+package client
+
+import "net/http"
+
+func requestCanceler(tr CancelableTransport, req *http.Request) func() {
+	ch := make(chan struct{})
+	req.Cancel = ch
+
+	return func() {
+		close(ch)
+	}
+}

+ 17 - 0
client/cancelreq_go14.go

@@ -0,0 +1,17 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq_go14.go
+
+// +build !go1.5
+
+package client
+
+import "net/http"
+
+func requestCanceler(tr CancelableTransport, req *http.Request) func() {
+	return func() {
+		tr.CancelRequest(req)
+	}
+}

+ 3 - 1
client/client.go

@@ -384,6 +384,8 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
 	}
 	defer hcancel()
 
+	reqcancel := requestCanceler(c.transport, req)
+
 	rtchan := make(chan roundTripResponse, 1)
 	go func() {
 		resp, err := c.transport.RoundTrip(req)
@@ -399,7 +401,7 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
 		resp, err = rtresp.resp, rtresp.err
 	case <-hctx.Done():
 		// cancel and wait for request to actually exit before continuing
-		c.transport.CancelRequest(req)
+		reqcancel()
 		rtresp := <-rtchan
 		resp = rtresp.resp
 		switch {

+ 0 - 19
client/client_test.go

@@ -109,25 +109,6 @@ func newFakeTransport() *fakeTransport {
 	}
 }
 
-func (t *fakeTransport) RoundTrip(*http.Request) (*http.Response, error) {
-	select {
-	case resp := <-t.respchan:
-		return resp, nil
-	case err := <-t.errchan:
-		return nil, err
-	case <-t.startCancel:
-		select {
-		// this simulates that the request is finished before cancel effects
-		case resp := <-t.respchan:
-			return resp, nil
-		// wait on finishCancel to simulate taking some amount of
-		// time while calling CancelRequest
-		case <-t.finishCancel:
-			return nil, errors.New("cancelled")
-		}
-	}
-}
-
 func (t *fakeTransport) CancelRequest(*http.Request) {
 	t.startCancel <- struct{}{}
 }

+ 41 - 0
client/fake_transport_go14_test.go

@@ -0,0 +1,41 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !go1.5
+
+package client
+
+import (
+	"errors"
+	"net/http"
+)
+
+func (t *fakeTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	select {
+	case resp := <-t.respchan:
+		return resp, nil
+	case err := <-t.errchan:
+		return nil, err
+	case <-t.startCancel:
+		select {
+		// this simulates that the request is finished before cancel effects
+		case resp := <-t.respchan:
+			return resp, nil
+		// wait on finishCancel to simulate taking some amount of
+		// time while calling CancelRequest
+		case <-t.finishCancel:
+			return nil, errors.New("cancelled")
+		}
+	}
+}

+ 42 - 0
client/fake_transport_test.go

@@ -0,0 +1,42 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build go1.5
+
+package client
+
+import (
+	"errors"
+	"net/http"
+)
+
+func (t *fakeTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	select {
+	case resp := <-t.respchan:
+		return resp, nil
+	case err := <-t.errchan:
+		return nil, err
+	case <-t.startCancel:
+	case <-req.Cancel:
+	}
+	select {
+	// this simulates that the request is finished before cancel effects
+	case resp := <-t.respchan:
+		return resp, nil
+	// wait on finishCancel to simulate taking some amount of
+	// time while calling CancelRequest
+	case <-t.finishCancel:
+		return nil, errors.New("cancelled")
+	}
+}

+ 20 - 0
pkg/httputil/cancelreq.go

@@ -0,0 +1,20 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq.go
+
+// +build go1.5
+
+package httputil
+
+import "net/http"
+
+func RequestCanceler(rt http.RoundTripper, req *http.Request) func() {
+	ch := make(chan struct{})
+	req.Cancel = ch
+
+	return func() {
+		close(ch)
+	}
+}

+ 25 - 0
pkg/httputil/cancelreq_go14.go

@@ -0,0 +1,25 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq_go14.go
+
+// +build !go1.5
+
+package httputil
+
+import "net/http"
+
+type requestCanceler interface {
+	CancelRequest(req *http.Request)
+}
+
+func RequestCanceler(rt http.RoundTripper, req *http.Request) func() {
+	c, ok := rt.(requestCanceler)
+	if !ok {
+		return func() {}
+	}
+	return func() {
+		c.CancelRequest(req)
+	}
+}

+ 5 - 6
proxy/reverse.go

@@ -26,8 +26,10 @@ import (
 	"strings"
 	"sync/atomic"
 
-	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	"time"
+
+	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
+	"github.com/coreos/etcd/pkg/httputil"
 )
 
 // Hop-by-hop headers. These are removed when sent to the backend.
@@ -98,17 +100,14 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
 	var requestClosed int32
 	completeCh := make(chan bool, 1)
 	closeNotifier, ok := rw.(http.CloseNotifier)
+	cancel := httputil.RequestCanceler(p.transport, proxyreq)
 	if ok {
 		go func() {
 			select {
 			case <-closeNotifier.CloseNotify():
 				atomic.StoreInt32(&requestClosed, 1)
 				log.Printf("proxy: client %v closed request prematurely", clientreq.RemoteAddr)
-
-				tp, ok := p.transport.(*http.Transport)
-				if ok {
-					tp.CancelRequest(proxyreq)
-				}
+				cancel()
 			case <-completeCh:
 			}
 		}()

+ 35 - 0
rafthttp/fake_roundtripper_go14_test.go

@@ -0,0 +1,35 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !go1.5
+
+package rafthttp
+
+import (
+	"errors"
+	"net/http"
+)
+
+func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
+	c := make(chan struct{}, 1)
+	t.mu.Lock()
+	t.cancel[req] = c
+	t.mu.Unlock()
+	select {
+	case <-t.unblockc:
+		return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
+	case <-c:
+		return nil, errors.New("request canceled")
+	}
+}

+ 37 - 0
rafthttp/fake_roundtripper_test.go

@@ -0,0 +1,37 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build go1.5
+
+package rafthttp
+
+import (
+	"errors"
+	"net/http"
+)
+
+func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
+	c := make(chan struct{}, 1)
+	t.mu.Lock()
+	t.cancel[req] = c
+	t.mu.Unlock()
+	select {
+	case <-t.unblockc:
+		return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
+	case <-req.Cancel:
+		return nil, errors.New("request canceled")
+	case <-c:
+		return nil, errors.New("request canceled")
+	}
+}

+ 3 - 7
rafthttp/pipeline.go

@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
@@ -43,10 +44,6 @@ const (
 
 var errStopped = errors.New("stopped")
 
-type canceler interface {
-	CancelRequest(*http.Request)
-}
-
 type pipeline struct {
 	from, to types.ID
 	cid      types.ID
@@ -149,15 +146,14 @@ func (p *pipeline) post(data []byte) (err error) {
 		}
 	}()
 	done := make(chan struct{}, 1)
+	cancel := httputil.RequestCanceler(p.tr, req)
 	go func() {
 		select {
 		case <-done:
 		case <-p.stopc:
 			waitSchedule()
 			stopped = true
-			if cancel, ok := p.tr.(canceler); ok {
-				cancel.CancelRequest(req)
-			}
+			cancel()
 		}
 	}()
 

+ 0 - 12
rafthttp/pipeline_test.go

@@ -223,18 +223,6 @@ func newRoundTripperBlocker() *roundTripperBlocker {
 		cancel:   make(map[*http.Request]chan struct{}),
 	}
 }
-func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
-	c := make(chan struct{}, 1)
-	t.mu.Lock()
-	t.cancel[req] = c
-	t.mu.Unlock()
-	select {
-	case <-t.unblockc:
-		return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
-	case <-c:
-		return nil, errors.New("request canceled")
-	}
-}
 func (t *roundTripperBlocker) unblock() {
 	close(t.unblockc)
 }

+ 12 - 10
rafthttp/stream.go

@@ -28,6 +28,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/httputil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/version"
@@ -261,7 +262,7 @@ type streamReader struct {
 
 	mu         sync.Mutex
 	msgAppTerm uint64
-	req        *http.Request
+	cancel     func()
 	closer     io.Closer
 	stopc      chan struct{}
 	done       chan struct{}
@@ -385,11 +386,12 @@ func (cr *streamReader) updateMsgAppTerm(term uint64) {
 	}
 }
 
-// TODO: always cancel in-flight dial and decode
 func (cr *streamReader) stop() {
 	close(cr.stopc)
 	cr.mu.Lock()
-	cr.cancelRequest()
+	if cr.cancel != nil {
+		cr.cancel()
+	}
 	cr.close()
 	cr.mu.Unlock()
 	<-cr.done
@@ -425,7 +427,13 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	}
 
 	cr.mu.Lock()
-	cr.req = req
+	select {
+	case <-cr.stopc:
+		cr.mu.Unlock()
+		return nil, fmt.Errorf("stream reader is stopped")
+	default:
+	}
+	cr.cancel = httputil.RequestCanceler(cr.tr, req)
 	cr.mu.Unlock()
 
 	resp, err := cr.tr.RoundTrip(req)
@@ -480,12 +488,6 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	}
 }
 
-func (cr *streamReader) cancelRequest() {
-	if canceller, ok := cr.tr.(*http.Transport); ok {
-		canceller.CancelRequest(cr.req)
-	}
-}
-
 func (cr *streamReader) close() {
 	if cr.closer != nil {
 		cr.closer.Close()