Browse Source

*: etcdserver.sender -> rafthttp.Sender

Yicheng Qin 11 years ago
parent
commit
84fbf7aab5
4 changed files with 390 additions and 303 deletions
  1. 15 116
      etcdserver/sender.go
  2. 1 187
      etcdserver/sender_test.go
  3. 148 0
      rafthttp/sender.go
  4. 226 0
      rafthttp/sender_test.go

+ 15 - 116
etcdserver/sender.go

@@ -17,24 +17,19 @@
 package etcdserver
 
 import (
-	"bytes"
-	"fmt"
 	"log"
 	"net/http"
 	"net/url"
 	"path"
-	"sync"
-	"time"
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 )
 
 const (
-	raftPrefix    = "/raft"
-	connPerSender = 4
-	senderBufSize = connPerSender * 4
+	raftPrefix = "/raft"
 )
 
 type sendHub struct {
@@ -42,7 +37,7 @@ type sendHub struct {
 	cl         ClusterInfo
 	ss         *stats.ServerStats
 	ls         *stats.LeaderStats
-	senders    map[types.ID]*sender
+	senders    map[types.ID]rafthttp.Sender
 	shouldstop chan struct{}
 }
 
@@ -55,7 +50,7 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *
 		cl:         cl,
 		ss:         ss,
 		ls:         ls,
-		senders:    make(map[types.ID]*sender),
+		senders:    make(map[types.ID]rafthttp.Sender),
 		shouldstop: make(chan struct{}, 1),
 	}
 	for _, m := range cl.Members() {
@@ -86,14 +81,13 @@ func (h *sendHub) Send(msgs []raftpb.Message) {
 			h.ss.SendAppendReq(len(data))
 		}
 
-		// TODO (xiangli): reasonable retry logic
-		s.send(data)
+		s.Send(data)
 	}
 }
 
 func (h *sendHub) Stop() {
 	for _, s := range h.senders {
-		s.stop()
+		s.Stop()
 	}
 }
 
@@ -106,14 +100,19 @@ func (h *sendHub) Add(m *Member) {
 		return
 	}
 	// TODO: considering how to switch between all available peer urls
-	u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
+	peerURL := m.PickPeerURL()
+	u, err := url.Parse(peerURL)
+	if err != nil {
+		log.Panicf("unexpect peer url %s", peerURL)
+	}
+	u.Path = path.Join(u.Path, raftPrefix)
 	fs := h.ls.Follower(m.ID.String())
-	s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop)
+	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), fs, h.shouldstop)
 	h.senders[m.ID] = s
 }
 
 func (h *sendHub) Remove(id types.ID) {
-	h.senders[id].stop()
+	h.senders[id].Stop()
 	delete(h.senders, id)
 }
 
@@ -128,105 +127,5 @@ func (h *sendHub) Update(m *Member) {
 		log.Panicf("unexpect peer url %s", peerURL)
 	}
 	u.Path = path.Join(u.Path, raftPrefix)
-	s := h.senders[m.ID]
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.u = u.String()
-}
-
-type sender struct {
-	tr         http.RoundTripper
-	u          string
-	cid        types.ID
-	fs         *stats.FollowerStats
-	q          chan []byte
-	mu         sync.RWMutex
-	wg         sync.WaitGroup
-	shouldstop chan struct{}
-}
-
-func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
-	s := &sender{
-		tr:         tr,
-		u:          u,
-		cid:        cid,
-		fs:         fs,
-		q:          make(chan []byte, senderBufSize),
-		shouldstop: shouldstop,
-	}
-	s.wg.Add(connPerSender)
-	for i := 0; i < connPerSender; i++ {
-		go s.handle()
-	}
-	return s
-}
-
-func (s *sender) send(data []byte) error {
-	select {
-	case s.q <- data:
-		return nil
-	default:
-		log.Printf("sender: reach the maximal serving to %s", s.u)
-		return fmt.Errorf("reach maximal serving")
-	}
-}
-
-func (s *sender) stop() {
-	close(s.q)
-	s.wg.Wait()
-}
-
-func (s *sender) handle() {
-	defer s.wg.Done()
-	for d := range s.q {
-		start := time.Now()
-		err := s.post(d)
-		end := time.Now()
-		if err != nil {
-			s.fs.Fail()
-			log.Printf("sender: %v", err)
-			continue
-		}
-		s.fs.Succ(end.Sub(start))
-	}
-}
-
-// post POSTs a data payload to a url. Returns nil if the POST succeeds,
-// error on any failure.
-func (s *sender) post(data []byte) error {
-	s.mu.RLock()
-	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
-	s.mu.RUnlock()
-	if err != nil {
-		return fmt.Errorf("new request to %s error: %v", s.u, err)
-	}
-	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
-	resp, err := s.tr.RoundTrip(req)
-	if err != nil {
-		return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
-	}
-	resp.Body.Close()
-
-	switch resp.StatusCode {
-	case http.StatusPreconditionFailed:
-		select {
-		case s.shouldstop <- struct{}{}:
-		default:
-		}
-		log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
-		return nil
-	case http.StatusForbidden:
-		select {
-		case s.shouldstop <- struct{}{}:
-		default:
-		}
-		log.Println("etcdserver: this member has been permanently removed from the cluster")
-		log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
-		return nil
-	case http.StatusNoContent:
-		return nil
-	default:
-		return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
-	}
+	h.senders[m.ID].Update(u.String())
 }

