|
|
@@ -472,8 +472,7 @@ func TestPolicyConnPoolSSL(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestQueryTimeout(t *testing.T) {
|
|
|
- t.Skip("skipping until query timeouts are enabled")
|
|
|
- srv := NewTestServer(t, protoVersion2)
|
|
|
+ srv := NewTestServer(t, defaultProto)
|
|
|
defer srv.Stop()
|
|
|
|
|
|
cluster := NewCluster(srv.Address)
|
|
|
@@ -509,6 +508,31 @@ func TestQueryTimeout(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestQueryTimeoutReuseStream(t *testing.T) {
|
|
|
+ srv := NewTestServer(t, defaultProto)
|
|
|
+ defer srv.Stop()
|
|
|
+
|
|
|
+ cluster := NewCluster(srv.Address)
|
|
|
+ // Set the timeout arbitrarily low so that the query hits the timeout in a
|
|
|
+ // timely manner.
|
|
|
+ cluster.Timeout = 1 * time.Millisecond
|
|
|
+ cluster.NumConns = 1
|
|
|
+ cluster.NumStreams = 1
|
|
|
+
|
|
|
+ db, err := cluster.CreateSession()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("NewCluster: %v", err)
|
|
|
+ }
|
|
|
+ defer db.Close()
|
|
|
+
|
|
|
+ db.Query("slow").Exec()
|
|
|
+
|
|
|
+ err = db.Query("void").Exec()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func NewTestServer(t testing.TB, protocol uint8) *TestServer {
|
|
|
laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
|
|
|
if err != nil {
|
|
|
@@ -656,6 +680,18 @@ func (srv *TestServer) process(f *framer) {
|
|
|
case "timeout":
|
|
|
<-srv.quit
|
|
|
return
|
|
|
+ case "slow":
|
|
|
+ go func() {
|
|
|
+ f.writeHeader(0, opResult, head.stream)
|
|
|
+ f.writeInt(resultKindVoid)
|
|
|
+ f.wbuf[0] = srv.protocol | 0x80
|
|
|
+ select {
|
|
|
+ case <-srv.quit:
|
|
|
+ case <-time.After(50 * time.Millisecond):
|
|
|
+ f.finishWrite()
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ return
|
|
|
default:
|
|
|
f.writeHeader(0, opResult, head.stream)
|
|
|
f.writeInt(resultKindVoid)
|