proxy.go 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package transport
  15. import (
  16. "fmt"
  17. "io"
  18. mrand "math/rand"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "os"
  23. "strings"
  24. "sync"
  25. "time"
  26. humanize "github.com/dustin/go-humanize"
  27. "google.golang.org/grpc/grpclog"
  28. )
  29. // Proxy defines a proxy layer that simulates common network faults,
  30. // such as latency spikes, packet drop/corruption, etc.
  31. type Proxy interface {
  32. // From returns proxy source address in "scheme://host:port" format.
  33. From() string
  34. // To returns proxy destination address in "scheme://host:port" format.
  35. To() string
  36. // Ready returns a channel that is closed when the proxy is ready to serve.
  37. Ready() <-chan struct{}
  38. // Done returns a channel that is closed when the proxy has been closed.
  39. Done() <-chan struct{}
  40. // Error returns the channel on which errors hit while serving are reported.
  41. Error() <-chan error
  42. // Close closes the listener and waits for the serving loop to exit.
  43. Close() error
  44. // DelayAccept adds latency ± random variable to accepting new incoming connections.
  45. DelayAccept(latency, rv time.Duration)
  46. // UndelayAccept removes the latency on accepting new incoming connections.
  47. UndelayAccept()
  48. // LatencyAccept returns current latency on accepting new incoming connections.
  49. LatencyAccept() time.Duration
  50. // DelayTx adds latency ± random variable to "sending" layer.
  51. DelayTx(latency, rv time.Duration)
  52. // UndelayTx removes "sending" latencies.
  53. UndelayTx()
  54. // LatencyTx returns current send latency.
  55. LatencyTx() time.Duration
  56. // DelayRx adds latency ± random variable to "receiving" layer.
  57. DelayRx(latency, rv time.Duration)
  58. // UndelayRx removes "receiving" latencies.
  59. UndelayRx()
  60. // LatencyRx returns current receive latency.
  61. LatencyRx() time.Duration
  62. // PauseAccept stops accepting new connections.
  63. PauseAccept()
  64. // UnpauseAccept removes pause operation on accepting new connections.
  65. UnpauseAccept()
  66. // PauseTx stops "forwarding" packets.
  67. PauseTx()
  68. // UnpauseTx removes "forwarding" pause operation.
  69. UnpauseTx()
  70. // PauseRx stops "receiving" packets to client.
  71. PauseRx()
  72. // UnpauseRx removes "receiving" pause operation.
  73. UnpauseRx()
  74. // BlackholeTx drops all incoming packets before "forwarding".
  75. BlackholeTx()
  76. // UnblackholeTx removes blackhole operation on "sending".
  77. UnblackholeTx()
  78. // BlackholeRx drops all incoming packets to client.
  79. BlackholeRx()
  80. // UnblackholeRx removes blackhole operation on "receiving".
  81. UnblackholeRx()
  82. // CorruptTx passes each packet read from the listener side through f before "forwarding".
  83. CorruptTx(f func(data []byte) []byte)
  84. // UncorruptTx removes corrupt operation on "forwarding".
  85. UncorruptTx()
  86. // CorruptRx passes each packet headed back to the client through f.
  87. CorruptRx(f func(data []byte) []byte)
  88. // UncorruptRx removes corrupt operation on "receiving".
  89. UncorruptRx()
  90. // ResetListener closes and restarts listener.
  91. ResetListener() error
  92. }
  93. type proxy struct { // proxy is the Proxy implementation built by NewProxy.
  94. from, to url.URL // listen address (from) and dial address (to)
  95. tlsInfo TLSInfo // when non-empty, used for both the listener and outgoing dials
  96. dialTimeout time.Duration // passed to NewTransport for TLS dials
  97. bufferSize int // size in bytes of the read buffer allocated by each ioCopy call
  98. retryInterval time.Duration // wait before re-listening once the listener is found closed
  99. logger grpclog.LoggerV2
  100. readyc chan struct{} // closed by listenAndServe when it starts
  101. donec chan struct{} // closed by Close
  102. errc chan error // buffered (16 in NewProxy); exposed through Error()
  103. closeOnce sync.Once // makes the shutdown in Close run only once
  104. closeWg sync.WaitGroup // counts the listenAndServe goroutine; Close waits on it
  105. listenerMu sync.RWMutex // guards listener
  106. listener net.Listener
  107. latencyAcceptMu sync.RWMutex // guards latencyAccept
  108. latencyAccept time.Duration
  109. latencyTxMu sync.RWMutex // guards latencyTx
  110. latencyTx time.Duration
  111. latencyRxMu sync.RWMutex // guards latencyRx
  112. latencyRx time.Duration
  113. corruptTxMu sync.RWMutex // guards corruptTx
  114. corruptTx func(data []byte) []byte // nil means no corruption
  115. corruptRxMu sync.RWMutex // guards corruptRx
  116. corruptRx func(data []byte) []byte // nil means no corruption
  117. acceptMu sync.Mutex // guards pauseAcceptc
  118. pauseAcceptc chan struct{} // closed = accepting; open = paused
  119. txMu sync.Mutex // guards pauseTxc and blackholeTxc
  120. pauseTxc chan struct{} // closed = forwarding; open = paused
  121. blackholeTxc chan struct{} // closed = dropping packets; open = normal
  122. rxMu sync.Mutex // guards pauseRxc and blackholeRxc
  123. pauseRxc chan struct{} // closed = forwarding; open = paused
  124. blackholeRxc chan struct{} // closed = dropping packets; open = normal
  125. }
  126. // ProxyConfig defines proxy configuration.
  127. type ProxyConfig struct {
  128. From url.URL // address to listen on; an "http*" scheme is rewritten to "tcp"
  129. To url.URL // address to dial; an "http*" scheme is rewritten to "tcp"
  130. TLSInfo TLSInfo // when non-empty, used for listening and dialing
  131. DialTimeout time.Duration // zero selects defaultDialTimeout
  132. BufferSize int // zero selects defaultBufferSize
  133. RetryInterval time.Duration // zero selects defaultRetryInterval
  134. Logger grpclog.LoggerV2 // nil selects defaultLogger
  135. }
  136. var ( // fallbacks NewProxy applies to zero-valued ProxyConfig fields
  137. defaultDialTimeout = 3 * time.Second
  138. defaultBufferSize = 48 * 1024 // bytes
  139. defaultRetryInterval = 10 * time.Millisecond
  140. defaultLogger = grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 0) // info, warning and error all go to stderr
  141. )
  142. // NewProxy returns a proxy implementation with no iptables/tc dependencies.
  143. // The proxy layer overhead is <1ms.
  144. func NewProxy(cfg ProxyConfig) Proxy {
  145. p := &proxy{
  146. from: cfg.From,
  147. to: cfg.To,
  148. tlsInfo: cfg.TLSInfo,
  149. dialTimeout: cfg.DialTimeout,
  150. bufferSize: cfg.BufferSize,
  151. retryInterval: cfg.RetryInterval,
  152. logger: cfg.Logger,
  153. readyc: make(chan struct{}),
  154. donec: make(chan struct{}),
  155. errc: make(chan error, 16),
  156. pauseAcceptc: make(chan struct{}),
  157. pauseTxc: make(chan struct{}),
  158. blackholeTxc: make(chan struct{}),
  159. pauseRxc: make(chan struct{}),
  160. blackholeRxc: make(chan struct{}),
  161. }
  162. if p.dialTimeout == 0 {
  163. p.dialTimeout = defaultDialTimeout
  164. }
  165. if p.bufferSize == 0 {
  166. p.bufferSize = defaultBufferSize
  167. }
  168. if p.retryInterval == 0 {
  169. p.retryInterval = defaultRetryInterval
  170. }
  171. if p.logger == nil {
  172. p.logger = defaultLogger
  173. }
  174. close(p.pauseAcceptc)
  175. close(p.pauseTxc)
  176. close(p.pauseRxc)
  177. if strings.HasPrefix(p.from.Scheme, "http") {
  178. p.from.Scheme = "tcp"
  179. }
  180. if strings.HasPrefix(p.to.Scheme, "http") {
  181. p.to.Scheme = "tcp"
  182. }
  183. var ln net.Listener
  184. var err error
  185. if !p.tlsInfo.Empty() {
  186. ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
  187. } else {
  188. ln, err = net.Listen(p.from.Scheme, p.from.Host)
  189. }
  190. if err != nil {
  191. p.errc <- err
  192. p.Close()
  193. return p
  194. }
  195. p.listener = ln
  196. p.closeWg.Add(1)
  197. go p.listenAndServe()
  198. p.logger.Infof("started proxying [%s -> %s]", p.From(), p.To())
  199. return p
  200. }
  201. func (p *proxy) From() string {
  202. return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host)
  203. }
  204. func (p *proxy) To() string {
  205. return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host)
  206. }
  207. // TODO: implement packet reordering from multiple TCP connections
  208. // buffer packets per connection for a while, reorder before transmit
  209. // - https://github.com/coreos/etcd/issues/5614
  210. // - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034
  211. func (p *proxy) listenAndServe() {
  212. defer p.closeWg.Done()
  213. p.logger.Infof("listen %q", p.From())
  214. close(p.readyc)
  215. for {
  216. p.acceptMu.Lock()
  217. pausec := p.pauseAcceptc
  218. p.acceptMu.Unlock()
  219. select {
  220. case <-pausec:
  221. case <-p.donec:
  222. return
  223. }
  224. p.latencyAcceptMu.RLock()
  225. lat := p.latencyAccept
  226. p.latencyAcceptMu.RUnlock()
  227. if lat > 0 {
  228. select {
  229. case <-time.After(lat):
  230. case <-p.donec:
  231. return
  232. }
  233. }
  234. p.listenerMu.RLock()
  235. ln := p.listener
  236. p.listenerMu.RUnlock()
  237. in, err := ln.Accept()
  238. if err != nil {
  239. select {
  240. case p.errc <- err:
  241. select {
  242. case <-p.donec:
  243. return
  244. default:
  245. }
  246. case <-p.donec:
  247. return
  248. }
  249. if p.logger.V(5) {
  250. p.logger.Errorf("listener accept error %q", err.Error())
  251. }
  252. if strings.HasSuffix(err.Error(), "use of closed network connection") {
  253. select {
  254. case <-time.After(p.retryInterval):
  255. case <-p.donec:
  256. return
  257. }
  258. if p.logger.V(5) {
  259. p.logger.Errorf("listener is closed; retry listen %q", p.From())
  260. }
  261. if err = p.ResetListener(); err != nil {
  262. select {
  263. case p.errc <- err:
  264. select {
  265. case <-p.donec:
  266. return
  267. default:
  268. }
  269. case <-p.donec:
  270. return
  271. }
  272. p.logger.Errorf("failed to reset listener %q", err.Error())
  273. }
  274. }
  275. continue
  276. }
  277. var out net.Conn
  278. if !p.tlsInfo.Empty() {
  279. var tp *http.Transport
  280. tp, err = NewTransport(p.tlsInfo, p.dialTimeout)
  281. if err != nil {
  282. select {
  283. case p.errc <- err:
  284. select {
  285. case <-p.donec:
  286. return
  287. default:
  288. }
  289. case <-p.donec:
  290. return
  291. }
  292. continue
  293. }
  294. out, err = tp.Dial(p.to.Scheme, p.to.Host)
  295. } else {
  296. out, err = net.Dial(p.to.Scheme, p.to.Host)
  297. }
  298. if err != nil {
  299. select {
  300. case p.errc <- err:
  301. select {
  302. case <-p.donec:
  303. return
  304. default:
  305. }
  306. case <-p.donec:
  307. return
  308. }
  309. if p.logger.V(5) {
  310. p.logger.Errorf("dial error %q", err.Error())
  311. }
  312. continue
  313. }
  314. go func() {
  315. // read incoming bytes from listener, dispatch to outgoing connection
  316. p.transmit(out, in)
  317. out.Close()
  318. in.Close()
  319. }()
  320. go func() {
  321. // read response from outgoing connection, write back to listener
  322. p.receive(in, out)
  323. in.Close()
  324. out.Close()
  325. }()
  326. }
  327. }
  328. func (p *proxy) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
  329. func (p *proxy) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) }
  330. func (p *proxy) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
  331. buf := make([]byte, p.bufferSize)
  332. for {
  333. nr, err := src.Read(buf)
  334. if err != nil {
  335. if err == io.EOF {
  336. return
  337. }
  338. // connection already closed
  339. if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
  340. return
  341. }
  342. if strings.HasSuffix(err.Error(), "use of closed network connection") {
  343. return
  344. }
  345. select {
  346. case p.errc <- err:
  347. select {
  348. case <-p.donec:
  349. return
  350. default:
  351. }
  352. case <-p.donec:
  353. return
  354. }
  355. if p.logger.V(5) {
  356. p.logger.Errorf("read error %q", err.Error())
  357. }
  358. return
  359. }
  360. if nr == 0 {
  361. return
  362. }
  363. data := buf[:nr]
  364. var pausec chan struct{}
  365. var blackholec chan struct{}
  366. if proxySend {
  367. p.txMu.Lock()
  368. pausec = p.pauseTxc
  369. blackholec = p.blackholeTxc
  370. p.txMu.Unlock()
  371. } else {
  372. p.rxMu.Lock()
  373. pausec = p.pauseRxc
  374. blackholec = p.blackholeRxc
  375. p.rxMu.Unlock()
  376. }
  377. select {
  378. case <-pausec:
  379. case <-p.donec:
  380. return
  381. }
  382. blackholed := false
  383. select {
  384. case <-blackholec:
  385. blackholed = true
  386. case <-p.donec:
  387. return
  388. default:
  389. }
  390. if blackholed {
  391. if p.logger.V(5) {
  392. if proxySend {
  393. p.logger.Infof("dropped %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
  394. } else {
  395. p.logger.Infof("dropped %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
  396. }
  397. }
  398. continue
  399. }
  400. var lat time.Duration
  401. if proxySend {
  402. p.latencyTxMu.RLock()
  403. lat = p.latencyTx
  404. p.latencyTxMu.RUnlock()
  405. } else {
  406. p.latencyRxMu.RLock()
  407. lat = p.latencyRx
  408. p.latencyRxMu.RUnlock()
  409. }
  410. if lat > 0 {
  411. select {
  412. case <-time.After(lat):
  413. case <-p.donec:
  414. return
  415. }
  416. }
  417. if proxySend {
  418. p.corruptTxMu.RLock()
  419. if p.corruptTx != nil {
  420. data = p.corruptTx(data)
  421. }
  422. p.corruptTxMu.RUnlock()
  423. } else {
  424. p.corruptRxMu.RLock()
  425. if p.corruptRx != nil {
  426. data = p.corruptRx(data)
  427. }
  428. p.corruptRxMu.RUnlock()
  429. }
  430. var nw int
  431. nw, err = dst.Write(data)
  432. if err != nil {
  433. if err == io.EOF {
  434. return
  435. }
  436. select {
  437. case p.errc <- err:
  438. select {
  439. case <-p.donec:
  440. return
  441. default:
  442. }
  443. case <-p.donec:
  444. return
  445. }
  446. if p.logger.V(5) {
  447. if proxySend {
  448. p.logger.Errorf("write error while sending (%q)", err.Error())
  449. } else {
  450. p.logger.Errorf("write error while receiving (%q)", err.Error())
  451. }
  452. }
  453. return
  454. }
  455. if nr != nw {
  456. select {
  457. case p.errc <- io.ErrShortWrite:
  458. select {
  459. case <-p.donec:
  460. return
  461. default:
  462. }
  463. case <-p.donec:
  464. return
  465. }
  466. if proxySend {
  467. p.logger.Errorf("write error while sending (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw)
  468. } else {
  469. p.logger.Errorf("write error while receiving (%q); read %d bytes != wrote %d bytes", io.ErrShortWrite.Error(), nr, nw)
  470. }
  471. return
  472. }
  473. if p.logger.V(5) {
  474. if proxySend {
  475. p.logger.Infof("transmitted %s [%s -> %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
  476. } else {
  477. p.logger.Infof("received %s [%s <- %s]", humanize.Bytes(uint64(nr)), p.From(), p.To())
  478. }
  479. }
  480. }
  481. }
  482. func (p *proxy) Ready() <-chan struct{} { return p.readyc }
  483. func (p *proxy) Done() <-chan struct{} { return p.donec }
  484. func (p *proxy) Error() <-chan error { return p.errc }
  485. func (p *proxy) Close() (err error) {
  486. p.closeOnce.Do(func() {
  487. close(p.donec)
  488. p.listenerMu.Lock()
  489. if p.listener != nil {
  490. err = p.listener.Close()
  491. p.logger.Infof("closed proxy listener on %q", p.From())
  492. }
  493. p.listenerMu.Unlock()
  494. })
  495. p.closeWg.Wait()
  496. return err
  497. }
  498. func (p *proxy) DelayAccept(latency, rv time.Duration) {
  499. if latency <= 0 {
  500. return
  501. }
  502. d := computeLatency(latency, rv)
  503. p.latencyAcceptMu.Lock()
  504. p.latencyAccept = d
  505. p.latencyAcceptMu.Unlock()
  506. p.logger.Infof("set accept latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To())
  507. }
  508. func (p *proxy) UndelayAccept() {
  509. p.latencyAcceptMu.Lock()
  510. d := p.latencyAccept
  511. p.latencyAccept = 0
  512. p.latencyAcceptMu.Unlock()
  513. p.logger.Infof("removed accept latency %v [%s -> %s]", d, p.From(), p.To())
  514. }
  515. func (p *proxy) LatencyAccept() time.Duration {
  516. p.latencyAcceptMu.RLock()
  517. d := p.latencyAccept
  518. p.latencyAcceptMu.RUnlock()
  519. return d
  520. }
  521. func (p *proxy) DelayTx(latency, rv time.Duration) {
  522. if latency <= 0 {
  523. return
  524. }
  525. d := computeLatency(latency, rv)
  526. p.latencyTxMu.Lock()
  527. p.latencyTx = d
  528. p.latencyTxMu.Unlock()
  529. p.logger.Infof("set transmit latency %v(%v±%v) [%s -> %s]", d, latency, rv, p.From(), p.To())
  530. }
  531. func (p *proxy) UndelayTx() {
  532. p.latencyTxMu.Lock()
  533. d := p.latencyTx
  534. p.latencyTx = 0
  535. p.latencyTxMu.Unlock()
  536. p.logger.Infof("removed transmit latency %v [%s -> %s]", d, p.From(), p.To())
  537. }
  538. func (p *proxy) LatencyTx() time.Duration {
  539. p.latencyTxMu.RLock()
  540. d := p.latencyTx
  541. p.latencyTxMu.RUnlock()
  542. return d
  543. }
  544. func (p *proxy) DelayRx(latency, rv time.Duration) {
  545. if latency <= 0 {
  546. return
  547. }
  548. d := computeLatency(latency, rv)
  549. p.latencyRxMu.Lock()
  550. p.latencyRx = d
  551. p.latencyRxMu.Unlock()
  552. p.logger.Infof("set receive latency %v(%v±%v) [%s <- %s]", d, latency, rv, p.From(), p.To())
  553. }
  554. func (p *proxy) UndelayRx() {
  555. p.latencyRxMu.Lock()
  556. d := p.latencyRx
  557. p.latencyRx = 0
  558. p.latencyRxMu.Unlock()
  559. p.logger.Infof("removed receive latency %v [%s <- %s]", d, p.From(), p.To())
  560. }
  561. func (p *proxy) LatencyRx() time.Duration {
  562. p.latencyRxMu.RLock()
  563. d := p.latencyRx
  564. p.latencyRxMu.RUnlock()
  565. return d
  566. }
  567. func computeLatency(lat, rv time.Duration) time.Duration {
  568. if rv == 0 {
  569. return lat
  570. }
  571. if rv < 0 {
  572. rv *= -1
  573. }
  574. if rv > lat {
  575. rv = lat / 10
  576. }
  577. now := time.Now()
  578. mrand.Seed(int64(now.Nanosecond()))
  579. sign := 1
  580. if now.Second()%2 == 0 {
  581. sign = -1
  582. }
  583. return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
  584. }
  585. func (p *proxy) PauseAccept() {
  586. p.acceptMu.Lock()
  587. p.pauseAcceptc = make(chan struct{})
  588. p.acceptMu.Unlock()
  589. p.logger.Infof("paused accepting new connections [%s -> %s]", p.From(), p.To())
  590. }
  591. func (p *proxy) UnpauseAccept() {
  592. p.acceptMu.Lock()
  593. select {
  594. case <-p.pauseAcceptc: // already unpaused
  595. case <-p.donec:
  596. p.acceptMu.Unlock()
  597. return
  598. default:
  599. close(p.pauseAcceptc)
  600. }
  601. p.acceptMu.Unlock()
  602. p.logger.Infof("unpaused accepting new connections [%s -> %s]", p.From(), p.To())
  603. }
  604. func (p *proxy) PauseTx() {
  605. p.txMu.Lock()
  606. p.pauseTxc = make(chan struct{})
  607. p.txMu.Unlock()
  608. p.logger.Infof("paused transmit listen [%s -> %s]", p.From(), p.To())
  609. }
  610. func (p *proxy) UnpauseTx() {
  611. p.txMu.Lock()
  612. select {
  613. case <-p.pauseTxc: // already unpaused
  614. case <-p.donec:
  615. p.txMu.Unlock()
  616. return
  617. default:
  618. close(p.pauseTxc)
  619. }
  620. p.txMu.Unlock()
  621. p.logger.Infof("unpaused transmit listen [%s -> %s]", p.From(), p.To())
  622. }
  623. func (p *proxy) PauseRx() {
  624. p.rxMu.Lock()
  625. p.pauseRxc = make(chan struct{})
  626. p.rxMu.Unlock()
  627. p.logger.Infof("paused receive listen [%s <- %s]", p.From(), p.To())
  628. }
  629. func (p *proxy) UnpauseRx() {
  630. p.rxMu.Lock()
  631. select {
  632. case <-p.pauseRxc: // already unpaused
  633. case <-p.donec:
  634. p.rxMu.Unlock()
  635. return
  636. default:
  637. close(p.pauseRxc)
  638. }
  639. p.rxMu.Unlock()
  640. p.logger.Infof("unpaused receive listen [%s <- %s]", p.From(), p.To())
  641. }
  642. func (p *proxy) BlackholeTx() {
  643. p.txMu.Lock()
  644. select {
  645. case <-p.blackholeTxc: // already blackholed
  646. case <-p.donec:
  647. p.txMu.Unlock()
  648. return
  649. default:
  650. close(p.blackholeTxc)
  651. }
  652. p.txMu.Unlock()
  653. p.logger.Infof("blackholed transmit [%s -> %s]", p.From(), p.To())
  654. }
  655. func (p *proxy) UnblackholeTx() {
  656. p.txMu.Lock()
  657. p.blackholeTxc = make(chan struct{})
  658. p.txMu.Unlock()
  659. p.logger.Infof("unblackholed transmit [%s -> %s]", p.From(), p.To())
  660. }
  661. func (p *proxy) BlackholeRx() {
  662. p.rxMu.Lock()
  663. select {
  664. case <-p.blackholeRxc: // already blackholed
  665. case <-p.donec:
  666. p.rxMu.Unlock()
  667. return
  668. default:
  669. close(p.blackholeRxc)
  670. }
  671. p.rxMu.Unlock()
  672. p.logger.Infof("blackholed receive [%s <- %s]", p.From(), p.To())
  673. }
  674. func (p *proxy) UnblackholeRx() {
  675. p.rxMu.Lock()
  676. p.blackholeRxc = make(chan struct{})
  677. p.rxMu.Unlock()
  678. p.logger.Infof("unblackholed receive [%s <- %s]", p.From(), p.To())
  679. }
  680. func (p *proxy) CorruptTx(f func([]byte) []byte) {
  681. p.corruptTxMu.Lock()
  682. p.corruptTx = f
  683. p.corruptTxMu.Unlock()
  684. p.logger.Infof("corrupting transmit [%s -> %s]", p.From(), p.To())
  685. }
  686. func (p *proxy) UncorruptTx() {
  687. p.corruptTxMu.Lock()
  688. p.corruptTx = nil
  689. p.corruptTxMu.Unlock()
  690. p.logger.Infof("stopped corrupting transmit [%s -> %s]", p.From(), p.To())
  691. }
  692. func (p *proxy) CorruptRx(f func([]byte) []byte) {
  693. p.corruptRxMu.Lock()
  694. p.corruptRx = f
  695. p.corruptRxMu.Unlock()
  696. p.logger.Infof("corrupting receive [%s <- %s]", p.From(), p.To())
  697. }
  698. func (p *proxy) UncorruptRx() {
  699. p.corruptRxMu.Lock()
  700. p.corruptRx = nil
  701. p.corruptRxMu.Unlock()
  702. p.logger.Infof("stopped corrupting receive [%s <- %s]", p.From(), p.To())
  703. }
  704. func (p *proxy) ResetListener() error {
  705. p.listenerMu.Lock()
  706. defer p.listenerMu.Unlock()
  707. if err := p.listener.Close(); err != nil {
  708. // already closed
  709. if !strings.HasSuffix(err.Error(), "use of closed network connection") {
  710. return err
  711. }
  712. }
  713. var ln net.Listener
  714. var err error
  715. if !p.tlsInfo.Empty() {
  716. ln, err = NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
  717. } else {
  718. ln, err = net.Listen(p.from.Scheme, p.from.Host)
  719. }
  720. if err != nil {
  721. return err
  722. }
  723. p.listener = ln
  724. p.logger.Infof("reset listener %q", p.From())
  725. return nil
  726. }