server_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "crypto/tls"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/url"
  25. "os"
  26. "strings"
  27. "testing"
  28. "time"
  29. "go.etcd.io/etcd/pkg/transport"
  30. "go.uber.org/zap"
  31. )
  32. // enable DebugLevel
  33. var testLogger = zap.NewExample()
  34. var testTLSInfo = transport.TLSInfo{
  35. KeyFile: "./fixtures/server.key.insecure",
  36. CertFile: "./fixtures/server.crt",
  37. TrustedCAFile: "./fixtures/ca.crt",
  38. ClientCertAuth: true,
  39. }
  40. func TestServer_Unix_Insecure(t *testing.T) { testServer(t, "unix", false, false) }
  41. func TestServer_TCP_Insecure(t *testing.T) { testServer(t, "tcp", false, false) }
  42. func TestServer_Unix_Secure(t *testing.T) { testServer(t, "unix", true, false) }
  43. func TestServer_TCP_Secure(t *testing.T) { testServer(t, "tcp", true, false) }
  44. func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) }
  45. func TestServer_TCP_Insecure_DelayTx(t *testing.T) { testServer(t, "tcp", false, true) }
  46. func TestServer_Unix_Secure_DelayTx(t *testing.T) { testServer(t, "unix", true, true) }
  47. func TestServer_TCP_Secure_DelayTx(t *testing.T) { testServer(t, "tcp", true, true) }
  48. func testServer(t *testing.T, scheme string, secure bool, delayTx bool) {
  49. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  50. if scheme == "tcp" {
  51. ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{})
  52. srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String()
  53. ln1.Close()
  54. ln2.Close()
  55. } else {
  56. defer func() {
  57. os.RemoveAll(srcAddr)
  58. os.RemoveAll(dstAddr)
  59. }()
  60. }
  61. tlsInfo := testTLSInfo
  62. if !secure {
  63. tlsInfo = transport.TLSInfo{}
  64. }
  65. ln := listen(t, scheme, dstAddr, tlsInfo)
  66. defer ln.Close()
  67. cfg := ServerConfig{
  68. Logger: testLogger,
  69. From: url.URL{Scheme: scheme, Host: srcAddr},
  70. To: url.URL{Scheme: scheme, Host: dstAddr},
  71. }
  72. if secure {
  73. cfg.TLSInfo = testTLSInfo
  74. }
  75. p := NewServer(cfg)
  76. <-p.Ready()
  77. defer p.Close()
  78. data1 := []byte("Hello World!")
  79. donec, writec := make(chan struct{}), make(chan []byte)
  80. go func() {
  81. defer close(donec)
  82. for data := range writec {
  83. send(t, data, scheme, srcAddr, tlsInfo)
  84. }
  85. }()
  86. recvc := make(chan []byte)
  87. go func() {
  88. for i := 0; i < 2; i++ {
  89. recvc <- receive(t, ln)
  90. }
  91. }()
  92. writec <- data1
  93. now := time.Now()
  94. if d := <-recvc; !bytes.Equal(data1, d) {
  95. t.Fatalf("expected %q, got %q", string(data1), string(d))
  96. }
  97. took1 := time.Since(now)
  98. t.Logf("took %v with no latency", took1)
  99. lat, rv := 50*time.Millisecond, 5*time.Millisecond
  100. if delayTx {
  101. p.DelayTx(lat, rv)
  102. }
  103. data2 := []byte("new data")
  104. writec <- data2
  105. now = time.Now()
  106. if d := <-recvc; !bytes.Equal(data2, d) {
  107. t.Fatalf("expected %q, got %q", string(data2), string(d))
  108. }
  109. took2 := time.Since(now)
  110. if delayTx {
  111. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  112. } else {
  113. t.Logf("took %v with no latency", took2)
  114. }
  115. if delayTx {
  116. p.UndelayTx()
  117. if took1 >= took2 {
  118. t.Fatalf("expected took1 %v < took2 %v (with latency)", took1, took2)
  119. }
  120. }
  121. close(writec)
  122. select {
  123. case <-donec:
  124. case <-time.After(3 * time.Second):
  125. t.Fatal("took too long to write")
  126. }
  127. select {
  128. case <-p.Done():
  129. t.Fatal("unexpected done")
  130. case err := <-p.Error():
  131. t.Fatal(err)
  132. default:
  133. }
  134. if err := p.Close(); err != nil {
  135. t.Fatal(err)
  136. }
  137. select {
  138. case <-p.Done():
  139. case err := <-p.Error():
  140. if !strings.HasPrefix(err.Error(), "accept ") &&
  141. !strings.HasSuffix(err.Error(), "use of closed network connection") {
  142. t.Fatal(err)
  143. }
  144. case <-time.After(3 * time.Second):
  145. t.Fatal("took too long to close")
  146. }
  147. }
  148. func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) }
  149. func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) }
  150. func testServerDelayAccept(t *testing.T, secure bool) {
  151. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  152. defer func() {
  153. os.RemoveAll(srcAddr)
  154. os.RemoveAll(dstAddr)
  155. }()
  156. tlsInfo := testTLSInfo
  157. if !secure {
  158. tlsInfo = transport.TLSInfo{}
  159. }
  160. scheme := "unix"
  161. ln := listen(t, scheme, dstAddr, tlsInfo)
  162. defer ln.Close()
  163. cfg := ServerConfig{
  164. Logger: testLogger,
  165. From: url.URL{Scheme: scheme, Host: srcAddr},
  166. To: url.URL{Scheme: scheme, Host: dstAddr},
  167. }
  168. if secure {
  169. cfg.TLSInfo = testTLSInfo
  170. }
  171. p := NewServer(cfg)
  172. <-p.Ready()
  173. defer p.Close()
  174. data := []byte("Hello World!")
  175. now := time.Now()
  176. send(t, data, scheme, srcAddr, tlsInfo)
  177. if d := receive(t, ln); !bytes.Equal(data, d) {
  178. t.Fatalf("expected %q, got %q", string(data), string(d))
  179. }
  180. took1 := time.Since(now)
  181. t.Logf("took %v with no latency", took1)
  182. lat, rv := 700*time.Millisecond, 10*time.Millisecond
  183. p.DelayAccept(lat, rv)
  184. defer p.UndelayAccept()
  185. if err := p.ResetListener(); err != nil {
  186. t.Fatal(err)
  187. }
  188. time.Sleep(200 * time.Millisecond)
  189. now = time.Now()
  190. send(t, data, scheme, srcAddr, tlsInfo)
  191. if d := receive(t, ln); !bytes.Equal(data, d) {
  192. t.Fatalf("expected %q, got %q", string(data), string(d))
  193. }
  194. took2 := time.Since(now)
  195. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  196. if took1 >= took2 {
  197. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  198. }
  199. }
  200. func TestServer_PauseTx(t *testing.T) {
  201. scheme := "unix"
  202. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  203. defer func() {
  204. os.RemoveAll(srcAddr)
  205. os.RemoveAll(dstAddr)
  206. }()
  207. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  208. defer ln.Close()
  209. p := NewServer(ServerConfig{
  210. Logger: testLogger,
  211. From: url.URL{Scheme: scheme, Host: srcAddr},
  212. To: url.URL{Scheme: scheme, Host: dstAddr},
  213. })
  214. <-p.Ready()
  215. defer p.Close()
  216. p.PauseTx()
  217. data := []byte("Hello World!")
  218. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  219. recvc := make(chan []byte)
  220. go func() {
  221. recvc <- receive(t, ln)
  222. }()
  223. select {
  224. case d := <-recvc:
  225. t.Fatalf("received unexpected data %q during pause", string(d))
  226. case <-time.After(200 * time.Millisecond):
  227. }
  228. p.UnpauseTx()
  229. select {
  230. case d := <-recvc:
  231. if !bytes.Equal(data, d) {
  232. t.Fatalf("expected %q, got %q", string(data), string(d))
  233. }
  234. case <-time.After(2 * time.Second):
  235. t.Fatal("took too long to receive after unpause")
  236. }
  237. }
  238. func TestServer_ModifyTx_corrupt(t *testing.T) {
  239. scheme := "unix"
  240. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  241. defer func() {
  242. os.RemoveAll(srcAddr)
  243. os.RemoveAll(dstAddr)
  244. }()
  245. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  246. defer ln.Close()
  247. p := NewServer(ServerConfig{
  248. Logger: testLogger,
  249. From: url.URL{Scheme: scheme, Host: srcAddr},
  250. To: url.URL{Scheme: scheme, Host: dstAddr},
  251. })
  252. <-p.Ready()
  253. defer p.Close()
  254. p.ModifyTx(func(d []byte) []byte {
  255. d[len(d)/2]++
  256. return d
  257. })
  258. data := []byte("Hello World!")
  259. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  260. if d := receive(t, ln); bytes.Equal(d, data) {
  261. t.Fatalf("expected corrupted data, got %q", string(d))
  262. }
  263. p.UnmodifyTx()
  264. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  265. if d := receive(t, ln); !bytes.Equal(d, data) {
  266. t.Fatalf("expected uncorrupted data, got %q", string(d))
  267. }
  268. }
  269. func TestServer_ModifyTx_packet_loss(t *testing.T) {
  270. scheme := "unix"
  271. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  272. defer func() {
  273. os.RemoveAll(srcAddr)
  274. os.RemoveAll(dstAddr)
  275. }()
  276. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  277. defer ln.Close()
  278. p := NewServer(ServerConfig{
  279. Logger: testLogger,
  280. From: url.URL{Scheme: scheme, Host: srcAddr},
  281. To: url.URL{Scheme: scheme, Host: dstAddr},
  282. })
  283. <-p.Ready()
  284. defer p.Close()
  285. // 50% packet loss
  286. p.ModifyTx(func(d []byte) []byte {
  287. half := len(d) / 2
  288. return d[:half:half]
  289. })
  290. data := []byte("Hello World!")
  291. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  292. if d := receive(t, ln); bytes.Equal(d, data) {
  293. t.Fatalf("expected corrupted data, got %q", string(d))
  294. }
  295. p.UnmodifyTx()
  296. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  297. if d := receive(t, ln); !bytes.Equal(d, data) {
  298. t.Fatalf("expected uncorrupted data, got %q", string(d))
  299. }
  300. }
  301. func TestServer_BlackholeTx(t *testing.T) {
  302. scheme := "unix"
  303. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  304. defer func() {
  305. os.RemoveAll(srcAddr)
  306. os.RemoveAll(dstAddr)
  307. }()
  308. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  309. defer ln.Close()
  310. p := NewServer(ServerConfig{
  311. Logger: testLogger,
  312. From: url.URL{Scheme: scheme, Host: srcAddr},
  313. To: url.URL{Scheme: scheme, Host: dstAddr},
  314. })
  315. <-p.Ready()
  316. defer p.Close()
  317. p.BlackholeTx()
  318. data := []byte("Hello World!")
  319. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  320. recvc := make(chan []byte)
  321. go func() {
  322. recvc <- receive(t, ln)
  323. }()
  324. select {
  325. case d := <-recvc:
  326. t.Fatalf("unexpected data receive %q during blackhole", string(d))
  327. case <-time.After(200 * time.Millisecond):
  328. }
  329. p.UnblackholeTx()
  330. // expect different data, old data dropped
  331. data[0]++
  332. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  333. select {
  334. case d := <-recvc:
  335. if !bytes.Equal(data, d) {
  336. t.Fatalf("expected %q, got %q", string(data), string(d))
  337. }
  338. case <-time.After(2 * time.Second):
  339. t.Fatal("took too long to receive after unblackhole")
  340. }
  341. }
  342. func TestServer_Shutdown(t *testing.T) {
  343. scheme := "unix"
  344. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  345. defer func() {
  346. os.RemoveAll(srcAddr)
  347. os.RemoveAll(dstAddr)
  348. }()
  349. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  350. defer ln.Close()
  351. p := NewServer(ServerConfig{
  352. Logger: testLogger,
  353. From: url.URL{Scheme: scheme, Host: srcAddr},
  354. To: url.URL{Scheme: scheme, Host: dstAddr},
  355. })
  356. <-p.Ready()
  357. defer p.Close()
  358. s, _ := p.(*server)
  359. s.listener.Close()
  360. time.Sleep(200 * time.Millisecond)
  361. data := []byte("Hello World!")
  362. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  363. if d := receive(t, ln); !bytes.Equal(d, data) {
  364. t.Fatalf("expected %q, got %q", string(data), string(d))
  365. }
  366. }
  367. func TestServer_ShutdownListener(t *testing.T) {
  368. scheme := "unix"
  369. srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
  370. defer func() {
  371. os.RemoveAll(srcAddr)
  372. os.RemoveAll(dstAddr)
  373. }()
  374. ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
  375. defer ln.Close()
  376. p := NewServer(ServerConfig{
  377. Logger: testLogger,
  378. From: url.URL{Scheme: scheme, Host: srcAddr},
  379. To: url.URL{Scheme: scheme, Host: dstAddr},
  380. })
  381. <-p.Ready()
  382. defer p.Close()
  383. // shut down destination
  384. ln.Close()
  385. time.Sleep(200 * time.Millisecond)
  386. ln = listen(t, scheme, dstAddr, transport.TLSInfo{})
  387. defer ln.Close()
  388. data := []byte("Hello World!")
  389. send(t, data, scheme, srcAddr, transport.TLSInfo{})
  390. if d := receive(t, ln); !bytes.Equal(d, data) {
  391. t.Fatalf("expected %q, got %q", string(data), string(d))
  392. }
  393. }
  394. func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) }
  395. func TestServerHTTP_Secure_DelayTx(t *testing.T) { testServerHTTP(t, true, true) }
  396. func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) }
  397. func TestServerHTTP_Secure_DelayRx(t *testing.T) { testServerHTTP(t, true, false) }
  398. func testServerHTTP(t *testing.T, secure, delayTx bool) {
  399. scheme := "tcp"
  400. ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{})
  401. srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String()
  402. ln1.Close()
  403. ln2.Close()
  404. mux := http.NewServeMux()
  405. mux.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) {
  406. d, err := ioutil.ReadAll(req.Body)
  407. if err != nil {
  408. t.Fatal(err)
  409. }
  410. if _, err = w.Write([]byte(fmt.Sprintf("%q(confirmed)", string(d)))); err != nil {
  411. t.Fatal(err)
  412. }
  413. })
  414. var tlsConfig *tls.Config
  415. var err error
  416. if secure {
  417. tlsConfig, err = testTLSInfo.ServerConfig()
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. }
  422. srv := &http.Server{
  423. Addr: dstAddr,
  424. Handler: mux,
  425. TLSConfig: tlsConfig,
  426. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  427. }
  428. donec := make(chan struct{})
  429. defer func() {
  430. srv.Close()
  431. <-donec
  432. }()
  433. go func() {
  434. defer close(donec)
  435. if !secure {
  436. srv.ListenAndServe()
  437. } else {
  438. srv.ListenAndServeTLS(testTLSInfo.CertFile, testTLSInfo.KeyFile)
  439. }
  440. }()
  441. time.Sleep(200 * time.Millisecond)
  442. cfg := ServerConfig{
  443. Logger: testLogger,
  444. From: url.URL{Scheme: scheme, Host: srcAddr},
  445. To: url.URL{Scheme: scheme, Host: dstAddr},
  446. }
  447. if secure {
  448. cfg.TLSInfo = testTLSInfo
  449. }
  450. p := NewServer(cfg)
  451. <-p.Ready()
  452. defer p.Close()
  453. data := "Hello World!"
  454. now := time.Now()
  455. var resp *http.Response
  456. if secure {
  457. tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
  458. if terr != nil {
  459. t.Fatal(terr)
  460. }
  461. cli := &http.Client{Transport: tp}
  462. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  463. } else {
  464. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  465. }
  466. if err != nil {
  467. t.Fatal(err)
  468. }
  469. d, err := ioutil.ReadAll(resp.Body)
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. took1 := time.Since(now)
  474. t.Logf("took %v with no latency", took1)
  475. rs1 := string(d)
  476. exp := fmt.Sprintf("%q(confirmed)", data)
  477. if rs1 != exp {
  478. t.Fatalf("got %q, expected %q", rs1, exp)
  479. }
  480. lat, rv := 100*time.Millisecond, 10*time.Millisecond
  481. if delayTx {
  482. p.DelayTx(lat, rv)
  483. defer p.UndelayTx()
  484. } else {
  485. p.DelayRx(lat, rv)
  486. defer p.UndelayRx()
  487. }
  488. now = time.Now()
  489. if secure {
  490. tp, terr := transport.NewTransport(testTLSInfo, 3*time.Second)
  491. if terr != nil {
  492. t.Fatal(terr)
  493. }
  494. cli := &http.Client{Transport: tp}
  495. resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data))
  496. } else {
  497. resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data))
  498. }
  499. if err != nil {
  500. t.Fatal(err)
  501. }
  502. d, err = ioutil.ReadAll(resp.Body)
  503. if err != nil {
  504. t.Fatal(err)
  505. }
  506. took2 := time.Since(now)
  507. t.Logf("took %v with latency %v±%v", took2, lat, rv)
  508. rs2 := string(d)
  509. if rs2 != exp {
  510. t.Fatalf("got %q, expected %q", rs2, exp)
  511. }
  512. if took1 > took2 {
  513. t.Fatalf("expected took1 %v < took2 %v", took1, took2)
  514. }
  515. }
  516. func newUnixAddr() string {
  517. now := time.Now().UnixNano()
  518. rand.Seed(now)
  519. addr := fmt.Sprintf("%X%X.unix-conn", now, rand.Intn(35000))
  520. os.RemoveAll(addr)
  521. return addr
  522. }
  523. func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) {
  524. var err error
  525. if !tlsInfo.Empty() {
  526. ln, err = transport.NewListener(addr, scheme, &tlsInfo)
  527. } else {
  528. ln, err = net.Listen(scheme, addr)
  529. }
  530. if err != nil {
  531. t.Fatal(err)
  532. }
  533. return ln
  534. }
  535. func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) {
  536. var out net.Conn
  537. var err error
  538. if !tlsInfo.Empty() {
  539. tp, terr := transport.NewTransport(tlsInfo, 3*time.Second)
  540. if terr != nil {
  541. t.Fatal(terr)
  542. }
  543. out, err = tp.Dial(scheme, addr)
  544. } else {
  545. out, err = net.Dial(scheme, addr)
  546. }
  547. if err != nil {
  548. t.Fatal(err)
  549. }
  550. if _, err = out.Write(data); err != nil {
  551. t.Fatal(err)
  552. }
  553. if err = out.Close(); err != nil {
  554. t.Fatal(err)
  555. }
  556. }
  557. func receive(t *testing.T, ln net.Listener) (data []byte) {
  558. buf := bytes.NewBuffer(make([]byte, 0, 1024))
  559. for {
  560. in, err := ln.Accept()
  561. if err != nil {
  562. t.Fatal(err)
  563. }
  564. var n int64
  565. n, err = buf.ReadFrom(in)
  566. if err != nil {
  567. t.Fatal(err)
  568. }
  569. if n > 0 {
  570. break
  571. }
  572. }
  573. return buf.Bytes()
  574. }