浏览代码

Merge pull request #223 from relops/ccm

Use ccm to run integration tests against a cluster
Ben Hood 11 年之前
父节点
当前提交
b3054edf17
共有 6 个文件被更改,包括 87 次插入84 次删除
  1. 11 1
      .travis.yml
  2. 58 36
      cassandra_test.go
  3. 1 0
      conn_test.go
  4. 2 2
      errors_test.go
  5. 13 43
      integration.sh
  6. 2 2
      wiki_test.go

+ 11 - 1
.travis.yml

@@ -1,14 +1,24 @@
 language: go
 
+env:
+  - CASS=1.2.18
+  - CASS=2.0.9
+
 go:
   - 1.2
   - 1.3
 
 before_script:
+  - sudo apt-get install -y libjna-java python-pip
+  - sudo pip install cql PyYAML six
   - go get code.google.com/p/go.tools/cmd/vet
+  - git clone https://github.com/pcmanus/ccm.git
+  - pushd ccm
+  - sudo ./setup.py install
+  - popd
 
 script:
-  - bash integration.sh
+  - bash integration.sh $CASS
   - go vet .
 
 notifications:

+ 58 - 36
cassandra_test.go

@@ -7,6 +7,8 @@ package gocql
 import (
 	"bytes"
 	"flag"
+	"fmt"
+	"log"
 	"math"
 	"math/big"
 	"reflect"
@@ -21,21 +23,40 @@ import (
 )
 
 var (
-	flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
-	flagProto   = flag.Int("proto", 2, "protcol version")
-	flagCQL     = flag.String("cql", "3.0.0", "CQL version")
+	flagCluster  = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
+	flagProto    = flag.Int("proto", 2, "protcol version")
+	flagCQL      = flag.String("cql", "3.0.0", "CQL version")
+	flagRF       = flag.Int("rf", 1, "replication factor for test keyspace")
+	clusterSize  = 1
+	clusterHosts []string
 )
 
+func init() {
+
+	flag.Parse()
+	clusterHosts = strings.Split(*flagCluster, ",")
+	clusterSize = len(clusterHosts)
+	log.SetFlags(log.Lshortfile | log.LstdFlags)
+}
+
 var initOnce sync.Once
 
+func createTable(s *Session, table string) error {
+	err := s.Query(table).Consistency(All).Exec()
+	if clusterSize > 1 {
+		// wait for table definition to propogate
+		time.Sleep(250 * time.Millisecond)
+	}
+	return err
+}
+
 func createSession(tb testing.TB) *Session {
-	cluster := NewCluster(strings.Split(*flagCluster, ",")...)
+	cluster := NewCluster(clusterHosts...)
 	cluster.ProtoVersion = *flagProto
 	cluster.CQLVersion = *flagCQL
-	cluster.Authenticator = PasswordAuthenticator{
-		Username: "cassandra",
-		Password: "cassandra",
-	}
+	cluster.Timeout = 5 * time.Second
+	cluster.Consistency = Quorum
+	cluster.RetryPolicy.NumRetries = 2
 
 	initOnce.Do(func() {
 		session, err := cluster.CreateSession()
@@ -47,13 +68,14 @@ func createSession(tb testing.TB) *Session {
 		if err := session.Query(`DROP KEYSPACE gocql_test`).Exec(); err != nil {
 			tb.Log("drop keyspace:", err)
 		}
-		if err := session.Query(`CREATE KEYSPACE gocql_test
+		if err := session.Query(fmt.Sprintf(`CREATE KEYSPACE gocql_test
 			WITH replication = {
 				'class' : 'SimpleStrategy',
-				'replication_factor' : 1
-			}`).Exec(); err != nil {
+				'replication_factor' : %d
+			}`, *flagRF)).Consistency(All).Exec(); err != nil {
 			tb.Fatal("create keyspace:", err)
 		}
+		tb.Log("Created keyspace")
 		session.Close()
 	})
 	cluster.Keyspace = "gocql_test"
@@ -89,7 +111,7 @@ func TestUseStatementError(t *testing.T) {
 
 //TestInvalidKeyspace checks that an invalid keyspace will return promptly and without a flood of connections
 func TestInvalidKeyspace(t *testing.T) {
-	cluster := NewCluster(strings.Split(*flagCluster, ",")...)
+	cluster := NewCluster(clusterHosts...)
 	cluster.ProtoVersion = *flagProto
 	cluster.CQLVersion = *flagCQL
 	cluster.Keyspace = "invalidKeyspace"
@@ -108,7 +130,7 @@ func TestTracing(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE trace (id int primary key)`).Exec(); err != nil {
+	if err := createTable(session, `CREATE TABLE trace (id int primary key)`); err != nil {
 		t.Fatal("create:", err)
 	}
 
@@ -140,7 +162,7 @@ func TestPaging(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query("CREATE TABLE paging (id int primary key)").Exec(); err != nil {
+	if err := createTable(session, "CREATE TABLE paging (id int primary key)"); err != nil {
 		t.Fatal("create table:", err)
 	}
 	for i := 0; i < 100; i++ {
@@ -171,11 +193,11 @@ func TestCAS(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE cas_table (
+	if err := createTable(session, `CREATE TABLE cas_table (
 			title   varchar,
 			revid   timeuuid,
 			PRIMARY KEY (title, revid)
-		)`).Exec(); err != nil {
+		)`); err != nil {
 		t.Fatal("create:", err)
 	}
 
@@ -210,7 +232,7 @@ func TestBatch(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE batch_table (id int primary key)`).Exec(); err != nil {
+	if err := createTable(session, `CREATE TABLE batch_table (id int primary key)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -239,7 +261,7 @@ func TestBatchLimit(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE batch_table2 (id int primary key)`).Exec(); err != nil {
+	if err := createTable(session, `CREATE TABLE batch_table2 (id int primary key)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -262,7 +284,7 @@ func TestTooManyQueryArgs(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE too_many_query_args (id int primary key, value int)`).Exec(); err != nil {
+	if err := createTable(session, `CREATE TABLE too_many_query_args (id int primary key, value int)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -299,7 +321,7 @@ func TestNotEnoughQueryArgs(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`).Exec(); err != nil {
+	if err := createTable(session, `CREATE TABLE not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -347,7 +369,7 @@ func TestCreateSessionTimeout(t *testing.T) {
 func TestSliceMap(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
-	if err := session.Query(`CREATE TABLE slice_map_table (
+	if err := createTable(session, `CREATE TABLE slice_map_table (
 			testuuid       timeuuid PRIMARY KEY,
 			testtimestamp  timestamp,
 			testvarchar    varchar,
@@ -361,7 +383,7 @@ func TestSliceMap(t *testing.T) {
 			testset        set<int>,
 			testmap        map<varchar, varchar>,
 			testvarint     varint
-		)`).Exec(); err != nil {
+		)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 	m := make(map[string]interface{})
@@ -493,11 +515,11 @@ func TestScanWithNilArguments(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE scan_with_nil_arguments (
+	if err := createTable(session, `CREATE TABLE scan_with_nil_arguments (
 			foo   varchar,
 			bar   int,
 			PRIMARY KEY (foo, bar)
-	)`).Exec(); err != nil {
+	)`); err != nil {
 		t.Fatal("create:", err)
 	}
 	for i := 1; i <= 20; i++ {
@@ -529,11 +551,11 @@ func TestScanCASWithNilArguments(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE scan_cas_with_nil_arguments (
+	if err := createTable(session, `CREATE TABLE scan_cas_with_nil_arguments (
 		foo   varchar,
 		bar   varchar,
 		PRIMARY KEY (foo, bar)
-	)`).Exec(); err != nil {
+	)`); err != nil {
 		t.Fatal("create:", err)
 	}
 
@@ -573,7 +595,7 @@ func TestRebindQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query("CREATE TABLE rebind_query (id int, value text, PRIMARY KEY (id))").Exec(); err != nil {
+	if err := createTable(session, "CREATE TABLE rebind_query (id int, value text, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -613,7 +635,7 @@ func TestStaticQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query("CREATE TABLE static_query_info (id int, value text, PRIMARY KEY (id))").Exec(); err != nil {
+	if err := createTable(session, "CREATE TABLE static_query_info (id int, value text, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -682,7 +704,7 @@ func TestBoundQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query("CREATE TABLE clustered_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))").Exec(); err != nil {
+	if err := createTable(session, "CREATE TABLE clustered_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -725,7 +747,7 @@ func TestBatchQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query("CREATE TABLE batch_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))").Exec(); err != nil {
+	if err := createTable(session, "CREATE TABLE batch_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -770,11 +792,11 @@ func TestBatchQueryInfo(t *testing.T) {
 }
 
 func injectInvalidPreparedStatement(t *testing.T, session *Session, table string) (string, *Conn) {
-	if err := session.Query(`CREATE TABLE ` + table + ` (
+	if err := createTable(session, `CREATE TABLE `+table+` (
 			foo   varchar,
 			bar   int,
 			PRIMARY KEY (foo, bar)
-	)`).Exec(); err != nil {
+	)`); err != nil {
 		t.Fatal("create:", err)
 	}
 	stmt := "INSERT INTO " + table + " (foo, bar) VALUES (?, 7)"
@@ -852,7 +874,7 @@ func TestPreparedCacheEviction(t *testing.T) {
 	stmtsLRU.Max(4)
 	stmtsLRU.mu.Unlock()
 
-	if err := session.Query("CREATE TABLE prepcachetest (id int,mod int,PRIMARY KEY (id))").Exec(); err != nil {
+	if err := createTable(session, "CREATE TABLE prepcachetest (id int,mod int,PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 	//Fill the table
@@ -934,7 +956,7 @@ func TestMarshalFloat64Ptr(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query("CREATE TABLE float_test (id double, test double, primary key (id))").Exec(); err != nil {
+	if err := createTable(session, "CREATE TABLE float_test (id double, test double, primary key (id))"); err != nil {
 		t.Fatal("create table:", err)
 	}
 	testNum := float64(7500)
@@ -947,8 +969,8 @@ func TestVarint(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query("CREATE TABLE varint_test (id varchar, test varint, test2 varint, primary key (id))").Exec(); err != nil {
-		t.Fatal("create table:", err)
+	if err := createTable(session, "CREATE TABLE varint_test (id varchar, test varint, test2 varint, primary key (id))"); err != nil {
+		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
 	if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", 0).Exec(); err != nil {

+ 1 - 0
conn_test.go

@@ -159,6 +159,7 @@ func TestRoundRobin(t *testing.T) {
 }
 
 func TestConnClosing(t *testing.T) {
+	t.Skip("Skipping until test can be ran reliably")
 	srv := NewTestServer(t)
 	defer srv.Stop()
 

+ 2 - 2
errors_test.go

@@ -8,11 +8,11 @@ func TestErrorsParse(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := session.Query(`CREATE TABLE errors_parse (id int primary key)`).Exec(); err != nil {
+	if err := createTable(session, `CREATE TABLE errors_parse (id int primary key)`); err != nil {
 		t.Fatal("create:", err)
 	}
 
-	if err := session.Query(`CREATE TABLE errors_parse (id int primary key)`).Exec(); err == nil {
+	if err := createTable(session, `CREATE TABLE errors_parse (id int primary key)`); err == nil {
 		t.Fatal("Should have gotten already exists error from cassandra server.")
 	} else {
 		switch e := err.(type) {

+ 13 - 43
integration.sh

@@ -2,50 +2,20 @@
 
 set -e
 
-PID_FILE=cassandra.pid
-STARTUP_LOG=startup.log
-ARCHIVE_BASE_URL=http://archive.apache.org/dist/cassandra
-
-for v in 1.2.18 2.0.9
-do
-	TARBALL=apache-cassandra-$v-bin.tar.gz
-	CASSANDRA_DIR=apache-cassandra-$v
-
-	curl -L -O $ARCHIVE_BASE_URL/$v/$TARBALL
-	
-	if [ ! -f $CASSANDRA_DIR/bin/cassandra ]
-	then
-   		tar xzf $TARBALL
+function run_tests() {
+	local version=$1
+	ccm create test -v $version -n 3 -s -d --vnodes
+	ccm status
+	ccm updateconf 'concurrent_reads: 8' 'concurrent_writes: 32' 'rpc_server_type: sync' 'rpc_min_threads: 2' 'rpc_max_threads: 8' 'write_request_timeout_in_ms: 5000' 'read_request_timeout_in_ms: 5000'
+
+	local proto=2
+	if [[ $version == 1.2.* ]]; then
+		proto=1
 	fi
-	
-	CASSANDRA_LOG_DIR=`pwd`/v${v}/log/cassandra
-	CASSANDRA_LOG=$CASSANDRA_LOG_DIR/system.log
-
-	mkdir -p $CASSANDRA_LOG_DIR
-	: >$CASSANDRA_LOG  # create an empty log file
-	
-	sed -i -e 's?/var?'`pwd`/v${v}'?' $CASSANDRA_DIR/conf/cassandra.yaml
-	sed -i -e 's?/var?'`pwd`/v${v}'?' $CASSANDRA_DIR/conf/log4j-server.properties
-
-	echo "Booting Cassandra ${v}, waiting for CQL listener to start ...."
 
-	$CASSANDRA_DIR/bin/cassandra -p $PID_FILE &> $STARTUP_LOG
+	go test -v -proto=$proto -rf=3 -cluster=$(ccm liveset) ./...
 
-	{ tail -n +1 -f $CASSANDRA_LOG & } | sed -n '/Starting listening for CQL clients/q'
-	
-	PID=$(<"$PID_FILE")
-
-	echo "Cassandra ${v} running (PID ${PID}), about to run test suite ...."
-
-	if [[ $v == 1.2.* ]]
-		then
-		go test -v ./... -proto 1
-	else
-		go test -v ./...
-	fi
+	ccm clear
+}
 
-	echo "Test suite passed against Cassandra ${v}, killing server instance (PID ${PID})"
-	
-	kill -9 $PID
-	rm $PID_FILE
-done
+run_tests $1

+ 2 - 2
wiki_test.go

@@ -58,7 +58,7 @@ func (w *WikiTest) CreateSchema() {
 	if err := w.session.Query(`DROP TABLE wiki_page`).Exec(); err != nil && err.Error() != "unconfigured columnfamily wiki_page" {
 		w.tb.Fatal("CreateSchema:", err)
 	}
-	if err := w.session.Query(`CREATE TABLE wiki_page (
+	if err := createTable(w.session, `CREATE TABLE wiki_page (
 			title       varchar,
 			revid       timeuuid,
 			body        varchar,
@@ -69,7 +69,7 @@ func (w *WikiTest) CreateSchema() {
 			tags        set<varchar>,
 			attachments map<varchar, blob>,
 			PRIMARY KEY (title, revid)
-		)`).Exec(); err != nil {
+		)`); err != nil {
 		w.tb.Fatal("CreateSchema:", err)
 	}
 }