@@ -191,40 +191,6 @@ func TestQueryRetry(t *testing.T) {
 	}
 }
 
-func TestConnClosing(t *testing.T) {
-	t.Skip("Skipping until test can be ran reliably")
-
-	srv := NewTestServer(t, protoVersion2)
-	defer srv.Stop()
-
-	db, err := NewCluster(srv.Address).CreateSession()
-	if err != nil {
-		t.Fatalf("NewCluster: %v", err)
-	}
-	defer db.Close()
-
-	numConns := db.cfg.NumConns
-	count := db.cfg.NumStreams * numConns
-
-	wg := &sync.WaitGroup{}
-	wg.Add(count)
-	for i := 0; i < count; i++ {
-		go func(wg *sync.WaitGroup) {
-			wg.Done()
-			db.Query("kill").Exec()
-		}(wg)
-	}
-
-	wg.Wait()
-
-	time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
-	conns := db.pool.Size()
-
-	if conns != numConns {
-		t.Errorf("Expected to have %d connections but have %d", numConns, conns)
-	}
-}
-
 func TestStreams_Protocol1(t *testing.T) {
 	srv := NewTestServer(t, protoVersion1)
 	defer srv.Stop()
@@ -242,7 +208,7 @@ func TestStreams_Protocol1(t *testing.T) {
 	defer db.Close()
 
 	var wg sync.WaitGroup
-	for i := 1; i < db.cfg.NumStreams; i++ {
+	for i := 1; i < 128; i++ {
 		// here were just validating that if we send NumStream request we get
 		// a response for every stream and the lengths for the queries are set
 		// correctly.
@@ -257,33 +223,6 @@ func TestStreams_Protocol1(t *testing.T) {
 	wg.Wait()
 }
 
-func TestStreams_Protocol2(t *testing.T) {
-	srv := NewTestServer(t, protoVersion2)
-	defer srv.Stop()
-
-	// TODO: these are more like session tests and should instead operate
-	// on a single Conn
-	cluster := NewCluster(srv.Address)
-	cluster.NumConns = 1
-	cluster.ProtoVersion = 2
-
-	db, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer db.Close()
-
-	for i := 1; i < db.cfg.NumStreams; i++ {
-		// the test server processes each conn synchronously
-		// here were just validating that if we send NumStream request we get
-		// a response for every stream and the lengths for the queries are set
-		// correctly.
-		if err = db.Query("void").Exec(); err != nil {
-			t.Fatal(err)
-		}
-	}
-}
-
 func TestStreams_Protocol3(t *testing.T) {
 	srv := NewTestServer(t, protoVersion3)
 	defer srv.Stop()
@@ -300,7 +239,7 @@ func TestStreams_Protocol3(t *testing.T) {
 	}
 	defer db.Close()
 
-	for i := 1; i < db.cfg.NumStreams; i++ {
+	for i := 1; i < 32768; i++ {
 		// the test server processes each conn synchronously
 		// here were just validating that if we send NumStream request we get
 		// a response for every stream and the lengths for the queries are set
@@ -472,6 +411,10 @@ func TestQueryTimeout(t *testing.T) {
 }
 
 func TestQueryTimeoutReuseStream(t *testing.T) {
+	t.Skip("no longer tests anything")
+	// TODO(zariel): move this to conn test, we really just want to check what
+	// happens when a conn is
+
 	srv := NewTestServer(t, defaultProto)
 	defer srv.Stop()
 
@@ -480,7 +423,6 @@ func TestQueryTimeoutReuseStream(t *testing.T) {
 	// timely manner.
 	cluster.Timeout = 1 * time.Millisecond
 	cluster.NumConns = 1
-	cluster.NumStreams = 1
 
 	db, err := cluster.CreateSession()
 	if err != nil {
@@ -505,7 +447,6 @@ func TestQueryTimeoutClose(t *testing.T) {
 	// timely manner.
 	cluster.Timeout = 1000 * time.Millisecond
 	cluster.NumConns = 1
-	cluster.NumStreams = 1
 
 	db, err := cluster.CreateSession()
 	if err != nil {
@@ -532,54 +473,6 @@ func TestQueryTimeoutClose(t *testing.T) {
 	}
 }
 
-func TestExecPanic(t *testing.T) {
-	t.Skip("test can cause unrelated failures, skipping until it can be fixed.")
-	srv := NewTestServer(t, defaultProto)
-	defer srv.Stop()
-
-	cluster := NewCluster(srv.Address)
-	// Set the timeout arbitrarily low so that the query hits the timeout in a
-	// timely manner.
-	cluster.Timeout = 5 * time.Millisecond
-	cluster.NumConns = 1
-	// cluster.NumStreams = 1
-
-	db, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer db.Close()
-
-	streams := db.cfg.NumStreams
-
-	wg := &sync.WaitGroup{}
-	wg.Add(streams)
-	for i := 0; i < streams; i++ {
-		go func() {
-			defer wg.Done()
-			q := db.Query("void")
-			for {
-				if err := q.Exec(); err != nil {
-					return
-				}
-			}
-		}()
-	}
-
-	wg.Add(1)
-
-	go func() {
-		defer wg.Done()
-		for i := 0; i < int(TimeoutLimit); i++ {
-			db.Query("timeout").Exec()
-		}
-	}()
-
-	wg.Wait()
-
-	time.Sleep(500 * time.Millisecond)
-}
-
 func NewTestServer(t testing.TB, protocol uint8) *TestServer {
 	laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
 	if err != nil {