|
|
@@ -266,8 +266,9 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa
|
|
|
// dont coalesce startup frames
|
|
|
if s.cfg.WriteCoalesceWaitTime > 0 {
|
|
|
w := &writeCoalescer{
|
|
|
- cond: sync.NewCond(&sync.Mutex{}),
|
|
|
- w: c.w,
|
|
|
+ fcond: sync.NewCond(&sync.Mutex{}),
|
|
|
+ cond: sync.NewCond(&sync.Mutex{}),
|
|
|
+ w: c.w,
|
|
|
}
|
|
|
go w.writeFlusher(s.cfg.WriteCoalesceWaitTime, c.quit)
|
|
|
c.w = w
|
|
|
@@ -614,6 +615,11 @@ func (c *deadlineWriter) Write(p []byte) (int, error) {
|
|
|
type writeCoalescer struct {
|
|
|
w io.Writer
|
|
|
|
|
|
+ // fcond waits for a new write to start the flush loop
|
|
|
+ fcond *sync.Cond
|
|
|
+ running bool
|
|
|
+
|
|
|
+ // cond waits for the buffer to be flushed
|
|
|
cond *sync.Cond
|
|
|
buffers net.Buffers
|
|
|
|
|
|
@@ -640,6 +646,14 @@ func (w *writeCoalescer) flush() {
|
|
|
}
|
|
|
|
|
|
func (w *writeCoalescer) Write(p []byte) (int, error) {
|
|
|
+ // TODO: use atomics for this?
|
|
|
+ w.fcond.L.Lock()
|
|
|
+ if !w.running {
|
|
|
+ w.running = true
|
|
|
+ w.fcond.Broadcast()
|
|
|
+ }
|
|
|
+ w.fcond.L.Unlock()
|
|
|
+
|
|
|
w.cond.L.Lock()
|
|
|
w.buffers = append(w.buffers, p)
|
|
|
for len(w.buffers) != 0 {
|
|
|
@@ -656,18 +670,34 @@ func (w *writeCoalescer) Write(p []byte) (int, error) {
|
|
|
}
|
|
|
|
|
|
func (w *writeCoalescer) writeFlusher(interval time.Duration, quit chan struct{}) {
|
|
|
- ticker := time.NewTicker(interval)
|
|
|
- defer ticker.Stop()
|
|
|
+ timer := time.NewTimer(interval)
|
|
|
+ defer timer.Stop()
|
|
|
defer w.flush()
|
|
|
|
|
|
+ if !timer.Stop() {
|
|
|
+ <-timer.C
|
|
|
+ }
|
|
|
+
|
|
|
for {
|
|
|
+ // wait for a write to start the flush loop
|
|
|
+ w.fcond.L.Lock()
|
|
|
+ for !w.running {
|
|
|
+ w.fcond.Wait()
|
|
|
+ }
|
|
|
+ w.fcond.L.Unlock()
|
|
|
+ timer.Reset(interval)
|
|
|
+
|
|
|
select {
|
|
|
case <-quit:
|
|
|
return
|
|
|
- case <-ticker.C:
|
|
|
+ case <-timer.C:
|
|
|
}
|
|
|
|
|
|
+ w.fcond.L.Lock()
|
|
|
w.flush()
|
|
|
+
|
|
|
+ w.running = false
|
|
|
+ w.fcond.L.Unlock()
|
|
|
}
|
|
|
}
|
|
|
|