| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332 |
- // Copyright (c) 2015 The gocql Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package gocql
- import (
- "encoding/hex"
- "encoding/json"
- "fmt"
- "strconv"
- "strings"
- "sync"
- )
- // schema metadata for a keyspace
- type KeyspaceMetadata struct {
- Name string
- DurableWrites bool
- StrategyClass string
- StrategyOptions map[string]interface{}
- Tables map[string]*TableMetadata
- Functions map[string]*FunctionMetadata
- Aggregates map[string]*AggregateMetadata
- Views map[string]*ViewMetadata
- }
- // schema metadata for a table (a.k.a. column family)
- type TableMetadata struct {
- Keyspace string
- Name string
- KeyValidator string
- Comparator string
- DefaultValidator string
- KeyAliases []string
- ColumnAliases []string
- ValueAlias string
- PartitionKey []*ColumnMetadata
- ClusteringColumns []*ColumnMetadata
- Columns map[string]*ColumnMetadata
- OrderedColumns []string
- }
- // schema metadata for a column
- type ColumnMetadata struct {
- Keyspace string
- Table string
- Name string
- ComponentIndex int
- Kind ColumnKind
- Validator string
- Type TypeInfo
- ClusteringOrder string
- Order ColumnOrder
- Index ColumnIndexMetadata
- }
- // FunctionMetadata holds metadata for function constructs
- type FunctionMetadata struct {
- Keyspace string
- Name string
- ArgumentTypes []TypeInfo
- ArgumentNames []string
- Body string
- CalledOnNullInput bool
- Language string
- ReturnType TypeInfo
- }
- // AggregateMetadata holds metadata for aggregate constructs
- type AggregateMetadata struct {
- Keyspace string
- Name string
- ArgumentTypes []TypeInfo
- FinalFunc FunctionMetadata
- InitCond string
- ReturnType TypeInfo
- StateFunc FunctionMetadata
- StateType TypeInfo
- stateFunc string
- finalFunc string
- }
- // ViewMetadata holds the metadata for views.
- type ViewMetadata struct {
- Keyspace string
- Name string
- FieldNames []string
- FieldTypes []TypeInfo
- }
// ColumnOrder is the ordering of the column with regard to its comparator.
type ColumnOrder bool

const (
	// ASC marks a column that sorts in ascending order.
	ASC ColumnOrder = false
	// DESC marks a column that sorts in descending order. The type is spelled
	// out explicitly: a bare `DESC = true` would declare an untyped boolean
	// constant rather than a ColumnOrder.
	DESC ColumnOrder = true
)
// ColumnIndexMetadata describes the index attached to a column, as read from
// the index_name, index_type and index_options schema columns.
type ColumnIndexMetadata struct {
	Name string
	Type string
	// Options is the decoded JSON document from index_options.
	Options map[string]interface{}
}
// ColumnKind identifies the role a column plays within its table.
type ColumnKind int

