
Move handler write flow control to the serve goroutine's frame scheduler.

Previously the 'flow' type had a sync.Cond+Mutex, and it was the
Handler goroutine that blocked on large writes, waiting for a suitable
number of flow-controlled bytes to Write. Now a Handler goroutine
writes as much as it wants (it already has the data in memory),
still typically bounded by its bufio.Writer size, and it's the serve
goroutine's frame scheduler that's responsible for cutting the write
up into suitably-sized DATA frames, figuring out prioritization, and
even cutting those DATA frames up into smaller bits. (There can still
only be one Write, however large, active at a time from a given
Handler; the Handler blocks waiting for its whole Write to complete.)
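
In sketch form, the new division of labor looks roughly like this.
frameWriteMsg is the real type's name, but its shape here is an
assumed simplification, and the real serve loop also interleaves
frames from other streams and respects flow control:

package main

import "fmt"

// frameWriteMsg (simplified sketch): one Handler write, however
// large, plus a done channel the serve loop signals once it has all
// been written out as DATA frames.
type frameWriteMsg struct {
	p    []byte
	done chan error
}

func serveLoop(wantWrite chan frameWriteMsg) {
	for wm := range wantWrite {
		for len(wm.p) > 0 { // the scheduler cuts the write into frames
			n := len(wm.p)
			if n > 16384 { // e.g. the peer's SETTINGS_MAX_FRAME_SIZE
				n = 16384
			}
			fmt.Printf("wrote DATA frame: %d bytes\n", n)
			wm.p = wm.p[n:]
		}
		wm.done <- nil // unblock the Handler only once all of it is out
	}
}

func main() {
	wantWrite := make(chan frameWriteMsg)
	go serveLoop(wantWrite)

	done := make(chan error)
	wantWrite <- frameWriteMsg{p: make([]byte, 40000), done: done}
	if err := <-done; err != nil { // the one blocking point per Write
		fmt.Println("Write failed:", err)
	}
}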

This change is possible because of the changes made in f16a0b35,
which added visibility into what is being written by introducing the
writeFramer interface type. Now we can type-assert to the 'writeData'
type and shrink it as needed when it's bigger than what we're allowed
to write.
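
The pattern is roughly the following. writeFramer's real definition
has a writeFrame method; the marker method below is an assumed
stand-in so the sketch stays self-contained:

package main

import "fmt"

type writeFramer interface{ isFramer() }

// writeData mirrors the fields used in this commit's diff.
type writeData struct {
	streamID  uint32
	p         []byte
	endStream bool
}

func (*writeData) isFramer() {}

// splitIfTooBig type-asserts the pending write and, when it is a DATA
// payload larger than allowed, carves off a frame-sized chunk and
// returns the remainder to keep queued.
func splitIfTooBig(w writeFramer, allowed int) (now writeFramer, rest []byte) {
	wd, ok := w.(*writeData)
	if !ok || len(wd.p) <= allowed {
		return w, nil // not DATA, or already small enough
	}
	rest = wd.p[allowed:]
	return &writeData{streamID: wd.streamID, p: wd.p[:allowed]}, rest
}

func main() {
	var w writeFramer = &writeData{streamID: 1, p: make([]byte, 20000)}
	now, rest := splitIfTooBig(w, 16384)
	fmt.Println(len(now.(*writeData).p), len(rest)) // 16384 3616
}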

Future changes will improve the frame scheduler to deal with stream
priorities, and notably to divide write resources by weight when
several streams have pending data and no dependent streams. (This is
possible because the frame scheduler now has visibility into
everything that is pending, whereas before that knowledge was locked
up inside the goroutine that was blocked on the *flow.)
Brad Fitzpatrick, 11 years ago
commit 2b45947877
6 changed files with 225 additions and 210 deletions
  1. flow.go (+24, -56)
  2. flow_test.go (+19, -55)
  3. http2.go (+7, -18)
  4. server.go (+49, -68)
  5. server_test.go (+60, -3)
  6. writesched.go (+66, -10)

+ 24 - 56
flow.go

@@ -7,77 +7,45 @@
 
 package http2
 
-import "sync"
-
-// flow is the flow control window's counting semaphore.
+// flow is the flow control window's size.
 type flow struct {
-	c      *sync.Cond // protects size
-	size   int32
-	closed bool
+	// n is the number of DATA bytes we're allowed to send.
+	// A flow is kept both on a conn and per-stream.
+	n int32
+
+	// conn points to the connection-level flow that is shared
+	// by all streams on that conn. It is nil for the flow
+	// that's on the conn directly.
+	conn *flow
 }
 
-func newFlow(n int32) *flow {
-	return &flow{
-		c:    sync.NewCond(new(sync.Mutex)),
-		size: n,
-	}
-}
+func (f *flow) setConnFlow(cf *flow) { f.conn = cf }
 
-// cur returns the current number of bytes allow to write.  Obviously
-// it's not safe to call this and assume acquiring that number of
-// bytes from the acquire method won't be block in the presence of
-// concurrent acquisitions.
-func (f *flow) cur() int32 {
-	f.c.L.Lock()
-	defer f.c.L.Unlock()
-	return f.size
+func (f *flow) available() int32 {
+	n := f.n
+	if f.conn != nil && f.conn.n < n {
+		n = f.conn.n
+	}
+	return n
 }
 
-// wait waits for between 1 and n bytes (inclusive) to be available
-// and returns the number of quota bytes decremented from the quota
-// and allowed to be written. The returned value will be 0 iff the
-// stream has been killed.
-func (f *flow) wait(n int32) (got int32) {
-	if n < 0 {
-		panic("negative acquire")
+func (f *flow) take(n int32) {
+	if n > f.available() {
+		panic("internal error: took too much")
 	}
-	f.c.L.Lock()
-	defer f.c.L.Unlock()
-	for {
-		if f.closed {
-			return 0
-		}
-		if f.size >= 1 {
-			got = f.size
-			if got > n {
-				got = n
-			}
-			f.size -= got
-			return got
-		}
-		f.c.Wait()
+	f.n -= n
+	if f.conn != nil {
+		f.conn.n -= n
 	}
 }
 
 // add adds n bytes (positive or negative) to the flow control window.
 // It returns false if the sum would exceed 2^31-1.
 func (f *flow) add(n int32) bool {
-	f.c.L.Lock()
-	defer f.c.L.Unlock()
-	remain := (1<<31 - 1) - f.size
+	remain := (1<<31 - 1) - f.n
 	if n > remain {
 		return false
 	}
-	f.size += n
-	f.c.Broadcast()
+	f.n += n
 	return true
 }
-
-// close marks the flow as closed, meaning everybody gets all the
-// tokens they want, because everything else will fail anyway.
-func (f *flow) close() {
-	f.c.L.Lock()
-	defer f.c.L.Unlock()
-	f.closed = true
-	f.c.Broadcast()
-}
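
The resulting semantics fit in a few lines. A minimal usage sketch in
test form, assuming access to the package-internal flow type above
(the window sizes are arbitrary):

package http2

import "testing"

// Sketch: a stream's window is capped by the shared conn window,
// and take charges both.
func TestFlowSketch(t *testing.T) {
	var conn, st flow
	conn.add(100) // connection-level window
	st.add(300)   // stream-level window
	st.setConnFlow(&conn)

	if got := st.available(); got != 100 { // min(300, 100)
		t.Fatalf("available = %d; want 100", got)
	}
	st.take(100) // charges both the stream and the conn windows
	if got := st.available(); got != 0 { // st.n is 200, but conn.n is 0
		t.Fatalf("after take, available = %d; want 0", got)
	}
}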

+ 19 - 55
flow_test.go

@@ -5,59 +5,46 @@
 
 package http2
 
-import (
-	"testing"
-	"time"
-)
+import "testing"
 
 func TestFlow(t *testing.T) {
-	f := newFlow(10)
-	if got, want := f.cur(), int32(10); got != want {
-		t.Fatalf("size = %d; want %d", got, want)
-	}
-	if got, want := f.wait(1), int32(1); got != want {
-		t.Errorf("wait = %d; want %d", got, want)
-	}
-	if got, want := f.cur(), int32(9); got != want {
-		t.Fatalf("size = %d; want %d", got, want)
-	}
-	if got, want := f.wait(20), int32(9); got != want {
-		t.Errorf("wait = %d; want %d", got, want)
+	var st flow
+	var conn flow
+	st.add(3)
+	conn.add(2)
+
+	if got, want := st.available(), int32(3); got != want {
+		t.Errorf("available = %d; want %d", got, want)
 	}
-	if got, want := f.cur(), int32(0); got != want {
-		t.Fatalf("size = %d; want %d", got, want)
+	st.setConnFlow(&conn)
+	if got, want := st.available(), int32(2); got != want {
+		t.Errorf("after parent setup, available = %d; want %d", got, want)
 	}
 
-	// Wait for 10, which should block, so start a background goroutine
-	// to refill it.
-	go func() {
-		time.Sleep(50 * time.Millisecond)
-		f.add(50)
-	}()
-	if got, want := f.wait(1), int32(1); got != want {
-		t.Errorf("after block, got %d; want %d", got, want)
+	st.take(2)
+	if got, want := conn.available(), int32(0); got != want {
+		t.Errorf("after taking 2, conn = %d; want %d", got, want)
 	}
-
-	if got, want := f.cur(), int32(49); got != want {
-		t.Fatalf("size = %d; want %d", got, want)
+	if got, want := st.available(), int32(0); got != want {
+		t.Errorf("after taking 2, stream = %d; want %d", got, want)
 	}
 }
 
 func TestFlowAdd(t *testing.T) {
-	f := newFlow(0)
+	var f flow
 	if !f.add(1) {
 		t.Fatal("failed to add 1")
 	}
 	if !f.add(-1) {
 		t.Fatal("failed to add -1")
 	}
-	if got, want := f.cur(), int32(0); got != want {
+	if got, want := f.available(), int32(0); got != want {
 		t.Fatalf("size = %d; want %d", got, want)
 	}
 	if !f.add(1<<31 - 1) {
 		t.Fatal("failed to add 2^31-1")
 	}
-	if got, want := f.cur(), int32(1<<31-1); got != want {
+	if got, want := f.available(), int32(1<<31-1); got != want {
 		t.Fatalf("size = %d; want %d", got, want)
 	}
 	if f.add(1) {
@@ -65,26 +52,3 @@ func TestFlowAdd(t *testing.T) {
 	}
 
 }
-
-func TestFlowClose(t *testing.T) {
-	f := newFlow(0)
-
-	// Wait for 10, which should block, so start a background goroutine
-	// to refill it.
-	go func() {
-		time.Sleep(50 * time.Millisecond)
-		f.close()
-	}()
-	gotc := make(chan int32)
-	go func() {
-		gotc <- f.wait(1)
-	}()
-	select {
-	case got := <-gotc:
-		if got != 0 {
-			t.Errorf("got %d; want 0", got)
-		}
-	case <-time.After(2 * time.Second):
-		t.Error("timeout")
-	}
-}

+ 7 - 18
http2.go

@@ -187,35 +187,24 @@ func (g gate) Done() { g <- struct{}{} }
 func (g gate) Wait() { <-g }
 
 // A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed).
-type closeWaiter struct {
-	m      sync.Mutex
-	c      sync.Cond
-	closed bool
-}
+type closeWaiter chan struct{}
 
 // Init makes a closeWaiter usable.
 // It exists so that a closeWaiter value can be placed inside a
 // larger struct and have the Mutex and Cond's memory in the same
 // allocation.
 func (cw *closeWaiter) Init() {
-	cw.c.L = &cw.m
+	*cw = make(chan struct{})
 }
 
-// Close marks the closeWwaiter as closed and unblocks any waiters.
-func (cw *closeWaiter) Close() {
-	cw.m.Lock()
-	cw.closed = true
-	cw.m.Unlock()
-	cw.c.Broadcast()
+// Close marks the closeWaiter as closed and unblocks any waiters.
+func (cw closeWaiter) Close() {
+	close(cw)
 }
 
 // Wait waits for the closeWaiter to become closed.
-func (cw *closeWaiter) Wait() {
-	cw.m.Lock()
-	defer cw.m.Unlock()
-	for !cw.closed {
-		cw.c.Wait()
-	}
+func (cw closeWaiter) Wait() {
+	<-cw
 }
 
 // bufferedWriter is a buffered writer that writes to w.
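
The chan-based closeWaiter works because a receive from a closed
channel never blocks, so Close behaves as a one-shot broadcast to any
number of past or future waiters. A standalone sketch (the closeWaiter
definition is copied from the diff above; main is only illustrative):

package main

import (
	"fmt"
	"sync"
)

type closeWaiter chan struct{}

func (cw *closeWaiter) Init() { *cw = make(chan struct{}) }
func (cw closeWaiter) Close() { close(cw) }
func (cw closeWaiter) Wait()  { <-cw }

func main() {
	var cw closeWaiter
	cw.Init()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			cw.Wait() // blocks until Close
			fmt.Println("waiter", i, "unblocked")
		}(i)
	}
	cw.Close()
	wg.Wait()
	cw.Wait() // a Wait after Close returns immediately
}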

+ 49 - 68
server.go

@@ -5,6 +5,10 @@
 // Licensed under the same terms as Go itself:
 // https://code.google.com/p/go/source/browse/LICENSE
 
+// TODO: replace all <-sc.doneServing with reads from the stream's cw
+// instead, and make sure that on close we close all open
+// streams. then remove doneServing?
+
 package http2
 
 import (
@@ -152,25 +156,27 @@ func ConfigureServer(s *http.Server, conf *Server) {
 
 func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
 	sc := &serverConn{
-		srv:               srv,
-		hs:                hs,
-		conn:              c,
-		bw:                newBufferedWriter(c),
-		handler:           h,
-		streams:           make(map[uint32]*stream),
-		readFrameCh:       make(chan frameAndGate),
-		readFrameErrCh:    make(chan error, 1), // must be buffered for 1
-		wantWriteFrameCh:  make(chan frameWriteMsg, 8),
-		wroteFrameCh:      make(chan struct{}, 1), // buffered; one send in reading goroutine
-		flow:              newFlow(initialWindowSize),
-		doneServing:       make(chan struct{}),
-		advMaxStreams:     srv.maxConcurrentStreams(),
-		maxWriteFrameSize: initialMaxFrameSize,
+		srv:              srv,
+		hs:               hs,
+		conn:             c,
+		bw:               newBufferedWriter(c),
+		handler:          h,
+		streams:          make(map[uint32]*stream),
+		readFrameCh:      make(chan frameAndGate),
+		readFrameErrCh:   make(chan error, 1), // must be buffered for 1
+		wantWriteFrameCh: make(chan frameWriteMsg, 8),
+		wroteFrameCh:     make(chan struct{}, 1), // buffered; one send in reading goroutine
+		doneServing:      make(chan struct{}),
+		advMaxStreams:    srv.maxConcurrentStreams(),
+		writeSched: writeScheduler{
+			maxFrameSize: initialMaxFrameSize,
+		},
 		initialWindowSize: initialWindowSize,
 		headerTableSize:   initialHeaderTableSize,
 		serveG:            newGoroutineLock(),
 		pushEnabled:       true,
 	}
+	sc.flow.add(sc.initialWindowSize)
 	sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
 	sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
 
@@ -209,7 +215,7 @@ type serverConn struct {
 	wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
 	wroteFrameCh     chan struct{}      // from writeFrameAsync -> serve, tickles more frame writes
 	testHookCh       chan func()        // code to run on the serve loop
-	flow             *flow              // connection-wide (not stream-specific) flow control
+	flow             flow               // connection-wide (not stream-specific) flow control
 
 	// Everything following is owned by the serve loop; use serveG.check():
 	serveG                goroutineLock // used to verify funcs are on serve()
@@ -221,7 +227,6 @@ type serverConn struct {
 	curOpenStreams        uint32 // client's number of open streams
 	maxStreamID           uint32 // max ever seen
 	streams               map[uint32]*stream
-	maxWriteFrameSize     uint32
 	initialWindowSize     int32
 	headerTableSize       uint32
 	maxHeaderListSize     uint32            // zero means unknown (default)
@@ -266,7 +271,7 @@ type stream struct {
 	// immutable:
 	id   uint32
 	conn *serverConn
-	flow *flow       // limits writing from Handler to client
+	flow flow        // limits writing from Handler to client
 	body *pipe       // non-nil if expecting DATA frames
 	cw   closeWaiter // closed wait stream transitions to closed state
 
@@ -555,6 +560,8 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData,
 		return err
 	case <-sc.doneServing:
 		return errClientDisconnected
+	case <-stream.cw:
+		return errStreamBroken
 	}
 }
 
@@ -583,7 +590,7 @@ func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
 func (sc *serverConn) writeFrame(wm frameWriteMsg) {
 	sc.serveG.check()
 	// Fast path for common case:
-	if !sc.writingFrame {
+	if !sc.writingFrame && sc.writeSched.empty() {
 		sc.startFrameWrite(wm)
 		return
 	}
@@ -657,28 +664,22 @@ func (sc *serverConn) scheduleFrameWrite() {
 		})
 		return
 	}
-	if sc.writeSched.empty() && sc.needsFrameFlush {
-		sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
-		sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
-		return
-	}
-	if sc.inGoAway {
-		// No more frames after we've sent GOAWAY.
-		return
-	}
 	if sc.needToSendSettingsAck {
 		sc.needToSendSettingsAck = false
 		sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
 		return
 	}
-	if sc.writeSched.empty() {
+	if !sc.inGoAway {
+		if wm, ok := sc.writeSched.take(); ok {
+			sc.startFrameWrite(wm)
+			return
+		}
+	}
+	if sc.needsFrameFlush {
+		sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
+		sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
 		return
 	}
-	// TODO: if wm is a data frame, make sure it's not too big
-	// (because a SETTINGS frame changed our max frame size while
-	// a stream was open and writing) and cut it up into smaller
-	// bits.
-	sc.startFrameWrite(sc.writeSched.take())
 }
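
(Note the new ordering in scheduleFrameWrite: a pending SETTINGS ack
goes out first; then, unless a GOAWAY has been sent, the next frame
from the write scheduler; and the deferred flush is written only when
nothing else is pending.)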
 
 func (sc *serverConn) goAway(code ErrCode) {
@@ -862,6 +863,7 @@ func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
 			return goAwayFlowError{}
 		}
 	}
+	sc.scheduleFrameWrite()
 	return nil
 }
 
@@ -879,6 +881,7 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
 	if ok {
 		st.gotReset = true
 		sc.closeStream(st, StreamError{f.StreamID, f.ErrCode})
+		// XXX TODO drain writeSched for that stream
 	}
 	return nil
 }
@@ -891,11 +894,10 @@ func (sc *serverConn) closeStream(st *stream, err error) {
 	st.state = stateClosed
 	sc.curOpenStreams--
 	delete(sc.streams, st.id)
-	st.flow.close()
 	if p := st.body; p != nil {
 		p.Close(err)
 	}
-	st.cw.Close() // signals Handler's CloseNotifier goroutine (if any) to send
+	st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
 }
 
 func (sc *serverConn) processSettings(f *SettingsFrame) error {
@@ -936,7 +938,7 @@ func (sc *serverConn) processSetting(s Setting) error {
 	case SettingInitialWindowSize:
 		return sc.processSettingInitialWindowSize(s.Val)
 	case SettingMaxFrameSize:
-		sc.maxWriteFrameSize = s.Val
+		sc.writeSched.maxFrameSize = s.Val
 	case SettingMaxHeaderListSize:
 		sc.maxHeaderListSize = s.Val
 	default:
@@ -1049,8 +1051,8 @@ func (sc *serverConn) processHeaders(f *HeadersFrame) error {
 		conn:  sc,
 		id:    id,
 		state: stateOpen,
-		flow:  newFlow(sc.initialWindowSize),
 	}
+	st.flow.add(sc.initialWindowSize)
 	st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
 	if f.StreamEnded() {
 		st.state = stateHalfClosedRemote
@@ -1343,13 +1345,6 @@ type responseWriterState struct {
 	closeNotifierCh chan bool  // nil until first used
 }
 
-func (rws *responseWriterState) writeData(p []byte, end bool) error {
-	rws.curWrite.streamID = rws.stream.id
-	rws.curWrite.p = p
-	rws.curWrite.endStream = end
-	return rws.stream.conn.writeDataFromHandler(rws.stream, &rws.curWrite, rws.frameWriteCh)
-}
-
 type chunkWriter struct{ rws *responseWriterState }
 
 func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
@@ -1383,34 +1378,20 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
 			contentLength: clen,
 		}, rws.frameWriteCh)
 		if endStream {
-			return
+			return 0, nil
 		}
 	}
-	if len(p) == 0 {
-		if rws.handlerDone {
-			err = rws.writeData(nil, true)
-		}
-		return
+	if len(p) == 0 && !rws.handlerDone {
+		return 0, nil
 	}
-	for len(p) > 0 {
-		chunk := p
-		if len(chunk) > handlerChunkWriteSize {
-			chunk = chunk[:handlerChunkWriteSize]
-		}
-		allowedSize := rws.stream.flow.wait(int32(len(chunk)))
-		if allowedSize == 0 {
-			return n, errStreamBroken
-		}
-		chunk = chunk[:allowedSize]
-		p = p[len(chunk):]
-		isFinal := rws.handlerDone && len(p) == 0
-		err = rws.writeData(chunk, isFinal)
-		if err != nil {
-			break
-		}
-		n += len(chunk)
+	curWrite := &rws.curWrite
+	curWrite.streamID = rws.stream.id
+	curWrite.p = p
+	curWrite.endStream = rws.handlerDone
+	if err := rws.stream.conn.writeDataFromHandler(rws.stream, curWrite, rws.frameWriteCh); err != nil {
+		return 0, err
 	}
-	return
+	return len(p), nil
 }
 
 func (w *responseWriter) Flush() {

+ 60 - 3
server_test.go

@@ -1383,6 +1383,7 @@ func TestServer_Response_Header_Flush_MidWrite(t *testing.T) {
 
 func TestServer_Response_LargeWrite(t *testing.T) {
 	const size = 1 << 20
+	const maxFrameSize = 16 << 10
 	testServerResponse(t, func(w http.ResponseWriter, r *http.Request) error {
 		n, err := w.Write(bytes.Repeat([]byte("a"), size))
 		if err != nil {
@@ -1393,6 +1394,14 @@ func TestServer_Response_LargeWrite(t *testing.T) {
 		}
 		return nil
 	}, func(st *serverTester) {
+		if err := st.fr.WriteSettings(
+			Setting{SettingInitialWindowSize, 0},
+			Setting{SettingMaxFrameSize, maxFrameSize},
+		); err != nil {
+			t.Fatal(err)
+		}
+		st.wantSettingsAck()
+
 		getSlash(st) // make the single request
 
 		// Give the handler quota to write:
@@ -1433,7 +1442,7 @@ func TestServer_Response_LargeWrite(t *testing.T) {
 		if bytes != size {
 			t.Errorf("Got %d bytes; want %d", bytes, size)
 		}
-		if want := 257; frames != want {
+		if want := int(size / maxFrameSize); frames < want || frames > want*2 {
 			t.Errorf("Got %d frames; want %d", frames, size)
 		}
 	})
@@ -1442,6 +1451,7 @@ func TestServer_Response_LargeWrite(t *testing.T) {
 // Test that the handler can't write more than the client allows
 func TestServer_Response_LargeWrite_FlowControlled(t *testing.T) {
 	const size = 1 << 20
+	const maxFrameSize = 16 << 10
 	testServerResponse(t, func(w http.ResponseWriter, r *http.Request) error {
 		w.(http.Flusher).Flush()
 		n, err := w.Write(bytes.Repeat([]byte("a"), size))
@@ -1456,13 +1466,15 @@ func TestServer_Response_LargeWrite_FlowControlled(t *testing.T) {
 		// Set the window size to something explicit for this test.
 		// It's also how much initial data we expect.
 		const initWindowSize = 123
-		if err := st.fr.WriteSettings(Setting{SettingInitialWindowSize, initWindowSize}); err != nil {
+		if err := st.fr.WriteSettings(
+			Setting{SettingInitialWindowSize, initWindowSize},
+			Setting{SettingMaxFrameSize, maxFrameSize},
+		); err != nil {
 			t.Fatal(err)
 		}
 		st.wantSettingsAck()
 
 		getSlash(st) // make the single request
-
 		defer func() { st.fr.WriteRSTStream(1, ErrCodeCancel) }()
 
 		hf := st.wantHeaders()
@@ -1494,6 +1506,51 @@ func TestServer_Response_LargeWrite_FlowControlled(t *testing.T) {
 	})
 }
 
+// Test that the handler blocked in a Write is unblocked if the client sends a RST_STREAM.
+func TestServer_Response_RST_Unblocks_LargeWrite(t *testing.T) {
+	const size = 1 << 20
+	const maxFrameSize = 16 << 10
+	testServerResponse(t, func(w http.ResponseWriter, r *http.Request) error {
+		w.(http.Flusher).Flush()
+		errc := make(chan error, 1)
+		go func() {
+			_, err := w.Write(bytes.Repeat([]byte("a"), size))
+			errc <- err
+		}()
+		select {
+		case err := <-errc:
+			if err == nil {
+				return errors.New("unexpected nil error from Write in handler")
+			}
+			return nil
+		case <-time.After(2 * time.Second):
+			return errors.New("timeout waiting for Write in handler")
+		}
+	}, func(st *serverTester) {
+		if err := st.fr.WriteSettings(
+			Setting{SettingInitialWindowSize, 0},
+			Setting{SettingMaxFrameSize, maxFrameSize},
+		); err != nil {
+			t.Fatal(err)
+		}
+		st.wantSettingsAck()
+
+		getSlash(st) // make the single request
+
+		hf := st.wantHeaders()
+		if hf.StreamEnded() {
+			t.Fatal("unexpected END_STREAM flag")
+		}
+		if !hf.HeadersEnded() {
+			t.Fatal("want END_HEADERS flag")
+		}
+
+		if err := st.fr.WriteRSTStream(1, ErrCodeCancel); err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
 func TestServer_Response_Automatic100Continue(t *testing.T) {
 	const msg = "foo"
 	const reply = "bar"

+ 66 - 10
writesched.go

@@ -29,6 +29,10 @@ type writeScheduler struct {
 	// They're sent before any stream-specific frames.
 	zero writeQueue
 
+	// maxFrameSize is the maximum size of a DATA frame
+	// we'll write.
+	maxFrameSize uint32
+
 	// sq contains the stream-specific queues, keyed by stream ID.
 	// when a stream is idle, it's deleted from the map.
 	sq map[uint32]*writeQueue
@@ -58,16 +62,16 @@ func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
 }
 
 // take returns the most important frame to write and removes it from the scheduler.
-// It is illegal to call this if the scheduler is empty.
-func (ws *writeScheduler) take() frameWriteMsg {
+// The ok result reports whether a frame was available: it is false if
+// the scheduler is empty or if no queued DATA frame has flow-control
+// quota to send right now.
+func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
 	// If there are any frames not associated with streams, prefer those first.
 	// These are usually SETTINGS, etc.
 	if !ws.zero.empty() {
-		return ws.zero.shift()
+		return ws.zero.shift(), true
 	}
-
 	if len(ws.sq) == 0 {
-		panic("internal error: take should only be called if non-empty")
+		return
 	}
 
 	// Next, prioritize frames on streams that aren't DATA frames (no cost).
@@ -80,20 +84,64 @@ func (ws *writeScheduler) take() frameWriteMsg {
 	// Now, all that remains are DATA frames. So pick the best one.
 	// TODO: do that. For now, pick a random one.
 	for id, q := range ws.sq {
-		return ws.takeFrom(id, q)
+		if wm, ok := ws.takeFrom(id, q); ok {
+			return wm, true
+		}
 	}
-	panic("internal error: take should only be called if non-empty")
+	return
 }
 
-func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) frameWriteMsg {
-	wm := q.shift()
+func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
+	wm = q.head()
+	// If the first item in this queue costs flow control tokens
+	// and we don't have enough, write as much as we can.
+	if wd, ok := wm.write.(*writeData); ok {
+		allowed := wm.stream.flow.available() // max we can write
+		if allowed == 0 {
+			// No quota available. Caller can try the next stream.
+			return wm, false
+		}
+		if int32(ws.maxFrameSize) < allowed {
+			allowed = int32(ws.maxFrameSize)
+		}
+		if allowed == 0 {
+			panic("internal error: ws.maxFrameSize not initialized or invalid")
+		}
+		// TODO: further restrict the allowed size, because even if
+		// the peer says it's okay to write 16MB data frames, we might
+		// want to write smaller ones to properly weight competing
+		// streams' priorities.
+		if len(wd.p) > int(allowed) {
+			wm.stream.flow.take(allowed)
+			chunk := wd.p[:allowed]
+			wd.p = wd.p[allowed:]
+			// Make up a new write message of a valid size, rather
+			// than shifting one off the queue.
+			return frameWriteMsg{
+				stream: wm.stream,
+				write: &writeData{
+					streamID: wd.streamID,
+					p:        chunk,
+				// even if the original endStream was true, there are bytes
+					// remaining because len(wd.p) > allowed, so we
+					// know endStream is false:
+					endStream: false,
+				},
+				// done stays nil: our caller is blocking on the final
+				// DATA frame, not these intermediates:
+				done: nil,
+			}, true
+		}
+	}
+
+	q.shift()
 	if q.empty() {
 		// TODO: reclaim its slice and use it for future allocations
 		// in the writeScheduler.streamQueue method above when making
 		// the writeQueue.
 		delete(ws.sq, id)
 	}
-	return wm
+	return wm, true
 }
 
 type writeQueue struct {
@@ -106,6 +154,14 @@ func (q *writeQueue) push(wm frameWriteMsg) {
 	q.s = append(q.s, wm)
 }
 
+// head returns the next item that would be removed by shift.
+func (q *writeQueue) head() frameWriteMsg {
+	if len(q.s) == 0 {
+		panic("invalid use of queue")
+	}
+	return q.s[0]
+}
+
 func (q *writeQueue) shift() frameWriteMsg {
 	if len(q.s) == 0 {
 		panic("invalid use of queue")