123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // Package main is the entry point for the local tester network bridge.
- package main
- import (
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "math/rand"
- "net"
- "sync"
- "time"
- )
- type bridgeConn struct {
- in net.Conn
- out net.Conn
- d dispatcher
- }
- func newBridgeConn(in net.Conn, d dispatcher) (*bridgeConn, error) {
- out, err := net.Dial("tcp", flag.Args()[1])
- if err != nil {
- in.Close()
- return nil, err
- }
- return &bridgeConn{in, out, d}, nil
- }
- func (b *bridgeConn) String() string {
- return fmt.Sprintf("%v <-> %v", b.in.RemoteAddr(), b.out.RemoteAddr())
- }
- func (b *bridgeConn) Close() {
- b.in.Close()
- b.out.Close()
- }
- func bridge(b *bridgeConn) {
- log.Println("bridging", b.String())
- go b.d.Copy(b.out, makeFetch(b.in))
- b.d.Copy(b.in, makeFetch(b.out))
- }
- func delayBridge(b *bridgeConn, txDelay, rxDelay time.Duration) {
- go b.d.Copy(b.out, makeFetchDelay(makeFetch(b.in), txDelay))
- b.d.Copy(b.in, makeFetchDelay(makeFetch(b.out), rxDelay))
- }
- func timeBridge(b *bridgeConn) {
- go func() {
- t := time.Duration(rand.Intn(5)+1) * time.Second
- time.Sleep(t)
- log.Printf("killing connection %s after %v\n", b.String(), t)
- b.Close()
- }()
- bridge(b)
- }
- func blackhole(b *bridgeConn) {
- log.Println("blackholing connection", b.String())
- io.Copy(ioutil.Discard, b.in)
- b.Close()
- }
- func readRemoteOnly(b *bridgeConn) {
- log.Println("one way (<-)", b.String())
- b.d.Copy(b.in, makeFetch(b.out))
- }
- func writeRemoteOnly(b *bridgeConn) {
- log.Println("one way (->)", b.String())
- b.d.Copy(b.out, makeFetch(b.in))
- }
- func corruptReceive(b *bridgeConn) {
- log.Println("corruptReceive", b.String())
- go b.d.Copy(b.in, makeFetchCorrupt(makeFetch(b.out)))
- b.d.Copy(b.out, makeFetch(b.in))
- }
- func corruptSend(b *bridgeConn) {
- log.Println("corruptSend", b.String())
- go b.d.Copy(b.out, makeFetchCorrupt(makeFetch(b.in)))
- b.d.Copy(b.in, makeFetch(b.out))
- }
- func makeFetch(c io.Reader) fetchFunc {
- return func() ([]byte, error) {
- b := make([]byte, 4096)
- n, err := c.Read(b)
- if err != nil {
- return nil, err
- }
- return b[:n], nil
- }
- }
- func makeFetchCorrupt(f func() ([]byte, error)) fetchFunc {
- return func() ([]byte, error) {
- b, err := f()
- if err != nil {
- return nil, err
- }
- // corrupt one byte approximately every 16K
- for i := 0; i < len(b); i++ {
- if rand.Intn(16*1024) == 0 {
- b[i] = b[i] + 1
- }
- }
- return b, nil
- }
- }
- func makeFetchRand(f func() ([]byte, error)) fetchFunc {
- return func() ([]byte, error) {
- if rand.Intn(10) == 0 {
- return nil, fmt.Errorf("fetchRand: done")
- }
- b, err := f()
- if err != nil {
- return nil, err
- }
- return b, nil
- }
- }
- func makeFetchDelay(f fetchFunc, delay time.Duration) fetchFunc {
- return func() ([]byte, error) {
- b, err := f()
- if err != nil {
- return nil, err
- }
- time.Sleep(delay)
- return b, nil
- }
- }
- func randomBlackhole(b *bridgeConn) {
- log.Println("random blackhole: connection", b.String())
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- b.d.Copy(b.in, makeFetchRand(makeFetch(b.out)))
- wg.Done()
- }()
- go func() {
- b.d.Copy(b.out, makeFetchRand(makeFetch(b.in)))
- wg.Done()
- }()
- wg.Wait()
- b.Close()
- }
- type config struct {
- delayAccept bool
- resetListen bool
- connFaultRate float64
- immediateClose bool
- blackhole bool
- timeClose bool
- writeRemoteOnly bool
- readRemoteOnly bool
- randomBlackhole bool
- corruptSend bool
- corruptReceive bool
- reorder bool
- txDelay string
- rxDelay string
- }
- type acceptFaultFunc func()
- type connFaultFunc func(*bridgeConn)
- func main() {
- var cfg config
- flag.BoolVar(&cfg.delayAccept, "delay-accept", false, "delays accepting new connections")
- flag.BoolVar(&cfg.resetListen, "reset-listen", false, "resets the listening port")
- flag.Float64Var(&cfg.connFaultRate, "conn-fault-rate", 0.0, "rate of faulty connections")
- flag.BoolVar(&cfg.immediateClose, "immediate-close", false, "close after accept")
- flag.BoolVar(&cfg.blackhole, "blackhole", false, "reads nothing, writes go nowhere")
- flag.BoolVar(&cfg.timeClose, "time-close", false, "close after random time")
- flag.BoolVar(&cfg.writeRemoteOnly, "write-remote-only", false, "only write, no read")
- flag.BoolVar(&cfg.readRemoteOnly, "read-remote-only", false, "only read, no write")
- flag.BoolVar(&cfg.randomBlackhole, "random-blackhole", false, "blackhole after data xfer")
- flag.BoolVar(&cfg.corruptReceive, "corrupt-receive", false, "corrupt packets received from destination")
- flag.BoolVar(&cfg.corruptSend, "corrupt-send", false, "corrupt packets sent to destination")
- flag.BoolVar(&cfg.reorder, "reorder", false, "reorder packet delivery")
- flag.StringVar(&cfg.txDelay, "tx-delay", "0", "duration to delay client transmission to server")
- flag.StringVar(&cfg.rxDelay, "rx-delay", "0", "duration to delay client receive from server")
- flag.Parse()
- lAddr := flag.Args()[0]
- fwdAddr := flag.Args()[1]
- log.Println("listening on ", lAddr)
- log.Println("forwarding to ", fwdAddr)
- l, err := net.Listen("tcp", lAddr)
- if err != nil {
- log.Fatal(err)
- }
- defer l.Close()
- acceptFaults := []acceptFaultFunc{func() {}}
- if cfg.delayAccept {
- f := func() {
- log.Println("delaying accept")
- time.Sleep(3 * time.Second)
- }
- acceptFaults = append(acceptFaults, f)
- }
- if cfg.resetListen {
- f := func() {
- log.Println("reset listen port")
- l.Close()
- newListener, err := net.Listen("tcp", lAddr)
- if err != nil {
- log.Fatal(err)
- }
- l = newListener
- }
- acceptFaults = append(acceptFaults, f)
- }
- connFaults := []connFaultFunc{func(b *bridgeConn) { bridge(b) }}
- if cfg.immediateClose {
- f := func(b *bridgeConn) {
- log.Printf("terminating connection %s immediately", b.String())
- b.Close()
- }
- connFaults = append(connFaults, f)
- }
- if cfg.blackhole {
- connFaults = append(connFaults, blackhole)
- }
- if cfg.timeClose {
- connFaults = append(connFaults, timeBridge)
- }
- if cfg.writeRemoteOnly {
- connFaults = append(connFaults, writeRemoteOnly)
- }
- if cfg.readRemoteOnly {
- connFaults = append(connFaults, readRemoteOnly)
- }
- if cfg.randomBlackhole {
- connFaults = append(connFaults, randomBlackhole)
- }
- if cfg.corruptSend {
- connFaults = append(connFaults, corruptSend)
- }
- if cfg.corruptReceive {
- connFaults = append(connFaults, corruptReceive)
- }
- txd, txdErr := time.ParseDuration(cfg.txDelay)
- if txdErr != nil {
- log.Fatal(txdErr)
- }
- rxd, rxdErr := time.ParseDuration(cfg.rxDelay)
- if rxdErr != nil {
- log.Fatal(rxdErr)
- }
- if txd != 0 || rxd != 0 {
- f := func(b *bridgeConn) { delayBridge(b, txd, rxd) }
- connFaults = append(connFaults, f)
- }
- if len(connFaults) > 1 && cfg.connFaultRate == 0 {
- log.Fatal("connection faults defined but conn-fault-rate=0")
- }
- var disp dispatcher
- if cfg.reorder {
- disp = newDispatcherPool()
- } else {
- disp = newDispatcherImmediate()
- }
- for {
- acceptFaults[rand.Intn(len(acceptFaults))]()
- conn, err := l.Accept()
- if err != nil {
- log.Fatal(err)
- }
- r := rand.Intn(len(connFaults))
- if rand.Intn(100) >= int(100.0*cfg.connFaultRate) {
- r = 0
- }
- bc, err := newBridgeConn(conn, disp)
- if err != nil {
- log.Printf("oops %v", err)
- continue
- }
- go connFaults[r](bc)
- }
- }
|