server.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  5. // Licensed under the same terms as Go itself:
  6. // https://code.google.com/p/go/source/browse/LICENSE
  7. package http2
  8. import (
  9. "bytes"
  10. "crypto/tls"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "log"
  15. "net"
  16. "net/http"
  17. "net/url"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "github.com/bradfitz/http2/hpack"
  22. )
  23. // TODO: finish GOAWAY support. Consider each incoming frame type and whether
  24. // it should be ignored during a shutdown race.
  25. // Server is an HTTP/2 server.
  26. type Server struct {
  27. // MaxStreams optionally ...
  28. MaxStreams int
  29. }
  30. var testHookOnConn func() // for testing
  31. // ConfigureServer adds HTTP/2 support to a net/http Server.
  32. //
  33. // The configuration conf may be nil.
  34. //
  35. // ConfigureServer must be called before s begins serving.
  36. func ConfigureServer(s *http.Server, conf *Server) {
  37. if conf == nil {
  38. conf = new(Server)
  39. }
  40. if s.TLSConfig == nil {
  41. s.TLSConfig = new(tls.Config)
  42. }
  43. haveNPN := false
  44. for _, p := range s.TLSConfig.NextProtos {
  45. if p == npnProto {
  46. haveNPN = true
  47. break
  48. }
  49. }
  50. if !haveNPN {
  51. s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, npnProto)
  52. }
  53. if s.TLSNextProto == nil {
  54. s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
  55. }
  56. s.TLSNextProto[npnProto] = func(hs *http.Server, c *tls.Conn, h http.Handler) {
  57. if testHookOnConn != nil {
  58. testHookOnConn()
  59. }
  60. conf.handleConn(hs, c, h)
  61. }
  62. }
  63. func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
  64. sc := &serverConn{
  65. hs: hs,
  66. conn: c,
  67. handler: h,
  68. framer: NewFramer(c, c), // TODO: write to a (custom?) buffered writer that can alternate when it's in buffered mode.
  69. streams: make(map[uint32]*stream),
  70. canonHeader: make(map[string]string),
  71. readFrameCh: make(chan frameAndProcessed),
  72. readFrameErrCh: make(chan error, 1),
  73. writeHeaderCh: make(chan headerWriteReq), // must not be buffered
  74. windowUpdateCh: make(chan windowUpdateReq, 8),
  75. flow: newFlow(initialWindowSize),
  76. doneServing: make(chan struct{}),
  77. maxWriteFrameSize: initialMaxFrameSize,
  78. initialWindowSize: initialWindowSize,
  79. serveG: newGoroutineLock(),
  80. }
  81. sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
  82. sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
  83. sc.serve()
  84. }
  85. // frameAndProcessed coordinates the readFrames and serve goroutines, since
  86. // the Framer interface only permits the most recently-read Frame from being
  87. // accessed. The serve goroutine sends on processed to signal to the readFrames
  88. // goroutine that another frame may be read.
  89. type frameAndProcessed struct {
  90. f Frame
  91. processed chan struct{}
  92. }
  93. type serverConn struct {
  94. // Immutable:
  95. hs *http.Server
  96. conn net.Conn
  97. handler http.Handler
  98. framer *Framer
  99. hpackDecoder *hpack.Decoder
  100. hpackEncoder *hpack.Encoder
  101. doneServing chan struct{} // closed when serverConn.serve ends
  102. readFrameCh chan frameAndProcessed // written by serverConn.readFrames
  103. readFrameErrCh chan error
  104. writeHeaderCh chan headerWriteReq // must not be buffered
  105. windowUpdateCh chan windowUpdateReq
  106. serveG goroutineLock // used to verify funcs are on serve()
  107. flow *flow // the connection-wide one
  108. // Everything following is owned by the serve loop; use serveG.check()
  109. maxStreamID uint32 // max ever seen
  110. streams map[uint32]*stream
  111. maxWriteFrameSize uint32 // TODO: update this when settings come in
  112. initialWindowSize int32
  113. canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
  114. sentGoAway bool
  115. req requestParam // non-zero while reading request headers
  116. headerWriteBuf bytes.Buffer // used to write response headers
  117. }
  118. // requestParam is the state of the next request, initialized over
  119. // potentially several frames HEADERS + zero or more CONTINUATION
  120. // frames.
  121. type requestParam struct {
  122. // stream is non-nil if we're reading (HEADER or CONTINUATION)
  123. // frames for a request (but not DATA).
  124. stream *stream
  125. header http.Header
  126. method, path string
  127. scheme, authority string
  128. sawRegularHeader bool // saw a non-pseudo header already
  129. invalidHeader bool // an invalid header was seen
  130. }
  131. type stream struct {
  132. id uint32
  133. state streamState // owned by serverConn's processing loop
  134. flow *flow // limits writing from Handler to client
  135. body *pipe // non-nil if expecting DATA frames
  136. bodyBytes int64 // body bytes seen so far
  137. declBodyBytes int64 // or -1 if undeclared
  138. }
  139. func (sc *serverConn) state(streamID uint32) streamState {
  140. sc.serveG.check()
  141. // http://http2.github.io/http2-spec/#rfc.section.5.1
  142. if st, ok := sc.streams[streamID]; ok {
  143. return st.state
  144. }
  145. // "The first use of a new stream identifier implicitly closes all
  146. // streams in the "idle" state that might have been initiated by
  147. // that peer with a lower-valued stream identifier. For example, if
  148. // a client sends a HEADERS frame on stream 7 without ever sending a
  149. // frame on stream 5, then stream 5 transitions to the "closed"
  150. // state when the first frame for stream 7 is sent or received."
  151. if streamID <= sc.maxStreamID {
  152. return stateClosed
  153. }
  154. return stateIdle
  155. }
  156. func (sc *serverConn) vlogf(format string, args ...interface{}) {
  157. if VerboseLogs {
  158. sc.logf(format, args...)
  159. }
  160. }
  161. func (sc *serverConn) logf(format string, args ...interface{}) {
  162. if lg := sc.hs.ErrorLog; lg != nil {
  163. lg.Printf(format, args...)
  164. } else {
  165. log.Printf(format, args...)
  166. }
  167. }
  168. func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
  169. if err == nil {
  170. return
  171. }
  172. str := err.Error()
  173. if strings.Contains(str, "use of closed network connection") {
  174. // Boring, expected errors.
  175. sc.vlogf(format, args...)
  176. } else {
  177. sc.logf(format, args...)
  178. }
  179. }
  180. func (sc *serverConn) onNewHeaderField(f hpack.HeaderField) {
  181. sc.serveG.check()
  182. switch {
  183. case !validHeader(f.Name):
  184. sc.req.invalidHeader = true
  185. case strings.HasPrefix(f.Name, ":"):
  186. if sc.req.sawRegularHeader {
  187. sc.logf("pseudo-header after regular header")
  188. sc.req.invalidHeader = true
  189. return
  190. }
  191. var dst *string
  192. switch f.Name {
  193. case ":method":
  194. dst = &sc.req.method
  195. case ":path":
  196. dst = &sc.req.path
  197. case ":scheme":
  198. dst = &sc.req.scheme
  199. case ":authority":
  200. dst = &sc.req.authority
  201. default:
  202. // 8.1.2.1 Pseudo-Header Fields
  203. // "Endpoints MUST treat a request or response
  204. // that contains undefined or invalid
  205. // pseudo-header fields as malformed (Section
  206. // 8.1.2.6)."
  207. sc.logf("invalid pseudo-header %q", f.Name)
  208. sc.req.invalidHeader = true
  209. return
  210. }
  211. if *dst != "" {
  212. sc.logf("duplicate pseudo-header %q sent", f.Name)
  213. sc.req.invalidHeader = true
  214. return
  215. }
  216. *dst = f.Value
  217. case f.Name == "cookie":
  218. sc.req.sawRegularHeader = true
  219. if s, ok := sc.req.header["Cookie"]; ok && len(s) == 1 {
  220. s[0] = s[0] + "; " + f.Value
  221. } else {
  222. sc.req.header.Add("Cookie", f.Value)
  223. }
  224. default:
  225. sc.req.sawRegularHeader = true
  226. sc.req.header.Add(sc.canonicalHeader(f.Name), f.Value)
  227. }
  228. }
  229. func (sc *serverConn) canonicalHeader(v string) string {
  230. sc.serveG.check()
  231. // TODO: use a sync.Pool instead of putting the cache on *serverConn?
  232. cv, ok := sc.canonHeader[v]
  233. if !ok {
  234. cv = http.CanonicalHeaderKey(v)
  235. sc.canonHeader[v] = cv
  236. }
  237. return cv
  238. }
  239. // readFrames is the loop that reads incoming frames.
  240. // It's run on its own goroutine.
  241. func (sc *serverConn) readFrames() {
  242. processed := make(chan struct{}, 1)
  243. for {
  244. f, err := sc.framer.ReadFrame()
  245. if err != nil {
  246. close(sc.readFrameCh)
  247. sc.readFrameErrCh <- err
  248. return
  249. }
  250. sc.readFrameCh <- frameAndProcessed{f, processed}
  251. <-processed
  252. }
  253. }
  254. func (sc *serverConn) serve() {
  255. sc.serveG.check()
  256. defer sc.conn.Close()
  257. defer close(sc.doneServing)
  258. sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
  259. // Read the client preface
  260. buf := make([]byte, len(ClientPreface))
  261. // TODO: timeout reading from the client
  262. if _, err := io.ReadFull(sc.conn, buf); err != nil {
  263. sc.logf("error reading client preface: %v", err)
  264. return
  265. }
  266. if !bytes.Equal(buf, clientPreface) {
  267. sc.logf("bogus greeting from client: %q", buf)
  268. return
  269. }
  270. sc.vlogf("client %v said hello", sc.conn.RemoteAddr())
  271. f, err := sc.framer.ReadFrame()
  272. if err != nil {
  273. sc.logf("error reading initial frame from client: %v", err)
  274. return
  275. }
  276. sf, ok := f.(*SettingsFrame)
  277. if !ok {
  278. sc.logf("invalid initial frame type %T received from client", f)
  279. return
  280. }
  281. if err := sf.ForeachSetting(sc.processSetting); err != nil {
  282. sc.logf("initial settings error: %v", err)
  283. return
  284. }
  285. // TODO: don't send two network packets for our SETTINGS + our
  286. // ACK of their settings. But if we make framer write to a
  287. // *bufio.Writer, that increases the per-connection memory
  288. // overhead, and there could be many idle conns. So maybe some
  289. // liveswitchWriter-like thing where we only switch to a
  290. // *bufio Writer when we really need one temporarily, else go
  291. // back to an unbuffered writes by default.
  292. if err := sc.framer.WriteSettings( /* TODO: actual settings */ ); err != nil {
  293. sc.logf("error writing server's initial settings: %v", err)
  294. return
  295. }
  296. if err := sc.framer.WriteSettingsAck(); err != nil {
  297. sc.logf("error writing server's ack of client's settings: %v", err)
  298. return
  299. }
  300. go sc.readFrames()
  301. for {
  302. select {
  303. case hr := <-sc.writeHeaderCh:
  304. if err := sc.writeHeaderInLoop(hr); err != nil {
  305. sc.condlogf(err, "error writing response header: %v", err)
  306. return
  307. }
  308. case wu := <-sc.windowUpdateCh:
  309. if err := sc.sendWindowUpdateInLoop(wu); err != nil {
  310. sc.condlogf(err, "error writing window update: %v", err)
  311. return
  312. }
  313. case fp, ok := <-sc.readFrameCh:
  314. if !ok {
  315. err := <-sc.readFrameErrCh
  316. if err != io.EOF {
  317. errstr := err.Error()
  318. if !strings.Contains(errstr, "use of closed network connection") {
  319. sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), errstr)
  320. }
  321. }
  322. return
  323. }
  324. f := fp.f
  325. sc.vlogf("got %v: %#v", f.Header(), f)
  326. err := sc.processFrame(f)
  327. fp.processed <- struct{}{} // let readFrames proceed
  328. switch ev := err.(type) {
  329. case nil:
  330. // nothing.
  331. case StreamError:
  332. if err := sc.resetStreamInLoop(ev); err != nil {
  333. sc.logf("Error writing RSTSTream: %v", err)
  334. return
  335. }
  336. case ConnectionError:
  337. sc.logf("Disconnecting; %v", ev)
  338. return
  339. case goAwayFlowError:
  340. if err := sc.goAway(ErrCodeFlowControl); err != nil {
  341. sc.condlogf(err, "failed to GOAWAY: %v", err)
  342. return
  343. }
  344. default:
  345. sc.logf("Disconnection due to other error: %v", err)
  346. return
  347. }
  348. }
  349. }
  350. }
  351. func (sc *serverConn) goAway(code ErrCode) error {
  352. sc.serveG.check()
  353. sc.sentGoAway = true
  354. return sc.framer.WriteGoAway(sc.maxStreamID, code, nil)
  355. }
  356. func (sc *serverConn) resetStreamInLoop(se StreamError) error {
  357. sc.serveG.check()
  358. if err := sc.framer.WriteRSTStream(se.streamID, uint32(se.code)); err != nil {
  359. return err
  360. }
  361. delete(sc.streams, se.streamID)
  362. return nil
  363. }
  364. func (sc *serverConn) curHeaderStreamID() uint32 {
  365. sc.serveG.check()
  366. st := sc.req.stream
  367. if st == nil {
  368. return 0
  369. }
  370. return st.id
  371. }
  372. func (sc *serverConn) processFrame(f Frame) error {
  373. sc.serveG.check()
  374. if s := sc.curHeaderStreamID(); s != 0 {
  375. if cf, ok := f.(*ContinuationFrame); !ok {
  376. return ConnectionError(ErrCodeProtocol)
  377. } else if cf.Header().StreamID != s {
  378. return ConnectionError(ErrCodeProtocol)
  379. }
  380. }
  381. switch f := f.(type) {
  382. case *SettingsFrame:
  383. return sc.processSettings(f)
  384. case *HeadersFrame:
  385. return sc.processHeaders(f)
  386. case *ContinuationFrame:
  387. return sc.processContinuation(f)
  388. case *WindowUpdateFrame:
  389. return sc.processWindowUpdate(f)
  390. case *PingFrame:
  391. return sc.processPing(f)
  392. case *DataFrame:
  393. return sc.processData(f)
  394. default:
  395. log.Printf("Ignoring unknown frame %#v", f)
  396. return nil
  397. }
  398. }
  399. func (sc *serverConn) processPing(f *PingFrame) error {
  400. sc.serveG.check()
  401. if f.Flags.Has(FlagSettingsAck) {
  402. // 6.7 PING: " An endpoint MUST NOT respond to PING frames
  403. // containing this flag."
  404. return nil
  405. }
  406. if f.StreamID != 0 {
  407. // "PING frames are not associated with any individual
  408. // stream. If a PING frame is received with a stream
  409. // identifier field value other than 0x0, the recipient MUST
  410. // respond with a connection error (Section 5.4.1) of type
  411. // PROTOCOL_ERROR."
  412. return ConnectionError(ErrCodeProtocol)
  413. }
  414. return sc.framer.WritePing(true, f.Data)
  415. }
  416. func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
  417. sc.serveG.check()
  418. switch {
  419. case f.StreamID != 0: // stream-level flow control
  420. st := sc.streams[f.StreamID]
  421. if st == nil {
  422. // "WINDOW_UPDATE can be sent by a peer that has sent a
  423. // frame bearing the END_STREAM flag. This means that a
  424. // receiver could receive a WINDOW_UPDATE frame on a "half
  425. // closed (remote)" or "closed" stream. A receiver MUST
  426. // NOT treat this as an error, see Section 5.1."
  427. return nil
  428. }
  429. if !st.flow.add(int32(f.Increment)) {
  430. return StreamError{f.StreamID, ErrCodeFlowControl}
  431. }
  432. default: // connection-level flow control
  433. if !sc.flow.add(int32(f.Increment)) {
  434. return goAwayFlowError{}
  435. }
  436. }
  437. return nil
  438. }
  439. func (sc *serverConn) processSettings(f *SettingsFrame) error {
  440. sc.serveG.check()
  441. return f.ForeachSetting(sc.processSetting)
  442. }
  443. func (sc *serverConn) processSetting(s Setting) error {
  444. sc.serveG.check()
  445. sc.vlogf("processing setting %v", s)
  446. switch s.ID {
  447. case SettingInitialWindowSize:
  448. return sc.processSettingInitialWindowSize(s.Val)
  449. }
  450. log.Printf("TODO: handle %v", s)
  451. return nil
  452. }
  453. func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
  454. sc.serveG.check()
  455. if val > (1<<31 - 1) {
  456. // 6.5.2 Defined SETTINGS Parameters
  457. // "Values above the maximum flow control window size of
  458. // 231-1 MUST be treated as a connection error (Section
  459. // 5.4.1) of type FLOW_CONTROL_ERROR."
  460. return ConnectionError(ErrCodeFlowControl)
  461. }
  462. // "A SETTINGS frame can alter the initial flow control window
  463. // size for all current streams. When the value of
  464. // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
  465. // adjust the size of all stream flow control windows that it
  466. // maintains by the difference between the new value and the
  467. // old value."
  468. old := sc.initialWindowSize
  469. sc.initialWindowSize = int32(val)
  470. growth := sc.initialWindowSize - old // may be negative
  471. for _, st := range sc.streams {
  472. if !st.flow.add(growth) {
  473. // 6.9.2 Initial Flow Control Window Size
  474. // "An endpoint MUST treat a change to
  475. // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
  476. // control window to exceed the maximum size as a
  477. // connection error (Section 5.4.1) of type
  478. // FLOW_CONTROL_ERROR."
  479. return ConnectionError(ErrCodeFlowControl)
  480. }
  481. }
  482. return nil
  483. }
  484. func (sc *serverConn) processData(f *DataFrame) error {
  485. sc.serveG.check()
  486. // "If a DATA frame is received whose stream is not in "open"
  487. // or "half closed (local)" state, the recipient MUST respond
  488. // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
  489. id := f.Header().StreamID
  490. st, ok := sc.streams[id]
  491. if !ok || (st.state != stateOpen && st.state != stateHalfClosedLocal) {
  492. return StreamError{id, ErrCodeStreamClosed}
  493. }
  494. if st.body == nil {
  495. // Not expecting data.
  496. // TODO: which error code?
  497. return StreamError{id, ErrCodeStreamClosed}
  498. }
  499. data := f.Data()
  500. // Sender sending more than they'd declared?
  501. if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
  502. st.body.Close(fmt.Errorf("Sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
  503. return StreamError{id, ErrCodeStreamClosed}
  504. }
  505. if len(data) > 0 {
  506. // TODO: verify they're allowed to write with the flow control
  507. // window we'd advertised to them.
  508. // TODO: verify n from Write
  509. if _, err := st.body.Write(data); err != nil {
  510. return StreamError{id, ErrCodeStreamClosed}
  511. }
  512. st.bodyBytes += int64(len(data))
  513. }
  514. if f.Header().Flags.Has(FlagDataEndStream) {
  515. if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
  516. st.body.Close(fmt.Errorf("Request declared a Content-Length of %d but only wrote %d bytes",
  517. st.declBodyBytes, st.bodyBytes))
  518. } else {
  519. st.body.Close(io.EOF)
  520. }
  521. }
  522. return nil
  523. }
  524. func (sc *serverConn) processHeaders(f *HeadersFrame) error {
  525. sc.serveG.check()
  526. id := f.Header().StreamID
  527. if sc.sentGoAway {
  528. // Ignore.
  529. return nil
  530. }
  531. // http://http2.github.io/http2-spec/#rfc.section.5.1.1
  532. if id%2 != 1 || id <= sc.maxStreamID || sc.req.stream != nil {
  533. // Streams initiated by a client MUST use odd-numbered
  534. // stream identifiers. [...] The identifier of a newly
  535. // established stream MUST be numerically greater than all
  536. // streams that the initiating endpoint has opened or
  537. // reserved. [...] An endpoint that receives an unexpected
  538. // stream identifier MUST respond with a connection error
  539. // (Section 5.4.1) of type PROTOCOL_ERROR.
  540. return ConnectionError(ErrCodeProtocol)
  541. }
  542. if id > sc.maxStreamID {
  543. sc.maxStreamID = id
  544. }
  545. st := &stream{
  546. id: id,
  547. state: stateOpen,
  548. flow: newFlow(sc.initialWindowSize),
  549. }
  550. if f.Header().Flags.Has(FlagHeadersEndStream) {
  551. st.state = stateHalfClosedRemote
  552. }
  553. sc.streams[id] = st
  554. sc.req = requestParam{
  555. stream: st,
  556. header: make(http.Header),
  557. }
  558. return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded())
  559. }
  560. func (sc *serverConn) processContinuation(f *ContinuationFrame) error {
  561. sc.serveG.check()
  562. st := sc.streams[f.Header().StreamID]
  563. if st == nil || sc.curHeaderStreamID() != st.id {
  564. return ConnectionError(ErrCodeProtocol)
  565. }
  566. return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded())
  567. }
  568. func (sc *serverConn) processHeaderBlockFragment(st *stream, frag []byte, end bool) error {
  569. sc.serveG.check()
  570. if _, err := sc.hpackDecoder.Write(frag); err != nil {
  571. // TODO: convert to stream error I assume?
  572. return err
  573. }
  574. if !end {
  575. return nil
  576. }
  577. if err := sc.hpackDecoder.Close(); err != nil {
  578. // TODO: convert to stream error I assume?
  579. return err
  580. }
  581. rw, req, err := sc.newWriterAndRequest()
  582. sc.req = requestParam{}
  583. if err != nil {
  584. return err
  585. }
  586. st.body = req.Body.(*requestBody).pipe // may be nil
  587. st.declBodyBytes = req.ContentLength
  588. go sc.runHandler(rw, req)
  589. return nil
  590. }
  591. func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, error) {
  592. sc.serveG.check()
  593. rp := &sc.req
  594. if rp.invalidHeader || rp.method == "" || rp.path == "" ||
  595. (rp.scheme != "https" && rp.scheme != "http") {
  596. // See 8.1.2.6 Malformed Requests and Responses:
  597. //
  598. // Malformed requests or responses that are detected
  599. // MUST be treated as a stream error (Section 5.4.2)
  600. // of type PROTOCOL_ERROR."
  601. //
  602. // 8.1.2.3 Request Pseudo-Header Fields
  603. // "All HTTP/2 requests MUST include exactly one valid
  604. // value for the :method, :scheme, and :path
  605. // pseudo-header fields"
  606. return nil, nil, StreamError{rp.stream.id, ErrCodeProtocol}
  607. }
  608. var tlsState *tls.ConnectionState // make this non-nil if https
  609. if rp.scheme == "https" {
  610. // TODO: get from sc's ConnectionState
  611. tlsState = &tls.ConnectionState{}
  612. }
  613. authority := rp.authority
  614. if authority == "" {
  615. authority = rp.header.Get("Host")
  616. }
  617. bodyOpen := rp.stream.state == stateOpen
  618. body := &requestBody{
  619. sc: sc,
  620. streamID: rp.stream.id,
  621. }
  622. req := &http.Request{
  623. Method: rp.method,
  624. URL: &url.URL{},
  625. RemoteAddr: sc.conn.RemoteAddr().String(),
  626. Header: rp.header,
  627. RequestURI: rp.path,
  628. Proto: "HTTP/2.0",
  629. ProtoMajor: 2,
  630. ProtoMinor: 0,
  631. TLS: tlsState,
  632. Host: authority,
  633. Body: body,
  634. }
  635. if bodyOpen {
  636. body.pipe = &pipe{
  637. b: buffer{buf: make([]byte, 65536)}, // TODO: share/remove
  638. }
  639. body.pipe.c.L = &body.pipe.m
  640. if vv, ok := rp.header["Content-Length"]; ok {
  641. req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
  642. } else {
  643. req.ContentLength = -1
  644. }
  645. }
  646. rws := responseWriterStatePool.Get().(*responseWriterState)
  647. rws.sc = sc
  648. rws.streamID = rp.stream.id
  649. rws.req = req
  650. rws.body = body
  651. rws.wbuf.Reset()
  652. rw := &responseWriter{rws: rws}
  653. return rw, req, nil
  654. }
  655. var responseWriterStatePool = sync.Pool{
  656. New: func() interface{} {
  657. return new(responseWriterState)
  658. },
  659. }
  660. // Run on its own goroutine.
  661. func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) {
  662. defer rw.handlerDone()
  663. // TODO: catch panics like net/http.Server
  664. sc.handler.ServeHTTP(rw, req)
  665. }
  666. // called from handler goroutines
  667. func (sc *serverConn) writeData(streamID uint32, p []byte) (n int, err error) {
  668. // TODO: implement
  669. log.Printf("WRITE on %d: %q", streamID, p)
  670. return len(p), nil
  671. }
  672. // headerWriteReq is a request to write an HTTP response header from a server Handler.
  673. type headerWriteReq struct {
  674. streamID uint32
  675. httpResCode int
  676. h http.Header // may be nil
  677. endStream bool
  678. }
  679. // called from handler goroutines.
  680. // h may be nil.
  681. func (sc *serverConn) writeHeader(req headerWriteReq) {
  682. sc.writeHeaderCh <- req
  683. }
  684. func (sc *serverConn) writeHeaderInLoop(req headerWriteReq) error {
  685. sc.serveG.check()
  686. sc.headerWriteBuf.Reset()
  687. sc.hpackEncoder.WriteField(hpack.HeaderField{Name: ":status", Value: httpCodeString(req.httpResCode)})
  688. for k, vv := range req.h {
  689. for _, v := range vv {
  690. // TODO: for gargage, cache lowercase copies of headers at
  691. // least for common ones and/or popular recent ones for
  692. // this serverConn. LRU?
  693. sc.hpackEncoder.WriteField(hpack.HeaderField{Name: strings.ToLower(k), Value: v})
  694. }
  695. }
  696. headerBlock := sc.headerWriteBuf.Bytes()
  697. if len(headerBlock) > int(sc.maxWriteFrameSize) {
  698. // we'll need continuation ones.
  699. panic("TODO")
  700. }
  701. return sc.framer.WriteHeaders(HeadersFrameParam{
  702. StreamID: req.streamID,
  703. BlockFragment: headerBlock,
  704. EndStream: req.endStream,
  705. EndHeaders: true, // no continuation yet
  706. })
  707. }
  708. type windowUpdateReq struct {
  709. streamID uint32
  710. n uint32
  711. }
  712. // called from handler goroutines
  713. func (sc *serverConn) sendWindowUpdate(streamID uint32, n int) {
  714. const maxUint32 = 2147483647
  715. for n >= maxUint32 {
  716. sc.windowUpdateCh <- windowUpdateReq{streamID, maxUint32}
  717. n -= maxUint32
  718. }
  719. if n > 0 {
  720. sc.windowUpdateCh <- windowUpdateReq{streamID, uint32(n)}
  721. }
  722. }
  723. func (sc *serverConn) sendWindowUpdateInLoop(wu windowUpdateReq) error {
  724. sc.serveG.check()
  725. // TODO: sc.bufferedOutput.StartBuffering()
  726. if err := sc.framer.WriteWindowUpdate(0, wu.n); err != nil {
  727. return err
  728. }
  729. if err := sc.framer.WriteWindowUpdate(wu.streamID, wu.n); err != nil {
  730. return err
  731. }
  732. // TODO: return sc.bufferedOutput.Flush()
  733. return nil
  734. }
  735. type requestBody struct {
  736. sc *serverConn
  737. streamID uint32
  738. closed bool
  739. pipe *pipe // non-nil if we have a HTTP entity message body
  740. }
  741. var errClosedBody = errors.New("body closed by handler")
  742. func (b *requestBody) Close() error {
  743. if b.pipe != nil {
  744. b.pipe.Close(errClosedBody)
  745. }
  746. b.closed = true
  747. return nil
  748. }
  749. func (b *requestBody) Read(p []byte) (n int, err error) {
  750. if b.pipe == nil {
  751. return 0, io.EOF
  752. }
  753. n, err = b.pipe.Read(p)
  754. if n > 0 {
  755. b.sc.sendWindowUpdate(b.streamID, n)
  756. // TODO: tell b.sc to send back 'n' flow control quota credits to the sender
  757. }
  758. return
  759. }
  760. // responseWriter is the http.ResponseWriter implementation. It's
  761. // intentionally small (1 pointer wide) to minimize garbage. The
  762. // responseWriterState pointer inside is zeroed at the end of a
  763. // request (in handlerDone) and calls on the responseWriter thereafter
  764. // simply crash (caller's mistake), but the much larger responseWriterState
  765. // and buffers are reused between multiple requests.
  766. type responseWriter struct {
  767. rws *responseWriterState
  768. }
  769. type responseWriterState struct {
  770. // immutable within a request:
  771. sc *serverConn
  772. streamID uint32
  773. req *http.Request
  774. body *requestBody // to close at end of request, if DATA frames didn't
  775. wbuf bytes.Buffer
  776. // mutated by http.Handler goroutine:
  777. h http.Header // h goes from maybe-nil to non-nil; contents changed by http.Handler goroutine
  778. wroteHeaders bool
  779. calledHeader bool
  780. }
  781. // Optional http.ResponseWriter interfaces implemented.
  782. var (
  783. _ http.Flusher = (*responseWriter)(nil)
  784. _ stringWriter = (*responseWriter)(nil)
  785. // TODO: hijacker for websockets
  786. )
  787. func (w *responseWriter) Flush() {
  788. // TODO: implement
  789. }
  790. // TODO: bufio writing of responseWriter. add Flush, add pools of
  791. // bufio.Writers, adjust bufio writer sized based on frame size
  792. // updates from peer? For now: naive.
  793. func (w *responseWriter) Header() http.Header {
  794. rws := w.rws
  795. if rws == nil {
  796. panic("Header called after Handler finished")
  797. }
  798. if rws.h == nil {
  799. rws.h = make(http.Header)
  800. }
  801. rws.calledHeader = true
  802. return rws.h
  803. }
  804. func (w *responseWriter) WriteHeader(code int) {
  805. rws := w.rws
  806. if rws == nil {
  807. panic("WriteHeader called after Handler finished")
  808. }
  809. if rws.wroteHeaders {
  810. return
  811. }
  812. // TODO: defer actually writing this frame until a Flush or
  813. // handlerDone, like net/http's Server. then we can coalesce
  814. // e.g. a 204 response to have a Header response frame with
  815. // END_STREAM set, without a separate frame being sent in
  816. // handleDone.
  817. rws.wroteHeaders = true
  818. rws.sc.writeHeader(headerWriteReq{
  819. streamID: rws.streamID,
  820. httpResCode: code,
  821. h: rws.h,
  822. })
  823. }
  824. func (w *responseWriter) WriteString(s string) (n int, err error) {
  825. // TODO: better impl
  826. return w.Write([]byte(s))
  827. }
  828. func (w *responseWriter) Write(p []byte) (n int, err error) {
  829. rws := w.rws
  830. if rws == nil {
  831. panic("Write called after Handler finished")
  832. }
  833. if !rws.wroteHeaders {
  834. w.WriteHeader(200)
  835. }
  836. return rws.sc.writeData(rws.streamID, p) // blocks waiting for tokens
  837. }
  838. func (w *responseWriter) handlerDone() {
  839. rws := w.rws
  840. if rws == nil {
  841. panic("handlerDone called twice")
  842. }
  843. w.rws = nil
  844. if !rws.wroteHeaders {
  845. rws.sc.writeHeader(headerWriteReq{
  846. streamID: rws.streamID,
  847. httpResCode: 200,
  848. h: rws.h,
  849. endStream: true, // handler has finished; can't be any data.
  850. })
  851. }
  852. // TODO: recycle rws back into a pool
  853. }