+ 1 - 187
etcdserver/sender_test.go

@@ -17,10 +17,7 @@
 package etcdserver
 
 import (
-	"errors"
-	"io/ioutil"
 	"net/http"
-	"sync"
 	"testing"
 	"time"
 
@@ -64,9 +61,6 @@ func TestSendHubAdd(t *testing.T) {
 	if !ok {
 		t.Fatalf("senders[1] is nil, want exists")
 	}
-	if s.u != "http://a/raft" {
-		t.Errorf("url = %s, want %s", s.u, "http://a/raft")
-	}
 
 	h.Add(m)
 	ns := h.senders[types.ID(1)]
@@ -104,7 +98,7 @@ func TestSendHubShouldStop(t *testing.T) {
 		t.Fatalf("received unexpected shouldstop notification")
 	case <-time.After(10 * time.Millisecond):
 	}
-	h.senders[1].send([]byte("somedata"))
+	h.senders[1].Send([]byte("somedata"))
 
 	testutil.ForceGosched()
 	select {
@@ -114,169 +108,6 @@ func TestSendHubShouldStop(t *testing.T) {
 	}
 }
 
-// TestSenderSend tests that send func could post data using roundtripper
-// and increase success count in stats.
-func TestSenderSend(t *testing.T) {
-	tr := &roundTripperRecorder{}
-	fs := &stats.FollowerStats{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
-
-	if err := s.send([]byte("some data")); err != nil {
-		t.Fatalf("unexpect send error: %v", err)
-	}
-	s.stop()
-
-	if tr.Request() == nil {
-		t.Errorf("sender fails to post the data")
-	}
-	fs.Lock()
-	defer fs.Unlock()
-	if fs.Counts.Success != 1 {
-		t.Errorf("success = %d, want 1", fs.Counts.Success)
-	}
-}
-
-func TestSenderExceedMaximalServing(t *testing.T) {
-	tr := newRoundTripperBlocker()
-	fs := &stats.FollowerStats{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
-
-	// keep the sender busy and make the buffer full
-	// nothing can go out as we block the sender
-	for i := 0; i < connPerSender+senderBufSize; i++ {
-		if err := s.send([]byte("some data")); err != nil {
-			t.Errorf("send err = %v, want nil", err)
-		}
-		// force the sender to grab data
-		testutil.ForceGosched()
-	}
-
-	// try to send a data when we are sure the buffer is full
-	if err := s.send([]byte("some data")); err == nil {
-		t.Errorf("unexpect send success")
-	}
-
-	// unblock the senders and force them to send out the data
-	tr.unblock()
-	testutil.ForceGosched()
-
-	// It could send new data after previous ones succeed
-	if err := s.send([]byte("some data")); err != nil {
-		t.Errorf("send err = %v, want nil", err)
-	}
-	s.stop()
-}
-
-// TestSenderSendFailed tests that when send func meets the post error,
-// it increases fail count in stats.
-func TestSenderSendFailed(t *testing.T) {
-	fs := &stats.FollowerStats{}
-	s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
-
-	if err := s.send([]byte("some data")); err != nil {
-		t.Fatalf("unexpect send error: %v", err)
-	}
-	s.stop()
-
-	fs.Lock()
-	defer fs.Unlock()
-	if fs.Counts.Fail != 1 {
-		t.Errorf("fail = %d, want 1", fs.Counts.Fail)
-	}
-}
-
-func TestSenderPost(t *testing.T) {
-	tr := &roundTripperRecorder{}
-	s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
-	if err := s.post([]byte("some data")); err != nil {
-		t.Fatalf("unexpect post error: %v", err)
-	}
-	s.stop()
-
-	if g := tr.Request().Method; g != "POST" {
-		t.Errorf("method = %s, want %s", g, "POST")
-	}
-	if g := tr.Request().URL.String(); g != "http://10.0.0.1" {
-		t.Errorf("url = %s, want %s", g, "http://10.0.0.1")
-	}
-	if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
-		t.Errorf("content type = %s, want %s", g, "application/protobuf")
-	}
-	if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" {
-		t.Errorf("cluster id = %s, want %s", g, "1")
-	}
-	b, err := ioutil.ReadAll(tr.Request().Body)
-	if err != nil {
-		t.Fatalf("unexpected ReadAll error: %v", err)
-	}
-	if string(b) != "some data" {
-		t.Errorf("body = %s, want %s", b, "some data")
-	}
-}
-
-func TestSenderPostBad(t *testing.T) {
-	tests := []struct {
-		u    string
-		code int
-		err  error
-	}{
-		// bad url
-		{":bad url", http.StatusNoContent, nil},
-		// RoundTrip returns error
-		{"http://10.0.0.1", 0, errors.New("blah")},
-		// unexpected response status code
-		{"http://10.0.0.1", http.StatusOK, nil},
-		{"http://10.0.0.1", http.StatusCreated, nil},
-	}
-	for i, tt := range tests {
-		shouldstop := make(chan struct{})
-		s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
-		err := s.post([]byte("some data"))
-		s.stop()
-
-		if err == nil {
-			t.Errorf("#%d: err = nil, want not nil", i)
-		}
-	}
-}
-
-func TestSenderPostShouldStop(t *testing.T) {
-	tests := []struct {
-		u    string
-		code int
-		err  error
-	}{
-		{"http://10.0.0.1", http.StatusForbidden, nil},
-		{"http://10.0.0.1", http.StatusPreconditionFailed, nil},
-	}
-	for i, tt := range tests {
-		shouldstop := make(chan struct{}, 1)
-		s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
-		s.post([]byte("some data"))
-		s.stop()
-		select {
-		case <-shouldstop:
-		default:
-			t.Fatalf("#%d: cannot receive shouldstop notification", i)
-		}
-	}
-}
-
-type roundTripperBlocker struct {
-	c chan struct{}
-}
-
-func newRoundTripperBlocker() *roundTripperBlocker {
-	return &roundTripperBlocker{c: make(chan struct{})}
-}
-func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
-	<-t.c
-	return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
-}
-func (t *roundTripperBlocker) unblock() {
-	close(t.c)
-}
-
 type respRoundTripper struct {
 	code int
 	err  error
@@ -289,23 +120,6 @@ func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error)
 	return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
 }
 
