Browse Source

Add KeyspaceMetadata() func to Session which returns schema information

- Implemented functions for retrieving keyspace, table, and column
schema metadata from the system schema tables
- Implemented a function to reason about schema metadata in order to
determine the partition key and clustering columns for a table
- Implemented a cassandra type parser for parsing information about
validator and comparator definitions in table and column schema metadata
- Implemented schemaDescriber, an analog to ringDescriber for schema metadata
- Added KeyspaceMetadata() function to Session
- Added Justin Corpron to AUTHORS
Justin Corpron 11 years ago
parent
commit
4ce212b9a3
6 changed files with 1803 additions and 7 deletions
  1. 1 0
      AUTHORS
  2. 277 0
      cassandra_test.go
  3. 1 1
      marshal.go
  4. 836 0
      metadata.go
  5. 670 0
      metadata_test.go
  6. 18 6
      session.go

+ 1 - 0
AUTHORS

@@ -43,3 +43,4 @@ James Maloney <jamessagan@gmail.com>
 Ashwin Purohit <purohit@gmail.com>
 Dan Kinder <dkinder.is.me@gmail.com>
 Oliver Beattie <oliver@obeattie.com>
+Justin Corpron <justin@retailnext.com>

+ 277 - 0
cassandra_test.go

@@ -1427,3 +1427,280 @@ func TestEmptyTimestamp(t *testing.T) {
 		t.Errorf("time.Time bind variable should still be empty (was %s)", timeVal)
 	}
 }
+
+func TestGetKeyspaceMetadata(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+
+	keyspaceMetadata, err := getKeyspaceMetadata(session)
+	if err != nil {
+		t.Fatalf("failed to query the keyspace metadata with err: %v", err)
+	}
+	if keyspaceMetadata == nil {
+		t.Fatal("failed to query the keyspace metadata, nil returned")
+	}
+	if keyspaceMetadata.Name != "gocql_test" {
+		t.Errorf("Expected keyspace name to be 'gocql' but was '%s'", keyspaceMetadata.Name)
+	}
+	if keyspaceMetadata.StrategyClass != "org.apache.cassandra.locator.SimpleStrategy" {
+		t.Errorf("Expected replication strategy class to be 'org.apache.cassandra.locator.SimpleStrategy' but was '%s'", keyspaceMetadata.StrategyClass)
+	}
+	if keyspaceMetadata.StrategyOptions == nil {
+		t.Error("Expected replication strategy options map but was nil")
+	}
+	rfStr, ok := keyspaceMetadata.StrategyOptions["replication_factor"]
+	if !ok {
+		t.Fatalf("Expected strategy option 'replication_factor' but was not found in %v", keyspaceMetadata.StrategyOptions)
+	}
+	rfInt, err := strconv.Atoi(rfStr.(string))
+	if err != nil {
+		t.Fatalf("Error converting string to int with err: %v", err)
+	}
+	if rfInt != *flagRF {
+		t.Errorf("Expected replication factor to be %d but was %d", *flagRF, rfInt)
+	}
+}
+
+func TestGetTableMetadata(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+
+	if err := createTable(session, "CREATE TABLE test_table_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+		t.Fatalf("failed to create table with error '%v'", err)
+	}
+
+	tables, err := getTableMetadata(session)
+	if err != nil {
+		t.Fatalf("failed to query the table metadata with err: %v", err)
+	}
+	if tables == nil {
+		t.Fatal("failed to query the table metadata, nil returned")
+	}
+
+	var testTable *TableMetadata
+
+	// verify all tables have minimum expected data
+	for i := range tables {
+		table := &tables[i]
+
+		if table.Name == "" {
+			t.Errorf("Expected table name to be set, but it was empty: index=%d metadata=%+v", i, table)
+		}
+		if table.Keyspace != "gocql_test" {
+			t.Errorf("Expected keyspace for '%d' table metadata to be 'gocql_test' but was '%s'", table.Name, table.Keyspace)
+		}
+		if table.KeyValidator == "" {
+			t.Errorf("Expected key validator to be set for table %s", table.Name)
+		}
+		if table.Comparator == "" {
+			t.Errorf("Expected comparator to be set for table %s", table.Name)
+		}
+		if table.DefaultValidator == "" {
+			t.Errorf("Expected default validator to be set for table %s", table.Name)
+		}
+
+		// these fields are not set until the metadata is compiled
+		if table.PartitionKey != nil {
+			t.Errorf("Did not expect partition key for table %s", table.Name)
+		}
+		if table.ClusteringColumns != nil {
+			t.Errorf("Did not expect clustering columns for table %s", table.Name)
+		}
+		if table.Columns != nil {
+			t.Errorf("Did not expect columns for table %s", table.Name)
+		}
+
+		// for the next part of the test after this loop, find the metadata for the test table
+		if table.Name == "test_table_metadata" {
+			testTable = table
+		}
+	}
+
+	// verify actual values on the test tables
+	if testTable == nil {
+		t.Fatal("Expected table metadata for name 'test_table_metadata'")
+	}
+	if testTable.KeyValidator != "org.apache.cassandra.db.marshal.Int32Type" {
+		t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.Int32Type' but was '%s'", testTable.KeyValidator)
+	}
+	if testTable.Comparator != "org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)" {
+		t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)' but was '%s'", testTable.Comparator)
+	}
+	if testTable.DefaultValidator != "org.apache.cassandra.db.marshal.BytesType" {
+		t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.BytesType' but was '%s'", testTable.DefaultValidator)
+	}
+	expectedKeyAliases := []string{"first_id"}
+	if !reflect.DeepEqual(testTable.KeyAliases, expectedKeyAliases) {
+		t.Errorf("Expected key aliases %v but was %v", expectedKeyAliases, testTable.KeyAliases)
+	}
+	expectedColumnAliases := []string{"second_id"}
+	if !reflect.DeepEqual(testTable.ColumnAliases, expectedColumnAliases) {
+		t.Errorf("Expected key aliases %v but was %v", expectedColumnAliases, testTable.ColumnAliases)
+	}
+	if testTable.ValueAlias != "" {
+		t.Errorf("Expected value alias '' but was '%s'", testTable.ValueAlias)
+	}
+}
+
+func TestGetColumnMetadata(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+
+	if err := createTable(session, "CREATE TABLE test_column_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+		t.Fatalf("failed to create table with error '%v'", err)
+	}
+
+	if err := session.Query("CREATE INDEX index_column_metadata ON test_column_metadata ( third_id )").Exec(); err != nil {
+		t.Fatalf("failed to create index with err: %v", err)
+	}
+
+	columns, err := getColumnMetadata(session)
+	if err != nil {
+		t.Fatalf("failed to query column metadata with err: %v", err)
+	}
+	if columns == nil {
+		t.Fatal("failed to query column metadata, nil returned")
+	}
+
+	testColumns := map[string]*ColumnMetadata{}
+
+	// verify actual values on the test columns
+	for i := range columns {
+		column := &columns[i]
+
+		if column.Name == "" {
+			t.Errorf("Expected column name to be set, but it was empty: index=%d metadata=%+v", i, column)
+		}
+		if column.Table == "" {
+			t.Errorf("Expected column %s table name to be set, but it was empty", column.Name)
+		}
+		if column.Keyspace != "gocql_test" {
+			t.Errorf("Expected column %s keyspace name to be 'gocql_test', but it was '%s'", column.Name, column.Keyspace)
+		}
+		if column.Kind == "" {
+			t.Errorf("Expected column %s kind to be set, but it was empty", column.Name)
+		}
+		if session.cfg.ProtoVersion == 1 && column.Kind != "regular" {
+			t.Errorf("Expected column %s kind to be set to 'regular' for proto V1 but it was '%s'", column.Name, column.Kind)
+		}
+		if column.Validator == "" {
+			t.Errorf("Expected column %s validator to be set, but it was empty", column.Name)
+		}
+
+		// find the test table columns for the next step after this loop
+		if column.Table == "test_column_metadata" {
+			testColumns[column.Name] = column
+		}
+	}
+
+	if *flagProto == 1 {
+		// V1 proto only returns "regular columns"
+		if len(testColumns) != 1 {
+			t.Errorf("Expected 1 test columns but there were %d", len(testColumns))
+		}
+		thirdID, found := testColumns["third_id"]
+		if !found {
+			t.Fatalf("Expected to find column 'third_id' metadata but there was only %v", testColumns)
+		}
+
+		if thirdID.Kind != REGULAR {
+			t.Errorf("Expected %s column kind to be '%s' but it was '%s'", thirdID.Name, REGULAR, thirdID.Kind)
+		}
+
+		if thirdID.Index.Name != "index_column_metadata" {
+			t.Errorf("Expected %s column index name to be 'index_column_metadata' but it was '%s'", thirdID.Name, thirdID.Index.Name)
+		}
+	} else {
+		if len(testColumns) != 3 {
+			t.Errorf("Expected 3 test columns but there were %d", len(testColumns))
+		}
+		firstID, found := testColumns["first_id"]
+		if !found {
+			t.Fatalf("Expected to find column 'first_id' metadata but there was only %v", testColumns)
+		}
+		secondID, found := testColumns["second_id"]
+		if !found {
+			t.Fatalf("Expected to find column 'second_id' metadata but there was only %v", testColumns)
+		}
+		thirdID, found := testColumns["third_id"]
+		if !found {
+			t.Fatalf("Expected to find column 'third_id' metadata but there was only %v", testColumns)
+		}
+
+		if firstID.Kind != PARTITION_KEY {
+			t.Errorf("Expected %s column kind to be '%s' but it was '%s'", firstID.Name, PARTITION_KEY, firstID.Kind)
+		}
+		if secondID.Kind != CLUSTERING_KEY {
+			t.Errorf("Expected %s column kind to be '%s' but it was '%s'", secondID.Name, CLUSTERING_KEY, secondID.Kind)
+		}
+		if thirdID.Kind != REGULAR {
+			t.Errorf("Expected %s column kind to be '%s' but it was '%s'", thirdID.Name, REGULAR, thirdID.Kind)
+		}
+
+		if thirdID.Index.Name != "index_column_metadata" {
+			t.Errorf("Expected %s column index name to be 'index_column_metadata' but it was '%s'", thirdID.Name, thirdID.Index.Name)
+		}
+	}
+}
+
+func TestKeyspaceMetadata(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+
+	if err := createTable(session, "CREATE TABLE test_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+		t.Fatalf("failed to create table with error '%v'", err)
+	}
+
+	if err := session.Query("CREATE INDEX index_metadata ON test_metadata ( third_id )").Exec(); err != nil {
+		t.Fatalf("failed to create index with err: %v", err)
+	}
+
+	keyspaceMetadata, err := session.KeyspaceMetadata()
+	if err != nil {
+		t.Fatalf("failed to query keyspace metadata with err: %v", err)
+	}
+	if keyspaceMetadata == nil {
+		t.Fatal("expected the keyspace metadata to not be nil, but it was nil")
+	}
+	if keyspaceMetadata.Name != session.cfg.Keyspace {
+		t.Fatalf("Expected the keyspace name to be %s but was %s", session.cfg.Keyspace, keyspaceMetadata.Name)
+	}
+	if len(keyspaceMetadata.Tables) == 0 {
+		t.Errorf("Expected tables but there were none")
+	}
+
+	tableMetadata, found := keyspaceMetadata.Tables["test_metadata"]
+	if !found {
+		t.Fatalf("failed to find the test_metadata table metadata")
+	}
+
+	if len(tableMetadata.PartitionKey) != 1 {
+		t.Errorf("expected partition key length of 1, but was %d", len(tableMetadata.PartitionKey))
+	}
+	for i, column := range tableMetadata.PartitionKey {
+		if column == nil {
+			t.Errorf("partition key column metadata at index %d was nil", i)
+		}
+	}
+	if tableMetadata.PartitionKey[0].Name != "first_id" {
+		t.Errorf("Expected the first partition key column to be 'first_id' but was '%s'", tableMetadata.PartitionKey[0].Name)
+	}
+	if len(tableMetadata.ClusteringColumns) != 1 {
+		t.Fatalf("expected clustering columns length of 1, but was %d", len(tableMetadata.ClusteringColumns))
+	}
+	for i, column := range tableMetadata.ClusteringColumns {
+		if column == nil {
+			t.Fatalf("clustering column metadata at index %d was nil", i)
+		}
+	}
+	if tableMetadata.ClusteringColumns[0].Name != "second_id" {
+		t.Errorf("Expected the first clustering column to be 'second_id' but was '%s'", tableMetadata.ClusteringColumns[0].Name)
+	}
+	thirdColumn, found := tableMetadata.Columns["third_id"]
+	if !found {
+		t.Fatalf("Expected a column definition for 'third_id'")
+	}
+	if thirdColumn.Index.Name != "index_metadata" {
+		t.Errorf("Expected column index named 'index_metadata' but was '%s'", thirdColumn.Index.Name)
+	}
+}

