Browse Source

Fix a crash and state transitions when handler closes while client still open

From the commit:

// This previously crashed (reported by Mathieu Lonjaret as observed
// while using Camlistore) because we got a DATA frame from the client
// after the handler exited and our logic at the time was wrong,
// keeping a stream in the map in stateClosed, which tickled an
// invariant check later when we tried to remove that stream (via
// defer sc.closeAllStreamsOnConnClose) when the serverConn serve loop
// ended.

Also adding a possible TODO at top:

// TODO (maybe): add a mechanism for Handlers to go into half-closed-local
// mode (rw.(io.Closer) test?) but not exit their handler, and
// continue to be able to read from the Request.Body. This would be a
// somewhat semantic change from HTTP/1 (or at least what we expose in
// net/http), so I'd probably want to add it there
// too. For now, this package says that returning from the Handler
// ServeHTTP function means you're both done reading and done writing,
// without a way to stop just one or the other.
Brad Fitzpatrick 11 years ago
parent
commit
f3a6d9a1b0
2 changed files with 68 additions and 20 deletions
  1. 37 18
      server.go
  2. 31 2
      server_test.go

+ 37 - 18
server.go

@@ -55,6 +55,7 @@ var responseWriterStatePool = sync.Pool{
 var (
 	testHookOnConn        func()
 	testHookGetServerConn func(*serverConn)
+	testHookOnPanicMu     *sync.Mutex // nil except in tests
 	testHookOnPanic       func(sc *serverConn, panicVal interface{}) (rePanic bool)
 )
 
@@ -76,6 +77,15 @@ var (
 // Handlers running) and not be woken up again until the PING packet
 // returns.
 
+// TODO (maybe): add a mechanism for Handlers to go into
+// half-closed-local mode (rw.(io.Closer) test?) but not exit their
+// handler, and continue to be able to read from the
+// Request.Body. This would be a somewhat semantic change from HTTP/1
+// (or at least what we expose in net/http), so I'd probably want to
+// add it there too. For now, this package says that returning from
+// the Handler ServeHTTP function means you're both done reading and
+// done writing, without a way to stop just one or the other.
+
 // Server is an HTTP/2 server.
 type Server struct {
 	// MaxHandlers limits the number of http.Handler ServeHTTP goroutines
@@ -452,6 +462,10 @@ func (sc *serverConn) stopShutdownTimer() {
 }
 
 func (sc *serverConn) notePanic() {
+	if testHookOnPanicMu != nil {
+		testHookOnPanicMu.Lock()
+		defer testHookOnPanicMu.Unlock()
+	}
 	if testHookOnPanic != nil {
 		if e := recover(); e != nil {
 			if testHookOnPanic(sc, e) {
@@ -642,7 +656,17 @@ func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
 		}
 		switch st.state {
 		case stateOpen:
-			st.state = stateHalfClosedLocal
+			// Here we would go to stateHalfClosedLocal in
+			// theory, but since our handler is done and
+			// the net/http package provides no mechanism
+			// for finishing writing to a ResponseWriter
+			// while still reading data (see possible TODO
+			// at top of this file), we go into closed
+			// state here anyway, after telling the peer
+			// we're hanging up on them.
+			st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
+			errCancel := StreamError{st.id, ErrCodeCancel}
+			sc.resetStream(errCancel)
 		case stateHalfClosedRemote:
 			sc.closeStream(st, nil)
 		}
@@ -720,13 +744,11 @@ func (sc *serverConn) shutDownIn(d time.Duration) {
 
 func (sc *serverConn) resetStream(se StreamError) {
 	sc.serveG.check()
-	st, ok := sc.streams[se.StreamID]
-	if !ok {
-		panic("internal package error; resetStream called on non-existent stream")
-	}
 	sc.writeFrame(frameWriteMsg{write: se})
-	st.sentReset = true
-	sc.closeStream(st, se)
+	if st, ok := sc.streams[se.StreamID]; ok {
+		st.sentReset = true
+		sc.closeStream(st, se)
+	}
 }
 
 // curHeaderStreamID returns the stream ID of the header block we're
@@ -911,7 +933,7 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
 func (sc *serverConn) closeStream(st *stream, err error) {
 	sc.serveG.check()
 	if st.state == stateIdle || st.state == stateClosed {
-		panic("invariant")
+		panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
 	}
 	st.state = stateClosed
 	sc.curOpenStreams--
@@ -1006,7 +1028,12 @@ func (sc *serverConn) processData(f *DataFrame) error {
 	// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
 	id := f.Header().StreamID
 	st, ok := sc.streams[id]
-	if !ok || (st.state != stateOpen && st.state != stateHalfClosedLocal) {
+	if !ok || st.state != stateOpen {
+		// This includes sending a RST_STREAM if the stream is
+		// in stateHalfClosedLocal (which currently means that
+		// the http.Handler returned, so it's done reading &
+		// done writing). Try to stop the client from sending
+		// more DATA.
 		return StreamError{id, ErrCodeStreamClosed}
 	}
 	if st.body == nil {
@@ -1043,15 +1070,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
 		} else {
 			st.body.Close(io.EOF)
 		}
-		switch st.state {
-		case stateOpen:
-			st.state = stateHalfClosedRemote
-		case stateHalfClosedLocal:
-			// TODO: this causes a known crash (currently skipped
-			// test in server_test.go). We shouldn't leave
-			// streams in the map in stateClosed.
-			st.state = stateClosed
-		}
+		st.state = stateHalfClosedRemote
 	}
 	return nil
 }

+ 31 - 2
server_test.go

@@ -41,8 +41,15 @@ type serverTester struct {
 	logFilter []string // substrings to filter out
 }
 
+func init() {
+	testHookOnPanicMu = new(sync.Mutex)
+}
+
 func newServerTester(t *testing.T, handler http.HandlerFunc) *serverTester {
+	testHookOnPanicMu.Lock()
 	testHookOnPanic = nil
+	testHookOnPanicMu.Unlock()
+
 	logBuf := new(bytes.Buffer)
 	ts := httptest.NewUnstartedServer(handler)
 	ConfigureServer(ts.Config, &Server{})
@@ -1792,9 +1799,14 @@ func TestServer_Response_ManyHeaders_With_Continuation(t *testing.T) {
 	})
 }
 
+// This previously crashed (reported by Mathieu Lonjaret as observed
+// while using Camlistore) because we got a DATA frame from the client
+// after the handler exited and our logic at the time was wrong,
+// keeping a stream in the map in stateClosed, which tickled an
+// invariant check later when we tried to remove that stream (via
+// defer sc.closeAllStreamsOnConnClose) when the serverConn serve loop
+// ended.
 func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
-	condSkipFailingTest(t)
-
 	testServerResponse(t, func(w http.ResponseWriter, r *http.Request) error {
 		// nothing
 		return nil
@@ -1809,6 +1821,11 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
 		if !hf.HeadersEnded() || !hf.StreamEnded() {
 			t.Fatalf("want END_HEADERS+END_STREAM, got %v", hf)
 		}
+
 		// Sent when a Handler closes while a client has
+		// indicated it's still sending DATA:
+		st.wantRSTStream(1, ErrCodeCancel)
+
 		// Now the handler has ended, so it's ended its
 		// stream, but the client hasn't closed its side
 		// (stateHalfClosedLocal).  So send more data and verify
@@ -1816,17 +1833,29 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
 		// it did before.
 		st.writeData(1, true, []byte("foo"))
 
+		// Sent after a peer sends data anyway (admittedly the
+		// previous RST_STREAM might've still been in-flight),
+		// but they'll get the more friendly 'cancel' code
+		// first.
+		st.wantRSTStream(1, ErrCodeStreamClosed)
+
+		// Set up a bunch of machinery to record the panic we saw
+		// previously.
 		var (
 			panMu    sync.Mutex
 			panicVal interface{}
 		)
+
+		testHookOnPanicMu.Lock()
 		testHookOnPanic = func(sc *serverConn, pv interface{}) bool {
 			panMu.Lock()
 			panicVal = pv
 			panMu.Unlock()
 			return true
 		}
+		testHookOnPanicMu.Unlock()
 
+		// Now force the serve loop to end, via closing the connection.
 		st.cc.Close()
 		select {
 		case <-st.sc.doneServing: