
Merge pull request #496 from Zariel/schema-consistentency

Add method to wait for schema to become consistent
Chris Bannister 10 years ago
parent
commit
7566057662
4 changed files with 85 additions and 40 deletions
  1. .travis.yml (+1 / -1)
  2. cassandra_test.go (+23 / -31)
  3. conn.go (+60 / -0)
  4. integration.sh (+1 / -8)

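The change replaces the fixed time.Sleep calls in the test helpers with an explicit wait for schema agreement: a DDL statement is executed on a single connection picked from the pool, and the new (unexported) Conn.awaitSchemaAgreement is then called before the test proceeds. A rough in-package sketch of that pattern, using only identifiers that appear in the diff below (the helper name executeDDL is made up for illustration):

// Sketch only; assumes it lives inside the gocql package and mirrors the
// createTable/createKeyspace helpers changed in cassandra_test.go below.
// The name executeDDL is illustrative and not part of this change.
func executeDDL(s *Session, stmt string) error {
	conn := s.pool.Pick(nil) // any live connection
	if conn == nil {
		return ErrNoConnections
	}

	// Run the DDL on that one connection and drain the result.
	if err := conn.executeQuery(s.Query(stmt)).Close(); err != nil {
		return err
	}

	// Block until system.peers and system.local report one schema_version.
	return conn.awaitSchemaAgreement()
}
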
+ 1 - 1
.travis.yml

@@ -39,7 +39,7 @@ install:
 script:
   - set -e
   - go test -v -tags unit
-  - PATH=$PATH:$HOME/.local/bin bash -x integration.sh $CASS $AUTH
+  - PATH=$PATH:$HOME/.local/bin bash integration.sh $CASS $AUTH
   - go vet .
 
 notifications:

+ 23 - 31
cassandra_test.go

@@ -32,6 +32,7 @@ var (
 	flagRunSslTest   = flag.Bool("runssl", false, "Set to true to run ssl test")
 	flagRunAuthTest  = flag.Bool("runauth", false, "Set to true to run authentication test")
 	flagCompressTest = flag.String("compressor", "", "compressor to use")
+	flagTimeout      = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
 	clusterHosts     []string
 )
 
@@ -56,19 +57,24 @@ func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
 var initOnce sync.Once
 
 func createTable(s *Session, table string) error {
-	err := s.Query(table).Consistency(All).Exec()
-	if *clusterSize > 1 {
-		// wait for table definition to propogate
-		time.Sleep(1 * time.Second)
+	q := s.Query(table)
+	c := s.pool.Pick(nil)
+	if c == nil {
+		return ErrNoConnections
+	}
+
+	if err := c.executeQuery(q).Close(); err != nil {
+		return err
 	}
-	return err
+
+	return c.awaitSchemaAgreement()
 }
 
 func createCluster() *ClusterConfig {
 	cluster := NewCluster(clusterHosts...)
 	cluster.ProtoVersion = *flagProto
 	cluster.CQLVersion = *flagCQL
-	cluster.Timeout = 5 * time.Second
+	cluster.Timeout = *flagTimeout
 	cluster.Consistency = Quorum
 	if *flagRetry > 0 {
 		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
@@ -106,6 +112,10 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 		tb.Fatal(err)
 	}
 
+	if err = conn.awaitSchemaAgreement(); err != nil {
+		tb.Fatal(err)
+	}
+
 	query := session.Query(fmt.Sprintf(`CREATE KEYSPACE %s
 	WITH replication = {
 		'class' : 'SimpleStrategy',
@@ -120,9 +130,9 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 	// cluster to settle.
 	// TODO(zariel): use events here to know when the cluster has resolved to the
 	// new schema version
-	time.Sleep(5 * time.Millisecond)
-
-	tb.Logf("Created keyspace %s", keyspace)
+	if err = conn.awaitSchemaAgreement(); err != nil {
+		tb.Fatal(err)
+	}
 }
 
 func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
@@ -2021,24 +2031,9 @@ func TestTokenAwareConnPool(t *testing.T) {
 	cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
 	cluster.DiscoverHosts = true
 
-	// Drop and re-create the keyspace once. Different tests should use their own
-	// individual tables, but can assume that the table does not exist before.
-	initOnce.Do(func() {
-		createKeyspace(t, cluster, "gocql_test")
-	})
-
-	cluster.Keyspace = "gocql_test"
-	session, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal("createSession:", err)
-	}
+	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if *clusterSize > 1 {
-		// wait for autodiscovery to update the pool with the list of known hosts
-		time.Sleep(*flagAutoWait)
-	}
-
 	if session.pool.Size() != cluster.NumConns*len(cluster.Hosts) {
 		t.Errorf("Expected pool size %d but was %d", cluster.NumConns*len(cluster.Hosts), session.pool.Size())
 	}
@@ -2050,14 +2045,11 @@ func TestTokenAwareConnPool(t *testing.T) {
 	if err := query.Exec(); err != nil {
 		t.Fatalf("failed to insert with err: %v", err)
 	}
+
 	query = session.Query("SELECT data FROM test_token_aware where id = ?", 42).Consistency(One)
-	iter := query.Iter()
 	var data string
-	if !iter.Scan(&data) {
-		t.Error("failed to scan data")
-	}
-	if err := iter.Close(); err != nil {
-		t.Errorf("iter failed with err: %v", err)
+	if err := query.Scan(&data); err != nil {
+		t.Error(err)
 	}
 
 	// TODO add verification that the query went to the correct host

+ 60 - 0
conn.go

@@ -839,6 +839,66 @@ func (c *Conn) setKeepalive(d time.Duration) error {
 	return nil
 }
 
+func (c *Conn) awaitSchemaAgreement() (err error) {
+
+	const (
+		// TODO(zariel): if we export this make this configurable
+		maxWaitTime = 60 * time.Second
+
+		peerSchemas  = "SELECT schema_version FROM system.peers"
+		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
+	)
+
+	endDeadline := time.Now().Add(maxWaitTime)
+
+	for time.Now().Before(endDeadline) {
+		iter := c.executeQuery(&Query{
+			stmt: peerSchemas,
+			cons: One,
+		})
+
+		versions := make(map[string]struct{})
+
+		var schemaVersion string
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		iter = c.executeQuery(&Query{
+			stmt: localSchemas,
+			cons: One,
+		})
+
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		if len(versions) <= 1 {
+			return nil
+		}
+
+	cont:
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	if err != nil {
+		return
+	}
+
+	// not exported
+	return errors.New("gocql: cluster schema versions not consistent")
+}
+
 type inflightPrepare struct {
 	info *resultPreparedFrame
 	err  error

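awaitSchemaAgreement is unexported, so applications cannot call it directly. A caller outside the package could approximate the same check through the public API by polling the same system tables. A hedged sketch follows, reusing the 60-second cap and 200 ms poll interval from the implementation above; the function name waitForSchemaAgreement and the 127.0.0.1 contact point are assumptions for the example, not part of the change:

package main

import (
	"errors"
	"log"
	"time"

	"github.com/gocql/gocql"
)

// waitForSchemaAgreement polls system.peers and system.local until every
// node reports the same schema_version, mirroring Conn.awaitSchemaAgreement.
func waitForSchemaAgreement(s *gocql.Session) error {
	deadline := time.Now().Add(60 * time.Second)

	for time.Now().Before(deadline) {
		versions := make(map[string]struct{})
		var v string
		ok := true

		for _, stmt := range []string{
			"SELECT schema_version FROM system.peers",
			"SELECT schema_version FROM system.local WHERE key='local'",
		} {
			iter := s.Query(stmt).Iter()
			for iter.Scan(&v) {
				versions[v] = struct{}{}
			}
			if err := iter.Close(); err != nil {
				ok = false
				break
			}
		}

		// All nodes agree once at most one distinct version was seen.
		if ok && len(versions) <= 1 {
			return nil
		}
		time.Sleep(200 * time.Millisecond)
	}

	return errors.New("cluster schema versions not consistent")
}

func main() {
	// Assumed contact point; adjust for the cluster under test.
	session, err := gocql.NewCluster("127.0.0.1").CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	if err := waitForSchemaAgreement(session); err != nil {
		log.Fatal(err)
	}
}
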
+ 1 - 8
integration.sh

@@ -61,7 +61,7 @@ function run_tests() {
     	go test -v . -timeout 15s -run=TestAuthentication -tags integration -runssl -runauth -proto=$proto -cluster=$(ccm liveset) -clusterSize=$clusterSize -autowait=1000ms
 	else
 
-		go test -timeout 5m -tags integration -cover -v -runssl -proto=$proto -rf=3 -cluster=$(ccm liveset) -clusterSize=$clusterSize -autowait=2000ms -compressor=snappy ./... | tee results.txt
+		go test -timeout 5m -tags integration -v -gocql.timeout=10s -runssl -proto=$proto -rf=3 -cluster=$(ccm liveset) -clusterSize=$clusterSize -autowait=2000ms -compressor=snappy ./...
 
 		if [ ${PIPESTATUS[0]} -ne 0 ]; then
 			echo "--- FAIL: ccm status follows:"
@@ -72,13 +72,6 @@ function run_tests() {
 			echo "--- FAIL: Received a non-zero exit code from the go test execution, please investigate this"
 			exit 1
 		fi
-
-		cover=`cat results.txt | grep coverage: | grep -o "[0-9]\{1,3\}" | head -n 1`
-
-		if [[ $cover -lt "55" ]]; then
-			echo "--- FAIL: expected coverage of at least 60 %, but coverage was $cover %"
-			exit 1
-		fi
 	fi
 
 	ccm remove