|
|
@@ -56,16 +56,16 @@ func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
|
|
|
var initOnce sync.Once
|
|
|
|
|
|
func createTable(s *Session, table string) error {
|
|
|
- err := s.Query(table).Consistency(All).Exec()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
+ q := s.Query(table)
|
|
|
c := s.pool.Pick(nil)
|
|
|
if c == nil {
|
|
|
return ErrNoConnections
|
|
|
}
|
|
|
|
|
|
+ if err := c.executeQuery(q).Close(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
return c.awaitSchemaAgreement()
|
|
|
}
|
|
|
|
|
|
@@ -111,7 +111,9 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
|
|
|
tb.Fatal(err)
|
|
|
}
|
|
|
|
|
|
- time.Sleep(1 * time.Second)
|
|
|
+ if err = conn.awaitSchemaAgreement(); err != nil {
|
|
|
+ tb.Fatal(err)
|
|
|
+ }
|
|
|
|
|
|
query := session.Query(fmt.Sprintf(`CREATE KEYSPACE %s
|
|
|
WITH replication = {
|
|
|
@@ -127,8 +129,9 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
|
|
|
// cluster to settle.
|
|
|
// TODO(zariel): use events here to know when the cluster has resolved to the
|
|
|
// new schema version
|
|
|
- time.Sleep(5 * time.Second)
|
|
|
- return nil
|
|
|
+ if err = conn.awaitSchemaAgreement(); err != nil {
|
|
|
+ tb.Fatal(err)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
|