server_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "strings"
  26. "testing"
  27. "time"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "go.uber.org/zap"
  30. )
  31. // enable DebugLevel
  32. var testLogger = zap.NewExample()
  33. var testTLSInfo = transport.TLSInfo{
  34. KeyFile: "./fixtures/server.key.insecure",
  35. CertFile: "./fixtures/server.crt",
  36. TrustedCAFile: "./fixtures/ca.crt",
  37. ClientCertAuth: true,
  38. }
  39. func TestServer_Unix_Insecure(t *testing.T) { testServer(t, "unix", false, false) }
  40. func TestServer_TCP_Insecure(t *testing.T) { testServer(t, "tcp", false, false) }
  41. func TestServer_Unix_Secure(t *testing.T) { testServer(t, "unix", true, false) }
  42. func TestServer_TCP_Secure(t *testing.T) { testServer(t, "tcp", true, false) }
  43. func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) }
  44. func TestServer_TCP_Insecure_DelayTx(t *testing.T) { testServer(t, "tcp", false, true) }
  45. func TestServer_Unix_Secure_DelayTx(t *testing.T) { testServer(t, "unix", true, true) }
  46. func TestServer_TCP_Secure_DelayTx(t *testing.T) { testServer(t, "tcp", true, true) }
  47. func testServer(t *testing.T, scheme string, secure bool, delayTx bool) {
  48. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  49. if scheme == "tcp" {
  50. ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{})
  51. srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
  52. ln1.Close()
  53. ln2.Close()
  54. } else {
  55. defer func() {
  56. os.RemoveAll(srcAddr)
  57. os.RemoveAll(dstAddr)
  58. }()
  59. }
  60. tlsInfo := testTLSInfo
  61. if !secure {
  62. tlsInfo = transport.TLSInfo{}
  63. }
  64. ln := listen(t, scheme, dstAddr, tlsInfo)
  65. defer ln.Close()
  66. cfg := ServerConfig{
  67. Logger: testLogger,
  68. From: url.URL{Scheme: scheme, Host: srcAddr},
  69. To: url.URL{Scheme: scheme, Host: dstAddr},
  70. }
  71. if secure {
  72. cfg.TLSInfo = testTLSInfo
  73. }
  74. p := NewServer(cfg)
  75. <-p.Ready()
  76. defer p.Close()
  77. data1 := []byte("Hello World!")
  78. donec, writec := make(chan struct{}), make(chan []byte)
  79. go func() {
  80. defer close(donec)
  81. for data := range writec {
  82. send(t, data, scheme, srcAddr, tlsInfo)
  83. }
  84. }()
  85. recvc := make(chan []byte)
  86. go func() {
  87. for i := 0; i < 2; i++ {
  88. recvc <- receive(t, ln)
  89. }
  90. }()
  91. writec <- data1
  92. now := time.Now()
  93. if d := <-recvc; !bytes.Equal(data1, d) {
  94. t.Fatalf("expected %q, got %q", string(data1), string(d))
  95. }
  96. took1 := time.Since(now)
  97. t.Logf("took %v with no latency", took1)
  98. lat, rv := 50*time.Millisecond, 5*time.Millisecond
  99. if delayTx {
  100. p.DelayTx(lat, rv)
  101. }
  102. data2 := []byte("new data")
  103. writec <- data2
  104. now = time.Now()
  105. if d := <-recvc; !bytes.Equal(data2, d) {
  106. t.Fatalf("expected %q, got %q", string(data2), string(d))
  107. }
  108. took2 := time.Since(now)
  109. if delayTx {
  110. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  111. } else {
  112. t.Logf("took %v with no latency", took2)
  113. }
  114. if delayTx {
  115. p.UndelayTx()
  116. if took1 >= took2 {
  117. t.Fatalf("expected took1 %v < took2 %v (with latency)", took1, took2)
  118. }
  119. }
  120. close(writec)
  121. select {
  122. case <-donec:
  123. case <-time.After(3 * time.Second):
  124. t.Fatal("took too long to write")
  125. }
  126. select {
  127. case <-p.Done():
  128. t.Fatal("unexpected done")
  129. case err := <-p.Error():
  130. t.Fatal(err)
  131. default:
  132. }
  133. if err := p.Close(); err != nil {
  134. t.Fatal(err)
  135. }
  136. select {
  137. case <-p.Done():
  138. case err := <-p.Error():
  139. if !strings.HasPrefix(err.Error(), "accept ") &&
  140. !strings.HasSuffix(err.Error(), "use of closed network connection") {
  141. t.Fatal(err)
  142. }
  143. case <-time.After(3 * time.Second):
  144. t.Fatal("took too long to close")
  145. }
  146. }
  147. func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
  148. func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) }
  149. func testServerDelayAccept(t *testing.T, secure bool) {
  150. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  151. defer func() {
  152. os.RemoveAll(srcAddr)
  153. os.RemoveAll(dstAddr)
  154. }()
  155. tlsInfo := testTLSInfo
  156. if !secure {
  157. tlsInfo = transport.TLSInfo{}
  158. }
  159. scheme := "unix"
  160. ln := listen(t, scheme, dstAddr, tlsInfo)
  161. defer ln.Close()
  162. cfg := ServerConfig{
  163. Logger: testLogger,
  164. From: url.URL{Scheme: scheme, Host: srcAddr},
  165. To: url.URL{Scheme: scheme, Host: dstAddr},
  166. }
  167. if secure {
  168. cfg.TLSInfo = testTLSInfo
  169. }
  170. p := NewServer(cfg)
  171. <-p.Ready()
  172. defer p.Close()
  173. data := []byte("Hello World!")
  174. now := time.Now()
  175. send(t, data, scheme, srcAddr, tlsInfo)
  176. if d := receive(t, ln); !bytes.Equal(data, d) {
  177. t.Fatalf("expected %q, got %q", string(data), string(d))
  178. }
  179. took1 := time.Since(now)
  180. t.Logf("took %v with no latency", took1)
  181. lat, rv := 700*time.Millisecond, 10*time.Millisecond
  182. p.DelayAccept(lat, rv)
  183. defer p.UndelayAccept()
  184. if err := p.ResetListener(); err != nil {
  185. t.Fatal(err)
  186. }
  187. time.Sleep(200 * time.Millisecond)
  188. now = time.Now()
  189. send(t, data, scheme, srcAddr, tlsInfo)
  190. if d := receive(t, ln); !bytes.Equal(data, d) {
  191. t.Fatalf("expected %q, got %q", string(data), string(d))
  192. }
  193. took2 := time.Since(now)
  194. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  195. if took1 >= took2 {
  196. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  197. }
  198. }
  199. func TestServer_PauseTx(t *testing.T) {
  200. scheme := "unix"
  201. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  202. defer func() {
  203. os.RemoveAll(srcAddr)
  204. os.RemoveAll(dstAddr)
  205. }()
  206. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  207. defer ln.Close()
  208. p := NewServer(ServerConfig{
  209. Logger: testLogger,
  210. From: url.URL{Scheme: scheme, Host: srcAddr},
  211. To: url.URL{Scheme: scheme, Host: dstAddr},
  212. })
  213. <-p.Ready()
  214. defer p.Close()
  215. p.PauseTx()
  216. data := []byte("Hello World!")
  217. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  218. recvc := make(chan []byte)
  219. go func() {
  220. recvc <- receive(t, ln)
  221. }()
  222. select {
  223. case d := <-recvc:
  224. t.Fatalf("received unexpected data %q during pause", string(d))
  225. case <-time.After(200 * time.Millisecond):
  226. }
  227. p.UnpauseTx()
  228. select {
  229. case d := <-recvc:
  230. if !bytes.Equal(data, d) {
  231. t.Fatalf("expected %q, got %q", string(data), string(d))
  232. }
  233. case <-time.After(2 * time.Second):
  234. t.Fatal("took too long to receive after unpause")
  235. }
  236. }
  237. func TestServer_ModifyTx_corrupt(t *testing.T) {
  238. scheme := "unix"
  239. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  240. defer func() {
  241. os.RemoveAll(srcAddr)
  242. os.RemoveAll(dstAddr)
  243. }()
  244. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  245. defer ln.Close()
  246. p := NewServer(ServerConfig{
  247. Logger: testLogger,
  248. From: url.URL{Scheme: scheme, Host: srcAddr},
  249. To: url.URL{Scheme: scheme, Host: dstAddr},
  250. })
  251. <-p.Ready()
  252. defer p.Close()
  253. p.ModifyTx(func(d []byte) []byte {
  254. d[len(d)/2]++
  255. return d
  256. })
  257. data := []byte("Hello World!")
  258. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  259. if d := receive(t, ln); bytes.Equal(d, data) {
  260. t.Fatalf("expected corrupted data, got %q", string(d))
  261. }
  262. p.UnmodifyTx()
  263. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  264. if d := receive(t, ln); !bytes.Equal(d, data) {
  265. t.Fatalf("expected uncorrupted data, got %q", string(d))
  266. }
  267. }
  268. func TestServer_ModifyTx_packet_loss(t *testing.T) {
  269. scheme := "unix"
  270. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  271. defer func() {
  272. os.RemoveAll(srcAddr)
  273. os.RemoveAll(dstAddr)
  274. }()
  275. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  276. defer ln.Close()
  277. p := NewServer(ServerConfig{
  278. Logger: testLogger,
  279. From: url.URL{Scheme: scheme, Host: srcAddr},
  280. To: url.URL{Scheme: scheme, Host: dstAddr},
  281. })
  282. <-p.Ready()
  283. defer p.Close()
  284. // 50% packet loss
  285. p.ModifyTx(func(d []byte) []byte {
  286. half := len(d) / 2
  287. return d[:half:half]
  288. })
  289. data := []byte("Hello World!")
  290. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  291. if d := receive(t, ln); bytes.Equal(d, data) {
  292. t.Fatalf("expected corrupted data, got %q", string(d))
  293. }
  294. p.UnmodifyTx()
  295. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  296. if d := receive(t, ln); !bytes.Equal(d, data) {
  297. t.Fatalf("expected uncorrupted data, got %q", string(d))
  298. }
  299. }
  300. func TestServer_BlackholeTx(t *testing.T) {
  301. scheme := "unix"
  302. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  303. defer func() {
  304. os.RemoveAll(srcAddr)
  305. os.RemoveAll(dstAddr)
  306. }()
  307. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  308. defer ln.Close()
  309. p := NewServer(ServerConfig{
  310. Logger: testLogger,
  311. From: url.URL{Scheme: scheme, Host: srcAddr},
  312. To: url.URL{Scheme: scheme, Host: dstAddr},
  313. })
  314. <-p.Ready()
  315. defer p.Close()
  316. p.BlackholeTx()
  317. data := []byte("Hello World!")
  318. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  319. recvc := make(chan []byte)
  320. go func() {
  321. recvc <- receive(t, ln)
  322. }()
  323. select {
  324. case d := <-recvc:
  325. t.Fatalf("unexpected data receive %q during blackhole", string(d))
  326. case <-time.After(200 * time.Millisecond):
  327. }
  328. p.UnblackholeTx()
  329. // expect different data, old data dropped
  330. data[0]++
  331. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  332. select {
  333. case d := <-recvc:
  334. if !bytes.Equal(data, d) {
  335. t.Fatalf("expected %q, got %q", string(data), string(d))
  336. }
  337. case <-time.After(2 * time.Second):
  338. t.Fatal("took too long to receive after unblackhole")
  339. }
  340. }
  341. func TestServer_Shutdown(t *testing.T) {
  342. scheme := "unix"
  343. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  344. defer func() {
  345. os.RemoveAll(srcAddr)
  346. os.RemoveAll(dstAddr)
  347. }()
  348. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  349. defer ln.Close()
  350. p := NewServer(ServerConfig{
  351. Logger: testLogger,
  352. From: url.URL{Scheme: scheme, Host: srcAddr},
  353. To: url.URL{Scheme: scheme, Host: dstAddr},
  354. })
  355. <-p.Ready()
  356. defer p.Close()
  357. s, _ := p.(*server)
  358. s.listener.Close()
  359. time.Sleep(200 * time.Millisecond)
  360. data := []byte("Hello World!")
  361. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  362. if d := receive(t, ln); !bytes.Equal(d, data) {
  363. t.Fatalf("expected %q, got %q", string(data), string(d))
  364. }
  365. }
  366. func TestServer_ShutdownListener(t *testing.T) {
  367. scheme := "unix"
  368. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  369. defer func() {
  370. os.RemoveAll(srcAddr)
  371. os.RemoveAll(dstAddr)
  372. }()
  373. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  374. defer ln.Close()
  375. p := NewServer(ServerConfig{
  376. Logger: testLogger,
  377. From: url.URL{Scheme: scheme, Host: srcAddr},
  378. To: url.URL{Scheme: scheme, Host: dstAddr},
  379. })
  380. <-p.Ready()
  381. defer p.Close()
  382. // shut down destination
  383. ln.Close()
  384. time.Sleep(200 * time.Millisecond)
  385. ln = listen(t, scheme, dstAddr, transport.TLSInfo{})
  386. defer ln.Close()
  387. data := []byte("Hello World!")
  388. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  389. if d := receive(t, ln); !bytes.Equal(d, data) {
  390. t.Fatalf("expected %q, got %q", string(data), string(d))
  391. }
  392. }
  393. func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) }
  394. func TestServerHTTP_Secure_DelayTx(t *testing.T) { testServerHTTP(t, true, true) }
  395. func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) }
  396. func TestServerHTTP_Secure_DelayRx(t *testing.T) { testServerHTTP(t, true, false) }
  397. func testServerHTTP(t *testing.T, secure, delayTx bool) {
  398. scheme := "tcp"
  399. ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{})
  400. srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
  401. ln1.Close()
  402. ln2.Close()
  403. mux := http.NewServeMux()
  404. mux.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) {
  405. d, err := ioutil.ReadAll(req.Body)
  406. if err != nil {
  407. t.Fatal(err)
  408. }
  409. if _, err = w.Write([]byte(fmt.Sprintf("%q(confirmed)", string(d)))); err != nil {
  410. t.Fatal(err)
  411. }
  412. })
  413. var tlsConfig *tls.Config
  414. var err error
  415. if secure {
  416. tlsConfig, err = testTLSInfo.ServerConfig()
  417. if err != nil {
  418. t.Fatal(err)
  419. }
  420. }
  421. srv := &http.Server{
  422. Addr: dstAddr,
  423. Handler: mux,
  424. TLSConfig: tlsConfig,
  425. }
  426. donec := make(chan struct{})
  427. defer func() {
  428. srv.Close()
  429. <-donec
  430. }()
  431. go func() {
  432. defer close(donec)
  433. if !secure {
  434. srv.ListenAndServe()
  435. } else {
  436. srv.ListenAndServeTLS(testTLSInfo.CertFile, testTLSInfo.KeyFile)
  437. }
  438. }()
  439. time.Sleep(200 * time.Millisecond)
  440. cfg := ServerConfig{
  441. Logger: testLogger,
  442. From: url.URL{Scheme: scheme, Host: srcAddr},
  443. To: url.URL{Scheme: scheme, Host: dstAddr},
  444. }
  445. if secure {
  446. cfg.TLSInfo = testTLSInfo
  447. }
  448. p := NewServer(cfg)
  449. <-p.Ready()
  450. defer p.Close()
  451. data := "Hello World!"
  452. now := time.Now()
  453. var resp *http.Response
  454. if secure {
  455. tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
  456. if terr != nil {
  457. t.Fatal(terr)
  458. }
  459. cli := &http.Client{Transport: tp}
  460. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  461. } else {
  462. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  463. }
  464. if err != nil {
  465. t.Fatal(err)
  466. }
  467. d, err := ioutil.ReadAll(resp.Body)
  468. if err != nil {
  469. t.Fatal(err)
  470. }
  471. took1 := time.Since(now)
  472. t.Logf("took %v with no latency", took1)
  473. rs1 := string(d)
  474. exp := fmt.Sprintf("%q(confirmed)", data)
  475. if rs1 != exp {
  476. t.Fatalf("got %q, expected %q", rs1, exp)
  477. }
  478. lat, rv := 100*time.Millisecond, 10*time.Millisecond
  479. if delayTx {
  480. p.DelayTx(lat, rv)
  481. defer p.UndelayTx()
  482. } else {
  483. p.DelayRx(lat, rv)
  484. defer p.UndelayRx()
  485. }
  486. now = time.Now()
  487. if secure {
  488. tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
  489. if terr != nil {
  490. t.Fatal(terr)
  491. }
  492. cli := &http.Client{Transport: tp}
  493. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  494. } else {
  495. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  496. }
  497. if err != nil {
  498. t.Fatal(err)
  499. }
  500. d, err = ioutil.ReadAll(resp.Body)
  501. if err != nil {
  502. t.Fatal(err)
  503. }
  504. took2 := time.Since(now)
  505. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  506. rs2 := string(d)
  507. if rs2 != exp {
  508. t.Fatalf("got %q, expected %q", rs2, exp)
  509. }
  510. if took1 > took2 {
  511. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  512. }
  513. }
  514. func newUnixAddr() string {
  515. now := time.Now().UnixNano()
  516. rand.Seed(now)
  517. addr := fmt.Sprintf("%X%X.unix-conn", now, rand.Intn(35000))
  518. os.RemoveAll(addr)
  519. return addr
  520. }
  521. func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) {
  522. var err error
  523. if !tlsInfo.Empty() {
  524. ln, err = transport.NewListener(addr, scheme, &tlsInfo)
  525. } else {
  526. ln, err = net.Listen(scheme, addr)
  527. }
  528. if err != nil {
  529. t.Fatal(err)
  530. }
  531. return ln
  532. }
  533. func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) {
  534. var out net.Conn
  535. var err error
  536. if !tlsInfo.Empty() {
  537. tp, terr := transport.NewTransport(tlsInfo, 3*time.Second)
  538. if terr != nil {
  539. t.Fatal(terr)
  540. }
  541. out, err = tp.Dial(scheme, addr)
  542. } else {
  543. out, err = net.Dial(scheme, addr)
  544. }
  545. if err != nil {
  546. t.Fatal(err)
  547. }
  548. if _, err = out.Write(data); err != nil {
  549. t.Fatal(err)
  550. }
  551. if err = out.Close(); err != nil {
  552. t.Fatal(err)
  553. }
  554. }
  555. func receive(t *testing.T, ln net.Listener) (data []byte) {
  556. buf := bytes.NewBuffer(make([]byte, 0, 1024))
  557. for {
  558. in, err := ln.Accept()
  559. if err != nil {
  560. t.Fatal(err)
  561. }
  562. var n int64
  563. n, err = buf.ReadFrom(in)
  564. if err != nil {
  565. t.Fatal(err)
  566. }
  567. if n > 0 {
  568. break
  569. }
  570. }
  571. return buf.Bytes()
  572. }