transport.go 26 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code.
  5. package http2
  6. import (
  7. "bufio"
  8. "bytes"
  9. "crypto/tls"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "io/ioutil"
  14. "log"
  15. "net"
  16. "net/http"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "golang.org/x/net/http2/hpack"
  21. )
  22. const (
  23. // transportDefaultConnFlow is how many connection-level flow control
  24. // tokens we give the server at start-up, past the default 64k.
  25. transportDefaultConnFlow = 1 << 30
  26. // transportDefaultStreamFlow is how many stream-level flow
  27. // control tokens we announce to the peer, and how many bytes
  28. // we buffer per stream.
  29. transportDefaultStreamFlow = 4 << 20
  30. // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
  31. // a stream-level WINDOW_UPDATE for at a time.
  32. transportDefaultStreamMinRefresh = 4 << 10
  33. )
  34. // Transport is an HTTP/2 Transport.
  35. //
  36. // A Transport internally caches connections to servers. It is safe
  37. // for concurrent use by multiple goroutines.
  38. type Transport struct {
  39. // TODO: remove this and make more general with a TLS dial hook, like http
  40. InsecureTLSDial bool
  41. // TODO: switch to RWMutex
  42. // TODO: add support for sharing conns based on cert names
  43. // (e.g. share conn for googleapis.com and appspot.com)
  44. connMu sync.Mutex
  45. conns map[string][]*clientConn // key is host:port
  46. }
  47. // clientConn is the state of a single HTTP/2 client connection to an
  48. // HTTP/2 server.
  49. type clientConn struct {
  50. t *Transport
  51. tconn *tls.Conn
  52. tlsState *tls.ConnectionState
  53. connKey []string // key(s) this connection is cached in, in t.conns
  54. // readLoop goroutine fields:
  55. readerDone chan struct{} // closed on error
  56. readerErr error // set before readerDone is closed
  57. mu sync.Mutex // guards following
  58. cond *sync.Cond // hold mu; broadcast on flow/closed changes
  59. flow flow // our conn-level flow control quota (cs.flow is per stream)
  60. inflow flow // peer's conn-level flow control
  61. closed bool
  62. goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
  63. streams map[uint32]*clientStream // client-initiated
  64. nextStreamID uint32
  65. bw *bufio.Writer
  66. br *bufio.Reader
  67. fr *Framer
  68. // Settings from peer:
  69. maxFrameSize uint32
  70. maxConcurrentStreams uint32
  71. initialWindowSize uint32
  72. hbuf bytes.Buffer // HPACK encoder writes into this
  73. henc *hpack.Encoder
  74. freeBuf [][]byte
  75. wmu sync.Mutex // held while writing; acquire AFTER wmu if holding both
  76. werr error // first write error that has occurred
  77. }
  78. // clientStream is the state for a single HTTP/2 stream. One of these
  79. // is created for each Transport.RoundTrip call.
  80. type clientStream struct {
  81. cc *clientConn
  82. ID uint32
  83. resc chan resAndError
  84. bufPipe pipe // buffered pipe with the flow-controlled response payload
  85. flow flow // guarded by cc.mu
  86. inflow flow // guarded by cc.mu
  87. peerReset chan struct{} // closed on peer reset
  88. resetErr error // populated before peerReset is closed
  89. }
  90. // checkReset reports any error sent in a RST_STREAM frame by the
  91. // server.
  92. func (cs *clientStream) checkReset() error {
  93. select {
  94. case <-cs.peerReset:
  95. return cs.resetErr
  96. default:
  97. return nil
  98. }
  99. }
  100. type stickyErrWriter struct {
  101. w io.Writer
  102. err *error
  103. }
  104. func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
  105. if *sew.err != nil {
  106. return 0, *sew.err
  107. }
  108. n, err = sew.w.Write(p)
  109. *sew.err = err
  110. return
  111. }
  112. func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
  113. if req.URL.Scheme != "https" {
  114. return nil, errors.New("http2: unsupported scheme")
  115. }
  116. host, port, err := net.SplitHostPort(req.URL.Host)
  117. if err != nil {
  118. host = req.URL.Host
  119. port = "443"
  120. }
  121. for {
  122. cc, err := t.getClientConn(host, port)
  123. if err != nil {
  124. return nil, err
  125. }
  126. res, err := cc.roundTrip(req)
  127. if shouldRetryRequest(err) { // TODO: or clientconn is overloaded (too many outstanding requests)?
  128. continue
  129. }
  130. if err != nil {
  131. return nil, err
  132. }
  133. return res, nil
  134. }
  135. }
  136. // CloseIdleConnections closes any connections which were previously
  137. // connected from previous requests but are now sitting idle.
  138. // It does not interrupt any connections currently in use.
  139. func (t *Transport) CloseIdleConnections() {
  140. t.connMu.Lock()
  141. defer t.connMu.Unlock()
  142. for _, vv := range t.conns {
  143. for _, cc := range vv {
  144. cc.closeIfIdle()
  145. }
  146. }
  147. }
  148. var errClientConnClosed = errors.New("http2: client conn is closed")
  149. func shouldRetryRequest(err error) bool {
  150. // TODO: or GOAWAY graceful shutdown stuff
  151. return err == errClientConnClosed
  152. }
  153. func (t *Transport) removeClientConn(cc *clientConn) {
  154. t.connMu.Lock()
  155. defer t.connMu.Unlock()
  156. for _, key := range cc.connKey {
  157. vv, ok := t.conns[key]
  158. if !ok {
  159. continue
  160. }
  161. newList := filterOutClientConn(vv, cc)
  162. if len(newList) > 0 {
  163. t.conns[key] = newList
  164. } else {
  165. delete(t.conns, key)
  166. }
  167. }
  168. }
  169. func filterOutClientConn(in []*clientConn, exclude *clientConn) []*clientConn {
  170. out := in[:0]
  171. for _, v := range in {
  172. if v != exclude {
  173. out = append(out, v)
  174. }
  175. }
  176. // If we filtered it out, zero out the last item to prevent
  177. // the GC from seeing it.
  178. if len(in) != len(out) {
  179. in[len(in)-1] = nil
  180. }
  181. return out
  182. }
  183. // AddIdleConn adds c as an idle conn for Transport.
  184. // It assumes that c has not yet exchanged SETTINGS frames.
  185. // The addr maybe be either "host" or "host:port".
  186. func (t *Transport) AddIdleConn(addr string, c *tls.Conn) error {
  187. var key string
  188. host, _, err := net.SplitHostPort(addr)
  189. if err == nil {
  190. key = addr
  191. } else {
  192. host = addr
  193. key = addr + ":443"
  194. }
  195. cc, err := t.newClientConn(host, key, c)
  196. if err != nil {
  197. return err
  198. }
  199. t.addConn(key, cc)
  200. return nil
  201. }
  202. func (t *Transport) addConn(key string, cc *clientConn) {
  203. t.connMu.Lock()
  204. defer t.connMu.Unlock()
  205. if t.conns == nil {
  206. t.conns = make(map[string][]*clientConn)
  207. }
  208. t.conns[key] = append(t.conns[key], cc)
  209. }
  210. func (t *Transport) getClientConn(host, port string) (*clientConn, error) {
  211. key := net.JoinHostPort(host, port)
  212. t.connMu.Lock()
  213. for _, cc := range t.conns[key] {
  214. if cc.canTakeNewRequest() {
  215. t.connMu.Unlock()
  216. return cc, nil
  217. }
  218. }
  219. t.connMu.Unlock()
  220. // TODO(bradfitz): use a singleflight.Group to only lock once per 'key'.
  221. // Probably need to vendor it in as github.com/golang/sync/singleflight
  222. // though, since the net package already uses it? Also lines up with
  223. // sameer, bcmills, et al wanting to open source some sync stuff.
  224. cc, err := t.dialClientConn(host, port, key)
  225. if err != nil {
  226. return nil, err
  227. }
  228. t.addConn(key, cc)
  229. return cc, nil
  230. }
  231. func (t *Transport) dialClientConn(host, port, key string) (*clientConn, error) {
  232. cfg := &tls.Config{
  233. ServerName: host,
  234. NextProtos: []string{NextProtoTLS},
  235. InsecureSkipVerify: t.InsecureTLSDial,
  236. }
  237. tconn, err := tls.Dial("tcp", net.JoinHostPort(host, port), cfg)
  238. if err != nil {
  239. return nil, err
  240. }
  241. return t.newClientConn(host, key, tconn)
  242. }
  243. func (t *Transport) newClientConn(host, key string, tconn *tls.Conn) (*clientConn, error) {
  244. if err := tconn.Handshake(); err != nil {
  245. return nil, err
  246. }
  247. if !t.InsecureTLSDial {
  248. if err := tconn.VerifyHostname(host); err != nil {
  249. return nil, err
  250. }
  251. }
  252. state := tconn.ConnectionState()
  253. if p := state.NegotiatedProtocol; p != NextProtoTLS {
  254. return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
  255. }
  256. if !state.NegotiatedProtocolIsMutual {
  257. return nil, errors.New("http2: could not negotiate protocol mutually")
  258. }
  259. if _, err := tconn.Write(clientPreface); err != nil {
  260. return nil, err
  261. }
  262. cc := &clientConn{
  263. t: t,
  264. tconn: tconn,
  265. connKey: []string{key}, // TODO: cert's validated hostnames too
  266. tlsState: &state,
  267. readerDone: make(chan struct{}),
  268. nextStreamID: 1,
  269. maxFrameSize: 16 << 10, // spec default
  270. initialWindowSize: 65535, // spec default
  271. maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
  272. streams: make(map[uint32]*clientStream),
  273. }
  274. cc.cond = sync.NewCond(&cc.mu)
  275. cc.flow.add(int32(initialWindowSize))
  276. // TODO: adjust this writer size to account for frame size +
  277. // MTU + crypto/tls record padding.
  278. cc.bw = bufio.NewWriter(stickyErrWriter{tconn, &cc.werr})
  279. cc.br = bufio.NewReader(tconn)
  280. cc.fr = NewFramer(cc.bw, cc.br)
  281. cc.henc = hpack.NewEncoder(&cc.hbuf)
  282. cc.fr.WriteSettings(
  283. Setting{ID: SettingEnablePush, Val: 0},
  284. Setting{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
  285. )
  286. cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
  287. cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
  288. cc.bw.Flush()
  289. if cc.werr != nil {
  290. return nil, cc.werr
  291. }
  292. // Read the obligatory SETTINGS frame
  293. f, err := cc.fr.ReadFrame()
  294. if err != nil {
  295. return nil, err
  296. }
  297. sf, ok := f.(*SettingsFrame)
  298. if !ok {
  299. return nil, fmt.Errorf("expected settings frame, got: %T", f)
  300. }
  301. cc.fr.WriteSettingsAck()
  302. cc.bw.Flush()
  303. sf.ForeachSetting(func(s Setting) error {
  304. switch s.ID {
  305. case SettingMaxFrameSize:
  306. cc.maxFrameSize = s.Val
  307. case SettingMaxConcurrentStreams:
  308. cc.maxConcurrentStreams = s.Val
  309. case SettingInitialWindowSize:
  310. cc.initialWindowSize = s.Val
  311. default:
  312. // TODO(bradfitz): handle more
  313. t.vlogf("Unhandled Setting: %v", s)
  314. }
  315. return nil
  316. })
  317. go cc.readLoop()
  318. return cc, nil
  319. }
  320. func (cc *clientConn) setGoAway(f *GoAwayFrame) {
  321. cc.mu.Lock()
  322. defer cc.mu.Unlock()
  323. cc.goAway = f
  324. }
  325. func (cc *clientConn) canTakeNewRequest() bool {
  326. cc.mu.Lock()
  327. defer cc.mu.Unlock()
  328. return cc.goAway == nil &&
  329. int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
  330. cc.nextStreamID < 2147483647
  331. }
  332. func (cc *clientConn) closeIfIdle() {
  333. cc.mu.Lock()
  334. if len(cc.streams) > 0 {
  335. cc.mu.Unlock()
  336. return
  337. }
  338. cc.closed = true
  339. // TODO: do clients send GOAWAY too? maybe? Just Close:
  340. cc.mu.Unlock()
  341. cc.tconn.Close()
  342. }
  343. const maxAllocFrameSize = 512 << 10
  344. // frameBuffer returns a scratch buffer suitable for writing DATA frames.
  345. // They're capped at the min of the peer's max frame size or 512KB
  346. // (kinda arbitrarily), but definitely capped so we don't allocate 4GB
  347. // bufers.
  348. func (cc *clientConn) frameScratchBuffer() []byte {
  349. cc.mu.Lock()
  350. size := cc.maxFrameSize
  351. if size > maxAllocFrameSize {
  352. size = maxAllocFrameSize
  353. }
  354. for i, buf := range cc.freeBuf {
  355. if len(buf) >= int(size) {
  356. cc.freeBuf[i] = nil
  357. cc.mu.Unlock()
  358. return buf[:size]
  359. }
  360. }
  361. cc.mu.Unlock()
  362. return make([]byte, size)
  363. }
  364. func (cc *clientConn) putFrameScratchBuffer(buf []byte) {
  365. cc.mu.Lock()
  366. defer cc.mu.Unlock()
  367. const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate.
  368. if len(cc.freeBuf) < maxBufs {
  369. cc.freeBuf = append(cc.freeBuf, buf)
  370. return
  371. }
  372. for i, old := range cc.freeBuf {
  373. if old == nil {
  374. cc.freeBuf[i] = buf
  375. return
  376. }
  377. }
  378. // forget about it.
  379. }
  380. func (cc *clientConn) roundTrip(req *http.Request) (*http.Response, error) {
  381. cc.mu.Lock()
  382. if cc.closed {
  383. cc.mu.Unlock()
  384. return nil, errClientConnClosed
  385. }
  386. cs := cc.newStream()
  387. hasBody := req.Body != nil
  388. // we send: HEADERS[+CONTINUATION] + (DATA?)
  389. hdrs := cc.encodeHeaders(req)
  390. first := true
  391. cc.wmu.Lock()
  392. frameSize := int(cc.maxFrameSize)
  393. for len(hdrs) > 0 && cc.werr == nil {
  394. chunk := hdrs
  395. if len(chunk) > frameSize {
  396. chunk = chunk[:frameSize]
  397. }
  398. hdrs = hdrs[len(chunk):]
  399. endHeaders := len(hdrs) == 0
  400. if first {
  401. cc.fr.WriteHeaders(HeadersFrameParam{
  402. StreamID: cs.ID,
  403. BlockFragment: chunk,
  404. EndStream: !hasBody,
  405. EndHeaders: endHeaders,
  406. })
  407. first = false
  408. } else {
  409. cc.fr.WriteContinuation(cs.ID, endHeaders, chunk)
  410. }
  411. }
  412. cc.bw.Flush()
  413. werr := cc.werr
  414. cc.wmu.Unlock()
  415. cc.mu.Unlock()
  416. if werr != nil {
  417. return nil, werr
  418. }
  419. var bodyCopyErrc chan error
  420. var gotResHeaders chan struct{} // closed on resheaders
  421. if hasBody {
  422. bodyCopyErrc = make(chan error, 1)
  423. gotResHeaders = make(chan struct{})
  424. go func() {
  425. bodyCopyErrc <- cs.writeRequestBody(req.Body, gotResHeaders)
  426. }()
  427. }
  428. for {
  429. select {
  430. case re := <-cs.resc:
  431. if gotResHeaders != nil {
  432. close(gotResHeaders)
  433. }
  434. if re.err != nil {
  435. return nil, re.err
  436. }
  437. res := re.res
  438. res.Request = req
  439. res.TLS = cc.tlsState
  440. return res, nil
  441. case err := <-bodyCopyErrc:
  442. if err != nil {
  443. return nil, err
  444. }
  445. }
  446. }
  447. }
  448. var errServerResponseBeforeRequestBody = errors.New("http2: server sent response while still writing request body")
  449. func (cs *clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan struct{}) error {
  450. cc := cs.cc
  451. done := false
  452. for !done {
  453. buf := cc.frameScratchBuffer()
  454. n, err := io.ReadFull(body, buf)
  455. if err == io.ErrUnexpectedEOF {
  456. done = true
  457. } else if err != nil {
  458. return err
  459. }
  460. // Await for n flow control tokens.
  461. if err := cs.awaitFlowControl(int32(n)); err != nil {
  462. return err
  463. }
  464. cc.wmu.Lock()
  465. select {
  466. case <-gotResHeaders:
  467. err = errServerResponseBeforeRequestBody
  468. case <-cs.peerReset:
  469. err = cs.resetErr
  470. default:
  471. err = cc.fr.WriteData(cs.ID, done, buf[:n])
  472. }
  473. cc.wmu.Unlock()
  474. cc.putFrameScratchBuffer(buf)
  475. if err != nil {
  476. return err
  477. }
  478. }
  479. var err error
  480. cc.wmu.Lock()
  481. if !done {
  482. err = cc.fr.WriteData(cs.ID, true, nil)
  483. }
  484. if ferr := cc.bw.Flush(); ferr != nil && err == nil {
  485. err = ferr
  486. }
  487. cc.wmu.Unlock()
  488. return err
  489. }
  490. func (cs *clientStream) awaitFlowControl(n int32) error {
  491. cc := cs.cc
  492. cc.mu.Lock()
  493. defer cc.mu.Unlock()
  494. for {
  495. if cc.closed {
  496. return errClientConnClosed
  497. }
  498. if err := cs.checkReset(); err != nil {
  499. return err
  500. }
  501. if cs.flow.available() >= n {
  502. cs.flow.take(n)
  503. return nil
  504. }
  505. cc.cond.Wait()
  506. }
  507. }
  508. // requires cc.mu be held.
  509. func (cc *clientConn) encodeHeaders(req *http.Request) []byte {
  510. cc.hbuf.Reset()
  511. // TODO(bradfitz): figure out :authority-vs-Host stuff between http2 and Go
  512. host := req.Host
  513. if host == "" {
  514. host = req.URL.Host
  515. }
  516. path := req.URL.Path
  517. if path == "" {
  518. path = "/"
  519. }
  520. cc.writeHeader(":authority", host) // probably not right for all sites
  521. cc.writeHeader(":method", req.Method)
  522. cc.writeHeader(":path", path)
  523. cc.writeHeader(":scheme", "https")
  524. for k, vv := range req.Header {
  525. lowKey := strings.ToLower(k)
  526. if lowKey == "host" {
  527. continue
  528. }
  529. for _, v := range vv {
  530. cc.writeHeader(lowKey, v)
  531. }
  532. }
  533. return cc.hbuf.Bytes()
  534. }
  535. func (cc *clientConn) writeHeader(name, value string) {
  536. cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  537. }
  538. type resAndError struct {
  539. res *http.Response
  540. err error
  541. }
  542. // requires cc.mu be held.
  543. func (cc *clientConn) newStream() *clientStream {
  544. cs := &clientStream{
  545. cc: cc,
  546. ID: cc.nextStreamID,
  547. resc: make(chan resAndError, 1),
  548. peerReset: make(chan struct{}),
  549. }
  550. cs.flow.add(int32(cc.initialWindowSize))
  551. cs.flow.setConnFlow(&cc.flow)
  552. cs.inflow.add(transportDefaultStreamFlow)
  553. cs.inflow.setConnFlow(&cc.inflow)
  554. cc.nextStreamID += 2
  555. cc.streams[cs.ID] = cs
  556. return cs
  557. }
  558. func (cc *clientConn) streamByID(id uint32, andRemove bool) *clientStream {
  559. cc.mu.Lock()
  560. defer cc.mu.Unlock()
  561. cs := cc.streams[id]
  562. if andRemove {
  563. delete(cc.streams, id)
  564. }
  565. return cs
  566. }
  567. // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
  568. type clientConnReadLoop struct {
  569. cc *clientConn
  570. activeRes map[uint32]*clientStream // keyed by streamID
  571. // continueStreamID is the stream ID we're waiting for
  572. // continuation frames for.
  573. continueStreamID uint32
  574. hdec *hpack.Decoder
  575. // Fields reset on each HEADERS:
  576. nextRes *http.Response
  577. sawRegHeader bool // saw non-pseudo header
  578. reqMalformed error // non-nil once known to be malformed
  579. }
  580. // readLoop runs in its own goroutine and reads and dispatches frames.
  581. func (cc *clientConn) readLoop() {
  582. rl := &clientConnReadLoop{
  583. cc: cc,
  584. activeRes: make(map[uint32]*clientStream),
  585. }
  586. // TODO: figure out henc size
  587. rl.hdec = hpack.NewDecoder(initialHeaderTableSize, rl.onNewHeaderField)
  588. defer rl.cleanup()
  589. cc.readerErr = rl.run()
  590. if ce, ok := cc.readerErr.(ConnectionError); ok {
  591. cc.wmu.Lock()
  592. cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  593. cc.wmu.Unlock()
  594. }
  595. }
  596. func (rl *clientConnReadLoop) cleanup() {
  597. cc := rl.cc
  598. defer cc.tconn.Close()
  599. defer cc.t.removeClientConn(cc)
  600. defer close(cc.readerDone)
  601. // Close any response bodies if the server closes prematurely.
  602. // TODO: also do this if we've written the headers but not
  603. // gotten a response yet.
  604. err := cc.readerErr
  605. if err == io.EOF {
  606. err = io.ErrUnexpectedEOF
  607. }
  608. cc.mu.Lock()
  609. for _, cs := range rl.activeRes {
  610. cs.bufPipe.CloseWithError(err)
  611. }
  612. for _, cs := range cc.streams {
  613. select {
  614. case cs.resc <- resAndError{err: err}:
  615. default:
  616. }
  617. }
  618. cc.closed = true
  619. cc.cond.Broadcast()
  620. cc.mu.Unlock()
  621. }
  622. func (rl *clientConnReadLoop) run() error {
  623. cc := rl.cc
  624. for {
  625. f, err := cc.fr.ReadFrame()
  626. if se, ok := err.(StreamError); ok {
  627. // TODO: deal with stream errors from the framer.
  628. return se
  629. } else if err != nil {
  630. return err
  631. }
  632. cc.vlogf("Transport received %v: %#v", f.Header(), f)
  633. streamID := f.Header().StreamID
  634. _, isContinue := f.(*ContinuationFrame)
  635. if isContinue {
  636. if streamID != rl.continueStreamID {
  637. cc.logf("Protocol violation: got CONTINUATION with id %d; want %d", streamID, rl.continueStreamID)
  638. return ConnectionError(ErrCodeProtocol)
  639. }
  640. } else if rl.continueStreamID != 0 {
  641. // Continue frames need to be adjacent in the stream
  642. // and we were in the middle of headers.
  643. cc.logf("Protocol violation: got %T for stream %d, want CONTINUATION for %d", f, streamID, rl.continueStreamID)
  644. return ConnectionError(ErrCodeProtocol)
  645. }
  646. switch f := f.(type) {
  647. case *HeadersFrame:
  648. err = rl.processHeaders(f)
  649. case *ContinuationFrame:
  650. err = rl.processContinuation(f)
  651. case *DataFrame:
  652. err = rl.processData(f)
  653. case *GoAwayFrame:
  654. err = rl.processGoAway(f)
  655. case *RSTStreamFrame:
  656. err = rl.processResetStream(f)
  657. case *SettingsFrame:
  658. err = rl.processSettings(f)
  659. case *PushPromiseFrame:
  660. err = rl.processPushPromise(f)
  661. case *WindowUpdateFrame:
  662. err = rl.processWindowUpdate(f)
  663. default:
  664. cc.logf("Transport: unhandled response frame type %T", f)
  665. }
  666. if err != nil {
  667. return err
  668. }
  669. }
  670. }
  671. func (rl *clientConnReadLoop) processHeaders(f *HeadersFrame) error {
  672. rl.sawRegHeader = false
  673. rl.reqMalformed = nil
  674. rl.nextRes = &http.Response{
  675. Proto: "HTTP/2.0",
  676. ProtoMajor: 2,
  677. Header: make(http.Header),
  678. }
  679. return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
  680. }
  681. func (rl *clientConnReadLoop) processContinuation(f *ContinuationFrame) error {
  682. return rl.processHeaderBlockFragment(f.HeaderBlockFragment(), f.StreamID, f.HeadersEnded(), f.StreamEnded())
  683. }
  684. func (rl *clientConnReadLoop) processHeaderBlockFragment(frag []byte, streamID uint32, headersEnded, streamEnded bool) error {
  685. cc := rl.cc
  686. cs := cc.streamByID(streamID, streamEnded)
  687. if cs == nil {
  688. // We could return a ConnectionError(ErrCodeProtocol)
  689. // here, except that in the case of us canceling
  690. // client requests, we may also delete from the
  691. // streams map, in which case we forgot that we sent
  692. // this request. So, just ignore any responses for
  693. // now. They might've been in-flight before the
  694. // server got our RST_STREAM.
  695. return nil
  696. }
  697. _, err := rl.hdec.Write(frag)
  698. if err != nil {
  699. return err
  700. }
  701. if !headersEnded {
  702. rl.continueStreamID = cs.ID
  703. return nil
  704. }
  705. // HEADERS (or CONTINUATION) are now over.
  706. rl.continueStreamID = 0
  707. if rl.reqMalformed != nil {
  708. cs.resc <- resAndError{err: rl.reqMalformed}
  709. rl.cc.writeStreamReset(cs.ID, ErrCodeProtocol, rl.reqMalformed)
  710. return nil
  711. }
  712. res := rl.nextRes
  713. if streamEnded {
  714. res.Body = noBody
  715. } else {
  716. buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage
  717. cs.bufPipe = pipe{b: buf}
  718. res.Body = transportResponseBody{cs}
  719. }
  720. rl.activeRes[cs.ID] = cs
  721. cs.resc <- resAndError{res: res}
  722. rl.nextRes = nil // unused now; will be reset next HEADERS frame
  723. return nil
  724. }
  725. // transportResponseBody is the concrete type of Transport.RoundTrip's
  726. // Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
  727. // On Close it sends RST_STREAM if EOF wasn't already seen.
  728. type transportResponseBody struct {
  729. cs *clientStream
  730. }
  731. func (b transportResponseBody) Read(p []byte) (n int, err error) {
  732. n, err = b.cs.bufPipe.Read(p)
  733. if n == 0 {
  734. return
  735. }
  736. cs := b.cs
  737. cc := cs.cc
  738. cc.mu.Lock()
  739. defer cc.mu.Unlock()
  740. var connAdd, streamAdd int32
  741. // Check the conn-level first, before the stream-level.
  742. if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
  743. connAdd = transportDefaultConnFlow - v
  744. cc.inflow.add(connAdd)
  745. }
  746. if err == nil { // No need to refresh if the stream is over or failed.
  747. if v := cs.inflow.available(); v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
  748. streamAdd = transportDefaultStreamFlow - v
  749. cs.inflow.add(streamAdd)
  750. }
  751. }
  752. if connAdd != 0 || streamAdd != 0 {
  753. cc.wmu.Lock()
  754. defer cc.wmu.Unlock()
  755. if connAdd != 0 {
  756. cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
  757. }
  758. if streamAdd != 0 {
  759. cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
  760. }
  761. cc.bw.Flush()
  762. }
  763. return
  764. }
  765. func (b transportResponseBody) Close() error {
  766. if b.cs.bufPipe.Err() != io.EOF {
  767. // TODO: write test for this
  768. b.cs.cc.writeStreamReset(b.cs.ID, ErrCodeCancel, nil)
  769. }
  770. return nil
  771. }
  772. func (rl *clientConnReadLoop) processData(f *DataFrame) error {
  773. cc := rl.cc
  774. cs := cc.streamByID(f.StreamID, f.StreamEnded())
  775. if cs == nil {
  776. return nil
  777. }
  778. data := f.Data()
  779. if VerboseLogs {
  780. rl.cc.logf("DATA: %q", data)
  781. }
  782. // Check connection-level flow control.
  783. cc.mu.Lock()
  784. if cs.inflow.available() >= int32(len(data)) {
  785. cs.inflow.take(int32(len(data)))
  786. } else {
  787. cc.mu.Unlock()
  788. return ConnectionError(ErrCodeFlowControl)
  789. }
  790. cc.mu.Unlock()
  791. if _, err := cs.bufPipe.Write(data); err != nil {
  792. return err
  793. }
  794. if f.StreamEnded() {
  795. cs.bufPipe.CloseWithError(io.EOF)
  796. delete(rl.activeRes, cs.ID)
  797. }
  798. return nil
  799. }
  800. func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  801. cc := rl.cc
  802. cc.t.removeClientConn(cc)
  803. if f.ErrCode != 0 {
  804. // TODO: deal with GOAWAY more. particularly the error code
  805. cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  806. }
  807. cc.setGoAway(f)
  808. return nil
  809. }
  810. func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
  811. cc := rl.cc
  812. cc.mu.Lock()
  813. defer cc.mu.Unlock()
  814. return f.ForeachSetting(func(s Setting) error {
  815. switch s.ID {
  816. case SettingMaxFrameSize:
  817. cc.maxFrameSize = s.Val
  818. case SettingMaxConcurrentStreams:
  819. cc.maxConcurrentStreams = s.Val
  820. case SettingInitialWindowSize:
  821. // TODO: error if this is too large.
  822. // TODO: adjust flow control of still-open
  823. // frames by the difference of the old initial
  824. // window size and this one.
  825. cc.initialWindowSize = s.Val
  826. default:
  827. // TODO(bradfitz): handle more settings?
  828. cc.vlogf("Unhandled Setting: %v", s)
  829. }
  830. return nil
  831. })
  832. }
  833. func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
  834. cc := rl.cc
  835. cs := cc.streamByID(f.StreamID, false)
  836. if f.StreamID != 0 && cs == nil {
  837. return nil
  838. }
  839. cc.mu.Lock()
  840. defer cc.mu.Unlock()
  841. fl := &cc.flow
  842. if cs != nil {
  843. fl = &cs.flow
  844. }
  845. if !fl.add(int32(f.Increment)) {
  846. return ConnectionError(ErrCodeFlowControl)
  847. }
  848. cc.cond.Broadcast()
  849. return nil
  850. }
  851. func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
  852. cs := rl.cc.streamByID(f.StreamID, true)
  853. if cs == nil {
  854. // TODO: return error if server tries to RST_STEAM an idle stream
  855. return nil
  856. }
  857. select {
  858. case <-cs.peerReset:
  859. // Already reset.
  860. // This is the only goroutine
  861. // which closes this, so there
  862. // isn't a race.
  863. default:
  864. err := StreamError{cs.ID, f.ErrCode}
  865. cs.resetErr = err
  866. close(cs.peerReset)
  867. cs.bufPipe.CloseWithError(err)
  868. }
  869. delete(rl.activeRes, cs.ID)
  870. return nil
  871. }
  872. func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
  873. // We told the peer we don't want them.
  874. // Spec says:
  875. // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
  876. // setting of the peer endpoint is set to 0. An endpoint that
  877. // has set this setting and has received acknowledgement MUST
  878. // treat the receipt of a PUSH_PROMISE frame as a connection
  879. // error (Section 5.4.1) of type PROTOCOL_ERROR."
  880. return ConnectionError(ErrCodeProtocol)
  881. }
  882. func (cc *clientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
  883. // TODO: do something with err? send it as a debug frame to the peer?
  884. // But that's only in GOAWAY. Invent a new frame type? Is there one already?
  885. cc.wmu.Lock()
  886. cc.fr.WriteRSTStream(streamID, code)
  887. cc.wmu.Unlock()
  888. }
  889. // onNewHeaderField runs on the readLoop goroutine whenever a new
  890. // hpack header field is decoded.
  891. func (rl *clientConnReadLoop) onNewHeaderField(f hpack.HeaderField) {
  892. cc := rl.cc
  893. if VerboseLogs {
  894. cc.logf("Header field: %+v", f)
  895. }
  896. isPseudo := strings.HasPrefix(f.Name, ":")
  897. if isPseudo {
  898. if rl.sawRegHeader {
  899. rl.reqMalformed = errors.New("http2: invalid pseudo header after regular header")
  900. return
  901. }
  902. switch f.Name {
  903. case ":status":
  904. code, err := strconv.Atoi(f.Value)
  905. if err != nil {
  906. rl.reqMalformed = errors.New("http2: invalid :status")
  907. return
  908. }
  909. rl.nextRes.Status = f.Value + " " + http.StatusText(code)
  910. rl.nextRes.StatusCode = code
  911. default:
  912. // "Endpoints MUST NOT generate pseudo-header
  913. // fields other than those defined in this
  914. // document."
  915. rl.reqMalformed = fmt.Errorf("http2: unknown response pseudo header %q", f.Name)
  916. }
  917. } else {
  918. rl.sawRegHeader = true
  919. rl.nextRes.Header.Add(http.CanonicalHeaderKey(f.Name), f.Value)
  920. }
  921. }
  922. func (cc *clientConn) logf(format string, args ...interface{}) {
  923. cc.t.logf(format, args...)
  924. }
  925. func (cc *clientConn) vlogf(format string, args ...interface{}) {
  926. cc.t.vlogf(format, args...)
  927. }
  928. func (t *Transport) vlogf(format string, args ...interface{}) {
  929. if VerboseLogs {
  930. t.logf(format, args...)
  931. }
  932. }
  933. func (t *Transport) logf(format string, args ...interface{}) {
  934. log.Printf(format, args...)
  935. }
  936. var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))