proxy_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package transport
  15. import (
  16. "bytes"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "strings"
  26. "testing"
  27. "time"
  28. "google.golang.org/grpc/grpclog"
  29. )
  30. var testTLSInfo = TLSInfo{
  31. KeyFile: "./fixtures/server.key.insecure",
  32. CertFile: "./fixtures/server.crt",
  33. TrustedCAFile: "./fixtures/ca.crt",
  34. ClientCertAuth: true,
  35. }
  36. func TestProxy_Unix_Insecure(t *testing.T) { testProxy(t, "unix", false, false) }
  37. func TestProxy_TCP_Insecure(t *testing.T) { testProxy(t, "tcp", false, false) }
  38. func TestProxy_Unix_Secure(t *testing.T) { testProxy(t, "unix", true, false) }
  39. func TestProxy_TCP_Secure(t *testing.T) { testProxy(t, "tcp", true, false) }
  40. func TestProxy_Unix_Insecure_DelayTx(t *testing.T) { testProxy(t, "unix", false, true) }
  41. func TestProxy_TCP_Insecure_DelayTx(t *testing.T) { testProxy(t, "tcp", false, true) }
  42. func TestProxy_Unix_Secure_DelayTx(t *testing.T) { testProxy(t, "unix", true, true) }
  43. func TestProxy_TCP_Secure_DelayTx(t *testing.T) { testProxy(t, "tcp", true, true) }
  44. func testProxy(t *testing.T, scheme string, secure bool, delayTx bool) {
  45. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  46. if scheme == "tcp" {
  47. ln1, ln2 := listen(t, "tcp", "localhost:0", TLSInfo{}), listen(t, "tcp", "localhost:0", TLSInfo{})
  48. srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
  49. ln1.Close()
  50. ln2.Close()
  51. } else {
  52. defer func() {
  53. os.RemoveAll(srcAddr)
  54. os.RemoveAll(dstAddr)
  55. }()
  56. }
  57. tlsInfo := testTLSInfo
  58. if !secure {
  59. tlsInfo = TLSInfo{}
  60. }
  61. ln := listen(t, scheme, dstAddr, tlsInfo)
  62. defer ln.Close()
  63. cfg := ProxyConfig{
  64. From: url.URL{Scheme: scheme, Host: srcAddr},
  65. To: url.URL{Scheme: scheme, Host: dstAddr},
  66. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  67. }
  68. if secure {
  69. cfg.TLSInfo = testTLSInfo
  70. }
  71. p := NewProxy(cfg)
  72. <-p.Ready()
  73. defer p.Close()
  74. data1 := []byte("Hello World!")
  75. donec, writec := make(chan struct{}), make(chan []byte)
  76. go func() {
  77. defer close(donec)
  78. for data := range writec {
  79. send(t, data, scheme, srcAddr, tlsInfo)
  80. }
  81. }()
  82. recvc := make(chan []byte)
  83. go func() {
  84. for i := 0; i < 2; i++ {
  85. recvc <- receive(t, ln)
  86. }
  87. }()
  88. writec <- data1
  89. now := time.Now()
  90. if d := <-recvc; !bytes.Equal(data1, d) {
  91. t.Fatalf("expected %q, got %q", string(data1), string(d))
  92. }
  93. took1 := time.Since(now)
  94. t.Logf("took %v with no latency", took1)
  95. lat, rv := 50*time.Millisecond, 5*time.Millisecond
  96. if delayTx {
  97. p.DelayTx(lat, rv)
  98. }
  99. data2 := []byte("new data")
  100. writec <- data2
  101. now = time.Now()
  102. if d := <-recvc; !bytes.Equal(data2, d) {
  103. t.Fatalf("expected %q, got %q", string(data2), string(d))
  104. }
  105. took2 := time.Since(now)
  106. if delayTx {
  107. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  108. } else {
  109. t.Logf("took %v with no latency", took2)
  110. }
  111. if delayTx {
  112. p.UndelayTx()
  113. if took1 >= took2 {
  114. t.Fatalf("expected took1 %v < took2 %v (with latency)", took1, took2)
  115. }
  116. }
  117. close(writec)
  118. select {
  119. case <-donec:
  120. case <-time.After(3 * time.Second):
  121. t.Fatal("took too long to write")
  122. }
  123. select {
  124. case <-p.Done():
  125. t.Fatal("unexpected done")
  126. case err := <-p.Error():
  127. t.Fatal(err)
  128. default:
  129. }
  130. if err := p.Close(); err != nil {
  131. t.Fatal(err)
  132. }
  133. select {
  134. case <-p.Done():
  135. case err := <-p.Error():
  136. if !strings.HasPrefix(err.Error(), "accept ") &&
  137. !strings.HasSuffix(err.Error(), "use of closed network connection") {
  138. t.Fatal(err)
  139. }
  140. case <-time.After(3 * time.Second):
  141. t.Fatal("took too long to close")
  142. }
  143. }
  144. func TestProxy_Unix_Insecure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, false) }
  145. func TestProxy_Unix_Secure_DelayAccept(t *testing.T) { testProxyDelayAccept(t, true) }
  146. func testProxyDelayAccept(t *testing.T, secure bool) {
  147. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  148. defer func() {
  149. os.RemoveAll(srcAddr)
  150. os.RemoveAll(dstAddr)
  151. }()
  152. tlsInfo := testTLSInfo
  153. if !secure {
  154. tlsInfo = TLSInfo{}
  155. }
  156. scheme := "unix"
  157. ln := listen(t, scheme, dstAddr, tlsInfo)
  158. defer ln.Close()
  159. cfg := ProxyConfig{
  160. From: url.URL{Scheme: scheme, Host: srcAddr},
  161. To: url.URL{Scheme: scheme, Host: dstAddr},
  162. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  163. }
  164. if secure {
  165. cfg.TLSInfo = testTLSInfo
  166. }
  167. p := NewProxy(cfg)
  168. <-p.Ready()
  169. defer p.Close()
  170. data := []byte("Hello World!")
  171. now := time.Now()
  172. send(t, data, scheme, srcAddr, tlsInfo)
  173. if d := receive(t, ln); !bytes.Equal(data, d) {
  174. t.Fatalf("expected %q, got %q", string(data), string(d))
  175. }
  176. took1 := time.Since(now)
  177. t.Logf("took %v with no latency", took1)
  178. lat, rv := 700*time.Millisecond, 10*time.Millisecond
  179. p.DelayAccept(lat, rv)
  180. defer p.UndelayAccept()
  181. if err := p.ResetListener(); err != nil {
  182. t.Fatal(err)
  183. }
  184. time.Sleep(200 * time.Millisecond)
  185. now = time.Now()
  186. send(t, data, scheme, srcAddr, tlsInfo)
  187. if d := receive(t, ln); !bytes.Equal(data, d) {
  188. t.Fatalf("expected %q, got %q", string(data), string(d))
  189. }
  190. took2 := time.Since(now)
  191. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  192. if took1 >= took2 {
  193. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  194. }
  195. }
  196. func TestProxy_PauseTx(t *testing.T) {
  197. scheme := "unix"
  198. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  199. defer func() {
  200. os.RemoveAll(srcAddr)
  201. os.RemoveAll(dstAddr)
  202. }()
  203. ln := listen(t, scheme, dstAddr, TLSInfo{})
  204. defer ln.Close()
  205. p := NewProxy(ProxyConfig{
  206. From: url.URL{Scheme: scheme, Host: srcAddr},
  207. To: url.URL{Scheme: scheme, Host: dstAddr},
  208. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  209. })
  210. <-p.Ready()
  211. defer p.Close()
  212. p.PauseTx()
  213. data := []byte("Hello World!")
  214. send(t, data, scheme, srcAddr, TLSInfo{})
  215. recvc := make(chan []byte)
  216. go func() {
  217. recvc <- receive(t, ln)
  218. }()
  219. select {
  220. case d := <-recvc:
  221. t.Fatalf("received unexpected data %q during pause", string(d))
  222. case <-time.After(200 * time.Millisecond):
  223. }
  224. p.UnpauseTx()
  225. select {
  226. case d := <-recvc:
  227. if !bytes.Equal(data, d) {
  228. t.Fatalf("expected %q, got %q", string(data), string(d))
  229. }
  230. case <-time.After(2 * time.Second):
  231. t.Fatal("took too long to receive after unpause")
  232. }
  233. }
  234. func TestProxy_BlackholeTx(t *testing.T) {
  235. scheme := "unix"
  236. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  237. defer func() {
  238. os.RemoveAll(srcAddr)
  239. os.RemoveAll(dstAddr)
  240. }()
  241. ln := listen(t, scheme, dstAddr, TLSInfo{})
  242. defer ln.Close()
  243. p := NewProxy(ProxyConfig{
  244. From: url.URL{Scheme: scheme, Host: srcAddr},
  245. To: url.URL{Scheme: scheme, Host: dstAddr},
  246. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  247. })
  248. <-p.Ready()
  249. defer p.Close()
  250. p.BlackholeTx()
  251. data := []byte("Hello World!")
  252. send(t, data, scheme, srcAddr, TLSInfo{})
  253. recvc := make(chan []byte)
  254. go func() {
  255. recvc <- receive(t, ln)
  256. }()
  257. select {
  258. case d := <-recvc:
  259. t.Fatalf("unexpected data receive %q during blackhole", string(d))
  260. case <-time.After(200 * time.Millisecond):
  261. }
  262. p.UnblackholeTx()
  263. // expect different data, old data dropped
  264. data[0]++
  265. send(t, data, scheme, srcAddr, TLSInfo{})
  266. select {
  267. case d := <-recvc:
  268. if !bytes.Equal(data, d) {
  269. t.Fatalf("expected %q, got %q", string(data), string(d))
  270. }
  271. case <-time.After(2 * time.Second):
  272. t.Fatal("took too long to receive after unblackhole")
  273. }
  274. }
  275. func TestProxy_CorruptTx(t *testing.T) {
  276. scheme := "unix"
  277. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  278. defer func() {
  279. os.RemoveAll(srcAddr)
  280. os.RemoveAll(dstAddr)
  281. }()
  282. ln := listen(t, scheme, dstAddr, TLSInfo{})
  283. defer ln.Close()
  284. p := NewProxy(ProxyConfig{
  285. From: url.URL{Scheme: scheme, Host: srcAddr},
  286. To: url.URL{Scheme: scheme, Host: dstAddr},
  287. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  288. })
  289. <-p.Ready()
  290. defer p.Close()
  291. p.CorruptTx(func(d []byte) []byte {
  292. d[len(d)/2]++
  293. return d
  294. })
  295. data := []byte("Hello World!")
  296. send(t, data, scheme, srcAddr, TLSInfo{})
  297. if d := receive(t, ln); bytes.Equal(d, data) {
  298. t.Fatalf("expected corrupted data, got %q", string(d))
  299. }
  300. p.UncorruptTx()
  301. send(t, data, scheme, srcAddr, TLSInfo{})
  302. if d := receive(t, ln); !bytes.Equal(d, data) {
  303. t.Fatalf("expected uncorrupted data, got %q", string(d))
  304. }
  305. }
  306. func TestProxy_Shutdown(t *testing.T) {
  307. scheme := "unix"
  308. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  309. defer func() {
  310. os.RemoveAll(srcAddr)
  311. os.RemoveAll(dstAddr)
  312. }()
  313. ln := listen(t, scheme, dstAddr, TLSInfo{})
  314. defer ln.Close()
  315. p := NewProxy(ProxyConfig{
  316. From: url.URL{Scheme: scheme, Host: srcAddr},
  317. To: url.URL{Scheme: scheme, Host: dstAddr},
  318. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  319. })
  320. <-p.Ready()
  321. defer p.Close()
  322. px, _ := p.(*proxy)
  323. px.listener.Close()
  324. time.Sleep(200 * time.Millisecond)
  325. data := []byte("Hello World!")
  326. send(t, data, scheme, srcAddr, TLSInfo{})
  327. if d := receive(t, ln); !bytes.Equal(d, data) {
  328. t.Fatalf("expected %q, got %q", string(data), string(d))
  329. }
  330. }
  331. func TestProxy_ShutdownListener(t *testing.T) {
  332. scheme := "unix"
  333. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  334. defer func() {
  335. os.RemoveAll(srcAddr)
  336. os.RemoveAll(dstAddr)
  337. }()
  338. ln := listen(t, scheme, dstAddr, TLSInfo{})
  339. defer ln.Close()
  340. p := NewProxy(ProxyConfig{
  341. From: url.URL{Scheme: scheme, Host: srcAddr},
  342. To: url.URL{Scheme: scheme, Host: dstAddr},
  343. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  344. })
  345. <-p.Ready()
  346. defer p.Close()
  347. // shut down destination
  348. ln.Close()
  349. time.Sleep(200 * time.Millisecond)
  350. ln = listen(t, scheme, dstAddr, TLSInfo{})
  351. defer ln.Close()
  352. data := []byte("Hello World!")
  353. send(t, data, scheme, srcAddr, TLSInfo{})
  354. if d := receive(t, ln); !bytes.Equal(d, data) {
  355. t.Fatalf("expected %q, got %q", string(data), string(d))
  356. }
  357. }
  358. func TestProxyHTTP_Insecure_DelayTx(t *testing.T) { testProxyHTTP(t, false, true) }
  359. func TestProxyHTTP_Secure_DelayTx(t *testing.T) { testProxyHTTP(t, true, true) }
  360. func TestProxyHTTP_Insecure_DelayRx(t *testing.T) { testProxyHTTP(t, false, false) }
  361. func TestProxyHTTP_Secure_DelayRx(t *testing.T) { testProxyHTTP(t, true, false) }
  362. func testProxyHTTP(t *testing.T, secure, delayTx bool) {
  363. scheme := "tcp"
  364. ln1, ln2 := listen(t, scheme, "localhost:0", TLSInfo{}), listen(t, scheme, "localhost:0", TLSInfo{})
  365. srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
  366. ln1.Close()
  367. ln2.Close()
  368. mux := http.NewServeMux()
  369. mux.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) {
  370. d, err := ioutil.ReadAll(req.Body)
  371. if err != nil {
  372. t.Fatal(err)
  373. }
  374. if _, err = w.Write([]byte(fmt.Sprintf("%q(confirmed)", string(d)))); err != nil {
  375. t.Fatal(err)
  376. }
  377. })
  378. var tlsConfig *tls.Config
  379. var err error
  380. if secure {
  381. tlsConfig, err = testTLSInfo.ServerConfig()
  382. if err != nil {
  383. t.Fatal(err)
  384. }
  385. }
  386. srv := &http.Server{
  387. Addr: dstAddr,
  388. Handler: mux,
  389. TLSConfig: tlsConfig,
  390. }
  391. donec := make(chan struct{})
  392. defer func() {
  393. srv.Close()
  394. <-donec
  395. }()
  396. go func() {
  397. defer close(donec)
  398. if !secure {
  399. srv.ListenAndServe()
  400. } else {
  401. srv.ListenAndServeTLS(testTLSInfo.CertFile, testTLSInfo.KeyFile)
  402. }
  403. }()
  404. time.Sleep(200 * time.Millisecond)
  405. cfg := ProxyConfig{
  406. From: url.URL{Scheme: scheme, Host: srcAddr},
  407. To: url.URL{Scheme: scheme, Host: dstAddr},
  408. Logger: grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5),
  409. }
  410. if secure {
  411. cfg.TLSInfo = testTLSInfo
  412. }
  413. p := NewProxy(cfg)
  414. <-p.Ready()
  415. defer p.Close()
  416. data := "Hello World!"
  417. now := time.Now()
  418. var resp *http.Response
  419. if secure {
  420. tp, terr := NewTransport(testTLSInfo, 3*time.Second)
  421. if terr != nil {
  422. t.Fatal(terr)
  423. }
  424. cli := &http.Client{Transport: tp}
  425. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  426. } else {
  427. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  428. }
  429. if err != nil {
  430. t.Fatal(err)
  431. }
  432. d, err := ioutil.ReadAll(resp.Body)
  433. if err != nil {
  434. t.Fatal(err)
  435. }
  436. took1 := time.Since(now)
  437. t.Logf("took %v with no latency", took1)
  438. rs1 := string(d)
  439. exp := fmt.Sprintf("%q(confirmed)", data)
  440. if rs1 != exp {
  441. t.Fatalf("got %q, expected %q", rs1, exp)
  442. }
  443. lat, rv := 100*time.Millisecond, 10*time.Millisecond
  444. if delayTx {
  445. p.DelayTx(lat, rv)
  446. defer p.UndelayTx()
  447. } else {
  448. p.DelayRx(lat, rv)
  449. defer p.UndelayRx()
  450. }
  451. now = time.Now()
  452. if secure {
  453. tp, terr := NewTransport(testTLSInfo, 3*time.Second)
  454. if terr != nil {
  455. t.Fatal(terr)
  456. }
  457. cli := &http.Client{Transport: tp}
  458. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  459. } else {
  460. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  461. }
  462. if err != nil {
  463. t.Fatal(err)
  464. }
  465. d, err = ioutil.ReadAll(resp.Body)
  466. if err != nil {
  467. t.Fatal(err)
  468. }
  469. took2 := time.Since(now)
  470. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  471. rs2 := string(d)
  472. if rs2 != exp {
  473. t.Fatalf("got %q, expected %q", rs2, exp)
  474. }
  475. if took1 > took2 {
  476. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  477. }
  478. }
  479. func newUnixAddr() string {
  480. now := time.Now().UnixNano()
  481. rand.Seed(now)
  482. addr := fmt.Sprintf("%X%X.unix-conn", now, rand.Intn(35000))
  483. os.RemoveAll(addr)
  484. return addr
  485. }
  486. func listen(t *testing.T, scheme, addr string, tlsInfo TLSInfo) (ln net.Listener) {
  487. var err error
  488. if !tlsInfo.Empty() {
  489. ln, err = NewListener(addr, scheme, &tlsInfo)
  490. } else {
  491. ln, err = net.Listen(scheme, addr)
  492. }
  493. if err != nil {
  494. t.Fatal(err)
  495. }
  496. return ln
  497. }
  498. func send(t *testing.T, data []byte, scheme, addr string, tlsInfo TLSInfo) {
  499. var out net.Conn
  500. var err error
  501. if !tlsInfo.Empty() {
  502. tp, terr := NewTransport(tlsInfo, 3*time.Second)
  503. if terr != nil {
  504. t.Fatal(terr)
  505. }
  506. out, err = tp.Dial(scheme, addr)
  507. } else {
  508. out, err = net.Dial(scheme, addr)
  509. }
  510. if err != nil {
  511. t.Fatal(err)
  512. }
  513. if _, err = out.Write(data); err != nil {
  514. t.Fatal(err)
  515. }
  516. if err = out.Close(); err != nil {
  517. t.Fatal(err)
  518. }
  519. }
  520. func receive(t *testing.T, ln net.Listener) (data []byte) {
  521. buf := bytes.NewBuffer(make([]byte, 0, 1024))
  522. for {
  523. in, err := ln.Accept()
  524. if err != nil {
  525. t.Fatal(err)
  526. }
  527. var n int64
  528. n, err = buf.ReadFrom(in)
  529. if err != nil {
  530. t.Fatal(err)
  531. }
  532. if n > 0 {
  533. break
  534. }
  535. }
  536. return buf.Bytes()
  537. }