conntest.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package nettest
  5. import (
  6. "bytes"
  7. "encoding/binary"
  8. "io"
  9. "io/ioutil"
  10. "math/rand"
  11. "net"
  12. "runtime"
  13. "sync"
  14. "testing"
  15. "time"
  16. )
  17. // MakePipe creates a connection between two endpoints and returns the pair
  18. // as c1 and c2, such that anything written to c1 is read by c2 and vice-versa.
  19. // The stop function closes all resources, including c1, c2, and the underlying
  20. // net.Listener (if there is one), and should not be nil.
  21. type MakePipe func() (c1, c2 net.Conn, stop func(), err error)
  22. // TestConn tests that a net.Conn implementation properly satisfies the interface.
  23. // The tests should not produce any false positives, but may experience
  24. // false negatives. Thus, some issues may only be detected when the test is
  25. // run multiple times. For maximal effectiveness, run the tests under the
  26. // race detector.
  27. func TestConn(t *testing.T, mp MakePipe) {
  28. t.Run("BasicIO", func(t *testing.T) { timeoutWrapper(t, mp, testBasicIO) })
  29. t.Run("PingPong", func(t *testing.T) { timeoutWrapper(t, mp, testPingPong) })
  30. t.Run("RacyRead", func(t *testing.T) { timeoutWrapper(t, mp, testRacyRead) })
  31. t.Run("RacyWrite", func(t *testing.T) { timeoutWrapper(t, mp, testRacyWrite) })
  32. t.Run("ReadTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testReadTimeout) })
  33. t.Run("WriteTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testWriteTimeout) })
  34. t.Run("PastTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testPastTimeout) })
  35. t.Run("PresentTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testPresentTimeout) })
  36. t.Run("FutureTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testFutureTimeout) })
  37. t.Run("CloseTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testCloseTimeout) })
  38. t.Run("ConcurrentMethods", func(t *testing.T) { timeoutWrapper(t, mp, testConcurrentMethods) })
  39. }
  40. type connTester func(t *testing.T, c1, c2 net.Conn)
  41. func timeoutWrapper(t *testing.T, mp MakePipe, f connTester) {
  42. t.Helper()
  43. c1, c2, stop, err := mp()
  44. if err != nil {
  45. t.Fatalf("unable to make pipe: %v", err)
  46. }
  47. var once sync.Once
  48. defer once.Do(func() { stop() })
  49. timer := time.AfterFunc(time.Minute, func() {
  50. once.Do(func() {
  51. t.Error("test timed out; terminating pipe")
  52. stop()
  53. })
  54. })
  55. defer timer.Stop()
  56. f(t, c1, c2)
  57. }
  58. // testBasicIO tests that the data sent on c1 is properly received on c2.
  59. func testBasicIO(t *testing.T, c1, c2 net.Conn) {
  60. want := make([]byte, 1<<20)
  61. rand.New(rand.NewSource(0)).Read(want)
  62. dataCh := make(chan []byte)
  63. go func() {
  64. rd := bytes.NewReader(want)
  65. if err := chunkedCopy(c1, rd); err != nil {
  66. t.Errorf("unexpected c1.Write error: %v", err)
  67. }
  68. if err := c1.Close(); err != nil {
  69. t.Errorf("unexpected c1.Close error: %v", err)
  70. }
  71. }()
  72. go func() {
  73. wr := new(bytes.Buffer)
  74. if err := chunkedCopy(wr, c2); err != nil {
  75. t.Errorf("unexpected c2.Read error: %v", err)
  76. }
  77. if err := c2.Close(); err != nil {
  78. t.Errorf("unexpected c2.Close error: %v", err)
  79. }
  80. dataCh <- wr.Bytes()
  81. }()
  82. if got := <-dataCh; !bytes.Equal(got, want) {
  83. t.Error("transmitted data differs")
  84. }
  85. }
  86. // testPingPong tests that the two endpoints can synchronously send data to
  87. // each other in a typical request-response pattern.
  88. func testPingPong(t *testing.T, c1, c2 net.Conn) {
  89. var wg sync.WaitGroup
  90. defer wg.Wait()
  91. pingPonger := func(c net.Conn) {
  92. defer wg.Done()
  93. buf := make([]byte, 8)
  94. var prev uint64
  95. for {
  96. if _, err := io.ReadFull(c, buf); err != nil {
  97. if err == io.EOF {
  98. break
  99. }
  100. t.Errorf("unexpected Read error: %v", err)
  101. }
  102. v := binary.LittleEndian.Uint64(buf)
  103. binary.LittleEndian.PutUint64(buf, v+1)
  104. if prev != 0 && prev+2 != v {
  105. t.Errorf("mismatching value: got %d, want %d", v, prev+2)
  106. }
  107. prev = v
  108. if v == 1000 {
  109. break
  110. }
  111. if _, err := c.Write(buf); err != nil {
  112. t.Errorf("unexpected Write error: %v", err)
  113. break
  114. }
  115. }
  116. if err := c.Close(); err != nil {
  117. t.Errorf("unexpected Close error: %v", err)
  118. }
  119. }
  120. wg.Add(2)
  121. go pingPonger(c1)
  122. go pingPonger(c2)
  123. // Start off the chain reaction.
  124. if _, err := c1.Write(make([]byte, 8)); err != nil {
  125. t.Errorf("unexpected c1.Write error: %v", err)
  126. }
  127. }
  128. // testRacyRead tests that it is safe to mutate the input Read buffer
  129. // immediately after cancelation has occurred.
  130. func testRacyRead(t *testing.T, c1, c2 net.Conn) {
  131. go chunkedCopy(c2, rand.New(rand.NewSource(0)))
  132. var wg sync.WaitGroup
  133. defer wg.Wait()
  134. c1.SetReadDeadline(time.Now().Add(time.Millisecond))
  135. for i := 0; i < 10; i++ {
  136. wg.Add(1)
  137. go func() {
  138. defer wg.Done()
  139. b1 := make([]byte, 1024)
  140. b2 := make([]byte, 1024)
  141. for j := 0; j < 100; j++ {
  142. _, err := c1.Read(b1)
  143. copy(b1, b2) // Mutate b1 to trigger potential race
  144. if err != nil {
  145. checkForTimeoutError(t, err)
  146. c1.SetReadDeadline(time.Now().Add(time.Millisecond))
  147. }
  148. }
  149. }()
  150. }
  151. }
  152. // testRacyWrite tests that it is safe to mutate the input Write buffer
  153. // immediately after cancelation has occurred.
  154. func testRacyWrite(t *testing.T, c1, c2 net.Conn) {
  155. go chunkedCopy(ioutil.Discard, c2)
  156. var wg sync.WaitGroup
  157. defer wg.Wait()
  158. c1.SetWriteDeadline(time.Now().Add(time.Millisecond))
  159. for i := 0; i < 10; i++ {
  160. wg.Add(1)
  161. go func() {
  162. defer wg.Done()
  163. b1 := make([]byte, 1024)
  164. b2 := make([]byte, 1024)
  165. for j := 0; j < 100; j++ {
  166. _, err := c1.Write(b1)
  167. copy(b1, b2) // Mutate b1 to trigger potential race
  168. if err != nil {
  169. checkForTimeoutError(t, err)
  170. c1.SetWriteDeadline(time.Now().Add(time.Millisecond))
  171. }
  172. }
  173. }()
  174. }
  175. }
  176. // testReadTimeout tests that Read timeouts do not affect Write.
  177. func testReadTimeout(t *testing.T, c1, c2 net.Conn) {
  178. go chunkedCopy(ioutil.Discard, c2)
  179. c1.SetReadDeadline(aLongTimeAgo)
  180. _, err := c1.Read(make([]byte, 1024))
  181. checkForTimeoutError(t, err)
  182. if _, err := c1.Write(make([]byte, 1024)); err != nil {
  183. t.Errorf("unexpected Write error: %v", err)
  184. }
  185. }
  186. // testWriteTimeout tests that Write timeouts do not affect Read.
  187. func testWriteTimeout(t *testing.T, c1, c2 net.Conn) {
  188. go chunkedCopy(c2, rand.New(rand.NewSource(0)))
  189. c1.SetWriteDeadline(aLongTimeAgo)
  190. _, err := c1.Write(make([]byte, 1024))
  191. checkForTimeoutError(t, err)
  192. if _, err := c1.Read(make([]byte, 1024)); err != nil {
  193. t.Errorf("unexpected Read error: %v", err)
  194. }
  195. }
  196. // testPastTimeout tests that a deadline set in the past immediately times out
  197. // Read and Write requests.
  198. func testPastTimeout(t *testing.T, c1, c2 net.Conn) {
  199. go chunkedCopy(c2, c2)
  200. testRoundtrip(t, c1)
  201. c1.SetDeadline(aLongTimeAgo)
  202. n, err := c1.Write(make([]byte, 1024))
  203. if n != 0 {
  204. t.Errorf("unexpected Write count: got %d, want 0", n)
  205. }
  206. checkForTimeoutError(t, err)
  207. n, err = c1.Read(make([]byte, 1024))
  208. if n != 0 {
  209. t.Errorf("unexpected Read count: got %d, want 0", n)
  210. }
  211. checkForTimeoutError(t, err)
  212. testRoundtrip(t, c1)
  213. }
  214. // testPresentTimeout tests that a past deadline set while there are pending
  215. // Read and Write operations immediately times out those operations.
  216. func testPresentTimeout(t *testing.T, c1, c2 net.Conn) {
  217. var wg sync.WaitGroup
  218. defer wg.Wait()
  219. wg.Add(3)
  220. deadlineSet := make(chan bool, 1)
  221. go func() {
  222. defer wg.Done()
  223. time.Sleep(100 * time.Millisecond)
  224. deadlineSet <- true
  225. c1.SetReadDeadline(aLongTimeAgo)
  226. c1.SetWriteDeadline(aLongTimeAgo)
  227. }()
  228. go func() {
  229. defer wg.Done()
  230. n, err := c1.Read(make([]byte, 1024))
  231. if n != 0 {
  232. t.Errorf("unexpected Read count: got %d, want 0", n)
  233. }
  234. checkForTimeoutError(t, err)
  235. if len(deadlineSet) == 0 {
  236. t.Error("Read timed out before deadline is set")
  237. }
  238. }()
  239. go func() {
  240. defer wg.Done()
  241. var err error
  242. for err == nil {
  243. _, err = c1.Write(make([]byte, 1024))
  244. }
  245. checkForTimeoutError(t, err)
  246. if len(deadlineSet) == 0 {
  247. t.Error("Write timed out before deadline is set")
  248. }
  249. }()
  250. }
  251. // testFutureTimeout tests that a future deadline will eventually time out
  252. // Read and Write operations.
  253. func testFutureTimeout(t *testing.T, c1, c2 net.Conn) {
  254. var wg sync.WaitGroup
  255. wg.Add(2)
  256. c1.SetDeadline(time.Now().Add(100 * time.Millisecond))
  257. go func() {
  258. defer wg.Done()
  259. _, err := c1.Read(make([]byte, 1024))
  260. checkForTimeoutError(t, err)
  261. }()
  262. go func() {
  263. defer wg.Done()
  264. var err error
  265. for err == nil {
  266. _, err = c1.Write(make([]byte, 1024))
  267. }
  268. checkForTimeoutError(t, err)
  269. }()
  270. wg.Wait()
  271. go chunkedCopy(c2, c2)
  272. resyncConn(t, c1)
  273. testRoundtrip(t, c1)
  274. }
  275. // testCloseTimeout tests that calling Close immediately times out pending
  276. // Read and Write operations.
  277. func testCloseTimeout(t *testing.T, c1, c2 net.Conn) {
  278. go chunkedCopy(c2, c2)
  279. var wg sync.WaitGroup
  280. defer wg.Wait()
  281. wg.Add(3)
  282. // Test for cancelation upon connection closure.
  283. c1.SetDeadline(neverTimeout)
  284. go func() {
  285. defer wg.Done()
  286. time.Sleep(100 * time.Millisecond)
  287. c1.Close()
  288. }()
  289. go func() {
  290. defer wg.Done()
  291. var err error
  292. buf := make([]byte, 1024)
  293. for err == nil {
  294. _, err = c1.Read(buf)
  295. }
  296. }()
  297. go func() {
  298. defer wg.Done()
  299. var err error
  300. buf := make([]byte, 1024)
  301. for err == nil {
  302. _, err = c1.Write(buf)
  303. }
  304. }()
  305. }
  306. // testConcurrentMethods tests that the methods of net.Conn can safely
  307. // be called concurrently.
  308. func testConcurrentMethods(t *testing.T, c1, c2 net.Conn) {
  309. if runtime.GOOS == "plan9" {
  310. t.Skip("skipping on plan9; see https://golang.org/issue/20489")
  311. }
  312. go chunkedCopy(c2, c2)
  313. // The results of the calls may be nonsensical, but this should
  314. // not trigger a race detector warning.
  315. var wg sync.WaitGroup
  316. for i := 0; i < 100; i++ {
  317. wg.Add(7)
  318. go func() {
  319. defer wg.Done()
  320. c1.Read(make([]byte, 1024))
  321. }()
  322. go func() {
  323. defer wg.Done()
  324. c1.Write(make([]byte, 1024))
  325. }()
  326. go func() {
  327. defer wg.Done()
  328. c1.SetDeadline(time.Now().Add(10 * time.Millisecond))
  329. }()
  330. go func() {
  331. defer wg.Done()
  332. c1.SetReadDeadline(aLongTimeAgo)
  333. }()
  334. go func() {
  335. defer wg.Done()
  336. c1.SetWriteDeadline(aLongTimeAgo)
  337. }()
  338. go func() {
  339. defer wg.Done()
  340. c1.LocalAddr()
  341. }()
  342. go func() {
  343. defer wg.Done()
  344. c1.RemoteAddr()
  345. }()
  346. }
  347. wg.Wait() // At worst, the deadline is set 10ms into the future
  348. resyncConn(t, c1)
  349. testRoundtrip(t, c1)
  350. }
  351. // checkForTimeoutError checks that the error satisfies the Error interface
  352. // and that Timeout returns true.
  353. func checkForTimeoutError(t *testing.T, err error) {
  354. t.Helper()
  355. if nerr, ok := err.(net.Error); ok {
  356. if !nerr.Timeout() {
  357. t.Errorf("err.Timeout() = false, want true")
  358. }
  359. } else {
  360. t.Errorf("got %T, want net.Error", err)
  361. }
  362. }
  363. // testRoundtrip writes something into c and reads it back.
  364. // It assumes that everything written into c is echoed back to itself.
  365. func testRoundtrip(t *testing.T, c net.Conn) {
  366. t.Helper()
  367. if err := c.SetDeadline(neverTimeout); err != nil {
  368. t.Errorf("roundtrip SetDeadline error: %v", err)
  369. }
  370. const s = "Hello, world!"
  371. buf := []byte(s)
  372. if _, err := c.Write(buf); err != nil {
  373. t.Errorf("roundtrip Write error: %v", err)
  374. }
  375. if _, err := io.ReadFull(c, buf); err != nil {
  376. t.Errorf("roundtrip Read error: %v", err)
  377. }
  378. if string(buf) != s {
  379. t.Errorf("roundtrip data mismatch: got %q, want %q", buf, s)
  380. }
  381. }
  382. // resyncConn resynchronizes the connection into a sane state.
  383. // It assumes that everything written into c is echoed back to itself.
  384. // It assumes that 0xff is not currently on the wire or in the read buffer.
  385. func resyncConn(t *testing.T, c net.Conn) {
  386. t.Helper()
  387. c.SetDeadline(neverTimeout)
  388. errCh := make(chan error)
  389. go func() {
  390. _, err := c.Write([]byte{0xff})
  391. errCh <- err
  392. }()
  393. buf := make([]byte, 1024)
  394. for {
  395. n, err := c.Read(buf)
  396. if n > 0 && bytes.IndexByte(buf[:n], 0xff) == n-1 {
  397. break
  398. }
  399. if err != nil {
  400. t.Errorf("unexpected Read error: %v", err)
  401. break
  402. }
  403. }
  404. if err := <-errCh; err != nil {
  405. t.Errorf("unexpected Write error: %v", err)
  406. }
  407. }
  408. // chunkedCopy copies from r to w in fixed-width chunks to avoid
  409. // causing a Write that exceeds the maximum packet size for packet-based
  410. // connections like "unixpacket".
  411. // We assume that the maximum packet size is at least 1024.
  412. func chunkedCopy(w io.Writer, r io.Reader) error {
  413. b := make([]byte, 1024)
  414. _, err := io.CopyBuffer(struct{ io.Writer }{w}, struct{ io.Reader }{r}, b)
  415. return err
  416. }