Browse Source

http2: reduce the number of select cases in serverConn.server

This drops the number of select cases in serverConn.server from 10 to
6. It was 7 in Go 1.7. It increased to 10 in Go 1.8.

* replace testing-only testHookCh with always-on grab-bag serveMsgCh
  to be used for both test purposes, and infrequently used message
  types

* remove the settingsTimer.C case for the initial settings timer
  and use serveMsgCh and time.AfterFunc instead

* ... and do the same for the idle timeout timer.

* ... and for the shutdown timer.

* remove wantStartPushCh and just send the *startPushRequest to
  serveMsgCh too

I could go further with this (and plan to, later), but these are the
safe and easy ones that don't require more work elsewhere.

The speed gets better the more the request/response go via the
serverConn.serve loop. (once per Request.Body.Read or
ResponseWriter.Flush):

  name            old time/op    new time/op  delta
  ServerGets-4    138µs ± 3%     134µs ± 2%   -2.54%   (p=0.002 n=10+10)
  ServerPosts-4   176µs ±27%     154µs ± 2%   -12.62%  (p=0.011 n=10+10)

Updates kubernetes/kubernetes#45216
Updates golang/go#20302

Change-Id: I18019554089d7e3d76355d7137b5957e9597e803
Reviewed-on: https://go-review.googlesource.com/43034
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Tom Bergan <tombergan@google.com>
Brad Fitzpatrick 8 years ago
parent
commit
34057069f4
3 changed files with 62 additions and 35 deletions
  1. 58 30
      http2/server.go
  2. 1 1
      http2/server_push_test.go
  3. 3 4
      http2/server_test.go

+ 58 - 30
http2/server.go

@@ -292,7 +292,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
 		streams:                     make(map[uint32]*stream),
 		streams:                     make(map[uint32]*stream),
 		readFrameCh:                 make(chan readFrameResult),
 		readFrameCh:                 make(chan readFrameResult),
 		wantWriteFrameCh:            make(chan FrameWriteRequest, 8),
 		wantWriteFrameCh:            make(chan FrameWriteRequest, 8),
-		wantStartPushCh:             make(chan startPushRequest, 8),
+		serveMsgCh:                  make(chan interface{}, 8),
 		wroteFrameCh:                make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
 		wroteFrameCh:                make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
 		bodyReadCh:                  make(chan bodyReadMsg),         // buffering doesn't matter either way
 		bodyReadCh:                  make(chan bodyReadMsg),         // buffering doesn't matter either way
 		doneServing:                 make(chan struct{}),
 		doneServing:                 make(chan struct{}),