-type roundTripperRecorder struct {
-	req *http.Request
-	sync.Mutex
-}
-
-func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
-	t.Lock()
-	defer t.Unlock()
-	t.req = req
-	return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
-}
-func (t *roundTripperRecorder) Request() *http.Request {
-	t.Lock()
-	defer t.Unlock()
-	return t.req
-}
-
 type nopReadCloser struct{}
 
 func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil }

+ 148 - 0
rafthttp/sender.go

@@ -0,0 +1,148 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+)
+
+const (
+	connPerSender = 4
+	senderBufSize = connPerSender * 4
+)
+
+type Sender interface {
+	Update(u string)
+	// Send sends the data to the remote node. It is always non-blocking.
+	// It may be fail to send data if it returns nil error.
+	Send(data []byte) error
+	// Stop performs any necessary finalization and terminates the Sender
+	// elegantly.
+	Stop()
+}
+
+func NewSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
+	s := &sender{
+		tr:         tr,
+		u:          u,
+		cid:        cid,
+		fs:         fs,
+		q:          make(chan []byte, senderBufSize),
+		shouldstop: shouldstop,
+	}
+	s.wg.Add(connPerSender)
+	for i := 0; i < connPerSender; i++ {
+		go s.handle()
+	}
+	return s
+}
+
+type sender struct {
+	tr         http.RoundTripper
+	u          string
+	cid        types.ID
+	fs         *stats.FollowerStats
+	q          chan []byte
+	mu         sync.RWMutex
+	wg         sync.WaitGroup
+	shouldstop chan struct{}
+}
+
+func (s *sender) Update(u string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.u = u
+}
+
+// TODO (xiangli): reasonable retry logic
+func (s *sender) Send(data []byte) error {
+	select {
+	case s.q <- data:
+		return nil
+	default:
+		log.Printf("sender: reach the maximal serving to %s", s.u)
+		return fmt.Errorf("reach maximal serving")
+	}
+}
+
+func (s *sender) Stop() {
+	close(s.q)
+	s.wg.Wait()
+}
+
+func (s *sender) handle() {
+	defer s.wg.Done()
+	for d := range s.q {
+		start := time.Now()
+		err := s.post(d)
+		end := time.Now()
+		if err != nil {
+			s.fs.Fail()
+			log.Printf("sender: %v", err)
+			continue
+		}
+		s.fs.Succ(end.Sub(start))
+	}
+}
+
+// post POSTs a data payload to a url. Returns nil if the POST succeeds,
+// error on any failure.
+func (s *sender) post(data []byte) error {
+	s.mu.RLock()
+	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
+	s.mu.RUnlock()
+	if err != nil {
+		return fmt.Errorf("new request to %s error: %v", s.u, err)
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+	req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
+	resp, err := s.tr.RoundTrip(req)
+	if err != nil {
+		return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
+	}
+	resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusPreconditionFailed:
+		select {
+		case s.shouldstop <- struct{}{}:
+		default:
+		}
+		log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+		return nil
+	case http.StatusForbidden:
+		select {
+		case s.shouldstop <- struct{}{}:
+		default:
+		}
+		log.Println("etcdserver: this member has been permanently removed from the cluster")
+		log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+		return nil
+	case http.StatusNoContent:
+		return nil
+	default:
+		return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
+	}
+}