+ 1 - 1
marshal.go

@@ -1123,7 +1123,7 @@ type TypeInfo struct {
 	Type   Type
 	Key    *TypeInfo // only used for TypeMap
 	Elem   *TypeInfo // only used for TypeMap, TypeList and TypeSet
-	Custom string    // only used for TypeCostum
+	Custom string    // only used for TypeCustom
 }
 
 // String returns a human readable name for the Cassandra datatype

+ 836 - 0
metadata.go

@@ -0,0 +1,836 @@
+// Copyright (c) 2015 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gocql
+
+import (
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"log"
+	"strconv"
+	"strings"
+	"sync"
+)
+
+// schema metadata for a keyspace
+type KeyspaceMetadata struct {
+	Name            string
+	DurableWrites   bool
+	StrategyClass   string
+	StrategyOptions map[string]interface{}
+	Tables          map[string]*TableMetadata
+}
+
+// schema metadata for a table (a.k.a. column family)
+type TableMetadata struct {
+	Keyspace          string
+	Name              string
+	KeyValidator      string
+	Comparator        string
+	DefaultValidator  string
+	KeyAliases        []string
+	ColumnAliases     []string
+	ValueAlias        string
+	PartitionKey      []*ColumnMetadata
+	ClusteringColumns []*ColumnMetadata
+	Columns           map[string]*ColumnMetadata
+}
+
+// schema metadata for a column
+type ColumnMetadata struct {
+	Keyspace       string
+	Table          string
+	Name           string
+	ComponentIndex int
+	Kind           string
+	Validator      string
+	Type           TypeInfo
+	Order          ColumnOrder
+	Index          ColumnIndexMetadata
+}
+
+// the ordering of the column with regard to its comparator
+type ColumnOrder bool
+
+const (
+	ASC  ColumnOrder = false
+	DESC             = true
+)
+
+type ColumnIndexMetadata struct {
+	Name    string
+	Type    string
+	Options map[string]interface{}
+}
+
+// Column kind values
+const (
+	PARTITION_KEY  = "partition_key"
+	CLUSTERING_KEY = "clustering_key"
+	REGULAR        = "regular"
+	COMPACT_VALUE  = "compact_value"
+	STATIC         = "static"
+)
+
+// default alias values
+const (
+	DEFAULT_KEY_ALIAS    = "key"
+	DEFAULT_COLUMN_ALIAS = "column"
+	DEFAULT_VALUE_ALIAS  = "value"
+)
+
+// queries the cluster for schema information for a specific keyspace
+type schemaDescriber struct {
+	session  *Session
+	mu       sync.Mutex
+	lazyInit sync.Once
+
+	current *KeyspaceMetadata
+	err     error
+}
+
+func (s *schemaDescriber) init() {
+	s.err = s.refreshSchema()
+}
+
+func (s *schemaDescriber) getSchema() (*KeyspaceMetadata, error) {
+	// lazily-initialize the schema the first time
+	s.lazyInit.Do(s.init)
+
+	// TODO handle schema change events
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.current, s.err
+}
+
+func (s *schemaDescriber) refreshSchema() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	var err error
+
+	// query the system keyspace for schema data
+	// TODO retrieve concurrently
+	keyspace, err := getKeyspaceMetadata(s.session)
+	if err != nil {
+		return err
+	}
+	tables, err := getTableMetadata(s.session)
+	if err != nil {
+		return err
+	}
+	columns, err := getColumnMetadata(s.session)
+	if err != nil {
+		return err
+	}
+
+	// organize the schema data
+	compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns)
+
+	// update the current
+	s.current = keyspace
+	s.err = nil
+
+	return nil
+}
+
+// "compiles" keyspace, table, and column metadata for a keyspace together
+// linking the metadata objects together and calculating the partition key
+// and clustering key.
+func compileMetadata(
+	protoVersion int,
+	keyspace *KeyspaceMetadata,
+	tables []TableMetadata,
+	columns []ColumnMetadata,
+) {
+	keyspace.Tables = make(map[string]*TableMetadata)
+	for i := range tables {
+		tables[i].Columns = make(map[string]*ColumnMetadata)
+
+		keyspace.Tables[tables[i].Name] = &tables[i]
+	}
+
+	// add columns from the schema data
+	for i := range columns {
+		// decode the validator for TypeInfo and order
+		validatorParsed := parseType(columns[i].Validator)
+		columns[i].Type = validatorParsed.types[0]
+		columns[i].Order = ASC
+		if validatorParsed.reversed[0] {
+			columns[i].Order = DESC
+		}
+
+		table := keyspace.Tables[columns[i].Table]
+		table.Columns[columns[i].Name] = &columns[i]
+	}
+
+	if protoVersion == 1 {
+		compileV1Metadata(tables)
+	} else {
+		compileV2Metadata(tables)
+	}
+}
+
+// V1 protocol does not return as much column metadata as V2+ so determining
+// PartitionKey and ClusterColumns is more complex
+func compileV1Metadata(tables []TableMetadata) {
+	for i := range tables {
+		table := &tables[i]
+
+		// decode the key validator
+		keyValidatorParsed := parseType(table.KeyValidator)
+		// decode the comparator
+		comparatorParsed := parseType(table.Comparator)
+
+		// the partition key length is the same as the number of types in the
+		// key validator
+		table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
+
+		// V1 protocol only returns "regular" columns from
+		// system.schema_columns (there is no type field for columns)
+		// so the alias information is used to
+		// create the partition key and clustering columns
+
+		// construct the partition key from the alias
+		for i := range table.PartitionKey {
+			var alias string
+			if len(table.KeyAliases) > i {
+				alias = table.KeyAliases[i]
+			} else if i == 0 {
+				alias = DEFAULT_KEY_ALIAS
+			} else {
+				alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
+			}
+
+			column := &ColumnMetadata{
+				Keyspace:       table.Keyspace,
+				Table:          table.Name,
+				Name:           alias,
+				Type:           keyValidatorParsed.types[i],
+				Kind:           PARTITION_KEY,
+				ComponentIndex: i,
+			}
+
+			table.PartitionKey[i] = column
+			table.Columns[alias] = column
+		}
+
+		// determine the number of clustering columns
+		size := len(comparatorParsed.types)
+		if comparatorParsed.isComposite {
+			if len(comparatorParsed.collections) != 0 ||
+				(len(table.ColumnAliases) == size-1 &&
+					comparatorParsed.types[size-1].Type == TypeVarchar) {
+				size = size - 1
+			}
+		} else {
+			if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
+				size = 0
+			}
+		}
+
+		table.ClusteringColumns = make([]*ColumnMetadata, size)
+
+		for i := range table.ClusteringColumns {
+			var alias string
+			if len(table.ColumnAliases) > i {
+				alias = table.ColumnAliases[i]
+			} else if i == 0 {
+				alias = DEFAULT_COLUMN_ALIAS
+			} else {
+				alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
+			}
+
+			order := ASC
+			if comparatorParsed.reversed[i] {
+				order = DESC
+			}
+
+			column := &ColumnMetadata{
+				Keyspace:       table.Keyspace,
+				Table:          table.Name,
+				Name:           alias,
+				Type:           comparatorParsed.types[i],
+				Order:          order,
+				Kind:           CLUSTERING_KEY,
+				ComponentIndex: i,
+			}
+
+			table.ClusteringColumns[i] = column
+			table.Columns[alias] = column
+		}
+
+		if size != len(comparatorParsed.types)-1 {
+			alias := DEFAULT_VALUE_ALIAS
+			if len(table.ValueAlias) > 0 {
+				alias = table.ValueAlias
+			}
+			// decode the default validator
+			defaultValidatorParsed := parseType(table.DefaultValidator)
+			column := &ColumnMetadata{
+				Keyspace: table.Keyspace,
+				Table:    table.Name,
+				Name:     alias,
+				Type:     defaultValidatorParsed.types[0],
+				Kind:     REGULAR,
+			}
+			table.Columns[alias] = column
+		}
+	}
+}
+
+// The simpler compile case for V2+ protocol
+func compileV2Metadata(tables []TableMetadata) {
+	for i := range tables {
+		table := &tables[i]
+
+		partitionColumnCount := countColumnsOfKind(table.Columns, PARTITION_KEY)
+		table.PartitionKey = make([]*ColumnMetadata, partitionColumnCount)
+
+		clusteringColumnCount := countColumnsOfKind(table.Columns, CLUSTERING_KEY)
+		table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
+
+		for _, column := range table.Columns {
+			if column.Kind == PARTITION_KEY {
+				table.PartitionKey[column.ComponentIndex] = column
+			} else if column.Kind == CLUSTERING_KEY {
+				table.ClusteringColumns[column.ComponentIndex] = column
+			}
+		}
+
+	}
+}
+
+func countColumnsOfKind(columns map[string]*ColumnMetadata, kind string) int {
+	count := 0
+	for _, column := range columns {
+		if column.Kind == kind {
+			count++
+		}
+	}
+	return count
+}
+
+// query only for the keyspace metadata for the session's keyspace
+func getKeyspaceMetadata(
+	session *Session,
+) (*KeyspaceMetadata, error) {
+	query := session.Query(
+		`
+		SELECT durable_writes, strategy_class, strategy_options
+		FROM system.schema_keyspaces
+		WHERE keyspace_name = ?
+		`,
+		session.cfg.Keyspace,
+	)
+
+	keyspace := &KeyspaceMetadata{Name: session.cfg.Keyspace}
+	var strategyOptionsJSON []byte
+
+	err := query.Scan(
+		&keyspace.DurableWrites,
+		&keyspace.StrategyClass,
+		&strategyOptionsJSON,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
+	}
+
+	err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
+			strategyOptionsJSON, keyspace.Name, err,
+		)
+	}
+
+	return keyspace, nil
+}
+
+// query for only the table metadata in the session's keyspace
+func getTableMetadata(
+	session *Session,
+) ([]TableMetadata, error) {
+	query := session.Query(
+		`
+		SELECT
+			columnfamily_name,
+			key_validator,
+			comparator,
+			default_validator,
+			key_aliases,
+			column_aliases,
+			value_alias
+		FROM system.schema_columnfamilies
+		WHERE keyspace_name = ?
+		`,
+		session.cfg.Keyspace,
+	)
+	iter := query.Iter()
+
+	tables := []TableMetadata{}
+	table := TableMetadata{Keyspace: session.cfg.Keyspace}
+
+	var keyAliasesJSON []byte
+	var columnAliasesJSON []byte
+	for iter.Scan(
+		&table.Name,
+		&table.KeyValidator,
+		&table.Comparator,
+		&table.DefaultValidator,
+		&keyAliasesJSON,
+		&columnAliasesJSON,
+		&table.ValueAlias,
+	) {
+		var err error
+
+		// decode the key aliases
+		if keyAliasesJSON != nil {
+			table.KeyAliases = []string{}
+			err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
+			if err != nil {
+				iter.Close()
+				return nil, fmt.Errorf(
+					"Invalid JSON value '%s' as key_aliases for in table '%s': %v",
+					keyAliasesJSON, table.Name, err,
+				)
+			}
+		}
+
+		// decode the column aliases
+		if columnAliasesJSON != nil {
+			table.ColumnAliases = []string{}
+			err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
+			if err != nil {
+				iter.Close()
+				return nil, fmt.Errorf(
+					"Invalid JSON value '%s' as column_aliases for in table '%s': %v",
+					columnAliasesJSON, table.Name, err,
+				)
+			}
+		}
+
+		tables = append(tables, table)
+		table = TableMetadata{Keyspace: session.cfg.Keyspace}
+	}
+
+	err := iter.Close()
+	if err != nil && err != ErrNotFound {
+		return nil, fmt.Errorf("Error querying table schema: %v", err)
+	}
+
+	return tables, nil
+}
+
+// query for only the table metadata in the session's keyspace
+func getColumnMetadata(session *Session) ([]ColumnMetadata, error) {
+	// Deal with differences in protocol versions
+	var stmt string
+	var scan func(*Iter, *ColumnMetadata, *[]byte) bool
+	if session.cfg.ProtoVersion == 1 {
+		// V1 does not support the type column, and all returned rows are
+		// of kind "regular".
+		stmt = `
+			SELECT
+				columnfamily_name,
+				column_name,
+				component_index,
+				validator,
+				index_name,
+				index_type,
+				index_options
+			FROM system.schema_columns
+			WHERE keyspace_name = ?
+			`
+		scan = func(
+			iter *Iter,
+			column *ColumnMetadata,
+			indexOptionsJSON *[]byte,
+		) bool {
+			// all columns returned by V1 are regular
+			column.Kind = REGULAR
+			return iter.Scan(
+				&column.Table,
+				&column.Name,
+				&column.ComponentIndex,
+				&column.Validator,
+				&column.Index.Name,
+				&column.Index.Type,
+				&indexOptionsJSON,
+			)
+		}
+	} else {
+		// V2+ supports the type column
+		stmt = `
+			SELECT
+				columnfamily_name,
+				column_name,
+				component_index,
+				validator,
+				index_name,
+				index_type,
+				index_options,
+				type
+			FROM system.schema_columns
+			WHERE keyspace_name = ?
+			`
+		scan = func(
+			iter *Iter,
+			column *ColumnMetadata,
+			indexOptionsJSON *[]byte,
+		) bool {
+			return iter.Scan(
+				&column.Table,
+				&column.Name,
+				&column.ComponentIndex,
+				&column.Validator,
+				&column.Index.Name,
+				&column.Index.Type,
+				&indexOptionsJSON,
+				&column.Kind,
+			)
+		}
+	}
+
+	// get the columns metadata
+	columns := []ColumnMetadata{}
+	column := ColumnMetadata{Keyspace: session.cfg.Keyspace}
+
+	var indexOptionsJSON []byte
+
+	query := session.Query(stmt, session.cfg.Keyspace)
+	iter := query.Iter()
+
+	for scan(iter, &column, &indexOptionsJSON) {
+		var err error
+
+		// decode the index options
+		if indexOptionsJSON != nil {
+			err = json.Unmarshal(indexOptionsJSON, &column.Index.Options)
+			if err != nil {
+				iter.Close()
+				return nil, fmt.Errorf(
+					"Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
+					indexOptionsJSON,
+					column.Name,
+					column.Table,
+					err,
+				)
+			}
+		}
+
+		columns = append(columns, column)
+		column = ColumnMetadata{Keyspace: session.cfg.Keyspace}
+	}
+
+	err := iter.Close()
+	if err != nil && err != ErrNotFound {
+		return nil, fmt.Errorf("Error querying column schema: %v", err)
+	}
+
+	return columns, nil
+}
+
+// type definition parser state
+type typeParser struct {
+	input string
+	index int
+}
+
+// the type definition parser result
+type typeParserResult struct {
+	isComposite bool
+	types       []TypeInfo
+	reversed    []bool
+	collections map[string]TypeInfo
+}
+
+// Parse the type definition used for validator and comparator schema data
+func parseType(def string) typeParserResult {
+	parser := &typeParser{input: def}
+	return parser.parse()
+}
+
+const (
+	REVERSED_TYPE   = "org.apache.cassandra.db.marshal.ReversedType"
+	COMPOSITE_TYPE  = "org.apache.cassandra.db.marshal.CompositeType"
+	COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"
+	LIST_TYPE       = "org.apache.cassandra.db.marshal.ListType"
+	SET_TYPE        = "org.apache.cassandra.db.marshal.SetType"
+	MAP_TYPE        = "org.apache.cassandra.db.marshal.MapType"
+)
+
+// represents a class specification in the type def AST
+type typeParserClassNode struct {
+	name   string
+	params []typeParserParamNode
+	// this is the segment of the input string that defined this node
+	input string
+}
+
+// represents a class parameter in the type def AST
+type typeParserParamNode struct {
+	name  *string
+	class typeParserClassNode
+}
+
+func (t *typeParser) parse() typeParserResult {
+	// parse the AST
+	ast, ok := t.parseClassNode()
+	if !ok {
+		// treat this is a custom type
+		return typeParserResult{
+			isComposite: false,
+			types: []TypeInfo{
+				TypeInfo{
+					Type:   TypeCustom,
+					Custom: t.input,
+				},
+			},
+			reversed:    []bool{false},
+			collections: nil,
+		}
+	}
+
+	// interpret the AST
+	if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
+		count := len(ast.params)
+
+		// look for a collections param
+		last := ast.params[count-1]
+		collections := map[string]TypeInfo{}
+		if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
+			count--
+
+			for _, param := range last.class.params {
+				// decode the name
+				var name string
+				decoded, err := hex.DecodeString(*param.name)
+				if err != nil {
+					log.Printf(
+						"Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
+						t.input,
+						*param.name,
+						err,
+					)
+					// just use the provided name
+					name = *param.name
+				} else {
+					name = string(decoded)
+				}
+				collections[name] = param.class.asTypeInfo()
+			}
+		}
+
+		types := make([]TypeInfo, count)
+		reversed := make([]bool, count)
+
+		for i, param := range ast.params[:count] {
+			class := param.class
+			reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
+			if reversed[i] {
+				class = class.params[0].class
+			}
+			types[i] = class.asTypeInfo()
+		}
+
+		return typeParserResult{
+			isComposite: true,
+			types:       types,
+			reversed:    reversed,
+			collections: collections,
+		}
+	} else {
+		// not composite, so one type
+		class := *ast
+		reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
+		if reversed {
+			class = class.params[0].class
+		}
+		typeInfo := class.asTypeInfo()
+
+		return typeParserResult{
+			isComposite: false,
+			types:       []TypeInfo{typeInfo},
+			reversed:    []bool{reversed},
+		}
+	}
+}
+
+func (class *typeParserClassNode) asTypeInfo() TypeInfo {
+	if strings.HasPrefix(class.name, LIST_TYPE) {
+		elem := class.params[0].class.asTypeInfo()
+		return TypeInfo{
+			Type: TypeList,
+			Elem: &elem,
+		}
+	}
+	if strings.HasPrefix(class.name, SET_TYPE) {
+		elem := class.params[0].class.asTypeInfo()
+		return TypeInfo{
+			Type: TypeSet,
+			Elem: &elem,
+		}
+	}
+	if strings.HasPrefix(class.name, MAP_TYPE) {
+		key := class.params[0].class.asTypeInfo()
+		elem := class.params[1].class.asTypeInfo()
+		return TypeInfo{
+			Type: TypeMap,
+			Key:  &key,
+			Elem: &elem,
+		}
+	}
+
+	// must be a simple type or custom type
+	info := TypeInfo{Type: getApacheCassandraType(class.name)}
+	if info.Type == TypeCustom {
+		// add the entire class definition
+		info.Custom = class.input
+	}
+	return info
+}
+
+// CLASS := ID [ PARAMS ]
+func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
+	t.skipWhitespace()
+
+	startIndex := t.index
+
+	name, ok := t.nextIdentifier()
+	if !ok {
+		return nil, false
+	}
+
+	params, ok := t.parseParamNodes()
+	if !ok {
+		return nil, false
+	}
+
+	endIndex := t.index
+
+	node = &typeParserClassNode{
+		name:   name,
+		params: params,
+		input:  t.input[startIndex:endIndex],
+	}
+	return node, true
+}
+
+// PARAMS := "(" PARAM { "," PARAM } ")"
+// PARAM := [ PARAM_NAME ":" ] CLASS
+// PARAM_NAME := ID
+func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
+	t.skipWhitespace()
+
+	// the params are optional
+	if t.index == len(t.input) || t.input[t.index] != '(' {
+		return nil, true
+	}
+
+	params = []typeParserParamNode{}
+
+	// consume the '('
+	t.index++
+
+	t.skipWhitespace()
+
+	for t.input[t.index] != ')' {
+		// look for a named param, but if no colon, then we want to backup
+		backupIndex := t.index
+
+		// name will be a hex encoded version of a utf-8 string
+		name, ok := t.nextIdentifier()
+		if !ok {
+			return nil, false
+		}
+		hasName := true
+
+		// TODO handle '=>' used for DynamicCompositeType
+
+		t.skipWhitespace()
+
+		if t.input[t.index] == ':' {
+			// there is a name for this parameter
+
+			// consume the ':'
+			t.index++
+
+			t.skipWhitespace()
+		} else {
+			// no name, backup
+			hasName = false
+			t.index = backupIndex
+		}
+
+		// parse the next full parameter
+		classNode, ok := t.parseClassNode()
+		if !ok {
+			return nil, false
+		}
+
+		if hasName {
+			params = append(
+				params,
+				typeParserParamNode{name: &name, class: *classNode},
+			)
+		} else {
+			params = append(
+				params,
+				typeParserParamNode{class: *classNode},
+			)
+		}
+
+		t.skipWhitespace()
+
+		if t.input[t.index] == ',' {
+			// consume the comma
+			t.index++
+
+			t.skipWhitespace()
+		}
+	}
+
+	// consume the ')'
+	t.index++
+
+	return params, true
+}
+
+func (t *typeParser) skipWhitespace() {
+	for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
+		t.index++
+	}
+}
+
+func isWhitespaceChar(c byte) bool {
+	return c == ' ' || c == '\n' || c == '\t'
+}
+
+// ID := LETTER { LETTER }
+// LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
+func (t *typeParser) nextIdentifier() (id string, found bool) {
+	startIndex := t.index
+	for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
+		t.index++
+	}
+	if startIndex == t.index {
+		return "", false
+	}
+	return t.input[startIndex:t.index], true
+}
+
+func isIdentifierChar(c byte) bool {
+	return (c >= '0' && c <= '9') ||
+		(c >= 'a' && c <= 'z') ||
+		(c >= 'A' && c <= 'Z') ||
+		c == '-' ||
+		c == '+' ||
+		c == '.' ||
+		c == '_' ||
+		c == '&'
+}

