channel.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. // Copyright 2011 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package ssh
  5. import (
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "log"
  11. "sync"
  12. )
  13. const (
  14. minPacketLength = 9
  15. // channelMaxPacket contains the maximum number of bytes that will be
  16. // sent in a single packet. As per RFC 4253, section 6.1, 32k is also
  17. // the minimum.
  18. channelMaxPacket = 1 << 15
  19. // We follow OpenSSH here.
  20. channelWindowSize = 64 * channelMaxPacket
  21. )
  22. // NewChannel represents an incoming request to a channel. It must either be
  23. // accepted for use by calling Accept, or rejected by calling Reject.
  24. type NewChannel interface {
  25. // Accept accepts the channel creation request. It returns the Channel
  26. // and a Go channel containing SSH requests. The Go channel must be
  27. // serviced otherwise the Channel will hang.
  28. Accept() (Channel, <-chan *Request, error)
  29. // Reject rejects the channel creation request. After calling
  30. // this, no other methods on the Channel may be called.
  31. Reject(reason RejectionReason, message string) error
  32. // ChannelType returns the type of the channel, as supplied by the
  33. // client.
  34. ChannelType() string
  35. // ExtraData returns the arbitrary payload for this channel, as supplied
  36. // by the client. This data is specific to the channel type.
  37. ExtraData() []byte
  38. }
  39. // A Channel is an ordered, reliable, flow-controlled, duplex stream
  40. // that is multiplexed over an SSH connection.
  41. type Channel interface {
  42. // Read reads up to len(data) bytes from the channel.
  43. Read(data []byte) (int, error)
  44. // Write writes len(data) bytes to the channel.
  45. Write(data []byte) (int, error)
  46. // Close signals end of channel use. No data may be sent after this
  47. // call.
  48. Close() error
  49. // CloseWrite signals the end of sending in-band
  50. // data. Requests may still be sent, and the other side may
  51. // still send data
  52. CloseWrite() error
  53. // SendRequest sends a channel request. If wantReply is true,
  54. // it will wait for a reply and return the result as a
  55. // boolean, otherwise the return value will be false. Channel
  56. // requests are out-of-band messages so they may be sent even
  57. // if the data stream is closed or blocked by flow control.
  58. SendRequest(name string, wantReply bool, payload []byte) (bool, error)
  59. // Stderr returns an io.ReadWriter that writes to this channel with the
  60. // extended data type set to stderr.
  61. Stderr() io.ReadWriter
  62. }
