transport.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code.
  5. package http2
  6. import (
  7. "bufio"
  8. "bytes"
  9. "crypto/tls"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "io/ioutil"
  14. "log"
  15. "net"
  16. "net/http"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "golang.org/x/net/http2/hpack"
  21. )
// Flow-control defaults used by the client transport.
const (
	// transportDefaultConnFlow is how many connection-level flow control
	// tokens we give the server at start-up, past the default 64k.
	// The value is 1<<30 bytes (1 GiB).
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is how many stream-level flow
	// control tokens we announce to the peer, and how many bytes
	// we buffer per stream. The value is 4<<20 bytes (4 MiB).
	transportDefaultStreamFlow = 4 << 20

	// transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
	// a stream-level WINDOW_UPDATE for at a time. The value is 4<<10 bytes (4 KiB).
	transportDefaultStreamMinRefresh = 4 << 10
)
// Transport is an HTTP/2 Transport.
//
// A Transport internally caches connections to servers. It is safe
// for concurrent use by multiple goroutines.
type Transport struct {
	// DialTLS specifies an optional dial function for creating
	// TLS connections for requests.
	//
	// If DialTLS is nil, tls.Dial is used.
	//
	// If the returned net.Conn has a ConnectionState method like tls.Conn,
	// it will be used to set http.Response.TLS.
	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	// The transport works on a copy and overrides NextProtos and
	// ServerName on that copy for every dial.
	TLSClientConfig *tls.Config

	// TODO: switch to RWMutex
	// TODO: add support for sharing conns based on cert names
	// (e.g. share conn for googleapis.com and appspot.com)

	// connMu guards conns.
	connMu sync.Mutex
	// conns is the connection cache; it is allocated lazily by addConn.
	conns map[string][]*clientConn // key is host:port
}
// clientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
//
// Lock ordering: when both mu and wmu are needed, mu is taken first
// and wmu second.
type clientConn struct {
	t        *Transport
	tconn    net.Conn
	tlsState *tls.ConnectionState // nil if tconn exposes no ConnectionState method
	connKey  []string             // key(s) this connection is cached in, in t.conns

	// readLoop goroutine fields:
	readerDone chan struct{} // closed by the read loop's cleanup when it exits
	readerErr  error         // set before readerDone is closed

	mu           sync.Mutex // guards following
	cond         *sync.Cond // hold mu; broadcast on flow/closed changes
	flow         flow       // our conn-level flow control quota (cs.flow is per stream)
	inflow       flow       // peer's conn-level flow control
	closed       bool
	goAway       *GoAwayFrame             // if non-nil, the GoAwayFrame we received
	streams      map[uint32]*clientStream // client-initiated
	nextStreamID uint32
	bw           *bufio.Writer
	br           *bufio.Reader
	fr           *Framer

	// Settings from peer:
	maxFrameSize         uint32
	maxConcurrentStreams uint32
	initialWindowSize    uint32

	hbuf    bytes.Buffer // HPACK encoder writes into this
	henc    *hpack.Encoder
	freeBuf [][]byte // recycled DATA-frame scratch buffers; nil entries are free slots

	wmu  sync.Mutex // held while writing; acquire AFTER mu if holding both
	werr error      // first write error that has occurred
}
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
	cc      *clientConn       // owning connection
	ID      uint32            // stream identifier, assigned by clientConn.newStream
	resc    chan resAndError  // delivers the response (or a failure) to roundTrip; buffered (1)
	bufPipe pipe              // buffered pipe with the flow-controlled response payload
	flow    flow              // guarded by cc.mu
	inflow  flow              // guarded by cc.mu

	peerReset chan struct{} // closed on peer reset
	resetErr  error         // populated before peerReset is closed
}
  99. // checkReset reports any error sent in a RST_STREAM frame by the
  100. // server.
  101. func (cs *clientStream) checkReset() error {
  102. select {
  103. case <-cs.peerReset:
  104. return cs.resetErr
  105. default:
  106. return nil
  107. }
  108. }
  109. type stickyErrWriter struct {
  110. w io.Writer
  111. err *error
  112. }
  113. func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
  114. if *sew.err != nil {
  115. return 0, *sew.err
  116. }
  117. n, err = sew.w.Write(p)
  118. *sew.err = err
  119. return
  120. }
  121. func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
  122. if req.URL.Scheme != "https" {
  123. return nil, errors.New("http2: unsupported scheme")
  124. }
  125. host, port, err := net.SplitHostPort(req.URL.Host)
  126. if err != nil {
  127. host = req.URL.Host
  128. port = "443"
  129. }
  130. for {
  131. cc, err := t.getClientConn(host, port)
  132. if err != nil {
  133. return nil, err
  134. }
  135. res, err := cc.roundTrip(req)
  136. if shouldRetryRequest(err) { // TODO: or clientconn is overloaded (too many outstanding requests)?
  137. continue
  138. }
  139. if err != nil {
  140. return nil, err
  141. }
  142. return res, nil
  143. }
  144. }
  145. // CloseIdleConnections closes any connections which were previously
  146. // connected from previous requests but are now sitting idle.
  147. // It does not interrupt any connections currently in use.
  148. func (t *Transport) CloseIdleConnections() {
  149. t.connMu.Lock()
  150. defer t.connMu.Unlock()
  151. for _, vv := range t.conns {
  152. for _, cc := range vv {
  153. cc.closeIfIdle()
  154. }
  155. }
  156. }
  157. var errClientConnClosed = errors.New("http2: client conn is closed")
  158. func shouldRetryRequest(err error) bool {
  159. // TODO: or GOAWAY graceful shutdown stuff
  160. return err == errClientConnClosed
  161. }
  162. func (t *Transport) removeClientConn(cc *clientConn) {
  163. t.connMu.Lock()
  164. defer t.connMu.Unlock()
  165. for _, key := range cc.connKey {
  166. vv, ok := t.conns[key]
  167. if !ok {
  168. continue
  169. }
  170. newList := filterOutClientConn(vv, cc)
  171. if len(newList) > 0 {
  172. t.conns[key] = newList
  173. } else {
  174. delete(t.conns, key)
  175. }
  176. }
  177. }
  178. func filterOutClientConn(in []*clientConn, exclude *clientConn) []*clientConn {
  179. out := in[:0]
  180. for _, v := range in {
  181. if v != exclude {
  182. out = append(out, v)
  183. }
  184. }
  185. // If we filtered it out, zero out the last item to prevent
  186. // the GC from seeing it.
  187. if len(in) != len(out) {
  188. in[len(in)-1] = nil
  189. }
  190. return out
  191. }
  192. // AddIdleConn adds c as an idle conn for Transport.
  193. // It assumes that c has not yet exchanged SETTINGS frames.
  194. // The addr maybe be either "host" or "host:port".
  195. func (t *Transport) AddIdleConn(addr string, c *tls.Conn) error {
  196. var key string
  197. _, _, err := net.SplitHostPort(addr)
  198. if err == nil {
  199. key = addr
  200. } else {
  201. key = addr + ":443"
  202. }
  203. cc, err := t.newClientConn(key, c)
  204. if err != nil {
  205. return err
  206. }
  207. t.addConn(key, cc)
  208. return nil
  209. }
  210. func (t *Transport) addConn(key string, cc *clientConn) {
  211. t.connMu.Lock()
  212. defer t.connMu.Unlock()
  213. if t.conns == nil {
  214. t.conns = make(map[string][]*clientConn)
  215. }
  216. t.conns[key] = append(t.conns[key], cc)
  217. }
  218. func (t *Transport) getClientConn(host, port string) (*clientConn, error) {
  219. key := net.JoinHostPort(host, port)
  220. t.connMu.Lock()
  221. for _, cc := range t.conns[key] {
  222. if cc.canTakeNewRequest() {
  223. t.connMu.Unlock()
  224. return cc, nil
  225. }
  226. }
  227. t.connMu.Unlock()
  228. // TODO(bradfitz): use a singleflight.Group to only lock once per 'key'.
  229. // Probably need to vendor it in as github.com/golang/sync/singleflight
  230. // though, since the net package already uses it? Also lines up with
  231. // sameer, bcmills, et al wanting to open source some sync stuff.
  232. cc, err := t.dialClientConn(host, port, key)
  233. if err != nil {
  234. return nil, err
  235. }
  236. t.addConn(key, cc)
  237. return cc, nil
  238. }
  239. func (t *Transport) dialClientConn(host, port, key string) (*clientConn, error) {
  240. tconn, err := t.dialTLS()("tcp", net.JoinHostPort(host, port), t.newTLSConfig(host))
  241. if err != nil {
  242. return nil, err
  243. }
  244. return t.newClientConn(key, tconn)
  245. }
  246. func (t *Transport) newTLSConfig(host string) *tls.Config {
  247. cfg := new(tls.Config)
  248. if t.TLSClientConfig != nil {
  249. *cfg = *t.TLSClientConfig
  250. }
  251. cfg.NextProtos = []string{NextProtoTLS} // TODO: don't override if already in list
  252. cfg.ServerName = host
  253. return cfg
  254. }
  255. func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) {
  256. if t.DialTLS != nil {
  257. return t.DialTLS
  258. }
  259. return t.dialTLSDefault
  260. }
  261. func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) {
  262. cn, err := tls.Dial(network, addr, cfg)
  263. if err != nil {
  264. return nil, err
  265. }
  266. if err := cn.Handshake(); err != nil {
  267. return nil, err
  268. }
  269. if !cfg.InsecureSkipVerify {
  270. if err := cn.VerifyHostname(cfg.ServerName); err != nil {
  271. return nil, err
  272. }
  273. }
  274. state := cn.ConnectionState()
  275. if p := state.NegotiatedProtocol; p != NextProtoTLS {
  276. return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
  277. }
  278. if !state.NegotiatedProtocolIsMutual {
  279. return nil, errors.New("http2: could not negotiate protocol mutually")
  280. }
  281. return cn, nil
  282. }
