Pārlūkot izejas kodu

Add method to await for schema to become consistent

Wait for schema to become consistent across the cluster when
creating tables and keyspaces. Dont export this for now.

Fixes #495
Fixes #422
Chris Bannister 10 gadi atpakaļ
vecāks
revīzija
8024440659
2 mainītis faili ar 72 papildinājumiem un 29 dzēšanām
  1. 16 29
      cassandra_test.go
  2. 56 0
      conn.go

+ 16 - 29
cassandra_test.go

@@ -57,11 +57,16 @@ var initOnce sync.Once
 
 func createTable(s *Session, table string) error {
 	err := s.Query(table).Consistency(All).Exec()
-	if *clusterSize > 1 {
-		// wait for table definition to propogate
-		time.Sleep(1 * time.Second)
+	if err != nil {
+		return err
+	}
+
+	c := s.pool.Pick(nil)
+	if c == nil {
+		return ErrNoConnections
 	}
-	return err
+
+	return c.awaitSchemaAgreement()
 }
 
 func createCluster() *ClusterConfig {
@@ -120,9 +125,9 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 	// cluster to settle.
 	// TODO(zariel): use events here to know when the cluster has resolved to the
 	// new schema version
-	time.Sleep(5 * time.Millisecond)
-
-	tb.Logf("Created keyspace %s", keyspace)
+	if err := conn.awaitSchemaAgreement(); err != nil {
+		tb.Fatal(err)
+	}
 }
 
 func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
@@ -2021,24 +2026,9 @@ func TestTokenAwareConnPool(t *testing.T) {
 	cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
 	cluster.DiscoverHosts = true
 
-	// Drop and re-create the keyspace once. Different tests should use their own
-	// individual tables, but can assume that the table does not exist before.
-	initOnce.Do(func() {
-		createKeyspace(t, cluster, "gocql_test")
-	})
-
-	cluster.Keyspace = "gocql_test"
-	session, err := cluster.CreateSession()
-	if err != nil {
-		t.Fatal("createSession:", err)
-	}
+	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if *clusterSize > 1 {
-		// wait for autodiscovery to update the pool with the list of known hosts
-		time.Sleep(*flagAutoWait)
-	}
-
 	if session.pool.Size() != cluster.NumConns*len(cluster.Hosts) {
 		t.Errorf("Expected pool size %d but was %d", cluster.NumConns*len(cluster.Hosts), session.pool.Size())
 	}
@@ -2050,14 +2040,11 @@ func TestTokenAwareConnPool(t *testing.T) {
 	if err := query.Exec(); err != nil {
 		t.Fatalf("failed to insert with err: %v", err)
 	}
+
 	query = session.Query("SELECT data FROM test_token_aware where id = ?", 42).Consistency(One)
-	iter := query.Iter()
 	var data string
-	if !iter.Scan(&data) {
-		t.Error("failed to scan data")
-	}
-	if err := iter.Close(); err != nil {
-		t.Errorf("iter failed with err: %v", err)
+	if err := query.Scan(&data); err != nil {
+		t.Error(err)
 	}
 
 	// TODO add verification that the query went to the correct host

+ 56 - 0
conn.go

@@ -839,6 +839,62 @@ func (c *Conn) setKeepalive(d time.Duration) error {
 	return nil
 }
 
+func (c *Conn) awaitSchemaAgreement() error {
+
+	const (
+		// TODO(zariel): if we export this make this configurable
+		maxWaitTime = 10 * time.Second
+
+		peerSchemas  = "SELECT schema_version FROM system.peers"
+		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
+	)
+
+	endDeadline := time.Now().Add(maxWaitTime)
+
+	for time.Now().Before(endDeadline) {
+		iter := c.executeQuery(&Query{
+			stmt: peerSchemas,
+			cons: One,
+		})
+
+		versions := make(map[string]struct{})
+
+		var schemaVersion string
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err := iter.Close(); err != nil {
+			// TODO: should we keep trying?
+			return err
+		}
+
+		iter = c.executeQuery(&Query{
+			stmt: localSchemas,
+			cons: One,
+		})
+
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err := iter.Close(); err != nil {
+			return err
+		}
+
+		if len(versions) <= 1 {
+			return nil
+		}
+
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	// not exported
+	return errors.New("gocql: cluster schema versions not consistent")
+}
+
 type inflightPrepare struct {
 	info *resultPreparedFrame
 	err  error