transport.go 6.2 KB


  1. // Copyright 2015 The Go Authors.
  2. // See https://go.googlesource.com/go/+/master/CONTRIBUTORS
  3. // Licensed under the same terms as Go itself:
  4. // https://go.googlesource.com/go/+/master/LICENSE
  5. package http2
  6. import (
  7. "bufio"
  8. "bytes"
  9. "crypto/tls"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "log"
  14. "net"
  15. "net/http"
  16. "strings"
  17. "sync"
  18. "github.com/bradfitz/http2/hpack"
  19. )
  20. type Transport struct {
  21. Fallback http.RoundTripper
  22. }
  23. type clientConn struct {
  24. tconn *tls.Conn
  25. bw *bufio.Writer
  26. br *bufio.Reader
  27. fr *Framer
  28. readerDone chan struct{} // closed on error
  29. readerErr error // set before readerDone is closed
  30. werr error // first write error that has occurred
  31. hbuf bytes.Buffer // HPACK encoder writes into this
  32. henc *hpack.Encoder
  33. hdec *hpack.Decoder
  34. nextRes http.Header
  35. // Settings from peer:
  36. maxFrameSize uint32
  37. mu sync.Mutex
  38. streams map[uint32]*clientStream
  39. nextStreamID uint32
  40. }
  41. type clientStream struct {
  42. ID uint32
  43. resc chan *http.Response
  44. pw *io.PipeWriter
  45. pr *io.PipeReader
  46. }
  47. type stickyErrWriter struct {
  48. w io.Writer
  49. err *error
  50. }
  51. func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
  52. if *sew.err != nil {
  53. return 0, *sew.err
  54. }
  55. n, err = sew.w.Write(p)
  56. *sew.err = err
  57. return
  58. }
  59. func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
  60. if req.URL.Scheme != "https" {
  61. if t.Fallback == nil {
  62. return nil, errors.New("http2: unsupported scheme and no Fallback")
  63. }
  64. return t.Fallback.RoundTrip(req)
  65. }
  66. host, port, err := net.SplitHostPort(req.URL.Host)
  67. if err != nil {
  68. host = req.URL.Host
  69. port = "443"
  70. }
  71. cfg := &tls.Config{
  72. ServerName: host,
  73. NextProtos: []string{NextProtoTLS},
  74. }
  75. tconn, err := tls.Dial("tcp", host+":"+port, cfg)
  76. if err != nil {
  77. return nil, err
  78. }
  79. if err := tconn.Handshake(); err != nil {
  80. return nil, err
  81. }
  82. if err := tconn.VerifyHostname(cfg.ServerName); err != nil {
  83. return nil, err
  84. }
  85. state := tconn.ConnectionState()
  86. if p := state.NegotiatedProtocol; p != NextProtoTLS {
  87. // TODO(bradfitz): fall back to Fallback
  88. return nil, fmt.Errorf("bad protocol: %v", p)
  89. }
  90. if !state.NegotiatedProtocolIsMutual {
  91. return nil, errors.New("could not negotiate protocol mutually")
  92. }
  93. if _, err := tconn.Write(clientPreface); err != nil {
  94. return nil, err
  95. }
  96. cc := &clientConn{
  97. tconn: tconn,
  98. readerDone: make(chan struct{}),
  99. nextStreamID: 1,
  100. streams: make(map[uint32]*clientStream),
  101. }
  102. cc.bw = bufio.NewWriter(stickyErrWriter{tconn, &cc.werr})
  103. cc.br = bufio.NewReader(tconn)
  104. cc.fr = NewFramer(cc.bw, cc.br)
  105. cc.henc = hpack.NewEncoder(&cc.hbuf)
  106. cc.fr.WriteSettings()
  107. cc.bw.Flush()
  108. if cc.werr != nil {
  109. return nil, cc.werr
  110. }
  111. // Read the obligatory SETTINGS frame
  112. f, err := cc.fr.ReadFrame()
  113. if err != nil {
  114. return nil, err
  115. }
  116. sf, ok := f.(*SettingsFrame)
  117. if !ok {
  118. return nil, fmt.Errorf("expected settings frame, got: %T", f)
  119. }
  120. cc.fr.WriteSettingsAck()
  121. cc.bw.Flush()
  122. sf.ForeachSetting(func(s Setting) error {
  123. switch s.ID {
  124. case SettingMaxFrameSize:
  125. cc.maxFrameSize = s.Val
  126. // TODO(bradfitz): handle the others
  127. default:
  128. log.Printf("Unhandled Setting: %v", s)
  129. }
  130. return nil
  131. })
  132. // TODO: figure out henc size
  133. cc.hdec = hpack.NewDecoder(initialHeaderTableSize, cc.onNewHeaderField)
  134. go cc.readLoop()
  135. cs := cc.newStream()
  136. hasBody := false // TODO
  137. // we send: HEADERS[+CONTINUATION] + (DATA?)
  138. hdrs := cc.encodeHeaders(req)
  139. first := true
  140. for len(hdrs) > 0 {
  141. chunk := hdrs
  142. if len(chunk) > int(cc.maxFrameSize) {
  143. chunk = chunk[:cc.maxFrameSize]
  144. }
  145. hdrs = hdrs[len(chunk):]
  146. endHeaders := len(hdrs) == 0
  147. if first {
  148. cc.fr.WriteHeaders(HeadersFrameParam{
  149. StreamID: cs.ID,
  150. BlockFragment: chunk,
  151. EndStream: !hasBody,
  152. EndHeaders: endHeaders,
  153. })
  154. first = false
  155. } else {
  156. cc.fr.WriteContinuation(cs.ID, endHeaders, chunk)
  157. }
  158. }
  159. cc.bw.Flush()
  160. if cc.werr != nil {
  161. return nil, cc.werr
  162. }
  163. return <-cs.resc, nil
  164. }
  165. func (cc *clientConn) encodeHeaders(req *http.Request) []byte {
  166. cc.hbuf.Reset()
  167. // TODO(bradfitz): figure out :authority-vs-Host stuff between http2 and Go
  168. host := req.Host
  169. if host == "" {
  170. host = req.URL.Host
  171. }
  172. cc.writeHeader(":method", req.Method)
  173. cc.writeHeader(":scheme", "https")
  174. cc.writeHeader(":authority", host) // probably not right for all sites
  175. cc.writeHeader(":path", req.URL.Path)
  176. for k, vv := range req.Header {
  177. for _, v := range vv {
  178. cc.writeHeader(strings.ToLower(k), v)
  179. }
  180. }
  181. if _, ok := req.Header["Host"]; !ok {
  182. cc.writeHeader("host", host)
  183. }
  184. return cc.hbuf.Bytes()
  185. }
  186. func (cc *clientConn) writeHeader(name, value string) {
  187. log.Printf("sending %q = %q", name, value)
  188. cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  189. }
  190. func (cc *clientConn) newStream() *clientStream {
  191. cc.mu.Lock()
  192. defer cc.mu.Unlock()
  193. cs := &clientStream{
  194. ID: cc.nextStreamID,
  195. resc: make(chan *http.Response, 1),
  196. }
  197. cc.nextStreamID += 2
  198. cc.streams[cs.ID] = cs
  199. return cs
  200. }
  201. func (cc *clientConn) streamByID(id uint32) *clientStream {
  202. cc.mu.Lock()
  203. defer cc.mu.Unlock()
  204. return cc.streams[id]
  205. }
  206. // runs in its own goroutine.
  207. func (cc *clientConn) readLoop() {
  208. defer close(cc.readerDone)
  209. for {
  210. f, err := cc.fr.ReadFrame()
  211. if err != nil {
  212. cc.readerErr = err
  213. // TODO: don't log it.
  214. log.Printf("ReadFrame: %v", err)
  215. return
  216. }
  217. cs := cc.streamByID(f.Header().StreamID)
  218. log.Printf("Read %v: %#v", f.Header(), f)
  219. headersEnded := false
  220. streamEnded := false
  221. if ff, ok := f.(interface {
  222. StreamEnded() bool
  223. }); ok {
  224. streamEnded = ff.StreamEnded()
  225. }
  226. switch f := f.(type) {
  227. case *HeadersFrame:
  228. cc.nextRes = make(http.Header)
  229. cs.pr, cs.pw = io.Pipe()
  230. cc.hdec.Write(f.HeaderBlockFragment())
  231. headersEnded = f.HeadersEnded()
  232. case *ContinuationFrame:
  233. // TODO: verify stream id is the same
  234. cc.hdec.Write(f.HeaderBlockFragment())
  235. headersEnded = f.HeadersEnded()
  236. case *DataFrame:
  237. log.Printf("DATA: %q", f.Data())
  238. cs.pw.Write(f.Data())
  239. default:
  240. }
  241. if streamEnded {
  242. cs.pw.Close()
  243. }
  244. if headersEnded {
  245. if cs == nil {
  246. panic("couldn't find stream") // TODO be graceful
  247. }
  248. cs.resc <- &http.Response{
  249. Header: cc.nextRes,
  250. Body: cs.pr,
  251. }
  252. }
  253. }
  254. }
  255. func (cc *clientConn) onNewHeaderField(f hpack.HeaderField) {
  256. log.Printf("Header field: %+v", f)
  257. cc.nextRes.Add(http.CanonicalHeaderKey(f.Name), f.Value)
  258. }