+ 670 - 0
metadata_test.go

@@ -0,0 +1,670 @@
+// Copyright (c) 2015 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gocql
+
+import (
+	"strconv"
+	"testing"
+)
+
+func TestCompileMetadata(t *testing.T) {
+	// V1 tests - these are all based on real examples from the integration test ccm cluster
+	keyspace := &KeyspaceMetadata{
+		Name: "V1Keyspace",
+	}
+	tables := []TableMetadata{
+		TableMetadata{
+			// This table, found in the system keyspace, has no key aliases or column aliases
+			Keyspace:         "V1Keyspace",
+			Name:             "Schema",
+			KeyValidator:     "org.apache.cassandra.db.marshal.BytesType",
+			Comparator:       "org.apache.cassandra.db.marshal.UTF8Type",
+			DefaultValidator: "org.apache.cassandra.db.marshal.BytesType",
+			KeyAliases:       []string{},
+			ColumnAliases:    []string{},
+			ValueAlias:       "",
+		},
+		TableMetadata{
+			// This table, found in the system keyspace, has key aliases, column aliases, and a value alias.
+			Keyspace:         "V1Keyspace",
+			Name:             "hints",
+			KeyValidator:     "org.apache.cassandra.db.marshal.UUIDType",
+			Comparator:       "org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.TimeUUIDType,org.apache.cassandra.db.marshal.Int32Type)",
+			DefaultValidator: "org.apache.cassandra.db.marshal.BytesType",
+			KeyAliases:       []string{"target_id"},
+			ColumnAliases:    []string{"hint_id", "message_version"},
+			ValueAlias:       "mutation",
+		},
+		TableMetadata{
+			// This table, found in the system keyspace, has a comparator with collections, but no column aliases
+			Keyspace:         "V1Keyspace",
+			Name:             "peers",
+			KeyValidator:     "org.apache.cassandra.db.marshal.InetAddressType",
+			Comparator:       "org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.ColumnToCollectionType(746f6b656e73:org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)))",
+			DefaultValidator: "org.apache.cassandra.db.marshal.BytesType",
+			KeyAliases:       []string{"peer"},
+			ColumnAliases:    []string{},
+			ValueAlias:       "",
+		},
+		TableMetadata{
+			// This table, found in the system keyspace, has a column alias, but not a composite comparator
+			Keyspace:         "V1Keyspace",
+			Name:             "IndexInfo",
+			KeyValidator:     "org.apache.cassandra.db.marshal.UTF8Type",
+			Comparator:       "org.apache.cassandra.db.marshal.UTF8Type",
+			DefaultValidator: "org.apache.cassandra.db.marshal.BytesType",
+			KeyAliases:       []string{"table_name"},
+			ColumnAliases:    []string{"index_name"},
+			ValueAlias:       "",
+		},
+		TableMetadata{
+			// This table, found in the gocql_test keyspace following an integration test run, has a composite comparator with collections as well as a column alias
+			Keyspace:         "V1Keyspace",
+			Name:             "wiki_page",
+			KeyValidator:     "org.apache.cassandra.db.marshal.UTF8Type",
+			Comparator:       "org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.TimeUUIDType,org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.ColumnToCollectionType(74616773:org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type),6174746163686d656e7473:org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.BytesType)))",
+			DefaultValidator: "org.apache.cassandra.db.marshal.BytesType",
+			KeyAliases:       []string{"title"},
+			ColumnAliases:    []string{"revid"},
+			ValueAlias:       "",
+		},
+	}
+	columns := []ColumnMetadata{
+		// Here are the regular columns from the peers table for testing regular columns
+		ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "data_center", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UTF8Type"},
+		ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "host_id", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UUIDType"},
+		ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "rack", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UTF8Type"},
+		ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "release_version", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UTF8Type"},
+		ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "rpc_address", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.InetAddressType"},
+		ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "schema_version", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UUIDType"},
+		ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "tokens", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)"},
+	}
+	compileMetadata(1, keyspace, tables, columns)
+	assertKeyspaceMetadata(
+		t,
+		keyspace,
+		&KeyspaceMetadata{
+			Name: "V1Keyspace",
+			Tables: map[string]*TableMetadata{
+				"Schema": &TableMetadata{
+					PartitionKey: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "key",
+							Type: TypeInfo{Type: TypeBlob},
+						},
+					},
+					ClusteringColumns: []*ColumnMetadata{},
+					Columns: map[string]*ColumnMetadata{
+						"key": &ColumnMetadata{
+							Name: "key",
+							Type: TypeInfo{Type: TypeBlob},
+							Kind: PARTITION_KEY,
+						},
+					},
+				},
+				"hints": &TableMetadata{
+					PartitionKey: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "target_id",
+							Type: TypeInfo{Type: TypeUUID},
+						},
+					},
+					ClusteringColumns: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name:  "hint_id",
+							Type:  TypeInfo{Type: TypeTimeUUID},
+							Order: ASC,
+						},
+						&ColumnMetadata{
+							Name:  "message_version",
+							Type:  TypeInfo{Type: TypeInt},
+							Order: ASC,
+						},
+					},
+					Columns: map[string]*ColumnMetadata{
+						"target_id": &ColumnMetadata{
+							Name: "target_id",
+							Type: TypeInfo{Type: TypeUUID},
+							Kind: PARTITION_KEY,
+						},
+						"hint_id": &ColumnMetadata{
+							Name:  "hint_id",
+							Type:  TypeInfo{Type: TypeTimeUUID},
+							Order: ASC,
+							Kind:  CLUSTERING_KEY,
+						},
+						"message_version": &ColumnMetadata{
+							Name:  "message_version",
+							Type:  TypeInfo{Type: TypeInt},
+							Order: ASC,
+							Kind:  CLUSTERING_KEY,
+						},
+						"mutation": &ColumnMetadata{
+							Name: "mutation",
+							Type: TypeInfo{Type: TypeBlob},
+							Kind: REGULAR,
+						},
+					},
+				},
+				"peers": &TableMetadata{
+					PartitionKey: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "peer",
+							Type: TypeInfo{Type: TypeInet},
+						},
+					},
+					ClusteringColumns: []*ColumnMetadata{},
+					Columns: map[string]*ColumnMetadata{
+						"peer": &ColumnMetadata{
+							Name: "peer",
+							Type: TypeInfo{Type: TypeInet},
+							Kind: PARTITION_KEY,
+						},
+						"data_center":     &ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "data_center", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UTF8Type", Type: TypeInfo{Type: TypeVarchar}},
+						"host_id":         &ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "host_id", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UUIDType", Type: TypeInfo{Type: TypeUUID}},
+						"rack":            &ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "rack", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UTF8Type", Type: TypeInfo{Type: TypeVarchar}},
+						"release_version": &ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "release_version", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UTF8Type", Type: TypeInfo{Type: TypeVarchar}},
+						"rpc_address":     &ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "rpc_address", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.InetAddressType", Type: TypeInfo{Type: TypeInet}},
+						"schema_version":  &ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "schema_version", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.UUIDType", Type: TypeInfo{Type: TypeUUID}},
+						"tokens":          &ColumnMetadata{Keyspace: "V1Keyspace", Table: "peers", Kind: REGULAR, Name: "tokens", ComponentIndex: 0, Validator: "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)", Type: TypeInfo{Type: TypeSet}},
+					},
+				},
+				"IndexInfo": &TableMetadata{
+					PartitionKey: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "table_name",
+							Type: TypeInfo{Type: TypeVarchar},
+						},
+					},
+					ClusteringColumns: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name:  "index_name",
+							Type:  TypeInfo{Type: TypeVarchar},
+							Order: ASC,
+						},
+					},
+					Columns: map[string]*ColumnMetadata{
+						"table_name": &ColumnMetadata{
+							Name: "table_name",
+							Type: TypeInfo{Type: TypeVarchar},
+							Kind: PARTITION_KEY,
+						},
+						"index_name": &ColumnMetadata{
+							Name: "index_name",
+							Type: TypeInfo{Type: TypeVarchar},
+							Kind: CLUSTERING_KEY,
+						},
+						"value": &ColumnMetadata{
+							Name: "value",
+							Type: TypeInfo{Type: TypeBlob},
+							Kind: REGULAR,
+						},
+					},
+				},
+				"wiki_page": &TableMetadata{
+					PartitionKey: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "title",
+							Type: TypeInfo{Type: TypeVarchar},
+						},
+					},
+					ClusteringColumns: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name:  "revid",
+							Type:  TypeInfo{Type: TypeTimeUUID},
+							Order: ASC,
+						},
+					},
+					Columns: map[string]*ColumnMetadata{
+						"title": &ColumnMetadata{
+							Name: "title",
+							Type: TypeInfo{Type: TypeVarchar},
+							Kind: PARTITION_KEY,
+						},
+						"revid": &ColumnMetadata{
+							Name: "revid",
+							Type: TypeInfo{Type: TypeTimeUUID},
+							Kind: CLUSTERING_KEY,
+						},
+					},
+				},
+			},
+		},
+	)
+
+	// V2 test - V2+ protocol is simpler so here are some toy examples to verify that the mapping works
+	keyspace = &KeyspaceMetadata{
+		Name: "V2Keyspace",
+	}
+	tables = []TableMetadata{
+		TableMetadata{
+			Keyspace: "V2Keyspace",
+			Name:     "Table1",
+		},
+		TableMetadata{
+			Keyspace: "V2Keyspace",
+			Name:     "Table2",
+		},
+	}
+	columns = []ColumnMetadata{
+		ColumnMetadata{
+			Keyspace:  "V2Keyspace",
+			Table:     "Table1",
+			Name:      "Key1",
+			Kind:      PARTITION_KEY,
+			Validator: "org.apache.cassandra.db.marshal.UTF8Type",
+		},
+		ColumnMetadata{
+			Keyspace:  "V2Keyspace",
+			Table:     "Table2",
+			Name:      "Column1",
+			Kind:      PARTITION_KEY,
+			Validator: "org.apache.cassandra.db.marshal.UTF8Type",
+		},
+		ColumnMetadata{
+			Keyspace:  "V2Keyspace",
+			Table:     "Table2",
+			Name:      "Column2",
+			Kind:      CLUSTERING_KEY,
+			Validator: "org.apache.cassandra.db.marshal.UTF8Type",
+		},
+		ColumnMetadata{
+			Keyspace:  "V2Keyspace",
+			Table:     "Table2",
+			Name:      "Column3",
+			Kind:      REGULAR,
+			Validator: "org.apache.cassandra.db.marshal.UTF8Type",
+		},
+	}
+	compileMetadata(2, keyspace, tables, columns)
+	assertKeyspaceMetadata(
+		t,
+		keyspace,
+		&KeyspaceMetadata{
+			Name: "V2Keyspace",
+			Tables: map[string]*TableMetadata{
+				"Table1": &TableMetadata{
+					PartitionKey: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "Key1",
+							Type: TypeInfo{Type: TypeVarchar},
+						},
+					},
+					ClusteringColumns: []*ColumnMetadata{},
+					Columns: map[string]*ColumnMetadata{
+						"Key1": &ColumnMetadata{
+							Name: "Key1",
+							Type: TypeInfo{Type: TypeVarchar},
+							Kind: PARTITION_KEY,
+						},
+					},
+				},
+				"Table2": &TableMetadata{
+					PartitionKey: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "Column1",
+							Type: TypeInfo{Type: TypeVarchar},
+						},
+					},
+					ClusteringColumns: []*ColumnMetadata{
+						&ColumnMetadata{
+							Name: "Column2",
+							Type: TypeInfo{Type: TypeVarchar},
+						},
+					},
+					Columns: map[string]*ColumnMetadata{
+						"Column1": &ColumnMetadata{
+							Name: "Column1",
+							Type: TypeInfo{Type: TypeVarchar},
+							Kind: PARTITION_KEY,
+						},
+						"Column2": &ColumnMetadata{
+							Name: "Column2",
+							Type: TypeInfo{Type: TypeVarchar},
+							Kind: CLUSTERING_KEY,
+						},
+						"Column3": &ColumnMetadata{
+							Name: "Column3",
+							Type: TypeInfo{Type: TypeVarchar},
+							Kind: REGULAR,
+						},
+					},
+				},
+			},
+		},
+	)
+}
+
+func assertKeyspaceMetadata(t *testing.T, actual, expected *KeyspaceMetadata) {
+	if len(expected.Tables) != len(actual.Tables) {
+		t.Errorf("Expected len(%s.Tables) to be %v but was %v", expected.Name, len(expected.Tables), len(actual.Tables))
+	}
+	for keyT := range expected.Tables {
+		et := expected.Tables[keyT]
+		at, found := actual.Tables[keyT]
+
+		if !found {
+			t.Errorf("Expected %s.Tables[%s] but was not found", expected.Name, keyT)
+		} else {
+			if keyT != at.Name {
+				t.Errorf("Expected %s.Tables[%s].Name to be %v but was %v", expected.Name, keyT, keyT, at.Name)
+			}
+			if len(et.PartitionKey) != len(at.PartitionKey) {
+				t.Errorf("Expected len(%s.Tables[%s].PartitionKey) to be %v but was %v", expected.Name, keyT, len(et.PartitionKey), len(at.PartitionKey))
+			} else {
+				for i := range et.PartitionKey {
+					if et.PartitionKey[i].Name != at.PartitionKey[i].Name {
+						t.Errorf("Expected %s.Tables[%s].PartitionKey[%d].Name to be '%v' but was '%v'", expected.Name, keyT, i, et.PartitionKey[i].Name, at.PartitionKey[i].Name)
+					}
+					if expected.Name != at.PartitionKey[i].Keyspace {
+						t.Errorf("Expected %s.Tables[%s].PartitionKey[%d].Keyspace to be '%v' but was '%v'", expected.Name, keyT, i, expected.Name, at.PartitionKey[i].Keyspace)
+					}
+					if keyT != at.PartitionKey[i].Table {
+						t.Errorf("Expected %s.Tables[%s].PartitionKey[%d].Table to be '%v' but was '%v'", expected.Name, keyT, i, keyT, at.PartitionKey[i].Table)
+					}
+					if et.PartitionKey[i].Type.Type != at.PartitionKey[i].Type.Type {
+						t.Errorf("Expected %s.Tables[%s].PartitionKey[%d].Type.Type to be %v but was %v", expected.Name, keyT, i, et.PartitionKey[i].Type.Type, at.PartitionKey[i].Type.Type)
+					}
+					if i != at.PartitionKey[i].ComponentIndex {
+						t.Errorf("Expected %s.Tables[%s].PartitionKey[%d].ComponentIndex to be %v but was %v", expected.Name, keyT, i, i, at.PartitionKey[i].ComponentIndex)
+					}
+					if PARTITION_KEY != at.PartitionKey[i].Kind {
+						t.Errorf("Expected %s.Tables[%s].PartitionKey[%d].Kind to be '%v' but was '%v'", expected.Name, keyT, i, PARTITION_KEY, at.PartitionKey[i].Kind)
+					}
+				}
+			}
+			if len(et.ClusteringColumns) != len(at.ClusteringColumns) {
+				t.Errorf("Expected len(%s.Tables[%s].ClusteringColumns) to be %v but was %v", expected.Name, keyT, len(et.ClusteringColumns), len(at.ClusteringColumns))
+			} else {
+				for i := range et.ClusteringColumns {
+					if et.ClusteringColumns[i].Name != at.ClusteringColumns[i].Name {
+						t.Errorf("Expected %s.Tables[%s].ClusteringColumns[%d].Name to be '%v' but was '%v'", expected.Name, keyT, i, et.ClusteringColumns[i].Name, at.ClusteringColumns[i].Name)
+					}
+					if expected.Name != at.ClusteringColumns[i].Keyspace {
+						t.Errorf("Expected %s.Tables[%s].ClusteringColumns[%d].Keyspace to be '%v' but was '%v'", expected.Name, keyT, i, expected.Name, at.ClusteringColumns[i].Keyspace)
+					}
+					if keyT != at.ClusteringColumns[i].Table {
+						t.Errorf("Expected %s.Tables[%s].ClusteringColumns[%d].Table to be '%v' but was '%v'", expected.Name, keyT, i, keyT, at.ClusteringColumns[i].Table)
+					}
+					if et.ClusteringColumns[i].Type.Type != at.ClusteringColumns[i].Type.Type {
+						t.Errorf("Expected %s.Tables[%s].ClusteringColumns[%d].Type.Type to be %v but was %v", expected.Name, keyT, i, et.ClusteringColumns[i].Type.Type, at.ClusteringColumns[i].Type.Type)
+					}
+					if i != at.ClusteringColumns[i].ComponentIndex {
+						t.Errorf("Expected %s.Tables[%s].ClusteringColumns[%d].ComponentIndex to be %v but was %v", expected.Name, keyT, i, i, at.ClusteringColumns[i].ComponentIndex)
+					}
+					if et.ClusteringColumns[i].Order != at.ClusteringColumns[i].Order {
+						t.Errorf("Expected %s.Tables[%s].ClusteringColumns[%d].Order to be %v but was %v", expected.Name, keyT, i, et.ClusteringColumns[i].Order, at.ClusteringColumns[i].Order)
+					}
+					if CLUSTERING_KEY != at.ClusteringColumns[i].Kind {
+						t.Errorf("Expected %s.Tables[%s].ClusteringColumns[%d].Kind to be '%v' but was '%v'", expected.Name, keyT, i, CLUSTERING_KEY, at.ClusteringColumns[i].Kind)
+					}
+				}
+			}
+			if len(et.Columns) != len(at.Columns) {
+				eKeys := make([]string, 0, len(et.Columns))
+				for key := range et.Columns {
+					eKeys = append(eKeys, key)
+				}
+				aKeys := make([]string, 0, len(at.Columns))
+				for key := range at.Columns {
+					aKeys = append(aKeys, key)
+				}
+				t.Errorf("Expected len(%s.Tables[%s].Columns) to be %v (keys:%v) but was %v (keys:%v)", expected.Name, keyT, len(et.Columns), eKeys, len(at.Columns), aKeys)
+			} else {
+				for keyC := range et.Columns {
+					ec := et.Columns[keyC]
+					ac, found := at.Columns[keyC]
+
+					if !found {
+						t.Errorf("Expected %s.Tables[%s].Columns[%s] but was not found", expected.Name, keyT, keyC)
+					} else {
+						if keyC != ac.Name {
+							t.Errorf("Expected %s.Tables[%s].Columns[%s].Name to be '%v' but was '%v'", expected.Name, keyT, keyC, keyC, at.Name)
+						}
+						if expected.Name != ac.Keyspace {
+							t.Errorf("Expected %s.Tables[%s].Columns[%s].Keyspace to be '%v' but was '%v'", expected.Name, keyT, keyC, expected.Name, ac.Keyspace)
+						}
+						if keyT != ac.Table {
+							t.Errorf("Expected %s.Tables[%s].Columns[%s].Table to be '%v' but was '%v'", expected.Name, keyT, keyC, keyT, ac.Table)
+						}
+						if ec.Type.Type != ac.Type.Type {
+							t.Errorf("Expected %s.Tables[%s].Columns[%s].Type.Type to be %v but was %v", expected.Name, keyT, keyC, ec.Type.Type, ac.Type.Type)
+						}
+						if ec.Order != ac.Order {
+							t.Errorf("Expected %s.Tables[%s].Columns[%s].Order to be %v but was %v", expected.Name, keyT, keyC, ec.Order, ac.Order)
+						}
+						if ec.Kind != ac.Kind {
+							t.Errorf("Expected %s.Tables[%s].Columns[%s].Kind to be '%v' but was '%v'", expected.Name, keyT, keyC, ec.Kind, ac.Kind)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func TestTypeParser(t *testing.T) {
+	// native type
+	assertParseNonCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.UTF8Type",
+		assertTypeInfo{Type: TypeVarchar},
+	)
+
+	// reversed
+	assertParseNonCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UUIDType)",
+		assertTypeInfo{Type: TypeUUID, Reversed: true},
+	)
+
+	// set
+	assertParseNonCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.Int32Type)",
+		assertTypeInfo{
+			Type: TypeSet,
+			Elem: &assertTypeInfo{Type: TypeInt},
+		},
+	)
+
+	// map
+	assertParseNonCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UUIDType,org.apache.cassandra.db.marshal.BytesType)",
+		assertTypeInfo{
+			Type: TypeMap,
+			Key:  &assertTypeInfo{Type: TypeUUID},
+			Elem: &assertTypeInfo{Type: TypeBlob},
+		},
+	)
+
+	// custom
+	assertParseNonCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.DynamicCompositeType(u=>org.apache.cassandra.db.marshal.UUIDType,d=>org.apache.cassandra.db.marshal.DateType,t=>org.apache.cassandra.db.marshal.TimeUUIDType,b=>org.apache.cassandra.db.marshal.BytesType,s=>org.apache.cassandra.db.marshal.UTF8Type,B=>org.apache.cassandra.db.marshal.BooleanType,a=>org.apache.cassandra.db.marshal.AsciiType,l=>org.apache.cassandra.db.marshal.LongType,i=>org.apache.cassandra.db.marshal.IntegerType,x=>org.apache.cassandra.db.marshal.LexicalUUIDType)",
+		assertTypeInfo{Type: TypeCustom, Custom: "org.apache.cassandra.db.marshal.DynamicCompositeType(u=>org.apache.cassandra.db.marshal.UUIDType,d=>org.apache.cassandra.db.marshal.DateType,t=>org.apache.cassandra.db.marshal.TimeUUIDType,b=>org.apache.cassandra.db.marshal.BytesType,s=>org.apache.cassandra.db.marshal.UTF8Type,B=>org.apache.cassandra.db.marshal.BooleanType,a=>org.apache.cassandra.db.marshal.AsciiType,l=>org.apache.cassandra.db.marshal.LongType,i=>org.apache.cassandra.db.marshal.IntegerType,x=>org.apache.cassandra.db.marshal.LexicalUUIDType)"},
+	)
+
+	// composite defs
+	assertParseCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type)",
+		[]assertTypeInfo{
+			assertTypeInfo{Type: TypeVarchar},
+		},
+		nil,
+	)
+	assertParseCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.DateType,org.apache.cassandra.db.marshal.UTF8Type)",
+		[]assertTypeInfo{
+			assertTypeInfo{Type: TypeTimestamp},
+			assertTypeInfo{Type: TypeVarchar},
+		},
+		nil,
+	)
+	assertParseCompositeType(
+		t,
+		"org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.ColumnToCollectionType(726f77735f6d6572676564:org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.LongType)))",
+		[]assertTypeInfo{
+			assertTypeInfo{Type: TypeVarchar},
+		},
+		map[string]assertTypeInfo{
+			"rows_merged": assertTypeInfo{
+				Type: TypeMap,
+				Key:  &assertTypeInfo{Type: TypeInt},
+				Elem: &assertTypeInfo{Type: TypeBigInt},
+			},
+		},
+	)
+}
+
+//---------------------------------------
+// some code to assert the parser result
+//---------------------------------------
+
+type assertTypeInfo struct {
+	Type     Type
+	Reversed bool
+	Elem     *assertTypeInfo
+	Key      *assertTypeInfo
+	Custom   string
+}
+
+func assertParseNonCompositeType(
+	t *testing.T,
+	def string,
+	typeExpected assertTypeInfo,
+) {
+
+	result := parseType(def)
+	if len(result.reversed) != 1 {
+		t.Errorf("%s expected %d reversed values but there were %d", def, 1, len(result.reversed))
+	}
+
+	assertParseNonCompositeTypes(
+		t,
+		def,
+		[]assertTypeInfo{typeExpected},
+		result.types,
+	)
+
+	// expect no composite part of the result
+	if result.isComposite {
+		t.Errorf("%s: Expected not composite", def)
+	}
+	if result.collections != nil {
+		t.Errorf("%s: Expected nil collections: %v", def, result.collections)
+	}
+}
+
+func assertParseCompositeType(
+	t *testing.T,
+	def string,
+	typesExpected []assertTypeInfo,
+	collectionsExpected map[string]assertTypeInfo,
+) {
+
+	result := parseType(def)
+	if len(result.reversed) != len(typesExpected) {
+		t.Errorf("%s expected %d reversed values but there were %d", def, len(typesExpected), len(result.reversed))
+	}
+
+	assertParseNonCompositeTypes(
+		t,
+		def,
+		typesExpected,
+		result.types,
+	)
+
+	// expect composite part of the result
+	if !result.isComposite {
+		t.Errorf("%s: Expected composite", def)
+	}
+	if result.collections == nil {
+		t.Errorf("%s: Expected non-nil collections: %v", def, result.collections)
+	}
+
+	for name, typeExpected := range collectionsExpected {
+		// check for an actual type for this name
+		typeActual, found := result.collections[name]
+		if !found {
+			t.Errorf("%s.tcollections: Expected param named %s but there wasn't", def, name)
+		} else {
+			// remove the actual from the collection so we can detect extras
+			delete(result.collections, name)
+
+			// check the type
+			assertParseNonCompositeTypes(
+				t,
+				def+"collections["+name+"]",
+				[]assertTypeInfo{typeExpected},
+				[]TypeInfo{typeActual},
+			)
+		}
+	}
+
+	if len(result.collections) != 0 {
+		t.Errorf("%s.collections: Expected no more types in collections, but there was %v", def, result.collections)
+	}
+}
+
+func assertParseNonCompositeTypes(
+	t *testing.T,
+	context string,
+	typesExpected []assertTypeInfo,
+	typesActual []TypeInfo,
+) {
+	if len(typesActual) != len(typesExpected) {
+		t.Errorf("%s: Expected %d types, but there were %d", context, len(typesExpected), len(typesActual))
+	}
+
+	for i := range typesExpected {
+		typeExpected := typesExpected[i]
+		typeActual := typesActual[i]
+
+		// shadow copy the context for local modification
+		context := context
+		if len(typesExpected) > 1 {
+			context = context + "[" + strconv.Itoa(i) + "]"
+		}
+
+		// check the type
+		if typeActual.Type != typeExpected.Type {
+			t.Errorf("%s: Expected to parse Type to %s but was %s", context, typeExpected.Type, typeActual.Type)
+		}
+		// check the custom
+		if typeActual.Custom != typeExpected.Custom {
+			t.Errorf("%s: Expected to parse Custom %s but was %s", context, typeExpected.Custom, typeActual.Custom)
+		}
+		// check the elem
+		if typeActual.Elem == nil && typeExpected.Elem != nil {
+			t.Errorf("%s: Expected to parse Elem, but was nil ", context)
+		} else if typeExpected.Elem == nil && typeActual.Elem != nil {
+			t.Errorf("%s: Expected to not parse Elem, but was %+v", context, typeActual.Elem)
+		} else if typeActual.Elem != nil && typeExpected.Elem != nil {
+			assertParseNonCompositeTypes(
+				t,
+				context+".Elem",
+				[]assertTypeInfo{*typeExpected.Elem},
+				[]TypeInfo{*typeActual.Elem},
+			)
+		}
+		// check the key
+		if typeActual.Key == nil && typeExpected.Key != nil {
+			t.Errorf("%s: Expected to parse Key, but was nil ", context)
+		} else if typeExpected.Key == nil && typeActual.Key != nil {
+			t.Errorf("%s: Expected to not parse Key, but was %+v", context, typeActual.Key)
+		} else if typeActual.Key != nil && typeExpected.Key != nil {
+			assertParseNonCompositeTypes(
+				t,
+				context+".Key",
+				[]assertTypeInfo{*typeExpected.Key},
+				[]TypeInfo{*typeActual.Key},
+			)
+		}
+	}
+}

+ 18 - 6
session.go

@@ -24,12 +24,13 @@ import (
 // and automatically sets a default consinstency level on all operations
 // that do not have a consistency level set.
 type Session struct {
-	Pool     ConnectionPool
-	cons     Consistency
-	pageSize int
-	prefetch float64
-	trace    Tracer
-	mu       sync.RWMutex
+	Pool            ConnectionPool
+	cons            Consistency
+	pageSize        int
+	prefetch        float64
+	schemaDescriber *schemaDescriber
+	trace           Tracer
+	mu              sync.RWMutex
 
 	cfg ClusterConfig
 
@@ -163,6 +164,17 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 	return iter
 }
 
+// Returns the schema metadata for the keyspace of this session including
+// table and column schema metadata.
+func (s *Session) KeyspaceMetadata() (*KeyspaceMetadata, error) {
+	s.mu.Lock()
+	if s.schemaDescriber == nil {
+		s.schemaDescriber = &schemaDescriber{session: s}
+	}
+	s.mu.Unlock()
+	return s.schemaDescriber.getSchema()
+}
+
 // ExecuteBatch executes a batch operation and returns nil if successful
 // otherwise an error is returned describing the failure.
 func (s *Session) ExecuteBatch(batch *Batch) error {