channel.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Copyright 2011 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package ssh
  5. import (
  6. "errors"
  7. "io"
  8. "sync"
  9. )
  10. // A Channel is an ordered, reliable, duplex stream that is multiplexed over an
  11. // SSH connection.
  12. type Channel interface {
  13. // Accept accepts the channel creation request.
  14. Accept() error
  15. // Reject rejects the channel creation request. After calling this, no
  16. // other methods on the Channel may be called. If they are then the
  17. // peer is likely to signal a protocol error and drop the connection.
  18. Reject(reason RejectionReason, message string) error
  19. // Read may return a ChannelRequest as an error.
  20. Read(data []byte) (int, error)
  21. Write(data []byte) (int, error)
  22. Close() error
  23. // AckRequest either sends an ack or nack to the channel request.
  24. AckRequest(ok bool) error
  25. // ChannelType returns the type of the channel, as supplied by the
  26. // client.
  27. ChannelType() string
  28. // ExtraData returns the arbitary payload for this channel, as supplied
  29. // by the client. This data is specific to the channel type.
  30. ExtraData() []byte
  31. }
  32. // ChannelRequest represents a request sent on a channel, outside of the normal
  33. // stream of bytes. It may result from calling Read on a Channel.
  34. type ChannelRequest struct {
  35. Request string
  36. WantReply bool
  37. Payload []byte
  38. }
  39. func (c ChannelRequest) Error() string {
  40. return "channel request received"
  41. }
  42. // RejectionReason is an enumeration used when rejecting channel creation
  43. // requests. See RFC 4254, section 5.1.
  44. type RejectionReason int
  45. const (
  46. Prohibited RejectionReason = iota + 1
  47. ConnectionFailed
  48. UnknownChannelType
  49. ResourceShortage
  50. )
  51. type channel struct {
  52. // immutable once created
  53. chanType string
  54. extraData []byte
  55. theyClosed bool
  56. theySentEOF bool
  57. weClosed bool
  58. dead bool
  59. serverConn *ServerConn
  60. myId, theirId uint32
  61. myWindow, theirWindow uint32
  62. maxPacketSize uint32
  63. err error
  64. pendingRequests []ChannelRequest
  65. pendingData []byte
  66. head, length int
  67. // This lock is inferior to serverConn.lock
  68. lock sync.Mutex
  69. cond *sync.Cond
  70. }
  71. func (c *channel) Accept() error {
  72. c.serverConn.lock.Lock()
  73. defer c.serverConn.lock.Unlock()
  74. if c.serverConn.err != nil {
  75. return c.serverConn.err
  76. }
  77. confirm := channelOpenConfirmMsg{
  78. PeersId: c.theirId,
  79. MyId: c.myId,
  80. MyWindow: c.myWindow,
  81. MaxPacketSize: c.maxPacketSize,
  82. }
  83. return c.serverConn.writePacket(marshal(msgChannelOpenConfirm, confirm))
  84. }
  85. func (c *channel) Reject(reason RejectionReason, message string) error {
  86. c.serverConn.lock.Lock()
  87. defer c.serverConn.lock.Unlock()
  88. if c.serverConn.err != nil {
  89. return c.serverConn.err
  90. }
  91. reject := channelOpenFailureMsg{
  92. PeersId: c.theirId,
  93. Reason: uint32(reason),
  94. Message: message,
  95. Language: "en",
  96. }
  97. return c.serverConn.writePacket(marshal(msgChannelOpenFailure, reject))
  98. }
  99. func (c *channel) handlePacket(packet interface{}) {
  100. c.lock.Lock()
  101. defer c.lock.Unlock()
  102. switch packet := packet.(type) {
  103. case *channelRequestMsg:
  104. req := ChannelRequest{
  105. Request: packet.Request,
  106. WantReply: packet.WantReply,
  107. Payload: packet.RequestSpecificData,
  108. }
  109. c.pendingRequests = append(c.pendingRequests, req)
  110. c.cond.Signal()
  111. case *channelCloseMsg:
  112. c.theyClosed = true
  113. c.cond.Signal()
  114. case *channelEOFMsg:
  115. c.theySentEOF = true
  116. c.cond.Signal()
  117. default:
  118. panic("unknown packet type")
  119. }
  120. }
  121. func (c *channel) handleData(data []byte) {
  122. c.lock.Lock()
  123. defer c.lock.Unlock()
  124. // The other side should never send us more than our window.
  125. if len(data)+c.length > len(c.pendingData) {
  126. // TODO(agl): we should tear down the channel with a protocol
  127. // error.
  128. return
  129. }
  130. c.myWindow -= uint32(len(data))
  131. for i := 0; i < 2; i++ {
  132. tail := c.head + c.length
  133. if tail > len(c.pendingData) {
  134. tail -= len(c.pendingData)
  135. }
  136. n := copy(c.pendingData[tail:], data)
  137. data = data[n:]
  138. c.length += n
  139. }
  140. c.cond.Signal()
  141. }
  142. func (c *channel) Read(data []byte) (n int, err error) {
  143. c.lock.Lock()
  144. defer c.lock.Unlock()
  145. if c.err != nil {
  146. return 0, c.err
  147. }
  148. if c.myWindow <= uint32(len(c.pendingData))/2 {
  149. packet := marshal(msgChannelWindowAdjust, windowAdjustMsg{
  150. PeersId: c.theirId,
  151. AdditionalBytes: uint32(len(c.pendingData)) - c.myWindow,
  152. })
  153. if err := c.serverConn.writePacket(packet); err != nil {
  154. return 0, err
  155. }
  156. }
  157. for {
  158. if c.theySentEOF || c.theyClosed || c.dead {
  159. return 0, io.EOF
  160. }
  161. if len(c.pendingRequests) > 0 {
  162. req := c.pendingRequests[0]
  163. if len(c.pendingRequests) == 1 {
  164. c.pendingRequests = nil
  165. } else {
  166. oldPendingRequests := c.pendingRequests
  167. c.pendingRequests = make([]ChannelRequest, len(oldPendingRequests)-1)
  168. copy(c.pendingRequests, oldPendingRequests[1:])
  169. }
  170. return 0, req
  171. }
  172. if c.length > 0 {
  173. tail := c.head + c.length
  174. if tail > len(c.pendingData) {
  175. tail -= len(c.pendingData)
  176. }
  177. n = copy(data, c.pendingData[c.head:tail])
  178. c.head += n
  179. c.length -= n
  180. if c.head == len(c.pendingData) {
  181. c.head = 0
  182. }
  183. return
  184. }
  185. c.cond.Wait()
  186. }
  187. panic("unreachable")
  188. }
  189. func (c *channel) Write(data []byte) (n int, err error) {
  190. for len(data) > 0 {
  191. c.lock.Lock()
  192. if c.dead || c.weClosed {
  193. return 0, io.EOF
  194. }
  195. if c.theirWindow == 0 {
  196. c.cond.Wait()
  197. continue
  198. }
  199. c.lock.Unlock()
  200. todo := data
  201. if uint32(len(todo)) > c.theirWindow {
  202. todo = todo[:c.theirWindow]
  203. }
  204. packet := make([]byte, 1+4+4+len(todo))
  205. packet[0] = msgChannelData
  206. packet[1] = byte(c.theirId >> 24)
  207. packet[2] = byte(c.theirId >> 16)
  208. packet[3] = byte(c.theirId >> 8)
  209. packet[4] = byte(c.theirId)
  210. packet[5] = byte(len(todo) >> 24)
  211. packet[6] = byte(len(todo) >> 16)
  212. packet[7] = byte(len(todo) >> 8)
  213. packet[8] = byte(len(todo))
  214. copy(packet[9:], todo)
  215. c.serverConn.lock.Lock()
  216. if err = c.serverConn.writePacket(packet); err != nil {
  217. c.serverConn.lock.Unlock()
  218. return
  219. }
  220. c.serverConn.lock.Unlock()
  221. n += len(todo)
  222. data = data[len(todo):]
  223. }
  224. return
  225. }
  226. func (c *channel) Close() error {
  227. c.serverConn.lock.Lock()
  228. defer c.serverConn.lock.Unlock()
  229. if c.serverConn.err != nil {
  230. return c.serverConn.err
  231. }
  232. if c.weClosed {
  233. return errors.New("ssh: channel already closed")
  234. }
  235. c.weClosed = true
  236. closeMsg := channelCloseMsg{
  237. PeersId: c.theirId,
  238. }
  239. return c.serverConn.writePacket(marshal(msgChannelClose, closeMsg))
  240. }
  241. func (c *channel) AckRequest(ok bool) error {
  242. c.serverConn.lock.Lock()
  243. defer c.serverConn.lock.Unlock()
  244. if c.serverConn.err != nil {
  245. return c.serverConn.err
  246. }
  247. if ok {
  248. ack := channelRequestSuccessMsg{
  249. PeersId: c.theirId,
  250. }
  251. return c.serverConn.writePacket(marshal(msgChannelSuccess, ack))
  252. } else {
  253. ack := channelRequestFailureMsg{
  254. PeersId: c.theirId,
  255. }
  256. return c.serverConn.writePacket(marshal(msgChannelFailure, ack))
  257. }
  258. panic("unreachable")
  259. }
  260. func (c *channel) ChannelType() string {
  261. return c.chanType
  262. }
  263. func (c *channel) ExtraData() []byte {
  264. return c.extraData
  265. }