proxy_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package transport
  15. import (
  16. "bytes"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "strings"
  26. "testing"
  27. "time"
  28. "go.uber.org/zap"
  29. )
  30. // enable DebugLevel
  31. var testLogger = zap.NewExample()
  32. var testTLSInfo = TLSInfo{
  33. KeyFile: "./fixtures/server.key.insecure",
  34. CertFile: "./fixtures/server.crt",
  35. TrustedCAFile: "./fixtures/ca.crt",
  36. ClientCertAuth: true,
  37. }
  38. func TestProxy_Unix_Insecure(t *testing.T) { testProxy(t, "unix", false, false) }
  39. func TestProxy_TCP_Insecure(t *testing.T) { testProxy(t, "tcp", false, false) }
  40. func TestProxy_Unix_Secure(t *testing.T) { testProxy(t, "unix", true, false) }
  41. func TestProxy_TCP_Secure(t *testing.T) { testProxy(t, "tcp", true, false) }
  42. func TestProxy_Unix_Insecure_DelayTx(t *testing.T) { testProxy(t, "unix", false, true) }
  43. func TestProxy_TCP_Insecure_DelayTx(t *testing.T) { testProxy(t, "tcp", false, true) }
  44. func TestProxy_Unix_Secure_DelayTx(t *testing.T) { testProxy(t, "unix", true, true) }
  45. func TestProxy_TCP_Secure_DelayTx(t *testing.T) { testProxy(t, "tcp", true, true) }
  46. func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
  47. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  48. if scheme == "tcp" {
  49. ln1, ln2 := listen(t, "tcp", "localhost:0", TLSInfo{}), listen(t, "tcp", "localhost:0", TLSInfo{})
  50. srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
  51. ln1.Close()
  52. ln2.Close()
  53. } else {
  54. defer func() {
  55. os.RemoveAll(srcAddr)
  56. os.RemoveAll(dstAddr)
  57. }()
  58. }
  59. tlsInfo := testTLSInfo
  60. if !secure {
  61. tlsInfo = TLSInfo{}
  62. }
  63. ln := listen(t, scheme, dstAddr, tlsInfo)
  64. defer ln.Close()
  65. cfg := ProxyConfig{
  66. Logger: testLogger,
  67. From: url.URL{Scheme: scheme, Host: srcAddr},
  68. To: url.URL{Scheme: scheme, Host: dstAddr},
  69. }
  70. if secure {
  71. cfg.TLSInfo = testTLSInfo
  72. }
  73. p := NewProxy(cfg)
  74. <-p.Ready()
  75. defer p.Close()
  76. data1 := []byte("Hello World!")
  77. donec, writec := make(chan struct{}), make(chan []byte)
  78. go func() {
  79. defer close(donec)
  80. for data := range writec {
  81. send(t, data, scheme, srcAddr, tlsInfo)
  82. }
  83. }()
  84. recvc := make(chan []byte)
  85. go func() {
  86. for i := 0; i < 2; i++ {
  87. recvc <- receive(t, ln)
  88. }
  89. }()
  90. writec <- data1
  91. now := time.Now()
  92. if d := <-recvc; !bytes.Equal(data1, d) {
  93. t.Fatalf("expected %q, got %q", string(data1), string(d))
  94. }
  95. took1 := time.Since(now)
  96. t.Logf("took %v with no latency", took1)
  97. lat, rv := 50*time.Millisecond, 5*time.Millisecond
  98. if delayTx {
  99. p.DelayTx(lat, rv)
  100. }
  101. data2 := []byte("new data")
  102. writec <- data2
  103. now = time.Now()
  104. if d := <-recvc; !bytes.Equal(data2, d) {
  105. t.Fatalf("expected %q, got %q", string(data2), string(d))
  106. }
  107. took2 := time.Since(now)
  108. if delayTx {
  109. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  110. } else {
  111. t.Logf("took %v with no latency", took2)
  112. }
  113. if delayTx {
  114. p.UndelayTx()
  115. if took1 >= took2 {
  116. t.Fatalf("expected took1 %v < took2 %v (with latency)", took1, took2)
  117. }
  118. }
  119. close(writec)
  120. select {
  121. case <-donec:
  122. case <-time.After(3 * time.Second):
  123. t.Fatal("took too long to write")
  124. }
  125. select {
  126. case <-p.Done():
  127. t.Fatal("unexpected done")
  128. case err := <-p.Error():
  129. t.Fatal(err)
  130. default:
  131. }
  132. if err := p.Close(); err != nil {
  133. t.Fatal(err)
  134. }
  135. select {
  136. case <-p.Done():
  137. case err := <-p.Error():
  138. if !strings.HasPrefix(err.Error(), "accept ") &&
  139. !strings.HasSuffix(err.Error(), "use of closed network connection") {
  140. t.Fatal(err)
  141. }
  142. case <-time.After(3 * time.Second):
  143. t.Fatal("took too long to close")
  144. }
  145. }
  146. func TestProxy_Unix_Insecure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, false) }
  147. func TestProxy_Unix_Secure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, true) }
  148. func testProxyDelayAccept(t *testing.T, secure bool) {
  149. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  150. defer func() {
  151. os.RemoveAll(srcAddr)
  152. os.RemoveAll(dstAddr)
  153. }()
  154. tlsInfo := testTLSInfo
  155. if !secure {
  156. tlsInfo = TLSInfo{}
  157. }
  158. scheme := "unix"
  159. ln := listen(t, scheme, dstAddr, tlsInfo)
  160. defer ln.Close()
  161. cfg := ProxyConfig{
  162. Logger: testLogger,
  163. From: url.URL{Scheme: scheme, Host: srcAddr},
  164. To: url.URL{Scheme: scheme, Host: dstAddr},
  165. }
  166. if secure {
  167. cfg.TLSInfo = testTLSInfo
  168. }
  169. p := NewProxy(cfg)
  170. <-p.Ready()
  171. defer p.Close()
  172. data := []byte("Hello World!")
  173. now := time.Now()
  174. send(t, data, scheme, srcAddr, tlsInfo)
  175. if d := receive(t, ln); !bytes.Equal(data, d) {
  176. t.Fatalf("expected %q, got %q", string(data), string(d))
  177. }
  178. took1 := time.Since(now)
  179. t.Logf("took %v with no latency", took1)
  180. lat, rv := 700*time.Millisecond, 10*time.Millisecond
  181. p.DelayAccept(lat, rv)
  182. defer p.UndelayAccept()
  183. if err := p.ResetListener(); err != nil {
  184. t.Fatal(err)
  185. }
  186. time.Sleep(200 * time.Millisecond)
  187. now = time.Now()
  188. send(t, data, scheme, srcAddr, tlsInfo)
  189. if d := receive(t, ln); !bytes.Equal(data, d) {
  190. t.Fatalf("expected %q, got %q", string(data), string(d))
  191. }
  192. took2 := time.Since(now)
  193. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  194. if took1 >= took2 {
  195. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  196. }
  197. }
  198. func TestProxy_PauseTx(t *testing.T) {
  199. scheme := "unix"
  200. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  201. defer func() {
  202. os.RemoveAll(srcAddr)
  203. os.RemoveAll(dstAddr)
  204. }()
  205. ln := listen(t, scheme, dstAddr, TLSInfo{})
  206. defer ln.Close()
  207. p := NewProxy(ProxyConfig{
  208. Logger: testLogger,
  209. From: url.URL{Scheme: scheme, Host: srcAddr},
  210. To: url.URL{Scheme: scheme, Host: dstAddr},
  211. })
  212. <-p.Ready()
  213. defer p.Close()
  214. p.PauseTx()
  215. data := []byte("Hello World!")
  216. send(t, data, scheme, srcAddr, TLSInfo{})
  217. recvc := make(chan []byte)
  218. go func() {
  219. recvc <- receive(t, ln)
  220. }()
  221. select {
  222. case d := <-recvc:
  223. t.Fatalf("received unexpected data %q during pause", string(d))
  224. case <-time.After(200 * time.Millisecond):
  225. }
  226. p.UnpauseTx()
  227. select {
  228. case d := <-recvc:
  229. if !bytes.Equal(data, d) {
  230. t.Fatalf("expected %q, got %q", string(data), string(d))
  231. }
  232. case <-time.After(2 * time.Second):
  233. t.Fatal("took too long to receive after unpause")
  234. }
  235. }
  236. func TestProxy_BlackholeTx(t *testing.T) {
  237. scheme := "unix"
  238. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  239. defer func() {
  240. os.RemoveAll(srcAddr)
  241. os.RemoveAll(dstAddr)
  242. }()
  243. ln := listen(t, scheme, dstAddr, TLSInfo{})
  244. defer ln.Close()
  245. p := NewProxy(ProxyConfig{
  246. Logger: testLogger,
  247. From: url.URL{Scheme: scheme, Host: srcAddr},
  248. To: url.URL{Scheme: scheme, Host: dstAddr},
  249. })
  250. <-p.Ready()
  251. defer p.Close()
  252. p.BlackholeTx()
  253. data := []byte("Hello World!")
  254. send(t, data, scheme, srcAddr, TLSInfo{})
  255. recvc := make(chan []byte)
  256. go func() {
  257. recvc <- receive(t, ln)
  258. }()
  259. select {
  260. case d := <-recvc:
  261. t.Fatalf("unexpected data receive %q during blackhole", string(d))
  262. case <-time.After(200 * time.Millisecond):
  263. }
  264. p.UnblackholeTx()
  265. // expect different data, old data dropped
  266. data[0]++
  267. send(t, data, scheme, srcAddr, TLSInfo{})
  268. select {
  269. case d := <-recvc:
  270. if !bytes.Equal(data, d) {
  271. t.Fatalf("expected %q, got %q", string(data), string(d))
  272. }
  273. case <-time.After(2 * time.Second):
  274. t.Fatal("took too long to receive after unblackhole")
  275. }
  276. }
  277. func TestProxy_CorruptTx(t *testing.T) {
  278. scheme := "unix"
  279. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  280. defer func() {
  281. os.RemoveAll(srcAddr)
  282. os.RemoveAll(dstAddr)
  283. }()
  284. ln := listen(t, scheme, dstAddr, TLSInfo{})
  285. defer ln.Close()
  286. p := NewProxy(ProxyConfig{
  287. Logger: testLogger,
  288. From: url.URL{Scheme: scheme, Host: srcAddr},
  289. To: url.URL{Scheme: scheme, Host: dstAddr},
  290. })
  291. <-p.Ready()
  292. defer p.Close()
  293. p.CorruptTx(func(d []byte) []byte {
  294. d[len(d)/2]++
  295. return d
  296. })
  297. data := []byte("Hello World!")
  298. send(t, data, scheme, srcAddr, TLSInfo{})
  299. if d := receive(t, ln); bytes.Equal(d, data) {
  300. t.Fatalf("expected corrupted data, got %q", string(d))
  301. }
  302. p.UncorruptTx()
  303. send(t, data, scheme, srcAddr, TLSInfo{})
  304. if d := receive(t, ln); !bytes.Equal(d, data) {
  305. t.Fatalf("expected uncorrupted data, got %q", string(d))
  306. }
  307. }
  308. func TestProxy_Shutdown(t *testing.T) {
  309. scheme := "unix"
  310. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  311. defer func() {
  312. os.RemoveAll(srcAddr)
  313. os.RemoveAll(dstAddr)
  314. }()
  315. ln := listen(t, scheme, dstAddr, TLSInfo{})
  316. defer ln.Close()
  317. p := NewProxy(ProxyConfig{
  318. Logger: testLogger,
  319. From: url.URL{Scheme: scheme, Host: srcAddr},
  320. To: url.URL{Scheme: scheme, Host: dstAddr},
  321. })
  322. <-p.Ready()
  323. defer p.Close()
  324. px, _ := p.(*proxy)
  325. px.listener.Close()
  326. time.Sleep(200 * time.Millisecond)
  327. data := []byte("Hello World!")
  328. send(t, data, scheme, srcAddr, TLSInfo{})
  329. if d := receive(t, ln); !bytes.Equal(d, data) {
  330. t.Fatalf("expected %q, got %q", string(data), string(d))
  331. }
  332. }
  333. func TestProxy_ShutdownListener(t *testing.T) {
  334. scheme := "unix"
  335. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  336. defer func() {
  337. os.RemoveAll(srcAddr)
  338. os.RemoveAll(dstAddr)
  339. }()
  340. ln := listen(t, scheme, dstAddr, TLSInfo{})
  341. defer ln.Close()
  342. p := NewProxy(ProxyConfig{
  343. Logger: testLogger,
  344. From: url.URL{Scheme: scheme, Host: srcAddr},
  345. To: url.URL{Scheme: scheme, Host: dstAddr},
  346. })
  347. <-p.Ready()
  348. defer p.Close()
  349. // shut down destination
  350. ln.Close()
  351. time.Sleep(200 * time.Millisecond)
  352. ln = listen(t, scheme, dstAddr, TLSInfo{})
  353. defer ln.Close()
  354. data := []byte("Hello World!")
  355. send(t, data, scheme, srcAddr, TLSInfo{})
  356. if d := receive(t, ln); !bytes.Equal(d, data) {
  357. t.Fatalf("expected %q, got %q", string(data), string(d))
  358. }
  359. }
  360. func TestProxyHTTP_Insecure_DelayTx(t *testing.T) { testProxyHTTP(t, false, true) }
  361. func TestProxyHTTP_Secure_DelayTx(t *testing.T) { testProxyHTTP(t, true, true) }
  362. func TestProxyHTTP_Insecure_DelayRx(t *testing.T) { testProxyHTTP(t, false, false) }
  363. func TestProxyHTTP_Secure_DelayRx(t *testing.T) { testProxyHTTP(t, true, false) }
  364. func testProxyHTTP(t *testing.T, secure, delayTx bool) {
  365. scheme := "tcp"
  366. ln1, ln2 := listen(t, scheme, "localhost:0", TLSInfo{}), listen(t, scheme, "localhost:0", TLSInfo{})
  367. srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
  368. ln1.Close()
  369. ln2.Close()
  370. mux := http.NewServeMux()
  371. mux.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) {
  372. d, err := ioutil.ReadAll(req.Body)
  373. if err != nil {
  374. t.Fatal(err)
  375. }
  376. if _, err = w.Write([]byte(fmt.Sprintf("%q(confirmed)", string(d)))); err != nil {
  377. t.Fatal(err)
  378. }
  379. })
  380. var tlsConfig *tls.Config
  381. var err error
  382. if secure {
  383. tlsConfig, err = testTLSInfo.ServerConfig()
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. }
  388. srv := &http.Server{
  389. Addr: dstAddr,
  390. Handler: mux,
  391. TLSConfig: tlsConfig,
  392. }
  393. donec := make(chan struct{})
  394. defer func() {
  395. srv.Close()
  396. <-donec
  397. }()
  398. go func() {
  399. defer close(donec)
  400. if !secure {
  401. srv.ListenAndServe()
  402. } else {
  403. srv.ListenAndServeTLS(testTLSInfo.CertFile, testTLSInfo.KeyFile)
  404. }
  405. }()
  406. time.Sleep(200 * time.Millisecond)
  407. cfg := ProxyConfig{
  408. Logger: testLogger,
  409. From: url.URL{Scheme: scheme, Host: srcAddr},
  410. To: url.URL{Scheme: scheme, Host: dstAddr},
  411. }
  412. if secure {
  413. cfg.TLSInfo = testTLSInfo
  414. }
  415. p := NewProxy(cfg)
  416. <-p.Ready()
  417. defer p.Close()
  418. data := "Hello World!"
  419. now := time.Now()
  420. var resp *http.Response
  421. if secure {
  422. tp, terr := NewTransport(testTLSInfo, 3*time.Second)
  423. if terr != nil {
  424. t.Fatal(terr)
  425. }
  426. cli := &http.Client{Transport: tp}
  427. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  428. } else {
  429. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  430. }
  431. if err != nil {
  432. t.Fatal(err)
  433. }
  434. d, err := ioutil.ReadAll(resp.Body)
  435. if err != nil {
  436. t.Fatal(err)
  437. }
  438. took1 := time.Since(now)
  439. t.Logf("took %v with no latency", took1)
  440. rs1 := string(d)
  441. exp := fmt.Sprintf("%q(confirmed)", data)
  442. if rs1 != exp {
  443. t.Fatalf("got %q, expected %q", rs1, exp)
  444. }
  445. lat, rv := 100*time.Millisecond, 10*time.Millisecond
  446. if delayTx {
  447. p.DelayTx(lat, rv)
  448. defer p.UndelayTx()
  449. } else {
  450. p.DelayRx(lat, rv)
  451. defer p.UndelayRx()
  452. }
  453. now = time.Now()
  454. if secure {
  455. tp, terr := NewTransport(testTLSInfo, 3*time.Second)
  456. if terr != nil {
  457. t.Fatal(terr)
  458. }
  459. cli := &http.Client{Transport: tp}
  460. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  461. } else {
  462. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  463. }
  464. if err != nil {
  465. t.Fatal(err)
  466. }
  467. d, err = ioutil.ReadAll(resp.Body)
  468. if err != nil {
  469. t.Fatal(err)
  470. }
  471. took2 := time.Since(now)
  472. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  473. rs2 := string(d)
  474. if rs2 != exp {
  475. t.Fatalf("got %q, expected %q", rs2, exp)
  476. }
  477. if took1 > took2 {
  478. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  479. }
  480. }
  481. func newUnixAddr() string {
  482. now := time.Now().UnixNano()
  483. rand.Seed(now)
  484. addr := fmt.Sprintf("%X%X.unix-conn", now, rand.Intn(35000))
  485. os.RemoveAll(addr)
  486. return addr
  487. }
  488. func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener) {
  489. var err error
  490. if !tlsInfo.Empty() {
  491. ln, err = NewListener(addr, scheme, &tlsInfo)
  492. } else {
  493. ln, err = net.Listen(scheme, addr)
  494. }
  495. if err != nil {
  496. t.Fatal(err)
  497. }
  498. return ln
  499. }
  500. func send(t *testing.T, data []byte, scheme, addr string, tlsInfo TLSInfo) {
  501. var out net.Conn
  502. var err error
  503. if !tlsInfo.Empty() {
  504. tp, terr := NewTransport(tlsInfo, 3*time.Second)
  505. if terr != nil {
  506. t.Fatal(terr)
  507. }
  508. out, err = tp.Dial(scheme, addr)
  509. } else {
  510. out, err = net.Dial(scheme, addr)
  511. }
  512. if err != nil {
  513. t.Fatal(err)
  514. }
  515. if _, err = out.Write(data); err != nil {
  516. t.Fatal(err)
  517. }
  518. if err = out.Close(); err != nil {
  519. t.Fatal(err)
  520. }
  521. }
  522. func receive(t *testing.T, ln net.Listener) (data []byte) {
  523. buf := bytes.NewBuffer(make([]byte, 0, 1024))
  524. for {
  525. in, err := ln.Accept()
  526. if err != nil {
  527. t.Fatal(err)
  528. }
  529. var n int64
  530. n, err = buf.ReadFrom(in)
  531. if err != nil {
  532. t.Fatal(err)
  533. }
  534. if n > 0 {
  535. break
  536. }
  537. }
  538. return buf.Bytes()
  539. }