ソースを参照

conn: implement write coalescing (#1175)

* conn: implement write coalescing

Add an option to switch on write coalescing outgoing requests using
net.Buffers. This can be switched off via cluster config and the
interval to buffer for can be adjusted.

* review comments
Chris Bannister 7 年 前
コミット
fccc308274
3 ファイル変更139 行追加9 行削除
  1. 8 0
      cluster.go
  2. 93 8
      conn.go
  3. 38 1
      conn_test.go

+ 8 - 0
cluster.go

@@ -136,6 +136,13 @@ type ClusterConfig struct {
 	// Default idempotence for queries
 	DefaultIdempotence bool
 
+	// The time to wait for frames before flushing the frames connection to Cassandra.
+	// Can help reduce syscall overhead by making less calls to write. Set to 0 to
+	// disable.
+	//
+	// (default: 200 microseconds)
+	WriteCoalesceWaitTime time.Duration
+
 	// internal config for testing
 	disableControlConn bool
 }
@@ -166,6 +173,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
 		ReconnectInterval:      60 * time.Second,
 		ConvictionPolicy:       &SimpleConvictionPolicy{},
 		ReconnectionPolicy:     &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
+		WriteCoalesceWaitTime:  200 * time.Microsecond,
 	}
 	return cfg
 }

+ 93 - 8
conn.go

@@ -124,8 +124,11 @@ var TimeoutLimit int64 = 0
 // queries, but users are usually advised to use a more reliable, higher
 // level API.
 type Conn struct {
-	conn          net.Conn
-	r             *bufio.Reader
+	conn net.Conn
+	r    *bufio.Reader
+
+	w *writeCoalescer
+
 	timeout       time.Duration
 	cfg           *ConnConfig
 	frameObserver FrameHeaderObserver
@@ -215,9 +218,9 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa
 		cancel func()
 	)
 	if cfg.ConnectTimeout > 0 {
-		ctx, cancel = context.WithTimeout(context.Background(), cfg.ConnectTimeout)
+		ctx, cancel = context.WithTimeout(context.TODO(), cfg.ConnectTimeout)
 	} else {
-		ctx, cancel = context.WithCancel(context.Background())
+		ctx, cancel = context.WithCancel(context.TODO())
 	}
 	defer cancel()
 
@@ -257,17 +260,47 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa
 		return nil, errors.New("gocql: no response to connection startup within timeout")
 	}
 
+	// dont coalesce startup frames
+	if s.cfg.WriteCoalesceWaitTime > 0 {
+		w := &writeCoalescer{
+			w:       conn,
+			timeout: cfg.Timeout,
+		}
+		w.cond = sync.NewCond(&w.mu)
+		c.w = w
+		go c.writeFlusher()
+	}
 	go c.serve()
 
 	return c, nil
 }
 
-func (c *Conn) Write(p []byte) (int, error) {
-	if c.timeout > 0 {
-		c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
+func (c *Conn) writeFlusher() {
+	ticker := time.NewTicker(c.session.cfg.WriteCoalesceWaitTime)
+	defer ticker.Stop()
+	defer c.w.flush()
+
+	for {
+		select {
+		case <-c.quit:
+			return
+		case <-ticker.C:
+		}
+
+		c.w.flush()
 	}
+}
 
-	return c.conn.Write(p)
+func (c *Conn) Write(p []byte) (n int, err error) {
+	if c.w != nil {
+		n, err = c.w.write(p)
+	} else {
+		if c.timeout > 0 {
+			c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
+		}
+		n, err = c.conn.Write(p)
+	}
+	return n, err
 }
 
 func (c *Conn) Read(p []byte) (n int, err error) {
@@ -584,6 +617,58 @@ type callReq struct {
 	timer *time.Timer
 }
 
+type writeCoalescer struct {
+	w       io.Writer
+	timeout time.Duration
+
+	cond    *sync.Cond
+	mu      sync.Mutex
+	buffers net.Buffers
+
+	// result of the write
+	err error
+}
+
+func (w *writeCoalescer) flush() {
+	if w.timeout > 0 {
+		type deadliner interface {
+			SetWriteDeadline(time.Time) error
+		}
+		w.w.(deadliner).SetWriteDeadline(time.Now().Add(w.timeout))
+	}
+
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if len(w.buffers) == 0 {
+		return
+	}
+
+	// Given we are going to do a fanout n is useless and according to
+	// the docs WriteTo should return 0 and err or bytes written and
+	// no error.
+	_, w.err = w.buffers.WriteTo(w.w)
+	if w.err != nil {
+		w.buffers = nil
+	}
+	w.cond.Broadcast()
+}
+
+func (w *writeCoalescer) write(p []byte) (int, error) {
+	w.mu.Lock()
+	w.buffers = append(w.buffers, p)
+	for len(w.buffers) != 0 {
+		w.cond.Wait()
+	}
+
+	err := w.err
+	w.mu.Unlock()
+
+	if err != nil {
+		return 0, err
+	}
+	return len(p), nil
+}
+
 func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*framer, error) {
 	// TODO: move tracer onto conn
 	stream, ok := c.streams.GetStream()

+ 38 - 1
conn_test.go

@@ -311,7 +311,7 @@ func TestCancel(t *testing.T) {
 	}()
 
 	// The query will timeout after about 1 seconds, so cancel it after a short pause
-	time.AfterFunc(20 * time.Millisecond, qry.Cancel)
+	time.AfterFunc(20*time.Millisecond, qry.Cancel)
 	wg.Wait()
 }
 
@@ -751,6 +751,43 @@ func TestContext_Timeout(t *testing.T) {
 	}
 }
 
+func TestWriteCoalescing(t *testing.T) {
+	var buf bytes.Buffer
+	w := &writeCoalescer{
+		w: &buf,
+	}
+	w.cond = sync.NewCond(&w.mu)
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		wg.Done()
+		if _, err := w.write([]byte("one")); err != nil {
+			t.Error(err)
+		}
+	}()
+	wg.Wait()
+
+	wg.Add(1)
+	go func() {
+		wg.Done()
+		if _, err := w.write([]byte("two")); err != nil {
+			t.Error(err)
+		}
+	}()
+	wg.Wait()
+
+	if buf.Len() != 0 {
+		t.Fatalf("expected buffer to be empty have: %v", buf.String())
+	}
+
+	w.flush()
+	if got := buf.String(); got != "onetwo" {
+		t.Fatalf("expected to get %q got %q", "onetwo", got)
+	}
+}
+
 type recordingFrameHeaderObserver struct {
 	t      *testing.T
 	mu     sync.Mutex