proxy.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package transport
  15. import (
  16. "fmt"
  17. "io"
  18. mrand "math/rand"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "strings"
  23. "sync"
  24. "time"
  25. humanize "github.com/dustin/go-humanize"
  26. "go.uber.org/zap"
  27. )
  // Proxy is a proxy layer that sits between a source and a destination address
  // and simulates common network faults, such as latency spikes and packet
  // drop/corruption.
  type Proxy interface {
  	// Addresses.

  	// From returns the proxy source address in "scheme://host:port" format.
  	From() string
  	// To returns the proxy destination address in "scheme://host:port" format.
  	To() string

  	// Lifecycle.

  	// Ready returns a channel that signals when the proxy is ready to serve.
  	Ready() <-chan struct{}
  	// Done returns a channel that signals when the proxy has been closed.
  	Done() <-chan struct{}
  	// Error returns the channel on which errors hit while serving are sent.
  	Error() <-chan error
  	// Close closes the listener and the transport.
  	Close() error

  	// Latency injection.

  	// DelayAccept adds latency ± random variable to accepting new incoming connections.
  	DelayAccept(latency, rv time.Duration)
  	// UndelayAccept removes the latency on accepting new incoming connections.
  	UndelayAccept()
  	// LatencyAccept returns the current latency on accepting new incoming connections.
  	LatencyAccept() time.Duration
  	// DelayTx adds latency ± random variable to the "sending" layer.
  	DelayTx(latency, rv time.Duration)
  	// UndelayTx removes "sending" latencies.
  	UndelayTx()
  	// LatencyTx returns the current send latency.
  	LatencyTx() time.Duration
  	// DelayRx adds latency ± random variable to the "receiving" layer.
  	DelayRx(latency, rv time.Duration)
  	// UndelayRx removes "receiving" latencies.
  	UndelayRx()
  	// LatencyRx returns the current receive latency.
  	LatencyRx() time.Duration

  	// Pausing.

  	// PauseAccept stops accepting new connections.
  	PauseAccept()
  	// UnpauseAccept removes the pause on accepting new connections.
  	UnpauseAccept()
  	// PauseTx stops "forwarding" packets.
  	PauseTx()
  	// UnpauseTx removes the "forwarding" pause.
  	UnpauseTx()
  	// PauseRx stops "receiving" packets to the client.
  	PauseRx()
  	// UnpauseRx removes the "receiving" pause.
  	UnpauseRx()

  	// Dropping.

  	// BlackholeTx drops all incoming packets before "forwarding".
  	BlackholeTx()
  	// UnblackholeTx removes the blackhole on "sending".
  	UnblackholeTx()
  	// BlackholeRx drops all incoming packets to the client.
  	BlackholeRx()
  	// UnblackholeRx removes the blackhole on "receiving".
  	UnblackholeRx()

  	// Corruption.

  	// CorruptTx passes packets coming from the listener through f before
  	// they are forwarded.
  	CorruptTx(f func(data []byte) []byte)
  	// UncorruptTx removes the corrupt operation on "forwarding".
  	UncorruptTx()
  	// CorruptRx passes packets headed to the client through f.
  	CorruptRx(f func(data []byte) []byte)
  	// UncorruptRx removes the corrupt operation on "receiving".
  	UncorruptRx()

  	// ResetListener closes and restarts the listener.
  	ResetListener() error
  }
  92. type proxy struct {
  93. logger *zap.Logger
  94. from, to url.URL
  95. tlsInfo TLSInfo
  96. dialTimeout time.Duration
  97. bufferSize int
  98. retryInterval time.Duration
  99. readyc chan struct{}
  100. donec chan struct{}
  101. errc chan error
  102. closeOnce sync.Once
  103. closeWg sync.WaitGroup
  104. listenerMu sync.RWMutex
  105. listener net.Listener
  106. latencyAcceptMu sync.RWMutex
  107. latencyAccept time.Duration
  108. latencyTxMu sync.RWMutex
  109. latencyTx time.Duration
  110. latencyRxMu sync.RWMutex
  111. latencyRx time.Duration
  112. corruptTxMu sync.RWMutex
  113. corruptTx func(data []byte) []byte
  114. corruptRxMu sync.RWMutex
  115. corruptRx func(data []byte) []byte
  116. acceptMu sync.Mutex
  117. pauseAcceptc chan struct{}
  118. txMu sync.Mutex
  119. pauseTxc chan struct{}
  120. blackholeTxc chan struct{}
  121. rxMu sync.Mutex
  122. pauseRxc chan struct{}
  123. blackholeRxc chan struct{}
  124. }
  125. // ProxyConfig defines proxy configuration.
  126. type ProxyConfig struct {
  127. Logger *zap.Logger
  128. From url.URL
  129. To url.URL
  130. TLSInfo TLSInfo
  131. DialTimeout time.Duration
  132. BufferSize int
  133. RetryInterval time.Duration
  134. }
  135. var (
  136. defaultDialTimeout = 3 * time.Second
  137. defaultBufferSize = 48 * 1024
  138. defaultRetryInterval = 10 * time.Millisecond
  139. defaultLogger *zap.Logger
  140. )
  141. func init() {
  142. var err error
  143. defaultLogger, err = zap.NewProduction()
  144. if err != nil {
  145. panic(err)
  146. }
  147. }
  148. // NewProxy returns a proxy implementation with no iptables/tc dependencies.
  149. // The proxy layer overhead is <1ms.
  150. func NewProxy(cfg ProxyConfig) Proxy {
  151. p := &proxy{
  152. logger: cfg.Logger,
  153. from: cfg.From,
  154. to: cfg.To,
  155. tlsInfo: cfg.TLSInfo,
  156. dialTimeout: cfg.DialTimeout,
  157. bufferSize: cfg.BufferSize,
  158. retryInterval: cfg.RetryInterval,
  159. readyc: make(chan struct{}),
  160. donec: make(chan struct{}),
  161. errc: make(chan error, 16),
  162. pauseAcceptc: make(chan struct{}),
  163. pauseTxc: make(chan struct{}),
  164. blackholeTxc: make(chan struct{}),
  165. pauseRxc: make(chan struct{}),
  166. blackholeRxc: make(chan struct{}),
  167. }
  168. if p.dialTimeout == 0 {
  169. p.dialTimeout = defaultDialTimeout
  170. }
  171. if p.bufferSize == 0 {
  172. p.bufferSize = defaultBufferSize
  173. }
  174. if p.retryInterval == 0 {
  175. p.retryInterval = defaultRetryInterval
  176. }
  177. if p.logger == nil {
  178. p.logger = defaultLogger
  179. }
  180. close(p.pauseAcceptc)
  181. close(p.pauseTxc)
  182. close(p.pauseRxc)
  183. if strings.HasPrefix(p.from.Scheme, "http") {
  184. p.from.Scheme = "tcp"
  185. }
  186. if strings.HasPrefix(p.to.Scheme, "http") {
  187. p.to.Scheme = "tcp"
  188. }
  189. var ln net.Listener
  190. var err error
  191. if !p.tlsInfo.Empty() {
  192. ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
  193. } else {
  194. ln, err = net.Listen(p.from.Scheme, p.from.Host)
  195. }
  196. if err != nil {
  197. p.errc <- err
  198. p.Close()
  199. return p
  200. }
  201. p.listener = ln
  202. p.closeWg.Add(1)
  203. go p.listenAndServe()
  204. p.logger.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To()))
  205. return p
  206. }
  207. func (p *proxy) From() string {
  208. return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host)
  209. }
  210. func (p *proxy) To() string {
  211. return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host)
  212. }
  213. // TODO: implement packet reordering from multiple TCP connections
  214. // buffer packets per connection for awhile, reorder before transmit
  215. // - https://github.com/coreos/etcd/issues/5614
  216. // - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034
  217. func (p *proxy) listenAndServe() {
  218. defer p.closeWg.Done()
  219. p.logger.Info("proxy is listening on", zap.String("from", p.From()))
  220. close(p.readyc)
  221. for {
  222. p.acceptMu.Lock()
  223. pausec := p.pauseAcceptc
  224. p.acceptMu.Unlock()
  225. select {
  226. case <-pausec:
  227. case <-p.donec:
  228. return
  229. }
  230. p.latencyAcceptMu.RLock()
  231. lat := p.latencyAccept
  232. p.latencyAcceptMu.RUnlock()
  233. if lat > 0 {
  234. select {
  235. case <-time.After(lat):
  236. case <-p.donec:
  237. return
  238. }
  239. }
  240. p.listenerMu.RLock()
  241. ln := p.listener
  242. p.listenerMu.RUnlock()
  243. in, err := ln.Accept()
  244. if err != nil {
  245. select {
  246. case p.errc <- err:
  247. select {
  248. case <-p.donec:
  249. return
  250. default:
  251. }
  252. case <-p.donec:
  253. return
  254. }
  255. p.logger.Debug("listener accept error", zap.Error(err))
  256. if strings.HasSuffix(err.Error(), "use of closed network connection") {
  257. select {
  258. case <-time.After(p.retryInterval):
  259. case <-p.donec:
  260. return
  261. }
  262. p.logger.Debug("listener is closed; retry listening on", zap.String("from", p.From()))
  263. if err = p.ResetListener(); err != nil {
  264. select {
  265. case p.errc <- err:
  266. select {
  267. case <-p.donec:
  268. return
  269. default:
  270. }
  271. case <-p.donec:
  272. return
  273. }
  274. p.logger.Warn("failed to reset listener", zap.Error(err))
  275. }
  276. }
  277. continue
  278. }
  279. var out net.Conn
  280. if !p.tlsInfo.Empty() {
  281. var tp *http.Transport
  282. tp, err = NewTransport(p.tlsInfo, p.dialTimeout)
  283. if err != nil {
  284. select {
  285. case p.errc <- err:
  286. select {
  287. case <-p.donec:
  288. return
  289. default:
  290. }
  291. case <-p.donec:
  292. return
  293. }
  294. continue
  295. }
  296. out, err = tp.Dial(p.to.Scheme, p.to.Host)
  297. } else {
  298. out, err = net.Dial(p.to.Scheme, p.to.Host)
  299. }
  300. if err != nil {
  301. select {
  302. case p.errc <- err:
  303. select {
  304. case <-p.donec:
  305. return
  306. default:
  307. }
  308. case <-p.donec:
  309. return
  310. }
  311. p.logger.Debug("failed to dial", zap.Error(err))
  312. continue
  313. }
  314. go func() {
  315. // read incoming bytes from listener, dispatch to outgoing connection
  316. p.transmit(out, in)
  317. out.Close()
  318. in.Close()
  319. }()
  320. go func() {
  321. // read response from outgoing connection, write back to listener
  322. p.receive(in, out)
  323. in.Close()
  324. out.Close()
  325. }()
  326. }
  327. }
  328. func (p *proxy) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
  329. func (p *proxy) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) }
  330. func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
  331. buf := make([]byte, p.bufferSize)
  332. for {
  333. nr, err := src.Read(buf)
  334. if err != nil {
  335. if err == io.EOF {
  336. return
  337. }
  338. // connection already closed
  339. if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
  340. return
  341. }
  342. if strings.HasSuffix(err.Error(), "use of closed network connection") {
  343. return
  344. }
  345. select {
  346. case p.errc <- err:
  347. select {
  348. case <-p.donec:
  349. return
  350. default:
  351. }
  352. case <-p.donec:
  353. return
  354. }
  355. p.logger.Debug("failed to read", zap.Error(err))
  356. return
  357. }
  358. if nr == 0 {
  359. return
  360. }
  361. data := buf[:nr]
  362. var pausec chan struct{}
  363. var blackholec chan struct{}
  364. if proxySend {
  365. p.txMu.Lock()
  366. pausec = p.pauseTxc
  367. blackholec = p.blackholeTxc
  368. p.txMu.Unlock()
  369. } else {
  370. p.rxMu.Lock()
  371. pausec = p.pauseRxc
  372. blackholec = p.blackholeRxc
  373. p.rxMu.Unlock()
  374. }
  375. select {
  376. case <-pausec:
  377. case <-p.donec:
  378. return
  379. }
  380. blackholed := false
  381. select {
  382. case <-blackholec:
  383. blackholed = true
  384. case <-p.donec:
  385. return
  386. default:
  387. }
  388. if blackholed {
  389. if proxySend {
  390. p.logger.Debug(
  391. "dropped",
  392. zap.String("data-size", humanize.Bytes(uint64(nr))),
  393. zap.String("from", p.From()),
  394. zap.String("to", p.To()),
  395. )
  396. } else {
  397. p.logger.Debug(
  398. "dropped",
  399. zap.String("data-size", humanize.Bytes(uint64(nr))),
  400. zap.String("from", p.To()),
  401. zap.String("to", p.From()),
  402. )
  403. }
  404. continue
  405. }
  406. var lat time.Duration
  407. if proxySend {
  408. p.latencyTxMu.RLock()
  409. lat = p.latencyTx
  410. p.latencyTxMu.RUnlock()
  411. } else {
  412. p.latencyRxMu.RLock()
  413. lat = p.latencyRx
  414. p.latencyRxMu.RUnlock()
  415. }
  416. if lat > 0 {
  417. select {
  418. case <-time.After(lat):
  419. case <-p.donec:
  420. return
  421. }
  422. }
  423. if proxySend {
  424. p.corruptTxMu.RLock()
  425. if p.corruptTx != nil {
  426. data = p.corruptTx(data)
  427. }
  428. p.corruptTxMu.RUnlock()
  429. } else {
  430. p.corruptRxMu.RLock()
  431. if p.corruptRx != nil {
  432. data = p.corruptRx(data)
  433. }
  434. p.corruptRxMu.RUnlock()
  435. }
  436. var nw int
  437. nw, err = dst.Write(data)
  438. if err != nil {
  439. if err == io.EOF {
  440. return
  441. }
  442. select {
  443. case p.errc <- err:
  444. select {
  445. case <-p.donec:
  446. return
  447. default:
  448. }
  449. case <-p.donec:
  450. return
  451. }
  452. if proxySend {
  453. p.logger.Debug("failed to write while sending", zap.Error(err))
  454. } else {
  455. p.logger.Debug("failed to write while receiving", zap.Error(err))
  456. }
  457. return
  458. }
  459. if nr != nw {
  460. select {
  461. case p.errc <- io.ErrShortWrite:
  462. select {
  463. case <-p.donec:
  464. return
  465. default:
  466. }
  467. case <-p.donec:
  468. return
  469. }
  470. if proxySend {
  471. p.logger.Debug(
  472. "failed to write while sending; read/write bytes are different",
  473. zap.Int("read-bytes", nr),
  474. zap.Int("write-bytes", nw),
  475. zap.Error(io.ErrShortWrite),
  476. )
  477. } else {
  478. p.logger.Debug(
  479. "failed to write while receiving; read/write bytes are different",
  480. zap.Int("read-bytes", nr),
  481. zap.Int("write-bytes", nw),
  482. zap.Error(io.ErrShortWrite),
  483. )
  484. }
  485. return
  486. }
  487. if proxySend {
  488. p.logger.Debug(
  489. "transmitted",
  490. zap.String("data-size", humanize.Bytes(uint64(nr))),
  491. zap.String("from", p.From()),
  492. zap.String("to", p.To()),
  493. )
  494. } else {
  495. p.logger.Debug(
  496. "received",
  497. zap.String("data-size", humanize.Bytes(uint64(nr))),
  498. zap.String("from", p.To()),
  499. zap.String("to", p.From()),
  500. )
  501. }
  502. }
  503. }
  504. func (p *proxy) Ready() <-chan struct{} { return p.readyc }
  505. func (p *proxy) Done() <-chan struct{} { return p.donec }
  506. func (p *proxy) Error() <-chan error { return p.errc }
  507. func (p *proxy) Close() (err error) {
  508. p.closeOnce.Do(func() {
  509. close(p.donec)
  510. p.listenerMu.Lock()
  511. if p.listener != nil {
  512. err = p.listener.Close()
  513. p.logger.Info(
  514. "closed proxy listener",
  515. zap.String("from", p.From()),
  516. zap.String("to", p.To()),
  517. )
  518. }
  519. p.logger.Sync()
  520. p.listenerMu.Unlock()
  521. })
  522. p.closeWg.Wait()
  523. return err
  524. }
  525. func (p *proxy) DelayAccept(latency, rv time.Duration) {
  526. if latency <= 0 {
  527. return
  528. }
  529. d := computeLatency(latency, rv)
  530. p.latencyAcceptMu.Lock()
  531. p.latencyAccept = d
  532. p.latencyAcceptMu.Unlock()
  533. p.logger.Info(
  534. "set accept latency",
  535. zap.Duration("latency", d),
  536. zap.Duration("given-latency", latency),
  537. zap.Duration("given-latency-random-variable", rv),
  538. zap.String("from", p.From()),
  539. zap.String("to", p.To()),
  540. )
  541. }
  542. func (p *proxy) UndelayAccept() {
  543. p.latencyAcceptMu.Lock()
  544. d := p.latencyAccept
  545. p.latencyAccept = 0
  546. p.latencyAcceptMu.Unlock()
  547. p.logger.Info(
  548. "removed accept latency",
  549. zap.Duration("latency", d),
  550. zap.String("from", p.From()),
  551. zap.String("to", p.To()),
  552. )
  553. }
  554. func (p *proxy) LatencyAccept() time.Duration {
  555. p.latencyAcceptMu.RLock()
  556. d := p.latencyAccept
  557. p.latencyAcceptMu.RUnlock()
  558. return d
  559. }
  560. func (p *proxy) DelayTx(latency, rv time.Duration) {
  561. if latency <= 0 {
  562. return
  563. }
  564. d := computeLatency(latency, rv)
  565. p.latencyTxMu.Lock()
  566. p.latencyTx = d
  567. p.latencyTxMu.Unlock()
  568. p.logger.Info(
  569. "set transmit latency",
  570. zap.Duration("latency", d),
  571. zap.Duration("given-latency", latency),
  572. zap.Duration("given-latency-random-variable", rv),
  573. zap.String("from", p.From()),
  574. zap.String("to", p.To()),
  575. )
  576. }
  577. func (p *proxy) UndelayTx() {
  578. p.latencyTxMu.Lock()
  579. d := p.latencyTx
  580. p.latencyTx = 0
  581. p.latencyTxMu.Unlock()
  582. p.logger.Info(
  583. "removed transmit latency",
  584. zap.Duration("latency", d),
  585. zap.String("from", p.From()),
  586. zap.String("to", p.To()),
  587. )
  588. }
  589. func (p *proxy) LatencyTx() time.Duration {
  590. p.latencyTxMu.RLock()
  591. d := p.latencyTx
  592. p.latencyTxMu.RUnlock()
  593. return d
  594. }
  595. func (p *proxy) DelayRx(latency, rv time.Duration) {
  596. if latency <= 0 {
  597. return
  598. }
  599. d := computeLatency(latency, rv)
  600. p.latencyRxMu.Lock()
  601. p.latencyRx = d
  602. p.latencyRxMu.Unlock()
  603. p.logger.Info(
  604. "set receive latency",
  605. zap.Duration("latency", d),
  606. zap.Duration("given-latency", latency),
  607. zap.Duration("given-latency-random-variable", rv),
  608. zap.String("from", p.To()),
  609. zap.String("to", p.From()),
  610. )
  611. }
  612. func (p *proxy) UndelayRx() {
  613. p.latencyRxMu.Lock()
  614. d := p.latencyRx
  615. p.latencyRx = 0
  616. p.latencyRxMu.Unlock()
  617. p.logger.Info(
  618. "removed receive latency",
  619. zap.Duration("latency", d),
  620. zap.String("from", p.To()),
  621. zap.String("to", p.From()),
  622. )
  623. }
  624. func (p *proxy) LatencyRx() time.Duration {
  625. p.latencyRxMu.RLock()
  626. d := p.latencyRx
  627. p.latencyRxMu.RUnlock()
  628. return d
  629. }
  // computeLatency returns lat shifted by a random offset whose magnitude is
  // strictly less than rv.
  //
  // rv == 0 returns lat unchanged. A negative rv is treated as its absolute
  // value, and an rv larger than lat is replaced by lat/10. If the resulting
  // bound is not positive (for example lat is under 10ns, or lat itself is not
  // positive), lat is returned as is, because rand.Int63n panics on a
  // non-positive argument. The sign of the offset follows the parity of the
  // current wall-clock second.
  func computeLatency(lat, rv time.Duration) time.Duration {
  	if rv == 0 {
  		return lat
  	}
  	if rv < 0 {
  		rv = -rv
  	}
  	if rv > lat {
  		rv = lat / 10
  	}
  	if rv <= 0 {
  		// Nothing to randomize within; also guards Int63n against panicking.
  		return lat
  	}

  	now := time.Now()
  	// Use a private generator so the process-wide math/rand source is not
  	// reseeded as a side effect of every call.
  	rng := mrand.New(mrand.NewSource(now.UnixNano()))

  	sign := int64(1)
  	if now.Second()%2 == 0 {
  		sign = -1
  	}
  	return lat + time.Duration(sign*rng.Int63n(rv.Nanoseconds()))
  }
  648. func (p *proxy) PauseAccept() {
  649. p.acceptMu.Lock()
  650. p.pauseAcceptc = make(chan struct{})
  651. p.acceptMu.Unlock()
  652. p.logger.Info(
  653. "paused accepting new connections",
  654. zap.String("from", p.From()),
  655. zap.String("to", p.To()),
  656. )
  657. }
  658. func (p *proxy) UnpauseAccept() {
  659. p.acceptMu.Lock()
  660. select {
  661. case <-p.pauseAcceptc: // already unpaused
  662. case <-p.donec:
  663. p.acceptMu.Unlock()
  664. return
  665. default:
  666. close(p.pauseAcceptc)
  667. }
  668. p.acceptMu.Unlock()
  669. p.logger.Info(
  670. "unpaused accepting new connections",
  671. zap.String("from", p.From()),
  672. zap.String("to", p.To()),
  673. )
  674. }
  675. func (p *proxy) PauseTx() {
  676. p.txMu.Lock()
  677. p.pauseTxc = make(chan struct{})
  678. p.txMu.Unlock()
  679. p.logger.Info(
  680. "paused transmit listen",
  681. zap.String("from", p.From()),
  682. zap.String("to", p.To()),
  683. )
  684. }
  685. func (p *proxy) UnpauseTx() {
  686. p.txMu.Lock()
  687. select {
  688. case <-p.pauseTxc: // already unpaused
  689. case <-p.donec:
  690. p.txMu.Unlock()
  691. return
  692. default:
  693. close(p.pauseTxc)
  694. }
  695. p.txMu.Unlock()
  696. p.logger.Info(
  697. "unpaused transmit listen",
  698. zap.String("from", p.From()),
  699. zap.String("to", p.To()),
  700. )
  701. }
  702. func (p *proxy) PauseRx() {
  703. p.rxMu.Lock()
  704. p.pauseRxc = make(chan struct{})
  705. p.rxMu.Unlock()
  706. p.logger.Info(
  707. "paused receive listen",
  708. zap.String("from", p.To()),
  709. zap.String("to", p.From()),
  710. )
  711. }
  712. func (p *proxy) UnpauseRx() {
  713. p.rxMu.Lock()
  714. select {
  715. case <-p.pauseRxc: // already unpaused
  716. case <-p.donec:
  717. p.rxMu.Unlock()
  718. return
  719. default:
  720. close(p.pauseRxc)
  721. }
  722. p.rxMu.Unlock()
  723. p.logger.Info(
  724. "unpaused receive listen",
  725. zap.String("from", p.To()),
  726. zap.String("to", p.From()),
  727. )
  728. }
  729. func (p *proxy) BlackholeTx() {
  730. p.txMu.Lock()
  731. select {
  732. case <-p.blackholeTxc: // already blackholed
  733. case <-p.donec:
  734. p.txMu.Unlock()
  735. return
  736. default:
  737. close(p.blackholeTxc)
  738. }
  739. p.txMu.Unlock()
  740. p.logger.Info(
  741. "blackholed transmit",
  742. zap.String("from", p.From()),
  743. zap.String("to", p.To()),
  744. )
  745. }
  746. func (p *proxy) UnblackholeTx() {
  747. p.txMu.Lock()
  748. p.blackholeTxc = make(chan struct{})
  749. p.txMu.Unlock()
  750. p.logger.Info(
  751. "unblackholed transmit",
  752. zap.String("from", p.From()),
  753. zap.String("to", p.To()),
  754. )
  755. }
  756. func (p *proxy) BlackholeRx() {
  757. p.rxMu.Lock()
  758. select {
  759. case <-p.blackholeRxc: // already blackholed
  760. case <-p.donec:
  761. p.rxMu.Unlock()
  762. return
  763. default:
  764. close(p.blackholeRxc)
  765. }
  766. p.rxMu.Unlock()
  767. p.logger.Info(
  768. "blackholed receive",
  769. zap.String("from", p.To()),
  770. zap.String("to", p.From()),
  771. )
  772. }
  773. func (p *proxy) UnblackholeRx() {
  774. p.rxMu.Lock()
  775. p.blackholeRxc = make(chan struct{})
  776. p.rxMu.Unlock()
  777. p.logger.Info(
  778. "unblackholed receive",
  779. zap.String("from", p.To()),
  780. zap.String("to", p.From()),
  781. )
  782. }
  783. func (p *proxy) CorruptTx(f func([]byte) []byte) {
  784. p.corruptTxMu.Lock()
  785. p.corruptTx = f
  786. p.corruptTxMu.Unlock()
  787. p.logger.Info(
  788. "corrupting transmit",
  789. zap.String("from", p.From()),
  790. zap.String("to", p.To()),
  791. )
  792. }
  793. func (p *proxy) UncorruptTx() {
  794. p.corruptTxMu.Lock()
  795. p.corruptTx = nil
  796. p.corruptTxMu.Unlock()
  797. p.logger.Info(
  798. "stopped corrupting transmit",
  799. zap.String("from", p.From()),
  800. zap.String("to", p.To()),
  801. )
  802. }
  803. func (p *proxy) CorruptRx(f func([]byte) []byte) {
  804. p.corruptRxMu.Lock()
  805. p.corruptRx = f
  806. p.corruptRxMu.Unlock()
  807. p.logger.Info(
  808. "corrupting receive",
  809. zap.String("from", p.To()),
  810. zap.String("to", p.From()),
  811. )
  812. }
  813. func (p *proxy) UncorruptRx() {
  814. p.corruptRxMu.Lock()
  815. p.corruptRx = nil
  816. p.corruptRxMu.Unlock()
  817. p.logger.Info(
  818. "stopped corrupting receive",
  819. zap.String("from", p.To()),
  820. zap.String("to", p.From()),
  821. )
  822. }
  823. func (p *proxy) ResetListener() error {
  824. p.listenerMu.Lock()
  825. defer p.listenerMu.Unlock()
  826. if err := p.listener.Close(); err != nil {
  827. // already closed
  828. if !strings.HasSuffix(err.Error(), "use of closed network connection") {
  829. return err
  830. }
  831. }
  832. var ln net.Listener
  833. var err error
  834. if !p.tlsInfo.Empty() {
  835. ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
  836. } else {
  837. ln, err = net.Listen(p.from.Scheme, p.from.Host)
  838. }
  839. if err != nil {
  840. return err
  841. }
  842. p.listener = ln
  843. p.logger.Info(
  844. "reset listener on",
  845. zap.String("from", p.From()),
  846. )
  847. return nil
  848. }