metadata.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. )
  13. // schema metadata for a keyspace
  14. type KeyspaceMetadata struct {
  15. Name string
  16. DurableWrites bool
  17. StrategyClass string
  18. StrategyOptions map[string]interface{}
  19. Tables map[string]*TableMetadata
  20. }
  21. // schema metadata for a table (a.k.a. column family)
  22. type TableMetadata struct {
  23. Keyspace string
  24. Name string
  25. KeyValidator string
  26. Comparator string
  27. DefaultValidator string
  28. KeyAliases []string
  29. ColumnAliases []string
  30. ValueAlias string
  31. PartitionKey []*ColumnMetadata
  32. ClusteringColumns []*ColumnMetadata
  33. Columns map[string]*ColumnMetadata
  34. OrderedColumns []string
  35. }
  36. // schema metadata for a column
  37. type ColumnMetadata struct {
  38. Keyspace string
  39. Table string
  40. Name string
  41. ComponentIndex int
  42. Kind ColumnKind
  43. Validator string
  44. Type TypeInfo
  45. ClusteringOrder string
  46. Order ColumnOrder
  47. Index ColumnIndexMetadata
  48. }
  // ColumnOrder is the ordering of a column with regard to its comparator:
// ASC (false) or DESC (true).
type ColumnOrder bool

// Both values carry the ColumnOrder type explicitly. A const spec that gives
// its own value does not inherit the type of the preceding spec, so writing
// "DESC = true" would make DESC an untyped bool constant.
const (
	ASC  ColumnOrder = false
	DESC ColumnOrder = true
)
  // ColumnIndexMetadata describes the secondary index on a column, as read
// from the index_name, index_type and index_options schema columns.
type ColumnIndexMetadata struct {
	Name    string
	Type    string
	Options map[string]interface{} // decoded from the index_options JSON
}
  // ColumnKind is the role a column plays in its table's primary key layout.
type ColumnKind int

const (
	// ColumnUnkownKind is the zero value. The misspelled name is exported
	// and therefore kept as is.
	ColumnUnkownKind ColumnKind = iota
	ColumnPartitionKey
	ColumnClusteringKey
	ColumnRegular
	ColumnCompact
	ColumnStatic
)
  69. func (c ColumnKind) String() string {
  70. switch c {
  71. case ColumnPartitionKey:
  72. return "partition_key"
  73. case ColumnClusteringKey:
  74. return "clustering_key"
  75. case ColumnRegular:
  76. return "regular"
  77. case ColumnCompact:
  78. return "compact"
  79. case ColumnStatic:
  80. return "static"
  81. default:
  82. return fmt.Sprintf("unknown_column_%d", c)
  83. }
  84. }
  85. func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error {
  86. if typ.Type() != TypeVarchar {
  87. return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ)
  88. }
  89. kind, err := columnKindFromSchema(string(p))
  90. if err != nil {
  91. return err
  92. }
  93. *c = kind
  94. return nil
  95. }
  96. func columnKindFromSchema(kind string) (ColumnKind, error) {
  97. switch kind {
  98. case "partition_key":
  99. return ColumnPartitionKey, nil
  100. case "clustering_key", "clustering":
  101. return ColumnClusteringKey, nil
  102. case "regular":
  103. return ColumnRegular, nil
  104. case "compact_value":
  105. return ColumnCompact, nil
  106. case "static":
  107. return ColumnStatic, nil
  108. default:
  109. return -1, fmt.Errorf("unknown column kind: %q", kind)
  110. }
  111. }
  // Default names given by compileV1Metadata to partition key, clustering and
