// metadata.go

  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. )
  13. // schema metadata for a keyspace
  14. type KeyspaceMetadata struct {
  15. Name string
  16. DurableWrites bool
  17. StrategyClass string
  18. StrategyOptions map[string]interface{}
  19. Tables map[string]*TableMetadata
  20. }
  21. // schema metadata for a table (a.k.a. column family)
  22. type TableMetadata struct {
  23. Keyspace string
  24. Name string
  25. KeyValidator string
  26. Comparator string
  27. DefaultValidator string
  28. KeyAliases []string
  29. ColumnAliases []string
  30. ValueAlias string
  31. PartitionKey []*ColumnMetadata
  32. ClusteringColumns []*ColumnMetadata
  33. Columns map[string]*ColumnMetadata
  34. OrderedColumns []string
  35. }
  36. // schema metadata for a column
  37. type ColumnMetadata struct {
  38. Keyspace string
  39. Table string
  40. Name string
  41. ComponentIndex int
  42. Kind ColumnKind
  43. Validator string
  44. Type TypeInfo
  45. ClusteringOrder string
  46. Order ColumnOrder
  47. Index ColumnIndexMetadata
  48. }
  49. // the ordering of the column with regard to its comparator
  50. type ColumnOrder bool
  51. const (
  52. ASC ColumnOrder = false
  53. DESC = true
  54. )
  55. type ColumnIndexMetadata struct {
  56. Name string
  57. Type string
  58. Options map[string]interface{}
  59. }
  60. type ColumnKind int
  61. const (
  62. ColumnUnkownKind ColumnKind = iota
  63. ColumnPartitionKey
  64. ColumnClusteringKey
  65. ColumnRegular
  66. ColumnCompact
  67. ColumnStatic
  68. )
  69. func (c ColumnKind) String() string {
  70. switch c {
  71. case ColumnPartitionKey:
  72. return "partition_key"
  73. case ColumnClusteringKey:
  74. return "clustering_key"
  75. case ColumnRegular:
  76. return "regular"
  77. case ColumnCompact:
  78. return "compact"
  79. case ColumnStatic:
  80. return "static"
  81. default:
  82. return fmt.Sprintf("unknown_column_%d", c)
  83. }
  84. }
  85. func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error {
  86. if typ.Type() != TypeVarchar {
  87. return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ)
  88. }
  89. kind, err := columnKindFromSchema(string(p))
  90. if err != nil {
  91. return err
  92. }
  93. *c = kind
  94. return nil
  95. }
  96. func columnKindFromSchema(kind string) (ColumnKind, error) {
  97. switch kind {
  98. case "partition_key":
  99. return ColumnPartitionKey, nil
  100. case "clustering_key", "clustering":
  101. return ColumnClusteringKey, nil
  102. case "regular":
  103. return ColumnRegular, nil
  104. case "compact_value":
  105. return ColumnCompact, nil
  106. case "static":
  107. return ColumnStatic, nil
  108. default:
  109. return -1, fmt.Errorf("unknown column kind: %q", kind)
  110. }
  111. }
  112. // default alias values
  113. const (
  114. DEFAULT_KEY_ALIAS = "key"
  115. DEFAULT_COLUMN_ALIAS = "column"
  116. DEFAULT_VALUE_ALIAS = "value"
  117. )
  118. // queries the cluster for schema information for a specific keyspace
  119. type schemaDescriber struct {
  120. session *Session
  121. mu sync.Mutex
  122. cache map[string]*KeyspaceMetadata
  123. }
  124. // creates a session bound schema describer which will query and cache
  125. // keyspace metadata
  126. func newSchemaDescriber(session *Session) *schemaDescriber {
  127. return &schemaDescriber{
  128. session: session,
  129. cache: map[string]*KeyspaceMetadata{},
  130. }
  131. }
  132. // returns the cached KeyspaceMetadata held by the describer for the named
  133. // keyspace.
  134. func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
  135. s.mu.Lock()
  136. defer s.mu.Unlock()
  137. metadata, found := s.cache[keyspaceName]
  138. if !found {
  139. // refresh the cache for this keyspace
  140. err := s.refreshSchema(keyspaceName)
  141. if err != nil {
  142. return nil, err
  143. }
  144. metadata = s.cache[keyspaceName]
  145. }
  146. return metadata, nil
  147. }
  148. // clears the already cached keyspace metadata
  149. func (s *schemaDescriber) clearSchema(keyspaceName string) {
  150. s.mu.Lock()
  151. defer s.mu.Unlock()
  152. delete(s.cache, keyspaceName)
  153. }
  154. // forcibly updates the current KeyspaceMetadata held by the schema describer
  155. // for a given named keyspace.
  156. func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
  157. var err error
  158. // query the system keyspace for schema data
  159. // TODO retrieve concurrently
  160. keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
  161. if err != nil {
  162. return err
  163. }
  164. tables, err := getTableMetadata(s.session, keyspaceName)
  165. if err != nil {
  166. return err
  167. }
  168. columns, err := getColumnMetadata(s.session, keyspaceName)
  169. if err != nil {
  170. return err
  171. }
  172. // organize the schema data
  173. compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns)
  174. // update the cache
  175. s.cache[keyspaceName] = keyspace
  176. return nil
  177. }
  178. // "compiles" derived information about keyspace, table, and column metadata
  179. // for a keyspace from the basic queried metadata objects returned by
  180. // getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
  181. // Links the metadata objects together and derives the column composition of
  182. // the partition key and clustering key for a table.
  183. func compileMetadata(
  184. protoVersion int,
  185. keyspace *KeyspaceMetadata,
  186. tables []TableMetadata,
  187. columns []ColumnMetadata,
  188. ) {
  189. keyspace.Tables = make(map[string]*TableMetadata)
  190. for i := range tables {
  191. tables[i].Columns = make(map[string]*ColumnMetadata)
  192. keyspace.Tables[tables[i].Name] = &tables[i]
  193. }
  194. // add columns from the schema data
  195. for i := range columns {
  196. col := &columns[i]
  197. // decode the validator for TypeInfo and order
  198. if col.ClusteringOrder != "" { // Cassandra 3.x+
  199. col.Type = getCassandraType(col.Validator)
  200. col.Order = ASC
  201. if col.ClusteringOrder == "desc" {
  202. col.Order = DESC
  203. }
  204. } else {
  205. validatorParsed := parseType(col.Validator)
  206. col.Type = validatorParsed.types[0]
  207. col.Order = ASC
  208. if validatorParsed.reversed[0] {
  209. col.Order = DESC
  210. }
  211. }
  212. table := keyspace.Tables[col.Table]
  213. table.Columns[col.Name] = col
  214. table.OrderedColumns = append(table.OrderedColumns, col.Name)
  215. }
  216. if protoVersion == protoVersion1 {
  217. compileV1Metadata(tables)
  218. } else {
  219. compileV2Metadata(tables)
  220. }
  221. }
  222. // Compiles derived information from TableMetadata which have had
  223. // ColumnMetadata added already. V1 protocol does not return as much
  224. // column metadata as V2+ (because V1 doesn't support the "type" column in the
  225. // system.schema_columns table) so determining PartitionKey and ClusterColumns
  226. // is more complex.
  227. func compileV1Metadata(tables []TableMetadata) {
  228. for i := range tables {
  229. table := &tables[i]
  230. // decode the key validator
  231. keyValidatorParsed := parseType(table.KeyValidator)
  232. // decode the comparator
  233. comparatorParsed := parseType(table.Comparator)
  234. // the partition key length is the same as the number of types in the
  235. // key validator
  236. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  237. // V1 protocol only returns "regular" columns from
  238. // system.schema_columns (there is no type field for columns)
  239. // so the alias information is used to
  240. // create the partition key and clustering columns
  241. // construct the partition key from the alias
  242. for i := range table.PartitionKey {
  243. var alias string
  244. if len(table.KeyAliases) > i {
  245. alias = table.KeyAliases[i]
  246. } else if i == 0 {
  247. alias = DEFAULT_KEY_ALIAS
  248. } else {
  249. alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
  250. }
  251. column := &ColumnMetadata{
  252. Keyspace: table.Keyspace,
  253. Table: table.Name,
  254. Name: alias,
  255. Type: keyValidatorParsed.types[i],
  256. Kind: ColumnPartitionKey,
  257. ComponentIndex: i,
  258. }
  259. table.PartitionKey[i] = column
  260. table.Columns[alias] = column
  261. }
  262. // determine the number of clustering columns
  263. size := len(comparatorParsed.types)
  264. if comparatorParsed.isComposite {
  265. if len(comparatorParsed.collections) != 0 ||
  266. (len(table.ColumnAliases) == size-1 &&
  267. comparatorParsed.types[size-1].Type() == TypeVarchar) {
  268. size = size - 1
  269. }
  270. } else {
  271. if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
  272. size = 0
  273. }
  274. }
  275. table.ClusteringColumns = make([]*ColumnMetadata, size)
  276. for i := range table.ClusteringColumns {
  277. var alias string
  278. if len(table.ColumnAliases) > i {
  279. alias = table.ColumnAliases[i]
  280. } else if i == 0 {
  281. alias = DEFAULT_COLUMN_ALIAS
  282. } else {
  283. alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
  284. }
  285. order := ASC
  286. if comparatorParsed.reversed[i] {
  287. order = DESC
  288. }
  289. column := &ColumnMetadata{
  290. Keyspace: table.Keyspace,
  291. Table: table.Name,
  292. Name: alias,
  293. Type: comparatorParsed.types[i],
  294. Order: order,
  295. Kind: ColumnClusteringKey,
  296. ComponentIndex: i,
  297. }
  298. table.ClusteringColumns[i] = column
  299. table.Columns[alias] = column
  300. }
  301. if size != len(comparatorParsed.types)-1 {
  302. alias := DEFAULT_VALUE_ALIAS
  303. if len(table.ValueAlias) > 0 {
  304. alias = table.ValueAlias
  305. }
  306. // decode the default validator
  307. defaultValidatorParsed := parseType(table.DefaultValidator)
  308. column := &ColumnMetadata{
  309. Keyspace: table.Keyspace,
  310. Table: table.Name,
  311. Name: alias,
  312. Type: defaultValidatorParsed.types[0],
  313. Kind: ColumnRegular,
  314. }
  315. table.Columns[alias] = column
  316. }
  317. }
  318. }
  319. // The simpler compile case for V2+ protocol
  320. func compileV2Metadata(tables []TableMetadata) {
  321. for i := range tables {
  322. table := &tables[i]
  323. clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey)
  324. table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
  325. if table.KeyValidator != "" {
  326. keyValidatorParsed := parseType(table.KeyValidator)
  327. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  328. } else { // Cassandra 3.x+
  329. partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey)
  330. table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
  331. }
  332. for _, columnName := range table.OrderedColumns {
  333. column := table.Columns[columnName]
  334. if column.Kind == ColumnPartitionKey {
  335. table.PartitionKey[column.ComponentIndex] = column
  336. } else if column.Kind == ColumnClusteringKey {
  337. table.ClusteringColumns[column.ComponentIndex] = column
  338. }
  339. }
  340. }
  341. }
  342. // returns the count of coluns with the given "kind" value.
  343. func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int {
  344. maxComponentIndex := -1
  345. for _, column := range columns {
  346. if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
  347. maxComponentIndex = column.ComponentIndex
  348. }
  349. }
  350. return maxComponentIndex + 1
  351. }
  352. // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
  353. func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
  354. keyspace := &KeyspaceMetadata{Name: keyspaceName}
  355. if session.useSystemSchema { // Cassandra 3.x+
  356. const stmt = `
  357. SELECT durable_writes, replication
  358. FROM system_schema.keyspaces
  359. WHERE keyspace_name = ?`
  360. var replication map[string]string
  361. iter := session.control.query(stmt, keyspaceName)
  362. if iter.NumRows() == 0 {
  363. return nil, ErrKeyspaceDoesNotExist
  364. }
  365. iter.Scan(&keyspace.DurableWrites, &replication)
  366. err := iter.Close()
  367. if err != nil {
  368. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  369. }
  370. keyspace.StrategyClass = replication["class"]
  371. delete(replication, "class")
  372. keyspace.StrategyOptions = make(map[string]interface{}, len(replication))
  373. for k, v := range replication {
  374. keyspace.StrategyOptions[k] = v
  375. }
  376. } else {
  377. const stmt = `
  378. SELECT durable_writes, strategy_class, strategy_options
  379. FROM system.schema_keyspaces
  380. WHERE keyspace_name = ?`
  381. var strategyOptionsJSON []byte
  382. iter := session.control.query(stmt, keyspaceName)
  383. if iter.NumRows() == 0 {
  384. return nil, ErrKeyspaceDoesNotExist
  385. }
  386. iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
  387. err := iter.Close()
  388. if err != nil {
  389. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  390. }
  391. err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
  392. if err != nil {
  393. return nil, fmt.Errorf(
  394. "Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
  395. strategyOptionsJSON, keyspace.Name, err,
  396. )
  397. }
  398. }
  399. return keyspace, nil
  400. }
  401. // query for only the table metadata in the specified keyspace from system.schema_columnfamilies
  402. func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) {
  403. var (
  404. iter *Iter
  405. scan func(iter *Iter, table *TableMetadata) bool
  406. stmt string
  407. keyAliasesJSON []byte
  408. columnAliasesJSON []byte
  409. )
  410. if session.useSystemSchema { // Cassandra 3.x+
  411. stmt = `
  412. SELECT
  413. table_name
  414. FROM system_schema.tables
  415. WHERE keyspace_name = ?`
  416. switchIter := func() *Iter {
  417. iter.Close()
  418. stmt = `
  419. SELECT
  420. view_name
  421. FROM system_schema.views
  422. WHERE keyspace_name = ?`
  423. iter = session.control.query(stmt, keyspaceName)
  424. return iter
  425. }
  426. scan = func(iter *Iter, table *TableMetadata) bool {
  427. r := iter.Scan(
  428. &table.Name,
  429. )
  430. if !r {
  431. iter = switchIter()
  432. if iter != nil {
  433. switchIter = func() *Iter { return nil }
  434. r = iter.Scan(&table.Name)
  435. }
  436. }
  437. return r
  438. }
  439. } else if session.cfg.ProtoVersion == protoVersion1 {
  440. // we have key aliases
  441. stmt = `
  442. SELECT
  443. columnfamily_name,
  444. key_validator,
  445. comparator,
  446. default_validator,
  447. key_aliases,
  448. column_aliases,
  449. value_alias
  450. FROM system.schema_columnfamilies
  451. WHERE keyspace_name = ?`
  452. scan = func(iter *Iter, table *TableMetadata) bool {
  453. return iter.Scan(
  454. &table.Name,
  455. &table.KeyValidator,
  456. &table.Comparator,
  457. &table.DefaultValidator,
  458. &keyAliasesJSON,
  459. &columnAliasesJSON,
  460. &table.ValueAlias,
  461. )
  462. }
  463. } else {
  464. stmt = `
  465. SELECT
  466. columnfamily_name,
  467. key_validator,
  468. comparator,
  469. default_validator
  470. FROM system.schema_columnfamilies
  471. WHERE keyspace_name = ?`
  472. scan = func(iter *Iter, table *TableMetadata) bool {
  473. return iter.Scan(
  474. &table.Name,
  475. &table.KeyValidator,
  476. &table.Comparator,
  477. &table.DefaultValidator,
  478. )
  479. }
  480. }
  481. iter = session.control.query(stmt, keyspaceName)
  482. tables := []TableMetadata{}
  483. table := TableMetadata{Keyspace: keyspaceName}
  484. for scan(iter, &table) {
  485. var err error
  486. // decode the key aliases
  487. if keyAliasesJSON != nil {
  488. table.KeyAliases = []string{}
  489. err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
  490. if err != nil {
  491. iter.Close()
  492. return nil, fmt.Errorf(
  493. "Invalid JSON value '%s' as key_aliases for in table '%s': %v",
  494. keyAliasesJSON, table.Name, err,
  495. )
  496. }
  497. }
  498. // decode the column aliases
  499. if columnAliasesJSON != nil {
  500. table.ColumnAliases = []string{}
  501. err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
  502. if err != nil {
  503. iter.Close()
  504. return nil, fmt.Errorf(
  505. "Invalid JSON value '%s' as column_aliases for in table '%s': %v",
  506. columnAliasesJSON, table.Name, err,
  507. )
  508. }
  509. }
  510. tables = append(tables, table)
  511. table = TableMetadata{Keyspace: keyspaceName}
  512. }
  513. err := iter.Close()
  514. if err != nil && err != ErrNotFound {
  515. return nil, fmt.Errorf("Error querying table schema: %v", err)
  516. }
  517. return tables, nil
  518. }
  519. func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
  520. // V1 does not support the type column, and all returned rows are
  521. // of kind "regular".
  522. const stmt = `
  523. SELECT
  524. columnfamily_name,
  525. column_name,
  526. component_index,
  527. validator,
  528. index_name,
  529. index_type,
  530. index_options
  531. FROM system.schema_columns
  532. WHERE keyspace_name = ?`
  533. var columns []ColumnMetadata
  534. rows := s.control.query(stmt, keyspace).Scanner()
  535. for rows.Next() {
  536. var (
  537. column = ColumnMetadata{Keyspace: keyspace}
  538. indexOptionsJSON []byte
  539. )
  540. // all columns returned by V1 are regular
  541. column.Kind = ColumnRegular
  542. err := rows.Scan(&column.Table,
  543. &column.Name,
  544. &column.ComponentIndex,
  545. &column.Validator,
  546. &column.Index.Name,
  547. &column.Index.Type,
  548. &indexOptionsJSON)
  549. if err != nil {
  550. return nil, err
  551. }
  552. if len(indexOptionsJSON) > 0 {
  553. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  554. if err != nil {
  555. return nil, fmt.Errorf(
  556. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  557. indexOptionsJSON,
  558. column.Name,
  559. column.Table,
  560. err)
  561. }
  562. }
  563. columns = append(columns, column)
  564. }
  565. if err := rows.Err(); err != nil {
  566. return nil, err
  567. }
  568. return columns, nil
  569. }
  570. func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
  571. // V2+ supports the type column
  572. const stmt = `
  573. SELECT
  574. columnfamily_name,
  575. column_name,
  576. component_index,
  577. validator,
  578. index_name,
  579. index_type,
  580. index_options,
  581. type
  582. FROM system.schema_columns
  583. WHERE keyspace_name = ?`
  584. var columns []ColumnMetadata
  585. rows := s.control.query(stmt, keyspace).Scanner()
  586. for rows.Next() {
  587. var (
  588. column = ColumnMetadata{Keyspace: keyspace}
  589. indexOptionsJSON []byte
  590. )
  591. err := rows.Scan(&column.Table,
  592. &column.Name,
  593. &column.ComponentIndex,
  594. &column.Validator,
  595. &column.Index.Name,
  596. &column.Index.Type,
  597. &indexOptionsJSON,
  598. &column.Kind,
  599. )
  600. if err != nil {
  601. return nil, err
  602. }
  603. if len(indexOptionsJSON) > 0 {
  604. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  605. if err != nil {
  606. return nil, fmt.Errorf(
  607. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  608. indexOptionsJSON,
  609. column.Name,
  610. column.Table,
  611. err)
  612. }
  613. }
  614. columns = append(columns, column)
  615. }
  616. if err := rows.Err(); err != nil {
  617. return nil, err
  618. }
  619. return columns, nil
  620. }
  621. func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
  622. const stmt = `
  623. SELECT
  624. table_name,
  625. column_name,
  626. clustering_order,
  627. type,
  628. kind,
  629. position
  630. FROM system_schema.columns
  631. WHERE keyspace_name = ?`
  632. var columns []ColumnMetadata
  633. rows := s.control.query(stmt, keyspace).Scanner()
  634. for rows.Next() {
  635. column := ColumnMetadata{Keyspace: keyspace}
  636. err := rows.Scan(&column.Table,
  637. &column.Name,
  638. &column.ClusteringOrder,
  639. &column.Validator,
  640. &column.Kind,
  641. &column.ComponentIndex,
  642. )
  643. if err != nil {
  644. return nil, err
  645. }
  646. columns = append(columns, column)
  647. }
  648. if err := rows.Err(); err != nil {
  649. return nil, err
  650. }
  651. // TODO(zariel): get column index info from system_schema.indexes
  652. return columns, nil
  653. }
  654. // query for only the column metadata in the specified keyspace from system.schema_columns
  655. func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
  656. var (
  657. columns []ColumnMetadata
  658. err error
  659. )
  660. // Deal with differences in protocol versions
  661. if session.cfg.ProtoVersion == 1 {
  662. columns, err = session.scanColumnMetadataV1(keyspaceName)
  663. } else if session.useSystemSchema { // Cassandra 3.x+
  664. columns, err = session.scanColumnMetadataSystem(keyspaceName)
  665. } else {
  666. columns, err = session.scanColumnMetadataV2(keyspaceName)
  667. }
  668. if err != nil && err != ErrNotFound {
  669. return nil, fmt.Errorf("Error querying column schema: %v", err)
  670. }
  671. return columns, nil
  672. }
  673. // type definition parser state
  674. type typeParser struct {
  675. input string
  676. index int
  677. }
  678. // the type definition parser result
  679. type typeParserResult struct {
  680. isComposite bool
  681. types []TypeInfo
  682. reversed []bool
  683. collections map[string]TypeInfo
  684. }
  685. // Parse the type definition used for validator and comparator schema data
  686. func parseType(def string) typeParserResult {
  687. parser := &typeParser{input: def}
  688. return parser.parse()
  689. }
  690. const (
  691. REVERSED_TYPE = "org.apache.cassandra.db.marshal.ReversedType"
  692. COMPOSITE_TYPE = "org.apache.cassandra.db.marshal.CompositeType"
  693. COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"
  694. LIST_TYPE = "org.apache.cassandra.db.marshal.ListType"
  695. SET_TYPE = "org.apache.cassandra.db.marshal.SetType"
  696. MAP_TYPE = "org.apache.cassandra.db.marshal.MapType"
  697. )
  698. // represents a class specification in the type def AST
  699. type typeParserClassNode struct {
  700. name string
  701. params []typeParserParamNode
  702. // this is the segment of the input string that defined this node
  703. input string
  704. }
  705. // represents a class parameter in the type def AST
  706. type typeParserParamNode struct {
  707. name *string
  708. class typeParserClassNode
  709. }
  710. func (t *typeParser) parse() typeParserResult {
  711. // parse the AST
  712. ast, ok := t.parseClassNode()
  713. if !ok {
  714. // treat this is a custom type
  715. return typeParserResult{
  716. isComposite: false,
  717. types: []TypeInfo{
  718. NativeType{
  719. typ: TypeCustom,
  720. custom: t.input,
  721. },
  722. },
  723. reversed: []bool{false},
  724. collections: nil,
  725. }
  726. }
  727. // interpret the AST
  728. if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
  729. count := len(ast.params)
  730. // look for a collections param
  731. last := ast.params[count-1]
  732. collections := map[string]TypeInfo{}
  733. if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
  734. count--
  735. for _, param := range last.class.params {
  736. // decode the name
  737. var name string
  738. decoded, err := hex.DecodeString(*param.name)
  739. if err != nil {
  740. Logger.Printf(
  741. "Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
  742. t.input,
  743. *param.name,
  744. err,
  745. )
  746. // just use the provided name
  747. name = *param.name
  748. } else {
  749. name = string(decoded)
  750. }
  751. collections[name] = param.class.asTypeInfo()
  752. }
  753. }
  754. types := make([]TypeInfo, count)
  755. reversed := make([]bool, count)
  756. for i, param := range ast.params[:count] {
  757. class := param.class
  758. reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
  759. if reversed[i] {
  760. class = class.params[0].class
  761. }
  762. types[i] = class.asTypeInfo()
  763. }
  764. return typeParserResult{
  765. isComposite: true,
  766. types: types,
  767. reversed: reversed,
  768. collections: collections,
  769. }
  770. } else {
  771. // not composite, so one type
  772. class := *ast
  773. reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
  774. if reversed {
  775. class = class.params[0].class
  776. }
  777. typeInfo := class.asTypeInfo()
  778. return typeParserResult{
  779. isComposite: false,
  780. types: []TypeInfo{typeInfo},
  781. reversed: []bool{reversed},
  782. }
  783. }
  784. }
  785. func (class *typeParserClassNode) asTypeInfo() TypeInfo {
  786. if strings.HasPrefix(class.name, LIST_TYPE) {
  787. elem := class.params[0].class.asTypeInfo()
  788. return CollectionType{
  789. NativeType: NativeType{
  790. typ: TypeList,
  791. },
  792. Elem: elem,
  793. }
  794. }
  795. if strings.HasPrefix(class.name, SET_TYPE) {
  796. elem := class.params[0].class.asTypeInfo()
  797. return CollectionType{
  798. NativeType: NativeType{
  799. typ: TypeSet,
  800. },
  801. Elem: elem,
  802. }
  803. }
  804. if strings.HasPrefix(class.name, MAP_TYPE) {
  805. key := class.params[0].class.asTypeInfo()
  806. elem := class.params[1].class.asTypeInfo()
  807. return CollectionType{
  808. NativeType: NativeType{
  809. typ: TypeMap,
  810. },
  811. Key: key,
  812. Elem: elem,
  813. }
  814. }
  815. // must be a simple type or custom type
  816. info := NativeType{typ: getApacheCassandraType(class.name)}
  817. if info.typ == TypeCustom {
  818. // add the entire class definition
  819. info.custom = class.input
  820. }
  821. return info
  822. }
  823. // CLASS := ID [ PARAMS ]
  824. func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
  825. t.skipWhitespace()
  826. startIndex := t.index
  827. name, ok := t.nextIdentifier()
  828. if !ok {
  829. return nil, false
  830. }
  831. params, ok := t.parseParamNodes()
  832. if !ok {
  833. return nil, false
  834. }
  835. endIndex := t.index
  836. node = &typeParserClassNode{
  837. name: name,
  838. params: params,
  839. input: t.input[startIndex:endIndex],
  840. }
  841. return node, true
  842. }
  843. // PARAMS := "(" PARAM { "," PARAM } ")"
  844. // PARAM := [ PARAM_NAME ":" ] CLASS
  845. // PARAM_NAME := ID
  846. func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
  847. t.skipWhitespace()
  848. // the params are optional
  849. if t.index == len(t.input) || t.input[t.index] != '(' {
  850. return nil, true
  851. }
  852. params = []typeParserParamNode{}
  853. // consume the '('
  854. t.index++
  855. t.skipWhitespace()
  856. for t.input[t.index] != ')' {
  857. // look for a named param, but if no colon, then we want to backup
  858. backupIndex := t.index
  859. // name will be a hex encoded version of a utf-8 string
  860. name, ok := t.nextIdentifier()
  861. if !ok {
  862. return nil, false
  863. }
  864. hasName := true
  865. // TODO handle '=>' used for DynamicCompositeType
  866. t.skipWhitespace()
  867. if t.input[t.index] == ':' {
  868. // there is a name for this parameter
  869. // consume the ':'
  870. t.index++
  871. t.skipWhitespace()
  872. } else {
  873. // no name, backup
  874. hasName = false
  875. t.index = backupIndex
  876. }
  877. // parse the next full parameter
  878. classNode, ok := t.parseClassNode()
  879. if !ok {
  880. return nil, false
  881. }
  882. if hasName {
  883. params = append(
  884. params,
  885. typeParserParamNode{name: &name, class: *classNode},
  886. )
  887. } else {
  888. params = append(
  889. params,
  890. typeParserParamNode{class: *classNode},
  891. )
  892. }
  893. t.skipWhitespace()
  894. if t.input[t.index] == ',' {
  895. // consume the comma
  896. t.index++
  897. t.skipWhitespace()
  898. }
  899. }
  900. // consume the ')'
  901. t.index++
  902. return params, true
  903. }
  904. func (t *typeParser) skipWhitespace() {
  905. for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
  906. t.index++
  907. }
  908. }
  909. func isWhitespaceChar(c byte) bool {
  910. return c == ' ' || c == '\n' || c == '\t'
  911. }
  912. // ID := LETTER { LETTER }
  913. // LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
  914. func (t *typeParser) nextIdentifier() (id string, found bool) {
  915. startIndex := t.index
  916. for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
  917. t.index++
  918. }
  919. if startIndex == t.index {
  920. return "", false
  921. }
  922. return t.input[startIndex:t.index], true
  923. }
  924. func isIdentifierChar(c byte) bool {
  925. return (c >= '0' && c <= '9') ||
  926. (c >= 'a' && c <= 'z') ||
  927. (c >= 'A' && c <= 'Z') ||
  928. c == '-' ||
  929. c == '+' ||
  930. c == '.' ||
  931. c == '_' ||
  932. c == '&'
  933. }