|
|
@@ -9,6 +9,7 @@ import (
|
|
|
"flag"
|
|
|
"reflect"
|
|
|
"speter.net/go/exp/math/dec/inf"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
@@ -248,6 +249,79 @@ func TestBatchLimit(t *testing.T) {
|
|
|
|
|
|
}
|
|
|
|
|
|
+// TestTooManyQueryArgs tests to make sure the library correctly handles the application level bug
|
|
|
+// whereby too many query arguments are passed to a query
|
|
|
+func TestTooManyQueryArgs(t *testing.T) {
|
|
|
+ if *flagProto == 1 {
|
|
|
+ t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
|
|
|
+ }
|
|
|
+ session := createSession(t)
|
|
|
+ defer session.Close()
|
|
|
+
|
|
|
+ if err := session.Query(`CREATE TABLE too_many_query_args (id int primary key, value int)`).Exec(); err != nil {
|
|
|
+ t.Fatal("create table:", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ _, err := session.Query(`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2).Iter().SliceMap()
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ t.Fatal("'`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2' should return an ErrQueryArgLength")
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != ErrQueryArgLength {
|
|
|
+ t.Fatalf("'`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2' should return an ErrQueryArgLength, but returned: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ batch := session.NewBatch(UnloggedBatch)
|
|
|
+ batch.Query("INSERT INTO too_many_query_args (id, value) VALUES (?, ?)", 1, 2, 3)
|
|
|
+ err = session.ExecuteBatch(batch)
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ t.Fatal("'`INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an ErrQueryArgLength")
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != ErrQueryArgLength {
|
|
|
+ t.Fatalf("'INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an ErrQueryArgLength, but returned: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// TestNotEnoughQueryArgs tests to make sure the library correctly handles the application level bug
|
|
|
+// whereby not enough query arguments are passed to a query
|
|
|
+func TestNotEnoughQueryArgs(t *testing.T) {
|
|
|
+ if *flagProto == 1 {
|
|
|
+ t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
|
|
|
+ }
|
|
|
+ session := createSession(t)
|
|
|
+ defer session.Close()
|
|
|
+
|
|
|
+ if err := session.Query(`CREATE TABLE not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`).Exec(); err != nil {
|
|
|
+ t.Fatal("create table:", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ _, err := session.Query(`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1).Iter().SliceMap()
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ t.Fatal("'`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1' should return an ErrQueryArgLength")
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != ErrQueryArgLength {
|
|
|
+ t.Fatalf("'`SELECT * FROM too_few_query_args WHERE id = ? and cluster = ?`, 1' should return an ErrQueryArgLength, but returned: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ batch := session.NewBatch(UnloggedBatch)
|
|
|
+ batch.Query("INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)", 1, 2)
|
|
|
+ err = session.ExecuteBatch(batch)
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ t.Fatal("'`INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an ErrQueryArgLength")
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != ErrQueryArgLength {
|
|
|
+ t.Fatalf("'INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an ErrQueryArgLength, but returned: %s", err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// TestCreateSessionTimeout tests to make sure the CreateSession function timeouts out correctly
|
|
|
// and prevents an infinite loop of connection retries.
|
|
|
func TestCreateSessionTimeout(t *testing.T) {
|
|
|
@@ -487,9 +561,10 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
|
|
|
}
|
|
|
stmt := "INSERT INTO " + table + " (foo, bar) VALUES (?, 7)"
|
|
|
conn := session.Pool.Pick(nil)
|
|
|
- conn.prepMu.Lock()
|
|
|
flight := new(inflightPrepare)
|
|
|
- conn.prep[stmt] = flight
|
|
|
+ stmtsLRU.mu.Lock()
|
|
|
+ stmtsLRU.lru.Add(conn.addr+stmt, flight)
|
|
|
+ stmtsLRU.mu.Unlock()
|
|
|
flight.info = &queryInfo{
|
|
|
id: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
|
|
|
args: []ColumnInfo{ColumnInfo{
|
|
|
@@ -499,16 +574,8 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
|
|
|
TypeInfo: &TypeInfo{
|
|
|
Type: TypeVarchar,
|
|
|
},
|
|
|
- }, ColumnInfo{
|
|
|
- Keyspace: "gocql_test",
|
|
|
- Table: table,
|
|
|
- Name: "bar",
|
|
|
- TypeInfo: &TypeInfo{
|
|
|
- Type: TypeInt,
|
|
|
- },
|
|
|
}},
|
|
|
}
|
|
|
- conn.prepMu.Unlock()
|
|
|
return stmt, conn
|
|
|
}
|
|
|
|
|
|
@@ -523,6 +590,9 @@ func TestReprepareStatement(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestReprepareBatch(t *testing.T) {
|
|
|
+ if *flagProto == 1 {
|
|
|
+ t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
|
|
|
+ }
|
|
|
session := createSession(t)
|
|
|
defer session.Close()
|
|
|
stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement_batch")
|
|
|
@@ -533,3 +603,88 @@ func TestReprepareBatch(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+//TestPreparedCacheEviction will make sure that the cache size is maintained
|
|
|
+func TestPreparedCacheEviction(t *testing.T) {
|
|
|
+ session := createSession(t)
|
|
|
+ defer session.Close()
|
|
|
+ stmtsLRU.mu.Lock()
|
|
|
+ stmtsLRU.Max(4)
|
|
|
+ stmtsLRU.mu.Unlock()
|
|
|
+
|
|
|
+ if err := session.Query("CREATE TABLE prepcachetest (id int,mod int,PRIMARY KEY (id))").Exec(); err != nil {
|
|
|
+ t.Fatalf("failed to create table with error '%v'", err)
|
|
|
+ }
|
|
|
+ //Fill the table
|
|
|
+ for i := 0; i < 2; i++ {
|
|
|
+ if err := session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", i, 10000%(i+1)).Exec(); err != nil {
|
|
|
+ t.Fatalf("insert into prepcachetest failed, err '%v'", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //Populate the prepared statement cache with select statements
|
|
|
+ var id, mod int
|
|
|
+ for i := 0; i < 2; i++ {
|
|
|
+ err := session.Query("SELECT id,mod FROM prepcachetest WHERE id = "+strconv.FormatInt(int64(i), 10)).Scan(&id, &mod)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("select from prepcachetest failed, error '%v'", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //generate an update statement to test they are prepared
|
|
|
+ err := session.Query("UPDATE prepcachetest SET mod = ? WHERE id = ?", 1, 11).Exec()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("update prepcachetest failed, error '%v'", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ //generate a delete statement to test they are prepared
|
|
|
+ err = session.Query("DELETE FROM prepcachetest WHERE id = ?", 1).Exec()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("delete from prepcachetest failed, error '%v'", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ //generate an insert statement to test they are prepared
|
|
|
+ err = session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", 3, 11).Exec()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("insert into prepcachetest failed, error '%v'", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ //Make sure the cache size is maintained
|
|
|
+ if stmtsLRU.lru.Len() != stmtsLRU.lru.MaxEntries {
|
|
|
+ t.Fatalf("expected cache size of %v, got %v", stmtsLRU.lru.MaxEntries, stmtsLRU.lru.Len())
|
|
|
+ }
|
|
|
+
|
|
|
+ //Walk through all the configured hosts and test cache retention and eviction
|
|
|
+ var selFound, insFound, updFound, delFound, selEvict bool
|
|
|
+ for i := range session.cfg.Hosts {
|
|
|
+ _, ok := stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042SELECT id,mod FROM prepcachetest WHERE id = 1")
|
|
|
+ selFound = selFound || ok
|
|
|
+
|
|
|
+ _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042INSERT INTO prepcachetest (id,mod) VALUES (?, ?)")
|
|
|
+ insFound = insFound || ok
|
|
|
+
|
|
|
+ _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042UPDATE prepcachetest SET mod = ? WHERE id = ?")
|
|
|
+ updFound = updFound || ok
|
|
|
+
|
|
|
+ _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042DELETE FROM prepcachetest WHERE id = ?")
|
|
|
+ delFound = delFound || ok
|
|
|
+
|
|
|
+ _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042SELECT id,mod FROM prepcachetest WHERE id = 0")
|
|
|
+ selEvict = selEvict || !ok
|
|
|
+
|
|
|
+ }
|
|
|
+ if !selEvict {
|
|
|
+ t.Fatalf("expected first select statement to be purged, but statement was found in the cache.")
|
|
|
+ }
|
|
|
+ if !selFound {
|
|
|
+ t.Fatalf("expected second select statement to be cached, but statement was purged or not prepared.")
|
|
|
+ }
|
|
|
+ if !insFound {
|
|
|
+ t.Fatalf("expected insert statement to be cached, but statement was purged or not prepared.")
|
|
|
+ }
|
|
|
+ if !updFound {
|
|
|
+ t.Fatalf("expected update statement to be cached, but statement was purged or not prepared.")
|
|
|
+ }
|
|
|
+ if !delFound {
|
|
|
+ t.Error("expected delete statement to be cached, but statement was purged or not prepared.")
|
|
|
+ }
|
|
|
+}
|