Преглед изворни кода

repair write coalescing (#1245)

* repair write coalescing

Current write coalescing depends on the idea that the passed in
connection can do the writev syscall.

In Go only net.Conns implemented by the net package have this property,
since that the mechanism requires an implementation of a private method,
checked against an interface private to the net package.

So in practise only real TCP connections implement that.

TLS connections cannot do that either, but that is not handled here.

To get a real tcp connection, I broke up the abstraction of the
deadlineWriter and put the effect of it into the flush call.

To make it clearer that we really need a network connection, I changed
the interface passed in to the clearer net.Conn.

This is just the minimal fix to get writev back. A little cleanup work
is needed to.

Without this fix, we get only delayed writes which later happen
in quick succession instead of coalesced writes,
which adds only latency without any beneficial effect.

Coalescing tests needed adaption, since we really need a net.Conn now.

* tls and write coalescing doesn't work together

since write coalescing as implemented in Go can only work on connections
directly implemented in the net package of the Go stdlib. No wrappers
providing framing like TLS are allowed. Using anything else but a pure
TCP connection will lead to delayed writing of small packets instead of
write coalescing via writev.

This simply disables coalescing transparently in that case to avoid
unwanted delays.

I would have loved to return an error instead, but write coalescing is
on by default and I didn't want to suddenly return errors with the defaults.
Ingo Oeser пре 6 година
родитељ
комит
93aced8f23
3 измењених фајлова са 109 додато и 17 уклоњено
  1. 11 5
      conn.go
  2. 88 3
      conn_test.go
  3. 10 9
      connectionpool.go

+ 11 - 5
conn.go

@@ -252,7 +252,7 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa
 
 	// dont coalesce startup frames
 	if s.cfg.WriteCoalesceWaitTime > 0 && !cfg.disableCoalesce {
-		c.w = newWriteCoalescer(c.w, s.cfg.WriteCoalesceWaitTime, c.quit)
+		c.w = newWriteCoalescer(conn, c.timeout, s.cfg.WriteCoalesceWaitTime, c.quit)
 	}
 
 	go c.serve()
@@ -652,19 +652,20 @@ func (c *deadlineWriter) Write(p []byte) (int, error) {
 	return c.w.Write(p)
 }
 
-func newWriteCoalescer(w io.Writer, d time.Duration, quit <-chan struct{}) *writeCoalescer {
+func newWriteCoalescer(conn net.Conn, timeout time.Duration, d time.Duration, quit <-chan struct{}) *writeCoalescer {
 	wc := &writeCoalescer{
 		writeCh: make(chan struct{}), // TODO: could this be sync?
 		cond:    sync.NewCond(&sync.Mutex{}),
-		w:       w,
+		c:       conn,
 		quit:    quit,
+		timeout: timeout,
 	}
 	go wc.writeFlusher(d)
 	return wc
 }
 
 type writeCoalescer struct {
-	w io.Writer
+	c net.Conn
 
 	quit    <-chan struct{}
 	writeCh chan struct{}
@@ -673,6 +674,7 @@ type writeCoalescer struct {
 	// cond waits for the buffer to be flushed
 	cond    *sync.Cond
 	buffers net.Buffers
+	timeout time.Duration
 
 	// result of the write
 	err error
@@ -684,10 +686,14 @@ func (w *writeCoalescer) flushLocked() {
 		return
 	}
 
+	if w.timeout > 0 {
+		w.c.SetWriteDeadline(time.Now().Add(w.timeout))
+	}
+
 	// Given we are going to do a fanout n is useless and according to
 	// the docs WriteTo should return 0 and err or bytes written and
 	// no error.
-	_, w.err = w.buffers.WriteTo(w.w)
+	_, w.err = w.buffers.WriteTo(w.c)
 	if w.err != nil {
 		w.buffers = nil
 	}

+ 88 - 3
conn_test.go

@@ -804,13 +804,78 @@ func TestContext_Timeout(t *testing.T) {
 	}
 }
 
+// tcpConnPair returns a matching set of a TCP client side and server side connection.
+func tcpConnPair() (s, c net.Conn, err error) {
+	l, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		// maybe ipv6 works, if ipv4 fails?
+		l, err = net.Listen("tcp6", "[::1]:0")
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+	defer l.Close() // we only try to accept one connection, so will stop listening.
+
+	addr := l.Addr()
+	done := make(chan struct{})
+	var errDial error
+	go func(done chan<- struct{}) {
+		c, errDial = net.Dial(addr.Network(), addr.String())
+		close(done)
+	}(done)
+
+	s, err = l.Accept()
+	<-done
+
+	if err == nil {
+		err = errDial
+	}
+
+	if err != nil {
+		if s != nil {
+			s.Close()
+		}
+		if c != nil {
+			c.Close()
+		}
+	}
+
+	return s, c, err
+}
+
 func TestWriteCoalescing(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	server, client, err := tcpConnPair()
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	var buf bytes.Buffer
+	done := make(chan struct{}, 1)
+	var (
+		buf      bytes.Buffer
+		bufMutex sync.Mutex
+	)
+	go func() {
+		defer close(done)
+		defer server.Close()
+		var err error
+		b := make([]byte, 256)
+		var n int
+		for {
+			if n, err = server.Read(b); err != nil {
+				break
+			}
+			bufMutex.Lock()
+			buf.Write(b[:n])
+			bufMutex.Unlock()
+		}
+		if err != io.EOF {
+			t.Errorf("unexpected read error: %v", err)
+		}
+	}()
 	w := &writeCoalescer{
-		w:       &buf,
+		c:       client,
 		writeCh: make(chan struct{}),
 		cond:    sync.NewCond(&sync.Mutex{}),
 		quit:    ctx.Done(),
@@ -829,9 +894,11 @@ func TestWriteCoalescing(t *testing.T) {
 		}
 	}()
 
+	bufMutex.Lock()
 	if buf.Len() != 0 {
 		t.Fatalf("expected buffer to be empty have: %v", buf.String())
 	}
+	bufMutex.Unlock()
 
 	for true {
 		w.cond.L.Lock()
@@ -843,6 +910,9 @@ func TestWriteCoalescing(t *testing.T) {
 	}
 
 	w.flush()
+	client.Close()
+	<-done
+
 	if got := buf.String(); got != "onetwo" && got != "twoone" {
 		t.Fatalf("expected to get %q got %q", "onetwo or twoone", got)
 	}
@@ -853,19 +923,34 @@ func TestWriteCoalescing_WriteAfterClose(t *testing.T) {
 	defer cancel()
 
 	var buf bytes.Buffer
-	w := newWriteCoalescer(&buf, 5*time.Millisecond, ctx.Done())
+	defer cancel()
+	server, client, err := tcpConnPair()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	done := make(chan struct{}, 1)
+	go func() {
+		io.Copy(&buf, server)
+		server.Close()
+		close(done)
+	}()
+	w := newWriteCoalescer(client, 0, 5*time.Millisecond, ctx.Done())
 
 	// ensure 1 write works
 	if _, err := w.Write([]byte("one")); err != nil {
 		t.Fatal(err)
 	}
 
+	client.Close()
+	<-done
 	if v := buf.String(); v != "one" {
 		t.Fatalf("expected buffer to be %q got %q", "one", v)
 	}
 
 	// now close and do a write, we should error
 	cancel()
+	client.Close() // close client conn too, since server won't see the answer anyway.
 
 	if _, err := w.Write([]byte("two")); err == nil {
 		t.Fatal("expected to get error for write after closing")

+ 10 - 9
connectionpool.go

@@ -90,15 +90,16 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
 	}
 
 	return &ConnConfig{
-		ProtoVersion:   cfg.ProtoVersion,
-		CQLVersion:     cfg.CQLVersion,
-		Timeout:        cfg.Timeout,
-		ConnectTimeout: cfg.ConnectTimeout,
-		Compressor:     cfg.Compressor,
-		Authenticator:  cfg.Authenticator,
-		AuthProvider:   cfg.AuthProvider,
-		Keepalive:      cfg.SocketKeepalive,
-		tlsConfig:      tlsConfig,
+		ProtoVersion:    cfg.ProtoVersion,
+		CQLVersion:      cfg.CQLVersion,
+		Timeout:         cfg.Timeout,
+		ConnectTimeout:  cfg.ConnectTimeout,
+		Compressor:      cfg.Compressor,
+		Authenticator:   cfg.Authenticator,
+		AuthProvider:    cfg.AuthProvider,
+		Keepalive:       cfg.SocketKeepalive,
+		tlsConfig:       tlsConfig,
+		disableCoalesce: tlsConfig != nil, // write coalescing doesn't work with framing on top of TCP like in TLS.
 	}, nil
 }