Browse Source

Start of proper net/http Server integration.

Brad Fitzpatrick 11 years ago
parent
commit
7b49447ec9
1 changed files with 66 additions and 23 deletions
  1. 66 23
      http2.go

+ 66 - 23
http2.go

@@ -52,22 +52,37 @@ type Server struct {
 
 
 func (srv *Server) handleConn(hs *http.Server, c *tls.Conn, h http.Handler) {
 func (srv *Server) handleConn(hs *http.Server, c *tls.Conn, h http.Handler) {
 	sc := &serverConn{
 	sc := &serverConn{
-		hs:          hs,
-		conn:        c,
-		handler:     h,
-		framer:      NewFramer(c, c),
-		streams:     make(map[uint32]*stream),
-		canonHeader: make(map[string]string),
+		hs:             hs,
+		conn:           c,
+		handler:        h,
+		framer:         NewFramer(c, c),
+		streams:        make(map[uint32]*stream),
+		canonHeader:    make(map[string]string),
+		readFrameCh:    make(chan frameAndProcessed),
+		readFrameErrCh: make(chan error, 1),
+		doneServing:    make(chan struct{}),
 	}
 	}
 	sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
 	sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
 	sc.serve()
 	sc.serve()
 }
 }
 
 
+// frameAndProcessed coordinates the readFrames and serve goroutines, since
+// the Framer interface only permits the most recently-read Frame from being
+// accessed. The serve goroutine sends on processed to signal to the readFrames
+// goroutine that another frame may be read.
+type frameAndProcessed struct {
+	f         Frame
+	processed chan struct{}
+}
+
 type serverConn struct {
 type serverConn struct {
-	hs      *http.Server
-	conn    *tls.Conn
-	handler http.Handler
-	framer  *Framer
+	hs             *http.Server
+	conn           *tls.Conn
+	handler        http.Handler
+	framer         *Framer
+	doneServing    chan struct{}          // closed when serverConn.serve ends
+	readFrameCh    chan frameAndProcessed // written by serverConn.readFrames
+	readFrameErrCh chan error
 
 
 	maxStreamID uint32 // max ever seen
 	maxStreamID uint32 // max ever seen
 	streams     map[uint32]*stream
 	streams     map[uint32]*stream
@@ -156,10 +171,27 @@ func (sc *serverConn) canonicalHeader(v string) string {
 	return cv
 	return cv
 }
 }
 
 
+func (sc *serverConn) readFrames() {
+	processed := make(chan struct{}, 1)
+	for {
+		f, err := sc.framer.ReadFrame()
+		if err != nil {
+			close(sc.readFrameCh)
+			sc.readFrameErrCh <- err
+			return
+		}
+		sc.readFrameCh <- frameAndProcessed{f, processed}
+		<-processed
+	}
+}
+
 func (sc *serverConn) serve() {
 func (sc *serverConn) serve() {
 	defer sc.conn.Close()
 	defer sc.conn.Close()
+	defer close(sc.doneServing)
+
 	log.Printf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
 	log.Printf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
 
 
+	// Read the client preface
 	buf := make([]byte, len(ClientPreface))
 	buf := make([]byte, len(ClientPreface))
 	// TODO: timeout reading from the client
 	// TODO: timeout reading from the client
 	if _, err := io.ReadFull(sc.conn, buf); err != nil {
 	if _, err := io.ReadFull(sc.conn, buf); err != nil {
@@ -171,23 +203,34 @@ func (sc *serverConn) serve() {
 		return
 		return
 	}
 	}
 	log.Printf("client %v said hello", sc.conn.RemoteAddr())
 	log.Printf("client %v said hello", sc.conn.RemoteAddr())
-	for {
 
 
-		f, err := sc.framer.ReadFrame()
-		if err == nil {
+	go sc.readFrames()
+
+	for {
+		select {
+		case fp, ok := <-sc.readFrameCh:
+			if !ok {
+				err := <-sc.readFrameErrCh
+				if err != io.EOF {
+					sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), err)
+				}
+				return
+			}
+			f := fp.f
 			log.Printf("got %v: %#v", f.Header(), f)
 			log.Printf("got %v: %#v", f.Header(), f)
-			err = sc.processFrame(f)
-		}
-		if h2e, ok := err.(Error); ok {
-			if h2e.IsConnectionError() {
-				sc.logf("Disconnection; connection error: %v", err)
+			err := sc.processFrame(f)
+			fp.processed <- struct{}{} // let readFrames proceed
+			if h2e, ok := err.(Error); ok {
+				if h2e.IsConnectionError() {
+					sc.logf("Disconnection; connection error: %v", err)
+					return
+				}
+				// TODO: stream errors, etc
+			}
+			if err != nil {
+				sc.logf("Disconnection due to other error: %v", err)
 				return
 				return
 			}
 			}
-			// TODO: stream errors, etc
-		}
-		if err != nil {
-			sc.logf("Disconnection due to other error: %v", err)
-			return
 		}
 		}
 	}
 	}
 }
 }