metadata.go 25 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. )
  13. // schema metadata for a keyspace
  14. type KeyspaceMetadata struct {
  15. Name string
  16. DurableWrites bool
  17. StrategyClass string
  18. StrategyOptions map[string]interface{}
  19. Tables map[string]*TableMetadata
  20. }
  21. // schema metadata for a table (a.k.a. column family)
  22. type TableMetadata struct {
  23. Keyspace string
  24. Name string
  25. KeyValidator string
  26. Comparator string
  27. DefaultValidator string
  28. KeyAliases []string
  29. ColumnAliases []string
  30. ValueAlias string
  31. PartitionKey []*ColumnMetadata
  32. ClusteringColumns []*ColumnMetadata
  33. Columns map[string]*ColumnMetadata
  34. OrderedColumns []string
  35. }
  36. // schema metadata for a column
  37. type ColumnMetadata struct {
  38. Keyspace string
  39. Table string
  40. Name string
  41. ComponentIndex int
  42. Kind ColumnKind
  43. Validator string
  44. Type TypeInfo
  45. ClusteringOrder string
  46. Order ColumnOrder
  47. Index ColumnIndexMetadata
  48. }
  49. // the ordering of the column with regard to its comparator
  50. type ColumnOrder bool
  51. const (
  52. ASC ColumnOrder = false
  53. DESC = true
  54. )
  55. type ColumnIndexMetadata struct {
  56. Name string
  57. Type string
  58. Options map[string]interface{}
  59. }
  60. type ColumnKind int
  61. const (
  62. ColumnUnkownKind ColumnKind = iota
  63. ColumnPartitionKey
  64. ColumnClusteringKey
  65. ColumnRegular
  66. ColumnCompact
  67. ColumnStatic
  68. )
  69. func (c ColumnKind) String() string {
  70. switch c {
  71. case ColumnPartitionKey:
  72. return "partition_key"
  73. case ColumnClusteringKey:
  74. return "clustering_key"
  75. case ColumnRegular:
  76. return "regular"
  77. case ColumnCompact:
  78. return "compact"
  79. case ColumnStatic:
  80. return "static"
  81. default:
  82. return fmt.Sprintf("unknown_column_%d", c)
  83. }
  84. }
  85. func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error {
  86. if typ.Type() != TypeVarchar {
  87. return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ)
  88. }
  89. kind, err := columnKindFromSchema(string(p))
  90. if err != nil {
  91. return err
  92. }
  93. *c = kind
  94. return nil
  95. }
  96. func columnKindFromSchema(kind string) (ColumnKind, error) {
  97. switch kind {
  98. case "partition_key":
  99. return ColumnPartitionKey, nil
  100. case "clustering_key", "clustering":
  101. return ColumnClusteringKey, nil
  102. case "regular":
  103. return ColumnRegular, nil
  104. case "compact_value":
  105. return ColumnCompact, nil
  106. case "static":
  107. return ColumnStatic, nil
  108. default:
  109. return -1, fmt.Errorf("unknown column kind: %q", kind)
  110. }
  111. }
  112. // default alias values
  113. const (
  114. DEFAULT_KEY_ALIAS = "key"
  115. DEFAULT_COLUMN_ALIAS = "column"
  116. DEFAULT_VALUE_ALIAS = "value"
  117. )
  118. // queries the cluster for schema information for a specific keyspace
  119. type schemaDescriber struct {
  120. session *Session
  121. mu sync.Mutex
  122. cache map[string]*KeyspaceMetadata
  123. }
  124. // creates a session bound schema describer which will query and cache
  125. // keyspace metadata
  126. func newSchemaDescriber(session *Session) *schemaDescriber {
  127. return &schemaDescriber{
  128. session: session,
  129. cache: map[string]*KeyspaceMetadata{},
  130. }
  131. }
  132. // returns the cached KeyspaceMetadata held by the describer for the named
  133. // keyspace.
  134. func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
  135. s.mu.Lock()
  136. defer s.mu.Unlock()
  137. metadata, found := s.cache[keyspaceName]
  138. if !found {
  139. // refresh the cache for this keyspace
  140. err := s.refreshSchema(keyspaceName)
  141. if err != nil {
  142. return nil, err
  143. }
  144. metadata = s.cache[keyspaceName]
  145. }
  146. return metadata, nil
  147. }
  148. // clears the already cached keyspace metadata
  149. func (s *schemaDescriber) clearSchema(keyspaceName string) {
  150. s.mu.Lock()
  151. defer s.mu.Unlock()
  152. delete(s.cache, keyspaceName)
  153. }
  154. // forcibly updates the current KeyspaceMetadata held by the schema describer
  155. // for a given named keyspace.
  156. func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
  157. var err error
  158. // query the system keyspace for schema data
  159. // TODO retrieve concurrently
  160. keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
  161. if err != nil {
  162. return err
  163. }
  164. tables, err := getTableMetadata(s.session, keyspaceName)
  165. if err != nil {
  166. return err
  167. }
  168. columns, err := getColumnMetadata(s.session, keyspaceName)
  169. if err != nil {
  170. return err
  171. }
  172. // organize the schema data
  173. compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns)
  174. // update the cache
  175. s.cache[keyspaceName] = keyspace
  176. return nil
  177. }
  178. // "compiles" derived information about keyspace, table, and column metadata
  179. // for a keyspace from the basic queried metadata objects returned by
  180. // getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
  181. // Links the metadata objects together and derives the column composition of
  182. // the partition key and clustering key for a table.
  183. func compileMetadata(
  184. protoVersion int,
  185. keyspace *KeyspaceMetadata,
  186. tables []TableMetadata,
  187. columns []ColumnMetadata,
  188. ) {
  189. keyspace.Tables = make(map[string]*TableMetadata)
  190. for i := range tables {
  191. tables[i].Columns = make(map[string]*ColumnMetadata)
  192. keyspace.Tables[tables[i].Name] = &tables[i]
  193. }
  194. // add columns from the schema data
  195. for i := range columns {
  196. // decode the validator for TypeInfo and order
  197. if columns[i].ClusteringOrder != "" { // Cassandra 3.x+
  198. columns[i].Type = NativeType{typ: getCassandraType(columns[i].Validator)}
  199. columns[i].Order = ASC
  200. if columns[i].ClusteringOrder == "desc" {
  201. columns[i].Order = DESC
  202. }
  203. } else {
  204. validatorParsed := parseType(columns[i].Validator)
  205. columns[i].Type = validatorParsed.types[0]
  206. columns[i].Order = ASC
  207. if validatorParsed.reversed[0] {
  208. columns[i].Order = DESC
  209. }
  210. }
  211. table := keyspace.Tables[columns[i].Table]
  212. table.Columns[columns[i].Name] = &columns[i]
  213. table.OrderedColumns = append(table.OrderedColumns, columns[i].Name)
  214. }
  215. if protoVersion == protoVersion1 {
  216. compileV1Metadata(tables)
  217. } else {
  218. compileV2Metadata(tables)
  219. }
  220. }
  221. // Compiles derived information from TableMetadata which have had
  222. // ColumnMetadata added already. V1 protocol does not return as much
  223. // column metadata as V2+ (because V1 doesn't support the "type" column in the
  224. // system.schema_columns table) so determining PartitionKey and ClusterColumns
  225. // is more complex.
  226. func compileV1Metadata(tables []TableMetadata) {
  227. for i := range tables {
  228. table := &tables[i]
  229. // decode the key validator
  230. keyValidatorParsed := parseType(table.KeyValidator)
  231. // decode the comparator
  232. comparatorParsed := parseType(table.Comparator)
  233. // the partition key length is the same as the number of types in the
  234. // key validator
  235. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  236. // V1 protocol only returns "regular" columns from
  237. // system.schema_columns (there is no type field for columns)
  238. // so the alias information is used to
  239. // create the partition key and clustering columns
  240. // construct the partition key from the alias
  241. for i := range table.PartitionKey {
  242. var alias string
  243. if len(table.KeyAliases) > i {
  244. alias = table.KeyAliases[i]
  245. } else if i == 0 {
  246. alias = DEFAULT_KEY_ALIAS
  247. } else {
  248. alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
  249. }
  250. column := &ColumnMetadata{
  251. Keyspace: table.Keyspace,
  252. Table: table.Name,
  253. Name: alias,
  254. Type: keyValidatorParsed.types[i],
  255. Kind: ColumnPartitionKey,
  256. ComponentIndex: i,
  257. }
  258. table.PartitionKey[i] = column
  259. table.Columns[alias] = column
  260. }
  261. // determine the number of clustering columns
  262. size := len(comparatorParsed.types)
  263. if comparatorParsed.isComposite {
  264. if len(comparatorParsed.collections) != 0 ||
  265. (len(table.ColumnAliases) == size-1 &&
  266. comparatorParsed.types[size-1].Type() == TypeVarchar) {
  267. size = size - 1
  268. }
  269. } else {
  270. if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
  271. size = 0
  272. }
  273. }
  274. table.ClusteringColumns = make([]*ColumnMetadata, size)
  275. for i := range table.ClusteringColumns {
  276. var alias string
  277. if len(table.ColumnAliases) > i {
  278. alias = table.ColumnAliases[i]
  279. } else if i == 0 {
  280. alias = DEFAULT_COLUMN_ALIAS
  281. } else {
  282. alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
  283. }
  284. order := ASC
  285. if comparatorParsed.reversed[i] {
  286. order = DESC
  287. }
  288. column := &ColumnMetadata{
  289. Keyspace: table.Keyspace,
  290. Table: table.Name,
  291. Name: alias,
  292. Type: comparatorParsed.types[i],
  293. Order: order,
  294. Kind: ColumnClusteringKey,
  295. ComponentIndex: i,
  296. }
  297. table.ClusteringColumns[i] = column
  298. table.Columns[alias] = column
  299. }
  300. if size != len(comparatorParsed.types)-1 {
  301. alias := DEFAULT_VALUE_ALIAS
  302. if len(table.ValueAlias) > 0 {
  303. alias = table.ValueAlias
  304. }
  305. // decode the default validator
  306. defaultValidatorParsed := parseType(table.DefaultValidator)
  307. column := &ColumnMetadata{
  308. Keyspace: table.Keyspace,
  309. Table: table.Name,
  310. Name: alias,
  311. Type: defaultValidatorParsed.types[0],
  312. Kind: ColumnRegular,
  313. }
  314. table.Columns[alias] = column
  315. }
  316. }
  317. }
  318. // The simpler compile case for V2+ protocol
  319. func compileV2Metadata(tables []TableMetadata) {
  320. for i := range tables {
  321. table := &tables[i]
  322. clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey)
  323. table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
  324. if table.KeyValidator != "" {
  325. keyValidatorParsed := parseType(table.KeyValidator)
  326. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  327. } else { // Cassandra 3.x+
  328. partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey)
  329. table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
  330. }
  331. for _, columnName := range table.OrderedColumns {
  332. column := table.Columns[columnName]
  333. if column.Kind == ColumnPartitionKey {
  334. table.PartitionKey[column.ComponentIndex] = column
  335. } else if column.Kind == ColumnClusteringKey {
  336. table.ClusteringColumns[column.ComponentIndex] = column
  337. }
  338. }
  339. }
  340. }
  341. // returns the count of coluns with the given "kind" value.
  342. func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int {
  343. maxComponentIndex := -1
  344. for _, column := range columns {
  345. if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
  346. maxComponentIndex = column.ComponentIndex
  347. }
  348. }
  349. return maxComponentIndex + 1
  350. }
  351. // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
  352. func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
  353. keyspace := &KeyspaceMetadata{Name: keyspaceName}
  354. if session.useSystemSchema { // Cassandra 3.x+
  355. const stmt = `
  356. SELECT durable_writes, replication
  357. FROM system_schema.keyspaces
  358. WHERE keyspace_name = ?`
  359. var replication map[string]string
  360. iter := session.control.query(stmt, keyspaceName)
  361. if iter.NumRows() == 0 {
  362. return nil, ErrKeyspaceDoesNotExist
  363. }
  364. iter.Scan(&keyspace.DurableWrites, &replication)
  365. err := iter.Close()
  366. if err != nil {
  367. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  368. }
  369. keyspace.StrategyClass = replication["class"]
  370. keyspace.StrategyOptions = make(map[string]interface{})
  371. for k, v := range replication {
  372. keyspace.StrategyOptions[k] = v
  373. }
  374. } else {
  375. const stmt = `
  376. SELECT durable_writes, strategy_class, strategy_options
  377. FROM system.schema_keyspaces
  378. WHERE keyspace_name = ?`
  379. var strategyOptionsJSON []byte
  380. iter := session.control.query(stmt, keyspaceName)
  381. if iter.NumRows() == 0 {
  382. return nil, ErrKeyspaceDoesNotExist
  383. }
  384. iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
  385. err := iter.Close()
  386. if err != nil {
  387. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  388. }
  389. err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
  390. if err != nil {
  391. return nil, fmt.Errorf(
  392. "Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
  393. strategyOptionsJSON, keyspace.Name, err,
  394. )
  395. }
  396. }
  397. return keyspace, nil
  398. }
  399. // query for only the table metadata in the specified keyspace from system.schema_columnfamilies
  400. func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) {
  401. var (
  402. iter *Iter
  403. scan func(iter *Iter, table *TableMetadata) bool
  404. stmt string
  405. keyAliasesJSON []byte
  406. columnAliasesJSON []byte
  407. )
  408. if session.useSystemSchema { // Cassandra 3.x+
  409. stmt = `
  410. SELECT
  411. table_name
  412. FROM system_schema.tables
  413. WHERE keyspace_name = ?`
  414. switchIter := func() *Iter {
  415. iter.Close()
  416. stmt = `
  417. SELECT
  418. view_name
  419. FROM system_schema.views
  420. WHERE keyspace_name = ?`
  421. iter = session.control.query(stmt, keyspaceName)
  422. return iter
  423. }
  424. scan = func(iter *Iter, table *TableMetadata) bool {
  425. r := iter.Scan(
  426. &table.Name,
  427. )
  428. if !r {
  429. iter = switchIter()
  430. if iter != nil {
  431. switchIter = func() *Iter { return nil }
  432. r = iter.Scan(&table.Name)
  433. }
  434. }
  435. return r
  436. }
  437. } else if session.cfg.ProtoVersion == protoVersion1 {
  438. // we have key aliases
  439. stmt = `
  440. SELECT
  441. columnfamily_name,
  442. key_validator,
  443. comparator,
  444. default_validator,
  445. key_aliases,
  446. column_aliases,
  447. value_alias
  448. FROM system.schema_columnfamilies
  449. WHERE keyspace_name = ?`
  450. scan = func(iter *Iter, table *TableMetadata) bool {
  451. return iter.Scan(
  452. &table.Name,
  453. &table.KeyValidator,
  454. &table.Comparator,
  455. &table.DefaultValidator,
  456. &keyAliasesJSON,
  457. &columnAliasesJSON,
  458. &table.ValueAlias,
  459. )
  460. }
  461. } else {
  462. stmt = `
  463. SELECT
  464. columnfamily_name,
  465. key_validator,
  466. comparator,
  467. default_validator
  468. FROM system.schema_columnfamilies
  469. WHERE keyspace_name = ?`
  470. scan = func(iter *Iter, table *TableMetadata) bool {
  471. return iter.Scan(
  472. &table.Name,
  473. &table.KeyValidator,
  474. &table.Comparator,
  475. &table.DefaultValidator,
  476. )
  477. }
  478. }
  479. iter = session.control.query(stmt, keyspaceName)
  480. tables := []TableMetadata{}
  481. table := TableMetadata{Keyspace: keyspaceName}
  482. for scan(iter, &table) {
  483. var err error
  484. // decode the key aliases
  485. if keyAliasesJSON != nil {
  486. table.KeyAliases = []string{}
  487. err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
  488. if err != nil {
  489. iter.Close()
  490. return nil, fmt.Errorf(
  491. "Invalid JSON value '%s' as key_aliases for in table '%s': %v",
  492. keyAliasesJSON, table.Name, err,
  493. )
  494. }
  495. }
  496. // decode the column aliases
  497. if columnAliasesJSON != nil {
  498. table.ColumnAliases = []string{}
  499. err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
  500. if err != nil {
  501. iter.Close()
  502. return nil, fmt.Errorf(
  503. "Invalid JSON value '%s' as column_aliases for in table '%s': %v",
  504. columnAliasesJSON, table.Name, err,
  505. )
  506. }
  507. }
  508. tables = append(tables, table)
  509. table = TableMetadata{Keyspace: keyspaceName}
  510. }
  511. err := iter.Close()
  512. if err != nil && err != ErrNotFound {
  513. return nil, fmt.Errorf("Error querying table schema: %v", err)
  514. }
  515. return tables, nil
  516. }
  517. func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
  518. // V1 does not support the type column, and all returned rows are
  519. // of kind "regular".
  520. const stmt = `
  521. SELECT
  522. columnfamily_name,
  523. column_name,
  524. component_index,
  525. validator,
  526. index_name,
  527. index_type,
  528. index_options
  529. FROM system.schema_columns
  530. WHERE keyspace_name = ?`
  531. var columns []ColumnMetadata
  532. rows := s.control.query(stmt, keyspace).Scanner()
  533. for rows.Next() {
  534. var (
  535. column = ColumnMetadata{Keyspace: keyspace}
  536. indexOptionsJSON []byte
  537. )
  538. // all columns returned by V1 are regular
  539. column.Kind = ColumnRegular
  540. err := rows.Scan(&column.Table,
  541. &column.Name,
  542. &column.ComponentIndex,
  543. &column.Validator,
  544. &column.Index.Name,
  545. &column.Index.Type,
  546. &indexOptionsJSON)
  547. if err != nil {
  548. return nil, err
  549. }
  550. if len(indexOptionsJSON) > 0 {
  551. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  552. if err != nil {
  553. return nil, fmt.Errorf(
  554. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  555. indexOptionsJSON,
  556. column.Name,
  557. column.Table,
  558. err)
  559. }
  560. }
  561. columns = append(columns, column)
  562. }
  563. if err := rows.Err(); err != nil {
  564. return nil, err
  565. }
  566. return columns, nil
  567. }
  568. func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
  569. // V2+ supports the type column
  570. const stmt = `
  571. SELECT
  572. columnfamily_name,
  573. column_name,
  574. component_index,
  575. validator,
  576. index_name,
  577. index_type,
  578. index_options,
  579. type
  580. FROM system.schema_columns
  581. WHERE keyspace_name = ?`
  582. var columns []ColumnMetadata
  583. rows := s.control.query(stmt, keyspace).Scanner()
  584. for rows.Next() {
  585. var (
  586. column = ColumnMetadata{Keyspace: keyspace}
  587. indexOptionsJSON []byte
  588. )
  589. err := rows.Scan(&column.Table,
  590. &column.Name,
  591. &column.ComponentIndex,
  592. &column.Validator,
  593. &column.Index.Name,
  594. &column.Index.Type,
  595. &indexOptionsJSON,
  596. &column.Kind,
  597. )
  598. if err != nil {
  599. return nil, err
  600. }
  601. if len(indexOptionsJSON) > 0 {
  602. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  603. if err != nil {
  604. return nil, fmt.Errorf(
  605. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  606. indexOptionsJSON,
  607. column.Name,
  608. column.Table,
  609. err)
  610. }
  611. }
  612. columns = append(columns, column)
  613. }
  614. if err := rows.Err(); err != nil {
  615. return nil, err
  616. }
  617. return columns, nil
  618. }
  619. func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
  620. const stmt = `
  621. SELECT
  622. table_name,
  623. column_name,
  624. clustering_order,
  625. type,
  626. kind,
  627. position
  628. FROM system_schema.columns
  629. WHERE keyspace_name = ?`
  630. var columns []ColumnMetadata
  631. rows := s.control.query(stmt, keyspace).Scanner()
  632. for rows.Next() {
  633. column := ColumnMetadata{Keyspace: keyspace}
  634. err := rows.Scan(&column.Table,
  635. &column.Name,
  636. &column.ClusteringOrder,
  637. &column.Validator,
  638. &column.Kind,
  639. &column.ComponentIndex,
  640. )
  641. if err != nil {
  642. return nil, err
  643. }
  644. columns = append(columns, column)
  645. }
  646. if err := rows.Err(); err != nil {
  647. return nil, err
  648. }
  649. // TODO(zariel): get column index info from system_schema.indexes
  650. return columns, nil
  651. }
  652. // query for only the column metadata in the specified keyspace from system.schema_columns
  653. func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
  654. var (
  655. columns []ColumnMetadata
  656. err error
  657. )
  658. // Deal with differences in protocol versions
  659. if session.cfg.ProtoVersion == 1 {
  660. columns, err = session.scanColumnMetadataV1(keyspaceName)
  661. } else if session.useSystemSchema { // Cassandra 3.x+
  662. columns, err = session.scanColumnMetadataSystem(keyspaceName)
  663. } else {
  664. columns, err = session.scanColumnMetadataV2(keyspaceName)
  665. }
  666. if err != nil && err != ErrNotFound {
  667. return nil, fmt.Errorf("Error querying column schema: %v", err)
  668. }
  669. return columns, nil
  670. }
  671. // type definition parser state
  672. type typeParser struct {
  673. input string
  674. index int
  675. }
  676. // the type definition parser result
  677. type typeParserResult struct {
  678. isComposite bool
  679. types []TypeInfo
  680. reversed []bool
  681. collections map[string]TypeInfo
  682. }
  683. // Parse the type definition used for validator and comparator schema data
  684. func parseType(def string) typeParserResult {
  685. parser := &typeParser{input: def}
  686. return parser.parse()
  687. }
  688. const (
  689. REVERSED_TYPE = "org.apache.cassandra.db.marshal.ReversedType"
  690. COMPOSITE_TYPE = "org.apache.cassandra.db.marshal.CompositeType"
  691. COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"
  692. LIST_TYPE = "org.apache.cassandra.db.marshal.ListType"
  693. SET_TYPE = "org.apache.cassandra.db.marshal.SetType"
  694. MAP_TYPE = "org.apache.cassandra.db.marshal.MapType"
  695. )
  696. // represents a class specification in the type def AST
  697. type typeParserClassNode struct {
  698. name string
  699. params []typeParserParamNode
  700. // this is the segment of the input string that defined this node
  701. input string
  702. }
  703. // represents a class parameter in the type def AST
  704. type typeParserParamNode struct {
  705. name *string
  706. class typeParserClassNode
  707. }
  708. func (t *typeParser) parse() typeParserResult {
  709. // parse the AST
  710. ast, ok := t.parseClassNode()
  711. if !ok {
  712. // treat this is a custom type
  713. return typeParserResult{
  714. isComposite: false,
  715. types: []TypeInfo{
  716. NativeType{
  717. typ: TypeCustom,
  718. custom: t.input,
  719. },
  720. },
  721. reversed: []bool{false},
  722. collections: nil,
  723. }
  724. }
  725. // interpret the AST
  726. if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
  727. count := len(ast.params)
  728. // look for a collections param
  729. last := ast.params[count-1]
  730. collections := map[string]TypeInfo{}
  731. if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
  732. count--
  733. for _, param := range last.class.params {
  734. // decode the name
  735. var name string
  736. decoded, err := hex.DecodeString(*param.name)
  737. if err != nil {
  738. Logger.Printf(
  739. "Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
  740. t.input,
  741. *param.name,
  742. err,
  743. )
  744. // just use the provided name
  745. name = *param.name
  746. } else {
  747. name = string(decoded)
  748. }
  749. collections[name] = param.class.asTypeInfo()
  750. }
  751. }
  752. types := make([]TypeInfo, count)
  753. reversed := make([]bool, count)
  754. for i, param := range ast.params[:count] {
  755. class := param.class
  756. reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
  757. if reversed[i] {
  758. class = class.params[0].class
  759. }
  760. types[i] = class.asTypeInfo()
  761. }
  762. return typeParserResult{
  763. isComposite: true,
  764. types: types,
  765. reversed: reversed,
  766. collections: collections,
  767. }
  768. } else {
  769. // not composite, so one type
  770. class := *ast
  771. reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
  772. if reversed {
  773. class = class.params[0].class
  774. }
  775. typeInfo := class.asTypeInfo()
  776. return typeParserResult{
  777. isComposite: false,
  778. types: []TypeInfo{typeInfo},
  779. reversed: []bool{reversed},
  780. }
  781. }
  782. }
  783. func (class *typeParserClassNode) asTypeInfo() TypeInfo {
  784. if strings.HasPrefix(class.name, LIST_TYPE) {
  785. elem := class.params[0].class.asTypeInfo()
  786. return CollectionType{
  787. NativeType: NativeType{
  788. typ: TypeList,
  789. },
  790. Elem: elem,
  791. }
  792. }
  793. if strings.HasPrefix(class.name, SET_TYPE) {
  794. elem := class.params[0].class.asTypeInfo()
  795. return CollectionType{
  796. NativeType: NativeType{
  797. typ: TypeSet,
  798. },
  799. Elem: elem,
  800. }
  801. }
  802. if strings.HasPrefix(class.name, MAP_TYPE) {
  803. key := class.params[0].class.asTypeInfo()
  804. elem := class.params[1].class.asTypeInfo()
  805. return CollectionType{
  806. NativeType: NativeType{
  807. typ: TypeMap,
  808. },
  809. Key: key,
  810. Elem: elem,
  811. }
  812. }
  813. // must be a simple type or custom type
  814. info := NativeType{typ: getApacheCassandraType(class.name)}
  815. if info.typ == TypeCustom {
  816. // add the entire class definition
  817. info.custom = class.input
  818. }
  819. return info
  820. }
  821. // CLASS := ID [ PARAMS ]
  822. func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
  823. t.skipWhitespace()
  824. startIndex := t.index
  825. name, ok := t.nextIdentifier()
  826. if !ok {
  827. return nil, false
  828. }
  829. params, ok := t.parseParamNodes()
  830. if !ok {
  831. return nil, false
  832. }
  833. endIndex := t.index
  834. node = &typeParserClassNode{
  835. name: name,
  836. params: params,
  837. input: t.input[startIndex:endIndex],
  838. }
  839. return node, true
  840. }
  841. // PARAMS := "(" PARAM { "," PARAM } ")"
  842. // PARAM := [ PARAM_NAME ":" ] CLASS
  843. // PARAM_NAME := ID
  844. func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
  845. t.skipWhitespace()
  846. // the params are optional
  847. if t.index == len(t.input) || t.input[t.index] != '(' {
  848. return nil, true
  849. }
  850. params = []typeParserParamNode{}
  851. // consume the '('
  852. t.index++
  853. t.skipWhitespace()
  854. for t.input[t.index] != ')' {
  855. // look for a named param, but if no colon, then we want to backup
  856. backupIndex := t.index
  857. // name will be a hex encoded version of a utf-8 string
  858. name, ok := t.nextIdentifier()
  859. if !ok {
  860. return nil, false
  861. }
  862. hasName := true
  863. // TODO handle '=>' used for DynamicCompositeType
  864. t.skipWhitespace()
  865. if t.input[t.index] == ':' {
  866. // there is a name for this parameter
  867. // consume the ':'
  868. t.index++
  869. t.skipWhitespace()
  870. } else {
  871. // no name, backup
  872. hasName = false
  873. t.index = backupIndex
  874. }
  875. // parse the next full parameter
  876. classNode, ok := t.parseClassNode()
  877. if !ok {
  878. return nil, false
  879. }
  880. if hasName {
  881. params = append(
  882. params,
  883. typeParserParamNode{name: &name, class: *classNode},
  884. )
  885. } else {
  886. params = append(
  887. params,
  888. typeParserParamNode{class: *classNode},
  889. )
  890. }
  891. t.skipWhitespace()
  892. if t.input[t.index] == ',' {
  893. // consume the comma
  894. t.index++
  895. t.skipWhitespace()
  896. }
  897. }
  898. // consume the ')'
  899. t.index++
  900. return params, true
  901. }
  902. func (t *typeParser) skipWhitespace() {
  903. for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
  904. t.index++
  905. }
  906. }
  907. func isWhitespaceChar(c byte) bool {
  908. return c == ' ' || c == '\n' || c == '\t'
  909. }
  910. // ID := LETTER { LETTER }
  911. // LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
  912. func (t *typeParser) nextIdentifier() (id string, found bool) {
  913. startIndex := t.index
  914. for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
  915. t.index++
  916. }
  917. if startIndex == t.index {
  918. return "", false
  919. }
  920. return t.input[startIndex:t.index], true
  921. }
  922. func isIdentifierChar(c byte) bool {
  923. return (c >= '0' && c <= '9') ||
  924. (c >= 'a' && c <= 'z') ||
  925. (c >= 'A' && c <= 'Z') ||
  926. c == '-' ||
  927. c == '+' ||
  928. c == '.' ||
  929. c == '_' ||
  930. c == '&'
  931. }