metadata.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. )
  13. // schema metadata for a keyspace
  14. type KeyspaceMetadata struct {
  15. Name string
  16. DurableWrites bool
  17. StrategyClass string
  18. StrategyOptions map[string]interface{}
  19. Tables map[string]*TableMetadata
  20. Functions map[string]*FunctionMetadata
  21. Aggregates map[string]*AggregateMetadata
  22. }
  23. // schema metadata for a table (a.k.a. column family)
  24. type TableMetadata struct {
  25. Keyspace string
  26. Name string
  27. KeyValidator string
  28. Comparator string
  29. DefaultValidator string
  30. KeyAliases []string
  31. ColumnAliases []string
  32. ValueAlias string
  33. PartitionKey []*ColumnMetadata
  34. ClusteringColumns []*ColumnMetadata
  35. Columns map[string]*ColumnMetadata
  36. OrderedColumns []string
  37. }
  38. // schema metadata for a column
  39. type ColumnMetadata struct {
  40. Keyspace string
  41. Table string
  42. Name string
  43. ComponentIndex int
  44. Kind ColumnKind
  45. Validator string
  46. Type TypeInfo
  47. ClusteringOrder string
  48. Order ColumnOrder
  49. Index ColumnIndexMetadata
  50. }
  51. // FunctionMetadata holds metadata for function constructs
  52. type FunctionMetadata struct {
  53. Keyspace string
  54. Name string
  55. ArgumentTypes []TypeInfo
  56. ArgumentNames []string
  57. Body string
  58. CalledOnNullInput bool
  59. Language string
  60. ReturnType TypeInfo
  61. }
  62. // AggregateMetadata holds metadata for aggregate constructs
  63. type AggregateMetadata struct {
  64. Keyspace string
  65. Name string
  66. ArgumentTypes []TypeInfo
  67. FinalFunc FunctionMetadata
  68. InitCond string
  69. ReturnType TypeInfo
  70. StateFunc FunctionMetadata
  71. StateType TypeInfo
  72. stateFunc string
  73. finalFunc string
  74. }
  75. // the ordering of the column with regard to its comparator
  76. type ColumnOrder bool
  77. const (
  78. ASC ColumnOrder = false
  79. DESC = true
  80. )
  81. type ColumnIndexMetadata struct {
  82. Name string
  83. Type string
  84. Options map[string]interface{}
  85. }
  86. type ColumnKind int
  87. const (
  88. ColumnUnkownKind ColumnKind = iota
  89. ColumnPartitionKey
  90. ColumnClusteringKey
  91. ColumnRegular
  92. ColumnCompact
  93. ColumnStatic
  94. )
  95. func (c ColumnKind) String() string {
  96. switch c {
  97. case ColumnPartitionKey:
  98. return "partition_key"
  99. case ColumnClusteringKey:
  100. return "clustering_key"
  101. case ColumnRegular:
  102. return "regular"
  103. case ColumnCompact:
  104. return "compact"
  105. case ColumnStatic:
  106. return "static"
  107. default:
  108. return fmt.Sprintf("unknown_column_%d", c)
  109. }
  110. }
  111. func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error {
  112. if typ.Type() != TypeVarchar {
  113. return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ)
  114. }
  115. kind, err := columnKindFromSchema(string(p))
  116. if err != nil {
  117. return err
  118. }
  119. *c = kind
  120. return nil
  121. }
  122. func columnKindFromSchema(kind string) (ColumnKind, error) {
  123. switch kind {
  124. case "partition_key":
  125. return ColumnPartitionKey, nil
  126. case "clustering_key", "clustering":
  127. return ColumnClusteringKey, nil
  128. case "regular":
  129. return ColumnRegular, nil
  130. case "compact_value":
  131. return ColumnCompact, nil
  132. case "static":
  133. return ColumnStatic, nil
  134. default:
  135. return -1, fmt.Errorf("unknown column kind: %q", kind)
  136. }
  137. }
  138. // default alias values
  139. const (
  140. DEFAULT_KEY_ALIAS = "key"
  141. DEFAULT_COLUMN_ALIAS = "column"
  142. DEFAULT_VALUE_ALIAS = "value"
  143. )
  144. // queries the cluster for schema information for a specific keyspace
  145. type schemaDescriber struct {
  146. session *Session
  147. mu sync.Mutex
  148. cache map[string]*KeyspaceMetadata
  149. }
  150. // creates a session bound schema describer which will query and cache
  151. // keyspace metadata
  152. func newSchemaDescriber(session *Session) *schemaDescriber {
  153. return &schemaDescriber{
  154. session: session,
  155. cache: map[string]*KeyspaceMetadata{},
  156. }
  157. }
  158. // returns the cached KeyspaceMetadata held by the describer for the named
  159. // keyspace.
  160. func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
  161. s.mu.Lock()
  162. defer s.mu.Unlock()
  163. metadata, found := s.cache[keyspaceName]
  164. if !found {
  165. // refresh the cache for this keyspace
  166. err := s.refreshSchema(keyspaceName)
  167. if err != nil {
  168. return nil, err
  169. }
  170. metadata = s.cache[keyspaceName]
  171. }
  172. return metadata, nil
  173. }
  174. // clears the already cached keyspace metadata
  175. func (s *schemaDescriber) clearSchema(keyspaceName string) {
  176. s.mu.Lock()
  177. defer s.mu.Unlock()
  178. delete(s.cache, keyspaceName)
  179. }
  180. // forcibly updates the current KeyspaceMetadata held by the schema describer
  181. // for a given named keyspace.
  182. func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
  183. var err error
  184. // query the system keyspace for schema data
  185. // TODO retrieve concurrently
  186. keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
  187. if err != nil {
  188. return err
  189. }
  190. tables, err := getTableMetadata(s.session, keyspaceName)
  191. if err != nil {
  192. return err
  193. }
  194. columns, err := getColumnMetadata(s.session, keyspaceName)
  195. if err != nil {
  196. return err
  197. }
  198. functions, err := getFunctionsMetadata(s.session, keyspaceName)
  199. if err != nil {
  200. return err
  201. }
  202. aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
  203. if err != nil {
  204. return err
  205. }
  206. // organize the schema data
  207. compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates)
  208. // update the cache
  209. s.cache[keyspaceName] = keyspace
  210. return nil
  211. }
  212. // "compiles" derived information about keyspace, table, and column metadata
  213. // for a keyspace from the basic queried metadata objects returned by
  214. // getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
  215. // Links the metadata objects together and derives the column composition of
  216. // the partition key and clustering key for a table.
  217. func compileMetadata(
  218. protoVersion int,
  219. keyspace *KeyspaceMetadata,
  220. tables []TableMetadata,
  221. columns []ColumnMetadata,
  222. functions []FunctionMetadata,
  223. aggregates []AggregateMetadata,
  224. ) {
  225. keyspace.Tables = make(map[string]*TableMetadata)
  226. for i := range tables {
  227. tables[i].Columns = make(map[string]*ColumnMetadata)
  228. keyspace.Tables[tables[i].Name] = &tables[i]
  229. }
  230. keyspace.Functions = make(map[string]*FunctionMetadata, len(functions))
  231. for i := range functions {
  232. keyspace.Functions[functions[i].Name] = &functions[i]
  233. }
  234. keyspace.Aggregates = make(map[string]*AggregateMetadata, len(aggregates))
  235. for _, aggregate := range aggregates {
  236. aggregate.FinalFunc = *keyspace.Functions[aggregate.finalFunc]
  237. aggregate.StateFunc = *keyspace.Functions[aggregate.stateFunc]
  238. keyspace.Aggregates[aggregate.Name] = &aggregate
  239. }
  240. // add columns from the schema data
  241. for i := range columns {
  242. col := &columns[i]
  243. // decode the validator for TypeInfo and order
  244. if col.ClusteringOrder != "" { // Cassandra 3.x+
  245. col.Type = getCassandraType(col.Validator)
  246. col.Order = ASC
  247. if col.ClusteringOrder == "desc" {
  248. col.Order = DESC
  249. }
  250. } else {
  251. validatorParsed := parseType(col.Validator)
  252. col.Type = validatorParsed.types[0]
  253. col.Order = ASC
  254. if validatorParsed.reversed[0] {
  255. col.Order = DESC
  256. }
  257. }
  258. table, ok := keyspace.Tables[col.Table]
  259. if !ok {
  260. // if the schema is being updated we will race between seeing
  261. // the metadata be complete. Potentially we should check for
  262. // schema versions before and after reading the metadata and
  263. // if they dont match try again.
  264. continue
  265. }
  266. table.Columns[col.Name] = col
  267. table.OrderedColumns = append(table.OrderedColumns, col.Name)
  268. }
  269. if protoVersion == protoVersion1 {
  270. compileV1Metadata(tables)
  271. } else {
  272. compileV2Metadata(tables)
  273. }
  274. }
  275. // Compiles derived information from TableMetadata which have had
  276. // ColumnMetadata added already. V1 protocol does not return as much
  277. // column metadata as V2+ (because V1 doesn't support the "type" column in the
  278. // system.schema_columns table) so determining PartitionKey and ClusterColumns
  279. // is more complex.
  280. func compileV1Metadata(tables []TableMetadata) {
  281. for i := range tables {
  282. table := &tables[i]
  283. // decode the key validator
  284. keyValidatorParsed := parseType(table.KeyValidator)
  285. // decode the comparator
  286. comparatorParsed := parseType(table.Comparator)
  287. // the partition key length is the same as the number of types in the
  288. // key validator
  289. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  290. // V1 protocol only returns "regular" columns from
  291. // system.schema_columns (there is no type field for columns)
  292. // so the alias information is used to
  293. // create the partition key and clustering columns
  294. // construct the partition key from the alias
  295. for i := range table.PartitionKey {
  296. var alias string
  297. if len(table.KeyAliases) > i {
  298. alias = table.KeyAliases[i]
  299. } else if i == 0 {
  300. alias = DEFAULT_KEY_ALIAS
  301. } else {
  302. alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
  303. }
  304. column := &ColumnMetadata{
  305. Keyspace: table.Keyspace,
  306. Table: table.Name,
  307. Name: alias,
  308. Type: keyValidatorParsed.types[i],
  309. Kind: ColumnPartitionKey,
  310. ComponentIndex: i,
  311. }
  312. table.PartitionKey[i] = column
  313. table.Columns[alias] = column
  314. }
  315. // determine the number of clustering columns
  316. size := len(comparatorParsed.types)
  317. if comparatorParsed.isComposite {
  318. if len(comparatorParsed.collections) != 0 ||
  319. (len(table.ColumnAliases) == size-1 &&
  320. comparatorParsed.types[size-1].Type() == TypeVarchar) {
  321. size = size - 1
  322. }
  323. } else {
  324. if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
  325. size = 0
  326. }
  327. }
  328. table.ClusteringColumns = make([]*ColumnMetadata, size)
  329. for i := range table.ClusteringColumns {
  330. var alias string
  331. if len(table.ColumnAliases) > i {
  332. alias = table.ColumnAliases[i]
  333. } else if i == 0 {
  334. alias = DEFAULT_COLUMN_ALIAS
  335. } else {
  336. alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
  337. }
  338. order := ASC
  339. if comparatorParsed.reversed[i] {
  340. order = DESC
  341. }
  342. column := &ColumnMetadata{
  343. Keyspace: table.Keyspace,
  344. Table: table.Name,
  345. Name: alias,
  346. Type: comparatorParsed.types[i],
  347. Order: order,
  348. Kind: ColumnClusteringKey,
  349. ComponentIndex: i,
  350. }
  351. table.ClusteringColumns[i] = column
  352. table.Columns[alias] = column
  353. }
  354. if size != len(comparatorParsed.types)-1 {
  355. alias := DEFAULT_VALUE_ALIAS
  356. if len(table.ValueAlias) > 0 {
  357. alias = table.ValueAlias
  358. }
  359. // decode the default validator
  360. defaultValidatorParsed := parseType(table.DefaultValidator)
  361. column := &ColumnMetadata{
  362. Keyspace: table.Keyspace,
  363. Table: table.Name,
  364. Name: alias,
  365. Type: defaultValidatorParsed.types[0],
  366. Kind: ColumnRegular,
  367. }
  368. table.Columns[alias] = column
  369. }
  370. }
  371. }
  372. // The simpler compile case for V2+ protocol
  373. func compileV2Metadata(tables []TableMetadata) {
  374. for i := range tables {
  375. table := &tables[i]
  376. clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey)
  377. table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
  378. if table.KeyValidator != "" {
  379. keyValidatorParsed := parseType(table.KeyValidator)
  380. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  381. } else { // Cassandra 3.x+
  382. partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey)
  383. table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
  384. }
  385. for _, columnName := range table.OrderedColumns {
  386. column := table.Columns[columnName]
  387. if column.Kind == ColumnPartitionKey {
  388. table.PartitionKey[column.ComponentIndex] = column
  389. } else if column.Kind == ColumnClusteringKey {
  390. table.ClusteringColumns[column.ComponentIndex] = column
  391. }
  392. }
  393. }
  394. }
  395. // returns the count of coluns with the given "kind" value.
  396. func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int {
  397. maxComponentIndex := -1
  398. for _, column := range columns {
  399. if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
  400. maxComponentIndex = column.ComponentIndex
  401. }
  402. }
  403. return maxComponentIndex + 1
  404. }
  405. // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
  406. func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
  407. keyspace := &KeyspaceMetadata{Name: keyspaceName}
  408. if session.useSystemSchema { // Cassandra 3.x+
  409. const stmt = `
  410. SELECT durable_writes, replication
  411. FROM system_schema.keyspaces
  412. WHERE keyspace_name = ?`
  413. var replication map[string]string
  414. iter := session.control.query(stmt, keyspaceName)
  415. if iter.NumRows() == 0 {
  416. return nil, ErrKeyspaceDoesNotExist
  417. }
  418. iter.Scan(&keyspace.DurableWrites, &replication)
  419. err := iter.Close()
  420. if err != nil {
  421. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  422. }
  423. keyspace.StrategyClass = replication["class"]
  424. delete(replication, "class")
  425. keyspace.StrategyOptions = make(map[string]interface{}, len(replication))
  426. for k, v := range replication {
  427. keyspace.StrategyOptions[k] = v
  428. }
  429. } else {
  430. const stmt = `
  431. SELECT durable_writes, strategy_class, strategy_options
  432. FROM system.schema_keyspaces
  433. WHERE keyspace_name = ?`
  434. var strategyOptionsJSON []byte
  435. iter := session.control.query(stmt, keyspaceName)
  436. if iter.NumRows() == 0 {
  437. return nil, ErrKeyspaceDoesNotExist
  438. }
  439. iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
  440. err := iter.Close()
  441. if err != nil {
  442. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  443. }
  444. err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
  445. if err != nil {
  446. return nil, fmt.Errorf(
  447. "Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
  448. strategyOptionsJSON, keyspace.Name, err,
  449. )
  450. }
  451. }
  452. return keyspace, nil
  453. }
  454. // query for only the table metadata in the specified keyspace from system.schema_columnfamilies
  455. func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) {
  456. var (
  457. iter *Iter
  458. scan func(iter *Iter, table *TableMetadata) bool
  459. stmt string
  460. keyAliasesJSON []byte
  461. columnAliasesJSON []byte
  462. )
  463. if session.useSystemSchema { // Cassandra 3.x+
  464. stmt = `
  465. SELECT
  466. table_name
  467. FROM system_schema.tables
  468. WHERE keyspace_name = ?`
  469. switchIter := func() *Iter {
  470. iter.Close()
  471. stmt = `
  472. SELECT
  473. view_name
  474. FROM system_schema.views
  475. WHERE keyspace_name = ?`
  476. iter = session.control.query(stmt, keyspaceName)
  477. return iter
  478. }
  479. scan = func(iter *Iter, table *TableMetadata) bool {
  480. r := iter.Scan(
  481. &table.Name,
  482. )
  483. if !r {
  484. iter = switchIter()
  485. if iter != nil {
  486. switchIter = func() *Iter { return nil }
  487. r = iter.Scan(&table.Name)
  488. }
  489. }
  490. return r
  491. }
  492. } else if session.cfg.ProtoVersion == protoVersion1 {
  493. // we have key aliases
  494. stmt = `
  495. SELECT
  496. columnfamily_name,
  497. key_validator,
  498. comparator,
  499. default_validator,
  500. key_aliases,
  501. column_aliases,
  502. value_alias
  503. FROM system.schema_columnfamilies
  504. WHERE keyspace_name = ?`
  505. scan = func(iter *Iter, table *TableMetadata) bool {
  506. return iter.Scan(
  507. &table.Name,
  508. &table.KeyValidator,
  509. &table.Comparator,
  510. &table.DefaultValidator,
  511. &keyAliasesJSON,
  512. &columnAliasesJSON,
  513. &table.ValueAlias,
  514. )
  515. }
  516. } else {
  517. stmt = `
  518. SELECT
  519. columnfamily_name,
  520. key_validator,
  521. comparator,
  522. default_validator
  523. FROM system.schema_columnfamilies
  524. WHERE keyspace_name = ?`
  525. scan = func(iter *Iter, table *TableMetadata) bool {
  526. return iter.Scan(
  527. &table.Name,
  528. &table.KeyValidator,
  529. &table.Comparator,
  530. &table.DefaultValidator,
  531. )
  532. }
  533. }
  534. iter = session.control.query(stmt, keyspaceName)
  535. tables := []TableMetadata{}
  536. table := TableMetadata{Keyspace: keyspaceName}
  537. for scan(iter, &table) {
  538. var err error
  539. // decode the key aliases
  540. if keyAliasesJSON != nil {
  541. table.KeyAliases = []string{}
  542. err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
  543. if err != nil {
  544. iter.Close()
  545. return nil, fmt.Errorf(
  546. "Invalid JSON value '%s' as key_aliases for in table '%s': %v",
  547. keyAliasesJSON, table.Name, err,
  548. )
  549. }
  550. }
  551. // decode the column aliases
  552. if columnAliasesJSON != nil {
  553. table.ColumnAliases = []string{}
  554. err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
  555. if err != nil {
  556. iter.Close()
  557. return nil, fmt.Errorf(
  558. "Invalid JSON value '%s' as column_aliases for in table '%s': %v",
  559. columnAliasesJSON, table.Name, err,
  560. )
  561. }
  562. }
  563. tables = append(tables, table)
  564. table = TableMetadata{Keyspace: keyspaceName}
  565. }
  566. err := iter.Close()
  567. if err != nil && err != ErrNotFound {
  568. return nil, fmt.Errorf("Error querying table schema: %v", err)
  569. }
  570. return tables, nil
  571. }
  572. func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
  573. // V1 does not support the type column, and all returned rows are
  574. // of kind "regular".
  575. const stmt = `
  576. SELECT
  577. columnfamily_name,
  578. column_name,
  579. component_index,
  580. validator,
  581. index_name,
  582. index_type,
  583. index_options
  584. FROM system.schema_columns
  585. WHERE keyspace_name = ?`
  586. var columns []ColumnMetadata
  587. rows := s.control.query(stmt, keyspace).Scanner()
  588. for rows.Next() {
  589. var (
  590. column = ColumnMetadata{Keyspace: keyspace}
  591. indexOptionsJSON []byte
  592. )
  593. // all columns returned by V1 are regular
  594. column.Kind = ColumnRegular
  595. err := rows.Scan(&column.Table,
  596. &column.Name,
  597. &column.ComponentIndex,
  598. &column.Validator,
  599. &column.Index.Name,
  600. &column.Index.Type,
  601. &indexOptionsJSON)
  602. if err != nil {
  603. return nil, err
  604. }
  605. if len(indexOptionsJSON) > 0 {
  606. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  607. if err != nil {
  608. return nil, fmt.Errorf(
  609. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  610. indexOptionsJSON,
  611. column.Name,
  612. column.Table,
  613. err)
  614. }
  615. }
  616. columns = append(columns, column)
  617. }
  618. if err := rows.Err(); err != nil {
  619. return nil, err
  620. }
  621. return columns, nil
  622. }
  623. func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
  624. // V2+ supports the type column
  625. const stmt = `
  626. SELECT
  627. columnfamily_name,
  628. column_name,
  629. component_index,
  630. validator,
  631. index_name,
  632. index_type,
  633. index_options,
  634. type
  635. FROM system.schema_columns
  636. WHERE keyspace_name = ?`
  637. var columns []ColumnMetadata
  638. rows := s.control.query(stmt, keyspace).Scanner()
  639. for rows.Next() {
  640. var (
  641. column = ColumnMetadata{Keyspace: keyspace}
  642. indexOptionsJSON []byte
  643. )
  644. err := rows.Scan(&column.Table,
  645. &column.Name,
  646. &column.ComponentIndex,
  647. &column.Validator,
  648. &column.Index.Name,
  649. &column.Index.Type,
  650. &indexOptionsJSON,
  651. &column.Kind,
  652. )
  653. if err != nil {
  654. return nil, err
  655. }
  656. if len(indexOptionsJSON) > 0 {
  657. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  658. if err != nil {
  659. return nil, fmt.Errorf(
  660. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  661. indexOptionsJSON,
  662. column.Name,
  663. column.Table,
  664. err)
  665. }
  666. }
  667. columns = append(columns, column)
  668. }
  669. if err := rows.Err(); err != nil {
  670. return nil, err
  671. }
  672. return columns, nil
  673. }
  674. func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
  675. const stmt = `
  676. SELECT
  677. table_name,
  678. column_name,
  679. clustering_order,
  680. type,
  681. kind,
  682. position
  683. FROM system_schema.columns
  684. WHERE keyspace_name = ?`
  685. var columns []ColumnMetadata
  686. rows := s.control.query(stmt, keyspace).Scanner()
  687. for rows.Next() {
  688. column := ColumnMetadata{Keyspace: keyspace}
  689. err := rows.Scan(&column.Table,
  690. &column.Name,
  691. &column.ClusteringOrder,
  692. &column.Validator,
  693. &column.Kind,
  694. &column.ComponentIndex,
  695. )
  696. if err != nil {
  697. return nil, err
  698. }
  699. columns = append(columns, column)
  700. }
  701. if err := rows.Err(); err != nil {
  702. return nil, err
  703. }
  704. // TODO(zariel): get column index info from system_schema.indexes
  705. return columns, nil
  706. }
  707. // query for only the column metadata in the specified keyspace from system.schema_columns
  708. func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
  709. var (
  710. columns []ColumnMetadata
  711. err error
  712. )
  713. // Deal with differences in protocol versions
  714. if session.cfg.ProtoVersion == 1 {
  715. columns, err = session.scanColumnMetadataV1(keyspaceName)
  716. } else if session.useSystemSchema { // Cassandra 3.x+
  717. columns, err = session.scanColumnMetadataSystem(keyspaceName)
  718. } else {
  719. columns, err = session.scanColumnMetadataV2(keyspaceName)
  720. }
  721. if err != nil && err != ErrNotFound {
  722. return nil, fmt.Errorf("Error querying column schema: %v", err)
  723. }
  724. return columns, nil
  725. }
  726. func getTypeInfo(t string) TypeInfo {
  727. if strings.HasPrefix(t, apacheCassandraTypePrefix) {
  728. t = apacheToCassandraType(t)
  729. }
  730. return getCassandraType(t)
  731. }
  732. func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) {
  733. if session.cfg.ProtoVersion == protoVersion1 {
  734. return nil, nil
  735. }
  736. var tableName string
  737. if session.useSystemSchema {
  738. tableName = "system_schema.functions"
  739. } else {
  740. tableName = "system.schema_functions"
  741. }
  742. stmt := fmt.Sprintf(`
  743. SELECT
  744. function_name,
  745. argument_types,
  746. argument_names,
  747. body,
  748. called_on_null_input,
  749. language,
  750. return_type
  751. FROM %s
  752. WHERE keyspace_name = ?`, tableName)
  753. var functions []FunctionMetadata
  754. rows := session.control.query(stmt, keyspaceName).Scanner()
  755. for rows.Next() {
  756. function := FunctionMetadata{Keyspace: keyspaceName}
  757. var argumentTypes []string
  758. var returnType string
  759. err := rows.Scan(&function.Name,
  760. &argumentTypes,
  761. &function.ArgumentNames,
  762. &function.Body,
  763. &function.CalledOnNullInput,
  764. &function.Language,
  765. &returnType,
  766. )
  767. if err != nil {
  768. return nil, err
  769. }
  770. function.ReturnType = getTypeInfo(returnType)
  771. function.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
  772. for i, argumentType := range argumentTypes {
  773. function.ArgumentTypes[i] = getTypeInfo(argumentType)
  774. }
  775. functions = append(functions, function)
  776. }
  777. if err := rows.Err(); err != nil {
  778. return nil, err
  779. }
  780. return functions, nil
  781. }
  782. func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMetadata, error) {
  783. if session.cfg.ProtoVersion == protoVersion1 {
  784. return nil, nil
  785. }
  786. var tableName string
  787. if session.useSystemSchema {
  788. tableName = "system_schema.aggregates"
  789. } else {
  790. tableName = "system.schema_aggregates"
  791. }
  792. stmt := fmt.Sprintf(`
  793. SELECT
  794. aggregate_name,
  795. argument_types,
  796. final_func,
  797. initcond,
  798. return_type,
  799. state_func,
  800. state_type
  801. FROM %s
  802. WHERE keyspace_name = ?`, tableName)
  803. var aggregates []AggregateMetadata
  804. rows := session.control.query(stmt, keyspaceName).Scanner()
  805. for rows.Next() {
  806. aggregate := AggregateMetadata{Keyspace: keyspaceName}
  807. var argumentTypes []string
  808. var returnType string
  809. var stateType string
  810. err := rows.Scan(&aggregate.Name,
  811. &argumentTypes,
  812. &aggregate.finalFunc,
  813. &aggregate.InitCond,
  814. &returnType,
  815. &aggregate.stateFunc,
  816. &stateType,
  817. )
  818. if err != nil {
  819. return nil, err
  820. }
  821. aggregate.ReturnType = getTypeInfo(returnType)
  822. aggregate.StateType = getTypeInfo(stateType)
  823. aggregate.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
  824. for i, argumentType := range argumentTypes {
  825. aggregate.ArgumentTypes[i] = getTypeInfo(argumentType)
  826. }
  827. aggregates = append(aggregates, aggregate)
  828. }
  829. if err := rows.Err(); err != nil {
  830. return nil, err
  831. }
  832. return aggregates, nil
  833. }
  834. // type definition parser state
  835. type typeParser struct {
  836. input string
  837. index int
  838. }
  839. // the type definition parser result
  840. type typeParserResult struct {
  841. isComposite bool
  842. types []TypeInfo
  843. reversed []bool
  844. collections map[string]TypeInfo
  845. }
  846. // Parse the type definition used for validator and comparator schema data
  847. func parseType(def string) typeParserResult {
  848. parser := &typeParser{input: def}
  849. return parser.parse()
  850. }
  851. const (
  852. REVERSED_TYPE = "org.apache.cassandra.db.marshal.ReversedType"
  853. COMPOSITE_TYPE = "org.apache.cassandra.db.marshal.CompositeType"
  854. COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"
  855. LIST_TYPE = "org.apache.cassandra.db.marshal.ListType"
  856. SET_TYPE = "org.apache.cassandra.db.marshal.SetType"
  857. MAP_TYPE = "org.apache.cassandra.db.marshal.MapType"
  858. )
  859. // represents a class specification in the type def AST
  860. type typeParserClassNode struct {
  861. name string
  862. params []typeParserParamNode
  863. // this is the segment of the input string that defined this node
  864. input string
  865. }
  866. // represents a class parameter in the type def AST
  867. type typeParserParamNode struct {
  868. name *string
  869. class typeParserClassNode
  870. }
  871. func (t *typeParser) parse() typeParserResult {
  872. // parse the AST
  873. ast, ok := t.parseClassNode()
  874. if !ok {
  875. // treat this is a custom type
  876. return typeParserResult{
  877. isComposite: false,
  878. types: []TypeInfo{
  879. NativeType{
  880. typ: TypeCustom,
  881. custom: t.input,
  882. },
  883. },
  884. reversed: []bool{false},
  885. collections: nil,
  886. }
  887. }
  888. // interpret the AST
  889. if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
  890. count := len(ast.params)
  891. // look for a collections param
  892. last := ast.params[count-1]
  893. collections := map[string]TypeInfo{}
  894. if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
  895. count--
  896. for _, param := range last.class.params {
  897. // decode the name
  898. var name string
  899. decoded, err := hex.DecodeString(*param.name)
  900. if err != nil {
  901. Logger.Printf(
  902. "Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
  903. t.input,
  904. *param.name,
  905. err,
  906. )
  907. // just use the provided name
  908. name = *param.name
  909. } else {
  910. name = string(decoded)
  911. }
  912. collections[name] = param.class.asTypeInfo()
  913. }
  914. }
  915. types := make([]TypeInfo, count)
  916. reversed := make([]bool, count)
  917. for i, param := range ast.params[:count] {
  918. class := param.class
  919. reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
  920. if reversed[i] {
  921. class = class.params[0].class
  922. }
  923. types[i] = class.asTypeInfo()
  924. }
  925. return typeParserResult{
  926. isComposite: true,
  927. types: types,
  928. reversed: reversed,
  929. collections: collections,
  930. }
  931. } else {
  932. // not composite, so one type
  933. class := *ast
  934. reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
  935. if reversed {
  936. class = class.params[0].class
  937. }
  938. typeInfo := class.asTypeInfo()
  939. return typeParserResult{
  940. isComposite: false,
  941. types: []TypeInfo{typeInfo},
  942. reversed: []bool{reversed},
  943. }
  944. }
  945. }
  946. func (class *typeParserClassNode) asTypeInfo() TypeInfo {
  947. if strings.HasPrefix(class.name, LIST_TYPE) {
  948. elem := class.params[0].class.asTypeInfo()
  949. return CollectionType{
  950. NativeType: NativeType{
  951. typ: TypeList,
  952. },
  953. Elem: elem,
  954. }
  955. }
  956. if strings.HasPrefix(class.name, SET_TYPE) {
  957. elem := class.params[0].class.asTypeInfo()
  958. return CollectionType{
  959. NativeType: NativeType{
  960. typ: TypeSet,
  961. },
  962. Elem: elem,
  963. }
  964. }
  965. if strings.HasPrefix(class.name, MAP_TYPE) {
  966. key := class.params[0].class.asTypeInfo()
  967. elem := class.params[1].class.asTypeInfo()
  968. return CollectionType{
  969. NativeType: NativeType{
  970. typ: TypeMap,
  971. },
  972. Key: key,
  973. Elem: elem,
  974. }
  975. }
  976. // must be a simple type or custom type
  977. info := NativeType{typ: getApacheCassandraType(class.name)}
  978. if info.typ == TypeCustom {
  979. // add the entire class definition
  980. info.custom = class.input
  981. }
  982. return info
  983. }
  984. // CLASS := ID [ PARAMS ]
  985. func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
  986. t.skipWhitespace()
  987. startIndex := t.index
  988. name, ok := t.nextIdentifier()
  989. if !ok {
  990. return nil, false
  991. }
  992. params, ok := t.parseParamNodes()
  993. if !ok {
  994. return nil, false
  995. }
  996. endIndex := t.index
  997. node = &typeParserClassNode{
  998. name: name,
  999. params: params,
  1000. input: t.input[startIndex:endIndex],
  1001. }
  1002. return node, true
  1003. }
  1004. // PARAMS := "(" PARAM { "," PARAM } ")"
  1005. // PARAM := [ PARAM_NAME ":" ] CLASS
  1006. // PARAM_NAME := ID
  1007. func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
  1008. t.skipWhitespace()
  1009. // the params are optional
  1010. if t.index == len(t.input) || t.input[t.index] != '(' {
  1011. return nil, true
  1012. }
  1013. params = []typeParserParamNode{}
  1014. // consume the '('
  1015. t.index++
  1016. t.skipWhitespace()
  1017. for t.input[t.index] != ')' {
  1018. // look for a named param, but if no colon, then we want to backup
  1019. backupIndex := t.index
  1020. // name will be a hex encoded version of a utf-8 string
  1021. name, ok := t.nextIdentifier()
  1022. if !ok {
  1023. return nil, false
  1024. }
  1025. hasName := true
  1026. // TODO handle '=>' used for DynamicCompositeType
  1027. t.skipWhitespace()
  1028. if t.input[t.index] == ':' {
  1029. // there is a name for this parameter
  1030. // consume the ':'
  1031. t.index++
  1032. t.skipWhitespace()
  1033. } else {
  1034. // no name, backup
  1035. hasName = false
  1036. t.index = backupIndex
  1037. }
  1038. // parse the next full parameter
  1039. classNode, ok := t.parseClassNode()
  1040. if !ok {
  1041. return nil, false
  1042. }
  1043. if hasName {
  1044. params = append(
  1045. params,
  1046. typeParserParamNode{name: &name, class: *classNode},
  1047. )
  1048. } else {
  1049. params = append(
  1050. params,
  1051. typeParserParamNode{class: *classNode},
  1052. )
  1053. }
  1054. t.skipWhitespace()
  1055. if t.input[t.index] == ',' {
  1056. // consume the comma
  1057. t.index++
  1058. t.skipWhitespace()
  1059. }
  1060. }
  1061. // consume the ')'
  1062. t.index++
  1063. return params, true
  1064. }
  1065. func (t *typeParser) skipWhitespace() {
  1066. for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
  1067. t.index++
  1068. }
  1069. }
  1070. func isWhitespaceChar(c byte) bool {
  1071. return c == ' ' || c == '\n' || c == '\t'
  1072. }
  1073. // ID := LETTER { LETTER }
  1074. // LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
  1075. func (t *typeParser) nextIdentifier() (id string, found bool) {
  1076. startIndex := t.index
  1077. for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
  1078. t.index++
  1079. }
  1080. if startIndex == t.index {
  1081. return "", false
  1082. }
  1083. return t.input[startIndex:t.index], true
  1084. }
  1085. func isIdentifierChar(c byte) bool {
  1086. return (c >= '0' && c <= '9') ||
  1087. (c >= 'a' && c <= 'z') ||
  1088. (c >= 'A' && c <= 'Z') ||
  1089. c == '-' ||
  1090. c == '+' ||
  1091. c == '.' ||
  1092. c == '_' ||
  1093. c == '&'
  1094. }