فهرست منبع

pkg/proxy: initial commit (for functional tests)

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 سال پیش
والد
کامیت
748d2204a2

+ 16 - 0
pkg/proxy/doc.go

@@ -0,0 +1,16 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package proxy implements proxy servers for network fault testing.
+package proxy

+ 19 - 0
pkg/proxy/fixtures/ca-csr.json

@@ -0,0 +1,19 @@
+{
+  "key": {
+    "algo": "rsa",
+    "size": 2048
+  },
+  "names": [
+    {
+      "O": "etcd",
+      "OU": "etcd Security",
+      "L": "San Francisco",
+      "ST": "California",
+      "C": "USA"
+    }
+  ],
+  "CN": "ca",
+  "ca": {
+    "expiry": "87600h"
+  }
+}

+ 22 - 0
pkg/proxy/fixtures/ca.crt

@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDsTCCApmgAwIBAgIUZzOo4zcHY/nEXY1PD8A7povXlWUwDQYJKoZIhvcNAQEL
+BQAwbzEMMAoGA1UEBhMDVVNBMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQH
+Ew1TYW4gRnJhbmNpc2NvMQ0wCwYDVQQKEwRldGNkMRYwFAYDVQQLEw1ldGNkIFNl
+Y3VyaXR5MQswCQYDVQQDEwJjYTAeFw0xODAxMDIxNjQxMDBaFw0yNzEyMzExNjQx
+MDBaMG8xDDAKBgNVBAYTA1VTQTETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UE
+BxMNU2FuIEZyYW5jaXNjbzENMAsGA1UEChMEZXRjZDEWMBQGA1UECxMNZXRjZCBT
+ZWN1cml0eTELMAkGA1UEAxMCY2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
+AoIBAQDD4Ys48LDWGyojj3Rcr6fnESY+UycaaGoTXADWLPmm+sQR3KcsJxF4054S
+d2G+NBfJHZvTHhVqOeqZxNtoqgje4paY2A5TbWBdV+xoGfbakwwngiX1yeF1I54k
+KH19zb8rBKAm7xixO60hE2CIYzMuw9lDkwoHpI6/PJdy7jwtytbo2Oac512JiO9Y
+dHp9dr3mrCzoKEBRtL1asRKfzp6gBC5rIw5T4jrq37feerV4pDEJX7fvexxVocVm
+tT4bmMq3Ap6OFFAzmE/ITI8pXvFaOd9lyebNXQmrreKJLUfEIZa6JulLCYxfkJ8z
++CcNLyn6ZXNMaIZ8G9Hm6VRdRi8/AgMBAAGjRTBDMA4GA1UdDwEB/wQEAwIBBjAS
+BgNVHRMBAf8ECDAGAQH/AgECMB0GA1UdDgQWBBRDLNYEX8XI7nM53k1rUR+mpTjQ
+NTANBgkqhkiG9w0BAQsFAAOCAQEACDe3Fa1KE/rvVtyCLW/IBfKV01NShFTsb6x8
+GrPEQ6NJLZQ2MzdyJgAF2a/nZ9KVgrhGXoyoZBCKP9Dd/JDzSSZcBztfNK8dRv2A
+XHBBF6tZ19I+XY9c7/CfhJ2CEYJpeN9r3GKSqV+njkmg8n/On2BTlFsij88plK8H
+ORyemc1nQI+ARPSu2r3rJbYa4yI2U6w4L4BTCVImg3bX50GImmXGlwvnJMFik1FX
++0hdfetRxxMZ1pm2Uy6099KkULnSKabZGwRiBUHQJYh0EeuAOQ4a6MG5DRkURWNs
+dInjPOLY9/7S5DQKwz/NtqXA8EEymZosHxpiRp+zzKB4XaV9Ig==
+-----END CERTIFICATE-----

+ 13 - 0
pkg/proxy/fixtures/gencert.json

@@ -0,0 +1,13 @@
+{
+  "signing": {
+    "default": {
+        "usages": [
+          "signing",
+          "key encipherment",
+          "server auth",
+          "client auth"
+        ],
+        "expiry": "87600h"
+    }
+  }
+}

+ 26 - 0
pkg/proxy/fixtures/gencerts.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+if ! [[ "$0" =~ "./gencerts.sh" ]]; then
+	echo "must be run from 'fixtures'"
+	exit 255
+fi
+
+if ! which cfssl; then
+	echo "cfssl is not installed"
+	exit 255
+fi
+
+cfssl gencert --initca=true ./ca-csr.json | cfssljson --bare ./ca
+mv ca.pem ca.crt
+openssl x509 -in ca.crt -noout -text
+
+# generate DNS: localhost, IP: 127.0.0.1, CN: example.com certificates
+cfssl gencert \
+    --ca ./ca.crt \
+    --ca-key ./ca-key.pem \
+    --config ./gencert.json \
+    ./server-ca-csr.json | cfssljson --bare ./server
+mv server.pem server.crt
+mv server-key.pem server.key.insecure
+
+rm -f *.csr *.pem *.stderr *.txt

+ 20 - 0
pkg/proxy/fixtures/server-ca-csr.json

@@ -0,0 +1,20 @@
+{
+  "key": {
+    "algo": "rsa",
+    "size": 2048
+  },
+  "names": [
+    {
+      "O": "etcd",
+      "OU": "etcd Security",
+      "L": "San Francisco",
+      "ST": "California",
+      "C": "USA"
+    }
+  ],
+  "CN": "example.com",
+  "hosts": [
+    "127.0.0.1",
+    "localhost"
+  ]
+}

+ 24 - 0
pkg/proxy/fixtures/server.crt

