transport.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code.
  5. package http2
  6. import (
  7. "bufio"
  8. "bytes"
  9. "crypto/tls"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "io/ioutil"
  14. "log"
  15. "net"
  16. "net/http"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "golang.org/x/net/http2/hpack"
  21. )
  22. const (
  23. // transportDefaultConnFlow is how many connection-level flow control
  24. // tokens we give the server at start-up, past the default 64k.
  25. transportDefaultConnFlow = 1 << 30
  26. // transportDefaultStreamFlow is how many stream-level flow
  27. // control tokens we announce to the peer, and how many bytes
  28. // we buffer per stream.
  29. transportDefaultStreamFlow = 4 << 20
  30. // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
  31. // a stream-level WINDOW_UPDATE for at a time.
  32. transportDefaultStreamMinRefresh = 4 << 10
  33. )
  34. // Transport is an HTTP/2 Transport.
  35. //
  36. // A Transport internally caches connections to servers. It is safe
  37. // for concurrent use by multiple goroutines.
  38. type Transport struct {
  39. // DialTLS specifies an optional dial function for creating
  40. // TLS connections for requests.
  41. //
  42. // If DialTLS is nil, tls.Dial is used.
  43. //
  44. // If the returned net.Conn has a ConnectionState method like tls.Conn,
  45. // it will be used to set http.Response.TLS.
  46. DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
  47. // TLSClientConfig specifies the TLS configuration to use with
  48. // tls.Client. If nil, the default configuration is used.
  49. TLSClientConfig *tls.Config
  50. // ConnPool optionally specifies an alternate connection pool to use.
  51. // If nil, the default is used.
  52. ConnPool ClientConnPool
  53. connPoolOnce sync.Once
  54. connPoolOrDef ClientConnPool // non-nil version of ConnPool
  55. }
  56. var errTransportVersion = errors.New("http2: ConfigureTransport is only supported starting at Go 1.6")
  57. // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
  58. // It requires Go 1.6 or later and returns an error if the net/http package is too old
  59. // or if t1 has already been HTTP/2-enabled.
  60. func ConfigureTransport(t1 *http.Transport) error {
  61. return configureTransport(t1) // in configure_transport.go (go1.6) or go15.go
  62. }
  63. func (t *Transport) connPool() ClientConnPool {
  64. t.connPoolOnce.Do(t.initConnPool)
  65. return t.connPoolOrDef
  66. }
  67. func (t *Transport) initConnPool() {
  68. if t.ConnPool != nil {
  69. t.connPoolOrDef = t.ConnPool
  70. } else {
  71. t.connPoolOrDef = &clientConnPool{t: t}
  72. }
  73. }
  74. // ClientConn is the state of a single HTTP/2 client connection to an
  75. // HTTP/2 server.
  76. type ClientConn struct {
  77. t *Transport
  78. tconn net.Conn // usually *tls.Conn, except specialized impls
  79. tlsState *tls.ConnectionState // nil only for specialized impls
  80. // readLoop goroutine fields:
  81. readerDone chan struct{} // closed on error
  82. readerErr error // set before readerDone is closed
  83. mu sync.Mutex // guards following
  84. cond *sync.Cond // hold mu; broadcast on flow/closed changes
  85. flow flow // our conn-level flow control quota (cs.flow is per stream)
  86. inflow flow // peer's conn-level flow control
  87. closed bool
  88. goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
  89. streams map[uint32]*clientStream // client-initiated
  90. nextStreamID uint32
  91. bw *bufio.Writer
  92. br *bufio.Reader
  93. fr *Framer
  94. // Settings from peer:
  95. maxFrameSize uint32
  96. maxConcurrentStreams uint32
  97. initialWindowSize uint32
  98. hbuf bytes.Buffer // HPACK encoder writes into this
  99. henc *hpack.Encoder
  100. freeBuf [][]byte
  101. wmu sync.Mutex // held while writing; acquire AFTER wmu if holding both
  102. werr error // first write error that has occurred
  103. }
  104. // clientStream is the state for a single HTTP/2 stream. One of these
  105. // is created for each Transport.RoundTrip call.
  106. type clientStream struct {
  107. cc *ClientConn
  108. ID uint32
  109. resc chan resAndError
  110. bufPipe pipe // buffered pipe with the flow-controlled response payload
  111. flow flow // guarded by cc.mu
  112. inflow flow // guarded by cc.mu
  113. peerReset chan struct{} // closed on peer reset
  114. resetErr error // populated before peerReset is closed
  115. }
  116. // checkReset reports any error sent in a RST_STREAM frame by the
  117. // server.
  118. func (cs *clientStream) checkReset() error {
  119. select {
  120. case <-cs.peerReset:
  121. return cs.resetErr
  122. default:
  123. return nil
  124. }
  125. }
  126. type stickyErrWriter struct {
  127. w io.Writer
  128. err *error
  129. }
  130. func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
  131. if *sew.err != nil {
  132. return 0, *sew.err
  133. }
  134. n, err = sew.w.Write(p)
  135. *sew.err = err
  136. return
  137. }
  138. var ErrNoCachedConn = errors.New("http2: no cached connection was available")
  139. // RoundTripOpt are options for the Transport.RoundTripOpt method.
  140. type RoundTripOpt struct {
  141. // OnlyCachedConn controls whether RoundTripOpt may
  142. // create a new TCP connection. If set true and
  143. // no cached connection is available, RoundTripOpt
  144. // will return ErrNoCachedConn.
  145. OnlyCachedConn bool
  146. }
  147. func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
  148. return t.RoundTripOpt(req, RoundTripOpt{})
  149. }
  150. // authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
  151. // and returns a host:port. The port 443 is added if needed.
  152. func authorityAddr(authority string) (addr string) {
  153. if _, _, err := net.SplitHostPort(authority); err == nil {
  154. return authority
  155. }
  156. return net.JoinHostPort(authority, "443")
  157. }
  158. // RoundTripOpt is like RoundTrip, but takes options.
  159. func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
  160. if req.URL.Scheme != "https" {
  161. return nil, errors.New("http2: unsupported scheme")
  162. }
  163. addr := authorityAddr(req.URL.Host)
  164. for {
  165. cc, err := t.connPool().GetClientConn(req, addr)
  166. if err != nil {
  167. return nil, err
  168. }
  169. res, err := cc.RoundTrip(req)
  170. if shouldRetryRequest(req, err) {
  171. continue
  172. }
  173. if err != nil {
  174. return nil, err
  175. }
  176. return res, nil
  177. }
  178. }
  179. // CloseIdleConnections closes any connections which were previously
  180. // connected from previous requests but are now sitting idle.
  181. // It does not interrupt any connections currently in use.
  182. func (t *Transport) CloseIdleConnections() {
  183. if cp, ok := t.connPool().(*clientConnPool); ok {
  184. cp.closeIdleConnections()
  185. }
  186. }
  187. var (
  188. errClientConnClosed = errors.New("http2: client conn is closed")
  189. errClientConnUnusable = errors.New("http2: client conn not usable")
  190. )
  191. func shouldRetryRequest(req *http.Request, err error) bool {
  192. // TODO: retry GET requests (no bodies) more aggressively, if shutdown
  193. // before response.
  194. return err == errClientConnUnusable
  195. }
  196. func (t *Transport) dialClientConn(addr string) (*ClientConn, error) {
  197. host, _, err := net.SplitHostPort(addr)
  198. if err != nil {
  199. return nil, err
  200. }
  201. tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host))
  202. if err != nil {
  203. return nil, err
  204. }
  205. return t.NewClientConn(tconn)
  206. }
  207. func (t *Transport) newTLSConfig(host string) *tls.Config {
  208. cfg := new(tls.Config)
  209. if t.TLSClientConfig != nil {
  210. *cfg = *t.TLSClientConfig
  211. }
  212. cfg.NextProtos = []string{NextProtoTLS} // TODO: don't override if already in list
  213. cfg.ServerName = host
  214. return cfg
  215. }
  216. func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) {
  217. if t.DialTLS != nil {
  218. return t.DialTLS
  219. }
  220. return t.dialTLSDefault
  221. }
  222. func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) {
  223. cn, err := tls.Dial(network, addr, cfg)
  224. if err != nil {
  225. return nil, err
  226. }
  227. if err := cn.Handshake(); err != nil {
  228. return nil, err
  229. }
  230. if !cfg.InsecureSkipVerify {
  231. if err := cn.VerifyHostname(cfg.ServerName); err != nil {
  232. return nil, err
  233. }
  234. }
  235. state := cn.ConnectionState()
  236. if p := state.NegotiatedProtocol; p != NextProtoTLS {
  237. return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
  238. }
  239. if !state.NegotiatedProtocolIsMutual {
  240. return nil, errors.New("http2: could not negotiate protocol mutually")
  241. }
  242. return cn, nil
  243. }
  244. func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
  245. if _, err := c.Write(clientPreface); err != nil {
  246. return nil, err
  247. }
  248. cc := &ClientConn{
  249. t: t,
  250. tconn: c,
  251. readerDone: make(chan struct{}),
  252. nextStreamID: 1,
  253. maxFrameSize: 16 << 10, // spec default
  254. initialWindowSize: 65535, // spec default
  255. maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
  256. streams: make(map[uint32]*clientStream),
  257. }
  258. cc.cond = sync.NewCond(&cc.mu)
  259. cc.flow.add(int32(initialWindowSize))
  260. // TODO: adjust this writer size to account for frame size +
  261. // MTU + crypto/tls record padding.
  262. cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
  263. cc.br = bufio.NewReader(c)
  264. cc.fr = NewFramer(cc.bw, cc.br)
  265. cc.henc = hpack.NewEncoder(&cc.hbuf)
  266. type connectionStater interface {
  267. ConnectionState() tls.ConnectionState
  268. }
  269. if cs, ok := c.(connectionStater); ok {
  270. state := cs.ConnectionState()
  271. cc.tlsState = &state
  272. }
  273. cc.fr.WriteSettings(
  274. Setting{ID: SettingEnablePush, Val: 0},
  275. Setting{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
  276. )
  277. cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
  278. cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
  279. cc.bw.Flush()
  280. if cc.werr != nil {
  281. return nil, cc.werr
  282. }
  283. // Read the obligatory SETTINGS frame
  284. f, err := cc.fr.ReadFrame()
  285. if err != nil {
  286. return nil, err
  287. }
  288. sf, ok := f.(*SettingsFrame)
  289. if !ok {
  290. return nil, fmt.Errorf("expected settings frame, got: %T", f)
  291. }
  292. cc.fr.WriteSettingsAck()
  293. cc.bw.Flush()
  294. sf.ForeachSetting(func(s Setting) error {
  295. switch s.ID {
  296. case SettingMaxFrameSize:
  297. cc.maxFrameSize = s.Val
  298. case SettingMaxConcurrentStreams:
  299. cc.maxConcurrentStreams = s.Val
  300. case SettingInitialWindowSize:
  301. cc.initialWindowSize = s.Val
  302. default:
  303. // TODO(bradfitz): handle more
  304. t.vlogf("Unhandled Setting: %v", s)
  305. }
  306. return nil
  307. })
  308. go cc.readLoop()
  309. return cc, nil
  310. }
  311. func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
  312. cc.mu.Lock()
  313. defer cc.mu.Unlock()
  314. cc.goAway = f
  315. }
  316. func (cc *ClientConn) CanTakeNewRequest() bool {
  317. cc.mu.Lock()
  318. defer cc.mu.Unlock()
  319. return cc.canTakeNewRequestLocked()
  320. }
  321. func (cc *ClientConn) canTakeNewRequestLocked() bool {
  322. return cc.goAway == nil &&
  323. int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
  324. cc.nextStreamID < 2147483647
  325. }
  326. func (cc *ClientConn) closeIfIdle() {
  327. cc.mu.Lock()
  328. if len(cc.streams) > 0 {
  329. cc.mu.Unlock()
  330. return
  331. }
  332. cc.closed = true
  333. // TODO: do clients send GOAWAY too? maybe? Just Close:
  334. cc.mu.Unlock()
  335. cc.tconn.Close()
  336. }
  337. const maxAllocFrameSize = 512 << 10
  338. // frameBuffer returns a scratch buffer suitable for writing DATA frames.
  339. // They're capped at the min of the peer's max frame size or 512KB
  340. // (kinda arbitrarily), but definitely capped so we don't allocate 4GB
  341. // bufers.
  342. func (cc *ClientConn) frameScratchBuffer() []byte {
  343. cc.mu.Lock()
  344. size := cc.maxFrameSize
  345. if size > maxAllocFrameSize {
  346. size = maxAllocFrameSize
  347. }
  348. for i, buf := range cc.freeBuf {
  349. if len(buf) >= int(size) {
  350. cc.freeBuf[i] = nil
  351. cc.mu.Unlock()
  352. return buf[:size]
  353. }
  354. }
  355. cc.mu.Unlock()
  356. return make([]byte, size)
  357. }
  358. func (cc *ClientConn) putFrameScratchBuffer(buf []byte) {
  359. cc.mu.Lock()
  360. defer cc.mu.Unlock()
  361. const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate.
  362. if len(cc.freeBuf) < maxBufs {
  363. cc.freeBuf = append(cc.freeBuf, buf)
  364. return
  365. }
  366. for i, old := range cc.freeBuf {
  367. if old == nil {
  368. cc.freeBuf[i] = buf
  369. return
  370. }
  371. }
  372. // forget about it.
  373. }
  374. func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
  375. cc.mu.Lock()
  376. if cc.closed || !cc.canTakeNewRequestLocked() {
  377. cc.mu.Unlock()
  378. return nil, errClientConnUnusable
  379. }
  380. cs := cc.newStream()
  381. hasBody := req.Body != nil
  382. // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,}
  383. hdrs := cc.encodeHeaders(req)
  384. first := true // first frame written (HEADERS is first, then CONTINUATION)
  385. cc.wmu.Lock()
  386. frameSize := int(cc.maxFrameSize)
  387. for len(hdrs) > 0 && cc.werr == nil {
  388. chunk := hdrs
  389. if len(chunk) > frameSize {
  390. chunk = chunk[:frameSize]
  391. }
  392. hdrs = hdrs[len(chunk):]
  393. endHeaders := len(hdrs) == 0
  394. if first {
  395. cc.fr.WriteHeaders(HeadersFrameParam{
  396. StreamID: cs.ID,
  397. BlockFragment: chunk,
  398. EndStream: !hasBody,
  399. EndHeaders: endHeaders,
  400. })
  401. first = false
  402. } else {
  403. cc.fr.WriteContinuation(cs.ID, endHeaders, chunk)
  404. }
  405. }
  406. cc.bw.Flush()
  407. werr := cc.werr
  408. cc.wmu.Unlock()
  409. cc.mu.Unlock()
  410. if werr != nil {
  411. return nil, werr
  412. }
  413. var bodyCopyErrc chan error
  414. var gotResHeaders chan struct{} // closed on resheaders
  415. if hasBody {
  416. bodyCopyErrc = make(chan error, 1)
  417. gotResHeaders = make(chan struct{})
  418. go func() {
  419. bodyCopyErrc <- cs.writeRequestBody(req.Body, gotResHeaders)
  420. }()
  421. }
  422. for {
  423. select {
  424. case re := <-cs.resc:
  425. if gotResHeaders != nil {
  426. close(gotResHeaders)
  427. }
  428. if re.err != nil {
  429. return nil, re.err
  430. }
  431. res := re.res
  432. res.Request = req
  433. res.TLS = cc.tlsState
  434. return res, nil
  435. case err := <-bodyCopyErrc:
  436. if err != nil {
  437. return nil, err
  438. }
  439. }
  440. }
  441. }
  442. var errServerResponseBeforeRequestBody = errors.New("http2: server sent response while still writing request body")
  443. func (cs *clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan struct{}) error {
  444. cc := cs.cc
  445. sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
  446. buf := cc.frameScratchBuffer()
  447. defer cc.putFrameScratchBuffer(buf)
  448. for !sentEnd {
  449. var sawEOF bool
  450. n, err := io.ReadFull(body, buf)
  451. if err == io.ErrUnexpectedEOF {
  452. sawEOF = true
  453. err = nil
  454. } else if err == io.EOF {
  455. break
  456. } else if err != nil {
  457. return err
  458. }
  459. toWrite := buf[:n]
  460. for len(toWrite) > 0 && err == nil {
  461. var allowed int32
  462. allowed, err = cs.awaitFlowControl(int32(len(toWrite)))
  463. if err != nil {
  464. return err
  465. }
  466. cc.wmu.Lock()
  467. select {
  468. case <-gotResHeaders:
  469. err = errServerResponseBeforeRequestBody
  470. case <-cs.peerReset:
  471. err = cs.resetErr
  472. default:
  473. data := toWrite[:allowed]
  474. toWrite = toWrite[allowed:]
  475. sentEnd = sawEOF && len(toWrite) == 0
  476. err = cc.fr.WriteData(cs.ID, sentEnd, data)
  477. }
  478. cc.wmu.Unlock()
  479. }
  480. if err != nil {
  481. return err
  482. }
  483. }
  484. var err error
  485. cc.wmu.Lock()
  486. if !sentEnd {
  487. err = cc.fr.WriteData(cs.ID, true, nil)
  488. }
  489. if ferr := cc.bw.Flush(); ferr != nil && err == nil {
  490. err = ferr
  491. }
  492. cc.wmu.Unlock()
  493. return err
  494. }
  495. // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
  496. // control tokens from the server.
  497. // It returns either the non-zero number of tokens taken or an error
  498. // if the stream is dead.
  499. func (cs *clientStream) awaitFlowControl(maxBytes int32) (taken int32, err error) {
  500. cc := cs.cc
  501. cc.mu.Lock()
  502. defer cc.mu.Unlock()
  503. for {
  504. if cc.closed {
  505. return 0, errClientConnClosed
  506. }
  507. if err := cs.checkReset(); err != nil {
  508. return 0, err
  509. }
  510. if a := cs.flow.available(); a > 0 {
  511. take := a
  512. if take > maxBytes {
  513. take = maxBytes
  514. }
  515. if take > int32(cc.maxFrameSize) {
  516. take = int32(cc.maxFrameSize)
  517. }
  518. cs.flow.take(take)
  519. return take, nil
  520. }
  521. cc.cond.Wait()
  522. }
  523. }
  524. // requires cc.mu be held.
  525. func (cc *ClientConn) encodeHeaders(req *http.Request) []byte {
  526. cc.hbuf.Reset()
  527. // TODO(bradfitz): figure out :authority-vs-Host stuff between http2 and Go
  528. host := req.Host
  529. if host == "" {
  530. host = req.URL.Host
  531. }
  532. // 8.1.2.3 Request Pseudo-Header Fields
  533. // The :path pseudo-header field includes the path and query parts of the
  534. // target URI (the path-absolute production and optionally a '?' character
  535. // followed by the query production (see Sections 3.3 and 3.4 of
  536. // [RFC3986]).
  537. cc.writeHeader(":authority", host) // probably not right for all sites
  538. cc.writeHeader(":method", req.Method)
  539. cc.writeHeader(":path", req.URL.RequestURI())
  540. cc.writeHeader(":scheme", "https")
  541. for k, vv := range req.Header {
  542. lowKey := strings.ToLower(k)
  543. if lowKey == "host" {
  544. continue
  545. }
  546. for _, v := range vv {
  547. cc.writeHeader(lowKey, v)
  548. }
  549. }
  550. return cc.hbuf.Bytes()
  551. }
  552. func (cc *ClientConn) writeHeader(name, value string) {
  553. cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  554. }
  555. type resAndError struct {
  556. res *http.Response
  557. err error
  558. }
  559. // requires cc.mu be held.
  560. func (cc *ClientConn) newStream() *clientStream {
  561. cs := &clientStream{
  562. cc: cc,
  563. ID: cc.nextStreamID,
  564. resc: make(chan resAndError, 1),
  565. peerReset: make(chan struct{}),
  566. }
  567. cs.flow.add(int32(cc.initialWindowSize))
  568. cs.flow.setConnFlow(&cc.flow)
  569. cs.inflow.add(transportDefaultStreamFlow)
  570. cs.inflow.setConnFlow(&cc.inflow)
  571. cc.nextStreamID += 2
  572. cc.streams[cs.ID] = cs
  573. return cs
  574. }
  575. func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
  576. cc.mu.Lock()
  577. defer cc.mu.Unlock()
  578. cs := cc.streams[id]
  579. if andRemove {
  580. delete(cc.streams, id)
  581. }
  582. return cs
  583. }
  584. // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
  585. type clientConnReadLoop struct {
  586. cc *ClientConn
  587. activeRes map[uint32]*clientStream // keyed by streamID
  588. // continueStreamID is the stream ID we're waiting for
  589. // continuation frames for.
  590. continueStreamID uint32
  591. hdec *hpack.Decoder
  592. // Fields reset on each HEADERS:
  593. nextRes *http.Response
  594. sawRegHeader bool // saw non-pseudo header
  595. reqMalformed error // non-nil once known to be malformed
  596. }
  597. // readLoop runs in its own goroutine and reads and dispatches frames.
  598. func (cc *ClientConn) readLoop() {
  599. rl := &clientConnReadLoop{
  600. cc: cc,
  601. activeRes: make(map[uint32]*clientStream),
  602. }
  603. // TODO: figure out henc size
  604. rl.hdec = hpack.NewDecoder(initialHeaderTableSize, rl.onNewHeaderField)
  605. defer rl.cleanup()
  606. cc.readerErr = rl.run()
  607. if ce, ok := cc.readerErr.(ConnectionError); ok {
  608. cc.wmu.Lock()
  609. cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  610. cc.wmu.Unlock()
  611. }
  612. }
  613. func (rl *clientConnReadLoop) cleanup() {
  614. cc := rl.cc
  615. defer cc.tconn.Close()
  616. defer cc.t.connPool().MarkDead(cc)
  617. defer close(cc.readerDone)
  618. // Close any response bodies if the server closes prematurely.
  619. // TODO: also do this if we've written the headers but not
  620. // gotten a response yet.
  621. err := cc.readerErr
  622. if err == io.EOF {
  623. err = io.ErrUnexpectedEOF
  624. }
  625. cc.mu.Lock()
  626. for _, cs := range rl.activeRes {
  627. cs.bufPipe.CloseWithError(err)
  628. }
  629. for _, cs := range cc.streams {
  630. select {
  631. case cs.resc <- resAndError{err: err}:
  632. default:
  633. }
  634. }
  635. cc.closed = true
  636. cc.cond.Broadcast()
  637. cc.mu.Unlock()
  638. }
  639. func (rl *clientConnReadLoop) run() error {
  640. cc := rl.cc
  641. for {
  642. f, err := cc.fr.ReadFrame()
  643. if se, ok := err.(StreamError); ok {
  644. // TODO: deal with stream errors from the framer.
  645. return se
  646. } else if err != nil {
  647. return err
  648. }
  649. cc.vlogf("Transport received %v: %#v", f.Header(), f)
  650. streamID := f.Header().StreamID
  651. _, isContinue := f.(*ContinuationFrame)
  652. if isContinue {
  653. if streamID != rl.continueStreamID {
  654. cc.logf("Protocol violation: got CONTINUATION with id %d; want %d", streamID, rl.continueStreamID)
  655. return ConnectionError(ErrCodeProtocol)
  656. }
  657. } else if rl.continueStreamID != 0 {
  658. // Continue frames need to be adjacent in the stream
  659. // and we were in the middle of headers.
  660. cc.logf("Protocol violation: got %T for stream %d, want CONTINUATION for %d", f, streamID, rl.continueStreamID)
  661. return ConnectionError(ErrCodeProtocol)
  662. }
  663. switch f := f.(type) {
  664. case *HeadersFrame:
  665. err = rl.processHeaders(f)
  666. case *ContinuationFrame:
  667. err = rl.processContinuation(f)
  668. case *DataFrame:
  669. err = rl.processData(f)
  670. case *GoAwayFrame:
  671. err = rl.processGoAway(f)
  672. case *RSTStreamFrame:
  673. err = rl.processResetStream(f)
  674. case *SettingsFrame:
  675. err = rl.processSettings(f)
  676. case *PushPromiseFrame:
  677. err = rl.processPushPromise(f)
  678. case *WindowUpdateFrame:
  679. err = rl.processWindowUpdate(f)
  680. case *PingFrame:
  681. err = rl.processPing(f)
  682. default:
  683. cc.logf("Transport: unhandled response frame type %T", f)
  684. }
  685. if err != nil {
  686. return err
  687. }
  688. }
  689. }
  690. func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame) error {
  691. rl.sawRegHeader = false
  692. rl.reqMalformed = nil
  693. rl.nextRes = &http.Response{
  694. Proto: "HTTP/2.0",
  695. ProtoMajor: 2,
  696. Header: make(http.Header),
  697. }
  698. return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
  699. }
  700. func (rl *clientConnReadLoop) processContinuation(f *ContinuationFrame) error {
  701. return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
  702. }
  703. func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID uint32, headersEnded, streamEnded bool) error {
  704. cc := rl.cc
  705. cs := cc.streamByID(streamID, streamEnded)
  706. if cs == nil {
  707. // We could return a ConnectionError(ErrCodeProtocol)
  708. // here, except that in the case of us canceling
  709. // client requests, we may also delete from the
  710. // streams map, in which case we forgot that we sent
  711. // this request. So, just ignore any responses for
  712. // now. They might've been in-flight before the
  713. // server got our RST_STREAM.
  714. return nil
  715. }
  716. _, err := rl.hdec.Write(frag)
  717. if err != nil {
  718. return err
  719. }
  720. if !headersEnded {
  721. rl.continueStreamID = cs.ID
  722. return nil
  723. }
  724. // HEADERS (or CONTINUATION) are now over.
  725. rl.continueStreamID = 0
  726. if rl.reqMalformed != nil {
  727. cs.resc <- resAndError{err: rl.reqMalformed}
  728. rl.cc.writeStreamReset(cs.ID, ErrCodeProtocol, rl.reqMalformed)
  729. return nil
  730. }
  731. res := rl.nextRes
  732. if streamEnded {
  733. res.Body = noBody
  734. } else {
  735. buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage
  736. cs.bufPipe = pipe{b: buf}
  737. res.Body = transportResponseBody{cs}
  738. }
  739. rl.activeRes[cs.ID] = cs
  740. cs.resc <- resAndError{res: res}
  741. rl.nextRes = nil // unused now; will be reset next HEADERS frame
  742. return nil
  743. }
  744. // transportResponseBody is the concrete type of Transport.RoundTrip's
  745. // Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
  746. // On Close it sends RST_STREAM if EOF wasn't already seen.
  747. type transportResponseBody struct {
  748. cs *clientStream
  749. }
  750. func (b transportResponseBody) Read(p []byte) (n int, err error) {
  751. n, err = b.cs.bufPipe.Read(p)
  752. if n == 0 {
  753. return
  754. }
  755. cs := b.cs
  756. cc := cs.cc
  757. cc.mu.Lock()
  758. defer cc.mu.Unlock()
  759. var connAdd, streamAdd int32
  760. // Check the conn-level first, before the stream-level.
  761. if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
  762. connAdd = transportDefaultConnFlow - v
  763. cc.inflow.add(connAdd)
  764. }
  765. if err == nil { // No need to refresh if the stream is over or failed.
  766. if v := cs.inflow.available(); v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
  767. streamAdd = transportDefaultStreamFlow - v
  768. cs.inflow.add(streamAdd)
  769. }
  770. }
  771. if connAdd != 0 || streamAdd != 0 {
  772. cc.wmu.Lock()
  773. defer cc.wmu.Unlock()
  774. if connAdd != 0 {
  775. cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
  776. }
  777. if streamAdd != 0 {
  778. cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
  779. }
  780. cc.bw.Flush()
  781. }
  782. return
  783. }
  784. func (b transportResponseBody) Close() error {
  785. if b.cs.bufPipe.Err() != io.EOF {
  786. // TODO: write test for this
  787. b.cs.cc.writeStreamReset(b.cs.ID, ErrCodeCancel, nil)
  788. }
  789. return nil
  790. }
  791. func (rl *clientConnReadLoop) processData(f *DataFrame) error {
  792. cc := rl.cc
  793. cs := cc.streamByID(f.StreamID, f.StreamEnded())
  794. if cs == nil {
  795. return nil
  796. }
  797. data := f.Data()
  798. if VerboseLogs {
  799. rl.cc.logf("DATA: %q", data)
  800. }
  801. // Check connection-level flow control.
  802. cc.mu.Lock()
  803. if cs.inflow.available() >= int32(len(data)) {
  804. cs.inflow.take(int32(len(data)))
  805. } else {
  806. cc.mu.Unlock()
  807. return ConnectionError(ErrCodeFlowControl)
  808. }
  809. cc.mu.Unlock()
  810. if _, err := cs.bufPipe.Write(data); err != nil {
  811. return err
  812. }
  813. if f.StreamEnded() {
  814. cs.bufPipe.CloseWithError(io.EOF)
  815. delete(rl.activeRes, cs.ID)
  816. }
  817. return nil
  818. }
  819. func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  820. cc := rl.cc
  821. cc.t.connPool().MarkDead(cc)
  822. if f.ErrCode != 0 {
  823. // TODO: deal with GOAWAY more. particularly the error code
  824. cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  825. }
  826. cc.setGoAway(f)
  827. return nil
  828. }
  829. func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
  830. cc := rl.cc
  831. cc.mu.Lock()
  832. defer cc.mu.Unlock()
  833. return f.ForeachSetting(func(s Setting) error {
  834. switch s.ID {
  835. case SettingMaxFrameSize:
  836. cc.maxFrameSize = s.Val
  837. case SettingMaxConcurrentStreams:
  838. cc.maxConcurrentStreams = s.Val
  839. case SettingInitialWindowSize:
  840. // TODO: error if this is too large.
  841. // TODO: adjust flow control of still-open
  842. // frames by the difference of the old initial
  843. // window size and this one.
  844. cc.initialWindowSize = s.Val
  845. default:
  846. // TODO(bradfitz): handle more settings?
  847. cc.vlogf("Unhandled Setting: %v", s)
  848. }
  849. return nil
  850. })
  851. }
  852. func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
  853. cc := rl.cc
  854. cs := cc.streamByID(f.StreamID, false)
  855. if f.StreamID != 0 && cs == nil {
  856. return nil
  857. }
  858. cc.mu.Lock()
  859. defer cc.mu.Unlock()
  860. fl := &cc.flow
  861. if cs != nil {
  862. fl = &cs.flow
  863. }
  864. if !fl.add(int32(f.Increment)) {
  865. return ConnectionError(ErrCodeFlowControl)
  866. }
  867. cc.cond.Broadcast()
  868. return nil
  869. }
  870. func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
  871. cs := rl.cc.streamByID(f.StreamID, true)
  872. if cs == nil {
  873. // TODO: return error if server tries to RST_STEAM an idle stream
  874. return nil
  875. }
  876. select {
  877. case <-cs.peerReset:
  878. // Already reset.
  879. // This is the only goroutine
  880. // which closes this, so there
  881. // isn't a race.
  882. default:
  883. err := StreamError{cs.ID, f.ErrCode}
  884. cs.resetErr = err
  885. close(cs.peerReset)
  886. cs.bufPipe.CloseWithError(err)
  887. }
  888. delete(rl.activeRes, cs.ID)
  889. return nil
  890. }
  891. func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
  892. if f.IsAck() {
  893. // 6.7 PING: " An endpoint MUST NOT respond to PING frames
  894. // containing this flag."
  895. return nil
  896. }
  897. cc := rl.cc
  898. cc.wmu.Lock()
  899. defer cc.wmu.Unlock()
  900. if err := cc.fr.WritePing(true, f.Data); err != nil {
  901. return err
  902. }
  903. return cc.bw.Flush()
  904. }
  905. func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
  906. // We told the peer we don't want them.
  907. // Spec says:
  908. // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
  909. // setting of the peer endpoint is set to 0. An endpoint that
  910. // has set this setting and has received acknowledgement MUST
  911. // treat the receipt of a PUSH_PROMISE frame as a connection
  912. // error (Section 5.4.1) of type PROTOCOL_ERROR."
  913. return ConnectionError(ErrCodeProtocol)
  914. }
  915. func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
  916. // TODO: do something with err? send it as a debug frame to the peer?
  917. // But that's only in GOAWAY. Invent a new frame type? Is there one already?
  918. cc.wmu.Lock()
  919. cc.fr.WriteRSTStream(streamID, code)
  920. cc.wmu.Unlock()
  921. }
  922. // onNewHeaderField runs on the readLoop goroutine whenever a new
  923. // hpack header field is decoded.
  924. func (rl *clientConnReadLoop) onNewHeaderField(f hpack.HeaderField) {
  925. cc := rl.cc
  926. if VerboseLogs {
  927. cc.logf("Header field: %+v", f)
  928. }
  929. isPseudo := strings.HasPrefix(f.Name, ":")
  930. if isPseudo {
  931. if rl.sawRegHeader {
  932. rl.reqMalformed = errors.New("http2: invalid pseudo header after regular header")
  933. return
  934. }
  935. switch f.Name {
  936. case ":status":
  937. code, err := strconv.Atoi(f.Value)
  938. if err != nil {
  939. rl.reqMalformed = errors.New("http2: invalid :status")
  940. return
  941. }
  942. rl.nextRes.Status = f.Value + " " + http.StatusText(code)
  943. rl.nextRes.StatusCode = code
  944. default:
  945. // "Endpoints MUST NOT generate pseudo-header
  946. // fields other than those defined in this
  947. // document."
  948. rl.reqMalformed = fmt.Errorf("http2: unknown response pseudo header %q", f.Name)
  949. }
  950. } else {
  951. rl.sawRegHeader = true
  952. rl.nextRes.Header.Add(http.CanonicalHeaderKey(f.Name), f.Value)
  953. }
  954. }
  955. func (cc *ClientConn) logf(format string, args ...interface{}) {
  956. cc.t.logf(format, args...)
  957. }
  958. func (cc *ClientConn) vlogf(format string, args ...interface{}) {
  959. cc.t.vlogf(format, args...)
  960. }
  961. func (t *Transport) vlogf(format string, args ...interface{}) {
  962. if VerboseLogs {
  963. t.logf(format, args...)
  964. }
  965. }
  966. func (t *Transport) logf(format string, args ...interface{}) {
  967. log.Printf(format, args...)
  968. }
  // noBody is an io.ReadCloser over an empty byte reader: Read has no data
  // to return and Close is a no-op.
  var noBody = io.ReadCloser(ioutil.NopCloser(bytes.NewReader(nil)))
  // strSliceContains reports whether s is equal to any element of ss.
  // The comparison is exact (case-sensitive); a nil or empty slice
  // contains nothing.
  func strSliceContains(ss []string, s string) bool {
  	for i := 0; i < len(ss); i++ {
  		if ss[i] == s {
  			return true
  		}
  	}
  	return false
  }
  // erringRoundTripper is a round tripper that never performs a request:
  // every call fails with the stored error.
  type erringRoundTripper struct {
  	err error
  }

  // RoundTrip ignores the request and returns a nil response together with
  // the error held by rt.
  func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
  	var noResponse *http.Response
  	return noResponse, rt.err
  }