bridge.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package main is the entry point for the local tester network bridge.
  15. package main
  16. import (
  17. "flag"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "log"
  22. "math/rand"
  23. "net"
  24. "sync"
  25. "time"
  26. )
  27. type bridgeConn struct {
  28. in net.Conn
  29. out net.Conn
  30. d dispatcher
  31. }
  32. func newBridgeConn(in net.Conn, d dispatcher) (*bridgeConn, error) {
  33. out, err := net.Dial("tcp", flag.Args()[1])
  34. if err != nil {
  35. in.Close()
  36. return nil, err
  37. }
  38. return &bridgeConn{in, out, d}, nil
  39. }
  40. func (b *bridgeConn) String() string {
  41. return fmt.Sprintf("%v <-> %v", b.in.RemoteAddr(), b.out.RemoteAddr())
  42. }
  43. func (b *bridgeConn) Close() {
  44. b.in.Close()
  45. b.out.Close()
  46. }
  47. func bridge(b *bridgeConn) {
  48. log.Println("bridging", b.String())
  49. go b.d.Copy(b.out, makeFetch(b.in))
  50. b.d.Copy(b.in, makeFetch(b.out))
  51. }
  52. func timeBridge(b *bridgeConn) {
  53. go func() {
  54. t := time.Duration(rand.Intn(5)+1) * time.Second
  55. time.Sleep(t)
  56. log.Printf("killing connection %s after %v\n", b.String(), t)
  57. b.Close()
  58. }()
  59. bridge(b)
  60. }
  61. func blackhole(b *bridgeConn) {
  62. log.Println("blackholing connection", b.String())
  63. io.Copy(ioutil.Discard, b.in)
  64. b.Close()
  65. }
  66. func readRemoteOnly(b *bridgeConn) {
  67. log.Println("one way (<-)", b.String())
  68. b.d.Copy(b.in, makeFetch(b.out))
  69. }
  70. func writeRemoteOnly(b *bridgeConn) {
  71. log.Println("one way (->)", b.String())
  72. b.d.Copy(b.out, makeFetch(b.in))
  73. }
  74. func corruptReceive(b *bridgeConn) {
  75. log.Println("corruptReceive", b.String())
  76. go b.d.Copy(b.in, makeFetchCorrupt(makeFetch(b.out)))
  77. b.d.Copy(b.out, makeFetch(b.in))
  78. }
  79. func corruptSend(b *bridgeConn) {
  80. log.Println("corruptSend", b.String())
  81. go b.d.Copy(b.out, makeFetchCorrupt(makeFetch(b.in)))
  82. b.d.Copy(b.in, makeFetch(b.out))
  83. }
  84. func makeFetch(c io.Reader) fetchFunc {
  85. return func() ([]byte, error) {
  86. b := make([]byte, 4096)
  87. n, err := c.Read(b)
  88. if err != nil {
  89. return nil, err
  90. }
  91. return b[:n], nil
  92. }
  93. }
  94. func makeFetchCorrupt(f func() ([]byte, error)) fetchFunc {
  95. return func() ([]byte, error) {
  96. b, err := f()
  97. if err != nil {
  98. return nil, err
  99. }
  100. // corrupt one byte approximately every 16K
  101. for i := 0; i < len(b); i++ {
  102. if rand.Intn(16*1024) == 0 {
  103. b[i] = b[i] + 1
  104. }
  105. }
  106. return b, nil
  107. }
  108. }
  109. func makeFetchRand(f func() ([]byte, error)) fetchFunc {
  110. return func() ([]byte, error) {
  111. if rand.Intn(10) == 0 {
  112. return nil, fmt.Errorf("fetchRand: done")
  113. }
  114. b, err := f()
  115. if err != nil {
  116. return nil, err
  117. }
  118. return b, nil
  119. }
  120. }
  121. func randomBlackhole(b *bridgeConn) {
  122. log.Println("random blackhole: connection", b.String())
  123. var wg sync.WaitGroup
  124. wg.Add(2)
  125. go func() {
  126. b.d.Copy(b.in, makeFetchRand(makeFetch(b.out)))
  127. wg.Done()
  128. }()
  129. go func() {
  130. b.d.Copy(b.out, makeFetchRand(makeFetch(b.in)))
  131. wg.Done()
  132. }()
  133. wg.Wait()
  134. b.Close()
  135. }
  136. type config struct {
  137. delayAccept bool
  138. resetListen bool
  139. connFaultRate float64
  140. immediateClose bool
  141. blackhole bool
  142. timeClose bool
  143. writeRemoteOnly bool
  144. readRemoteOnly bool
  145. randomBlackhole bool
  146. corruptSend bool
  147. corruptReceive bool
  148. reorder bool
  149. }
  150. type acceptFaultFunc func()
  151. type connFaultFunc func(*bridgeConn)
  152. func main() {
  153. var cfg config
  154. flag.BoolVar(&cfg.delayAccept, "delay-accept", true, "delays accepting new connections")
  155. flag.BoolVar(&cfg.resetListen, "reset-listen", true, "resets the listening port")
  156. flag.Float64Var(&cfg.connFaultRate, "conn-fault-rate", 0.25, "rate of faulty connections")
  157. flag.BoolVar(&cfg.immediateClose, "immediate-close", true, "close after accept")
  158. flag.BoolVar(&cfg.blackhole, "blackhole", true, "reads nothing, writes go nowhere")
  159. flag.BoolVar(&cfg.timeClose, "time-close", true, "close after random time")
  160. flag.BoolVar(&cfg.writeRemoteOnly, "write-remote-only", true, "only write, no read")
  161. flag.BoolVar(&cfg.readRemoteOnly, "read-remote-only", true, "only read, no write")
  162. flag.BoolVar(&cfg.randomBlackhole, "random-blackhole", true, "blackhole after data xfer")
  163. flag.BoolVar(&cfg.corruptReceive, "corrupt-receive", true, "corrupt packets received from destination")
  164. flag.BoolVar(&cfg.corruptSend, "corrupt-send", true, "corrupt packets sent to destination")
  165. flag.BoolVar(&cfg.reorder, "reorder", true, "reorder packet delivery")
  166. flag.Parse()
  167. lAddr := flag.Args()[0]
  168. fwdAddr := flag.Args()[1]
  169. log.Println("listening on ", lAddr)
  170. log.Println("forwarding to ", fwdAddr)
  171. l, err := net.Listen("tcp", lAddr)
  172. if err != nil {
  173. log.Fatal(err)
  174. }
  175. defer l.Close()
  176. acceptFaults := []acceptFaultFunc{func() {}}
  177. if cfg.delayAccept {
  178. f := func() {
  179. log.Println("delaying accept")
  180. time.Sleep(3 * time.Second)
  181. }
  182. acceptFaults = append(acceptFaults, f)
  183. }
  184. if cfg.resetListen {
  185. f := func() {
  186. log.Println("reset listen port")
  187. l.Close()
  188. newListener, err := net.Listen("tcp", lAddr)
  189. if err != nil {
  190. log.Fatal(err)
  191. }
  192. l = newListener
  193. }
  194. acceptFaults = append(acceptFaults, f)
  195. }
  196. connFaults := []connFaultFunc{func(b *bridgeConn) { bridge(b) }}
  197. if cfg.immediateClose {
  198. f := func(b *bridgeConn) {
  199. log.Printf("terminating connection %s immediately", b.String())
  200. b.Close()
  201. }
  202. connFaults = append(connFaults, f)
  203. }
  204. if cfg.blackhole {
  205. connFaults = append(connFaults, blackhole)
  206. }
  207. if cfg.timeClose {
  208. connFaults = append(connFaults, timeBridge)
  209. }
  210. if cfg.writeRemoteOnly {
  211. connFaults = append(connFaults, writeRemoteOnly)
  212. }
  213. if cfg.readRemoteOnly {
  214. connFaults = append(connFaults, readRemoteOnly)
  215. }
  216. if cfg.randomBlackhole {
  217. connFaults = append(connFaults, randomBlackhole)
  218. }
  219. if cfg.corruptSend {
  220. connFaults = append(connFaults, corruptSend)
  221. }
  222. if cfg.corruptReceive {
  223. connFaults = append(connFaults, corruptReceive)
  224. }
  225. var disp dispatcher
  226. if cfg.reorder {
  227. disp = newDispatcherPool()
  228. } else {
  229. disp = newDispatcherImmediate()
  230. }
  231. for {
  232. acceptFaults[rand.Intn(len(acceptFaults))]()
  233. conn, err := l.Accept()
  234. if err != nil {
  235. log.Fatal(err)
  236. }
  237. r := rand.Intn(len(connFaults))
  238. if rand.Intn(100) > int(100.0*cfg.connFaultRate) {
  239. r = 0
  240. }
  241. bc, err := newBridgeConn(conn, disp)
  242. if err != nil {
  243. log.Printf("oops %v", err)
  244. continue
  245. }
  246. go connFaults[r](bc)
  247. }
  248. }