浏览代码

Improve support for complicated cassandra schemas.

In some schemas there may be "partition_key" type columns returned in the V2
protocol metadata which have the same component index; in our case we have
a couple column families with aliased columns "KEY" and "key" for historical
reasons. This change is based on the DataStax driver's behavior for columns
with the same component index, which prefers the last column in order defined
for that component index; the consequence of this behavior is that other column
names defined for the same component index will not be identified as part of a
routing key.

Also, "compact_value" has been added as a column kind/type.
Justin Corpron 10 年之前
父节点
当前提交
d24426d551
共有 2 个文件被更改,包括 26 次插入9 次删除
  1. 13 9
      metadata.go
  2. 13 0
      metadata_test.go

+ 13 - 9
metadata.go

@@ -36,6 +36,7 @@ type TableMetadata struct {
 	PartitionKey      []*ColumnMetadata
 	ClusteringColumns []*ColumnMetadata
 	Columns           map[string]*ColumnMetadata
+	OrderedColumns    []string
 }
 
 // schema metadata for a column
@@ -70,6 +71,7 @@ const (
 	PARTITION_KEY  = "partition_key"
 	CLUSTERING_KEY = "clustering_key"
 	REGULAR        = "regular"
+	COMPACT_VALUE  = "compact_value"
 )
 
 // default alias values
@@ -177,6 +179,7 @@ func compileMetadata(
 
 		table := keyspace.Tables[columns[i].Table]
 		table.Columns[columns[i].Name] = &columns[i]
+		table.OrderedColumns = append(table.OrderedColumns, columns[i].Name)
 	}
 
 	if protoVersion == 1 {
@@ -302,13 +305,14 @@ func compileV2Metadata(tables []TableMetadata) {
 	for i := range tables {
 		table := &tables[i]
 
-		partitionColumnCount := countColumnsOfKind(table.Columns, PARTITION_KEY)
-		table.PartitionKey = make([]*ColumnMetadata, partitionColumnCount)
+		keyValidatorParsed := parseType(table.KeyValidator)
+		table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
 
-		clusteringColumnCount := countColumnsOfKind(table.Columns, CLUSTERING_KEY)
+		clusteringColumnCount := componentColumnCountOfType(table.Columns, CLUSTERING_KEY)
 		table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
 
-		for _, column := range table.Columns {
+		for _, columnName := range table.OrderedColumns {
+			column := table.Columns[columnName]
 			if column.Kind == PARTITION_KEY {
 				table.PartitionKey[column.ComponentIndex] = column
 			} else if column.Kind == CLUSTERING_KEY {
@@ -320,14 +324,14 @@ func compileV2Metadata(tables []TableMetadata) {
 }
 
 // returns the count of coluns with the given "kind" value.
-func countColumnsOfKind(columns map[string]*ColumnMetadata, kind string) int {
-	count := 0
+func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind string) int {
+	maxComponentIndex := -1
 	for _, column := range columns {
-		if column.Kind == kind {
-			count++
+		if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
+			maxComponentIndex = column.ComponentIndex
 		}
 	}
-	return count
+	return maxComponentIndex + 1
 }
 
 // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace

+ 13 - 0
metadata_test.go

@@ -327,6 +327,14 @@ func TestCompileMetadata(t *testing.T) {
 		},
 	}
 	columns = []ColumnMetadata{
+		ColumnMetadata{
+			Keyspace:       "V2Keyspace",
+			Table:          "Table1",
+			Name:           "KEY1",
+			Kind:           PARTITION_KEY,
+			ComponentIndex: 0,
+			Validator:      "org.apache.cassandra.db.marshal.UTF8Type",
+		},
 		ColumnMetadata{
 			Keyspace:       "V2Keyspace",
 			Table:          "Table1",
@@ -383,6 +391,11 @@ func TestCompileMetadata(t *testing.T) {
 					},
 					ClusteringColumns: []*ColumnMetadata{},
 					Columns: map[string]*ColumnMetadata{
+						"KEY1": &ColumnMetadata{
+							Name: "KEY1",
+							Type: NativeType{typ: TypeVarchar},
+							Kind: PARTITION_KEY,
+						},
 						"Key1": &ColumnMetadata{
 							Name: "Key1",
 							Type: NativeType{typ: TypeVarchar},