// value columns when the table defines no alias of its own. Partition key
// and clustering components after the first get their 1-based position
// appended (e.g. "key2"). The ALL_CAPS names are exported and kept as is.
const (
	DEFAULT_KEY_ALIAS    = "key"
	DEFAULT_COLUMN_ALIAS = "column"
	DEFAULT_VALUE_ALIAS  = "value"
)
  118. // queries the cluster for schema information for a specific keyspace
  119. type schemaDescriber struct {
  120. session *Session
  121. mu sync.Mutex
  122. cache map[string]*KeyspaceMetadata
  123. }
  124. // creates a session bound schema describer which will query and cache
  125. // keyspace metadata
  126. func newSchemaDescriber(session *Session) *schemaDescriber {
  127. return &schemaDescriber{
  128. session: session,
  129. cache: map[string]*KeyspaceMetadata{},
  130. }
  131. }
  132. // returns the cached KeyspaceMetadata held by the describer for the named
  133. // keyspace.
  134. func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
  135. s.mu.Lock()
  136. defer s.mu.Unlock()
  137. metadata, found := s.cache[keyspaceName]
  138. if !found {
  139. // refresh the cache for this keyspace
  140. err := s.refreshSchema(keyspaceName)
  141. if err != nil {
  142. return nil, err
  143. }
  144. metadata = s.cache[keyspaceName]
  145. }
  146. return metadata, nil
  147. }
  148. // clears the already cached keyspace metadata
  149. func (s *schemaDescriber) clearSchema(keyspaceName string) {
  150. s.mu.Lock()
  151. defer s.mu.Unlock()
  152. delete(s.cache, keyspaceName)
  153. }
  154. // forcibly updates the current KeyspaceMetadata held by the schema describer
  155. // for a given named keyspace.
  156. func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
  157. var err error
  158. // query the system keyspace for schema data
  159. // TODO retrieve concurrently
  160. keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
  161. if err != nil {
  162. return err
  163. }
  164. tables, err := getTableMetadata(s.session, keyspaceName)
  165. if err != nil {
  166. return err
  167. }
  168. columns, err := getColumnMetadata(s.session, keyspaceName)
  169. if err != nil {
  170. return err
  171. }
  172. // organize the schema data
  173. compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns)
  174. // update the cache
  175. s.cache[keyspaceName] = keyspace
  176. return nil
  177. }
  178. // "compiles" derived information about keyspace, table, and column metadata
  179. // for a keyspace from the basic queried metadata objects returned by
  180. // getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
  181. // Links the metadata objects together and derives the column composition of
  182. // the partition key and clustering key for a table.
  183. func compileMetadata(
  184. protoVersion int,
  185. keyspace *KeyspaceMetadata,
  186. tables []TableMetadata,
  187. columns []ColumnMetadata,
  188. ) {
  189. keyspace.Tables = make(map[string]*TableMetadata)
  190. for i := range tables {
  191. tables[i].Columns = make(map[string]*ColumnMetadata)
  192. keyspace.Tables[tables[i].Name] = &tables[i]
  193. }
  194. // add columns from the schema data
  195. for i := range columns {
  196. col := &columns[i]
  197. // decode the validator for TypeInfo and order
  198. if col.ClusteringOrder != "" { // Cassandra 3.x+
  199. col.Type = getCassandraType(col.Validator)
  200. col.Order = ASC
  201. if col.ClusteringOrder == "desc" {
  202. col.Order = DESC
  203. }
  204. } else {
  205. validatorParsed := parseType(col.Validator)
  206. col.Type = validatorParsed.types[0]
  207. col.Order = ASC
  208. if validatorParsed.reversed[0] {
  209. col.Order = DESC
  210. }
  211. }
  212. table, ok := keyspace.Tables[col.Table]
  213. if !ok {
  214. // if the schema is being updated we will race between seeing
  215. // the metadata be complete. Potentially we should check for
  216. // schema versions before and after reading the metadata and
  217. // if they dont match try again.
  218. continue
  219. }
  220. table.Columns[col.Name] = col
  221. table.OrderedColumns = append(table.OrderedColumns, col.Name)
  222. }
  223. if protoVersion == protoVersion1 {
  224. compileV1Metadata(tables)
  225. } else {
  226. compileV2Metadata(tables)
  227. }
  228. }
  229. // Compiles derived information from TableMetadata which have had
  230. // ColumnMetadata added already. V1 protocol does not return as much
  231. // column metadata as V2+ (because V1 doesn't support the "type" column in the
  232. // system.schema_columns table) so determining PartitionKey and ClusterColumns
  233. // is more complex.
  234. func compileV1Metadata(tables []TableMetadata) {
  235. for i := range tables {
  236. table := &tables[i]
  237. // decode the key validator
  238. keyValidatorParsed := parseType(table.KeyValidator)
  239. // decode the comparator
  240. comparatorParsed := parseType(table.Comparator)
  241. // the partition key length is the same as the number of types in the
  242. // key validator
  243. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  244. // V1 protocol only returns "regular" columns from
  245. // system.schema_columns (there is no type field for columns)
  246. // so the alias information is used to
  247. // create the partition key and clustering columns
  248. // construct the partition key from the alias
  249. for i := range table.PartitionKey {
  250. var alias string
  251. if len(table.KeyAliases) > i {
  252. alias = table.KeyAliases[i]
  253. } else if i == 0 {
  254. alias = DEFAULT_KEY_ALIAS
  255. } else {
  256. alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
  257. }
  258. column := &ColumnMetadata{
  259. Keyspace: table.Keyspace,
  260. Table: table.Name,
  261. Name: alias,
  262. Type: keyValidatorParsed.types[i],
  263. Kind: ColumnPartitionKey,
  264. ComponentIndex: i,
  265. }
  266. table.PartitionKey[i] = column
  267. table.Columns[alias] = column
  268. }
  269. // determine the number of clustering columns
  270. size := len(comparatorParsed.types)
  271. if comparatorParsed.isComposite {
  272. if len(comparatorParsed.collections) != 0 ||
  273. (len(table.ColumnAliases) == size-1 &&
  274. comparatorParsed.types[size-1].Type() == TypeVarchar) {
  275. size = size - 1
  276. }
  277. } else {
  278. if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
  279. size = 0
  280. }
  281. }
  282. table.ClusteringColumns = make([]*ColumnMetadata, size)
  283. for i := range table.ClusteringColumns {
  284. var alias string
  285. if len(table.ColumnAliases) > i {
  286. alias = table.ColumnAliases[i]
  287. } else if i == 0 {
  288. alias = DEFAULT_COLUMN_ALIAS
  289. } else {
  290. alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
  291. }
  292. order := ASC
  293. if comparatorParsed.reversed[i] {
  294. order = DESC
  295. }
  296. column := &ColumnMetadata{
  297. Keyspace: table.Keyspace,
  298. Table: table.Name,
  299. Name: alias,
  300. Type: comparatorParsed.types[i],
  301. Order: order,
  302. Kind: ColumnClusteringKey,
  303. ComponentIndex: i,
  304. }
  305. table.ClusteringColumns[i] = column
  306. table.Columns[alias] = column
  307. }
  308. if size != len(comparatorParsed.types)-1 {
  309. alias := DEFAULT_VALUE_ALIAS
  310. if len(table.ValueAlias) > 0 {
  311. alias = table.ValueAlias
  312. }
  313. // decode the default validator
  314. defaultValidatorParsed := parseType(table.DefaultValidator)
  315. column := &ColumnMetadata{
  316. Keyspace: table.Keyspace,
  317. Table: table.Name,
  318. Name: alias,
  319. Type: defaultValidatorParsed.types[0],
  320. Kind: ColumnRegular,
  321. }
  322. table.Columns[alias] = column
  323. }
  324. }
  325. }
  326. // The simpler compile case for V2+ protocol
  327. func compileV2Metadata(tables []TableMetadata) {
  328. for i := range tables {
  329. table := &tables[i]
  330. clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey)
  331. table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
  332. if table.KeyValidator != "" {
  333. keyValidatorParsed := parseType(table.KeyValidator)
  334. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  335. } else { // Cassandra 3.x+
  336. partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey)
  337. table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
  338. }
  339. for _, columnName := range table.OrderedColumns {
  340. column := table.Columns[columnName]
  341. if column.Kind == ColumnPartitionKey {
  342. table.PartitionKey[column.ComponentIndex] = column
  343. } else if column.Kind == ColumnClusteringKey {
  344. table.ClusteringColumns[column.ComponentIndex] = column
  345. }
  346. }
  347. }
  348. }
  349. // returns the count of coluns with the given "kind" value.
  350. func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int {
  351. maxComponentIndex := -1
  352. for _, column := range columns {
  353. if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
  354. maxComponentIndex = column.ComponentIndex
  355. }
  356. }
  357. return maxComponentIndex + 1
  358. }
  359. // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
  360. func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
  361. keyspace := &KeyspaceMetadata{Name: keyspaceName}
  362. if session.useSystemSchema { // Cassandra 3.x+
  363. const stmt = `
  364. SELECT durable_writes, replication
  365. FROM system_schema.keyspaces
  366. WHERE keyspace_name = ?`
  367. var replication map[string]string
  368. iter := session.control.query(stmt, keyspaceName)
  369. if iter.NumRows() == 0 {
  370. return nil, ErrKeyspaceDoesNotExist
  371. }
  372. iter.Scan(&keyspace.DurableWrites, &replication)
  373. err := iter.Close()
  374. if err != nil {
  375. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  376. }
  377. keyspace.StrategyClass = replication["class"]
  378. delete(replication, "class")
  379. keyspace.StrategyOptions = make(map[string]interface{}, len(replication))
  380. for k, v := range replication {
  381. keyspace.StrategyOptions[k] = v
  382. }
  383. } else {
  384. const stmt = `
  385. SELECT durable_writes, strategy_class, strategy_options
  386. FROM system.schema_keyspaces
  387. WHERE keyspace_name = ?`
  388. var strategyOptionsJSON []byte
  389. iter := session.control.query(stmt, keyspaceName)
  390. if iter.NumRows() == 0 {
  391. return nil, ErrKeyspaceDoesNotExist
  392. }
  393. iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
  394. err := iter.Close()
  395. if err != nil {
  396. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  397. }
  398. err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
  399. if err != nil {
  400. return nil, fmt.Errorf(
  401. "Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
  402. strategyOptionsJSON, keyspace.Name, err,
  403. )
  404. }
  405. }
  406. return keyspace, nil
  407. }
  408. // query for only the table metadata in the specified keyspace from system.schema_columnfamilies
  409. func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) {
  410. var (
  411. iter *Iter
  412. scan func(iter *Iter, table *TableMetadata) bool
  413. stmt string
  414. keyAliasesJSON []byte
  415. columnAliasesJSON []byte
  416. )
  417. if session.useSystemSchema { // Cassandra 3.x+
  418. stmt = `
  419. SELECT
  420. table_name
  421. FROM system_schema.tables
  422. WHERE keyspace_name = ?`
  423. switchIter := func() *Iter {
  424. iter.Close()
  425. stmt = `
  426. SELECT
  427. view_name
  428. FROM system_schema.views
  429. WHERE keyspace_name = ?`
  430. iter = session.control.query(stmt, keyspaceName)
  431. return iter
  432. }
  433. scan = func(iter *Iter, table *TableMetadata) bool {
  434. r := iter.Scan(
  435. &table.Name,
  436. )
  437. if !r {
  438. iter = switchIter()
  439. if iter != nil {
  440. switchIter = func() *Iter { return nil }
  441. r = iter.Scan(&table.Name)
  442. }
  443. }
  444. return r
  445. }
  446. } else if session.cfg.ProtoVersion == protoVersion1 {
  447. // we have key aliases
  448. stmt = `
  449. SELECT
  450. columnfamily_name,
  451. key_validator,
  452. comparator,
  453. default_validator,
  454. key_aliases,
  455. column_aliases,
  456. value_alias
  457. FROM system.schema_columnfamilies
  458. WHERE keyspace_name = ?`
  459. scan = func(iter *Iter, table *TableMetadata) bool {
  460. return iter.Scan(
  461. &table.Name,
  462. &table.KeyValidator,
  463. &table.Comparator,
  464. &table.DefaultValidator,
  465. &keyAliasesJSON,
  466. &columnAliasesJSON,
  467. &table.ValueAlias,
  468. )
  469. }
  470. } else {
  471. stmt = `
  472. SELECT
  473. columnfamily_name,
  474. key_validator,
  475. comparator,
  476. default_validator
  477. FROM system.schema_columnfamilies
  478. WHERE keyspace_name = ?`
  479. scan = func(iter *Iter, table *TableMetadata) bool {
  480. return iter.Scan(
  481. &table.Name,
  482. &table.KeyValidator,
  483. &table.Comparator,
  484. &table.DefaultValidator,
  485. )
  486. }
  487. }
  488. iter = session.control.query(stmt, keyspaceName)
  489. tables := []TableMetadata{}
  490. table := TableMetadata{Keyspace: keyspaceName}
  491. for scan(iter, &table) {
  492. var err error
  493. // decode the key aliases
  494. if keyAliasesJSON != nil {
  495. table.KeyAliases = []string{}
  496. err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
  497. if err != nil {
  498. iter.Close()
  499. return nil, fmt.Errorf(
  500. "Invalid JSON value '%s' as key_aliases for in table '%s': %v",
  501. keyAliasesJSON, table.Name, err,
  502. )
  503. }
  504. }
  505. // decode the column aliases
  506. if columnAliasesJSON != nil {
  507. table.ColumnAliases = []string{}
  508. err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
  509. if err != nil {
  510. iter.Close()
  511. return nil, fmt.Errorf(
  512. "Invalid JSON value '%s' as column_aliases for in table '%s': %v",
  513. columnAliasesJSON, table.Name, err,
  514. )
  515. }
  516. }
  517. tables = append(tables, table)
  518. table = TableMetadata{Keyspace: keyspaceName}
  519. }
  520. err := iter.Close()
  521. if err != nil && err != ErrNotFound {
  522. return nil, fmt.Errorf("Error querying table schema: %v", err)
  523. }
  524. return tables, nil
  525. }
  526. func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
  527. // V1 does not support the type column, and all returned rows are
  528. // of kind "regular".
  529. const stmt = `
  530. SELECT
  531. columnfamily_name,
  532. column_name,
  533. component_index,
  534. validator,
  535. index_name,
  536. index_type,
  537. index_options
  538. FROM system.schema_columns
  539. WHERE keyspace_name = ?`
  540. var columns []ColumnMetadata
  541. rows := s.control.query(stmt, keyspace).Scanner()
  542. for rows.Next() {
  543. var (
  544. column = ColumnMetadata{Keyspace: keyspace}
  545. indexOptionsJSON []byte
  546. )
  547. // all columns returned by V1 are regular
  548. column.Kind = ColumnRegular
  549. err := rows.Scan(&column.Table,
  550. &column.Name,
  551. &column.ComponentIndex,
  552. &column.Validator,
  553. &column.Index.Name,
  554. &column.Index.Type,
  555. &indexOptionsJSON)
  556. if err != nil {
  557. return nil, err
  558. }
  559. if len(indexOptionsJSON) > 0 {
  560. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  561. if err != nil {
  562. return nil, fmt.Errorf(
  563. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  564. indexOptionsJSON,
  565. column.Name,
  566. column.Table,
  567. err)
  568. }
  569. }
  570. columns = append(columns, column)
  571. }
  572. if err := rows.Err(); err != nil {
  573. return nil, err
  574. }
  575. return columns, nil
  576. }
  577. func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
  578. // V2+ supports the type column
  579. const stmt = `
  580. SELECT
  581. columnfamily_name,
  582. column_name,
  583. component_index,
  584. validator,
  585. index_name,
  586. index_type,
  587. index_options,
  588. type
  589. FROM system.schema_columns
  590. WHERE keyspace_name = ?`
  591. var columns []ColumnMetadata
  592. rows := s.control.query(stmt, keyspace).Scanner()
  593. for rows.Next() {
  594. var (
  595. column = ColumnMetadata{Keyspace: keyspace}
  596. indexOptionsJSON []byte
  597. )
  598. err := rows.Scan(&column.Table,
  599. &column.Name,
  600. &column.ComponentIndex,
  601. &column.Validator,
  602. &column.Index.Name,
  603. &column.Index.Type,
  604. &indexOptionsJSON,
  605. &column.Kind,
  606. )
  607. if err != nil {
  608. return nil, err
  609. }
  610. if len(indexOptionsJSON) > 0 {
  611. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  612. if err != nil {
  613. return nil, fmt.Errorf(
  614. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  615. indexOptionsJSON,
  616. column.Name,
  617. column.Table,
  618. err)
  619. }
  620. }
  621. columns = append(columns, column)
  622. }
  623. if err := rows.Err(); err != nil {
  624. return nil, err
  625. }
  626. return columns, nil
  627. }
  628. func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
  629. const stmt = `
  630. SELECT
  631. table_name,
  632. column_name,
  633. clustering_order,
  634. type,
  635. kind,
  636. position
  637. FROM system_schema.columns
  638. WHERE keyspace_name = ?`
  639. var columns []ColumnMetadata
  640. rows := s.control.query(stmt, keyspace).Scanner()
  641. for rows.Next() {
  642. column := ColumnMetadata{Keyspace: keyspace}
  643. err := rows.Scan(&column.Table,
  644. &column.Name,
  645. &column.ClusteringOrder,
  646. &column.Validator,
  647. &column.Kind,
  648. &column.ComponentIndex,
  649. )
  650. if err != nil {
  651. return nil, err
  652. }
  653. columns = append(columns, column)
  654. }
  655. if err := rows.Err(); err != nil {
  656. return nil, err
  657. }
  658. // TODO(zariel): get column index info from system_schema.indexes
  659. return columns, nil
  660. }
  661. // query for only the column metadata in the specified keyspace from system.schema_columns
  662. func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
  663. var (
  664. columns []ColumnMetadata
  665. err error
  666. )
  667. // Deal with differences in protocol versions
  668. if session.cfg.ProtoVersion == 1 {
  669. columns, err = session.scanColumnMetadataV1(keyspaceName)
  670. } else if session.useSystemSchema { // Cassandra 3.x+
  671. columns, err = session.scanColumnMetadataSystem(keyspaceName)
  672. } else {
  673. columns, err = session.scanColumnMetadataV2(keyspaceName)
  674. }
  675. if err != nil && err != ErrNotFound {
  676. return nil, fmt.Errorf("Error querying column schema: %v", err)
  677. }
  678. return columns, nil
  679. }
  // typeParser is the state of a single parse of a type definition string.