@@ -405,10 +405,9 @@ type serverConn struct {
 	doneServing      chan struct{}          // closed when serverConn.serve ends
 	doneServing      chan struct{}          // closed when serverConn.serve ends
 	readFrameCh      chan readFrameResult   // written by serverConn.readFrames
 	readFrameCh      chan readFrameResult   // written by serverConn.readFrames
 	wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
 	wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
-	wantStartPushCh  chan startPushRequest  // from handlers -> serve
 	wroteFrameCh     chan frameWriteResult  // from writeFrameAsync -> serve, tickles more frame writes
 	wroteFrameCh     chan frameWriteResult  // from writeFrameAsync -> serve, tickles more frame writes
 	bodyReadCh       chan bodyReadMsg       // from handlers -> serve
 	bodyReadCh       chan bodyReadMsg       // from handlers -> serve
-	testHookCh       chan func(int)         // code to run on the serve loop
+	serveMsgCh       chan interface{}       // misc messages & code to send to / run on the serve loop
 	flow             flow                   // conn-wide (not stream-specific) outbound flow control
 	flow             flow                   // conn-wide (not stream-specific) outbound flow control
 	inflow           flow                   // conn-wide inbound flow control
 	inflow           flow                   // conn-wide inbound flow control
 	tlsState         *tls.ConnectionState   // shared by all handlers, like net/http
 	tlsState         *tls.ConnectionState   // shared by all handlers, like net/http
@@ -440,10 +439,8 @@ type serverConn struct {
 	inFrameScheduleLoop         bool              // whether we're in the scheduleFrameWrite loop
 	inFrameScheduleLoop         bool              // whether we're in the scheduleFrameWrite loop
 	needToSendGoAway            bool              // we need to schedule a GOAWAY frame write
 	needToSendGoAway            bool              // we need to schedule a GOAWAY frame write
 	goAwayCode                  ErrCode
 	goAwayCode                  ErrCode
-	shutdownTimerCh             <-chan time.Time // nil until used
-	shutdownTimer               *time.Timer      // nil until used
-	idleTimer                   *time.Timer      // nil if unused
-	idleTimerCh                 <-chan time.Time // nil if unused
+	shutdownTimer               *time.Timer // nil until used
+	idleTimer                   *time.Timer // nil if unused
 
 
 	// Owned by the writeFrameAsync goroutine:
 	// Owned by the writeFrameAsync goroutine:
 	headerWriteBuf bytes.Buffer
 	headerWriteBuf bytes.Buffer
@@ -748,9 +745,8 @@ func (sc *serverConn) serve() {
 	sc.setConnState(http.StateIdle)
 	sc.setConnState(http.StateIdle)
 
 
 	if sc.srv.IdleTimeout != 0 {
 	if sc.srv.IdleTimeout != 0 {
-		sc.idleTimer = time.NewTimer(sc.srv.IdleTimeout)
+		sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
 		defer sc.idleTimer.Stop()
 		defer sc.idleTimer.Stop()
-		sc.idleTimerCh = sc.idleTimer.C
 	}
 	}
 
 
 	var gracefulShutdownCh chan struct{}
 	var gracefulShutdownCh chan struct{}
@@ -764,7 +760,9 @@ func (sc *serverConn) serve() {
 
 
 	go sc.readFrames() // closed by defer sc.conn.Close above
 	go sc.readFrames() // closed by defer sc.conn.Close above
 
 
-	settingsTimer := time.NewTimer(firstSettingsTimeout)
+	settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
+	defer settingsTimer.Stop()
+
 	loopNum := 0
 	loopNum := 0
 	for {
 	for {
 		loopNum++
 		loopNum++
@@ -775,8 +773,6 @@ func (sc *serverConn) serve() {
 				break
 				break
 			}
 			}
 			sc.writeFrame(wr)
 			sc.writeFrame(wr)
-		case spr := <-sc.wantStartPushCh:
-			sc.startPush(spr)
 		case res := <-sc.wroteFrameCh:
 		case res := <-sc.wroteFrameCh:
 			sc.wroteFrame(res)
 			sc.wroteFrame(res)
 		case res := <-sc.readFrameCh:
 		case res := <-sc.readFrameCh:
@@ -784,26 +780,38 @@ func (sc *serverConn) serve() {
 				return
 				return
 			}
 			}
 			res.readMore()
 			res.readMore()
-			if settingsTimer.C != nil {
+			if settingsTimer != nil {
 				settingsTimer.Stop()
 				settingsTimer.Stop()
-				settingsTimer.C = nil
+				settingsTimer = nil
 			}
 			}
 		case m := <-sc.bodyReadCh:
 		case m := <-sc.bodyReadCh:
 			sc.noteBodyRead(m.st, m.n)
 			sc.noteBodyRead(m.st, m.n)
-		case <-settingsTimer.C:
-			sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
-			return
 		case <-gracefulShutdownCh:
 		case <-gracefulShutdownCh:
 			gracefulShutdownCh = nil
 			gracefulShutdownCh = nil
 			sc.startGracefulShutdown()
 			sc.startGracefulShutdown()
-		case <-sc.shutdownTimerCh:
-			sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
-			return
-		case <-sc.idleTimerCh:
-			sc.vlogf("connection is idle")
-			sc.goAway(ErrCodeNo)
-		case fn := <-sc.testHookCh:
-			fn(loopNum)
+		case msg := <-sc.serveMsgCh:
+			switch v := msg.(type) {
+			case func(int):
+				v(loopNum) // for testing
+			case *timerMessage:
+				switch v {
+				case settingsTimerMsg:
+					sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
+					return
+				case idleTimerMsg:
+					sc.vlogf("connection is idle")
+					sc.goAway(ErrCodeNo)
+				case shutdownTimerMsg:
+					sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
+					return
+				default:
+					panic("unknown timer")
+				}
+			case *startPushRequest:
+				sc.startPush(v)
+			default:
+				panic(fmt.Sprintf("unexpected type %T", v))
+			}
 		}
 		}
 
 
 		if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame {
 		if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame {
@@ -820,6 +828,27 @@ func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh
 	}
 	}
 }
 }
 
 