// Request is a request sent outside of the normal stream of
// data. Requests can either be specific to an SSH channel, or they
// can be global.
type Request struct {
	// Type is the name of the request.
	Type string
	// WantReply reports whether the sender expects an answer; Reply
	// is a no-op when it is false.
	WantReply bool
	// Payload is the request-specific data.
	Payload []byte
	// ch is the channel the request belongs to. When it is nil,
	// Reply answers through mux instead.
	ch *channel
	// mux is used by Reply to answer requests that have no channel.
	mux *mux
}
  73. // Reply sends a response to a request. It must be called for all requests
  74. // where WantReply is true and is a no-op otherwise. The payload argument is
  75. // ignored for replies to channel-specific requests.
  76. func (r *Request) Reply(ok bool, payload []byte) error {
  77. if !r.WantReply {
  78. return nil
  79. }
  80. if r.ch == nil {
  81. return r.mux.ackRequest(ok, payload)
  82. }
  83. return r.ch.ackRequest(ok)
  84. }
  85. // RejectionReason is an enumeration used when rejecting channel creation
  86. // requests. See RFC 4254, section 5.1.
  87. type RejectionReason uint32
  88. const (
  89. Prohibited RejectionReason = iota + 1
  90. ConnectionFailed
  91. UnknownChannelType
  92. ResourceShortage
  93. )
  94. // String converts the rejection reason to human readable form.
  95. func (r RejectionReason) String() string {
  96. switch r {
  97. case Prohibited:
  98. return "administratively prohibited"
  99. case ConnectionFailed:
  100. return "connect failed"
  101. case UnknownChannelType:
  102. return "unknown channel type"
  103. case ResourceShortage:
  104. return "resource shortage"
  105. }
  106. return fmt.Sprintf("unknown reason %d", int(r))
  107. }
  108. func min(a uint32, b int) uint32 {
  109. if a < uint32(b) {
  110. return a
  111. }
  112. return uint32(b)
  113. }
  114. type channelDirection uint8
  115. const (
  116. channelInbound channelDirection = iota
  117. channelOutbound
  118. )
  119. // channel is an implementation of the Channel interface that works
  120. // with the mux class.
  121. type channel struct {
  122. // R/O after creation
  123. chanType string
  124. extraData []byte
  125. localId, remoteId uint32
  126. // maxIncomingPayload and maxRemotePayload are the maximum
  127. // payload sizes of normal and extended data packets for
  128. // receiving and sending, respectively. The wire packet will
  129. // be 9 or 13 bytes larger (excluding encryption overhead).
  130. maxIncomingPayload uint32
  131. maxRemotePayload uint32
  132. mux *mux
  133. // decided is set to true if an accept or reject message has been sent
  134. // (for outbound channels) or received (for inbound channels).
  135. decided bool
  136. // direction contains either channelOutbound, for channels created
  137. // locally, or channelInbound, for channels created by the peer.
  138. direction channelDirection
  139. // Pending internal channel messages.
  140. msg chan interface{}
  141. // Since requests have no ID, there can be only one request
  142. // with WantReply=true outstanding. This lock is held by a
  143. // goroutine that has such an outgoing request pending.
  144. sentRequestMu sync.Mutex
  145. incomingRequests chan *Request
  146. sentEOF bool
  147. // thread-safe data
  148. remoteWin window
  149. pending *buffer
  150. extPending *buffer
  151. // windowMu protects myWindow, the flow-control window.
  152. windowMu sync.Mutex
  153. myWindow uint32
  154. // writeMu serializes calls to mux.conn.writePacket() and
  155. // protects sentClose. This mutex must be different from
  156. // windowMu, as writePacket can block if there is a key
  157. // exchange pending
  158. writeMu sync.Mutex
  159. sentClose bool
  160. }
  161. // writePacket sends a packet. If the packet is a channel close, it updates
  162. // sentClose. This method takes the lock c.writeMu.
  163. func (c *channel) writePacket(packet []byte) error {
  164. c.writeMu.Lock()
  165. if c.sentClose {
  166. c.writeMu.Unlock()
  167. return io.EOF
  168. }
  169. c.sentClose = (packet[0] == msgChannelClose)
  170. err := c.mux.conn.writePacket(packet)
  171. c.writeMu.Unlock()
  172. return err
  173. }
  174. func (c *channel) sendMessage(msg interface{}) error {
  175. if debugMux {
  176. log.Printf("send %d: %#v", c.mux.chanList.offset, msg)
  177. }
  178. p := Marshal(msg)
  179. binary.BigEndian.PutUint32(p[1:], c.remoteId)
  180. return c.writePacket(p)
  181. }
  182. // WriteExtended writes data to a specific extended stream. These streams are
  183. // used, for example, for stderr.
  184. func (c *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) {
  185. if c.sentEOF {
  186. return 0, io.EOF
  187. }
  188. // 1 byte message type, 4 bytes remoteId, 4 bytes data length
  189. opCode := byte(msgChannelData)
  190. headerLength := uint32(9)
  191. if extendedCode > 0 {
  192. headerLength += 4
  193. opCode = msgChannelExtendedData
  194. }
  195. for len(data) > 0 {
  196. space := min(c.maxRemotePayload, len(data))
  197. if space, err = c.remoteWin.reserve(space); err != nil {
  198. return n, err
  199. }
  200. todo := data[:space]
  201. packet := make([]byte, headerLength+uint32(len(todo)))
  202. packet[0] = opCode
  203. binary.BigEndian.PutUint32(packet[1:], c.remoteId)
  204. if extendedCode > 0 {
  205. binary.BigEndian.PutUint32(packet[5:], uint32(extendedCode))
  206. }
  207. binary.BigEndian.PutUint32(packet[headerLength-4:], uint32(len(todo)))
  208. copy(packet[headerLength:], todo)
  209. if err = c.writePacket(packet); err != nil {
  210. return n, err
  211. }
  212. n += len(todo)
  213. data = data[len(todo):]
  214. }
  215. return n, err
  216. }
  217. func (c *channel) handleData(packet []byte) error {
  218. headerLen := 9
  219. isExtendedData := packet[0] == msgChannelExtendedData
  220. if isExtendedData {
  221. headerLen = 13
  222. }
  223. if len(packet) < headerLen {
  224. // malformed data packet
  225. return parseError(packet[0])
  226. }
  227. var extended uint32
  228. if isExtendedData {
  229. extended = binary.BigEndian.Uint32(packet[5:])
  230. }
  231. length := binary.BigEndian.Uint32(packet[headerLen-4 : headerLen])
  232. if length == 0 {
  233. return nil
  234. }
  235. if length > c.maxIncomingPayload {
  236. // TODO(hanwen): should send Disconnect?
  237. return errors.New("ssh: incoming packet exceeds maximum payload size")
  238. }
  239. data := packet[headerLen:]
  240. if length != uint32(len(data)) {
  241. return errors.New("ssh: wrong packet length")
  242. }
  243. c.windowMu.Lock()
  244. if c.myWindow < length {
  245. c.windowMu.Unlock()
  246. // TODO(hanwen): should send Disconnect with reason?
  247. return errors.New("ssh: remote side wrote too much")
  248. }
  249. c.myWindow -= length
  250. c.windowMu.Unlock()
  251. if extended == 1 {
  252. c.extPending.write(data)
  253. } else if extended > 0 {
  254. // discard other extended data.
  255. } else {
  256. c.pending.write(data)
  257. }
  258. return nil
  259. }
  260. func (c *channel) adjustWindow(n uint32) error {
  261. c.windowMu.Lock()
  262. // Since myWindow is managed on our side, and can never exceed
  263. // the initial window setting, we don't worry about overflow.
  264. c.myWindow += uint32(n)
  265. c.windowMu.Unlock()
  266. return c.sendMessage(windowAdjustMsg{
  267. AdditionalBytes: uint32(n),
  268. })
  269. }
  270. func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
  271. switch extended {
  272. case 1:
  273. n, err = c.extPending.Read(data)
  274. case 0:
  275. n, err = c.pending.Read(data)
  276. default:
  277. return 0, fmt.Errorf("ssh: extended code %d unimplemented", extended)
  278. }
  279. if n > 0 {
  280. err = c.adjustWindow(uint32(n))
  281. // sendWindowAdjust can return io.EOF if the remote
  282. // peer has closed the connection, however we want to
  283. // defer forwarding io.EOF to the caller of Read until
  284. // the buffer has been drained.
  285. if n > 0 && err == io.EOF {
  286. err = nil
  287. }
  288. }
  289. return n, err
  290. }