+ 226 - 0
rafthttp/sender_test.go

@@ -0,0 +1,226 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package rafthttp
+
+import (
+	"errors"
+	"io/ioutil"
+	"net/http"
+	"sync"
+	"testing"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/etcd/pkg/types"
+)
+
+// TestSenderSend tests that send func could post data using roundtripper
+// and increase success count in stats.
+func TestSenderSend(t *testing.T) {
+	tr := &roundTripperRecorder{}
+	fs := &stats.FollowerStats{}
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+
+	if err := s.Send([]byte("some data")); err != nil {
+		t.Fatalf("unexpect send error: %v", err)
+	}
+	s.Stop()
+
+	if tr.Request() == nil {
+		t.Errorf("sender fails to post the data")
+	}
+	fs.Lock()
+	defer fs.Unlock()
+	if fs.Counts.Success != 1 {
+		t.Errorf("success = %d, want 1", fs.Counts.Success)
+	}
+}
+
+func TestSenderExceedMaximalServing(t *testing.T) {
+	tr := newRoundTripperBlocker()
+	fs := &stats.FollowerStats{}
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
+
+	// keep the sender busy and make the buffer full
+	// nothing can go out as we block the sender
+	for i := 0; i < connPerSender+senderBufSize; i++ {
+		if err := s.Send([]byte("some data")); err != nil {
+			t.Errorf("send err = %v, want nil", err)
+		}
+		// force the sender to grab data
+		testutil.ForceGosched()
+	}
+
+	// try to send a data when we are sure the buffer is full
+	if err := s.Send([]byte("some data")); err == nil {
+		t.Errorf("unexpect send success")
+	}
+
+	// unblock the senders and force them to send out the data
+	tr.unblock()
+	testutil.ForceGosched()
+
+	// It could send new data after previous ones succeed
+	if err := s.Send([]byte("some data")); err != nil {
+		t.Errorf("send err = %v, want nil", err)
+	}
+	s.Stop()
+}
+
+// TestSenderSendFailed tests that when send func meets the post error,
+// it increases fail count in stats.
+func TestSenderSendFailed(t *testing.T) {
+	fs := &stats.FollowerStats{}
+	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
+
+	if err := s.Send([]byte("some data")); err != nil {
+		t.Fatalf("unexpect Send error: %v", err)
+	}
+	s.Stop()
+
+	fs.Lock()
+	defer fs.Unlock()
+	if fs.Counts.Fail != 1 {
+		t.Errorf("fail = %d, want 1", fs.Counts.Fail)
+	}
+}
+
+func TestSenderPost(t *testing.T) {
+	tr := &roundTripperRecorder{}
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
+	if err := s.post([]byte("some data")); err != nil {
+		t.Fatalf("unexpect post error: %v", err)
+	}
+	s.Stop()
+
+	if g := tr.Request().Method; g != "POST" {
+		t.Errorf("method = %s, want %s", g, "POST")
+	}
+	if g := tr.Request().URL.String(); g != "http://10.0.0.1" {
+		t.Errorf("url = %s, want %s", g, "http://10.0.0.1")
+	}
+	if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
+		t.Errorf("content type = %s, want %s", g, "application/protobuf")
+	}
+	if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" {
+		t.Errorf("cluster id = %s, want %s", g, "1")
+	}
+	b, err := ioutil.ReadAll(tr.Request().Body)
+	if err != nil {
+		t.Fatalf("unexpected ReadAll error: %v", err)
+	}
+	if string(b) != "some data" {
+		t.Errorf("body = %s, want %s", b, "some data")
+	}
+}
+
+func TestSenderPostBad(t *testing.T) {
+	tests := []struct {
+		u    string
+		code int
+		err  error
+	}{
+		// bad url
+		{":bad url", http.StatusNoContent, nil},
+		// RoundTrip returns error
+		{"http://10.0.0.1", 0, errors.New("blah")},
+		// unexpected response status code
+		{"http://10.0.0.1", http.StatusOK, nil},
+		{"http://10.0.0.1", http.StatusCreated, nil},
+	}
+	for i, tt := range tests {
+		shouldstop := make(chan struct{})
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		err := s.post([]byte("some data"))
+		s.Stop()
+
+		if err == nil {
+			t.Errorf("#%d: err = nil, want not nil", i)
+		}
+	}
+}
+
+func TestSenderPostShouldStop(t *testing.T) {
+	tests := []struct {
+		u    string
+		code int
+		err  error
+	}{
+		{"http://10.0.0.1", http.StatusForbidden, nil},
+		{"http://10.0.0.1", http.StatusPreconditionFailed, nil},
+	}
+	for i, tt := range tests {
+		shouldstop := make(chan struct{}, 1)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+		s.post([]byte("some data"))
+		s.Stop()
+		select {
+		case <-shouldstop:
+		default:
+			t.Fatalf("#%d: cannot receive shouldstop notification", i)
+		}
+	}
+}
+
+type roundTripperBlocker struct {
+	c chan struct{}
+}
+
+func newRoundTripperBlocker() *roundTripperBlocker {
+	return &roundTripperBlocker{c: make(chan struct{})}
+}
+func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
+	<-t.c
+	return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
+}
+func (t *roundTripperBlocker) unblock() {
+	close(t.c)
+}
+
+type respRoundTripper struct {
+	code int
+	err  error
+}
+
+func newRespRoundTripper(code int, err error) *respRoundTripper {
+	return &respRoundTripper{code: code, err: err}
+}
+func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
+}
+
+type roundTripperRecorder struct {
+	req *http.Request
+	sync.Mutex
+}
+
+func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
+	t.Lock()
+	defer t.Unlock()
+	t.req = req
+	return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
+}
+func (t *roundTripperRecorder) Request() *http.Request {
+	t.Lock()
+	defer t.Unlock()
+	return t.req
+}
+
+type nopReadCloser struct{}
+
+func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil }
+func (n *nopReadCloser) Close() error               { return nil }