server_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "strings"
  26. "testing"
  27. "time"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "go.uber.org/zap"
  30. )
  31. // enable DebugLevel
  32. var testLogger = zap.NewExample()
  33. var testTLSInfo = transport.TLSInfo{
  34. KeyFile: "./fixtures/server.key.insecure",
  35. CertFile: "./fixtures/server.crt",
  36. TrustedCAFile: "./fixtures/ca.crt",
  37. ClientCertAuth: true,
  38. }
  39. func TestServer_Unix_Insecure(t *testing.T) { testServer(t, "unix", false, false) }
  40. func TestServer_TCP_Insecure(t *testing.T) { testServer(t, "tcp", false, false) }
  41. func TestServer_Unix_Secure(t *testing.T) { testServer(t, "unix", true, false) }
  42. func TestServer_TCP_Secure(t *testing.T) { testServer(t, "tcp", true, false) }
  43. func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) }
  44. func TestServer_TCP_Insecure_DelayTx(t *testing.T) { testServer(t, "tcp", false, true) }
  45. func TestServer_Unix_Secure_DelayTx(t *testing.T) { testServer(t, "unix", true, true) }
  46. func TestServer_TCP_Secure_DelayTx(t *testing.T) { testServer(t, "tcp", true, true) }
  47. func testServer(t *testing.T, scheme string, secure bool, delayTx bool) {
  48. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  49. if scheme == "tcp" {
  50. ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{})
  51. srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
  52. ln1.Close()
  53. ln2.Close()
  54. } else {
  55. defer func() {
  56. os.RemoveAll(srcAddr)
  57. os.RemoveAll(dstAddr)
  58. }()
  59. }
  60. tlsInfo := testTLSInfo
  61. if !secure {
  62. tlsInfo = transport.TLSInfo{}
  63. }
  64. ln := listen(t, scheme, dstAddr, tlsInfo)
  65. defer ln.Close()
  66. cfg := ServerConfig{
  67. Logger: testLogger,
  68. From: url.URL{Scheme: scheme, Host: srcAddr},
  69. To: url.URL{Scheme: scheme, Host: dstAddr},
  70. }
  71. if secure {
  72. cfg.TLSInfo = testTLSInfo
  73. }
  74. p := NewServer(cfg)
  75. <-p.Ready()
  76. defer p.Close()
  77. data1 := []byte("Hello World!")
  78. donec, writec := make(chan struct{}), make(chan []byte)
  79. go func() {
  80. defer close(donec)
  81. for data := range writec {
  82. send(t, data, scheme, srcAddr, tlsInfo)
  83. }
  84. }()
  85. recvc := make(chan []byte)
  86. go func() {
  87. for i := 0; i < 2; i++ {
  88. recvc <- receive(t, ln)
  89. }
  90. }()
  91. writec <- data1
  92. now := time.Now()
  93. if d := <-recvc; !bytes.Equal(data1, d) {
  94. t.Fatalf("expected %q, got %q", string(data1), string(d))
  95. }
  96. took1 := time.Since(now)
  97. t.Logf("took %v with no latency", took1)
  98. lat, rv := 50*time.Millisecond, 5*time.Millisecond
  99. if delayTx {
  100. p.DelayTx(lat, rv)
  101. }
  102. data2 := []byte("new data")
  103. writec <- data2
  104. now = time.Now()
  105. if d := <-recvc; !bytes.Equal(data2, d) {
  106. t.Fatalf("expected %q, got %q", string(data2), string(d))
  107. }
  108. took2 := time.Since(now)
  109. if delayTx {
  110. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  111. } else {
  112. t.Logf("took %v with no latency", took2)
  113. }
  114. if delayTx {
  115. p.UndelayTx()
  116. if took1 >= took2 {
  117. t.Fatalf("expected took1 %v < took2 %v (with latency)", took1, took2)
  118. }
  119. }
  120. close(writec)
  121. select {
  122. case <-donec:
  123. case <-time.After(3 * time.Second):
  124. t.Fatal("took too long to write")
  125. }
  126. select {
  127. case <-p.Done():
  128. t.Fatal("unexpected done")
  129. case err := <-p.Error():
  130. t.Fatal(err)
  131. default:
  132. }
  133. if err := p.Close(); err != nil {
  134. t.Fatal(err)
  135. }
  136. select {
  137. case <-p.Done():
  138. case err := <-p.Error():
  139. if !strings.HasPrefix(err.Error(), "accept ") &&
  140. !strings.HasSuffix(err.Error(), "use of closed network connection") {
  141. t.Fatal(err)
  142. }
  143. case <-time.After(3 * time.Second):
  144. t.Fatal("took too long to close")
  145. }
  146. }
  147. func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
  148. func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) }
  149. func testServerDelayAccept(t *testing.T, secure bool) {
  150. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  151. defer func() {
  152. os.RemoveAll(srcAddr)
  153. os.RemoveAll(dstAddr)
  154. }()
  155. tlsInfo := testTLSInfo
  156. if !secure {
  157. tlsInfo = transport.TLSInfo{}
  158. }
  159. scheme := "unix"
  160. ln := listen(t, scheme, dstAddr, tlsInfo)
  161. defer ln.Close()
  162. cfg := ServerConfig{
  163. Logger: testLogger,
  164. From: url.URL{Scheme: scheme, Host: srcAddr},
  165. To: url.URL{Scheme: scheme, Host: dstAddr},
  166. }
  167. if secure {
  168. cfg.TLSInfo = testTLSInfo
  169. }
  170. p := NewServer(cfg)
  171. <-p.Ready()
  172. defer p.Close()
  173. data := []byte("Hello World!")
  174. now := time.Now()
  175. send(t, data, scheme, srcAddr, tlsInfo)
  176. if d := receive(t, ln); !bytes.Equal(data, d) {
  177. t.Fatalf("expected %q, got %q", string(data), string(d))
  178. }
  179. took1 := time.Since(now)
  180. t.Logf("took %v with no latency", took1)
  181. lat, rv := 700*time.Millisecond, 10*time.Millisecond
  182. p.DelayAccept(lat, rv)
  183. defer p.UndelayAccept()
  184. if err := p.ResetListener(); err != nil {
  185. t.Fatal(err)
  186. }
  187. time.Sleep(200 * time.Millisecond)
  188. now = time.Now()
  189. send(t, data, scheme, srcAddr, tlsInfo)
  190. if d := receive(t, ln); !bytes.Equal(data, d) {
  191. t.Fatalf("expected %q, got %q", string(data), string(d))
  192. }
  193. took2 := time.Since(now)
  194. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  195. if took1 >= took2 {
  196. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  197. }
  198. }
  199. func TestServer_PauseTx(t *testing.T) {
  200. scheme := "unix"
  201. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  202. defer func() {
  203. os.RemoveAll(srcAddr)
  204. os.RemoveAll(dstAddr)
  205. }()
  206. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  207. defer ln.Close()
  208. p := NewServer(ServerConfig{
  209. Logger: testLogger,
  210. From: url.URL{Scheme: scheme, Host: srcAddr},
  211. To: url.URL{Scheme: scheme, Host: dstAddr},
  212. })
  213. <-p.Ready()
  214. defer p.Close()
  215. p.PauseTx()
  216. data := []byte("Hello World!")
  217. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  218. recvc := make(chan []byte)
  219. go func() {
  220. recvc <- receive(t, ln)
  221. }()
  222. select {
  223. case d := <-recvc:
  224. t.Fatalf("received unexpected data %q during pause", string(d))
  225. case <-time.After(200 * time.Millisecond):
  226. }
  227. p.UnpauseTx()
  228. select {
  229. case d := <-recvc:
  230. if !bytes.Equal(data, d) {
  231. t.Fatalf("expected %q, got %q", string(data), string(d))
  232. }
  233. case <-time.After(2 * time.Second):
  234. t.Fatal("took too long to receive after unpause")
  235. }
  236. }
  237. func TestServer_BlackholeTx(t *testing.T) {
  238. scheme := "unix"
  239. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  240. defer func() {
  241. os.RemoveAll(srcAddr)
  242. os.RemoveAll(dstAddr)
  243. }()
  244. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  245. defer ln.Close()
  246. p := NewServer(ServerConfig{
  247. Logger: testLogger,
  248. From: url.URL{Scheme: scheme, Host: srcAddr},
  249. To: url.URL{Scheme: scheme, Host: dstAddr},
  250. })
  251. <-p.Ready()
  252. defer p.Close()
  253. p.BlackholeTx()
  254. data := []byte("Hello World!")
  255. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  256. recvc := make(chan []byte)
  257. go func() {
  258. recvc <- receive(t, ln)
  259. }()
  260. select {
  261. case d := <-recvc:
  262. t.Fatalf("unexpected data receive %q during blackhole", string(d))
  263. case <-time.After(200 * time.Millisecond):
  264. }
  265. p.UnblackholeTx()
  266. // expect different data, old data dropped
  267. data[0]++
  268. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  269. select {
  270. case d := <-recvc:
  271. if !bytes.Equal(data, d) {
  272. t.Fatalf("expected %q, got %q", string(data), string(d))
  273. }
  274. case <-time.After(2 * time.Second):
  275. t.Fatal("took too long to receive after unblackhole")
  276. }
  277. }
  278. func TestServer_CorruptTx(t *testing.T) {
  279. scheme := "unix"
  280. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  281. defer func() {
  282. os.RemoveAll(srcAddr)
  283. os.RemoveAll(dstAddr)
  284. }()
  285. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  286. defer ln.Close()
  287. p := NewServer(ServerConfig{
  288. Logger: testLogger,
  289. From: url.URL{Scheme: scheme, Host: srcAddr},
  290. To: url.URL{Scheme: scheme, Host: dstAddr},
  291. })
  292. <-p.Ready()
  293. defer p.Close()
  294. p.CorruptTx(func(d []byte) []byte {
  295. d[len(d)/2]++
  296. return d
  297. })
  298. data := []byte("Hello World!")
  299. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  300. if d := receive(t, ln); bytes.Equal(d, data) {
  301. t.Fatalf("expected corrupted data, got %q", string(d))
  302. }
  303. p.UncorruptTx()
  304. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  305. if d := receive(t, ln); !bytes.Equal(d, data) {
  306. t.Fatalf("expected uncorrupted data, got %q", string(d))
  307. }
  308. }
  309. func TestServer_Shutdown(t *testing.T) {
  310. scheme := "unix"
  311. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  312. defer func() {
  313. os.RemoveAll(srcAddr)
  314. os.RemoveAll(dstAddr)
  315. }()
  316. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  317. defer ln.Close()
  318. p := NewServer(ServerConfig{
  319. Logger: testLogger,
  320. From: url.URL{Scheme: scheme, Host: srcAddr},
  321. To: url.URL{Scheme: scheme, Host: dstAddr},
  322. })
  323. <-p.Ready()
  324. defer p.Close()
  325. px, _ := p.(*proxyServer)
  326. px.listener.Close()
  327. time.Sleep(200 * time.Millisecond)
  328. data := []byte("Hello World!")
  329. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  330. if d := receive(t, ln); !bytes.Equal(d, data) {
  331. t.Fatalf("expected %q, got %q", string(data), string(d))
  332. }
  333. }
  334. func TestServer_ShutdownListener(t *testing.T) {
  335. scheme := "unix"
  336. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  337. defer func() {
  338. os.RemoveAll(srcAddr)
  339. os.RemoveAll(dstAddr)
  340. }()
  341. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  342. defer ln.Close()
  343. p := NewServer(ServerConfig{
  344. Logger: testLogger,
  345. From: url.URL{Scheme: scheme, Host: srcAddr},
  346. To: url.URL{Scheme: scheme, Host: dstAddr},
  347. })
  348. <-p.Ready()
  349. defer p.Close()
  350. // shut down destination
  351. ln.Close()
  352. time.Sleep(200 * time.Millisecond)
  353. ln = listen(t, scheme, dstAddr, transport.TLSInfo{})
  354. defer ln.Close()
  355. data := []byte("Hello World!")
  356. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  357. if d := receive(t, ln); !bytes.Equal(d, data) {
  358. t.Fatalf("expected %q, got %q", string(data), string(d))
  359. }
  360. }
  361. func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) }
  362. func TestServerHTTP_Secure_DelayTx(t *testing.T) { testServerHTTP(t, true, true) }
  363. func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) }
  364. func TestServerHTTP_Secure_DelayRx(t *testing.T) { testServerHTTP(t, true, false) }
  365. func testServerHTTP(t *testing.T, secure, delayTx bool) {
  366. scheme := "tcp"
  367. ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{})
  368. srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
  369. ln1.Close()
  370. ln2.Close()
  371. mux := http.NewServeMux()
  372. mux.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) {
  373. d, err := ioutil.ReadAll(req.Body)
  374. if err != nil {
  375. t.Fatal(err)
  376. }
  377. if _, err = w.Write([]byte(fmt.Sprintf("%q(confirmed)", string(d)))); err != nil {
  378. t.Fatal(err)
  379. }
  380. })
  381. var tlsConfig *tls.Config
  382. var err error
  383. if secure {
  384. tlsConfig, err = testTLSInfo.ServerConfig()
  385. if err != nil {
  386. t.Fatal(err)
  387. }
  388. }
  389. srv := &http.Server{
  390. Addr: dstAddr,
  391. Handler: mux,
  392. TLSConfig: tlsConfig,
  393. }
  394. donec := make(chan struct{})
  395. defer func() {
  396. srv.Close()
  397. <-donec
  398. }()
  399. go func() {
  400. defer close(donec)
  401. if !secure {
  402. srv.ListenAndServe()
  403. } else {
  404. srv.ListenAndServeTLS(testTLSInfo.CertFile, testTLSInfo.KeyFile)
  405. }
  406. }()
  407. time.Sleep(200 * time.Millisecond)
  408. cfg := ServerConfig{
  409. Logger: testLogger,
  410. From: url.URL{Scheme: scheme, Host: srcAddr},
  411. To: url.URL{Scheme: scheme, Host: dstAddr},
  412. }
  413. if secure {
  414. cfg.TLSInfo = testTLSInfo
  415. }
  416. p := NewServer(cfg)
  417. <-p.Ready()
  418. defer p.Close()
  419. data := "Hello World!"
  420. now := time.Now()
  421. var resp *http.Response
  422. if secure {
  423. tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
  424. if terr != nil {
  425. t.Fatal(terr)
  426. }
  427. cli := &http.Client{Transport: tp}
  428. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  429. } else {
  430. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  431. }
  432. if err != nil {
  433. t.Fatal(err)
  434. }
  435. d, err := ioutil.ReadAll(resp.Body)
  436. if err != nil {
  437. t.Fatal(err)
  438. }
  439. took1 := time.Since(now)
  440. t.Logf("took %v with no latency", took1)
  441. rs1 := string(d)
  442. exp := fmt.Sprintf("%q(confirmed)", data)
  443. if rs1 != exp {
  444. t.Fatalf("got %q, expected %q", rs1, exp)
  445. }
  446. lat, rv := 100*time.Millisecond, 10*time.Millisecond
  447. if delayTx {
  448. p.DelayTx(lat, rv)
  449. defer p.UndelayTx()
  450. } else {
  451. p.DelayRx(lat, rv)
  452. defer p.UndelayRx()
  453. }
  454. now = time.Now()
  455. if secure {
  456. tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
  457. if terr != nil {
  458. t.Fatal(terr)
  459. }
  460. cli := &http.Client{Transport: tp}
  461. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  462. } else {
  463. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  464. }
  465. if err != nil {
  466. t.Fatal(err)
  467. }
  468. d, err = ioutil.ReadAll(resp.Body)
  469. if err != nil {
  470. t.Fatal(err)
  471. }
  472. took2 := time.Since(now)
  473. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  474. rs2 := string(d)
  475. if rs2 != exp {
  476. t.Fatalf("got %q, expected %q", rs2, exp)
  477. }
  478. if took1 > took2 {
  479. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  480. }
  481. }
  482. func newUnixAddr() string {
  483. now := time.Now().UnixNano()
  484. rand.Seed(now)
  485. addr := fmt.Sprintf("%X%X.unix-conn", now, rand.Intn(35000))
  486. os.RemoveAll(addr)
  487. return addr
  488. }
  489. func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) {
  490. var err error
  491. if !tlsInfo.Empty() {
  492. ln, err = transport.NewListener(addr, scheme, &tlsInfo)
  493. } else {
  494. ln, err = net.Listen(scheme, addr)
  495. }
  496. if err != nil {
  497. t.Fatal(err)
  498. }
  499. return ln
  500. }
  501. func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) {
  502. var out net.Conn
  503. var err error
  504. if !tlsInfo.Empty() {
  505. tp, terr := transport.NewTransport(tlsInfo, 3*time.Second)
  506. if terr != nil {
  507. t.Fatal(terr)
  508. }
  509. out, err = tp.Dial(scheme, addr)
  510. } else {
  511. out, err = net.Dial(scheme, addr)
  512. }
  513. if err != nil {
  514. t.Fatal(err)
  515. }
  516. if _, err = out.Write(data); err != nil {
  517. t.Fatal(err)
  518. }
  519. if err = out.Close(); err != nil {
  520. t.Fatal(err)
  521. }
  522. }
  523. func receive(t *testing.T, ln net.Listener) (data []byte) {
  524. buf := bytes.NewBuffer(make([]byte, 0, 1024))
  525. for {
  526. in, err := ln.Accept()
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. var n int64
  531. n, err = buf.ReadFrom(in)
  532. if err != nil {
  533. t.Fatal(err)
  534. }
  535. if n > 0 {
  536. break
  537. }
  538. }
  539. return buf.Bytes()
  540. }