Browse Source

writecoalescer: use channels to coordinate the flush loop (#1208)

Chris Bannister 7 years ago
parent
commit
fb832297d7
2 changed files with 84 additions and 33 deletions
  1. 50 30
      conn.go
  2. 34 3
      conn_test.go

+ 50 - 30
conn.go

@@ -265,13 +265,7 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa
 
 	// dont coalesce startup frames
 	if s.cfg.WriteCoalesceWaitTime > 0 {
-		w := &writeCoalescer{
-			fcond: sync.NewCond(&sync.Mutex{}),
-			cond:  sync.NewCond(&sync.Mutex{}),
-			w:     c.w,
-		}
-		go w.writeFlusher(s.cfg.WriteCoalesceWaitTime, c.quit)
-		c.w = w
+		c.w = newWriteCoalescer(c.w, s.cfg.WriteCoalesceWaitTime, c.quit)
 	}
 
 	go c.serve()
@@ -612,11 +606,22 @@ func (c *deadlineWriter) Write(p []byte) (int, error) {
 	return c.w.Write(p)
 }
 
+func newWriteCoalescer(w io.Writer, d time.Duration, quit <-chan struct{}) *writeCoalescer {
+	wc := &writeCoalescer{
+		writeCh: make(chan struct{}), // TODO: could this be sync?
+		cond:    sync.NewCond(&sync.Mutex{}),
+		w:       w,
+		quit:    quit,
+	}
+	go wc.writeFlusher(d)
+	return wc
+}
+
 type writeCoalescer struct {
 	w io.Writer
 
-	// fcond waits for a new write to start the flush loop
-	fcond   *sync.Cond
+	quit    <-chan struct{}
+	writeCh chan struct{}
 	running bool
 
 	// cond waits for the buffer to be flushed
@@ -627,10 +632,8 @@ type writeCoalescer struct {
 	err error
 }
 
-func (w *writeCoalescer) flush() {
-	w.cond.L.Lock()
-	defer w.cond.L.Unlock()
-
+func (w *writeCoalescer) flushLocked() {
+	w.running = false
 	if len(w.buffers) == 0 {
 		return
 	}
@@ -645,16 +648,36 @@ func (w *writeCoalescer) flush() {
 	w.cond.Broadcast()
 }
 
+func (w *writeCoalescer) flush() {
+	w.cond.L.Lock()
+	w.flushLocked()
+	w.cond.L.Unlock()
+}
+
+func (w *writeCoalescer) stop() {
+	w.cond.L.Lock()
+	defer w.cond.L.Unlock()
+
+	w.flushLocked()
+	// nil the channel out sends block forever on it
+	// instead of closing which causes a send on closed channel
+	// panic.
+	w.writeCh = nil
+}
+
 func (w *writeCoalescer) Write(p []byte) (int, error) {
-	// TODO: use atomics for this?
-	w.fcond.L.Lock()
+	w.cond.L.Lock()
+
 	if !w.running {
-		w.running = true
-		w.fcond.Broadcast()
+		select {
+		case w.writeCh <- struct{}{}:
+			w.running = true
+		case <-w.quit:
+			w.cond.L.Unlock()
+			return 0, io.EOF // TODO: better error here?
+		}
 	}
-	w.fcond.L.Unlock()
 
-	w.cond.L.Lock()
 	w.buffers = append(w.buffers, p)
 	for len(w.buffers) != 0 {
 		w.cond.Wait()
@@ -669,10 +692,10 @@ func (w *writeCoalescer) Write(p []byte) (int, error) {
 	return len(p), nil
 }
 
-func (w *writeCoalescer) writeFlusher(interval time.Duration, quit chan struct{}) {
+func (w *writeCoalescer) writeFlusher(interval time.Duration) {
 	timer := time.NewTimer(interval)
 	defer timer.Stop()
-	defer w.flush()
+	defer w.stop()
 
 	if !timer.Stop() {
 		<-timer.C
@@ -680,24 +703,21 @@ func (w *writeCoalescer) writeFlusher(interval time.Duration, quit chan struct{}
 
 	for {
 		// wait for a write to start the flush loop
-		w.fcond.L.Lock()
-		for !w.running {
-			w.fcond.Wait()
+		select {
+		case <-w.writeCh:
+		case <-w.quit:
+			return
 		}
-		w.fcond.L.Unlock()
+
 		timer.Reset(interval)
 
 		select {
-		case <-quit:
+		case <-w.quit:
 			return
 		case <-timer.C:
 		}
 
-		w.fcond.L.Lock()
 		w.flush()
-
-		w.running = false
-		w.fcond.L.Unlock()
 	}
 }
 

+ 34 - 3
conn_test.go

@@ -752,11 +752,16 @@ func TestContext_Timeout(t *testing.T) {
 }
 
 func TestWriteCoalescing(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	var buf bytes.Buffer
 	w := &writeCoalescer{
-		w:     &buf,
-		cond:  sync.NewCond(&sync.Mutex{}),
-		fcond: sync.NewCond(&sync.Mutex{}),
+		w:       &buf,
+		writeCh: make(chan struct{}),
+		cond:    sync.NewCond(&sync.Mutex{}),
+		quit:    ctx.Done(),
+		running: true,
 	}
 
 	go func() {
@@ -790,6 +795,32 @@ func TestWriteCoalescing(t *testing.T) {
 	}
 }
 
+func TestWriteCoalescing_WriteAfterClose(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var buf bytes.Buffer
+	w := newWriteCoalescer(&buf, 5*time.Millisecond, ctx.Done())
+
+	// ensure 1 write works
+	if _, err := w.Write([]byte("one")); err != nil {
+		t.Fatal(err)
+	}
+
+	if v := buf.String(); v != "one" {
+		t.Fatalf("expected buffer to be %q got %q", "one", v)
+	}
+
+	// now close and do a write, we should error
+	cancel()
+
+	if _, err := w.Write([]byte("two")); err == nil {
+		t.Fatal("expected to get error for write after closing")
+	} else if err != io.EOF {
+		t.Fatalf("expected to get EOF got %v", err)
+	}
+}
+
 type recordingFrameHeaderObserver struct {
 	t      *testing.T
 	mu     sync.Mutex