Browse Source

Merge pull request #497 from Zariel/control-connection

add internal control connection
Chris Bannister 10 years ago
parent
commit
3a02a26250
15 changed files with 378 additions and 201 deletions
  1. 1 1
      cass1batch_test.go
  2. 48 59
      cassandra_test.go
  3. 3 0
      cluster.go
  4. 6 64
      conn.go
  5. 1 0
      conn_test.go
  6. 4 4
      connectionpool.go
  7. 220 0
      control.go
  8. 4 4
      errors_test.go
  9. 11 0
      frame.go
  10. 23 17
      host_source.go
  11. 8 28
      metadata.go
  12. 36 11
      session.go
  13. 1 1
      tuple_test.go
  14. 10 10
      udt_test.go
  15. 2 2
      wiki_test.go

+ 1 - 1
cass1batch_test.go

@@ -9,7 +9,7 @@ import (
 
 func TestProto1BatchInsert(t *testing.T) {
 	session := createSession(t)
-	if err := createTable(session, "CREATE TABLE large (id int primary key)"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.large (id int primary key)"); err != nil {
 		t.Fatal(err)
 	}
 	defer session.Close()

+ 48 - 59
cassandra_test.go

@@ -57,17 +57,11 @@ func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
 var initOnce sync.Once
 
 func createTable(s *Session, table string) error {
-	q := s.Query(table)
-	c := s.pool.Pick(nil)
-	if c == nil {
-		return ErrNoConnections
-	}
-
-	if err := c.executeQuery(q).Close(); err != nil {
+	if err := s.control.query(table).Close(); err != nil {
 		return err
 	}
 
-	return c.awaitSchemaAgreement()
+	return s.control.awaitSchemaAgreement()
 }
 
 func createCluster() *ClusterConfig {
@@ -101,28 +95,22 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 		tb.Fatal("createSession:", err)
 	}
 
-	// should reuse the same conn apparently
-	conn := session.pool.Pick(nil)
-	if conn == nil {
-		tb.Fatal("no connections available in the pool")
-	}
-
-	err = conn.executeQuery(session.Query(`DROP KEYSPACE IF EXISTS ` + keyspace).Consistency(All)).Close()
+	err = session.control.query(`DROP KEYSPACE IF EXISTS ` + keyspace).Close()
 	if err != nil {
 		tb.Fatal(err)
 	}
 
-	if err = conn.awaitSchemaAgreement(); err != nil {
+	if err = session.control.awaitSchemaAgreement(); err != nil {
 		tb.Fatal(err)
 	}
 
-	query := session.Query(fmt.Sprintf(`CREATE KEYSPACE %s
+	err = session.control.query(fmt.Sprintf(`CREATE KEYSPACE %s
 	WITH replication = {
 		'class' : 'SimpleStrategy',
 		'replication_factor' : %d
-	}`, keyspace, *flagRF)).Consistency(All)
+	}`, keyspace, *flagRF)).Close()
 
-	if err = conn.executeQuery(query).Close(); err != nil {
+	if err != nil {
 		tb.Fatal(err)
 	}
 
@@ -130,7 +118,7 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 	// cluster to settle.
 	// TODO(zariel): use events here to know when the cluster has resolved to the
 	// new schema version
-	if err = conn.awaitSchemaAgreement(); err != nil {
+	if err = session.control.awaitSchemaAgreement(); err != nil {
 		tb.Fatal(err)
 	}
 }
@@ -252,7 +240,7 @@ func TestTracing(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE trace (id int primary key)`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.trace (id int primary key)`); err != nil {
 		t.Fatal("create:", err)
 	}
 
@@ -284,7 +272,7 @@ func TestPaging(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE paging (id int primary key)"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.paging (id int primary key)"); err != nil {
 		t.Fatal("create table:", err)
 	}
 	for i := 0; i < 100; i++ {
@@ -316,7 +304,7 @@ func TestCAS(t *testing.T) {
 	defer session.Close()
 	session.cfg.SerialConsistency = LocalSerial
 
-	if err := createTable(session, `CREATE TABLE cas_table (
+	if err := createTable(session, `CREATE TABLE gocql_test.cas_table (
 			title         varchar,
 			revid   	  timeuuid,
 			last_modified timestamp,
@@ -436,7 +424,7 @@ func TestMapScanCAS(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE cas_table2 (
+	if err := createTable(session, `CREATE TABLE gocql_test.cas_table2 (
 			title         varchar,
 			revid   	  timeuuid,
 			last_modified timestamp,
@@ -478,7 +466,7 @@ func TestBatch(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE batch_table (id int primary key)`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.batch_table (id int primary key)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -486,6 +474,7 @@ func TestBatch(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
 	}
+
 	if err := session.ExecuteBatch(batch); err != nil {
 		t.Fatal("execute batch:", err)
 	}
@@ -506,7 +495,7 @@ func TestUnpreparedBatch(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE batch_unprepared (id int primary key, c counter)`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.batch_unprepared (id int primary key, c counter)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -548,7 +537,7 @@ func TestBatchLimit(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE batch_table2 (id int primary key)`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.batch_table2 (id int primary key)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -566,7 +555,7 @@ func TestWhereIn(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE where_in_table (id int, cluster int, primary key (id,cluster))`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.where_in_table (id int, cluster int, primary key (id,cluster))`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -595,7 +584,7 @@ func TestTooManyQueryArgs(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE too_many_query_args (id int primary key, value int)`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.too_many_query_args (id int primary key, value int)`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -632,7 +621,7 @@ func TestNotEnoughQueryArgs(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -698,7 +687,7 @@ func (n *FullName) UnmarshalCQL(info TypeInfo, data []byte) error {
 func TestMapScanWithRefMap(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
-	if err := createTable(session, `CREATE TABLE scan_map_ref_table (
+	if err := createTable(session, `CREATE TABLE gocql_test.scan_map_ref_table (
 			testtext       text PRIMARY KEY,
 			testfullname   text,
 			testint        int,
@@ -742,7 +731,7 @@ func TestMapScanWithRefMap(t *testing.T) {
 func TestSliceMap(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
-	if err := createTable(session, `CREATE TABLE slice_map_table (
+	if err := createTable(session, `CREATE TABLE gocql_test.slice_map_table (
 			testuuid       timeuuid PRIMARY KEY,
 			testtimestamp  timestamp,
 			testvarchar    varchar,
@@ -866,7 +855,7 @@ func TestScanWithNilArguments(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE scan_with_nil_arguments (
+	if err := createTable(session, `CREATE TABLE gocql_test.scan_with_nil_arguments (
 			foo   varchar,
 			bar   int,
 			PRIMARY KEY (foo, bar)
@@ -902,7 +891,7 @@ func TestScanCASWithNilArguments(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE scan_cas_with_nil_arguments (
+	if err := createTable(session, `CREATE TABLE gocql_test.scan_cas_with_nil_arguments (
 		foo   varchar,
 		bar   varchar,
 		PRIMARY KEY (foo, bar)
@@ -946,7 +935,7 @@ func TestRebindQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE rebind_query (id int, value text, PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.rebind_query (id int, value text, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -986,7 +975,7 @@ func TestStaticQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE static_query_info (id int, value text, PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.static_query_info (id int, value text, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1055,7 +1044,7 @@ func TestBoundQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE clustered_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.clustered_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1098,7 +1087,7 @@ func TestBatchQueryInfo(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE batch_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.batch_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1143,7 +1132,7 @@ func TestBatchQueryInfo(t *testing.T) {
 }
 
 func injectInvalidPreparedStatement(t *testing.T, session *Session, table string) (string, *Conn) {
-	if err := createTable(session, `CREATE TABLE `+table+` (
+	if err := createTable(session, `CREATE TABLE gocql_test.`+table+` (
 			foo   varchar,
 			bar   int,
 			PRIMARY KEY (foo, bar)
@@ -1185,7 +1174,7 @@ func TestMissingSchemaPrepare(t *testing.T) {
 		t.Fatal("expected error, but got nil.")
 	}
 
-	if err := createTable(s, "CREATE TABLE invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
+	if err := createTable(s, "CREATE TABLE gocql_test.invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -1249,7 +1238,7 @@ func TestPreparedCacheEviction(t *testing.T) {
 	stmtsLRU.Max(4)
 	stmtsLRU.Unlock()
 
-	if err := createTable(session, "CREATE TABLE prepcachetest (id int,mod int,PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.prepcachetest (id int,mod int,PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 	//Fill the table
@@ -1345,10 +1334,10 @@ func TestPreparedCacheKey(t *testing.T) {
 	defer session2.Close()
 
 	// both keyspaces have a table named "test_stmt_cache_key"
-	if err := createTable(session, "CREATE TABLE test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
 		t.Fatal("create table:", err)
 	}
-	if err := createTable(session2, "CREATE TABLE test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
+	if err := createTable(session2, "CREATE TABLE gocql_test2.test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
 		t.Fatal("create table:", err)
 	}
 
@@ -1382,7 +1371,7 @@ func TestMarshalFloat64Ptr(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE float_test (id double, test double, primary key (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.float_test (id double, test double, primary key (id))"); err != nil {
 		t.Fatal("create table:", err)
 	}
 	testNum := float64(7500)
@@ -1396,7 +1385,7 @@ func TestMarshalInet(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE inet_test (ip inet, name text, primary key (ip))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.inet_test (ip inet, name text, primary key (ip))"); err != nil {
 		t.Fatal("create table:", err)
 	}
 	stringIp := "123.34.45.56"
@@ -1447,7 +1436,7 @@ func TestVarint(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE varint_test (id varchar, test varint, test2 varint, primary key (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.varint_test (id varchar, test varint, test2 varint, primary key (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1554,7 +1543,7 @@ func TestBatchStats(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE batchStats (id int, PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.batchStats (id int, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1580,7 +1569,7 @@ func TestNilInQuery(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE testNilInsert (id int, count int, PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.testNilInsert (id int, count int, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 	if err := session.Query("INSERT INTO testNilInsert (id,count) VALUES (?,?)", 1, nil).Exec(); err != nil {
@@ -1601,7 +1590,7 @@ func TestEmptyTimestamp(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE test_empty_timestamp (id int, time timestamp, num int, PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_empty_timestamp (id int, time timestamp, num int, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1659,7 +1648,7 @@ func TestGetTableMetadata(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE test_table_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_table_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1743,7 +1732,7 @@ func TestGetColumnMetadata(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE test_column_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_column_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1845,7 +1834,7 @@ func TestKeyspaceMetadata(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE test_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -1908,10 +1897,10 @@ func TestRoutingKey(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE test_single_routing_key (first_id int, second_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_single_routing_key (first_id int, second_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
-	if err := createTable(session, "CREATE TABLE test_composite_routing_key (first_id int, second_id int, PRIMARY KEY ((first_id, second_id)))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_composite_routing_key (first_id int, second_id int, PRIMARY KEY ((first_id, second_id)))"); err != nil {
 		t.Fatalf("failed to create table with error '%v'", err)
 	}
 
@@ -2035,7 +2024,7 @@ func TestTokenAwareConnPool(t *testing.T) {
 		t.Errorf("Expected pool size %d but was %d", cluster.NumConns*len(cluster.Hosts), session.pool.Size())
 	}
 
-	if err := createTable(session, "CREATE TABLE test_token_aware (id int, data text, PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.test_token_aware (id int, data text, PRIMARY KEY (id))"); err != nil {
 		t.Fatalf("failed to create test_token_aware table with err: %v", err)
 	}
 	query := session.Query("INSERT INTO test_token_aware (id, data) VALUES (?,?)", 42, "8 * 6 =")
@@ -2134,7 +2123,7 @@ func TestManualQueryPaging(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, "CREATE TABLE testManualPaging (id int, count int, PRIMARY KEY (id))"); err != nil {
+	if err := createTable(session, "CREATE TABLE gocql_test.testManualPaging (id int, count int, PRIMARY KEY (id))"); err != nil {
 		t.Fatal(err)
 	}
 
@@ -2183,7 +2172,7 @@ func TestLexicalUUIDType(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE test_lexical_uuid (
+	if err := createTable(session, `CREATE TABLE gocql_test.test_lexical_uuid (
 			key     varchar,
 			column1 'org.apache.cassandra.db.marshal.LexicalUUIDType',
 			value   int,
@@ -2218,7 +2207,7 @@ func TestSessionBindRoutingKey(t *testing.T) {
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE test_bind_routing_key (
+	if err := createTable(session, `CREATE TABLE gocql_test.test_bind_routing_key (
 			key     varchar,
 			value   int,
 			PRIMARY KEY (key)
@@ -2250,7 +2239,7 @@ func TestJSONSupport(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE test_json (
+	if err := createTable(session, `CREATE TABLE gocql_test.test_json (
 		    id text PRIMARY KEY,
 		    age int,
 		    state text

+ 3 - 0
cluster.go

@@ -106,6 +106,9 @@ type ClusterConfig struct {
 	// PoolConfig configures the underlying connection pool, allowing the
 	// configuration of host selection and connection selection policies.
 	PoolConfig PoolConfig
+
+	// internal config for testing
+	disableControlConn bool
 }
 
 // NewCluster generates a new config for the default cluster implementation.

+ 6 - 64
conn.go

@@ -99,6 +99,7 @@ type Conn struct {
 	conn    net.Conn
 	r       *bufio.Reader
 	timeout time.Duration
+	cfg     *ConnConfig
 
 	headerBuf []byte
 
@@ -121,7 +122,7 @@ type Conn struct {
 
 // Connect establishes a connection to a Cassandra node.
 // You must also call the Serve method before you can execute any queries.
-func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
+func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
 	var (
 		err  error
 		conn net.Conn
@@ -166,6 +167,7 @@ func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn,
 	c := &Conn{
 		conn:         conn,
 		r:            bufio.NewReader(conn),
+		cfg:          cfg,
 		uniq:         make(chan int, cfg.NumStreams),
 		calls:        make([]callReq, cfg.NumStreams),
 		timeout:      cfg.Timeout,
@@ -191,7 +193,7 @@ func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn,
 
 	go c.serve()
 
-	if err := c.startup(&cfg); err != nil {
+	if err := c.startup(); err != nil {
 		conn.Close()
 		return nil, err
 	}
@@ -231,9 +233,9 @@ func (c *Conn) Read(p []byte) (n int, err error) {
 	return
 }
 
-func (c *Conn) startup(cfg *ConnConfig) error {
+func (c *Conn) startup() error {
 	m := map[string]string{
-		"CQL_VERSION": cfg.CQLVersion,
+		"CQL_VERSION": c.cfg.CQLVersion,
 	}
 
 	if c.compressor != nil {
@@ -884,66 +886,6 @@ func (c *Conn) setKeepalive(d time.Duration) error {
 	return nil
 }
 
-func (c *Conn) awaitSchemaAgreement() (err error) {
-
-	const (
-		// TODO(zariel): if we export this make this configurable
-		maxWaitTime = 60 * time.Second
-
-		peerSchemas  = "SELECT schema_version FROM system.peers"
-		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
-	)
-
-	endDeadline := time.Now().Add(maxWaitTime)
-
-	for time.Now().Before(endDeadline) {
-		iter := c.executeQuery(&Query{
-			stmt: peerSchemas,
-			cons: One,
-		})
-
-		versions := make(map[string]struct{})
-
-		var schemaVersion string
-		for iter.Scan(&schemaVersion) {
-			versions[schemaVersion] = struct{}{}
-			schemaVersion = ""
-		}
-
-		if err = iter.Close(); err != nil {
-			goto cont
-		}
-
-		iter = c.executeQuery(&Query{
-			stmt: localSchemas,
-			cons: One,
-		})
-
-		for iter.Scan(&schemaVersion) {
-			versions[schemaVersion] = struct{}{}
-			schemaVersion = ""
-		}
-
-		if err = iter.Close(); err != nil {
-			goto cont
-		}
-
-		if len(versions) <= 1 {
-			return nil
-		}
-
-	cont:
-		time.Sleep(200 * time.Millisecond)
-	}
-
-	if err != nil {
-		return
-	}
-
-	// not exported
-	return errors.New("gocql: cluster schema versions not consistent")
-}
-
 type inflightPrepare struct {
 	info QueryInfo
 	err  error

+ 1 - 0
conn_test.go

@@ -341,6 +341,7 @@ func TestRoundRobinConnPoolRoundRobin(t *testing.T) {
 	cluster := NewCluster(addrs...)
 	cluster.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
 	cluster.PoolConfig.ConnSelectionPolicy = RoundRobinConnPolicy()
+	cluster.disableControlConn = true
 
 	db, err := cluster.CreateSession()
 	if err != nil {

+ 4 - 4
connectionpool.go

@@ -60,7 +60,7 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
 type policyConnPool struct {
 	port     int
 	numConns int
-	connCfg  ConnConfig
+	connCfg  *ConnConfig
 	keyspace string
 
 	mu            sync.RWMutex
@@ -88,7 +88,7 @@ func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
 	pool := &policyConnPool{
 		port:     cfg.Port,
 		numConns: cfg.NumConns,
-		connCfg: ConnConfig{
+		connCfg: &ConnConfig{
 			ProtoVersion:  cfg.ProtoVersion,
 			CQLVersion:    cfg.CQLVersion,
 			Timeout:       cfg.Timeout,
@@ -212,7 +212,7 @@ type hostConnPool struct {
 	port     int
 	addr     string
 	size     int
-	connCfg  ConnConfig
+	connCfg  *ConnConfig
 	keyspace string
 	policy   ConnSelectionPolicy
 	// protection for conns, closed, filling
@@ -222,7 +222,7 @@ type hostConnPool struct {
 	filling bool
 }
 
-func newHostConnPool(host string, port int, size int, connCfg ConnConfig,
+func newHostConnPool(host string, port int, size int, connCfg *ConnConfig,
 	keyspace string, policy ConnSelectionPolicy) *hostConnPool {
 
 	pool := &hostConnPool{

+ 220 - 0
control.go

@@ -0,0 +1,220 @@
+package gocql
+
+import (
+	"errors"
+	"fmt"
+	"sync/atomic"
+	"time"
+)
+
+type controlConn struct {
+	session *Session
+
+	conn       atomic.Value
+	connecting uint64
+
+	retry RetryPolicy
+
+	quit chan struct{}
+}
+
+func createControlConn(session *Session) *controlConn {
+	control := &controlConn{
+		session: session,
+		quit:    make(chan struct{}),
+		retry:   &SimpleRetryPolicy{NumRetries: 3},
+	}
+
+	control.conn.Store((*Conn)(nil))
+	control.reconnect()
+	go control.heartBeat()
+
+	return control
+}
+
+func (c *controlConn) heartBeat() {
+	for {
+		select {
+		case <-c.quit:
+			return
+		case <-time.After(5 * time.Second):
+		}
+
+		resp, err := c.writeFrame(&writeOptionsFrame{})
+		if err != nil {
+			goto reconn
+		}
+
+		switch resp.(type) {
+		case *supportedFrame:
+			continue
+		case error:
+			goto reconn
+		default:
+			panic(fmt.Sprintf("gocql: unknown frame in response to options: %T", resp))
+		}
+
+	reconn:
+		c.reconnect()
+		time.Sleep(5 * time.Second)
+		continue
+
+	}
+}
+
+func (c *controlConn) reconnect() {
+	if !atomic.CompareAndSwapUint64(&c.connecting, 0, 1) {
+		return
+	}
+
+	success := false
+	defer func() {
+		// debounce reconnect a little
+		if success {
+			go func() {
+				time.Sleep(500 * time.Millisecond)
+				atomic.StoreUint64(&c.connecting, 0)
+			}()
+		} else {
+			atomic.StoreUint64(&c.connecting, 0)
+		}
+	}()
+
+	oldConn := c.conn.Load().(*Conn)
+
+	// TODO: should have our own roundrobbin for hosts so that we can try each
+	// in succession and guantee that we get a different host each time.
+	conn := c.session.pool.Pick(nil)
+	if conn == nil {
+		return
+	}
+
+	newConn, err := Connect(conn.addr, conn.cfg, c)
+	if err != nil {
+		// TODO: add log handler for things like this
+		return
+	}
+
+	c.conn.Store(newConn)
+	success = true
+
+	if oldConn != nil {
+		oldConn.Close()
+	}
+}
+
+func (c *controlConn) HandleError(conn *Conn, err error, closed bool) {
+	if !closed {
+		return
+	}
+
+	oldConn := c.conn.Load().(*Conn)
+	if oldConn != conn {
+		return
+	}
+
+	c.reconnect()
+}
+
+func (c *controlConn) writeFrame(w frameWriter) (frame, error) {
+	conn := c.conn.Load().(*Conn)
+	if conn == nil {
+		return nil, errNoControl
+	}
+
+	framer, err := conn.exec(w, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return framer.parseFrame()
+}
+
+// query will return nil if the connection is closed or nil
+func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
+	q := c.session.Query(statement, values...).Consistency(One)
+
+	const maxConnectAttempts = 5
+	connectAttempts := 0
+
+	for {
+		conn := c.conn.Load().(*Conn)
+		if conn == nil {
+			if connectAttempts > maxConnectAttempts {
+				return &Iter{err: errNoControl}
+			}
+
+			connectAttempts++
+
+			c.reconnect()
+			continue
+		}
+
+		iter = conn.executeQuery(q)
+		q.attempts++
+		if iter.err == nil || !c.retry.Attempt(q) {
+			break
+		}
+	}
+
+	return
+}
+
+func (c *controlConn) awaitSchemaAgreement() (err error) {
+
+	const (
+		// TODO(zariel): if we export this make this configurable
+		maxWaitTime = 60 * time.Second
+
+		peerSchemas  = "SELECT schema_version FROM system.peers"
+		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
+	)
+
+	endDeadline := time.Now().Add(maxWaitTime)
+
+	for time.Now().Before(endDeadline) {
+		iter := c.query(peerSchemas)
+
+		versions := make(map[string]struct{})
+
+		var schemaVersion string
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		iter = c.query(localSchemas)
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		if len(versions) <= 1 {
+			return nil
+		}
+
+	cont:
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	if err != nil {
+		return
+	}
+
+	// not exported
+	return errors.New("gocql: cluster schema versions not consistent")
+}
+func (c *controlConn) close() {
+	// TODO: handle more gracefully
+	close(c.quit)
+}
+
+var errNoControl = errors.New("gocql: no controll connection available")

+ 4 - 4
errors_test.go

@@ -10,20 +10,20 @@ func TestErrorsParse(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	if err := createTable(session, `CREATE TABLE errors_parse (id int primary key)`); err != nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.errors_parse (id int primary key)`); err != nil {
 		t.Fatal("create:", err)
 	}
 
-	if err := createTable(session, `CREATE TABLE errors_parse (id int primary key)`); err == nil {
+	if err := createTable(session, `CREATE TABLE gocql_test.errors_parse (id int primary key)`); err == nil {
 		t.Fatal("Should have gotten already exists error from cassandra server.")
 	} else {
 		switch e := err.(type) {
 		case *RequestErrAlreadyExists:
 			if e.Table != "errors_parse" {
-				t.Fatal("Failed to parse error response from cassandra for ErrAlreadyExists.")
+				t.Fatalf("expected error table to be 'errors_parse' but was %q", e.Table)
 			}
 		default:
-			t.Fatal("Failed to parse error response from cassandra for ErrAlreadyExists.")
+			t.Fatalf("expected to get RequestErrAlreadyExists instead got %T", e)
 		}
 	}
 }

+ 11 - 0
frame.go

@@ -1397,6 +1397,17 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame) error {
 	return f.finishWrite()
 }
 
+type writeOptionsFrame struct{}
+
+func (w *writeOptionsFrame) writeFrame(framer *framer, streamID int) error {
+	return framer.writeOptionsFrame(streamID, w)
+}
+
+func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) error {
+	f.writeHeader(f.flags, opOptions, stream)
+	return f.finishWrite()
+}
+
 func (f *framer) readByte() byte {
 	if len(f.rbuf) < 1 {
 		panic(fmt.Errorf("not enough bytes in buffer to read byte require 1 got: %d", len(f.rbuf)))

+ 23 - 17
host_source.go

@@ -27,14 +27,17 @@ type ringDescriber struct {
 func (r *ringDescriber) GetHosts() (hosts []HostInfo, partitioner string, err error) {
 	// we need conn to be the same because we need to query system.peers and system.local
 	// on the same node to get the whole cluster
+
+	iter := r.session.control.query("SELECT data_center, rack, host_id, tokens, partitioner FROM system.local")
+	if iter == nil {
+		return r.prevHosts, r.prevPartitioner, nil
+	}
+
 	conn := r.session.pool.Pick(nil)
 	if conn == nil {
 		return r.prevHosts, r.prevPartitioner, nil
 	}
 
-	query := r.session.Query("SELECT data_center, rack, host_id, tokens, partitioner FROM system.local")
-	iter := conn.executeQuery(query)
-
 	host := HostInfo{}
 	iter.Scan(&host.DataCenter, &host.Rack, &host.HostId, &host.Tokens, &partitioner)
 
@@ -53,8 +56,10 @@ func (r *ringDescriber) GetHosts() (hosts []HostInfo, partitioner string, err er
 
 	hosts = []HostInfo{host}
 
-	query = r.session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
-	iter = conn.executeQuery(query)
+	iter = r.session.control.query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
+	if iter == nil {
+		return r.prevHosts, r.prevPartitioner, nil
+	}
 
 	host = HostInfo{}
 	for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
@@ -93,20 +98,21 @@ func (h *ringDescriber) run(sleep time.Duration) {
 	}
 
 	for {
+		// if we have 0 hosts this will return the previous list of hosts to
+		// attempt to reconnect to the cluster otherwise we would never find
+		// downed hosts again, could possibly have an optimisation to only
+		// try to add new hosts if GetHosts didnt error and the hosts didnt change.
+		hosts, partitioner, err := h.GetHosts()
+		if err != nil {
+			log.Println("RingDescriber: unable to get ring topology:", err)
+			continue
+		}
+
+		h.session.pool.SetHosts(hosts)
+		h.session.pool.SetPartitioner(partitioner)
+
 		select {
 		case <-time.After(sleep):
-			// if we have 0 hosts this will return the previous list of hosts to
-			// attempt to reconnect to the cluster otherwise we would never find
-			// downed hosts again, could possibly have an optimisation to only
-			// try to add new hosts if GetHosts didnt error and the hosts didnt change.
-			hosts, partitioner, err := h.GetHosts()
-			if err != nil {
-				log.Println("RingDescriber: unable to get ring topology:", err)
-				continue
-			}
-
-			h.session.pool.SetHosts(hosts)
-			h.session.pool.SetPartitioner(partitioner)
 		case <-h.closeChan:
 			return
 		}

+ 8 - 28
metadata.go

@@ -335,30 +335,18 @@ func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind string)
 }
 
 // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
-func getKeyspaceMetadata(
-	session *Session,
-	keyspaceName string,
-) (*KeyspaceMetadata, error) {
-	query := session.Query(
-		`
+func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
+	const stmt = `
 		SELECT durable_writes, strategy_class, strategy_options
 		FROM system.schema_keyspaces
-		WHERE keyspace_name = ?
-		`,
-		keyspaceName,
-	)
-	// Set a routing key to avoid GetRoutingKey from computing the routing key
-	// TODO use a separate connection (pool) for system keyspace queries.
-	query.RoutingKey([]byte{})
+		WHERE keyspace_name = ?`
 
 	keyspace := &KeyspaceMetadata{Name: keyspaceName}
 	var strategyOptionsJSON []byte
 
-	err := query.Scan(
-		&keyspace.DurableWrites,
-		&keyspace.StrategyClass,
-		&strategyOptionsJSON,
-	)
+	iter := session.control.query(stmt, keyspaceName)
+	iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
+	err := iter.Close()
 	if err != nil {
 		return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
 	}
@@ -431,11 +419,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
 		}
 	}
 
-	// Set a routing key to avoid GetRoutingKey from computing the routing key
-	// TODO use a separate connection (pool) for system keyspace queries.
-	query := session.Query(stmt, keyspaceName)
-	query.RoutingKey([]byte{})
-	iter := query.Iter()
+	iter := session.control.query(stmt, keyspaceName)
 
 	tables := []TableMetadata{}
 	table := TableMetadata{Keyspace: keyspaceName}
@@ -560,11 +544,7 @@ func getColumnMetadata(
 
 	var indexOptionsJSON []byte
 
-	query := session.Query(stmt, keyspaceName)
-	// Set a routing key to avoid GetRoutingKey from computing the routing key
-	// TODO use a separate connection (pool) for system keyspace queries.
-	query.RoutingKey([]byte{})
-	iter := query.Iter()
+	iter := session.control.query(stmt, keyspaceName)
 
 	for scan(iter, &column, &indexOptionsJSON) {
 		var err error

+ 36 - 11
session.go

@@ -38,6 +38,8 @@ type Session struct {
 	hostSource          *ringDescriber
 	mu                  sync.RWMutex
 
+	control *controlConn
+
 	cfg ClusterConfig
 
 	closeMu  sync.RWMutex
@@ -86,6 +88,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 
 	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
 
+	if !cfg.disableControlConn {
+		s.control = createControlConn(s)
+	}
+
 	if cfg.DiscoverHosts {
 		s.hostSource = &ringDescriber{
 			session:    s,
@@ -188,6 +194,10 @@ func (s *Session) Close() {
 	if s.hostSource != nil {
 		close(s.hostSource.closeChan)
 	}
+
+	if s.control != nil {
+		s.control.close()
+	}
 }
 
 func (s *Session) Closed() bool {
@@ -212,8 +222,12 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 
 		//Assign the error unavailable to the iterator
 		if conn == nil {
-			iter = &Iter{err: ErrNoConnections}
-			break
+			if qry.rt == nil || !qry.rt.Attempt(qry) {
+				iter = &Iter{err: ErrNoConnections}
+				break
+			}
+
+			continue
 		}
 
 		t := time.Now()
@@ -1055,29 +1069,40 @@ func (t *traceWriter) Trace(traceId []byte) {
 		coordinator string
 		duration    int
 	)
-	t.session.Query(`SELECT coordinator, duration
+	iter := t.session.control.query(`SELECT coordinator, duration
 			FROM system_traces.sessions
-			WHERE session_id = ?`, traceId).
-		Consistency(One).Scan(&coordinator, &duration)
+			WHERE session_id = ?`, traceId)
+
+	iter.Scan(&coordinator, &duration)
+	if err := iter.Close(); err != nil {
+		t.mu.Lock()
+		fmt.Fprintln(t.w, "Error:", err)
+		t.mu.Unlock()
+		return
+	}
 
-	iter := t.session.Query(`SELECT event_id, activity, source, source_elapsed
-			FROM system_traces.events
-			WHERE session_id = ?`, traceId).
-		Consistency(One).Iter()
 	var (
 		timestamp time.Time
 		activity  string
 		source    string
 		elapsed   int
 	)
-	t.mu.Lock()
-	defer t.mu.Unlock()
+
 	fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n",
 		traceId, coordinator, time.Duration(duration)*time.Microsecond)
+
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed
+			FROM system_traces.events
+			WHERE session_id = ?`, traceId)
+
 	for iter.Scan(&timestamp, &activity, &source, &elapsed) {
 		fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n",
 			timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed)
 	}
+
 	if err := iter.Close(); err != nil {
 		fmt.Fprintln(t.w, "Error:", err)
 	}

+ 1 - 1
tuple_test.go

@@ -12,7 +12,7 @@ func TestTupleSimple(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	err := createTable(session, `CREATE TABLE tuple_test(
+	err := createTable(session, `CREATE TABLE gocql_test.tuple_test(
 		id int,
 		coord frozen<tuple<int, int>>,
 

+ 10 - 10
udt_test.go

@@ -51,7 +51,7 @@ func TestUDT_Marshaler(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	err := createTable(session, `CREATE TYPE position(
+	err := createTable(session, `CREATE TYPE gocql_test.position(
 		lat int,
 		lon int,
 		padding text);`)
@@ -59,7 +59,7 @@ func TestUDT_Marshaler(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	err = createTable(session, `CREATE TABLE houses(
+	err = createTable(session, `CREATE TABLE gocql_test.houses(
 		id int,
 		name text,
 		loc frozen<position>,
@@ -108,14 +108,14 @@ func TestUDT_Reflect(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	err := createTable(session, `CREATE TYPE horse(
+	err := createTable(session, `CREATE TYPE gocql_test.horse(
 		name text,
 		owner text);`)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	err = createTable(session, `CREATE TABLE horse_race(
+	err = createTable(session, `CREATE TABLE gocql_test.horse_race(
 		position int,
 		horse frozen<horse>,
 
@@ -167,14 +167,14 @@ func TestUDT_Proto2error(t *testing.T) {
 	}
 	defer session.Close()
 
-	err = createTable(session, `CREATE TYPE fish(
+	err = createTable(session, `CREATE TYPE gocql_test.fish(
 		name text,
 		owner text);`)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	err = createTable(session, `CREATE TABLE fish_race(
+	err = createTable(session, `CREATE TABLE gocql_test.fish_race(
 		position int,
 		fish frozen<fish>,
 
@@ -208,14 +208,14 @@ func TestUDT_NullObject(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	err := createTable(session, `CREATE TYPE udt_null_type(
+	err := createTable(session, `CREATE TYPE gocql_test.udt_null_type(
 		name text,
 		owner text);`)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	err = createTable(session, `CREATE TABLE udt_null_table(
+	err = createTable(session, `CREATE TABLE gocql_test.udt_null_table(
 		id uuid,
 		udt_col frozen<udt_null_type>,
 
@@ -262,7 +262,7 @@ func TestMapScanUDT(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
 
-	err := createTable(session, `CREATE TYPE log_entry (
+	err := createTable(session, `CREATE TYPE gocql_test.log_entry (
 		created_timestamp timestamp,
 		message text
 	);`)
@@ -270,7 +270,7 @@ func TestMapScanUDT(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	err = createTable(session, `CREATE TABLE requests_by_id (
+	err = createTable(session, `CREATE TABLE gocql_test.requests_by_id (
 		id uuid PRIMARY KEY,
 		type int,
 		log_entries list<frozen <log_entry>>

+ 2 - 2
wiki_test.go

@@ -56,11 +56,11 @@ type WikiTest struct {
 
 func CreateSchema(session *Session, tb testing.TB, table string) *WikiTest {
 	table = "wiki_" + table
-	if err := session.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s", table)).Exec(); err != nil {
+	if err := createTable(session, fmt.Sprintf("DROP TABLE IF EXISTS gocql_test.%s", table)); err != nil {
 		tb.Fatal("CreateSchema:", err)
 	}
 
-	err := createTable(session, fmt.Sprintf(`CREATE TABLE %s (
+	err := createTable(session, fmt.Sprintf(`CREATE TABLE gocql_test.%s (
 			title       varchar,
 			revid       timeuuid,
 			body        varchar,