// Copyright 2018 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package proxy import ( "fmt" "io" mrand "math/rand" "net" "net/http" "net/url" "strings" "sync" "time" "github.com/coreos/etcd/pkg/transport" humanize "github.com/dustin/go-humanize" "go.uber.org/zap" ) // Server defines proxy server layer that simulates common network faults, // such as latency spikes, packet drop/corruption, etc.. type Server interface { // From returns proxy source address in "scheme://host:port" format. From() string // To returns proxy destination address in "scheme://host:port" format. To() string // Ready returns when proxy is ready to serve. Ready() <-chan struct{} // Done returns when proxy has been closed. Done() <-chan struct{} // Error sends errors while serving proxy. Error() <-chan error // Close closes listener and transport. Close() error // DelayAccept adds latency ± random variable to accepting new incoming connections. DelayAccept(latency, rv time.Duration) // UndelayAccept removes sending latencies. UndelayAccept() // LatencyAccept returns current latency on accepting new incoming connections. LatencyAccept() time.Duration // DelayTx adds latency ± random variable to "sending" layer. DelayTx(latency, rv time.Duration) // UndelayTx removes sending latencies. UndelayTx() // LatencyTx returns current send latency. LatencyTx() time.Duration // DelayRx adds latency ± random variable to "receiving" layer. DelayRx(latency, rv time.Duration) // UndelayRx removes "receiving" latencies. UndelayRx() // LatencyRx returns current receive latency. LatencyRx() time.Duration // PauseAccept stops accepting new connections. PauseAccept() // UnpauseAccept removes pause operation on accepting new connections. UnpauseAccept() // PauseTx stops "forwarding" packets. PauseTx() // UnpauseTx removes "forwarding" pause operation. UnpauseTx() // PauseRx stops "receiving" packets to client. PauseRx() // UnpauseRx removes "receiving" pause operation. UnpauseRx() // BlackholeTx drops all incoming packets before "forwarding". BlackholeTx() // UnblackholeTx removes blackhole operation on "sending". UnblackholeTx() // BlackholeRx drops all incoming packets to client. BlackholeRx() // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() // CorruptTx corrupts incoming packets from the listener. CorruptTx(f func(data []byte) []byte) // UncorruptTx removes corrupt operation on "forwarding". UncorruptTx() // CorruptRx corrupts incoming packets to client. CorruptRx(f func(data []byte) []byte) // UncorruptRx removes corrupt operation on "receiving". UncorruptRx() // ResetListener closes and restarts listener. ResetListener() error } type proxyServer struct { lg *zap.Logger from, to url.URL tlsInfo transport.TLSInfo dialTimeout time.Duration bufferSize int retryInterval time.Duration readyc chan struct{} donec chan struct{} errc chan error closeOnce sync.Once closeWg sync.WaitGroup listenerMu sync.RWMutex listener net.Listener latencyAcceptMu sync.RWMutex latencyAccept time.Duration latencyTxMu sync.RWMutex latencyTx time.Duration latencyRxMu sync.RWMutex latencyRx time.Duration corruptTxMu sync.RWMutex corruptTx func(data []byte) []byte corruptRxMu sync.RWMutex corruptRx func(data []byte) []byte acceptMu sync.Mutex pauseAcceptc chan struct{} txMu sync.Mutex pauseTxc chan struct{} blackholeTxc chan struct{} rxMu sync.Mutex pauseRxc chan struct{} blackholeRxc chan struct{} } // ServerConfig defines proxy server configuration. type ServerConfig struct { Logger *zap.Logger From url.URL To url.URL TLSInfo transport.TLSInfo DialTimeout time.Duration BufferSize int RetryInterval time.Duration } var ( defaultDialTimeout = 3 * time.Second defaultBufferSize = 48 * 1024 defaultRetryInterval = 10 * time.Millisecond defaultLogger *zap.Logger ) func init() { var err error defaultLogger, err = zap.NewProduction() if err != nil { panic(err) } } // NewServer returns a proxy implementation with no iptables/tc dependencies. // The proxy layer overhead is <1ms. func NewServer(cfg ServerConfig) Server { p := &proxyServer{ lg: cfg.Logger, from: cfg.From, to: cfg.To, tlsInfo: cfg.TLSInfo, dialTimeout: cfg.DialTimeout, bufferSize: cfg.BufferSize, retryInterval: cfg.RetryInterval, readyc: make(chan struct{}), donec: make(chan struct{}), errc: make(chan error, 16), pauseAcceptc: make(chan struct{}), pauseTxc: make(chan struct{}), blackholeTxc: make(chan struct{}), pauseRxc: make(chan struct{}), blackholeRxc: make(chan struct{}), } if p.dialTimeout == 0 { p.dialTimeout = defaultDialTimeout } if p.bufferSize == 0 { p.bufferSize = defaultBufferSize } if p.retryInterval == 0 { p.retryInterval = defaultRetryInterval } if p.lg == nil { p.lg = defaultLogger } close(p.pauseAcceptc) close(p.pauseTxc) close(p.pauseRxc) if strings.HasPrefix(p.from.Scheme, "http") { p.from.Scheme = "tcp" } if strings.HasPrefix(p.to.Scheme, "http") { p.to.Scheme = "tcp" } var ln net.Listener var err error if !p.tlsInfo.Empty() { ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) } else { ln, err = net.Listen(p.from.Scheme, p.from.Host) } if err != nil { p.errc <- err p.Close() return p } p.listener = ln p.closeWg.Add(1) go p.listenAndServe() p.lg.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To())) return p } func (p *proxyServer) From() string { return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host) } func (p *proxyServer) To() string { return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host) } // TODO: implement packet reordering from multiple TCP connections // buffer packets per connection for awhile, reorder before transmit // - https://github.com/coreos/etcd/issues/5614 // - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034 func (p *proxyServer) listenAndServe() { defer p.closeWg.Done() p.lg.Info("proxy is listening on", zap.String("from", p.From())) close(p.readyc) for { p.acceptMu.Lock() pausec := p.pauseAcceptc p.acceptMu.Unlock() select { case <-pausec: case <-p.donec: return } p.latencyAcceptMu.RLock() lat := p.latencyAccept p.latencyAcceptMu.RUnlock() if lat > 0 { select { case <-time.After(lat): case <-p.donec: return } } p.listenerMu.RLock() ln := p.listener p.listenerMu.RUnlock() in, err := ln.Accept() if err != nil { select { case p.errc <- err: select { case <-p.donec: return default: } case <-p.donec: return } p.lg.Debug("listener accept error", zap.Error(err)) if strings.HasSuffix(err.Error(), "use of closed network connection") { select { case <-time.After(p.retryInterval): case <-p.donec: return } p.lg.Debug("listener is closed; retry listening on", zap.String("from", p.From())) if err = p.ResetListener(); err != nil { select { case p.errc <- err: select { case <-p.donec: return default: } case <-p.donec: return } p.lg.Warn("failed to reset listener", zap.Error(err)) } } continue } var out net.Conn if !p.tlsInfo.Empty() { var tp *http.Transport tp, err = transport.NewTransport(p.tlsInfo, p.dialTimeout) if err != nil { select { case p.errc <- err: select { case <-p.donec: return default: } case <-p.donec: return } continue } out, err = tp.Dial(p.to.Scheme, p.to.Host) } else { out, err = net.Dial(p.to.Scheme, p.to.Host) } if err != nil { select { case p.errc <- err: select { case <-p.donec: return default: } case <-p.donec: return } p.lg.Debug("failed to dial", zap.Error(err)) continue } go func() { // read incoming bytes from listener, dispatch to outgoing connection p.transmit(out, in) out.Close() in.Close() }() go func() { // read response from outgoing connection, write back to listener p.receive(in, out) in.Close() out.Close() }() } } func (p *proxyServer) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) } func (p *proxyServer) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) } func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) { buf := make([]byte, p.bufferSize) for { nr, err := src.Read(buf) if err != nil { if err == io.EOF { return } // connection already closed if strings.HasSuffix(err.Error(), "read: connection reset by peer") { return } if strings.HasSuffix(err.Error(), "use of closed network connection") { return } select { case p.errc <- err: select { case <-p.donec: return default: } case <-p.donec: return } p.lg.Debug("failed to read", zap.Error(err)) return } if nr == 0 { return } data := buf[:nr] var pausec chan struct{} var blackholec chan struct{} if proxySend { p.txMu.Lock() pausec = p.pauseTxc blackholec = p.blackholeTxc p.txMu.Unlock() } else { p.rxMu.Lock() pausec = p.pauseRxc blackholec = p.blackholeRxc p.rxMu.Unlock() } select { case <-pausec: case <-p.donec: return } blackholed := false select { case <-blackholec: blackholed = true case <-p.donec: return default: } if blackholed { if proxySend { p.lg.Debug( "dropped", zap.String("data-size", humanize.Bytes(uint64(nr))), zap.String("from", p.From()), zap.String("to", p.To()), ) } else { p.lg.Debug( "dropped", zap.String("data-size", humanize.Bytes(uint64(nr))), zap.String("from", p.To()), zap.String("to", p.From()), ) } continue } var lat time.Duration if proxySend { p.latencyTxMu.RLock() lat = p.latencyTx p.latencyTxMu.RUnlock() } else { p.latencyRxMu.RLock() lat = p.latencyRx p.latencyRxMu.RUnlock() } if lat > 0 { select { case <-time.After(lat): case <-p.donec: return } } if proxySend { p.corruptTxMu.RLock() if p.corruptTx != nil { data = p.corruptTx(data) } p.corruptTxMu.RUnlock() } else { p.corruptRxMu.RLock() if p.corruptRx != nil { data = p.corruptRx(data) } p.corruptRxMu.RUnlock() } var nw int nw, err = dst.Write(data) if err != nil { if err == io.EOF { return } select { case p.errc <- err: select { case <-p.donec: return default: } case <-p.donec: return } if proxySend { p.lg.Debug("failed to write while sending", zap.Error(err)) } else { p.lg.Debug("failed to write while receiving", zap.Error(err)) } return } if nr != nw { select { case p.errc <- io.ErrShortWrite: select { case <-p.donec: return default: } case <-p.donec: return } if proxySend { p.lg.Debug( "failed to write while sending; read/write bytes are different", zap.Int("read-bytes", nr), zap.Int("write-bytes", nw), zap.Error(io.ErrShortWrite), ) } else { p.lg.Debug( "failed to write while receiving; read/write bytes are different", zap.Int("read-bytes", nr), zap.Int("write-bytes", nw), zap.Error(io.ErrShortWrite), ) } return } if proxySend { p.lg.Debug( "transmitted", zap.String("data-size", humanize.Bytes(uint64(nr))), zap.String("from", p.From()), zap.String("to", p.To()), ) } else { p.lg.Debug( "received", zap.String("data-size", humanize.Bytes(uint64(nr))), zap.String("from", p.To()), zap.String("to", p.From()), ) } } } func (p *proxyServer) Ready() <-chan struct{} { return p.readyc } func (p *proxyServer) Done() <-chan struct{} { return p.donec } func (p *proxyServer) Error() <-chan error { return p.errc } func (p *proxyServer) Close() (err error) { p.closeOnce.Do(func() { close(p.donec) p.listenerMu.Lock() if p.listener != nil { err = p.listener.Close() p.lg.Info( "closed proxy listener", zap.String("from", p.From()), zap.String("to", p.To()), ) } p.lg.Sync() p.listenerMu.Unlock() }) p.closeWg.Wait() return err } func (p *proxyServer) DelayAccept(latency, rv time.Duration) { if latency <= 0 { return } d := computeLatency(latency, rv) p.latencyAcceptMu.Lock() p.latencyAccept = d p.latencyAcceptMu.Unlock() p.lg.Info( "set accept latency", zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) UndelayAccept() { p.latencyAcceptMu.Lock() d := p.latencyAccept p.latencyAccept = 0 p.latencyAcceptMu.Unlock() p.lg.Info( "removed accept latency", zap.Duration("latency", d), zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) LatencyAccept() time.Duration { p.latencyAcceptMu.RLock() d := p.latencyAccept p.latencyAcceptMu.RUnlock() return d } func (p *proxyServer) DelayTx(latency, rv time.Duration) { if latency <= 0 { return } d := computeLatency(latency, rv) p.latencyTxMu.Lock() p.latencyTx = d p.latencyTxMu.Unlock() p.lg.Info( "set transmit latency", zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) UndelayTx() { p.latencyTxMu.Lock() d := p.latencyTx p.latencyTx = 0 p.latencyTxMu.Unlock() p.lg.Info( "removed transmit latency", zap.Duration("latency", d), zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) LatencyTx() time.Duration { p.latencyTxMu.RLock() d := p.latencyTx p.latencyTxMu.RUnlock() return d } func (p *proxyServer) DelayRx(latency, rv time.Duration) { if latency <= 0 { return } d := computeLatency(latency, rv) p.latencyRxMu.Lock() p.latencyRx = d p.latencyRxMu.Unlock() p.lg.Info( "set receive latency", zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) UndelayRx() { p.latencyRxMu.Lock() d := p.latencyRx p.latencyRx = 0 p.latencyRxMu.Unlock() p.lg.Info( "removed receive latency", zap.Duration("latency", d), zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) LatencyRx() time.Duration { p.latencyRxMu.RLock() d := p.latencyRx p.latencyRxMu.RUnlock() return d } func computeLatency(lat, rv time.Duration) time.Duration { if rv == 0 { return lat } if rv < 0 { rv *= -1 } if rv > lat { rv = lat / 10 } now := time.Now() mrand.Seed(int64(now.Nanosecond())) sign := 1 if now.Second()%2 == 0 { sign = -1 } return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds())) } func (p *proxyServer) PauseAccept() { p.acceptMu.Lock() p.pauseAcceptc = make(chan struct{}) p.acceptMu.Unlock() p.lg.Info( "paused accepting new connections", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) UnpauseAccept() { p.acceptMu.Lock() select { case <-p.pauseAcceptc: // already unpaused case <-p.donec: p.acceptMu.Unlock() return default: close(p.pauseAcceptc) } p.acceptMu.Unlock() p.lg.Info( "unpaused accepting new connections", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) PauseTx() { p.txMu.Lock() p.pauseTxc = make(chan struct{}) p.txMu.Unlock() p.lg.Info( "paused transmit listen", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) UnpauseTx() { p.txMu.Lock() select { case <-p.pauseTxc: // already unpaused case <-p.donec: p.txMu.Unlock() return default: close(p.pauseTxc) } p.txMu.Unlock() p.lg.Info( "unpaused transmit listen", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) PauseRx() { p.rxMu.Lock() p.pauseRxc = make(chan struct{}) p.rxMu.Unlock() p.lg.Info( "paused receive listen", zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) UnpauseRx() { p.rxMu.Lock() select { case <-p.pauseRxc: // already unpaused case <-p.donec: p.rxMu.Unlock() return default: close(p.pauseRxc) } p.rxMu.Unlock() p.lg.Info( "unpaused receive listen", zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) BlackholeTx() { p.txMu.Lock() select { case <-p.blackholeTxc: // already blackholed case <-p.donec: p.txMu.Unlock() return default: close(p.blackholeTxc) } p.txMu.Unlock() p.lg.Info( "blackholed transmit", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) UnblackholeTx() { p.txMu.Lock() p.blackholeTxc = make(chan struct{}) p.txMu.Unlock() p.lg.Info( "unblackholed transmit", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) BlackholeRx() { p.rxMu.Lock() select { case <-p.blackholeRxc: // already blackholed case <-p.donec: p.rxMu.Unlock() return default: close(p.blackholeRxc) } p.rxMu.Unlock() p.lg.Info( "blackholed receive", zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) UnblackholeRx() { p.rxMu.Lock() p.blackholeRxc = make(chan struct{}) p.rxMu.Unlock() p.lg.Info( "unblackholed receive", zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) CorruptTx(f func([]byte) []byte) { p.corruptTxMu.Lock() p.corruptTx = f p.corruptTxMu.Unlock() p.lg.Info( "corrupting transmit", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) UncorruptTx() { p.corruptTxMu.Lock() p.corruptTx = nil p.corruptTxMu.Unlock() p.lg.Info( "stopped corrupting transmit", zap.String("from", p.From()), zap.String("to", p.To()), ) } func (p *proxyServer) CorruptRx(f func([]byte) []byte) { p.corruptRxMu.Lock() p.corruptRx = f p.corruptRxMu.Unlock() p.lg.Info( "corrupting receive", zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) UncorruptRx() { p.corruptRxMu.Lock() p.corruptRx = nil p.corruptRxMu.Unlock() p.lg.Info( "stopped corrupting receive", zap.String("from", p.To()), zap.String("to", p.From()), ) } func (p *proxyServer) ResetListener() error { p.listenerMu.Lock() defer p.listenerMu.Unlock() if err := p.listener.Close(); err != nil { // already closed if !strings.HasSuffix(err.Error(), "use of closed network connection") { return err } } var ln net.Listener var err error if !p.tlsInfo.Empty() { ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo) } else { ln, err = net.Listen(p.from.Scheme, p.from.Host) } if err != nil { return err } p.listener = ln p.lg.Info( "reset listener on", zap.String("from", p.From()), ) return nil }