+type timerMessage int
+
+// Timeout message values sent to serveMsgCh.
+var (
+	settingsTimerMsg = new(timerMessage)
+	idleTimerMsg     = new(timerMessage)
+	shutdownTimerMsg = new(timerMessage)
+)
+
+func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
+func (sc *serverConn) onIdleTimer()     { sc.sendServeMsg(idleTimerMsg) }
+func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
+
+func (sc *serverConn) sendServeMsg(msg interface{}) {
+	sc.serveG.checkNotOn() // NOT
+	select {
+	case sc.serveMsgCh <- msg:
+	case <-sc.doneServing:
+	}
+}
+
 // readPreface reads the ClientPreface greeting from the peer
 // readPreface reads the ClientPreface greeting from the peer
 // or returns an error on timeout or an invalid greeting.
 // or returns an error on timeout or an invalid greeting.
 func (sc *serverConn) readPreface() error {
 func (sc *serverConn) readPreface() error {
@@ -1172,8 +1201,7 @@ func (sc *serverConn) goAwayIn(code ErrCode, forceCloseIn time.Duration) {
 
 
 func (sc *serverConn) shutDownIn(d time.Duration) {
 func (sc *serverConn) shutDownIn(d time.Duration) {
 	sc.serveG.check()
 	sc.serveG.check()
-	sc.shutdownTimer = time.NewTimer(d)
-	sc.shutdownTimerCh = sc.shutdownTimer.C
+	sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
 }
 }
 
 
 func (sc *serverConn) resetStream(se StreamError) {
 func (sc *serverConn) resetStream(se StreamError) {
@@ -2551,7 +2579,7 @@ func (w *responseWriter) push(target string, opts pushOptions) error {
 		return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
 		return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
 	}
 	}
 
 
-	msg := startPushRequest{
+	msg := &startPushRequest{
 		parent: st,
 		parent: st,
 		method: opts.Method,
 		method: opts.Method,
 		url:    u,
 		url:    u,
@@ -2564,7 +2592,7 @@ func (w *responseWriter) push(target string, opts pushOptions) error {
 		return errClientDisconnected
 		return errClientDisconnected
 	case <-st.cw:
 	case <-st.cw:
 		return errStreamClosed
 		return errStreamClosed
-	case sc.wantStartPushCh <- msg:
+	case sc.serveMsgCh <- msg:
 	}
 	}
 
 
 	select {
 	select {
@@ -2586,7 +2614,7 @@ type startPushRequest struct {
 	done   chan error
 	done   chan error
 }
 }
 
 
-func (sc *serverConn) startPush(msg startPushRequest) {
+func (sc *serverConn) startPush(msg *startPushRequest) {
 	sc.serveG.check()
 	sc.serveG.check()
 
 
 	// http://tools.ietf.org/html/rfc7540#section-6.6.
 	// http://tools.ietf.org/html/rfc7540#section-6.6.

+ 1 - 1
http2/server_push_test.go

@@ -508,7 +508,7 @@ func TestServer_Push_RejectAfterGoAway(t *testing.T) {
 				return
 				return
 			default:
 			default:
 			}
 			}
-			st.sc.testHookCh <- func(loopNum int) {
+			st.sc.serveMsgCh <- func(loopNum int) {
 				if !st.sc.pushEnabled {
 				if !st.sc.pushEnabled {
 					readyOnce.Do(func() { close(ready) })
 					readyOnce.Do(func() { close(ready) })
 				}
 				}

+ 3 - 4
http2/server_test.go

@@ -142,7 +142,6 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}
 		st.scMu.Lock()
 		st.scMu.Lock()
 		defer st.scMu.Unlock()
 		defer st.scMu.Unlock()
 		st.sc = v
 		st.sc = v
-		st.sc.testHookCh = make(chan func(int))
 	}
 	}
 	log.SetOutput(io.MultiWriter(stderrv(), twriter{t: t, st: st}))
 	log.SetOutput(io.MultiWriter(stderrv(), twriter{t: t, st: st}))
 	if !onlyServer {
 	if !onlyServer {
@@ -187,7 +186,7 @@ func (st *serverTester) addLogFilter(phrase string) {
 
 
 func (st *serverTester) stream(id uint32) *stream {
 func (st *serverTester) stream(id uint32) *stream {
 	ch := make(chan *stream, 1)
 	ch := make(chan *stream, 1)
-	st.sc.testHookCh <- func(int) {
+	st.sc.serveMsgCh <- func(int) {
 		ch <- st.sc.streams[id]
 		ch <- st.sc.streams[id]
 	}
 	}
 	return <-ch
 	return <-ch
@@ -195,7 +194,7 @@ func (st *serverTester) stream(id uint32) *stream {
 
 
 func (st *serverTester) streamState(id uint32) streamState {
 func (st *serverTester) streamState(id uint32) streamState {
 	ch := make(chan streamState, 1)
 	ch := make(chan streamState, 1)
-	st.sc.testHookCh <- func(int) {
+	st.sc.serveMsgCh <- func(int) {
 		state, _ := st.sc.state(id)
 		state, _ := st.sc.state(id)
 		ch <- state
 		ch <- state
 	}
 	}
@@ -205,7 +204,7 @@ func (st *serverTester) streamState(id uint32) streamState {
 // loopNum reports how many times this conn's select loop has gone around.
 // loopNum reports how many times this conn's select loop has gone around.
 func (st *serverTester) loopNum() int {
 func (st *serverTester) loopNum() int {
 	lastc := make(chan int, 1)
 	lastc := make(chan int, 1)
-	st.sc.testHookCh <- func(loopNum int) {
+	st.sc.serveMsgCh <- func(loopNum int) {
 		lastc <- loopNum
 		lastc <- loopNum
 	}
 	}
 	return <-lastc
 	return <-lastc