|
|
@@ -24,23 +24,35 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
|
|
|
- flagProto = flag.Int("proto", 2, "protcol version")
|
|
|
- flagCQL = flag.String("cql", "3.0.0", "CQL version")
|
|
|
- flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
|
|
|
- clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
|
|
|
- flagRetry = flag.Int("retries", 5, "number of times to retry queries")
|
|
|
- flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
|
|
|
- clusterHosts []string
|
|
|
+ flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
|
|
|
+ flagProto = flag.Int("proto", 2, "protcol version")
|
|
|
+ flagCQL = flag.String("cql", "3.0.0", "CQL version")
|
|
|
+ flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
|
|
|
+ clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
|
|
|
+ flagRetry = flag.Int("retries", 5, "number of times to retry queries")
|
|
|
+ flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
|
|
|
+ flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
|
|
|
+ clusterHosts []string
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
|
-
|
|
|
flag.Parse()
|
|
|
clusterHosts = strings.Split(*flagCluster, ",")
|
|
|
log.SetFlags(log.Lshortfile | log.LstdFlags)
|
|
|
}
|
|
|
|
|
|
+func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
|
|
|
+ if *flagRunSslTest {
|
|
|
+ cluster.SslOpts = &SslOptions{
|
|
|
+ CertPath: "testdata/pki/gocql.crt",
|
|
|
+ KeyPath: "testdata/pki/gocql.key",
|
|
|
+ CaPath: "testdata/pki/ca.crt",
|
|
|
+ EnableHostVerification: false,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return cluster
|
|
|
+}
|
|
|
+
|
|
|
var initOnce sync.Once
|
|
|
|
|
|
func createTable(s *Session, table string) error {
|
|
|
@@ -61,7 +73,7 @@ func createCluster() *ClusterConfig {
|
|
|
if *flagRetry > 0 {
|
|
|
cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
|
|
|
}
|
|
|
-
|
|
|
+ cluster = addSslOptions(cluster)
|
|
|
return cluster
|
|
|
}
|
|
|
|
|
|
@@ -114,7 +126,7 @@ func TestRingDiscovery(t *testing.T) {
|
|
|
cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
|
|
|
}
|
|
|
cluster.DiscoverHosts = true
|
|
|
-
|
|
|
+ cluster = addSslOptions(cluster)
|
|
|
session, err := cluster.CreateSession()
|
|
|
if err != nil {
|
|
|
t.Errorf("got error connecting to the cluster %v", err)
|
|
|
@@ -136,6 +148,7 @@ func TestRingDiscovery(t *testing.T) {
|
|
|
|
|
|
func TestEmptyHosts(t *testing.T) {
|
|
|
cluster := NewCluster()
|
|
|
+ cluster = addSslOptions(cluster)
|
|
|
if session, err := cluster.CreateSession(); err == nil {
|
|
|
session.Close()
|
|
|
t.Error("expected err, got nil")
|
|
|
@@ -162,6 +175,7 @@ func TestInvalidKeyspace(t *testing.T) {
|
|
|
cluster.ProtoVersion = *flagProto
|
|
|
cluster.CQLVersion = *flagCQL
|
|
|
cluster.Keyspace = "invalidKeyspace"
|
|
|
+ cluster = addSslOptions(cluster)
|
|
|
session, err := cluster.CreateSession()
|
|
|
if err != nil {
|
|
|
if err != ErrNoConnectionsStarted {
|
|
|
@@ -298,6 +312,48 @@ func TestCAS(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestMapScanCAS(t *testing.T) {
|
|
|
+ if *flagProto == 1 {
|
|
|
+ t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
|
|
|
+ }
|
|
|
+
|
|
|
+ session := createSession(t)
|
|
|
+ defer session.Close()
|
|
|
+
|
|
|
+ if err := createTable(session, `CREATE TABLE cas_table2 (
|
|
|
+ title varchar,
|
|
|
+ revid timeuuid,
|
|
|
+ last_modified timestamp,
|
|
|
+ deleted boolean,
|
|
|
+ PRIMARY KEY (title, revid)
|
|
|
+ )`); err != nil {
|
|
|
+ t.Fatal("create:", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ title, revid, modified, deleted := "baz", TimeUUID(), time.Now(), false
|
|
|
+ mapCAS := map[string]interface{}{}
|
|
|
+
|
|
|
+ if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
|
|
|
+ VALUES (?, ?, ?, ?) IF NOT EXISTS`,
|
|
|
+ title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
|
|
|
+ t.Fatal("insert:", err)
|
|
|
+ } else if !applied {
|
|
|
+ t.Fatal("insert should have been applied")
|
|
|
+ }
|
|
|
+
|
|
|
+ mapCAS = map[string]interface{}{}
|
|
|
+ if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
|
|
|
+ VALUES (?, ?, ?, ?) IF NOT EXISTS`,
|
|
|
+ title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
|
|
|
+ t.Fatal("insert:", err)
|
|
|
+ } else if applied {
|
|
|
+ t.Fatal("insert should not have been applied")
|
|
|
+ } else if title != mapCAS["title"] || revid != mapCAS["revid"] || deleted != mapCAS["deleted"] {
|
|
|
+ t.Fatalf("expected %s/%v/%v/%v but got %s/%v/%v%v", title, revid, modified, false, mapCAS["title"], mapCAS["revid"], mapCAS["last_modified"], mapCAS["deleted"])
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
func TestBatch(t *testing.T) {
|
|
|
if *flagProto == 1 {
|
|
|
t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
|
|
|
@@ -430,6 +486,7 @@ func TestCreateSessionTimeout(t *testing.T) {
|
|
|
t.Fatal("no startup timeout")
|
|
|
}()
|
|
|
c := NewCluster("127.0.0.1:1")
|
|
|
+ c = addSslOptions(c)
|
|
|
_, err := c.CreateSession()
|
|
|
|
|
|
if err == nil {
|
|
|
@@ -440,6 +497,64 @@ func TestCreateSessionTimeout(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+type FullName struct {
|
|
|
+ FirstName string
|
|
|
+ LastName string
|
|
|
+}
|
|
|
+
|
|
|
+func (n FullName) MarshalCQL(info *TypeInfo) ([]byte, error) {
|
|
|
+ return []byte(n.FirstName + " " + n.LastName), nil
|
|
|
+}
|
|
|
+func (n *FullName) UnmarshalCQL(info *TypeInfo, data []byte) error {
|
|
|
+ t := strings.SplitN(string(data), " ", 2)
|
|
|
+ n.FirstName, n.LastName = t[0], t[1]
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func TestMapScanWithRefMap(t *testing.T) {
|
|
|
+ session := createSession(t)
|
|
|
+ defer session.Close()
|
|
|
+ if err := createTable(session, `CREATE TABLE scan_map_ref_table (
|
|
|
+ testtext text PRIMARY KEY,
|
|
|
+ testfullname text,
|
|
|
+ testint int,
|
|
|
+ )`); err != nil {
|
|
|
+ t.Fatal("create table:", err)
|
|
|
+ }
|
|
|
+ m := make(map[string]interface{})
|
|
|
+ m["testtext"] = "testtext"
|
|
|
+ m["testfullname"] = FullName{"John", "Doe"}
|
|
|
+ m["testint"] = 100
|
|
|
+
|
|
|
+ if err := session.Query(`INSERT INTO scan_map_ref_table (testtext, testfullname, testint) values (?,?,?)`, m["testtext"], m["testfullname"], m["testint"]).Exec(); err != nil {
|
|
|
+ t.Fatal("insert:", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ var testText string
|
|
|
+ var testFullName FullName
|
|
|
+ ret := map[string]interface{}{
|
|
|
+ "testtext": &testText,
|
|
|
+ "testfullname": &testFullName,
|
|
|
+ // testint is not set here.
|
|
|
+ }
|
|
|
+ iter := session.Query(`SELECT * FROM scan_map_ref_table`).Iter()
|
|
|
+ if ok := iter.MapScan(ret); !ok {
|
|
|
+ t.Fatal("select:", iter.Close())
|
|
|
+ } else {
|
|
|
+ if ret["testtext"] != "testtext" {
|
|
|
+ t.Fatal("returned testtext did not match")
|
|
|
+ }
|
|
|
+ f := ret["testfullname"].(FullName)
|
|
|
+ if f.FirstName != "John" || f.LastName != "Doe" {
|
|
|
+ t.Fatal("returned testfullname did not match")
|
|
|
+ }
|
|
|
+ if ret["testint"] != 100 {
|
|
|
+ t.Fatal("returned testinit did not match")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
func TestSliceMap(t *testing.T) {
|
|
|
session := createSession(t)
|
|
|
defer session.Close()
|
|
|
@@ -1328,3 +1443,27 @@ func TestNilInQuery(t *testing.T) {
|
|
|
t.Fatalf("expected id to be 1, got %v", id)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// Don't initialize time.Time bind variable if cassandra timestamp column is empty
|
|
|
+func TestEmptyTimestamp(t *testing.T) {
|
|
|
+ session := createSession(t)
|
|
|
+ defer session.Close()
|
|
|
+
|
|
|
+ if err := createTable(session, "CREATE TABLE test_empty_timestamp (id int, time timestamp, num int, PRIMARY KEY (id))"); err != nil {
|
|
|
+ t.Fatalf("failed to create table with error '%v'", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := session.Query("INSERT INTO test_empty_timestamp (id, num) VALUES (?,?)", 1, 561).Exec(); err != nil {
|
|
|
+ t.Fatalf("failed to insert with err: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ var timeVal time.Time
|
|
|
+
|
|
|
+ if err := session.Query("SELECT time FROM test_empty_timestamp where id = ?", 1).Scan(&timeVal); err != nil {
|
|
|
+ t.Fatalf("failed to select with err: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !timeVal.IsZero() {
|
|
|
+ t.Errorf("time.Time bind variable should still be empty (was %s)", timeVal)
|
|
|
+ }
|
|
|
+}
|