Browse Source

Merge pull request #667 from ChannelMeter/cassandra3x/metadata

Add support for Cassandra 3 system_schema metadata
Chris Bannister 9 years ago
parent
commit
15590c2435
3 changed files with 169 additions and 36 deletions
  1. 54 0
      helpers.go
  2. 110 34
      metadata.go
  3. 5 2
      session.go

+ 54 - 0
helpers.go

@@ -62,6 +62,60 @@ func dereference(i interface{}) interface{} {
 	return reflect.Indirect(reflect.ValueOf(i)).Interface()
 }
 
+func getCassandraType(name string) Type {
+	switch name {
+	case "ascii":
+		return TypeAscii
+	case "bigint":
+		return TypeBigInt
+	case "blob":
+		return TypeBlob
+	case "boolean":
+		return TypeBoolean
+	case "counter":
+		return TypeCounter
+	case "decimal":
+		return TypeDecimal
+	case "double":
+		return TypeDouble
+	case "float":
+		return TypeFloat
+	case "int":
+		return TypeInt
+	case "timestamp":
+		return TypeTimestamp
+	case "uuid":
+		return TypeUUID
+	case "varchar", "text":
+		return TypeVarchar
+	case "varint":
+		return TypeVarint
+	case "timeuuid":
+		return TypeTimeUUID
+	case "inet":
+		return TypeInet
+	case "MapType":
+		return TypeMap
+	case "ListType":
+		return TypeList
+	case "SetType":
+		return TypeSet
+	case "TupleType":
+		return TypeTuple
+	default:
+		if strings.HasPrefix(name, "set") {
+			return TypeSet
+		} else if strings.HasPrefix(name, "list") {
+			return TypeList
+		} else if strings.HasPrefix(name, "map") {
+			return TypeMap
+		} else if strings.HasPrefix(name, "tuple") {
+			return TypeTuple
+		}
+		return TypeCustom
+	}
+}
+
 func getApacheCassandraType(class string) Type {
 	switch strings.TrimPrefix(class, apacheCassandraTypePrefix) {
 	case "AsciiType":

+ 110 - 34
metadata.go

@@ -41,15 +41,16 @@ type TableMetadata struct {
 
 // schema metadata for a column
 type ColumnMetadata struct {
-	Keyspace       string
-	Table          string
-	Name           string
-	ComponentIndex int
-	Kind           string
-	Validator      string
-	Type           TypeInfo
-	Order          ColumnOrder
-	Index          ColumnIndexMetadata
+	Keyspace        string
+	Table           string
+	Name            string
+	ComponentIndex  int
+	Kind            string
+	Validator       string
+	Type            TypeInfo
+	ClusteringOrder string
+	Order           ColumnOrder
+	Index           ColumnIndexMetadata
 }
 
 // the ordering of the column with regard to its comparator
@@ -170,11 +171,19 @@ func compileMetadata(
 	// add columns from the schema data
 	for i := range columns {
 		// decode the validator for TypeInfo and order
-		validatorParsed := parseType(columns[i].Validator)
-		columns[i].Type = validatorParsed.types[0]
-		columns[i].Order = ASC
-		if validatorParsed.reversed[0] {
-			columns[i].Order = DESC
+		if columns[i].ClusteringOrder != "" { // Cassandra 3.x+
+			columns[i].Type = NativeType{typ: getCassandraType(columns[i].Validator)}
+			columns[i].Order = ASC
+			if columns[i].ClusteringOrder == "desc" {
+				columns[i].Order = DESC
+			}
+		} else {
+			validatorParsed := parseType(columns[i].Validator)
+			columns[i].Type = validatorParsed.types[0]
+			columns[i].Order = ASC
+			if validatorParsed.reversed[0] {
+				columns[i].Order = DESC
+			}
 		}
 
 		table := keyspace.Tables[columns[i].Table]
@@ -305,12 +314,17 @@ func compileV2Metadata(tables []TableMetadata) {
 	for i := range tables {
 		table := &tables[i]
 
-		keyValidatorParsed := parseType(table.KeyValidator)
-		table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
-
 		clusteringColumnCount := componentColumnCountOfType(table.Columns, CLUSTERING_KEY)
 		table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
 
+		if table.KeyValidator != "" {
+			keyValidatorParsed := parseType(table.KeyValidator)
+			table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
+		} else { // Cassandra 3.x+
+			partitionKeyCount := componentColumnCountOfType(table.Columns, PARTITION_KEY)
+			table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
+		}
+
 		for _, columnName := range table.OrderedColumns {
 			column := table.Columns[columnName]
 			if column.Kind == PARTITION_KEY {
@@ -319,7 +333,6 @@ func compileV2Metadata(tables []TableMetadata) {
 				table.ClusteringColumns[column.ComponentIndex] = column
 			}
 		}
-
 	}
 }
 
@@ -336,27 +349,52 @@ func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind string)
 
 // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
 func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
-	const stmt = `
+	keyspace := &KeyspaceMetadata{Name: keyspaceName}
+
+	if session.useSystemSchema { // Cassandra 3.x+
+		const stmt = `
+		SELECT durable_writes, replication
+		FROM system_schema.keyspaces
+		WHERE keyspace_name = ?`
+
+		var replication map[string]string
+
+		iter := session.control.query(stmt, keyspaceName)
+		iter.Scan(&keyspace.DurableWrites, &replication)
+		err := iter.Close()
+		if err != nil {
+			return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
+		}
+
+		keyspace.StrategyClass = replication["class"]
+
+		keyspace.StrategyOptions = make(map[string]interface{})
+		for k, v := range replication {
+			keyspace.StrategyOptions[k] = v
+		}
+	} else {
+
+		const stmt = `
 		SELECT durable_writes, strategy_class, strategy_options
 		FROM system.schema_keyspaces
 		WHERE keyspace_name = ?`
 
-	keyspace := &KeyspaceMetadata{Name: keyspaceName}
-	var strategyOptionsJSON []byte
+		var strategyOptionsJSON []byte
 
-	iter := session.control.query(stmt, keyspaceName)
-	iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
-	err := iter.Close()
-	if err != nil {
-		return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
-	}
+		iter := session.control.query(stmt, keyspaceName)
+		iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
+		err := iter.Close()
+		if err != nil {
+			return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
+		}
 
-	err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
-	if err != nil {
-		return nil, fmt.Errorf(
-			"Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
-			strategyOptionsJSON, keyspace.Name, err,
-		)
+		err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
+		if err != nil {
+			return nil, fmt.Errorf(
+				"Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
+				strategyOptionsJSON, keyspace.Name, err,
+			)
+		}
 	}
 
 	return keyspace, nil
@@ -373,7 +411,19 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
 		columnAliasesJSON []byte
 	)
 
-	if session.cfg.ProtoVersion < protoVersion4 {
+	if session.useSystemSchema { // Cassandra 3.x+
+		stmt = `
+		SELECT
+			table_name
+		FROM system_schema.tables
+		WHERE keyspace_name = ?`
+
+		scan = func(iter *Iter, table *TableMetadata) bool {
+			return iter.Scan(
+				&table.Name,
+			)
+		}
+	} else if session.cfg.ProtoVersion < protoVersion4 {
 		// we have key aliases
 		// TODO: Do we need key_aliases?
 		stmt = `
@@ -505,6 +555,32 @@ func getColumnMetadata(
 				&indexOptionsJSON,
 			)
 		}
+	} else if session.useSystemSchema { // Cassandra 3.x+
+		stmt = `
+			SELECT
+				table_name,
+				column_name,
+				clustering_order,
+				type,
+				kind,
+				position
+			FROM system_schema.columns
+			WHERE keyspace_name = ?
+			`
+		scan = func(
+			iter *Iter,
+			column *ColumnMetadata,
+			indexOptionsJSON *[]byte,
+		) bool {
+			return iter.Scan(
+				&column.Table,
+				&column.Name,
+				&column.ClusteringOrder,
+				&column.Validator,
+				&column.Kind,
+				&column.ComponentIndex,
+			)
+		}
 	} else {
 		// V2+ supports the type column
 		stmt = `

+ 5 - 2
session.go

@@ -53,7 +53,8 @@ type Session struct {
 	schemaEvents *eventDeouncer
 
 	// ring metadata
-	hosts []HostInfo
+	hosts           []HostInfo
+	useSystemSchema bool
 
 	cfg ClusterConfig
 
@@ -166,6 +167,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		return nil, ErrNoConnectionsStarted
 	}
 
+	s.useSystemSchema = hosts[0].Version().Major >= 3
+
 	return s, nil
 }
 
@@ -427,7 +430,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	table := info.request.columns[0].Table
 
 	var keyspaceMetadata *KeyspaceMetadata
-	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
+	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace)
 	if inflight.err != nil {
 		// don't cache this error
 		s.routingKeyInfoCache.Remove(stmt)