@@ -0,0 +1,24 @@
+-----BEGIN CERTIFICATE-----
+MIIEEjCCAvqgAwIBAgIUIYc+vmysep1pDc2ua/VQEeMFQVAwDQYJKoZIhvcNAQEL
+BQAwbzEMMAoGA1UEBhMDVVNBMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQH
+Ew1TYW4gRnJhbmNpc2NvMQ0wCwYDVQQKEwRldGNkMRYwFAYDVQQLEw1ldGNkIFNl
+Y3VyaXR5MQswCQYDVQQDEwJjYTAeFw0xODAxMDIxNjQxMDBaFw0yNzEyMzExNjQx
+MDBaMHgxDDAKBgNVBAYTA1VTQTETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UE
+BxMNU2FuIEZyYW5jaXNjbzENMAsGA1UEChMEZXRjZDEWMBQGA1UECxMNZXRjZCBT
+ZWN1cml0eTEUMBIGA1UEAxMLZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3DQEBAQUA
+A4IBDwAwggEKAoIBAQDEq7aT2BQZfmJ2xpUm8xWJlN0c3cOLVZRH9mIrEutIHmip
+BYq3ZIq3q52w+T3sMcaJNMGjCteE8Lu+G9YSmtfZMAWnkaM02KOjVMkkQcK7Z4vM
+lOUjlO+dsvhfmw3CPghqSs6M1K2CTqhuEiXdOBofuEMmwKNRgkV/jT92PUs0h8kq
+loc/I3/H+hx/ZJ1i0S0xkZKpaImc0oZ9ZDo07biMrsUIzjwbN69mEs+CtVkah4sy
+k6UyRoU2k21lyRTK0LxNjWc9ylzDNUuf6DwduU7lPZsqTaJrFNAAPpOlI4k2EcjL
+3zD8amKkJGDm+PQz97PbTA381ec4ZAtB8volxCebAgMBAAGjgZwwgZkwDgYDVR0P
+AQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMB
+Af8EAjAAMB0GA1UdDgQWBBTTZQnMn5tuUgVE+8c9W0hmbghGoDAfBgNVHSMEGDAW
+gBRDLNYEX8XI7nM53k1rUR+mpTjQNTAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8A
+AAEwDQYJKoZIhvcNAQELBQADggEBAKUQVj0YDuxg4tinlOZhp4ge7tCA+gL7vV+Q
+iDrkWfOlGjDgwYqWMYDXMHWKIW9ea8LzyI/bVEcaHlnBmNOYuS7g47EWNiU7WUA5
+iTkm3CKA5zHFFPcXHW0GQeCQrX9y3SepKS3cP8TAyZFfC/FvV24Kn1oQhJbEe0ZV
+In/vPHssW7jlVe0FGVUn7FutRQgiA1pTAtS6AP4LeZ9O41DTWkPqV4nBgcxlvkgD
+KjEoXXSb5C0LoR5zwAo9zB3RtmqnmvkHAOv3G92YctdS2VbCmd8CNLj9H7gMmQiH
+ThsStVOhb2uo6Ni4PgzUIYKGTd4ZjUXCYxFKck//ajDyCHlL8v4=
+-----END CERTIFICATE-----

+ 27 - 0
pkg/proxy/fixtures/server.key.insecure

@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAxKu2k9gUGX5idsaVJvMViZTdHN3Di1WUR/ZiKxLrSB5oqQWK
+t2SKt6udsPk97DHGiTTBowrXhPC7vhvWEprX2TAFp5GjNNijo1TJJEHCu2eLzJTl
+I5TvnbL4X5sNwj4IakrOjNStgk6obhIl3TgaH7hDJsCjUYJFf40/dj1LNIfJKpaH
+PyN/x/ocf2SdYtEtMZGSqWiJnNKGfWQ6NO24jK7FCM48GzevZhLPgrVZGoeLMpOl
+MkaFNpNtZckUytC8TY1nPcpcwzVLn+g8HblO5T2bKk2iaxTQAD6TpSOJNhHIy98w
+/GpipCRg5vj0M/ez20wN/NXnOGQLQfL6JcQnmwIDAQABAoIBAGTx1eaQk9B6BEP+
+rXOudTGGzO8SDFop9M/y8HQ3Y7hCk2mdxJNY8bJQTcIWS+g9rC+kencbC3/aqCJt
+2zT1cTCy61QU9nYbc/JThGIttqvF/AVnryzSNyL0R3Oa/Dbk7CDSgK3cQ6qMgPru
+Ka0gLJh3VVBAtBMUEGPltdsUntM4sHTh5FAabP0ioBJ1QLG6Aak7LOQikjBEFJoc
+Tea4uRsE7IreP5Mn7UW92nkt1ey5UGzBtNNtpHbVaHmfQojwlwkLtnV35sumbvK6
+6KTMNREZv6xSIMwkYxm1zRE3Cus/1jGIc8MZF0BxgcCR+G37l+BKwL8CSymHPxhH
+dvGxoPECgYEA3STp52CbI/KyVfvjxK2OIex/NV1jKh85wQsLtkaRv3/a/EEg7MV7
+54dEvo5KKOZXfeOd9r9G9h1RffjSD9MhxfPhyGwuOcqa8IE1zNwlY/v7KL7HtDIf
+2mrXWF5Klafh8aXYcaRH0ZSLnl/nXUXYht4/0NRGiXnttUgqs6hvY70CgYEA46tO
+J5QkgF3YVY0gx10wRCAnnKLkAaHdtxtteXOJh79xsGXQ4LLngc+mz1hLt+TNJza+
+BZhoWwY/ZgyiTH0pebGr/U0QUMoUHlGgjgj3Aa/XFpOhtyLU+IU/PYl0BUz9dqsN
+TDtv6p/HQhfd98vUNsbACQda+YAo+oRdO5kLQjcCgYB3OAZNcXxRte5EgoY5KqN8
+UGYH2++w7qKRGqZWvtamGYRyB557Zr+0gu0hmc4LHJrASGyJcHcOCaI8Ol7snxMP
+B7qJ9SA6kapTzCS361rQ+zBct/UrhPY9JuovPq4Q3i/luVXldf4t01otqGAvnY7s
+rnZS242nYa8v0tcKgdyDNQKBgB3Z60BzQyn1pBTrkT2ysU5tbOQz03OHVrvYg80l
+4gWDi5OWdgHQU1yI7pVHPX5aKLAYlGfFaQFuW0e1Jl6jFpoXOrbWsOn25RZom4Wk
+FUcKWEhkiRKrJYOEbRtTd3vucVlq6i5xqKX51zWKTZddCXE5NBq69Sm7rSPT0Sms
+UnaXAoGAXYAE5slvjcylJpMV4lxTBmNtA9+pw1T7I379mIyqZ0OS25nmpskHU7FR
+SQDSRHw7hHuyjEHyhMoHEGLfUMIltQoi+pcrieVQelJdSuX7VInzHPAR5RppUVFl
+jOZZKlIiqs+UfCoOgsIblXuw7a/ATnAnXakutSFgHU1lN1gN02U=
+-----END RSA PRIVATE KEY-----

+ 949 - 0
pkg/proxy/server.go