// newClientConn builds a clientConn around tconn, to be cached under
// key, and performs the opening exchange on it:
//
//  1. writes the client preface,
//  2. sends our SETTINGS (push disabled, per-stream window of
//     transportDefaultStreamFlow) and a connection-level WINDOW_UPDATE
//     of transportDefaultConnFlow,
//  3. reads the first frame from the peer, which must be SETTINGS,
//     acknowledges it and records the values it carries,
//  4. starts the readLoop goroutine.
//
// It returns an error if the preface or initial frames cannot be
// written, if the first frame cannot be read, or if that frame is not
// a SETTINGS frame.
//
// NOTE(review): tconn is not closed on the error paths here; presumably
// the caller is expected to own it in that case. Confirm.
func (t *Transport) newClientConn(key string, tconn net.Conn) (*clientConn, error) {
	if _, err := tconn.Write(clientPreface); err != nil {
		return nil, err
	}

	cc := &clientConn{
		t:                    t,
		tconn:                tconn,
		connKey:              []string{key}, // TODO: cert's validated hostnames too
		readerDone:           make(chan struct{}),
		nextStreamID:         1,
		maxFrameSize:         16 << 10, // spec default
		initialWindowSize:    65535,    // spec default
		maxConcurrentStreams: 1000,     // "infinite", per spec. 1000 seems good enough.
		streams:              make(map[uint32]*clientStream),
	}
	cc.cond = sync.NewCond(&cc.mu)
	cc.flow.add(int32(initialWindowSize))

	// TODO: adjust this writer size to account for frame size +
	// MTU + crypto/tls record padding.
	// All framed writes go through stickyErrWriter, so the first write
	// failure is latched in cc.werr.
	cc.bw = bufio.NewWriter(stickyErrWriter{tconn, &cc.werr})
	cc.br = bufio.NewReader(tconn)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.henc = hpack.NewEncoder(&cc.hbuf)

	// Capture the TLS state if the connection can report one; it is
	// later attached to responses.
	type connectionStater interface {
		ConnectionState() tls.ConnectionState
	}
	if cs, ok := tconn.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	cc.fr.WriteSettings(
		Setting{ID: SettingEnablePush, Val: 0},
		Setting{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
	)
	cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
	// Account locally for the window just granted to the peer.
	cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
	cc.bw.Flush()
	// The individual write results above are not checked; any failure
	// surfaces here through the latched cc.werr.
	if cc.werr != nil {
		return nil, cc.werr
	}

	// Read the obligatory SETTINGS frame
	f, err := cc.fr.ReadFrame()
	if err != nil {
		return nil, err
	}
	sf, ok := f.(*SettingsFrame)
	if !ok {
		return nil, fmt.Errorf("expected settings frame, got: %T", f)
	}
	// The results of the ACK write and flush are not checked here.
	cc.fr.WriteSettingsAck()
	cc.bw.Flush()

	// Apply the peer's settings. cc.mu is not taken: the read loop has
	// not been started yet. The callback always returns nil and the
	// result of ForeachSetting is not checked.
	sf.ForeachSetting(func(s Setting) error {
		switch s.ID {
		case SettingMaxFrameSize:
			cc.maxFrameSize = s.Val
		case SettingMaxConcurrentStreams:
			cc.maxConcurrentStreams = s.Val
		case SettingInitialWindowSize:
			cc.initialWindowSize = s.Val
		default:
			// TODO(bradfitz): handle more
			t.vlogf("Unhandled Setting: %v", s)
		}
		return nil
	})

	go cc.readLoop()
	return cc, nil
}
  351. func (cc *clientConn) setGoAway(f *GoAwayFrame) {
  352. cc.mu.Lock()
  353. defer cc.mu.Unlock()
  354. cc.goAway = f
  355. }
  356. func (cc *clientConn) canTakeNewRequest() bool {
  357. cc.mu.Lock()
  358. defer cc.mu.Unlock()
  359. return cc.goAway == nil &&
  360. int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
  361. cc.nextStreamID < 2147483647
  362. }
  363. func (cc *clientConn) closeIfIdle() {
  364. cc.mu.Lock()
  365. if len(cc.streams) > 0 {
  366. cc.mu.Unlock()
  367. return
  368. }
  369. cc.closed = true
  370. // TODO: do clients send GOAWAY too? maybe? Just Close:
  371. cc.mu.Unlock()
  372. cc.tconn.Close()
  373. }
  374. const maxAllocFrameSize = 512 << 10
  375. // frameBuffer returns a scratch buffer suitable for writing DATA frames.
  376. // They're capped at the min of the peer's max frame size or 512KB
  377. // (kinda arbitrarily), but definitely capped so we don't allocate 4GB
  378. // bufers.
  379. func (cc *clientConn) frameScratchBuffer() []byte {
  380. cc.mu.Lock()
  381. size := cc.maxFrameSize
  382. if size > maxAllocFrameSize {
  383. size = maxAllocFrameSize
  384. }
  385. for i, buf := range cc.freeBuf {
  386. if len(buf) >= int(size) {
  387. cc.freeBuf[i] = nil
  388. cc.mu.Unlock()
  389. return buf[:size]
  390. }
  391. }
  392. cc.mu.Unlock()
  393. return make([]byte, size)
  394. }
  395. func (cc *clientConn) putFrameScratchBuffer(buf []byte) {
  396. cc.mu.Lock()
  397. defer cc.mu.Unlock()
  398. const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate.
  399. if len(cc.freeBuf) < maxBufs {
  400. cc.freeBuf = append(cc.freeBuf, buf)
  401. return
  402. }
  403. for i, old := range cc.freeBuf {
  404. if old == nil {
  405. cc.freeBuf[i] = buf
  406. return
  407. }
  408. }
  409. // forget about it.
  410. }
  411. func (cc *clientConn) roundTrip(req *http.Request) (*http.Response, error) {
  412. cc.mu.Lock()
  413. if cc.closed {
  414. cc.mu.Unlock()
  415. return nil, errClientConnClosed
  416. }
  417. cs := cc.newStream()
  418. hasBody := req.Body != nil
  419. // we send: HEADERS[+CONTINUATION] + (DATA?)
  420. hdrs := cc.encodeHeaders(req)
  421. first := true
  422. cc.wmu.Lock()
  423. frameSize := int(cc.maxFrameSize)
  424. for len(hdrs) > 0 && cc.werr == nil {
  425. chunk := hdrs
  426. if len(chunk) > frameSize {
  427. chunk = chunk[:frameSize]
  428. }
  429. hdrs = hdrs[len(chunk):]
  430. endHeaders := len(hdrs) == 0
  431. if first {
  432. cc.fr.WriteHeaders(HeadersFrameParam{
  433. StreamID: cs.ID,
  434. BlockFragment: chunk,
  435. EndStream: !hasBody,
  436. EndHeaders: endHeaders,
  437. })
  438. first = false
  439. } else {
  440. cc.fr.WriteContinuation(cs.ID, endHeaders, chunk)
  441. }
  442. }
  443. cc.bw.Flush()
  444. werr := cc.werr
  445. cc.wmu.Unlock()
  446. cc.mu.Unlock()
  447. if werr != nil {
  448. return nil, werr
  449. }
  450. var bodyCopyErrc chan error
  451. var gotResHeaders chan struct{} // closed on resheaders
  452. if hasBody {
  453. bodyCopyErrc = make(chan error, 1)
  454. gotResHeaders = make(chan struct{})
  455. go func() {
  456. bodyCopyErrc <- cs.writeRequestBody(req.Body, gotResHeaders)
  457. }()
  458. }
  459. for {
  460. select {
  461. case re := <-cs.resc:
  462. if gotResHeaders != nil {
  463. close(gotResHeaders)
  464. }
  465. if re.err != nil {
  466. return nil, re.err
  467. }
  468. res := re.res
  469. res.Request = req
  470. res.TLS = cc.tlsState
  471. return res, nil
  472. case err := <-bodyCopyErrc:
  473. if err != nil {
  474. return nil, err
  475. }
  476. }
  477. }
  478. }
  479. var errServerResponseBeforeRequestBody = errors.New("http2: server sent response while still writing request body")
  480. func (cs *clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan struct{}) error {
  481. cc := cs.cc
  482. done := false
  483. for !done {
  484. buf := cc.frameScratchBuffer()
  485. n, err := io.ReadFull(body, buf)
  486. if err == io.ErrUnexpectedEOF {
  487. done = true
  488. } else if err != nil {
  489. return err
  490. }
  491. // Await for n flow control tokens.
  492. if err := cs.awaitFlowControl(int32(n)); err != nil {
  493. return err
  494. }
  495. cc.wmu.Lock()
  496. select {
  497. case <-gotResHeaders:
  498. err = errServerResponseBeforeRequestBody
  499. case <-cs.peerReset:
  500. err = cs.resetErr
  501. default:
  502. err = cc.fr.WriteData(cs.ID, done, buf[:n])
  503. }
  504. cc.wmu.Unlock()
  505. cc.putFrameScratchBuffer(buf)
  506. if err != nil {
  507. return err
  508. }
  509. }
  510. var err error
  511. cc.wmu.Lock()
  512. if !done {
  513. err = cc.fr.WriteData(cs.ID, true, nil)
  514. }
  515. if ferr := cc.bw.Flush(); ferr != nil && err == nil {
  516. err = ferr
  517. }
  518. cc.wmu.Unlock()
  519. return err
  520. }
  521. func (cs *clientStream) awaitFlowControl(n int32) error {
  522. cc := cs.cc
  523. cc.mu.Lock()
  524. defer cc.mu.Unlock()
  525. for {
  526. if cc.closed {
  527. return errClientConnClosed
  528. }
  529. if err := cs.checkReset(); err != nil {
  530. return err
  531. }
  532. if cs.flow.available() >= n {
  533. cs.flow.take(n)
  534. return nil
  535. }
  536. cc.cond.Wait()
  537. }
  538. }
  539. // requires cc.mu be held.
  540. func (cc *clientConn) encodeHeaders(req *http.Request) []byte {
  541. cc.hbuf.Reset()
  542. // TODO(bradfitz): figure out :authority-vs-Host stuff between http2 and Go
  543. host := req.Host
  544. if host == "" {
  545. host = req.URL.Host
  546. }
  547. // 8.1.2.3 Request Pseudo-Header Fields
  548. // The :path pseudo-header field includes the path and query parts of the
  549. // target URI (the path-absolute production and optionally a '?' character
  550. // followed by the query production (see Sections 3.3 and 3.4 of
  551. // [RFC3986]).
  552. cc.writeHeader(":authority", host) // probably not right for all sites
  553. cc.writeHeader(":method", req.Method)
  554. cc.writeHeader(":path", req.URL.RequestURI())
  555. cc.writeHeader(":scheme", "https")
  556. for k, vv := range req.Header {
  557. lowKey := strings.ToLower(k)
  558. if lowKey == "host" {
  559. continue
  560. }
  561. for _, v := range vv {
  562. cc.writeHeader(lowKey, v)
  563. }
  564. }
  565. return cc.hbuf.Bytes()
  566. }
  567. func (cc *clientConn) writeHeader(name, value string) {
  568. cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  569. }
// resAndError is the value the read loop sends on clientStream.resc to
// the goroutine waiting in roundTrip: either a response or an error.
type resAndError struct {
	res *http.Response // the response, when err is nil
	err error          // non-nil if the exchange failed
}
  574. // requires cc.mu be held.
  575. func (cc *clientConn) newStream() *clientStream {
  576. cs := &clientStream{
  577. cc: cc,
  578. ID: cc.nextStreamID,
  579. resc: make(chan resAndError, 1),
  580. peerReset: make(chan struct{}),
  581. }
  582. cs.flow.add(int32(cc.initialWindowSize))
  583. cs.flow.setConnFlow(&cc.flow)
  584. cs.inflow.add(transportDefaultStreamFlow)
  585. cs.inflow.setConnFlow(&cc.inflow)
  586. cc.nextStreamID += 2
  587. cc.streams[cs.ID] = cs
  588. return cs
  589. }
  590. func (cc *clientConn) streamByID(id uint32, andRemove bool) *clientStream {
  591. cc.mu.Lock()
  592. defer cc.mu.Unlock()
  593. cs := cc.streams[id]
  594. if andRemove {
  595. delete(cc.streams, id)
  596. }
  597. return cs
  598. }
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type clientConnReadLoop struct {
	cc *clientConn

	// activeRes holds the streams whose response headers have been
	// delivered and whose response has not yet ended or been reset.
	activeRes map[uint32]*clientStream // keyed by streamID

	// continueStreamID is the stream ID we're waiting for
	// continuation frames for. Zero means no header block is open.
	continueStreamID uint32

	// hdec decodes response header blocks; each decoded field is
	// passed to onNewHeaderField.
	hdec *hpack.Decoder

	// Fields reset on each HEADERS:
	nextRes      *http.Response // response being assembled from the current header block
	sawRegHeader bool           // saw non-pseudo header
	reqMalformed error          // non-nil once known to be malformed
}
  612. // readLoop runs in its own goroutine and reads and dispatches frames.
  613. func (cc *clientConn) readLoop() {
  614. rl := &clientConnReadLoop{
  615. cc: cc,
  616. activeRes: make(map[uint32]*clientStream),
  617. }
  618. // TODO: figure out henc size
  619. rl.hdec = hpack.NewDecoder(initialHeaderTableSize, rl.onNewHeaderField)
  620. defer rl.cleanup()
  621. cc.readerErr = rl.run()
  622. if ce, ok := cc.readerErr.(ConnectionError); ok {
  623. cc.wmu.Lock()
  624. cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  625. cc.wmu.Unlock()
  626. }
  627. }
  628. func (rl *clientConnReadLoop) cleanup() {
  629. cc := rl.cc
  630. defer cc.tconn.Close()
  631. defer cc.t.removeClientConn(cc)
  632. defer close(cc.readerDone)
  633. // Close any response bodies if the server closes prematurely.
  634. // TODO: also do this if we've written the headers but not
  635. // gotten a response yet.
  636. err := cc.readerErr
  637. if err == io.EOF {
  638. err = io.ErrUnexpectedEOF
  639. }
  640. cc.mu.Lock()
  641. for _, cs := range rl.activeRes {
  642. cs.bufPipe.CloseWithError(err)
  643. }
  644. for _, cs := range cc.streams {
  645. select {
  646. case cs.resc <- resAndError{err: err}:
  647. default:
  648. }
  649. }
  650. cc.closed = true
  651. cc.cond.Broadcast()
  652. cc.mu.Unlock()
  653. }
  654. func (rl *clientConnReadLoop) run() error {
  655. cc := rl.cc
  656. for {
  657. f, err := cc.fr.ReadFrame()
  658. if se, ok := err.(StreamError); ok {
  659. // TODO: deal with stream errors from the framer.
  660. return se
  661. } else if err != nil {
  662. return err
  663. }
  664. cc.vlogf("Transport received %v: %#v", f.Header(), f)
  665. streamID := f.Header().StreamID
  666. _, isContinue := f.(*ContinuationFrame)
  667. if isContinue {
  668. if streamID != rl.continueStreamID {
  669. cc.logf("Protocol violation: got CONTINUATION with id %d; want %d", streamID, rl.continueStreamID)
  670. return ConnectionError(ErrCodeProtocol)
  671. }
  672. } else if rl.continueStreamID != 0 {
  673. // Continue frames need to be adjacent in the stream
  674. // and we were in the middle of headers.
  675. cc.logf("Protocol violation: got %T for stream %d, want CONTINUATION for %d", f, streamID, rl.continueStreamID)
  676. return ConnectionError(ErrCodeProtocol)
  677. }
  678. switch f := f.(type) {
  679. case *HeadersFrame:
  680. err = rl.processHeaders(f)
  681. case *ContinuationFrame:
  682. err = rl.processContinuation(f)
  683. case *DataFrame:
  684. err = rl.processData(f)
  685. case *GoAwayFrame:
  686. err = rl.processGoAway(f)
  687. case *RSTStreamFrame:
  688. err = rl.processResetStream(f)
  689. case *SettingsFrame:
  690. err = rl.processSettings(f)
  691. case *PushPromiseFrame:
  692. err = rl.processPushPromise(f)
  693. case *WindowUpdateFrame:
  694. err = rl.processWindowUpdate(f)
  695. default:
  696. cc.logf("Transport: unhandled response frame type %T", f)
  697. }
  698. if err != nil {
  699. return err
  700. }
  701. }
  702. }
  703. func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame) error {
  704. rl.sawRegHeader = false
  705. rl.reqMalformed = nil
  706. rl.nextRes = &http.Response{
  707. Proto: "HTTP/2.0",
  708. ProtoMajor: 2,
  709. Header: make(http.Header),
  710. }
  711. return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
  712. }
  713. func (rl *clientConnReadLoop) processContinuation(f *ContinuationFrame) error {
  714. return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
  715. }
