server.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "fmt"
  17. "io"
  18. mrand "math/rand"
  19. "net"
  20. "net/http"
  21. "net/url"
  22. "strings"
  23. "sync"
  24. "time"
  25. "github.com/coreos/etcd/pkg/transport"
  26. humanize "github.com/dustin/go-humanize"
  27. "go.uber.org/zap"
  28. )
  29. // Server defines proxy server layer that simulates common network faults,
  30. // such as latency spikes, packet drop/corruption, etc..
  31. type Server interface {
  32. // From returns proxy source address in "scheme://host:port" format.
  33. From() string
  34. // To returns proxy destination address in "scheme://host:port" format.
  35. To() string
  36. // Ready returns when proxy is ready to serve.
  37. Ready() <-chan struct{}
  38. // Done returns when proxy has been closed.
  39. Done() <-chan struct{}
  40. // Error sends errors while serving proxy.
  41. Error() <-chan error
  42. // Close closes listener and transport.
  43. Close() error
  44. // DelayAccept adds latency ± random variable to accepting new incoming connections.
  45. DelayAccept(latency, rv time.Duration)
  46. // UndelayAccept removes sending latencies.
  47. UndelayAccept()
  48. // LatencyAccept returns current latency on accepting new incoming connections.
  49. LatencyAccept() time.Duration
  50. // DelayTx adds latency ± random variable to "sending" layer.
  51. DelayTx(latency, rv time.Duration)
  52. // UndelayTx removes sending latencies.
  53. UndelayTx()
  54. // LatencyTx returns current send latency.
  55. LatencyTx() time.Duration
  56. // DelayRx adds latency ± random variable to "receiving" layer.
  57. DelayRx(latency, rv time.Duration)
  58. // UndelayRx removes "receiving" latencies.
  59. UndelayRx()
  60. // LatencyRx returns current receive latency.
  61. LatencyRx() time.Duration
  62. // PauseAccept stops accepting new connections.
  63. PauseAccept()
  64. // UnpauseAccept removes pause operation on accepting new connections.
  65. UnpauseAccept()
  66. // PauseTx stops "forwarding" packets.
  67. PauseTx()
  68. // UnpauseTx removes "forwarding" pause operation.
  69. UnpauseTx()
  70. // PauseRx stops "receiving" packets to client.
  71. PauseRx()
  72. // UnpauseRx removes "receiving" pause operation.
  73. UnpauseRx()
  74. // BlackholeTx drops all incoming packets before "forwarding".
  75. BlackholeTx()
  76. // UnblackholeTx removes blackhole operation on "sending".
  77. UnblackholeTx()
  78. // BlackholeRx drops all incoming packets to client.
  79. BlackholeRx()
  80. // UnblackholeRx removes blackhole operation on "receiving".
  81. UnblackholeRx()
  82. // CorruptTx corrupts incoming packets from the listener.
  83. CorruptTx(f func(data []byte) []byte)
  84. // UncorruptTx removes corrupt operation on "forwarding".
  85. UncorruptTx()
  86. // CorruptRx corrupts incoming packets to client.
  87. CorruptRx(f func(data []byte) []byte)
  88. // UncorruptRx removes corrupt operation on "receiving".
  89. UncorruptRx()
  90. // ResetListener closes and restarts listener.
  91. ResetListener() error
  92. }
  93. type proxyServer struct {
  94. lg *zap.Logger
  95. from, to url.URL
  96. tlsInfo transport.TLSInfo
  97. dialTimeout time.Duration
  98. bufferSize int
  99. retryInterval time.Duration
  100. readyc chan struct{}
  101. donec chan struct{}
  102. errc chan error
  103. closeOnce sync.Once
  104. closeWg sync.WaitGroup
  105. listenerMu sync.RWMutex
  106. listener net.Listener
  107. latencyAcceptMu sync.RWMutex
  108. latencyAccept time.Duration
  109. latencyTxMu sync.RWMutex
  110. latencyTx time.Duration
  111. latencyRxMu sync.RWMutex
  112. latencyRx time.Duration
  113. corruptTxMu sync.RWMutex
  114. corruptTx func(data []byte) []byte
  115. corruptRxMu sync.RWMutex
  116. corruptRx func(data []byte) []byte
  117. acceptMu sync.Mutex
  118. pauseAcceptc chan struct{}
  119. txMu sync.Mutex
  120. pauseTxc chan struct{}
  121. blackholeTxc chan struct{}
  122. rxMu sync.Mutex
  123. pauseRxc chan struct{}
  124. blackholeRxc chan struct{}
  125. }
  126. // ServerConfig defines proxy server configuration.
  127. type ServerConfig struct {
  128. Logger *zap.Logger
  129. From url.URL
  130. To url.URL
  131. TLSInfo transport.TLSInfo
  132. DialTimeout time.Duration
  133. BufferSize int
  134. RetryInterval time.Duration
  135. }
  136. var (
  137. defaultDialTimeout = 3 * time.Second
  138. defaultBufferSize = 48 * 1024
  139. defaultRetryInterval = 10 * time.Millisecond
  140. defaultLogger *zap.Logger
  141. )
  142. func init() {
  143. var err error
  144. defaultLogger, err = zap.NewProduction()
  145. if err != nil {
  146. panic(err)
  147. }
  148. }
  149. // NewServer returns a proxy implementation with no iptables/tc dependencies.
  150. // The proxy layer overhead is <1ms.
  151. func NewServer(cfg ServerConfig) Server {
  152. p := &proxyServer{
  153. lg: cfg.Logger,
  154. from: cfg.From,
  155. to: cfg.To,
  156. tlsInfo: cfg.TLSInfo,
  157. dialTimeout: cfg.DialTimeout,
  158. bufferSize: cfg.BufferSize,
  159. retryInterval: cfg.RetryInterval,
  160. readyc: make(chan struct{}),
  161. donec: make(chan struct{}),
  162. errc: make(chan error, 16),
  163. pauseAcceptc: make(chan struct{}),
  164. pauseTxc: make(chan struct{}),
  165. blackholeTxc: make(chan struct{}),
  166. pauseRxc: make(chan struct{}),
  167. blackholeRxc: make(chan struct{}),
  168. }
  169. if p.dialTimeout == 0 {
  170. p.dialTimeout = defaultDialTimeout
  171. }
  172. if p.bufferSize == 0 {
  173. p.bufferSize = defaultBufferSize
  174. }
  175. if p.retryInterval == 0 {
  176. p.retryInterval = defaultRetryInterval
  177. }
  178. if p.lg == nil {
  179. p.lg = defaultLogger
  180. }
  181. close(p.pauseAcceptc)
  182. close(p.pauseTxc)
  183. close(p.pauseRxc)
  184. if strings.HasPrefix(p.from.Scheme, "http") {
  185. p.from.Scheme = "tcp"
  186. }
  187. if strings.HasPrefix(p.to.Scheme, "http") {
  188. p.to.Scheme = "tcp"
  189. }
  190. var ln net.Listener
  191. var err error
  192. if !p.tlsInfo.Empty() {
  193. ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
  194. } else {
  195. ln, err = net.Listen(p.from.Scheme, p.from.Host)
  196. }
  197. if err != nil {
  198. p.errc <- err
  199. p.Close()
  200. return p
  201. }
  202. p.listener = ln
  203. p.closeWg.Add(1)
  204. go p.listenAndServe()
  205. p.lg.Info("started proxying", zap.String("from", p.From()), zap.String("to", p.To()))
  206. return p
  207. }
  208. func (p *proxyServer) From() string {
  209. return fmt.Sprintf("%s://%s", p.from.Scheme, p.from.Host)
  210. }
  211. func (p *proxyServer) To() string {
  212. return fmt.Sprintf("%s://%s", p.to.Scheme, p.to.Host)
  213. }
  214. // TODO: implement packet reordering from multiple TCP connections
  215. // buffer packets per connection for awhile, reorder before transmit
  216. // - https://github.com/coreos/etcd/issues/5614
  217. // - https://github.com/coreos/etcd/pull/6918#issuecomment-264093034
  218. func (p *proxyServer) listenAndServe() {
  219. defer p.closeWg.Done()
  220. p.lg.Info("proxy is listening on", zap.String("from", p.From()))
  221. close(p.readyc)
  222. for {
  223. p.acceptMu.Lock()
  224. pausec := p.pauseAcceptc
  225. p.acceptMu.Unlock()
  226. select {
  227. case <-pausec:
  228. case <-p.donec:
  229. return
  230. }
  231. p.latencyAcceptMu.RLock()
  232. lat := p.latencyAccept
  233. p.latencyAcceptMu.RUnlock()
  234. if lat > 0 {
  235. select {
  236. case <-time.After(lat):
  237. case <-p.donec:
  238. return
  239. }
  240. }
  241. p.listenerMu.RLock()
  242. ln := p.listener
  243. p.listenerMu.RUnlock()
  244. in, err := ln.Accept()
  245. if err != nil {
  246. select {
  247. case p.errc <- err:
  248. select {
  249. case <-p.donec:
  250. return
  251. default:
  252. }
  253. case <-p.donec:
  254. return
  255. }
  256. p.lg.Debug("listener accept error", zap.Error(err))
  257. if strings.HasSuffix(err.Error(), "use of closed network connection") {
  258. select {
  259. case <-time.After(p.retryInterval):
  260. case <-p.donec:
  261. return
  262. }
  263. p.lg.Debug("listener is closed; retry listening on", zap.String("from", p.From()))
  264. if err = p.ResetListener(); err != nil {
  265. select {
  266. case p.errc <- err:
  267. select {
  268. case <-p.donec:
  269. return
  270. default:
  271. }
  272. case <-p.donec:
  273. return
  274. }
  275. p.lg.Warn("failed to reset listener", zap.Error(err))
  276. }
  277. }
  278. continue
  279. }
  280. var out net.Conn
  281. if !p.tlsInfo.Empty() {
  282. var tp *http.Transport
  283. tp, err = transport.NewTransport(p.tlsInfo, p.dialTimeout)
  284. if err != nil {
  285. select {
  286. case p.errc <- err:
  287. select {
  288. case <-p.donec:
  289. return
  290. default:
  291. }
  292. case <-p.donec:
  293. return
  294. }
  295. continue
  296. }
  297. out, err = tp.Dial(p.to.Scheme, p.to.Host)
  298. } else {
  299. out, err = net.Dial(p.to.Scheme, p.to.Host)
  300. }
  301. if err != nil {
  302. select {
  303. case p.errc <- err:
  304. select {
  305. case <-p.donec:
  306. return
  307. default:
  308. }
  309. case <-p.donec:
  310. return
  311. }
  312. p.lg.Debug("failed to dial", zap.Error(err))
  313. continue
  314. }
  315. go func() {
  316. // read incoming bytes from listener, dispatch to outgoing connection
  317. p.transmit(out, in)
  318. out.Close()
  319. in.Close()
  320. }()
  321. go func() {
  322. // read response from outgoing connection, write back to listener
  323. p.receive(in, out)
  324. in.Close()
  325. out.Close()
  326. }()
  327. }
  328. }
  329. func (p *proxyServer) transmit(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, true) }
  330. func (p *proxyServer) receive(dst io.Writer, src io.Reader) { p.ioCopy(dst, src, false) }
  331. func (p *proxyServer) ioCopy(dst io.Writer, src io.Reader, proxySend bool) {
  332. buf := make([]byte, p.bufferSize)
  333. for {
  334. nr, err := src.Read(buf)
  335. if err != nil {
  336. if err == io.EOF {
  337. return
  338. }
  339. // connection already closed
  340. if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
  341. return
  342. }
  343. if strings.HasSuffix(err.Error(), "use of closed network connection") {
  344. return
  345. }
  346. select {
  347. case p.errc <- err:
  348. select {
  349. case <-p.donec:
  350. return
  351. default:
  352. }
  353. case <-p.donec:
  354. return
  355. }
  356. p.lg.Debug("failed to read", zap.Error(err))
  357. return
  358. }
  359. if nr == 0 {
  360. return
  361. }
  362. data := buf[:nr]
  363. var pausec chan struct{}
  364. var blackholec chan struct{}
  365. if proxySend {
  366. p.txMu.Lock()
  367. pausec = p.pauseTxc
  368. blackholec = p.blackholeTxc
  369. p.txMu.Unlock()
  370. } else {
  371. p.rxMu.Lock()
  372. pausec = p.pauseRxc
  373. blackholec = p.blackholeRxc
  374. p.rxMu.Unlock()
  375. }
  376. select {
  377. case <-pausec:
  378. case <-p.donec:
  379. return
  380. }
  381. blackholed := false
  382. select {
  383. case <-blackholec:
  384. blackholed = true
  385. case <-p.donec:
  386. return
  387. default:
  388. }
  389. if blackholed {
  390. if proxySend {
  391. p.lg.Debug(
  392. "dropped",
  393. zap.String("data-size", humanize.Bytes(uint64(nr))),
  394. zap.String("from", p.From()),
  395. zap.String("to", p.To()),
  396. )
  397. } else {
  398. p.lg.Debug(
  399. "dropped",
  400. zap.String("data-size", humanize.Bytes(uint64(nr))),
  401. zap.String("from", p.To()),
  402. zap.String("to", p.From()),
  403. )
  404. }
  405. continue
  406. }
  407. var lat time.Duration
  408. if proxySend {
  409. p.latencyTxMu.RLock()
  410. lat = p.latencyTx
  411. p.latencyTxMu.RUnlock()
  412. } else {
  413. p.latencyRxMu.RLock()
  414. lat = p.latencyRx
  415. p.latencyRxMu.RUnlock()
  416. }
  417. if lat > 0 {
  418. select {
  419. case <-time.After(lat):
  420. case <-p.donec:
  421. return
  422. }
  423. }
  424. if proxySend {
  425. p.corruptTxMu.RLock()
  426. if p.corruptTx != nil {
  427. data = p.corruptTx(data)
  428. }
  429. p.corruptTxMu.RUnlock()
  430. } else {
  431. p.corruptRxMu.RLock()
  432. if p.corruptRx != nil {
  433. data = p.corruptRx(data)
  434. }
  435. p.corruptRxMu.RUnlock()
  436. }
  437. var nw int
  438. nw, err = dst.Write(data)
  439. if err != nil {
  440. if err == io.EOF {
  441. return
  442. }
  443. select {
  444. case p.errc <- err:
  445. select {
  446. case <-p.donec:
  447. return
  448. default:
  449. }
  450. case <-p.donec:
  451. return
  452. }
  453. if proxySend {
  454. p.lg.Debug("failed to write while sending", zap.Error(err))
  455. } else {
  456. p.lg.Debug("failed to write while receiving", zap.Error(err))
  457. }
  458. return
  459. }
  460. if nr != nw {
  461. select {
  462. case p.errc <- io.ErrShortWrite:
  463. select {
  464. case <-p.donec:
  465. return
  466. default:
  467. }
  468. case <-p.donec:
  469. return
  470. }
  471. if proxySend {
  472. p.lg.Debug(
  473. "failed to write while sending; read/write bytes are different",
  474. zap.Int("read-bytes", nr),
  475. zap.Int("write-bytes", nw),
  476. zap.Error(io.ErrShortWrite),
  477. )
  478. } else {
  479. p.lg.Debug(
  480. "failed to write while receiving; read/write bytes are different",
  481. zap.Int("read-bytes", nr),
  482. zap.Int("write-bytes", nw),
  483. zap.Error(io.ErrShortWrite),
  484. )
  485. }
  486. return
  487. }
  488. if proxySend {
  489. p.lg.Debug(
  490. "transmitted",
  491. zap.String("data-size", humanize.Bytes(uint64(nr))),
  492. zap.String("from", p.From()),
  493. zap.String("to", p.To()),
  494. )
  495. } else {
  496. p.lg.Debug(
  497. "received",
  498. zap.String("data-size", humanize.Bytes(uint64(nr))),
  499. zap.String("from", p.To()),
  500. zap.String("to", p.From()),
  501. )
  502. }
  503. }
  504. }
  505. func (p *proxyServer) Ready() <-chan struct{} { return p.readyc }
  506. func (p *proxyServer) Done() <-chan struct{} { return p.donec }
  507. func (p *proxyServer) Error() <-chan error { return p.errc }
  508. func (p *proxyServer) Close() (err error) {
  509. p.closeOnce.Do(func() {
  510. close(p.donec)
  511. p.listenerMu.Lock()
  512. if p.listener != nil {
  513. err = p.listener.Close()
  514. p.lg.Info(
  515. "closed proxy listener",
  516. zap.String("from", p.From()),
  517. zap.String("to", p.To()),
  518. )
  519. }
  520. p.lg.Sync()
  521. p.listenerMu.Unlock()
  522. })
  523. p.closeWg.Wait()
  524. return err
  525. }
  526. func (p *proxyServer) DelayAccept(latency, rv time.Duration) {
  527. if latency <= 0 {
  528. return
  529. }
  530. d := computeLatency(latency, rv)
  531. p.latencyAcceptMu.Lock()
  532. p.latencyAccept = d
  533. p.latencyAcceptMu.Unlock()
  534. p.lg.Info(
  535. "set accept latency",
  536. zap.Duration("latency", d),
  537. zap.Duration("given-latency", latency),
  538. zap.Duration("given-latency-random-variable", rv),
  539. zap.String("from", p.From()),
  540. zap.String("to", p.To()),
  541. )
  542. }
  543. func (p *proxyServer) UndelayAccept() {
  544. p.latencyAcceptMu.Lock()
  545. d := p.latencyAccept
  546. p.latencyAccept = 0
  547. p.latencyAcceptMu.Unlock()
  548. p.lg.Info(
  549. "removed accept latency",
  550. zap.Duration("latency", d),
  551. zap.String("from", p.From()),
  552. zap.String("to", p.To()),
  553. )
  554. }
  555. func (p *proxyServer) LatencyAccept() time.Duration {
  556. p.latencyAcceptMu.RLock()
  557. d := p.latencyAccept
  558. p.latencyAcceptMu.RUnlock()
  559. return d
  560. }
  561. func (p *proxyServer) DelayTx(latency, rv time.Duration) {
  562. if latency <= 0 {
  563. return
  564. }
  565. d := computeLatency(latency, rv)
  566. p.latencyTxMu.Lock()
  567. p.latencyTx = d
  568. p.latencyTxMu.Unlock()
  569. p.lg.Info(
  570. "set transmit latency",
  571. zap.Duration("latency", d),
  572. zap.Duration("given-latency", latency),
  573. zap.Duration("given-latency-random-variable", rv),
  574. zap.String("from", p.From()),
  575. zap.String("to", p.To()),
  576. )
  577. }
  578. func (p *proxyServer) UndelayTx() {
  579. p.latencyTxMu.Lock()
  580. d := p.latencyTx
  581. p.latencyTx = 0
  582. p.latencyTxMu.Unlock()
  583. p.lg.Info(
  584. "removed transmit latency",
  585. zap.Duration("latency", d),
  586. zap.String("from", p.From()),
  587. zap.String("to", p.To()),
  588. )
  589. }
  590. func (p *proxyServer) LatencyTx() time.Duration {
  591. p.latencyTxMu.RLock()
  592. d := p.latencyTx
  593. p.latencyTxMu.RUnlock()
  594. return d
  595. }
  596. func (p *proxyServer) DelayRx(latency, rv time.Duration) {
  597. if latency <= 0 {
  598. return
  599. }
  600. d := computeLatency(latency, rv)
  601. p.latencyRxMu.Lock()
  602. p.latencyRx = d
  603. p.latencyRxMu.Unlock()
  604. p.lg.Info(
  605. "set receive latency",
  606. zap.Duration("latency", d),
  607. zap.Duration("given-latency", latency),
  608. zap.Duration("given-latency-random-variable", rv),
  609. zap.String("from", p.To()),
  610. zap.String("to", p.From()),
  611. )
  612. }
  613. func (p *proxyServer) UndelayRx() {
  614. p.latencyRxMu.Lock()
  615. d := p.latencyRx
  616. p.latencyRx = 0
  617. p.latencyRxMu.Unlock()
  618. p.lg.Info(
  619. "removed receive latency",
  620. zap.Duration("latency", d),
  621. zap.String("from", p.To()),
  622. zap.String("to", p.From()),
  623. )
  624. }
  625. func (p *proxyServer) LatencyRx() time.Duration {
  626. p.latencyRxMu.RLock()
  627. d := p.latencyRx
  628. p.latencyRxMu.RUnlock()
  629. return d
  630. }
  631. func computeLatency(lat, rv time.Duration) time.Duration {
  632. if rv == 0 {
  633. return lat
  634. }
  635. if rv < 0 {
  636. rv *= -1
  637. }
  638. if rv > lat {
  639. rv = lat / 10
  640. }
  641. now := time.Now()
  642. mrand.Seed(int64(now.Nanosecond()))
  643. sign := 1
  644. if now.Second()%2 == 0 {
  645. sign = -1
  646. }
  647. return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
  648. }
  649. func (p *proxyServer) PauseAccept() {
  650. p.acceptMu.Lock()
  651. p.pauseAcceptc = make(chan struct{})
  652. p.acceptMu.Unlock()
  653. p.lg.Info(
  654. "paused accepting new connections",
  655. zap.String("from", p.From()),
  656. zap.String("to", p.To()),
  657. )
  658. }
  659. func (p *proxyServer) UnpauseAccept() {
  660. p.acceptMu.Lock()
  661. select {
  662. case <-p.pauseAcceptc: // already unpaused
  663. case <-p.donec:
  664. p.acceptMu.Unlock()
  665. return
  666. default:
  667. close(p.pauseAcceptc)
  668. }
  669. p.acceptMu.Unlock()
  670. p.lg.Info(
  671. "unpaused accepting new connections",
  672. zap.String("from", p.From()),
  673. zap.String("to", p.To()),
  674. )
  675. }
  676. func (p *proxyServer) PauseTx() {
  677. p.txMu.Lock()
  678. p.pauseTxc = make(chan struct{})
  679. p.txMu.Unlock()
  680. p.lg.Info(
  681. "paused transmit listen",
  682. zap.String("from", p.From()),
  683. zap.String("to", p.To()),
  684. )
  685. }
  686. func (p *proxyServer) UnpauseTx() {
  687. p.txMu.Lock()
  688. select {
  689. case <-p.pauseTxc: // already unpaused
  690. case <-p.donec:
  691. p.txMu.Unlock()
  692. return
  693. default:
  694. close(p.pauseTxc)
  695. }
  696. p.txMu.Unlock()
  697. p.lg.Info(
  698. "unpaused transmit listen",
  699. zap.String("from", p.From()),
  700. zap.String("to", p.To()),
  701. )
  702. }
  703. func (p *proxyServer) PauseRx() {
  704. p.rxMu.Lock()
  705. p.pauseRxc = make(chan struct{})
  706. p.rxMu.Unlock()
  707. p.lg.Info(
  708. "paused receive listen",
  709. zap.String("from", p.To()),
  710. zap.String("to", p.From()),
  711. )
  712. }
  713. func (p *proxyServer) UnpauseRx() {
  714. p.rxMu.Lock()
  715. select {
  716. case <-p.pauseRxc: // already unpaused
  717. case <-p.donec:
  718. p.rxMu.Unlock()
  719. return
  720. default:
  721. close(p.pauseRxc)
  722. }
  723. p.rxMu.Unlock()
  724. p.lg.Info(
  725. "unpaused receive listen",
  726. zap.String("from", p.To()),
  727. zap.String("to", p.From()),
  728. )
  729. }
  730. func (p *proxyServer) BlackholeTx() {
  731. p.txMu.Lock()
  732. select {
  733. case <-p.blackholeTxc: // already blackholed
  734. case <-p.donec:
  735. p.txMu.Unlock()
  736. return
  737. default:
  738. close(p.blackholeTxc)
  739. }
  740. p.txMu.Unlock()
  741. p.lg.Info(
  742. "blackholed transmit",
  743. zap.String("from", p.From()),
  744. zap.String("to", p.To()),
  745. )
  746. }
  747. func (p *proxyServer) UnblackholeTx() {
  748. p.txMu.Lock()
  749. p.blackholeTxc = make(chan struct{})
  750. p.txMu.Unlock()
  751. p.lg.Info(
  752. "unblackholed transmit",
  753. zap.String("from", p.From()),
  754. zap.String("to", p.To()),
  755. )
  756. }
  757. func (p *proxyServer) BlackholeRx() {
  758. p.rxMu.Lock()
  759. select {
  760. case <-p.blackholeRxc: // already blackholed
  761. case <-p.donec:
  762. p.rxMu.Unlock()
  763. return
  764. default:
  765. close(p.blackholeRxc)
  766. }
  767. p.rxMu.Unlock()
  768. p.lg.Info(
  769. "blackholed receive",
  770. zap.String("from", p.To()),
  771. zap.String("to", p.From()),
  772. )
  773. }
  774. func (p *proxyServer) UnblackholeRx() {
  775. p.rxMu.Lock()
  776. p.blackholeRxc = make(chan struct{})
  777. p.rxMu.Unlock()
  778. p.lg.Info(
  779. "unblackholed receive",
  780. zap.String("from", p.To()),
  781. zap.String("to", p.From()),
  782. )
  783. }
  784. func (p *proxyServer) CorruptTx(f func([]byte) []byte) {
  785. p.corruptTxMu.Lock()
  786. p.corruptTx = f
  787. p.corruptTxMu.Unlock()
  788. p.lg.Info(
  789. "corrupting transmit",
  790. zap.String("from", p.From()),
  791. zap.String("to", p.To()),
  792. )
  793. }
  794. func (p *proxyServer) UncorruptTx() {
  795. p.corruptTxMu.Lock()
  796. p.corruptTx = nil
  797. p.corruptTxMu.Unlock()
  798. p.lg.Info(
  799. "stopped corrupting transmit",
  800. zap.String("from", p.From()),
  801. zap.String("to", p.To()),
  802. )
  803. }
  804. func (p *proxyServer) CorruptRx(f func([]byte) []byte) {
  805. p.corruptRxMu.Lock()
  806. p.corruptRx = f
  807. p.corruptRxMu.Unlock()
  808. p.lg.Info(
  809. "corrupting receive",
  810. zap.String("from", p.To()),
  811. zap.String("to", p.From()),
  812. )
  813. }
  814. func (p *proxyServer) UncorruptRx() {
  815. p.corruptRxMu.Lock()
  816. p.corruptRx = nil
  817. p.corruptRxMu.Unlock()
  818. p.lg.Info(
  819. "stopped corrupting receive",
  820. zap.String("from", p.To()),
  821. zap.String("to", p.From()),
  822. )
  823. }
  824. func (p *proxyServer) ResetListener() error {
  825. p.listenerMu.Lock()
  826. defer p.listenerMu.Unlock()
  827. if err := p.listener.Close(); err != nil {
  828. // already closed
  829. if !strings.HasSuffix(err.Error(), "use of closed network connection") {
  830. return err
  831. }
  832. }
  833. var ln net.Listener
  834. var err error
  835. if !p.tlsInfo.Empty() {
  836. ln, err = transport.NewListener(p.from.Host, p.from.Scheme, &p.tlsInfo)
  837. } else {
  838. ln, err = net.Listen(p.from.Scheme, p.from.Host)
  839. }
  840. if err != nil {
  841. return err
  842. }
  843. p.listener = ln
  844. p.lg.Info(
  845. "reset listener on",
  846. zap.String("from", p.From()),
  847. )
  848. return nil
  849. }