// close tears down the local state of the channel: it marks both
// receive buffers as EOF, closes the msg and incomingRequests Go
// channels, records the channel as closed for writing, and closes the
// remote window.
func (c *channel) close() {
	c.pending.eof()
	c.extPending.eof()
	close(c.msg)
	close(c.incomingRequests)
	c.writeMu.Lock()
	// This is not necessary for a normal channel teardown, but if
	// there was another error, it is.
	c.sentClose = true
	c.writeMu.Unlock()
	// Unblock writers.
	c.remoteWin.close()
}
  304. // responseMessageReceived is called when a success or failure message is
  305. // received on a channel to check that such a message is reasonable for the
  306. // given channel.
  307. func (c *channel) responseMessageReceived() error {
  308. if c.direction == channelInbound {
  309. return errors.New("ssh: channel response message received on inbound channel")
  310. }
  311. if c.decided {
  312. return errors.New("ssh: duplicate response received for channel")
  313. }
  314. c.decided = true
  315. return nil
  316. }
  317. func (c *channel) handlePacket(packet []byte) error {
  318. switch packet[0] {
  319. case msgChannelData, msgChannelExtendedData:
  320. return c.handleData(packet)
  321. case msgChannelClose:
  322. c.sendMessage(channelCloseMsg{PeersId: c.remoteId})
  323. c.mux.chanList.remove(c.localId)
  324. c.close()
  325. return nil
  326. case msgChannelEOF:
  327. // RFC 4254 is mute on how EOF affects dataExt messages but
  328. // it is logical to signal EOF at the same time.
  329. c.extPending.eof()
  330. c.pending.eof()
  331. return nil
  332. }
  333. decoded, err := decode(packet)
  334. if err != nil {
  335. return err
  336. }
  337. switch msg := decoded.(type) {
  338. case *channelOpenFailureMsg:
  339. if err := c.responseMessageReceived(); err != nil {
  340. return err
  341. }
  342. c.mux.chanList.remove(msg.PeersId)
  343. c.msg <- msg
  344. case *channelOpenConfirmMsg:
  345. if err := c.responseMessageReceived(); err != nil {
  346. return err
  347. }
  348. if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 {
  349. return fmt.Errorf("ssh: invalid MaxPacketSize %d from peer", msg.MaxPacketSize)
  350. }
  351. c.remoteId = msg.MyId
  352. c.maxRemotePayload = msg.MaxPacketSize
  353. c.remoteWin.add(msg.MyWindow)
  354. c.msg <- msg
  355. case *windowAdjustMsg:
  356. if !c.remoteWin.add(msg.AdditionalBytes) {
  357. return fmt.Errorf("ssh: invalid window update for %d bytes", msg.AdditionalBytes)
  358. }
  359. case *channelRequestMsg:
  360. req := Request{
  361. Type: msg.Request,
  362. WantReply: msg.WantReply,
  363. Payload: msg.RequestSpecificData,
  364. ch: c,
  365. }
  366. c.incomingRequests <- &req
  367. default:
  368. c.msg <- msg
  369. }
  370. return nil
  371. }
  372. func (m *mux) newChannel(chanType string, direction channelDirection, extraData []byte) *channel {
  373. ch := &channel{
  374. remoteWin: window{Cond: newCond()},
  375. myWindow: channelWindowSize,
  376. pending: newBuffer(),
  377. extPending: newBuffer(),
  378. direction: direction,
  379. incomingRequests: make(chan *Request, 16),
  380. msg: make(chan interface{}, 16),
  381. chanType: chanType,
  382. extraData: extraData,
  383. mux: m,
  384. }
  385. ch.localId = m.chanList.add(ch)
  386. return ch
  387. }
  388. var errUndecided = errors.New("ssh: must Accept or Reject channel")
  389. var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
  390. type extChannel struct {
  391. code uint32
  392. ch *channel
  393. }
  394. func (e *extChannel) Write(data []byte) (n int, err error) {
  395. return e.ch.WriteExtended(data, e.code)
  396. }
  397. func (e *extChannel) Read(data []byte) (n int, err error) {
  398. return e.ch.ReadExtended(data, e.code)
  399. }
  400. func (c *channel) Accept() (Channel, <-chan *Request, error) {
  401. if c.decided {
  402. return nil, nil, errDecidedAlready
  403. }
  404. c.maxIncomingPayload = channelMaxPacket
  405. confirm := channelOpenConfirmMsg{
  406. PeersId: c.remoteId,
  407. MyId: c.localId,
  408. MyWindow: c.myWindow,
  409. MaxPacketSize: c.maxIncomingPayload,
  410. }
  411. c.decided = true
  412. if err := c.sendMessage(confirm); err != nil {
  413. return nil, nil, err
  414. }
  415. return c, c.incomingRequests, nil
  416. }
  417. func (ch *channel) Reject(reason RejectionReason, message string) error {
  418. if ch.decided {
  419. return errDecidedAlready
  420. }
  421. reject := channelOpenFailureMsg{
  422. PeersId: ch.remoteId,
  423. Reason: reason,
  424. Message: message,
  425. Language: "en",
  426. }
  427. ch.decided = true
  428. return ch.sendMessage(reject)
  429. }
  430. func (ch *channel) Read(data []byte) (int, error) {
  431. if !ch.decided {
  432. return 0, errUndecided
  433. }
  434. return ch.ReadExtended(data, 0)
  435. }
  436. func (ch *channel) Write(data []byte) (int, error) {
  437. if !ch.decided {
  438. return 0, errUndecided
  439. }
  440. return ch.WriteExtended(data, 0)
  441. }
  442. func (ch *channel) CloseWrite() error {
  443. if !ch.decided {
  444. return errUndecided
  445. }
  446. ch.sentEOF = true
  447. return ch.sendMessage(channelEOFMsg{
  448. PeersId: ch.remoteId})
  449. }
  450. func (ch *channel) Close() error {
  451. if !ch.decided {
  452. return errUndecided
  453. }
  454. return ch.sendMessage(channelCloseMsg{
  455. PeersId: ch.remoteId})
  456. }
  457. // Extended returns an io.ReadWriter that sends and receives data on the given,
  458. // SSH extended stream. Such streams are used, for example, for stderr.
  459. func (ch *channel) Extended(code uint32) io.ReadWriter {
  460. if !ch.decided {
  461. return nil
  462. }
  463. return &extChannel{code, ch}
  464. }
  465. func (ch *channel) Stderr() io.ReadWriter {
  466. return ch.Extended(1)
  467. }
  468. func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
  469. if !ch.decided {
  470. return false, errUndecided
  471. }
  472. if wantReply {
  473. ch.sentRequestMu.Lock()
  474. defer ch.sentRequestMu.Unlock()
  475. }
  476. msg := channelRequestMsg{
  477. PeersId: ch.remoteId,
  478. Request: name,
  479. WantReply: wantReply,
  480. RequestSpecificData: payload,
  481. }
  482. if err := ch.sendMessage(msg); err != nil {
  483. return false, err
  484. }
  485. if wantReply {
  486. m, ok := (<-ch.msg)
  487. if !ok {
  488. return false, io.EOF
  489. }
  490. switch m.(type) {
  491. case *channelRequestFailureMsg:
  492. return false, nil
  493. case *channelRequestSuccessMsg:
  494. return true, nil
  495. default:
  496. return false, fmt.Errorf("ssh: unexpected response to channel request: %#v", m)
  497. }
  498. }
  499. return false, nil
  500. }
  501. // ackRequest either sends an ack or nack to the channel request.
  502. func (ch *channel) ackRequest(ok bool) error {
  503. if !ch.decided {
  504. return errUndecided
  505. }
  506. var msg interface{}
  507. if !ok {
  508. msg = channelRequestFailureMsg{
  509. PeersId: ch.remoteId,
  510. }
  511. } else {
  512. msg = channelRequestSuccessMsg{
  513. PeersId: ch.remoteId,
  514. }
  515. }
  516. return ch.sendMessage(msg)
  517. }
// ChannelType returns the channel type string the channel was created
// with.
func (ch *channel) ChannelType() string {
	return ch.chanType
}
// ExtraData returns the extra data the channel was created with. The
// returned slice is the stored one, not a copy.
func (ch *channel) ExtraData() []byte {
	return ch.extraData
}