channel.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. // Copyright 2011 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package ssh
  5. import (
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "log"
  11. "sync"
  12. )
  13. const (
  14. minPacketLength = 9
  15. // channelMaxPacket contains the maximum number of bytes that will be
  16. // sent in a single packet. As per RFC 4253, section 6.1, 32k is also
  17. // the minimum.
  18. channelMaxPacket = 1 << 15
  19. // We follow OpenSSH here.
  20. channelWindowSize = 64 * channelMaxPacket
  21. )
  22. // NewChannel represents an incoming request to a channel. It must either be
  23. // accepted for use by calling Accept, or rejected by calling Reject.
  24. type NewChannel interface {
  25. // Accept accepts the channel creation request. It returns the Channel
  26. // and a Go channel containing SSH requests. The Go channel must be
  27. // serviced otherwise the Channel will hang.
  28. Accept() (Channel, <-chan *Request, error)
  29. // Reject rejects the channel creation request. After calling
  30. // this, no other methods on the Channel may be called.
  31. Reject(reason RejectionReason, message string) error
  32. // ChannelType returns the type of the channel, as supplied by the
  33. // client.
  34. ChannelType() string
  35. // ExtraData returns the arbitrary payload for this channel, as supplied
  36. // by the client. This data is specific to the channel type.
  37. ExtraData() []byte
  38. }
  39. // A Channel is an ordered, reliable, flow-controlled, duplex stream
  40. // that is multiplexed over an SSH connection.
  41. type Channel interface {
  42. // Read reads up to len(data) bytes from the channel.
  43. Read(data []byte) (int, error)
  44. // Write writes len(data) bytes to the channel.
  45. Write(data []byte) (int, error)
  46. // Close signals end of channel use. No data may be sent after this
  47. // call.
  48. Close() error
  49. // CloseWrite signals the end of sending in-band
  50. // data. Requests may still be sent, and the other side may
  51. // still send data
  52. CloseWrite() error
  53. // SendRequest sends a channel request. If wantReply is true,
  54. // it will wait for a reply and return the result as a
  55. // boolean, otherwise the return value will be false. Channel
  56. // requests are out-of-band messages so they may be sent even
  57. // if the data stream is closed or blocked by flow control.
  58. // If the channel is closed before a reply is returned, io.EOF
  59. // is returned.
  60. SendRequest(name string, wantReply bool, payload []byte) (bool, error)
  61. // Stderr returns an io.ReadWriter that writes to this channel
  62. // with the extended data type set to stderr. Stderr may
  63. // safely be read and written from a different goroutine than
  64. // Read and Write respectively.
  65. Stderr() io.ReadWriter
  66. }
  67. // Request is a request sent outside of the normal stream of
  68. // data. Requests can either be specific to an SSH channel, or they
  69. // can be global.
  70. type Request struct {
  71. Type string
  72. WantReply bool
  73. Payload []byte
  74. ch *channel
  75. mux *mux
  76. }
  77. // Reply sends a response to a request. It must be called for all requests
  78. // where WantReply is true and is a no-op otherwise. The payload argument is
  79. // ignored for replies to channel-specific requests.
  80. func (r *Request) Reply(ok bool, payload []byte) error {
  81. if !r.WantReply {
  82. return nil
  83. }
  84. if r.ch == nil {
  85. return r.mux.ackRequest(ok, payload)
  86. }
  87. return r.ch.ackRequest(ok)
  88. }
  89. // RejectionReason is an enumeration used when rejecting channel creation
  90. // requests. See RFC 4254, section 5.1.
  91. type RejectionReason uint32
  92. const (
  93. Prohibited RejectionReason = iota + 1
  94. ConnectionFailed
  95. UnknownChannelType
  96. ResourceShortage
  97. )
  98. // String converts the rejection reason to human readable form.
  99. func (r RejectionReason) String() string {
  100. switch r {
  101. case Prohibited:
  102. return "administratively prohibited"
  103. case ConnectionFailed:
  104. return "connect failed"
  105. case UnknownChannelType:
  106. return "unknown channel type"
  107. case ResourceShortage:
  108. return "resource shortage"
  109. }
  110. return fmt.Sprintf("unknown reason %d", int(r))
  111. }
  112. func min(a uint32, b int) uint32 {
  113. if a < uint32(b) {
  114. return a
  115. }
  116. return uint32(b)
  117. }
  118. type channelDirection uint8
  119. const (
  120. channelInbound channelDirection = iota
  121. channelOutbound
  122. )
  123. // channel is an implementation of the Channel interface that works
  124. // with the mux class.
  125. type channel struct {
  126. // R/O after creation
  127. chanType string
  128. extraData []byte
  129. localId, remoteId uint32
  130. // maxIncomingPayload and maxRemotePayload are the maximum
  131. // payload sizes of normal and extended data packets for
  132. // receiving and sending, respectively. The wire packet will
  133. // be 9 or 13 bytes larger (excluding encryption overhead).
  134. maxIncomingPayload uint32
  135. maxRemotePayload uint32
  136. mux *mux
  137. // decided is set to true if an accept or reject message has been sent
  138. // (for outbound channels) or received (for inbound channels).
  139. decided bool
  140. // direction contains either channelOutbound, for channels created
  141. // locally, or channelInbound, for channels created by the peer.
  142. direction channelDirection
  143. // Pending internal channel messages.
  144. msg chan interface{}
  145. // Since requests have no ID, there can be only one request
  146. // with WantReply=true outstanding. This lock is held by a
  147. // goroutine that has such an outgoing request pending.
  148. sentRequestMu sync.Mutex
  149. incomingRequests chan *Request
  150. sentEOF bool
  151. // thread-safe data
  152. remoteWin window
  153. pending *buffer
  154. extPending *buffer
  155. // windowMu protects myWindow, the flow-control window.
  156. windowMu sync.Mutex
  157. myWindow uint32
  158. // writeMu serializes calls to mux.conn.writePacket() and
  159. // protects sentClose and packetPool. This mutex must be
  160. // different from windowMu, as writePacket can block if there
  161. // is a key exchange pending.
  162. writeMu sync.Mutex
  163. sentClose bool
  164. // packetPool has a buffer for each extended channel ID to
  165. // save allocations during writes.
  166. packetPool map[uint32][]byte
  167. }
  168. // writePacket sends a packet. If the packet is a channel close, it updates
  169. // sentClose. This method takes the lock c.writeMu.
  170. func (ch *channel) writePacket(packet []byte) error {
  171. ch.writeMu.Lock()
  172. if ch.sentClose {
  173. ch.writeMu.Unlock()
  174. return io.EOF
  175. }
  176. ch.sentClose = (packet[0] == msgChannelClose)
  177. err := ch.mux.conn.writePacket(packet)
  178. ch.writeMu.Unlock()
  179. return err
  180. }
  181. func (ch *channel) sendMessage(msg interface{}) error {
  182. if debugMux {
  183. log.Printf("send(%d): %#v", ch.mux.chanList.offset, msg)
  184. }
  185. p := Marshal(msg)
  186. binary.BigEndian.PutUint32(p[1:], ch.remoteId)
  187. return ch.writePacket(p)
  188. }
  189. // WriteExtended writes data to a specific extended stream. These streams are
  190. // used, for example, for stderr.
  191. func (ch *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) {
  192. if ch.sentEOF {
  193. return 0, io.EOF
  194. }
  195. // 1 byte message type, 4 bytes remoteId, 4 bytes data length
  196. opCode := byte(msgChannelData)
  197. headerLength := uint32(9)
  198. if extendedCode > 0 {
  199. headerLength += 4
  200. opCode = msgChannelExtendedData
  201. }
  202. ch.writeMu.Lock()
  203. packet := ch.packetPool[extendedCode]
  204. // We don't remove the buffer from packetPool, so
  205. // WriteExtended calls from different goroutines will be
  206. // flagged as errors by the race detector.
  207. ch.writeMu.Unlock()
  208. for len(data) > 0 {
  209. space := min(ch.maxRemotePayload, len(data))
  210. if space, err = ch.remoteWin.reserve(space); err != nil {
  211. return n, err
  212. }
  213. if want := headerLength + space; uint32(cap(packet)) < want {
  214. packet = make([]byte, want)
  215. } else {
  216. packet = packet[:want]
  217. }
  218. todo := data[:space]
  219. packet[0] = opCode
  220. binary.BigEndian.PutUint32(packet[1:], ch.remoteId)
  221. if extendedCode > 0 {
  222. binary.BigEndian.PutUint32(packet[5:], uint32(extendedCode))
  223. }
  224. binary.BigEndian.PutUint32(packet[headerLength-4:], uint32(len(todo)))
  225. copy(packet[headerLength:], todo)
  226. if err = ch.writePacket(packet); err != nil {
  227. return n, err
  228. }
  229. n += len(todo)
  230. data = data[len(todo):]
  231. }
  232. ch.writeMu.Lock()
  233. ch.packetPool[extendedCode] = packet
  234. ch.writeMu.Unlock()
  235. return n, err
  236. }
  237. func (ch *channel) handleData(packet []byte) error {
  238. headerLen := 9
  239. isExtendedData := packet[0] == msgChannelExtendedData
  240. if isExtendedData {
  241. headerLen = 13
  242. }
  243. if len(packet) < headerLen {
  244. // malformed data packet
  245. return parseError(packet[0])
  246. }
  247. var extended uint32
  248. if isExtendedData {
  249. extended = binary.BigEndian.Uint32(packet[5:])
  250. }
  251. length := binary.BigEndian.Uint32(packet[headerLen-4 : headerLen])
  252. if length == 0 {
  253. return nil
  254. }
  255. if length > ch.maxIncomingPayload {
  256. // TODO(hanwen): should send Disconnect?
  257. return errors.New("ssh: incoming packet exceeds maximum payload size")
  258. }
  259. data := packet[headerLen:]
  260. if length != uint32(len(data)) {
  261. return errors.New("ssh: wrong packet length")
  262. }
  263. ch.windowMu.Lock()
  264. if ch.myWindow < length {
  265. ch.windowMu.Unlock()
  266. // TODO(hanwen): should send Disconnect with reason?
  267. return errors.New("ssh: remote side wrote too much")
  268. }
  269. ch.myWindow -= length
  270. ch.windowMu.Unlock()
  271. if extended == 1 {
  272. ch.extPending.write(data)
  273. } else if extended > 0 {
  274. // discard other extended data.
  275. } else {
  276. ch.pending.write(data)
  277. }
  278. return nil
  279. }
  280. func (c *channel) adjustWindow(n uint32) error {
  281. c.windowMu.Lock()
  282. // Since myWindow is managed on our side, and can never exceed
  283. // the initial window setting, we don't worry about overflow.
  284. c.myWindow += uint32(n)
  285. c.windowMu.Unlock()
  286. return c.sendMessage(windowAdjustMsg{
  287. AdditionalBytes: uint32(n),
  288. })
  289. }
  290. func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
  291. switch extended {
  292. case 1:
  293. n, err = c.extPending.Read(data)
  294. case 0:
  295. n, err = c.pending.Read(data)
  296. default:
  297. return 0, fmt.Errorf("ssh: extended code %d unimplemented", extended)
  298. }
  299. if n > 0 {
  300. err = c.adjustWindow(uint32(n))
  301. // sendWindowAdjust can return io.EOF if the remote
  302. // peer has closed the connection, however we want to
  303. // defer forwarding io.EOF to the caller of Read until
  304. // the buffer has been drained.
  305. if n > 0 && err == io.EOF {
  306. err = nil
  307. }
  308. }
  309. return n, err
  310. }
  311. func (c *channel) close() {
  312. c.pending.eof()
  313. c.extPending.eof()
  314. close(c.msg)
  315. close(c.incomingRequests)
  316. c.writeMu.Lock()
  317. // This is not necessary for a normal channel teardown, but if
  318. // there was another error, it is.
  319. c.sentClose = true
  320. c.writeMu.Unlock()
  321. // Unblock writers.
  322. c.remoteWin.close()
  323. }
  324. // responseMessageReceived is called when a success or failure message is
  325. // received on a channel to check that such a message is reasonable for the
  326. // given channel.
  327. func (ch *channel) responseMessageReceived() error {
  328. if ch.direction == channelInbound {
  329. return errors.New("ssh: channel response message received on inbound channel")
  330. }
  331. if ch.decided {
  332. return errors.New("ssh: duplicate response received for channel")
  333. }
  334. ch.decided = true
  335. return nil
  336. }
  337. func (ch *channel) handlePacket(packet []byte) error {
  338. switch packet[0] {
  339. case msgChannelData, msgChannelExtendedData:
  340. return ch.handleData(packet)
  341. case msgChannelClose:
  342. ch.sendMessage(channelCloseMsg{PeersID: ch.remoteId})
  343. ch.mux.chanList.remove(ch.localId)
  344. ch.close()
  345. return nil
  346. case msgChannelEOF:
  347. // RFC 4254 is mute on how EOF affects dataExt messages but
  348. // it is logical to signal EOF at the same time.
  349. ch.extPending.eof()
  350. ch.pending.eof()
  351. return nil
  352. }
  353. decoded, err := decode(packet)
  354. if err != nil {
  355. return err
  356. }
  357. switch msg := decoded.(type) {
  358. case *channelOpenFailureMsg:
  359. if err := ch.responseMessageReceived(); err != nil {
  360. return err
  361. }
  362. ch.mux.chanList.remove(msg.PeersID)
  363. ch.msg <- msg
  364. case *channelOpenConfirmMsg:
  365. if err := ch.responseMessageReceived(); err != nil {
  366. return err
  367. }
  368. if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 {
  369. return fmt.Errorf("ssh: invalid MaxPacketSize %d from peer", msg.MaxPacketSize)
  370. }
  371. ch.remoteId = msg.MyID
  372. ch.maxRemotePayload = msg.MaxPacketSize
  373. ch.remoteWin.add(msg.MyWindow)
  374. ch.msg <- msg
  375. case *windowAdjustMsg:
  376. if !ch.remoteWin.add(msg.AdditionalBytes) {
  377. return fmt.Errorf("ssh: invalid window update for %d bytes", msg.AdditionalBytes)
  378. }
  379. case *channelRequestMsg:
  380. req := Request{
  381. Type: msg.Request,
  382. WantReply: msg.WantReply,
  383. Payload: msg.RequestSpecificData,
  384. ch: ch,
  385. }
  386. ch.incomingRequests <- &req
  387. default:
  388. ch.msg <- msg
  389. }
  390. return nil
  391. }
  392. func (m *mux) newChannel(chanType string, direction channelDirection, extraData []byte) *channel {
  393. ch := &channel{
  394. remoteWin: window{Cond: newCond()},
  395. myWindow: channelWindowSize,
  396. pending: newBuffer(),
  397. extPending: newBuffer(),
  398. direction: direction,
  399. incomingRequests: make(chan *Request, chanSize),
  400. msg: make(chan interface{}, chanSize),
  401. chanType: chanType,
  402. extraData: extraData,
  403. mux: m,
  404. packetPool: make(map[uint32][]byte),
  405. }
  406. ch.localId = m.chanList.add(ch)
  407. return ch
  408. }
  409. var errUndecided = errors.New("ssh: must Accept or Reject channel")
  410. var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
  411. type extChannel struct {
  412. code uint32
  413. ch *channel
  414. }
  415. func (e *extChannel) Write(data []byte) (n int, err error) {
  416. return e.ch.WriteExtended(data, e.code)
  417. }
  418. func (e *extChannel) Read(data []byte) (n int, err error) {
  419. return e.ch.ReadExtended(data, e.code)
  420. }
  421. func (ch *channel) Accept() (Channel, <-chan *Request, error) {
  422. if ch.decided {
  423. return nil, nil, errDecidedAlready
  424. }
  425. ch.maxIncomingPayload = channelMaxPacket
  426. confirm := channelOpenConfirmMsg{
  427. PeersID: ch.remoteId,
  428. MyID: ch.localId,
  429. MyWindow: ch.myWindow,
  430. MaxPacketSize: ch.maxIncomingPayload,
  431. }
  432. ch.decided = true
  433. if err := ch.sendMessage(confirm); err != nil {
  434. return nil, nil, err
  435. }
  436. return ch, ch.incomingRequests, nil
  437. }
  438. func (ch *channel) Reject(reason RejectionReason, message string) error {
  439. if ch.decided {
  440. return errDecidedAlready
  441. }
  442. reject := channelOpenFailureMsg{
  443. PeersID: ch.remoteId,
  444. Reason: reason,
  445. Message: message,
  446. Language: "en",
  447. }
  448. ch.decided = true
  449. return ch.sendMessage(reject)
  450. }
  451. func (ch *channel) Read(data []byte) (int, error) {
  452. if !ch.decided {
  453. return 0, errUndecided
  454. }
  455. return ch.ReadExtended(data, 0)
  456. }
  457. func (ch *channel) Write(data []byte) (int, error) {
  458. if !ch.decided {
  459. return 0, errUndecided
  460. }
  461. return ch.WriteExtended(data, 0)
  462. }
  463. func (ch *channel) CloseWrite() error {
  464. if !ch.decided {
  465. return errUndecided
  466. }
  467. ch.sentEOF = true
  468. return ch.sendMessage(channelEOFMsg{
  469. PeersID: ch.remoteId})
  470. }
  471. func (ch *channel) Close() error {
  472. if !ch.decided {
  473. return errUndecided
  474. }
  475. return ch.sendMessage(channelCloseMsg{
  476. PeersID: ch.remoteId})
  477. }
  478. // Extended returns an io.ReadWriter that sends and receives data on the given,
  479. // SSH extended stream. Such streams are used, for example, for stderr.
  480. func (ch *channel) Extended(code uint32) io.ReadWriter {
  481. if !ch.decided {
  482. return nil
  483. }
  484. return &extChannel{code, ch}
  485. }
  486. func (ch *channel) Stderr() io.ReadWriter {
  487. return ch.Extended(1)
  488. }
  489. func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
  490. if !ch.decided {
  491. return false, errUndecided
  492. }
  493. if wantReply {
  494. ch.sentRequestMu.Lock()
  495. defer ch.sentRequestMu.Unlock()
  496. }
  497. msg := channelRequestMsg{
  498. PeersID: ch.remoteId,
  499. Request: name,
  500. WantReply: wantReply,
  501. RequestSpecificData: payload,
  502. }
  503. if err := ch.sendMessage(msg); err != nil {
  504. return false, err
  505. }
  506. if wantReply {
  507. m, ok := (<-ch.msg)
  508. if !ok {
  509. return false, io.EOF
  510. }
  511. switch m.(type) {
  512. case *channelRequestFailureMsg:
  513. return false, nil
  514. case *channelRequestSuccessMsg:
  515. return true, nil
  516. default:
  517. return false, fmt.Errorf("ssh: unexpected response to channel request: %#v", m)
  518. }
  519. }
  520. return false, nil
  521. }
  522. // ackRequest either sends an ack or nack to the channel request.
  523. func (ch *channel) ackRequest(ok bool) error {
  524. if !ch.decided {
  525. return errUndecided
  526. }
  527. var msg interface{}
  528. if !ok {
  529. msg = channelRequestFailureMsg{
  530. PeersID: ch.remoteId,
  531. }
  532. } else {
  533. msg = channelRequestSuccessMsg{
  534. PeersID: ch.remoteId,
  535. }
  536. }
  537. return ch.sendMessage(msg)
  538. }
  539. func (ch *channel) ChannelType() string {
  540. return ch.chanType
  541. }
  542. func (ch *channel) ExtraData() []byte {
  543. return ch.extraData
  544. }