Explorar o código

conn: recycle call query timeout timers

In conn reuse timeout timers for each call in exec.
Chris Bannister %!s(int64=9) %!d(string=hai) anos
pai
achega
1fc753f0b8
Modificáronse 2 ficheiros con 76 adicións e 9 borrados
  1. 24 9
      conn.go
  2. 52 0
      conn_test.go

+ 24 - 9
conn.go

@@ -464,14 +464,6 @@ func (c *Conn) recv() error {
 	return nil
 }
 
-type callReq struct {
-	// could use a waitgroup but this allows us to do timeouts on the read/send
-	resp     chan error
-	framer   *framer
-	timeout  chan struct{} // indicates to recv() that a call has timedout
-	streamID int           // current stream in use
-}
-
 func (c *Conn) releaseStream(stream int) {
 	c.mu.Lock()
 	call := c.calls[stream]
@@ -503,6 +495,16 @@ var (
 	}
 )
 
+type callReq struct {
+	// could use a waitgroup but this allows us to do timeouts on the read/send
+	resp     chan error
+	framer   *framer
+	timeout  chan struct{} // indicates to recv() that a call has timedout
+	streamID int           // current stream in use
+
+	timer *time.Timer
+}
+
 func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
 	// TODO: move tracer onto conn
 	stream, ok := c.streams.GetStream()
@@ -546,7 +548,20 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
 
 	var timeoutCh <-chan time.Time
 	if c.timeout > 0 {
-		timeoutCh = time.After(c.timeout)
+		if call.timer == nil {
+			call.timer = time.NewTimer(0)
+			<-call.timer.C
+		} else {
+			if !call.timer.Stop() {
+				select {
+				case <-call.timer.C:
+				default:
+				}
+			}
+		}
+
+		call.timer.Reset(c.timeout)
+		timeoutCh = call.timer.C
 	}
 
 	select {

+ 52 - 0
conn_test.go

@@ -341,6 +341,58 @@ func TestQueryTimeout(t *testing.T) {
 	}
 }
 
+func TestQueryTimeoutMany(t *testing.T) {
+	srv := NewTestServer(t, 3)
+	defer srv.Stop()
+
+	cluster := testCluster(srv.Address, 3)
+	// Set the timeout arbitrarily low so that the query hits the timeout in a
+	// timely manner.
+	cluster.Timeout = 5 * time.Millisecond
+	cluster.NumConns = 1
+
+	db, err := cluster.CreateSession()
+	if err != nil {
+		t.Fatalf("NewCluster: %v", err)
+	}
+	defer db.Close()
+
+	for i := 0; i < 128; i++ {
+		err := db.Query("void").Exec()
+		if err != nil {
+			t.Error(err)
+			return
+		}
+	}
+}
+
+func BenchmarkSingleConn(b *testing.B) {
+	srv := NewTestServer(b, 3)
+	defer srv.Stop()
+
+	cluster := testCluster(srv.Address, 3)
+	// Set the timeout arbitrarily low so that the query hits the timeout in a
+	// timely manner.
+	cluster.Timeout = 500 * time.Millisecond
+	cluster.NumConns = 1
+	db, err := cluster.CreateSession()
+	if err != nil {
+		b.Fatalf("NewCluster: %v", err)
+	}
+	defer db.Close()
+
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			err := db.Query("void").Exec()
+			if err != nil {
+				b.Error(err)
+				return
+			}
+		}
+	})
+}
+
 func TestQueryTimeoutReuseStream(t *testing.T) {
 	t.Skip("no longer tests anything")
 	// TODO(zariel): move this to conn test, we really just want to check what