// processHeaderBlockFragment feeds one fragment of a response header
// block (from a HEADERS or CONTINUATION frame) to the HPACK decoder.
//
// While the block is incomplete it records the stream in
// rl.continueStreamID so run can insist that the next frame is a
// CONTINUATION for the same stream. Once headersEnded is true it
// finishes the response: a malformed block is reported to the waiting
// roundTrip and answered with RST_STREAM; otherwise the assembled
// response is delivered on cs.resc, with an empty body if the stream has
// ended or a pipe-backed body otherwise.
//
// It returns an error only if HPACK decoding fails.
func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID uint32, headersEnded, streamEnded bool) error {
	cc := rl.cc
	// The stream is removed from cc.streams as soon as the frame says
	// the stream has ended.
	cs := cc.streamByID(streamID, streamEnded)
	if cs == nil {
		// We could return a ConnectionError(ErrCodeProtocol)
		// here, except that in the case of us canceling
		// client requests, we may also delete from the
		// streams map, in which case we forgot that we sent
		// this request. So, just ignore any responses for
		// now. They might've been in-flight before the
		// server got our RST_STREAM.
		//
		// NOTE(review): the fragment is not passed to rl.hdec on this
		// path; if the decoder keeps state across header blocks this
		// may leave it out of step with the peer. Confirm.
		return nil
	}
	// Decoded fields are reported through rl.onNewHeaderField, which
	// fills in rl.nextRes and rl.reqMalformed.
	_, err := rl.hdec.Write(frag)
	if err != nil {
		return err
	}
	if !headersEnded {
		rl.continueStreamID = cs.ID
		return nil
	}

	// HEADERS (or CONTINUATION) are now over.
	rl.continueStreamID = 0

	if rl.reqMalformed != nil {
		cs.resc <- resAndError{err: rl.reqMalformed}
		rl.cc.writeStreamReset(cs.ID, ErrCodeProtocol, rl.reqMalformed)
		return nil
	}

	res := rl.nextRes
	if streamEnded {
		res.Body = noBody
	} else {
		buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage
		cs.bufPipe = pipe{b: buf}
		res.Body = transportResponseBody{cs}
	}
	// NOTE(review): the stream is added to activeRes even when
	// streamEnded is true; in that case nothing in this function
	// removes it again. Confirm whether that is intended.
	rl.activeRes[cs.ID] = cs
	cs.resc <- resAndError{res: res}
	rl.nextRes = nil // unused now; will be reset next HEADERS frame
	return nil
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.bufPipe.
// On Close it sends RST_STREAM if EOF wasn't already seen.
type transportResponseBody struct {
	cs *clientStream // the stream whose response payload this body exposes
}
  763. func (b transportResponseBody) Read(p []byte) (n int, err error) {
  764. n, err = b.cs.bufPipe.Read(p)
  765. if n == 0 {
  766. return
  767. }
  768. cs := b.cs
  769. cc := cs.cc
  770. cc.mu.Lock()
  771. defer cc.mu.Unlock()
  772. var connAdd, streamAdd int32
  773. // Check the conn-level first, before the stream-level.
  774. if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
  775. connAdd = transportDefaultConnFlow - v
  776. cc.inflow.add(connAdd)
  777. }
  778. if err == nil { // No need to refresh if the stream is over or failed.
  779. if v := cs.inflow.available(); v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
  780. streamAdd = transportDefaultStreamFlow - v
  781. cs.inflow.add(streamAdd)
  782. }
  783. }
  784. if connAdd != 0 || streamAdd != 0 {
  785. cc.wmu.Lock()
  786. defer cc.wmu.Unlock()
  787. if connAdd != 0 {
  788. cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
  789. }
  790. if streamAdd != 0 {
  791. cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
  792. }
  793. cc.bw.Flush()
  794. }
  795. return
  796. }
  797. func (b transportResponseBody) Close() error {
  798. if b.cs.bufPipe.Err() != io.EOF {
  799. // TODO: write test for this
  800. b.cs.cc.writeStreamReset(b.cs.ID, ErrCodeCancel, nil)
  801. }
  802. return nil
  803. }
  804. func (rl *clientConnReadLoop) processData(f *DataFrame) error {
  805. cc := rl.cc
  806. cs := cc.streamByID(f.StreamID, f.StreamEnded())
  807. if cs == nil {
  808. return nil
  809. }
  810. data := f.Data()
  811. if VerboseLogs {
  812. rl.cc.logf("DATA: %q", data)
  813. }
  814. // Check connection-level flow control.
  815. cc.mu.Lock()
  816. if cs.inflow.available() >= int32(len(data)) {
  817. cs.inflow.take(int32(len(data)))
  818. } else {
  819. cc.mu.Unlock()
  820. return ConnectionError(ErrCodeFlowControl)
  821. }
  822. cc.mu.Unlock()
  823. if _, err := cs.bufPipe.Write(data); err != nil {
  824. return err
  825. }
  826. if f.StreamEnded() {
  827. cs.bufPipe.CloseWithError(io.EOF)
  828. delete(rl.activeRes, cs.ID)
  829. }
  830. return nil
  831. }
  832. func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  833. cc := rl.cc
  834. cc.t.removeClientConn(cc)
  835. if f.ErrCode != 0 {
  836. // TODO: deal with GOAWAY more. particularly the error code
  837. cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  838. }
  839. cc.setGoAway(f)
  840. return nil
  841. }
  842. func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
  843. cc := rl.cc
  844. cc.mu.Lock()
  845. defer cc.mu.Unlock()
  846. return f.ForeachSetting(func(s Setting) error {
  847. switch s.ID {
  848. case SettingMaxFrameSize:
  849. cc.maxFrameSize = s.Val
  850. case SettingMaxConcurrentStreams:
  851. cc.maxConcurrentStreams = s.Val
  852. case SettingInitialWindowSize:
  853. // TODO: error if this is too large.
  854. // TODO: adjust flow control of still-open
  855. // frames by the difference of the old initial
  856. // window size and this one.
  857. cc.initialWindowSize = s.Val
  858. default:
  859. // TODO(bradfitz): handle more settings?
  860. cc.vlogf("Unhandled Setting: %v", s)
  861. }
  862. return nil
  863. })
  864. }
  865. func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
  866. cc := rl.cc
  867. cs := cc.streamByID(f.StreamID, false)
  868. if f.StreamID != 0 && cs == nil {
  869. return nil
  870. }
  871. cc.mu.Lock()
  872. defer cc.mu.Unlock()
  873. fl := &cc.flow
  874. if cs != nil {
  875. fl = &cs.flow
  876. }
  877. if !fl.add(int32(f.Increment)) {
  878. return ConnectionError(ErrCodeFlowControl)
  879. }
  880. cc.cond.Broadcast()
  881. return nil
  882. }
  883. func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
  884. cs := rl.cc.streamByID(f.StreamID, true)
  885. if cs == nil {
  886. // TODO: return error if server tries to RST_STEAM an idle stream
  887. return nil
  888. }
  889. select {
  890. case <-cs.peerReset:
  891. // Already reset.
  892. // This is the only goroutine
  893. // which closes this, so there
  894. // isn't a race.
  895. default:
  896. err := StreamError{cs.ID, f.ErrCode}
  897. cs.resetErr = err
  898. close(cs.peerReset)
  899. cs.bufPipe.CloseWithError(err)
  900. }
  901. delete(rl.activeRes, cs.ID)
  902. return nil
  903. }
  904. func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
  905. // We told the peer we don't want them.
  906. // Spec says:
  907. // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
  908. // setting of the peer endpoint is set to 0. An endpoint that
  909. // has set this setting and has received acknowledgement MUST
  910. // treat the receipt of a PUSH_PROMISE frame as a connection
  911. // error (Section 5.4.1) of type PROTOCOL_ERROR."
  912. return ConnectionError(ErrCodeProtocol)
  913. }
  914. func (cc *clientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
  915. // TODO: do something with err? send it as a debug frame to the peer?
  916. // But that's only in GOAWAY. Invent a new frame type? Is there one already?
  917. cc.wmu.Lock()
  918. cc.fr.WriteRSTStream(streamID, code)
  919. cc.wmu.Unlock()
  920. }
  921. // onNewHeaderField runs on the readLoop goroutine whenever a new
  922. // hpack header field is decoded.
  923. func (rl *clientConnReadLoop) onNewHeaderField(f hpack.HeaderField) {
  924. cc := rl.cc
  925. if VerboseLogs {
  926. cc.logf("Header field: %+v", f)
  927. }
  928. isPseudo := strings.HasPrefix(f.Name, ":")
  929. if isPseudo {
  930. if rl.sawRegHeader {
  931. rl.reqMalformed = errors.New("http2: invalid pseudo header after regular header")
  932. return
  933. }
  934. switch f.Name {
  935. case ":status":
  936. code, err := strconv.Atoi(f.Value)
  937. if err != nil {
  938. rl.reqMalformed = errors.New("http2: invalid :status")
  939. return
  940. }
  941. rl.nextRes.Status = f.Value + " " + http.StatusText(code)
  942. rl.nextRes.StatusCode = code
  943. default:
  944. // "Endpoints MUST NOT generate pseudo-header
  945. // fields other than those defined in this
  946. // document."
  947. rl.reqMalformed = fmt.Errorf("http2: unknown response pseudo header %q", f.Name)
  948. }
  949. } else {
  950. rl.sawRegHeader = true
  951. rl.nextRes.Header.Add(http.CanonicalHeaderKey(f.Name), f.Value)
  952. }
  953. }
  954. func (cc *clientConn) logf(format string, args ...interface{}) {
  955. cc.t.logf(format, args...)
  956. }
  957. func (cc *clientConn) vlogf(format string, args ...interface{}) {
  958. cc.t.vlogf(format, args...)
  959. }
  960. func (t *Transport) vlogf(format string, args ...interface{}) {
  961. if VerboseLogs {
  962. t.logf(format, args...)
  963. }
  964. }
  965. func (t *Transport) logf(format string, args ...interface{}) {
  966. log.Printf(format, args...)
  967. }
// noBody is the shared, empty response body used for responses whose
// stream ended with the header block. It wraps a reader over no bytes
// in a no-op closer.
var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))