mux_test.go 11 KB


  1. // Copyright 2013 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package ssh
  5. import (
  6. "io"
  7. "io/ioutil"
  8. "sync"
  9. "testing"
  10. )
  11. func muxPair() (*mux, *mux) {
  12. a, b := memPipe()
  13. s := newMux(a)
  14. c := newMux(b)
  15. return s, c
  16. }
  17. // Returns both ends of a channel, and the mux for the 2nd
  18. // channel.
  19. func channelPair(t *testing.T) (*channel, *channel, *mux) {
  20. c, s := muxPair()
  21. res := make(chan *channel, 1)
  22. go func() {
  23. newCh, ok := <-s.incomingChannels
  24. if !ok {
  25. t.Fatalf("No incoming channel")
  26. }
  27. if newCh.ChannelType() != "chan" {
  28. t.Fatalf("got type %q want chan", newCh.ChannelType())
  29. }
  30. ch, _, err := newCh.Accept()
  31. if err != nil {
  32. t.Fatalf("Accept %v", err)
  33. }
  34. res <- ch.(*channel)
  35. }()
  36. ch, err := c.openChannel("chan", nil)
  37. if err != nil {
  38. t.Fatalf("OpenChannel: %v", err)
  39. }
  40. return <-res, ch, c
  41. }
  42. // Test that stderr and stdout can be addressed from different
  43. // goroutines. This is intended for use with the race detector.
  44. func TestMuxChannelExtendedThreadSafety(t *testing.T) {
  45. writer, reader, mux := channelPair(t)
  46. defer writer.Close()
  47. defer reader.Close()
  48. defer mux.Close()
  49. var wr, rd sync.WaitGroup
  50. magic := "hello world"
  51. wr.Add(2)
  52. go func() {
  53. io.WriteString(writer, magic)
  54. wr.Done()
  55. }()
  56. go func() {
  57. io.WriteString(writer.Stderr(), magic)
  58. wr.Done()
  59. }()
  60. rd.Add(2)
  61. go func() {
  62. c, err := ioutil.ReadAll(reader)
  63. if string(c) != magic {
  64. t.Fatalf("stdout read got %q, want %q (error %s)", c, magic, err)
  65. }
  66. rd.Done()
  67. }()
  68. go func() {
  69. c, err := ioutil.ReadAll(reader.Stderr())
  70. if string(c) != magic {
  71. t.Fatalf("stderr read got %q, want %q (error %s)", c, magic, err)
  72. }
  73. rd.Done()
  74. }()
  75. wr.Wait()
  76. writer.CloseWrite()
  77. rd.Wait()
  78. }
  79. func TestMuxReadWrite(t *testing.T) {
  80. s, c, mux := channelPair(t)
  81. defer s.Close()
  82. defer c.Close()
  83. defer mux.Close()
  84. magic := "hello world"
  85. magicExt := "hello stderr"
  86. go func() {
  87. _, err := s.Write([]byte(magic))
  88. if err != nil {
  89. t.Fatalf("Write: %v", err)
  90. }
  91. _, err = s.Extended(1).Write([]byte(magicExt))
  92. if err != nil {
  93. t.Fatalf("Write: %v", err)
  94. }
  95. }()
  96. var buf [1024]byte
  97. n, err := c.Read(buf[:])
  98. if err != nil {
  99. t.Fatalf("server Read: %v", err)
  100. }
  101. got := string(buf[:n])
  102. if got != magic {
  103. t.Fatalf("server: got %q want %q", got, magic)
  104. }
  105. n, err = c.Extended(1).Read(buf[:])
  106. if err != nil {
  107. t.Fatalf("server Read: %v", err)
  108. }
  109. got = string(buf[:n])
  110. if got != magicExt {
  111. t.Fatalf("server: got %q want %q", got, magic)
  112. }
  113. }
  114. func TestMuxChannelOverflow(t *testing.T) {
  115. reader, writer, mux := channelPair(t)
  116. defer reader.Close()
  117. defer writer.Close()
  118. defer mux.Close()
  119. wDone := make(chan int, 1)
  120. go func() {
  121. if _, err := writer.Write(make([]byte, channelWindowSize)); err != nil {
  122. t.Errorf("could not fill window: %v", err)
  123. }
  124. writer.Write(make([]byte, 1))
  125. wDone <- 1
  126. }()
  127. writer.remoteWin.waitWriterBlocked()
  128. // Send 1 byte.
  129. packet := make([]byte, 1+4+4+1)
  130. packet[0] = msgChannelData
  131. marshalUint32(packet[1:], writer.remoteId)
  132. marshalUint32(packet[5:], uint32(1))
  133. packet[9] = 42
  134. if err := writer.mux.conn.writePacket(packet); err != nil {
  135. t.Errorf("could not send packet")
  136. }
  137. if _, err := reader.SendRequest("hello", true, nil); err == nil {
  138. t.Errorf("SendRequest succeeded.")
  139. }
  140. <-wDone
  141. }
  142. func TestMuxChannelCloseWriteUnblock(t *testing.T) {
  143. reader, writer, mux := channelPair(t)
  144. defer reader.Close()
  145. defer writer.Close()
  146. defer mux.Close()
  147. wDone := make(chan int, 1)
  148. go func() {
  149. if _, err := writer.Write(make([]byte, channelWindowSize)); err != nil {
  150. t.Errorf("could not fill window: %v", err)
  151. }
  152. if _, err := writer.Write(make([]byte, 1)); err != io.EOF {
  153. t.Errorf("got %v, want EOF for unblock write", err)
  154. }
  155. wDone <- 1
  156. }()
  157. writer.remoteWin.waitWriterBlocked()
  158. reader.Close()
  159. <-wDone
  160. }
  161. func TestMuxConnectionCloseWriteUnblock(t *testing.T) {
  162. reader, writer, mux := channelPair(t)
  163. defer reader.Close()
  164. defer writer.Close()
  165. defer mux.Close()
  166. wDone := make(chan int, 1)
  167. go func() {
  168. if _, err := writer.Write(make([]byte, channelWindowSize)); err != nil {
  169. t.Errorf("could not fill window: %v", err)
  170. }
  171. if _, err := writer.Write(make([]byte, 1)); err != io.EOF {
  172. t.Errorf("got %v, want EOF for unblock write", err)
  173. }
  174. wDone <- 1
  175. }()
  176. writer.remoteWin.waitWriterBlocked()
  177. mux.Close()
  178. <-wDone
  179. }
  180. func TestMuxReject(t *testing.T) {
  181. client, server := muxPair()
  182. defer server.Close()
  183. defer client.Close()
  184. go func() {
  185. ch, ok := <-server.incomingChannels
  186. if !ok {
  187. t.Fatalf("Accept")
  188. }
  189. if ch.ChannelType() != "ch" || string(ch.ExtraData()) != "extra" {
  190. t.Fatalf("unexpected channel: %q, %q", ch.ChannelType(), ch.ExtraData())
  191. }
  192. ch.Reject(RejectionReason(42), "message")
  193. }()
  194. ch, err := client.openChannel("ch", []byte("extra"))
  195. if ch != nil {
  196. t.Fatal("openChannel not rejected")
  197. }
  198. ocf, ok := err.(*OpenChannelError)
  199. if !ok {
  200. t.Errorf("got %#v want *OpenChannelError", err)
  201. } else if ocf.Reason != 42 || ocf.Message != "message" {
  202. t.Errorf("got %#v, want {Reason: 42, Message: %q}", ocf, "message")
  203. }
  204. want := "ssh: rejected: unknown reason 42 (message)"
  205. if err.Error() != want {
  206. t.Errorf("got %q, want %q", err.Error(), want)
  207. }
  208. }
  209. func TestMuxChannelRequest(t *testing.T) {
  210. client, server, mux := channelPair(t)
  211. defer server.Close()
  212. defer client.Close()
  213. defer mux.Close()
  214. var received int
  215. var wg sync.WaitGroup
  216. wg.Add(1)
  217. go func() {
  218. for r := range server.incomingRequests {
  219. received++
  220. r.Reply(r.Type == "yes", nil)
  221. }
  222. wg.Done()
  223. }()
  224. _, err := client.SendRequest("yes", false, nil)
  225. if err != nil {
  226. t.Fatalf("SendRequest: %v", err)
  227. }
  228. ok, err := client.SendRequest("yes", true, nil)
  229. if err != nil {
  230. t.Fatalf("SendRequest: %v", err)
  231. }
  232. if !ok {
  233. t.Errorf("SendRequest(yes): %v", ok)
  234. }
  235. ok, err = client.SendRequest("no", true, nil)
  236. if err != nil {
  237. t.Fatalf("SendRequest: %v", err)
  238. }
  239. if ok {
  240. t.Errorf("SendRequest(no): %v", ok)
  241. }
  242. client.Close()
  243. wg.Wait()
  244. if received != 3 {
  245. t.Errorf("got %d requests, want %d", received, 3)
  246. }
  247. }
  248. func TestMuxGlobalRequest(t *testing.T) {
  249. clientMux, serverMux := muxPair()
  250. defer serverMux.Close()
  251. defer clientMux.Close()
  252. var seen bool
  253. go func() {
  254. for r := range serverMux.incomingRequests {
  255. seen = seen || r.Type == "peek"
  256. if r.WantReply {
  257. err := r.Reply(r.Type == "yes",
  258. append([]byte(r.Type), r.Payload...))
  259. if err != nil {
  260. t.Errorf("AckRequest: %v", err)
  261. }
  262. }
  263. }
  264. }()
  265. _, _, err := clientMux.SendRequest("peek", false, nil)
  266. if err != nil {
  267. t.Errorf("SendRequest: %v", err)
  268. }
  269. ok, data, err := clientMux.SendRequest("yes", true, []byte("a"))
  270. if !ok || string(data) != "yesa" || err != nil {
  271. t.Errorf("SendRequest(\"yes\", true, \"a\"): %v %v %v",
  272. ok, data, err)
  273. }
  274. if ok, data, err := clientMux.SendRequest("yes", true, []byte("a")); !ok || string(data) != "yesa" || err != nil {
  275. t.Errorf("SendRequest(\"yes\", true, \"a\"): %v %v %v",
  276. ok, data, err)
  277. }
  278. if ok, data, err := clientMux.SendRequest("no", true, []byte("a")); ok || string(data) != "noa" || err != nil {
  279. t.Errorf("SendRequest(\"no\", true, \"a\"): %v %v %v",
  280. ok, data, err)
  281. }
  282. if !seen {
  283. t.Errorf("never saw 'peek' request")
  284. }
  285. }
  286. func TestMuxGlobalRequestUnblock(t *testing.T) {
  287. clientMux, serverMux := muxPair()
  288. defer serverMux.Close()
  289. defer clientMux.Close()
  290. result := make(chan error, 1)
  291. go func() {
  292. _, _, err := clientMux.SendRequest("hello", true, nil)
  293. result <- err
  294. }()
  295. <-serverMux.incomingRequests
  296. serverMux.conn.Close()
  297. err := <-result
  298. if err != io.EOF {
  299. t.Errorf("want EOF, got %v", io.EOF)
  300. }
  301. }
  302. func TestMuxChannelRequestUnblock(t *testing.T) {
  303. a, b, connB := channelPair(t)
  304. defer a.Close()
  305. defer b.Close()
  306. defer connB.Close()
  307. result := make(chan error, 1)
  308. go func() {
  309. _, err := a.SendRequest("hello", true, nil)
  310. result <- err
  311. }()
  312. <-b.incomingRequests
  313. connB.conn.Close()
  314. err := <-result
  315. if err != io.EOF {
  316. t.Errorf("want EOF, got %v", err)
  317. }
  318. }
  319. func TestMuxCloseChannel(t *testing.T) {
  320. r, w, mux := channelPair(t)
  321. defer mux.Close()
  322. defer r.Close()
  323. defer w.Close()
  324. result := make(chan error, 1)
  325. go func() {
  326. var b [1024]byte
  327. _, err := r.Read(b[:])
  328. result <- err
  329. }()
  330. if err := w.Close(); err != nil {
  331. t.Errorf("w.Close: %v", err)
  332. }
  333. if _, err := w.Write([]byte("hello")); err != io.EOF {
  334. t.Errorf("got err %v, want io.EOF after Close", err)
  335. }
  336. if err := <-result; err != io.EOF {
  337. t.Errorf("got %v (%T), want io.EOF", err, err)
  338. }
  339. }
  340. func TestMuxCloseWriteChannel(t *testing.T) {
  341. r, w, mux := channelPair(t)
  342. defer mux.Close()
  343. result := make(chan error, 1)
  344. go func() {
  345. var b [1024]byte
  346. _, err := r.Read(b[:])
  347. result <- err
  348. }()
  349. if err := w.CloseWrite(); err != nil {
  350. t.Errorf("w.CloseWrite: %v", err)
  351. }
  352. if _, err := w.Write([]byte("hello")); err != io.EOF {
  353. t.Errorf("got err %v, want io.EOF after CloseWrite", err)
  354. }
  355. if err := <-result; err != io.EOF {
  356. t.Errorf("got %v (%T), want io.EOF", err, err)
  357. }
  358. }
  359. func TestMuxInvalidRecord(t *testing.T) {
  360. a, b := muxPair()
  361. defer a.Close()
  362. defer b.Close()
  363. packet := make([]byte, 1+4+4+1)
  364. packet[0] = msgChannelData
  365. marshalUint32(packet[1:], 29348723 /* invalid channel id */)
  366. marshalUint32(packet[5:], 1)
  367. packet[9] = 42
  368. a.conn.writePacket(packet)
  369. go a.SendRequest("hello", false, nil)
  370. // 'a' wrote an invalid packet, so 'b' has exited.
  371. req, ok := <-b.incomingRequests
  372. if ok {
  373. t.Errorf("got request %#v after receiving invalid packet", req)
  374. }
  375. }
  376. func TestZeroWindowAdjust(t *testing.T) {
  377. a, b, mux := channelPair(t)
  378. defer a.Close()
  379. defer b.Close()
  380. defer mux.Close()
  381. go func() {
  382. io.WriteString(a, "hello")
  383. // bogus adjust.
  384. a.sendMessage(windowAdjustMsg{})
  385. io.WriteString(a, "world")
  386. a.Close()
  387. }()
  388. want := "helloworld"
  389. c, _ := ioutil.ReadAll(b)
  390. if string(c) != want {
  391. t.Errorf("got %q want %q", c, want)
  392. }
  393. }
  394. func TestMuxMaxPacketSize(t *testing.T) {
  395. a, b, mux := channelPair(t)
  396. defer a.Close()
  397. defer b.Close()
  398. defer mux.Close()
  399. large := make([]byte, a.maxRemotePayload+1)
  400. packet := make([]byte, 1+4+4+1+len(large))
  401. packet[0] = msgChannelData
  402. marshalUint32(packet[1:], a.remoteId)
  403. marshalUint32(packet[5:], uint32(len(large)))
  404. packet[9] = 42
  405. if err := a.mux.conn.writePacket(packet); err != nil {
  406. t.Errorf("could not send packet")
  407. }
  408. go a.SendRequest("hello", false, nil)
  409. _, ok := <-b.incomingRequests
  410. if ok {
  411. t.Errorf("connection still alive after receiving large packet.")
  412. }
  413. }
  414. // Don't ship code with debug=true.
  415. func TestDebug(t *testing.T) {
  416. if debugMux {
  417. t.Error("mux debug switched on")
  418. }
  419. if debugHandshake {
  420. t.Error("handshake debug switched on")
  421. }
  422. if debugTransport {
  423. t.Error("transport debug switched on")
  424. }
  425. }