const (
	// ColumnUnkownKind is the zero value, used when no kind has been set.
	ColumnUnkownKind ColumnKind = iota
	// ColumnPartitionKey marks a component of the partition key.
	ColumnPartitionKey
	// ColumnClusteringKey marks a clustering column.
	ColumnClusteringKey
	// ColumnRegular marks a regular column.
	ColumnRegular
	// ColumnCompact marks a "compact_value" column.
	ColumnCompact
	// ColumnStatic marks a static column.
	ColumnStatic
)
- func (c ColumnKind) String() string {
- switch c {
- case ColumnPartitionKey:
- return "partition_key"
- case ColumnClusteringKey:
- return "clustering_key"
- case ColumnRegular:
- return "regular"
- case ColumnCompact:
- return "compact"
- case ColumnStatic:
- return "static"
- default:
- return fmt.Sprintf("unknown_column_%d", c)
- }
- }
- func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error {
- if typ.Type() != TypeVarchar {
- return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ)
- }
- kind, err := columnKindFromSchema(string(p))
- if err != nil {
- return err
- }
- *c = kind
- return nil
- }
- func columnKindFromSchema(kind string) (ColumnKind, error) {
- switch kind {
- case "partition_key":
- return ColumnPartitionKey, nil
- case "clustering_key", "clustering":
- return ColumnClusteringKey, nil
- case "regular":
- return ColumnRegular, nil
- case "compact_value":
- return ColumnCompact, nil
- case "static":
- return ColumnStatic, nil
- default:
- return -1, fmt.Errorf("unknown column kind: %q", kind)
- }
- }
// Default alias values, used when a table does not supply its own alias for
// a key, clustering column or value column.
const (
	// DEFAULT_KEY_ALIAS is the base name for unnamed partition key columns.
	DEFAULT_KEY_ALIAS = "key"
	// DEFAULT_COLUMN_ALIAS is the base name for unnamed clustering columns.
	DEFAULT_COLUMN_ALIAS = "column"
	// DEFAULT_VALUE_ALIAS is the name for an unnamed value column.
	DEFAULT_VALUE_ALIAS = "value"
)
- // queries the cluster for schema information for a specific keyspace
- type schemaDescriber struct {
- session *Session
- mu sync.Mutex
- cache map[string]*KeyspaceMetadata
- }
- // creates a session bound schema describer which will query and cache
- // keyspace metadata
- func newSchemaDescriber(session *Session) *schemaDescriber {
- return &schemaDescriber{
- session: session,
- cache: map[string]*KeyspaceMetadata{},
- }
- }
- // returns the cached KeyspaceMetadata held by the describer for the named
- // keyspace.
- func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- metadata, found := s.cache[keyspaceName]
- if !found {
- // refresh the cache for this keyspace
- err := s.refreshSchema(keyspaceName)
- if err != nil {
- return nil, err
- }
- metadata = s.cache[keyspaceName]
- }
- return metadata, nil
- }
- // clears the already cached keyspace metadata
- func (s *schemaDescriber) clearSchema(keyspaceName string) {
- s.mu.Lock()
- defer s.mu.Unlock()
- delete(s.cache, keyspaceName)
- }
- // forcibly updates the current KeyspaceMetadata held by the schema describer
- // for a given named keyspace.
- func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
- var err error
- // query the system keyspace for schema data
- // TODO retrieve concurrently
- keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
- if err != nil {
- return err
- }
- tables, err := getTableMetadata(s.session, keyspaceName)
- if err != nil {
- return err
- }
- columns, err := getColumnMetadata(s.session, keyspaceName)
- if err != nil {
- return err
- }
- functions, err := getFunctionsMetadata(s.session, keyspaceName)
- if err != nil {
- return err
- }
- aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
- if err != nil {
- return err
- }
- views, err := getViewsMetadata(s.session, keyspaceName)
- if err != nil {
- return err
- }
- // organize the schema data
- compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates, views)
- // update the cache
- s.cache[keyspaceName] = keyspace
- return nil
- }
- // "compiles" derived information about keyspace, table, and column metadata
- // for a keyspace from the basic queried metadata objects returned by
- // getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
- // Links the metadata objects together and derives the column composition of
- // the partition key and clustering key for a table.
- func compileMetadata(
- protoVersion int,
- keyspace *KeyspaceMetadata,
- tables []TableMetadata,
- columns []ColumnMetadata,
- functions []FunctionMetadata,
- aggregates []AggregateMetadata,
- views []ViewMetadata,
- ) {
- keyspace.Tables = make(map[string]*TableMetadata)
- for i := range tables {
- tables[i].Columns = make(map[string]*ColumnMetadata)
- keyspace.Tables[tables[i].Name] = &tables[i]
- }
- keyspace.Functions = make(map[string]*FunctionMetadata, len(functions))
- for i := range functions {
- keyspace.Functions[functions[i].Name] = &functions[i]
- }
- keyspace.Aggregates = make(map[string]*AggregateMetadata, len(aggregates))
- for _, aggregate := range aggregates {
- aggregate.FinalFunc = *keyspace.Functions[aggregate.finalFunc]
- aggregate.StateFunc = *keyspace.Functions[aggregate.stateFunc]
- keyspace.Aggregates[aggregate.Name] = &aggregate
- }
- keyspace.Views = make(map[string]*ViewMetadata, len(views))
- for i := range views {
- keyspace.Views[views[i].Name] = &views[i]
- }
- // add columns from the schema data
- for i := range columns {
- col := &columns[i]
- // decode the validator for TypeInfo and order
- if col.ClusteringOrder != "" { // Cassandra 3.x+
- col.Type = getCassandraType(col.Validator)
- col.Order = ASC
- if col.ClusteringOrder == "desc" {
- col.Order = DESC
- }
- } else {
- validatorParsed := parseType(col.Validator)
- col.Type = validatorParsed.types[0]
- col.Order = ASC
- if validatorParsed.reversed[0] {
- col.Order = DESC
- }
- }
- table, ok := keyspace.Tables[col.Table]
- if !ok {
- // if the schema is being updated we will race between seeing
- // the metadata be complete. Potentially we should check for
- // schema versions before and after reading the metadata and
- // if they dont match try again.
- continue
- }
- table.Columns[col.Name] = col
- table.OrderedColumns = append(table.OrderedColumns, col.Name)
- }
- if protoVersion == protoVersion1 {
- compileV1Metadata(tables)
- } else {
- compileV2Metadata(tables)
- }
- }
- // Compiles derived information from TableMetadata which have had
- // ColumnMetadata added already. V1 protocol does not return as much
- // column metadata as V2+ (because V1 doesn't support the "type" column in the
- // system.schema_columns table) so determining PartitionKey and ClusterColumns
- // is more complex.
- func compileV1Metadata(tables []TableMetadata) {
- for i := range tables {
- table := &tables[i]
- // decode the key validator
- keyValidatorParsed := parseType(table.KeyValidator)
- // decode the comparator
- comparatorParsed := parseType(table.Comparator)
- // the partition key length is the same as the number of types in the
- // key validator
- table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
- // V1 protocol only returns "regular" columns from
- // system.schema_columns (there is no type field for columns)
- // so the alias information is used to
- // create the partition key and clustering columns
- // construct the partition key from the alias
- for i := range table.PartitionKey {
- var alias string
- if len(table.KeyAliases) > i {
- alias = table.KeyAliases[i]
- } else if i == 0 {
- alias = DEFAULT_KEY_ALIAS
- } else {
- alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
- }
- column := &ColumnMetadata{
- Keyspace: table.Keyspace,
- Table: table.Name,
- Name: alias,
- Type: keyValidatorParsed.types[i],
- Kind: ColumnPartitionKey,
- ComponentIndex: i,
- }
- table.PartitionKey[i] = column
- table.Columns[alias] = column
- }
- // determine the number of clustering columns
- size := len(comparatorParsed.types)
- if comparatorParsed.isComposite {
- if len(comparatorParsed.collections) != 0 ||
- (len(table.ColumnAliases) == size-1 &&
- comparatorParsed.types[size-1].Type() == TypeVarchar) {
- size = size - 1
- }
- } else {
- if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
- size = 0
- }
- }
- table.ClusteringColumns = make([]*ColumnMetadata, size)
- for i := range table.ClusteringColumns {
- var alias string
- if len(table.ColumnAliases) > i {
- alias = table.ColumnAliases[i]
- } else if i == 0 {
- alias = DEFAULT_COLUMN_ALIAS
- } else {
- alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
- }
- order := ASC
- if comparatorParsed.reversed[i] {
- order = DESC
- }
- column := &ColumnMetadata{
- Keyspace: table.Keyspace,
- Table: table.Name,
- Name: alias,
- Type: comparatorParsed.types[i],
- Order: order,
- Kind: ColumnClusteringKey,
- ComponentIndex: i,
- }
- table.ClusteringColumns[i] = column
- table.Columns[alias] = column
- }
- if size != len(comparatorParsed.types)-1 {
- alias := DEFAULT_VALUE_ALIAS
- if len(table.ValueAlias) > 0 {
- alias = table.ValueAlias
- }
- // decode the default validator
- defaultValidatorParsed := parseType(table.DefaultValidator)
- column := &ColumnMetadata{
- Keyspace: table.Keyspace,
- Table: table.Name,
- Name: alias,
- Type: defaultValidatorParsed.types[0],
- Kind: ColumnRegular,
- }
- table.Columns[alias] = column
- }
- }
- }
- // The simpler compile case for V2+ protocol
- func compileV2Metadata(tables []TableMetadata) {
- for i := range tables {
- table := &tables[i]
- clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey)
- table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
- if table.KeyValidator != "" {
- keyValidatorParsed := parseType(table.KeyValidator)
- table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
- } else { // Cassandra 3.x+
- partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey)
- table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
- }
- for _, columnName := range table.OrderedColumns {
- column := table.Columns[columnName]
- if column.Kind == ColumnPartitionKey {
- table.PartitionKey[column.ComponentIndex] = column
- } else if column.Kind == ColumnClusteringKey {
- table.ClusteringColumns[column.ComponentIndex] = column
- }
- }
- }
- }
- // returns the count of coluns with the given "kind" value.
- func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int {
- maxComponentIndex := -1
- for _, column := range columns {
- if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
- maxComponentIndex = column.ComponentIndex
- }
- }
- return maxComponentIndex + 1
- }
- // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
- func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
- keyspace := &KeyspaceMetadata{Name: keyspaceName}
- if session.useSystemSchema { // Cassandra 3.x+
- const stmt = `
- SELECT durable_writes, replication
- FROM system_schema.keyspaces
- WHERE keyspace_name = ?`
- var replication map[string]string
- iter := session.control.query(stmt, keyspaceName)
- if iter.NumRows() == 0 {
- return nil, ErrKeyspaceDoesNotExist
- }
- iter.Scan(&keyspace.DurableWrites, &replication)
- err := iter.Close()
- if err != nil {
- return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
- }
- keyspace.StrategyClass = replication["class"]
- delete(replication, "class")
- keyspace.StrategyOptions = make(map[string]interface{}, len(replication))
- for k, v := range replication {
- keyspace.StrategyOptions[k] = v
- }
- } else {
- const stmt = `
- SELECT durable_writes, strategy_class, strategy_options
- FROM system.schema_keyspaces
- WHERE keyspace_name = ?`
- var strategyOptionsJSON []byte
- iter := session.control.query(stmt, keyspaceName)
- if iter.NumRows() == 0 {
- return nil, ErrKeyspaceDoesNotExist
- }
- iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
- err := iter.Close()
- if err != nil {
- return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
- }
- err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
- if err != nil {
- return nil, fmt.Errorf(
- "Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
- strategyOptionsJSON, keyspace.Name, err,
- )
- }
- }
- return keyspace, nil
- }
- // query for only the table metadata in the specified keyspace from system.schema_columnfamilies
- func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) {
- var (
- iter *Iter
- scan func(iter *Iter, table *TableMetadata) bool
- stmt string
- keyAliasesJSON []byte
- columnAliasesJSON []byte
- )
- if session.useSystemSchema { // Cassandra 3.x+
- stmt = `
- SELECT
- table_name
- FROM system_schema.tables
- WHERE keyspace_name = ?`
- switchIter := func() *Iter {
- iter.Close()
- stmt = `
- SELECT
- view_name
- FROM system_schema.views
- WHERE keyspace_name = ?`
- iter = session.control.query(stmt, keyspaceName)
- return iter
- }
- scan = func(iter *Iter, table *TableMetadata) bool {
- r := iter.Scan(
- &table.Name,
- )
- if !r {
- iter = switchIter()
- if iter != nil {
- switchIter = func() *Iter { return nil }
- r = iter.Scan(&table.Name)
- }
- }
- return r
- }
- } else if session.cfg.ProtoVersion == protoVersion1 {
- // we have key aliases
- stmt = `
- SELECT
- columnfamily_name,
- key_validator,
- comparator,
- default_validator,
- key_aliases,
- column_aliases,
- value_alias
- FROM system.schema_columnfamilies
- WHERE keyspace_name = ?`
- scan = func(iter *Iter, table *TableMetadata) bool {
- return iter.Scan(
- &table.Name,
- &table.KeyValidator,
- &table.Comparator,
- &table.DefaultValidator,
- &keyAliasesJSON,
- &columnAliasesJSON,
- &table.ValueAlias,
- )
- }
- } else {
- stmt = `
- SELECT
- columnfamily_name,
- key_validator,
- comparator,
- default_validator
- FROM system.schema_columnfamilies
- WHERE keyspace_name = ?`
- scan = func(iter *Iter, table *TableMetadata) bool {
- return iter.Scan(
- &table.Name,
- &table.KeyValidator,
- &table.Comparator,
- &table.DefaultValidator,
- )
- }
- }
- iter = session.control.query(stmt, keyspaceName)
- tables := []TableMetadata{}
- table := TableMetadata{Keyspace: keyspaceName}
- for scan(iter, &table) {
- var err error
- // decode the key aliases
- if keyAliasesJSON != nil {
- table.KeyAliases = []string{}
- err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
- if err != nil {
- iter.Close()
- return nil, fmt.Errorf(
- "Invalid JSON value '%s' as key_aliases for in table '%s': %v",
- keyAliasesJSON, table.Name, err,
- )
- }
- }
- // decode the column aliases
- if columnAliasesJSON != nil {
- table.ColumnAliases = []string{}
- err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
- if err != nil {
- iter.Close()
- return nil, fmt.Errorf(
- "Invalid JSON value '%s' as column_aliases for in table '%s': %v",
- columnAliasesJSON, table.Name, err,
- )
- }
- }
- tables = append(tables, table)
- table = TableMetadata{Keyspace: keyspaceName}
- }
- err := iter.Close()
- if err != nil && err != ErrNotFound {
- return nil, fmt.Errorf("Error querying table schema: %v", err)
- }
- return tables, nil
- }
- func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
- // V1 does not support the type column, and all returned rows are
- // of kind "regular".
- const stmt = `
- SELECT
- columnfamily_name,
- column_name,
- component_index,
- validator,
- index_name,
- index_type,
- index_options
- FROM system.schema_columns
- WHERE keyspace_name = ?`
- var columns []ColumnMetadata
- rows := s.control.query(stmt, keyspace).Scanner()
- for rows.Next() {
- var (
- column = ColumnMetadata{Keyspace: keyspace}
- indexOptionsJSON []byte
- )
- // all columns returned by V1 are regular
- column.Kind = ColumnRegular
- err := rows.Scan(&column.Table,
- &column.Name,
- &column.ComponentIndex,
- &column.Validator,
- &column.Index.Name,
- &column.Index.Type,
- &indexOptionsJSON)
- if err != nil {
- return nil, err
- }
- if len(indexOptionsJSON) > 0 {
- err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
- if err != nil {
- return nil, fmt.Errorf(
- "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
- indexOptionsJSON,
- column.Name,
- column.Table,
- err)
- }
- }
- columns = append(columns, column)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return columns, nil
- }
- func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
- // V2+ supports the type column
- const stmt = `
- SELECT
- columnfamily_name,
- column_name,
- component_index,
- validator,
- index_name,
- index_type,
- index_options,
- type
- FROM system.schema_columns
- WHERE keyspace_name = ?`
- var columns []ColumnMetadata
- rows := s.control.query(stmt, keyspace).Scanner()
- for rows.Next() {
- var (
- column = ColumnMetadata{Keyspace: keyspace}
- indexOptionsJSON []byte
- )
- err := rows.Scan(&column.Table,
- &column.Name,
- &column.ComponentIndex,
- &column.Validator,
- &column.Index.Name,
- &column.Index.Type,
- &indexOptionsJSON,
- &column.Kind,
- )
- if err != nil {
- return nil, err
- }
- if len(indexOptionsJSON) > 0 {
- err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
- if err != nil {
- return nil, fmt.Errorf(
- "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
- indexOptionsJSON,
- column.Name,
- column.Table,
- err)
- }
- }
- columns = append(columns, column)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return columns, nil
- }
- func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
- const stmt = `
- SELECT
- table_name,
- column_name,
- clustering_order,
- type,
- kind,
- position
- FROM system_schema.columns
- WHERE keyspace_name = ?`
- var columns []ColumnMetadata
- rows := s.control.query(stmt, keyspace).Scanner()
- for rows.Next() {
- column := ColumnMetadata{Keyspace: keyspace}
- err := rows.Scan(&column.Table,
- &column.Name,
- &column.ClusteringOrder,
- &column.Validator,
- &column.Kind,
- &column.ComponentIndex,
- )
- if err != nil {
- return nil, err
- }
- columns = append(columns, column)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- // TODO(zariel): get column index info from system_schema.indexes
- return columns, nil
- }
- // query for only the column metadata in the specified keyspace from system.schema_columns
- func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
- var (
- columns []ColumnMetadata
- err error
- )
- // Deal with differences in protocol versions
- if session.cfg.ProtoVersion == 1 {
- columns, err = session.scanColumnMetadataV1(keyspaceName)
- } else if session.useSystemSchema { // Cassandra 3.x+
- columns, err = session.scanColumnMetadataSystem(keyspaceName)
- } else {
- columns, err = session.scanColumnMetadataV2(keyspaceName)
- }
- if err != nil && err != ErrNotFound {
- return nil, fmt.Errorf("Error querying column schema: %v", err)
- }
- return columns, nil
- }
- func getTypeInfo(t string) TypeInfo {
- if strings.HasPrefix(t, apacheCassandraTypePrefix) {
- t = apacheToCassandraType(t)
- }
- return getCassandraType(t)
- }
- func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, error) {
- if session.cfg.ProtoVersion == protoVersion1 {
- return nil, nil
- }
- var tableName string
- if session.useSystemSchema {
- tableName = "system_schema.types"
- } else {
- tableName = "system.schema_usertypes"
- }
- stmt := fmt.Sprintf(`
- SELECT
- type_name,
- field_names,
- field_types
- FROM %s
- WHERE keyspace_name = ?`, tableName)
- var views []ViewMetadata
- rows := session.control.query(stmt, keyspaceName).Scanner()
- for rows.Next() {
- view := ViewMetadata{Keyspace: keyspaceName}
- var argumentTypes []string
- err := rows.Scan(&view.Name,
- &view.FieldNames,
- &argumentTypes,
- )
- if err != nil {
- return nil, err
- }
- view.FieldTypes = make([]TypeInfo, len(argumentTypes))
- for i, argumentType := range argumentTypes {
- view.FieldTypes[i] = getTypeInfo(argumentType)
- }
- views = append(views, view)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return views, nil
- }
- func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) {
- if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
- return nil, nil
- }
- var tableName string
- if session.useSystemSchema {
- tableName = "system_schema.functions"
- } else {
- tableName = "system.schema_functions"
- }
- stmt := fmt.Sprintf(`
- SELECT
- function_name,
- argument_types,
- argument_names,
- body,
- called_on_null_input,
- language,
- return_type
- FROM %s
- WHERE keyspace_name = ?`, tableName)
- var functions []FunctionMetadata
- rows := session.control.query(stmt, keyspaceName).Scanner()
- for rows.Next() {
- function := FunctionMetadata{Keyspace: keyspaceName}
- var argumentTypes []string
- var returnType string
- err := rows.Scan(&function.Name,
- &argumentTypes,
- &function.ArgumentNames,
- &function.Body,
- &function.CalledOnNullInput,
- &function.Language,
- &returnType,
- )
- if err != nil {
- return nil, err
- }
- function.ReturnType = getTypeInfo(returnType)
- function.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
- for i, argumentType := range argumentTypes {
- function.ArgumentTypes[i] = getTypeInfo(argumentType)
- }
- functions = append(functions, function)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return functions, nil
- }
- func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMetadata, error) {
- if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
- return nil, nil
- }
- var tableName string
- if session.useSystemSchema {
- tableName = "system_schema.aggregates"
- } else {
- tableName = "system.schema_aggregates"
- }
- stmt := fmt.Sprintf(`
- SELECT
- aggregate_name,
- argument_types,
- final_func,
- initcond,
- return_type,
- state_func,
- state_type
- FROM %s
- WHERE keyspace_name = ?`, tableName)
- var aggregates []AggregateMetadata
- rows := session.control.query(stmt, keyspaceName).Scanner()
- for rows.Next() {
- aggregate := AggregateMetadata{Keyspace: keyspaceName}
- var argumentTypes []string
- var returnType string
- var stateType string
- err := rows.Scan(&aggregate.Name,
- &argumentTypes,
- &aggregate.finalFunc,
- &aggregate.InitCond,
- &returnType,
- &aggregate.stateFunc,
- &stateType,
- )
- if err != nil {
- return nil, err
- }
- aggregate.ReturnType = getTypeInfo(returnType)
- aggregate.StateType = getTypeInfo(stateType)
- aggregate.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
- for i, argumentType := range argumentTypes {
- aggregate.ArgumentTypes[i] = getTypeInfo(argumentType)
- }
- aggregates = append(aggregates, aggregate)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return aggregates, nil
- }
// typeParser is the type definition parser state.
type typeParser struct {
	// input is the complete definition being parsed.
	input string
	// index is the byte offset in input of the next unread character.
	index int
}
- // the type definition parser result
- type typeParserResult struct {
- isComposite bool
- types []TypeInfo
- reversed []bool
- collections map[string]TypeInfo
- }
- // Parse the type definition used for validator and comparator schema data
- func parseType(def string) typeParserResult {
- parser := &typeParser{input: def}
- return parser.parse()
- }
// Class names the parser recognises (by prefix) in type definitions.
const (
	// Wrapper and composite types.
	REVERSED_TYPE   = "org.apache.cassandra.db.marshal.ReversedType"
	COMPOSITE_TYPE  = "org.apache.cassandra.db.marshal.CompositeType"
	COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"

	// Collection types.
	LIST_TYPE = "org.apache.cassandra.db.marshal.ListType"
	SET_TYPE  = "org.apache.cassandra.db.marshal.SetType"
	MAP_TYPE  = "org.apache.cassandra.db.marshal.MapType"
)
- // represents a class specification in the type def AST
- type typeParserClassNode struct {
- name string
- params []typeParserParamNode
- // this is the segment of the input string that defined this node
- input string
- }
- // represents a class parameter in the type def AST
- type typeParserParamNode struct {
- name *string
- class typeParserClassNode
- }
- func (t *typeParser) parse() typeParserResult {
- // parse the AST
- ast, ok := t.parseClassNode()
- if !ok {
- // treat this is a custom type
- return typeParserResult{
- isComposite: false,
- types: []TypeInfo{
- NativeType{
- typ: TypeCustom,
- custom: t.input,
- },
- },
- reversed: []bool{false},
- collections: nil,
- }
- }
- // interpret the AST
- if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
- count := len(ast.params)
- // look for a collections param
- last := ast.params[count-1]
- collections := map[string]TypeInfo{}
- if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
- count--
- for _, param := range last.class.params {
- // decode the name
- var name string
- decoded, err := hex.DecodeString(*param.name)
- if err != nil {
- Logger.Printf(
- "Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
- t.input,
- *param.name,
- err,
- )
- // just use the provided name
- name = *param.name
- } else {
- name = string(decoded)
- }
- collections[name] = param.class.asTypeInfo()
- }
- }
- types := make([]TypeInfo, count)
- reversed := make([]bool, count)
- for i, param := range ast.params[:count] {
- class := param.class
- reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
- if reversed[i] {
- class = class.params[0].class
- }
- types[i] = class.asTypeInfo()
- }
- return typeParserResult{
- isComposite: true,
- types: types,
- reversed: reversed,
- collections: collections,
- }
- } else {
- // not composite, so one type
- class := *ast
- reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
- if reversed {
- class = class.params[0].class
- }
- typeInfo := class.asTypeInfo()
- return typeParserResult{
- isComposite: false,
- types: []TypeInfo{typeInfo},
- reversed: []bool{reversed},
- }
- }
- }
- func (class *typeParserClassNode) asTypeInfo() TypeInfo {
- if strings.HasPrefix(class.name, LIST_TYPE) {
- elem := class.params[0].class.asTypeInfo()
- return CollectionType{
- NativeType: NativeType{
- typ: TypeList,
- },
- Elem: elem,
- }
- }
- if strings.HasPrefix(class.name, SET_TYPE) {
- elem := class.params[0].class.asTypeInfo()
- return CollectionType{
- NativeType: NativeType{
- typ: TypeSet,
- },
- Elem: elem,
- }
- }
- if strings.HasPrefix(class.name, MAP_TYPE) {
- key := class.params[0].class.asTypeInfo()
- elem := class.params[1].class.asTypeInfo()
- return CollectionType{
- NativeType: NativeType{
- typ: TypeMap,
- },
- Key: key,
- Elem: elem,
- }
- }
- // must be a simple type or custom type
- info := NativeType{typ: getApacheCassandraType(class.name)}
- if info.typ == TypeCustom {
- // add the entire class definition
- info.custom = class.input
- }
- return info
- }
- // CLASS := ID [ PARAMS ]
- func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
- t.skipWhitespace()
- startIndex := t.index
- name, ok := t.nextIdentifier()
- if !ok {
- return nil, false
- }
- params, ok := t.parseParamNodes()
- if !ok {
- return nil, false
- }
- endIndex := t.index
- node = &typeParserClassNode{
- name: name,
- params: params,
- input: t.input[startIndex:endIndex],
- }
- return node, true
- }
- // PARAMS := "(" PARAM { "," PARAM } ")"
- // PARAM := [ PARAM_NAME ":" ] CLASS
- // PARAM_NAME := ID
- func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
- t.skipWhitespace()
- // the params are optional
- if t.index == len(t.input) || t.input[t.index] != '(' {
- return nil, true
- }
- params = []typeParserParamNode{}
- // consume the '('
- t.index++
- t.skipWhitespace()
- for t.input[t.index] != ')' {
- // look for a named param, but if no colon, then we want to backup
- backupIndex := t.index
- // name will be a hex encoded version of a utf-8 string
- name, ok := t.nextIdentifier()
- if !ok {
- return nil, false
- }
- hasName := true
- // TODO handle '=>' used for DynamicCompositeType
- t.skipWhitespace()
- if t.input[t.index] == ':' {
- // there is a name for this parameter
- // consume the ':'
- t.index++
- t.skipWhitespace()
- } else {
- // no name, backup
- hasName = false
- t.index = backupIndex
- }
- // parse the next full parameter
- classNode, ok := t.parseClassNode()
- if !ok {
- return nil, false
- }
- if hasName {
- params = append(
- params,
- typeParserParamNode{name: &name, class: *classNode},
- )
- } else {
- params = append(
- params,
- typeParserParamNode{class: *classNode},
- )
- }
- t.skipWhitespace()
- if t.input[t.index] == ',' {
- // consume the comma
- t.index++
- t.skipWhitespace()
- }
- }
- // consume the ')'
- t.index++
- return params, true
- }
- func (t *typeParser) skipWhitespace() {
- for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
- t.index++
- }
- }
// isWhitespaceChar reports whether c is a space, newline or tab.
func isWhitespaceChar(c byte) bool {
	switch c {
	case ' ', '\n', '\t':
		return true
	}
	return false
}
- // ID := LETTER { LETTER }
- // LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
- func (t *typeParser) nextIdentifier() (id string, found bool) {
- startIndex := t.index
- for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
- t.index++
- }
- if startIndex == t.index {
- return "", false
- }
- return t.input[startIndex:t.index], true
- }
// isIdentifierChar reports whether c may appear in an identifier: an ASCII
// digit or letter, or one of '-', '+', '.', '_' and '&'.
func isIdentifierChar(c byte) bool {
	switch {
	case '0' <= c && c <= '9',
		'a' <= c && c <= 'z',
		'A' <= c && c <= 'Z':
		return true
	}

	switch c {
	case '-', '+', '.', '_', '&':
		return true
	}
	return false
}
|