type typeParser struct {
	input string // the full definition being parsed
	index int    // byte offset of the next unread character in input
}
  685. // the type definition parser result
  686. type typeParserResult struct {
  687. isComposite bool
  688. types []TypeInfo
  689. reversed []bool
  690. collections map[string]TypeInfo
  691. }
  692. // Parse the type definition used for validator and comparator schema data
  693. func parseType(def string) typeParserResult {
  694. parser := &typeParser{input: def}
  695. return parser.parse()
  696. }
  // Fully qualified Cassandra marshal class names recognised by the type
// parser. Class names are matched against them with strings.HasPrefix.
const (
	REVERSED_TYPE   = "org.apache.cassandra.db.marshal.ReversedType"
	COMPOSITE_TYPE  = "org.apache.cassandra.db.marshal.CompositeType"
	COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"
	LIST_TYPE       = "org.apache.cassandra.db.marshal.ListType"
	SET_TYPE        = "org.apache.cassandra.db.marshal.SetType"
	MAP_TYPE        = "org.apache.cassandra.db.marshal.MapType"
)
  705. // represents a class specification in the type def AST
  706. type typeParserClassNode struct {
  707. name string
  708. params []typeParserParamNode
  709. // this is the segment of the input string that defined this node
  710. input string
  711. }
  712. // represents a class parameter in the type def AST
  713. type typeParserParamNode struct {
  714. name *string
  715. class typeParserClassNode
  716. }
  717. func (t *typeParser) parse() typeParserResult {
  718. // parse the AST
  719. ast, ok := t.parseClassNode()
  720. if !ok {
  721. // treat this is a custom type
  722. return typeParserResult{
  723. isComposite: false,
  724. types: []TypeInfo{
  725. NativeType{
  726. typ: TypeCustom,
  727. custom: t.input,
  728. },
  729. },
  730. reversed: []bool{false},
  731. collections: nil,
  732. }
  733. }
  734. // interpret the AST
  735. if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
  736. count := len(ast.params)
  737. // look for a collections param
  738. last := ast.params[count-1]
  739. collections := map[string]TypeInfo{}
  740. if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
  741. count--
  742. for _, param := range last.class.params {
  743. // decode the name
  744. var name string
  745. decoded, err := hex.DecodeString(*param.name)
  746. if err != nil {
  747. Logger.Printf(
  748. "Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
  749. t.input,
  750. *param.name,
  751. err,
  752. )
  753. // just use the provided name
  754. name = *param.name
  755. } else {
  756. name = string(decoded)
  757. }
  758. collections[name] = param.class.asTypeInfo()
  759. }
  760. }
  761. types := make([]TypeInfo, count)
  762. reversed := make([]bool, count)
  763. for i, param := range ast.params[:count] {
  764. class := param.class
  765. reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
  766. if reversed[i] {
  767. class = class.params[0].class
  768. }
  769. types[i] = class.asTypeInfo()
  770. }
  771. return typeParserResult{
  772. isComposite: true,
  773. types: types,
  774. reversed: reversed,
  775. collections: collections,
  776. }
  777. } else {
  778. // not composite, so one type
  779. class := *ast
  780. reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
  781. if reversed {
  782. class = class.params[0].class
  783. }
  784. typeInfo := class.asTypeInfo()
  785. return typeParserResult{
  786. isComposite: false,
  787. types: []TypeInfo{typeInfo},
  788. reversed: []bool{reversed},
  789. }
  790. }
  791. }
  792. func (class *typeParserClassNode) asTypeInfo() TypeInfo {
  793. if strings.HasPrefix(class.name, LIST_TYPE) {
  794. elem := class.params[0].class.asTypeInfo()
  795. return CollectionType{
  796. NativeType: NativeType{
  797. typ: TypeList,
  798. },
  799. Elem: elem,
  800. }
  801. }
  802. if strings.HasPrefix(class.name, SET_TYPE) {
  803. elem := class.params[0].class.asTypeInfo()
  804. return CollectionType{
  805. NativeType: NativeType{
  806. typ: TypeSet,
  807. },
  808. Elem: elem,
  809. }
  810. }
  811. if strings.HasPrefix(class.name, MAP_TYPE) {
  812. key := class.params[0].class.asTypeInfo()
  813. elem := class.params[1].class.asTypeInfo()
  814. return CollectionType{
  815. NativeType: NativeType{
  816. typ: TypeMap,
  817. },
  818. Key: key,
  819. Elem: elem,
  820. }
  821. }
  822. // must be a simple type or custom type
  823. info := NativeType{typ: getApacheCassandraType(class.name)}
  824. if info.typ == TypeCustom {
  825. // add the entire class definition
  826. info.custom = class.input
  827. }
  828. return info
  829. }
  830. // CLASS := ID [ PARAMS ]
  831. func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
  832. t.skipWhitespace()
  833. startIndex := t.index
  834. name, ok := t.nextIdentifier()
  835. if !ok {
  836. return nil, false
  837. }
  838. params, ok := t.parseParamNodes()
  839. if !ok {
  840. return nil, false
  841. }
  842. endIndex := t.index
  843. node = &typeParserClassNode{
  844. name: name,
  845. params: params,
  846. input: t.input[startIndex:endIndex],
  847. }
  848. return node, true
  849. }
  850. // PARAMS := "(" PARAM { "," PARAM } ")"
  851. // PARAM := [ PARAM_NAME ":" ] CLASS
  852. // PARAM_NAME := ID
  853. func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
  854. t.skipWhitespace()
  855. // the params are optional
  856. if t.index == len(t.input) || t.input[t.index] != '(' {
  857. return nil, true
  858. }
  859. params = []typeParserParamNode{}
  860. // consume the '('
  861. t.index++
  862. t.skipWhitespace()
  863. for t.input[t.index] != ')' {
  864. // look for a named param, but if no colon, then we want to backup
  865. backupIndex := t.index
  866. // name will be a hex encoded version of a utf-8 string
  867. name, ok := t.nextIdentifier()
  868. if !ok {
  869. return nil, false
  870. }
  871. hasName := true
  872. // TODO handle '=>' used for DynamicCompositeType
  873. t.skipWhitespace()
  874. if t.input[t.index] == ':' {
  875. // there is a name for this parameter
  876. // consume the ':'
  877. t.index++
  878. t.skipWhitespace()
  879. } else {
  880. // no name, backup
  881. hasName = false
  882. t.index = backupIndex
  883. }
  884. // parse the next full parameter
  885. classNode, ok := t.parseClassNode()
  886. if !ok {
  887. return nil, false
  888. }
  889. if hasName {
  890. params = append(
  891. params,
  892. typeParserParamNode{name: &name, class: *classNode},
  893. )
  894. } else {
  895. params = append(
  896. params,
  897. typeParserParamNode{class: *classNode},
  898. )
  899. }
  900. t.skipWhitespace()
  901. if t.input[t.index] == ',' {
  902. // consume the comma
  903. t.index++
  904. t.skipWhitespace()
  905. }
  906. }
  907. // consume the ')'
  908. t.index++
  909. return params, true
  910. }
  911. func (t *typeParser) skipWhitespace() {
  912. for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
  913. t.index++
  914. }
  915. }
  // isWhitespaceChar reports whether c is a space, newline or tab. Carriage
// returns and other control characters are not treated as whitespace.
func isWhitespaceChar(c byte) bool {
	switch c {
	case ' ', '\n', '\t':
		return true
	default:
		return false
	}
}
  919. // ID := LETTER { LETTER }
  920. // LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
  921. func (t *typeParser) nextIdentifier() (id string, found bool) {
  922. startIndex := t.index
  923. for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
  924. t.index++
  925. }
  926. if startIndex == t.index {
  927. return "", false
  928. }
  929. return t.input[startIndex:t.index], true
  930. }
  // isIdentifierChar reports whether c may appear in a type-definition
// identifier: an ASCII letter or digit, or one of - + . _ &.
func isIdentifierChar(c byte) bool {
	switch {
	case c >= '0' && c <= '9', c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z':
		return true
	case c == '-', c == '+', c == '.', c == '_', c == '&':
		return true
	default:
		return false
	}
}