@@ -0,0 +1,949 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"fmt"
+	"io"
+	mrand "math/rand"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/pkg/transport"
+
+	humanize "github.com/dustin/go-humanize"
+	"go.uber.org/zap"
+)
+
+// Server defines proxy server layer that simulates common network faults,
+// such as latency spikes, packet drop/corruption, etc..
+type Server interface {
+	// From returns proxy source address in "scheme://host:port" format.
+	From() string
+	// To returns proxy destination address in "scheme://host:port" format.
+	To() string
+
+	// Ready returns when proxy is ready to serve.
+	Ready() <-chan struct{}
+	// Done returns when proxy has been closed.
+	Done() <-chan struct{}
+	// Error sends errors while serving proxy.
+	Error() <-chan error
+	// Close closes listener and transport.
+	Close() error
+
+	// DelayAccept adds latency ± random variable to accepting new incoming connections.
+	DelayAccept(latency, rv time.Duration)
+	// UndelayAccept removes sending latencies.
+	UndelayAccept()
+	// LatencyAccept returns current latency on accepting new incoming connections.
+	LatencyAccept() time.Duration
+	// DelayTx adds latency ± random variable to "sending" layer.
+	DelayTx(latency, rv time.Duration)
+	// UndelayTx removes sending latencies.
+	UndelayTx()
+	// LatencyTx returns current send latency.
+	LatencyTx() time.Duration
+	// DelayRx adds latency ± random variable to "receiving" layer.
+	DelayRx(latency, rv time.Duration)
+	// UndelayRx removes "receiving" latencies.
+	UndelayRx()
+	// LatencyRx returns current receive latency.
+	LatencyRx() time.Duration
+
+	// PauseAccept stops accepting new connections.
+	PauseAccept()
+	// UnpauseAccept removes pause operation on accepting new connections.
+	UnpauseAccept()
+	// PauseTx stops "forwarding" packets.
+	PauseTx()
+	// UnpauseTx removes "forwarding" pause operation.
+	UnpauseTx()
+	// PauseRx stops "receiving" packets to client.
+	PauseRx()
+	// UnpauseRx removes "receiving" pause operation.
+	UnpauseRx()
+
+	// BlackholeTx drops all incoming packets before "forwarding".
+	BlackholeTx()
+	// UnblackholeTx removes blackhole operation on "sending".
+	UnblackholeTx()
+	// BlackholeRx drops all incoming packets to client.
+	BlackholeRx()
+	// UnblackholeRx removes blackhole operation on "receiving".
+	UnblackholeRx()
+
+	// CorruptTx corrupts incoming packets from the listener.
+	CorruptTx(f func(data []byte) []byte)
+	// UncorruptTx removes corrupt operation on "forwarding".
+	UncorruptTx()
+	// CorruptRx corrupts incoming packets to client.
+	CorruptRx(f func(data []byte) []byte)
+	// UncorruptRx removes corrupt operation on "receiving".
+	UncorruptRx()
+
+	// ResetListener closes and restarts listener.
+	ResetListener() error
+}
+
+type proxyServer struct {
+	lg *zap.Logger
+
+	from, to      url.URL
+	tlsInfo       transport.TLSInfo
+	dialTimeout   time.Duration
+	bufferSize    int
+	retryInterval time.Duration
+
+	readyc chan struct{}
+	donec  chan struct{}
+	errc   chan error
+
+	closeOnce sync.Once
+	closeWg   sync.WaitGroup
+
+	listenerMu sync.RWMutex
+	listener   net.Listener
+
+	latencyAcceptMu sync.RWMutex
+	latencyAccept   time.Duration
+	latencyTxMu     sync.RWMutex
+	latencyTx       time.Duration
+	latencyRxMu     sync.RWMutex
+	latencyRx       time.Duration
+
+	corruptTxMu sync.RWMutex
+	corruptTx   func(data []byte) []byte
+	corruptRxMu sync.RWMutex
+	corruptRx   func(data []byte) []byte
+
+	acceptMu     sync.Mutex
+	pauseAcceptc chan struct{}
+	txMu         sync.Mutex
+	pauseTxc     chan struct{}
+	blackholeTxc chan struct{}
+	rxMu         sync.Mutex
+	pauseRxc     chan struct{}
+	blackholeRxc chan struct{}
+}
+
+// ServerConfig defines proxy server configuration.
+type ServerConfig struct {
+	Logger        *zap.Logger
+	From          url.URL
+	To            url.URL
+	TLSInfo       transport.TLSInfo
+	DialTimeout   time.Duration
+	BufferSize    int
+	RetryInterval time.Duration
+}
+
+var (
+	defaultDialTimeout   = 3 * time.Second
+	defaultBufferSize    = 48 * 1024
+	defaultRetryInterval = 10 * time.Millisecond
+	defaultLogger        *zap.Logger
+)
+
+func init() {
+	var err error
+	defaultLogger, err = zap.NewProduction()
+	if err != nil {
+		panic(err)
+	}
+}
+
+// NewServer returns a proxy implementation with no iptables/tc dependencies.
+// The proxy layer overhead is <1ms.
+func NewServer(cfg ServerConfig) Server {
+	p := &proxyServer{
+		lg: cfg.Logger,
+
+		from:          cfg.From,
+		to:            cfg.To,
+		tlsInfo:       cfg.TLSInfo,
+		dialTimeout:   cfg.DialTimeout,
+		bufferSize:    cfg.BufferSize,
+		retryInterval: cfg.RetryInterval,
+
+		readyc: make(chan struct{}),
+		donec:  make(chan struct{}),
+		errc:   make(chan error, 16),
+
+		pauseAcceptc: make(chan struct{}),
+		pauseTxc:     make(chan struct{}),
+		blackholeTxc: make(chan struct{}),
+		pauseRxc:     make(chan struct{}),
+		blackholeRxc: make(chan struct{}),
+	}
+	if p.dialTimeout == 0 {
+		p.dialTimeout = defaultDialTimeout
+	}
+	if p.bufferSize == 0 {
+		p.bufferSize = defaultBufferSize
+	}
+	if p.retryInterval == 0 {
+		p.retryInterval = defaultRetryInterval
+	}
+	if p.lg == nil {
+		p.lg = defaultLogger
+	}
+	close(p.pauseAcceptc)
+	close(p.pauseTxc)
+	close(p.pauseRxc)
+
+	if strings.HasPrefix(p.from.Scheme, "http") {
+		p.from.Scheme = "tcp"
+	}
+	if strings.HasPrefix(p.to.Scheme, "http") {
+		p.to.Scheme = "tcp"
+	}
+
+	var ln net.Listener
+	var err error
+	if !p.tlsInfo.Empty() {
+		ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
+	} else {
+		ln, err = net.Listen(p.from.Scheme, p.from.Host)
+	}
+	if err != nil {
+		p.errc <- err
+		p.Close()
+		return p
+	}
+	p.listener = ln
+
+	p.closeWg.Add(1)
+	go p.listenAndServe()
+
+	p.lg.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To()))
+	return p
+}
+
+func (p *proxyServer) From() string {
+	return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host)
+}
+
+func (p *proxyServer) To() string {
+	return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host)
+}
+
+// TODO: implement packet reordering from multiple TCP connections
+// buffer packets per connection for awhile, reorder before transmit
+// - https://github.com/coreos/etcd/issues/5614
+// - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034
+
+func (p *proxyServer) listenAndServe() {
+	defer p.closeWg.Done()
+
+	p.lg.Info("proxy is listening on", zap.String("from", p.From()))
+	close(p.readyc)
+
+	for {
+		p.acceptMu.Lock()
+		pausec := p.pauseAcceptc
+		p.acceptMu.Unlock()
+		select {
+		case <-pausec:
+		case <-p.donec:
+			return
+		}
+
+		p.latencyAcceptMu.RLock()
+		lat := p.latencyAccept
+		p.latencyAcceptMu.RUnlock()
+		if lat > 0 {
+			select {
+			case <-time.After(lat):
+			case <-p.donec:
+				return
+			}
+		}
+
+		p.listenerMu.RLock()
+		ln := p.listener
+		p.listenerMu.RUnlock()
+
+		in, err := ln.Accept()
+		if err != nil {
+			select {
+			case p.errc <- err:
+				select {
+				case <-p.donec:
+					return
+				default:
+				}
+			case <-p.donec:
+				return
+			}
+			p.lg.Debug("listener accept error", zap.Error(err))
+
+			if strings.HasSuffix(err.Error(), "use of closed network connection") {
+				select {
+				case <-time.After(p.retryInterval):
+				case <-p.donec:
+					return
+				}
+				p.lg.Debug("listener is closed; retry listening on", zap.String("from", p.From()))
+
+				if err = p.ResetListener(); err != nil {
+					select {
+					case p.errc <- err:
+						select {
+						case <-p.donec:
+							return
+						default:
+						}
+					case <-p.donec:
+						return
+					}
+					p.lg.Warn("failed to reset listener", zap.Error(err))
+				}
+			}
+
+			continue
+		}
+
+		var out net.Conn
+		if !p.tlsInfo.Empty() {
+			var tp *http.Transport
+			tp, err = transport.NewTransport(p.tlsInfo, p.dialTimeout)
+			if err != nil {
+				select {
+				case p.errc <- err:
+					select {
+					case <-p.donec:
+						return
+					default:
+					}
+				case <-p.donec:
+					return
+				}
+				continue
+			}
+			out, err = tp.Dial(p.to.Scheme, p.to.Host)
+		} else {
+			out, err = net.Dial(p.to.Scheme, p.to.Host)
+		}
+		if err != nil {
+			select {
+			case p.errc <- err:
+				select {
+				case <-p.donec:
+					return
+				default:
+				}
+			case <-p.donec:
+				return
+			}
+			p.lg.Debug("failed to dial", zap.Error(err))
+			continue
+		}
+
+		go func() {
+			// read incoming bytes from listener, dispatch to outgoing connection
+			p.transmit(out, in)
+			out.Close()
+			in.Close()
+		}()
+		go func() {
+			// read response from outgoing connection, write back to listener
+			p.receive(in, out)
+			in.Close()
+			out.Close()
+		}()
+	}
+}
+
+func (p *proxyServer) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
+func (p *proxyServer) receive(dst io.Writer, src io.Reader)  { p.ioCopy(dst, src, false) }
+func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
+	buf := make([]byte, p.bufferSize)
+	for {
+		nr, err := src.Read(buf)
+		if err != nil {
+			if err == io.EOF {
+				return
+			}
+			// connection already closed
+			if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
+				return
+			}
+			if strings.HasSuffix(err.Error(), "use of closed network connection") {
+				return
+			}
+			select {
+			case p.errc <- err:
+				select {
+				case <-p.donec:
+					return
+				default:
+				}
+			case <-p.donec:
+				return
+			}
+			p.lg.Debug("failed to read", zap.Error(err))
+			return
+		}
+		if nr == 0 {
+			return
+		}
+		data := buf[:nr]
+
+		var pausec chan struct{}
+		var blackholec chan struct{}
+		if proxySend {
+			p.txMu.Lock()
+			pausec = p.pauseTxc
+			blackholec = p.blackholeTxc
+			p.txMu.Unlock()
+		} else {
+			p.rxMu.Lock()
+			pausec = p.pauseRxc
+			blackholec = p.blackholeRxc
+			p.rxMu.Unlock()
+		}
+		select {
+		case <-pausec:
+		case <-p.donec:
+			return
+		}
+		blackholed := false
+		select {
+		case <-blackholec:
+			blackholed = true
+		case <-p.donec:
+			return
+		default:
+		}
+		if blackholed {
+			if proxySend {
+				p.lg.Debug(
+					"dropped",
+					zap.String("data-size", humanize.Bytes(uint64(nr))),
+					zap.String("from", p.From()),
+					zap.String("to", p.To()),
+				)
+			} else {
+				p.lg.Debug(
+					"dropped",
+					zap.String("data-size", humanize.Bytes(uint64(nr))),
+					zap.String("from", p.To()),
+					zap.String("to", p.From()),
+				)
+			}
+			continue
+		}
+
+		var lat time.Duration
+		if proxySend {
+			p.latencyTxMu.RLock()
+			lat = p.latencyTx
+			p.latencyTxMu.RUnlock()
+		} else {
+			p.latencyRxMu.RLock()
+			lat = p.latencyRx
+			p.latencyRxMu.RUnlock()
+		}
+		if lat > 0 {
+			select {
+			case <-time.After(lat):
+			case <-p.donec:
+				return
+			}
+		}
+
+		if proxySend {
+			p.corruptTxMu.RLock()
+			if p.corruptTx != nil {
+				data = p.corruptTx(data)
+			}
+			p.corruptTxMu.RUnlock()
+		} else {
+			p.corruptRxMu.RLock()
+			if p.corruptRx != nil {
+				data = p.corruptRx(data)
+			}
+			p.corruptRxMu.RUnlock()
+		}
+
+		var nw int
+		nw, err = dst.Write(data)
+		if err != nil {
+			if err == io.EOF {
+				return
+			}
+			select {
+			case p.errc <- err:
+				select {
+				case <-p.donec:
+					return
+				default:
+				}
+			case <-p.donec:
+				return
+			}
+			if proxySend {
+				p.lg.Debug("failed to write while sending", zap.Error(err))
+			} else {
+				p.lg.Debug("failed to write while receiving", zap.Error(err))
+			}
+			return
+		}
+
+		if nr != nw {
+			select {
+			case p.errc <- io.ErrShortWrite:
+				select {
+				case <-p.donec:
+					return
+				default:
+				}
+			case <-p.donec:
+				return
+			}
+			if proxySend {
+				p.lg.Debug(
+					"failed to write while sending; read/write bytes are different",
+					zap.Int("read-bytes", nr),
+					zap.Int("write-bytes", nw),
+					zap.Error(io.ErrShortWrite),
+				)
+			} else {
+				p.lg.Debug(
+					"failed to write while receiving; read/write bytes are different",
+					zap.Int("read-bytes", nr),
+					zap.Int("write-bytes", nw),
+					zap.Error(io.ErrShortWrite),
+				)
+			}
+			return
+		}
+
+		if proxySend {
+			p.lg.Debug(
+				"transmitted",
+				zap.String("data-size", humanize.Bytes(uint64(nr))),
+				zap.String("from", p.From()),
+				zap.String("to", p.To()),
+			)
+		} else {
+			p.lg.Debug(
+				"received",
+				zap.String("data-size", humanize.Bytes(uint64(nr))),
+				zap.String("from", p.To()),
+				zap.String("to", p.From()),
+			)
+		}
+
+	}
+}
+
+func (p *proxyServer) Ready() <-chan struct{} { return p.readyc }
+func (p *proxyServer) Done() <-chan struct{}  { return p.donec }
+func (p *proxyServer) Error() <-chan error    { return p.errc }
+func (p *proxyServer) Close() (err error) {
+	p.closeOnce.Do(func() {
+		close(p.donec)
+		p.listenerMu.Lock()
+		if p.listener != nil {
+			err = p.listener.Close()
+			p.lg.Info(
+				"closed proxy listener",
+				zap.String("from", p.From()),
+				zap.String("to", p.To()),
+			)
+		}
+		p.lg.Sync()
+		p.listenerMu.Unlock()
+	})
+	p.closeWg.Wait()
+	return err
+}
+
+func (p *proxyServer) DelayAccept(latency, rv time.Duration) {
+	if latency <= 0 {
+		return
+	}
+	d := computeLatency(latency, rv)
+	p.latencyAcceptMu.Lock()
+	p.latencyAccept = d
+	p.latencyAcceptMu.Unlock()
+
+	p.lg.Info(
+		"set accept latency",
+		zap.Duration("latency", d),
+		zap.Duration("given-latency", latency),
+		zap.Duration("given-latency-random-variable", rv),
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) UndelayAccept() {
+	p.latencyAcceptMu.Lock()
+	d := p.latencyAccept
+	p.latencyAccept = 0
+	p.latencyAcceptMu.Unlock()
+
+	p.lg.Info(
+		"removed accept latency",
+		zap.Duration("latency", d),
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) LatencyAccept() time.Duration {
+	p.latencyAcceptMu.RLock()
+	d := p.latencyAccept
+	p.latencyAcceptMu.RUnlock()
+	return d
+}
+
+func (p *proxyServer) DelayTx(latency, rv time.Duration) {
+	if latency <= 0 {
+		return
+	}
+	d := computeLatency(latency, rv)
+	p.latencyTxMu.Lock()
+	p.latencyTx = d
+	p.latencyTxMu.Unlock()
+
+	p.lg.Info(
+		"set transmit latency",
+		zap.Duration("latency", d),
+		zap.Duration("given-latency", latency),
+		zap.Duration("given-latency-random-variable", rv),
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) UndelayTx() {
+	p.latencyTxMu.Lock()
+	d := p.latencyTx
+	p.latencyTx = 0
+	p.latencyTxMu.Unlock()
+
+	p.lg.Info(
+		"removed transmit latency",
+		zap.Duration("latency", d),
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) LatencyTx() time.Duration {
+	p.latencyTxMu.RLock()
+	d := p.latencyTx
+	p.latencyTxMu.RUnlock()
+	return d
+}
+
+func (p *proxyServer) DelayRx(latency, rv time.Duration) {
+	if latency <= 0 {
+		return
+	}
+	d := computeLatency(latency, rv)
+	p.latencyRxMu.Lock()
+	p.latencyRx = d
+	p.latencyRxMu.Unlock()
+
+	p.lg.Info(
+		"set receive latency",
+		zap.Duration("latency", d),
+		zap.Duration("given-latency", latency),
+		zap.Duration("given-latency-random-variable", rv),
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) UndelayRx() {
+	p.latencyRxMu.Lock()
+	d := p.latencyRx
+	p.latencyRx = 0
+	p.latencyRxMu.Unlock()
+
+	p.lg.Info(
+		"removed receive latency",
+		zap.Duration("latency", d),
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) LatencyRx() time.Duration {
+	p.latencyRxMu.RLock()
+	d := p.latencyRx
+	p.latencyRxMu.RUnlock()
+	return d
+}
+
+func computeLatency(lat, rv time.Duration) time.Duration {
+	if rv == 0 {
+		return lat
+	}
+	if rv < 0 {
+		rv *= -1
+	}
+	if rv > lat {
+		rv = lat / 10
+	}
+	now := time.Now()
+	mrand.Seed(int64(now.Nanosecond()))
+	sign := 1
+	if now.Second()%2 == 0 {
+		sign = -1
+	}
+	return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
+}
+
+func (p *proxyServer) PauseAccept() {
+	p.acceptMu.Lock()
+	p.pauseAcceptc = make(chan struct{})
+	p.acceptMu.Unlock()
+
+	p.lg.Info(
+		"paused accepting new connections",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) UnpauseAccept() {
+	p.acceptMu.Lock()
+	select {
+	case <-p.pauseAcceptc: // already unpaused
+	case <-p.donec:
+		p.acceptMu.Unlock()
+		return
+	default:
+		close(p.pauseAcceptc)
+	}
+	p.acceptMu.Unlock()
+
+	p.lg.Info(
+		"unpaused accepting new connections",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) PauseTx() {
+	p.txMu.Lock()
+	p.pauseTxc = make(chan struct{})
+	p.txMu.Unlock()
+
+	p.lg.Info(
+		"paused transmit listen",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) UnpauseTx() {
+	p.txMu.Lock()
+	select {
+	case <-p.pauseTxc: // already unpaused
+	case <-p.donec:
+		p.txMu.Unlock()
+		return
+	default:
+		close(p.pauseTxc)
+	}
+	p.txMu.Unlock()
+
+	p.lg.Info(
+		"unpaused transmit listen",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) PauseRx() {
+	p.rxMu.Lock()
+	p.pauseRxc = make(chan struct{})
+	p.rxMu.Unlock()
+
+	p.lg.Info(
+		"paused receive listen",
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) UnpauseRx() {
+	p.rxMu.Lock()
+	select {
+	case <-p.pauseRxc: // already unpaused
+	case <-p.donec:
+		p.rxMu.Unlock()
+		return
+	default:
+		close(p.pauseRxc)
+	}
+	p.rxMu.Unlock()
+
+	p.lg.Info(
+		"unpaused receive listen",
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) BlackholeTx() {
+	p.txMu.Lock()
+	select {
+	case <-p.blackholeTxc: // already blackholed
+	case <-p.donec:
+		p.txMu.Unlock()
+		return
+	default:
+		close(p.blackholeTxc)
+	}
+	p.txMu.Unlock()
+
+	p.lg.Info(
+		"blackholed transmit",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) UnblackholeTx() {
+	p.txMu.Lock()
+	p.blackholeTxc = make(chan struct{})
+	p.txMu.Unlock()
+
+	p.lg.Info(
+		"unblackholed transmit",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) BlackholeRx() {
+	p.rxMu.Lock()
+	select {
+	case <-p.blackholeRxc: // already blackholed
+	case <-p.donec:
+		p.rxMu.Unlock()
+		return
+	default:
+		close(p.blackholeRxc)
+	}
+	p.rxMu.Unlock()
+
+	p.lg.Info(
+		"blackholed receive",
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) UnblackholeRx() {
+	p.rxMu.Lock()
+	p.blackholeRxc = make(chan struct{})
+	p.rxMu.Unlock()
+
+	p.lg.Info(
+		"unblackholed receive",
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) CorruptTx(f func([]byte) []byte) {
+	p.corruptTxMu.Lock()
+	p.corruptTx = f
+	p.corruptTxMu.Unlock()
+
+	p.lg.Info(
+		"corrupting transmit",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) UncorruptTx() {
+	p.corruptTxMu.Lock()
+	p.corruptTx = nil
+	p.corruptTxMu.Unlock()
+
+	p.lg.Info(
+		"stopped corrupting transmit",
+		zap.String("from", p.From()),
+		zap.String("to", p.To()),
+	)
+}
+
+func (p *proxyServer) CorruptRx(f func([]byte) []byte) {
+	p.corruptRxMu.Lock()
+	p.corruptRx = f
+	p.corruptRxMu.Unlock()
+	p.lg.Info(
+		"corrupting receive",
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) UncorruptRx() {
+	p.corruptRxMu.Lock()
+	p.corruptRx = nil
+	p.corruptRxMu.Unlock()
+
+	p.lg.Info(
+		"stopped corrupting receive",
+		zap.String("from", p.To()),
+		zap.String("to", p.From()),
+	)
+}
+
+func (p *proxyServer) ResetListener() error {
+	p.listenerMu.Lock()
+	defer p.listenerMu.Unlock()
+
+	if err := p.listener.Close(); err != nil {
+		// already closed
+		if !strings.HasSuffix(err.Error(), "use of closed network connection") {
+			return err
+		}
+	}
+
+	var ln net.Listener
+	var err error
+	if !p.tlsInfo.Empty() {
+		ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
+	} else {
+		ln, err = net.Listen(p.from.Scheme, p.from.Host)
+	}
+	if err != nil {
+		return err
+	}
+	p.listener = ln
+
+	p.lg.Info(
+		"reset listener on",
+		zap.String("from", p.From()),
+	)
+	return nil
+}

+ 611 - 0
pkg/proxy/server_test.go

@@ -0,0 +1,611 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"bytes"
+	"crypto/tls"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/pkg/transport"
+
+	"go.uber.org/zap"
+)
+
+// enable DebugLevel
+var testLogger = zap.NewExample()
+
+var testTLSInfo = transport.TLSInfo{
+	KeyFile:        "./fixtures/server.key.insecure",
+	CertFile:       "./fixtures/server.crt",
+	TrustedCAFile:  "./fixtures/ca.crt",
+	ClientCertAuth: true,
+}
+
+func TestServer_Unix_Insecure(t *testing.T)         { testServer(t, "unix", false, false) }
+func TestServer_TCP_Insecure(t *testing.T)          { testServer(t, "tcp", false, false) }
+func TestServer_Unix_Secure(t *testing.T)           { testServer(t, "unix", true, false) }
+func TestServer_TCP_Secure(t *testing.T)            { testServer(t, "tcp", true, false) }
+func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) }
+func TestServer_TCP_Insecure_DelayTx(t *testing.T)  { testServer(t, "tcp", false, true) }
+func TestServer_Unix_Secure_DelayTx(t *testing.T)   { testServer(t, "unix", true, true) }
+func TestServer_TCP_Secure_DelayTx(t *testing.T)    { testServer(t, "tcp", true, true) }
+func testServer(t *testing.T, scheme string, secure bool, delayTx bool) {
+	srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
+	if scheme == "tcp" {
+		ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{})
+		srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
+		ln1.Close()
+		ln2.Close()
+	} else {
+		defer func() {
+			os.RemoveAll(srcAddr)
+			os.RemoveAll(dstAddr)
+		}()
+	}
+	tlsInfo := testTLSInfo
+	if !secure {
+		tlsInfo = transport.TLSInfo{}
+	}
+	ln := listen(t, scheme, dstAddr, tlsInfo)
+	defer ln.Close()
+
+	cfg := ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	}
+	if secure {
+		cfg.TLSInfo = testTLSInfo
+	}
+	p := NewServer(cfg)
+	<-p.Ready()
+	defer p.Close()
+
+	data1 := []byte("Hello World!")
+	donec, writec := make(chan struct{}), make(chan []byte)
+
+	go func() {
+		defer close(donec)
+		for data := range writec {
+			send(t, data, scheme, srcAddr, tlsInfo)
+		}
+	}()
+
+	recvc := make(chan []byte)
+	go func() {
+		for i := 0; i < 2; i++ {
+			recvc <- receive(t, ln)
+		}
+	}()
+
+	writec <- data1
+	now := time.Now()
+	if d := <-recvc; !bytes.Equal(data1, d) {
+		t.Fatalf("expected %q, got %q", string(data1), string(d))
+	}
+	took1 := time.Since(now)
+	t.Logf("took %v with no latency", took1)
+
+	lat, rv := 50*time.Millisecond, 5*time.Millisecond
+	if delayTx {
+		p.DelayTx(lat, rv)
+	}
+
+	data2 := []byte("new data")
+	writec <- data2
+	now = time.Now()
+	if d := <-recvc; !bytes.Equal(data2, d) {
+		t.Fatalf("expected %q, got %q", string(data2), string(d))
+	}
+	took2 := time.Since(now)
+	if delayTx {
+		t.Logf("took %v with latency %v±%v", took2, lat, rv)
+	} else {
+		t.Logf("took %v with no latency", took2)
+	}
+
+	if delayTx {
+		p.UndelayTx()
+		if took1 >= took2 {
+			t.Fatalf("expected took1 %v < took2 %v (with latency)", took1, took2)
+		}
+	}
+
+	close(writec)
+	select {
+	case <-donec:
+	case <-time.After(3 * time.Second):
+		t.Fatal("took too long to write")
+	}
+
+	select {
+	case <-p.Done():
+		t.Fatal("unexpected done")
+	case err := <-p.Error():
+		t.Fatal(err)
+	default:
+	}
+
+	if err := p.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-p.Done():
+	case err := <-p.Error():
+		if !strings.HasPrefix(err.Error(), "accept ") &&
+			!strings.HasSuffix(err.Error(), "use of closed network connection") {
+			t.Fatal(err)
+		}
+	case <-time.After(3 * time.Second):
+		t.Fatal("took too long to close")
+	}
+}
+
+func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
+func TestServer_Unix_Secure_DelayAccept(t *testing.T)   { testServerDelayAccept(t, true) }
+func testServerDelayAccept(t *testing.T, secure bool) {
+	srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
+	defer func() {
+		os.RemoveAll(srcAddr)
+		os.RemoveAll(dstAddr)
+	}()
+	tlsInfo := testTLSInfo
+	if !secure {
+		tlsInfo = transport.TLSInfo{}
+	}
+	scheme := "unix"
+	ln := listen(t, scheme, dstAddr, tlsInfo)
+	defer ln.Close()
+
+	cfg := ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	}
+	if secure {
+		cfg.TLSInfo = testTLSInfo
+	}
+	p := NewServer(cfg)
+	<-p.Ready()
+	defer p.Close()
+
+	data := []byte("Hello World!")
+
+	now := time.Now()
+	send(t, data, scheme, srcAddr, tlsInfo)
+	if d := receive(t, ln); !bytes.Equal(data, d) {
+		t.Fatalf("expected %q, got %q", string(data), string(d))
+	}
+	took1 := time.Since(now)
+	t.Logf("took %v with no latency", took1)
+
+	lat, rv := 700*time.Millisecond, 10*time.Millisecond
+	p.DelayAccept(lat, rv)
+	defer p.UndelayAccept()
+	if err := p.ResetListener(); err != nil {
+		t.Fatal(err)
+	}
+	time.Sleep(200 * time.Millisecond)
+
+	now = time.Now()
+	send(t, data, scheme, srcAddr, tlsInfo)
+	if d := receive(t, ln); !bytes.Equal(data, d) {
+		t.Fatalf("expected %q, got %q", string(data), string(d))
+	}
+	took2 := time.Since(now)
+	t.Logf("took %v with latency %v±%v", took2, lat, rv)
+
+	if took1 >= took2 {
+		t.Fatalf("expected took1 %v < took2 %v", took1, took2)
+	}
+}
+
+func TestServer_PauseTx(t *testing.T) {
+	scheme := "unix"
+	srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
+	defer func() {
+		os.RemoveAll(srcAddr)
+		os.RemoveAll(dstAddr)
+	}()
+	ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
+	defer ln.Close()
+
+	p := NewServer(ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	})
+	<-p.Ready()
+	defer p.Close()
+
+	p.PauseTx()
+
+	data := []byte("Hello World!")
+	send(t, data, scheme, srcAddr, transport.TLSInfo{})
+
+	recvc := make(chan []byte)
+	go func() {
+		recvc <- receive(t, ln)
+	}()
+
+	select {
+	case d := <-recvc:
+		t.Fatalf("received unexpected data %q during pause", string(d))
+	case <-time.After(200 * time.Millisecond):
+	}
+
+	p.UnpauseTx()
+
+	select {
+	case d := <-recvc:
+		if !bytes.Equal(data, d) {
+			t.Fatalf("expected %q, got %q", string(data), string(d))
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("took too long to receive after unpause")
+	}
+}
+
+func TestServer_BlackholeTx(t *testing.T) {
+	scheme := "unix"
+	srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
+	defer func() {
+		os.RemoveAll(srcAddr)
+		os.RemoveAll(dstAddr)
+	}()
+	ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
+	defer ln.Close()
+
+	p := NewServer(ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	})
+	<-p.Ready()
+	defer p.Close()
+
+	p.BlackholeTx()
+
+	data := []byte("Hello World!")
+	send(t, data, scheme, srcAddr, transport.TLSInfo{})
+
+	recvc := make(chan []byte)
+	go func() {
+		recvc <- receive(t, ln)
+	}()
+
+	select {
+	case d := <-recvc:
+		t.Fatalf("unexpected data receive %q during blackhole", string(d))
+	case <-time.After(200 * time.Millisecond):
+	}
+
+	p.UnblackholeTx()
+
+	// expect different data, old data dropped
+	data[0]++
+	send(t, data, scheme, srcAddr, transport.TLSInfo{})
+
+	select {
+	case d := <-recvc:
+		if !bytes.Equal(data, d) {
+			t.Fatalf("expected %q, got %q", string(data), string(d))
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("took too long to receive after unblackhole")
+	}
+}
+
+func TestServer_CorruptTx(t *testing.T) {
+	scheme := "unix"
+	srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
+	defer func() {
+		os.RemoveAll(srcAddr)
+		os.RemoveAll(dstAddr)
+	}()
+	ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
+	defer ln.Close()
+
+	p := NewServer(ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	})
+	<-p.Ready()
+	defer p.Close()
+
+	p.CorruptTx(func(d []byte) []byte {
+		d[len(d)/2]++
+		return d
+	})
+	data := []byte("Hello World!")
+	send(t, data, scheme, srcAddr, transport.TLSInfo{})
+	if d := receive(t, ln); bytes.Equal(d, data) {
+		t.Fatalf("expected corrupted data, got %q", string(d))
+	}
+
+	p.UncorruptTx()
+	send(t, data, scheme, srcAddr, transport.TLSInfo{})
+	if d := receive(t, ln); !bytes.Equal(d, data) {
+		t.Fatalf("expected uncorrupted data, got %q", string(d))
+	}
+}
+
+func TestServer_Shutdown(t *testing.T) {
+	scheme := "unix"
+	srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
+	defer func() {
+		os.RemoveAll(srcAddr)
+		os.RemoveAll(dstAddr)
+	}()
+	ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
+	defer ln.Close()
+
+	p := NewServer(ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	})
+	<-p.Ready()
+	defer p.Close()
+
+	px, _ := p.(*proxyServer)
+	px.listener.Close()
+	time.Sleep(200 * time.Millisecond)
+
+	data := []byte("Hello World!")
+	send(t, data, scheme, srcAddr, transport.TLSInfo{})
+	if d := receive(t, ln); !bytes.Equal(d, data) {
+		t.Fatalf("expected %q, got %q", string(data), string(d))
+	}
+}
+
+func TestServer_ShutdownListener(t *testing.T) {
+	scheme := "unix"
+	srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
+	defer func() {
+		os.RemoveAll(srcAddr)
+		os.RemoveAll(dstAddr)
+	}()
+
+	ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
+	defer ln.Close()
+
+	p := NewServer(ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	})
+	<-p.Ready()
+	defer p.Close()
+
+	// shut down destination
+	ln.Close()
+	time.Sleep(200 * time.Millisecond)
+
+	ln = listen(t, scheme, dstAddr, transport.TLSInfo{})
+	defer ln.Close()
+
+	data := []byte("Hello World!")
+	send(t, data, scheme, srcAddr, transport.TLSInfo{})
+	if d := receive(t, ln); !bytes.Equal(d, data) {
+		t.Fatalf("expected %q, got %q", string(data), string(d))
+	}
+}
+
+func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) }
+func TestServerHTTP_Secure_DelayTx(t *testing.T)   { testServerHTTP(t, true, true) }
+func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) }
+func TestServerHTTP_Secure_DelayRx(t *testing.T)   { testServerHTTP(t, true, false) }
+func testServerHTTP(t *testing.T, secure, delayTx bool) {
+	scheme := "tcp"
+	ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{})
+	srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
+	ln1.Close()
+	ln2.Close()
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) {
+		d, err := ioutil.ReadAll(req.Body)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if _, err = w.Write([]byte(fmt.Sprintf("%q(confirmed)", string(d)))); err != nil {
+			t.Fatal(err)
+		}
+	})
+	var tlsConfig *tls.Config
+	var err error
+	if secure {
+		tlsConfig, err = testTLSInfo.ServerConfig()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	srv := &http.Server{
+		Addr:      dstAddr,
+		Handler:   mux,
+		TLSConfig: tlsConfig,
+	}
+
+	donec := make(chan struct{})
+	defer func() {
+		srv.Close()
+		<-donec
+	}()
+	go func() {
+		defer close(donec)
+		if !secure {
+			srv.ListenAndServe()
+		} else {
+			srv.ListenAndServeTLS(testTLSInfo.CertFile, testTLSInfo.KeyFile)
+		}
+	}()
+	time.Sleep(200 * time.Millisecond)
+
+	cfg := ServerConfig{
+		Logger: testLogger,
+		From:   url.URL{Scheme: scheme, Host: srcAddr},
+		To:     url.URL{Scheme: scheme, Host: dstAddr},
+	}
+	if secure {
+		cfg.TLSInfo = testTLSInfo
+	}
+	p := NewServer(cfg)
+	<-p.Ready()
+	defer p.Close()
+
+	data := "Hello World!"
+
+	now := time.Now()
+	var resp *http.Response
+	if secure {
+		tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
+		if terr != nil {
+			t.Fatal(terr)
+		}
+		cli := &http.Client{Transport: tp}
+		resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
+	} else {
+		resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+	d, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	took1 := time.Since(now)
+	t.Logf("took %v with no latency", took1)
+
+	rs1 := string(d)
+	exp := fmt.Sprintf("%q(confirmed)", data)
+	if rs1 != exp {
+		t.Fatalf("got %q, expected %q", rs1, exp)
+	}
+
+	lat, rv := 100*time.Millisecond, 10*time.Millisecond
+	if delayTx {
+		p.DelayTx(lat, rv)
+		defer p.UndelayTx()
+	} else {
+		p.DelayRx(lat, rv)
+		defer p.UndelayRx()
+	}
+
+	now = time.Now()
+	if secure {
+		tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
+		if terr != nil {
+			t.Fatal(terr)
+		}
+		cli := &http.Client{Transport: tp}
+		resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
+	} else {
+		resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+	d, err = ioutil.ReadAll(resp.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	took2 := time.Since(now)
+	t.Logf("took %v with latency %v±%v", took2, lat, rv)
+
+	rs2 := string(d)
+	if rs2 != exp {
+		t.Fatalf("got %q, expected %q", rs2, exp)
+	}
+	if took1 > took2 {
+		t.Fatalf("expected took1 %v < took2 %v", took1, took2)
+	}
+}
+
+func newUnixAddr() string {
+	now := time.Now().UnixNano()
+	rand.Seed(now)
+	addr := fmt.Sprintf("%X%X.unix-conn", now, rand.Intn(35000))
+	os.RemoveAll(addr)
+	return addr
+}
+
+func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) {
+	var err error
+	if !tlsInfo.Empty() {
+		ln, err = transport.NewListener(addr, scheme, &tlsInfo)
+	} else {
+		ln, err = net.Listen(scheme, addr)
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+	return ln
+}
+
+func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) {
+	var out net.Conn
+	var err error
+	if !tlsInfo.Empty() {
+		tp, terr := transport.NewTransport(tlsInfo, 3*time.Second)
+		if terr != nil {
+			t.Fatal(terr)
+		}
+		out, err = tp.Dial(scheme, addr)
+	} else {
+		out, err = net.Dial(scheme, addr)
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err = out.Write(data); err != nil {
+		t.Fatal(err)
+	}
+	if err = out.Close(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func receive(t *testing.T, ln net.Listener) (data []byte) {
+	buf := bytes.NewBuffer(make([]byte, 0, 1024))
+	for {
+		in, err := ln.Accept()
+		if err != nil {
+			t.Fatal(err)
+		}
+		var n int64
+		n, err = buf.ReadFrom(in)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if n > 0 {
+			break
+		}
+	}
+	return buf.Bytes()
+}