bridge.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package main is the entry point for the local tester network bridge.
  15. package main
  16. import (
  17. "flag"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "log"
  22. "math/rand"
  23. "net"
  24. "sync"
  25. "time"
  26. )
  27. type bridgeConn struct {
  28. in net.Conn
  29. out net.Conn
  30. d dispatcher
  31. }
  32. func newBridgeConn(in net.Conn, d dispatcher) (*bridgeConn, error) {
  33. out, err := net.Dial("tcp", flag.Args()[1])
  34. if err != nil {
  35. in.Close()
  36. return nil, err
  37. }
  38. return &bridgeConn{in, out, d}, nil
  39. }
  40. func (b *bridgeConn) String() string {
  41. return fmt.Sprintf("%v <-> %v", b.in.RemoteAddr(), b.out.RemoteAddr())
  42. }
  43. func (b *bridgeConn) Close() {
  44. b.in.Close()
  45. b.out.Close()
  46. }
  47. func bridge(b *bridgeConn) {
  48. log.Println("bridging", b.String())
  49. go b.d.Copy(b.out, makeFetch(b.in))
  50. b.d.Copy(b.in, makeFetch(b.out))
  51. }
  52. func delayBridge(b *bridgeConn, txDelay, rxDelay time.Duration) {
  53. go b.d.Copy(b.out, makeFetchDelay(makeFetch(b.in), txDelay))
  54. b.d.Copy(b.in, makeFetchDelay(makeFetch(b.out), rxDelay))
  55. }
  56. func timeBridge(b *bridgeConn) {
  57. go func() {
  58. t := time.Duration(rand.Intn(5)+1) * time.Second
  59. time.Sleep(t)
  60. log.Printf("killing connection %s after %v\n", b.String(), t)
  61. b.Close()
  62. }()
  63. bridge(b)
  64. }
  65. func blackhole(b *bridgeConn) {
  66. log.Println("blackholing connection", b.String())
  67. io.Copy(ioutil.Discard, b.in)
  68. b.Close()
  69. }
  70. func readRemoteOnly(b *bridgeConn) {
  71. log.Println("one way (<-)", b.String())
  72. b.d.Copy(b.in, makeFetch(b.out))
  73. }
  74. func writeRemoteOnly(b *bridgeConn) {
  75. log.Println("one way (->)", b.String())
  76. b.d.Copy(b.out, makeFetch(b.in))
  77. }
  78. func corruptReceive(b *bridgeConn) {
  79. log.Println("corruptReceive", b.String())
  80. go b.d.Copy(b.in, makeFetchCorrupt(makeFetch(b.out)))
  81. b.d.Copy(b.out, makeFetch(b.in))
  82. }
  83. func corruptSend(b *bridgeConn) {
  84. log.Println("corruptSend", b.String())
  85. go b.d.Copy(b.out, makeFetchCorrupt(makeFetch(b.in)))
  86. b.d.Copy(b.in, makeFetch(b.out))
  87. }
  88. func makeFetch(c io.Reader) fetchFunc {
  89. return func() ([]byte, error) {
  90. b := make([]byte, 4096)
  91. n, err := c.Read(b)
  92. if err != nil {
  93. return nil, err
  94. }
  95. return b[:n], nil
  96. }
  97. }
  98. func makeFetchCorrupt(f func() ([]byte, error)) fetchFunc {
  99. return func() ([]byte, error) {
  100. b, err := f()
  101. if err != nil {
  102. return nil, err
  103. }
  104. // corrupt one byte approximately every 16K
  105. for i := 0; i < len(b); i++ {
  106. if rand.Intn(16*1024) == 0 {
  107. b[i] = b[i] + 1
  108. }
  109. }
  110. return b, nil
  111. }
  112. }
  113. func makeFetchRand(f func() ([]byte, error)) fetchFunc {
  114. return func() ([]byte, error) {
  115. if rand.Intn(10) == 0 {
  116. return nil, fmt.Errorf("fetchRand: done")
  117. }
  118. b, err := f()
  119. if err != nil {
  120. return nil, err
  121. }
  122. return b, nil
  123. }
  124. }
  125. func makeFetchDelay(f fetchFunc, delay time.Duration) fetchFunc {
  126. return func() ([]byte, error) {
  127. b, err := f()
  128. if err != nil {
  129. return nil, err
  130. }
  131. time.Sleep(delay)
  132. return b, nil
  133. }
  134. }
  135. func randomBlackhole(b *bridgeConn) {
  136. log.Println("random blackhole: connection", b.String())
  137. var wg sync.WaitGroup
  138. wg.Add(2)
  139. go func() {
  140. b.d.Copy(b.in, makeFetchRand(makeFetch(b.out)))
  141. wg.Done()
  142. }()
  143. go func() {
  144. b.d.Copy(b.out, makeFetchRand(makeFetch(b.in)))
  145. wg.Done()
  146. }()
  147. wg.Wait()
  148. b.Close()
  149. }
  150. type config struct {
  151. delayAccept bool
  152. resetListen bool
  153. connFaultRate float64
  154. immediateClose bool
  155. blackhole bool
  156. timeClose bool
  157. writeRemoteOnly bool
  158. readRemoteOnly bool
  159. randomBlackhole bool
  160. corruptSend bool
  161. corruptReceive bool
  162. reorder bool
  163. txDelay string
  164. rxDelay string
  165. }
  166. type acceptFaultFunc func()
  167. type connFaultFunc func(*bridgeConn)
  168. func main() {
  169. var cfg config
  170. flag.BoolVar(&cfg.delayAccept, "delay-accept", false, "delays accepting new connections")
  171. flag.BoolVar(&cfg.resetListen, "reset-listen", false, "resets the listening port")
  172. flag.Float64Var(&cfg.connFaultRate, "conn-fault-rate", 0.0, "rate of faulty connections")
  173. flag.BoolVar(&cfg.immediateClose, "immediate-close", false, "close after accept")
  174. flag.BoolVar(&cfg.blackhole, "blackhole", false, "reads nothing, writes go nowhere")
  175. flag.BoolVar(&cfg.timeClose, "time-close", false, "close after random time")
  176. flag.BoolVar(&cfg.writeRemoteOnly, "write-remote-only", false, "only write, no read")
  177. flag.BoolVar(&cfg.readRemoteOnly, "read-remote-only", false, "only read, no write")
  178. flag.BoolVar(&cfg.randomBlackhole, "random-blackhole", false, "blackhole after data xfer")
  179. flag.BoolVar(&cfg.corruptReceive, "corrupt-receive", false, "corrupt packets received from destination")
  180. flag.BoolVar(&cfg.corruptSend, "corrupt-send", false, "corrupt packets sent to destination")
  181. flag.BoolVar(&cfg.reorder, "reorder", false, "reorder packet delivery")
  182. flag.StringVar(&cfg.txDelay, "tx-delay", "0", "duration to delay client transmission to server")
  183. flag.StringVar(&cfg.rxDelay, "rx-delay", "0", "duration to delay client receive from server")
  184. flag.Parse()
  185. lAddr := flag.Args()[0]
  186. fwdAddr := flag.Args()[1]
  187. log.Println("listening on ", lAddr)
  188. log.Println("forwarding to ", fwdAddr)
  189. l, err := net.Listen("tcp", lAddr)
  190. if err != nil {
  191. log.Fatal(err)
  192. }
  193. defer l.Close()
  194. acceptFaults := []acceptFaultFunc{func() {}}
  195. if cfg.delayAccept {
  196. f := func() {
  197. log.Println("delaying accept")
  198. time.Sleep(3 * time.Second)
  199. }
  200. acceptFaults = append(acceptFaults, f)
  201. }
  202. if cfg.resetListen {
  203. f := func() {
  204. log.Println("reset listen port")
  205. l.Close()
  206. newListener, err := net.Listen("tcp", lAddr)
  207. if err != nil {
  208. log.Fatal(err)
  209. }
  210. l = newListener
  211. }
  212. acceptFaults = append(acceptFaults, f)
  213. }
  214. connFaults := []connFaultFunc{func(b *bridgeConn) { bridge(b) }}
  215. if cfg.immediateClose {
  216. f := func(b *bridgeConn) {
  217. log.Printf("terminating connection %s immediately", b.String())
  218. b.Close()
  219. }
  220. connFaults = append(connFaults, f)
  221. }
  222. if cfg.blackhole {
  223. connFaults = append(connFaults, blackhole)
  224. }
  225. if cfg.timeClose {
  226. connFaults = append(connFaults, timeBridge)
  227. }
  228. if cfg.writeRemoteOnly {
  229. connFaults = append(connFaults, writeRemoteOnly)
  230. }
  231. if cfg.readRemoteOnly {
  232. connFaults = append(connFaults, readRemoteOnly)
  233. }
  234. if cfg.randomBlackhole {
  235. connFaults = append(connFaults, randomBlackhole)
  236. }
  237. if cfg.corruptSend {
  238. connFaults = append(connFaults, corruptSend)
  239. }
  240. if cfg.corruptReceive {
  241. connFaults = append(connFaults, corruptReceive)
  242. }
  243. txd, txdErr := time.ParseDuration(cfg.txDelay)
  244. if txdErr != nil {
  245. log.Fatal(txdErr)
  246. }
  247. rxd, rxdErr := time.ParseDuration(cfg.rxDelay)
  248. if rxdErr != nil {
  249. log.Fatal(rxdErr)
  250. }
  251. if txd != 0 || rxd != 0 {
  252. f := func(b *bridgeConn) { delayBridge(b, txd, rxd) }
  253. connFaults = append(connFaults, f)
  254. }
  255. if len(connFaults) > 1 && cfg.connFaultRate == 0 {
  256. log.Fatal("connection faults defined but conn-fault-rate=0")
  257. }
  258. var disp dispatcher
  259. if cfg.reorder {
  260. disp = newDispatcherPool()
  261. } else {
  262. disp = newDispatcherImmediate()
  263. }
  264. for {
  265. acceptFaults[rand.Intn(len(acceptFaults))]()
  266. conn, err := l.Accept()
  267. if err != nil {
  268. log.Fatal(err)
  269. }
  270. r := rand.Intn(len(connFaults))
  271. if rand.Intn(100) >= int(100.0*cfg.connFaultRate) {
  272. r = 0
  273. }
  274. bc, err := newBridgeConn(conn, disp)
  275. if err != nil {
  276. log.Printf("oops %v", err)
  277. continue
  278. }
  279. go connFaults[r](bc)
  280. }
  281. }