metadata.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. )
  13. // schema metadata for a keyspace
  14. type KeyspaceMetadata struct {
  15. Name string
  16. DurableWrites bool
  17. StrategyClass string
  18. StrategyOptions map[string]interface{}
  19. Tables map[string]*TableMetadata
  20. Functions map[string]*FunctionMetadata
  21. Aggregates map[string]*AggregateMetadata
  22. Views map[string]*ViewMetadata
  23. }
  24. // schema metadata for a table (a.k.a. column family)
  25. type TableMetadata struct {
  26. Keyspace string
  27. Name string
  28. KeyValidator string
  29. Comparator string
  30. DefaultValidator string
  31. KeyAliases []string
  32. ColumnAliases []string
  33. ValueAlias string
  34. PartitionKey []*ColumnMetadata
  35. ClusteringColumns []*ColumnMetadata
  36. Columns map[string]*ColumnMetadata
  37. OrderedColumns []string
  38. }
  39. // schema metadata for a column
  40. type ColumnMetadata struct {
  41. Keyspace string
  42. Table string
  43. Name string
  44. ComponentIndex int
  45. Kind ColumnKind
  46. Validator string
  47. Type TypeInfo
  48. ClusteringOrder string
  49. Order ColumnOrder
  50. Index ColumnIndexMetadata
  51. }
  52. // FunctionMetadata holds metadata for function constructs
  53. type FunctionMetadata struct {
  54. Keyspace string
  55. Name string
  56. ArgumentTypes []TypeInfo
  57. ArgumentNames []string
  58. Body string
  59. CalledOnNullInput bool
  60. Language string
  61. ReturnType TypeInfo
  62. }
  63. // AggregateMetadata holds metadata for aggregate constructs
  64. type AggregateMetadata struct {
  65. Keyspace string
  66. Name string
  67. ArgumentTypes []TypeInfo
  68. FinalFunc FunctionMetadata
  69. InitCond string
  70. ReturnType TypeInfo
  71. StateFunc FunctionMetadata
  72. StateType TypeInfo
  73. stateFunc string
  74. finalFunc string
  75. }
  76. // ViewMetadata holds the metadata for views.
  77. type ViewMetadata struct {
  78. Keyspace string
  79. Name string
  80. FieldNames []string
  81. FieldTypes []TypeInfo
  82. }
  83. // the ordering of the column with regard to its comparator
  84. type ColumnOrder bool
  85. const (
  86. ASC ColumnOrder = false
  87. DESC = true
  88. )
  89. type ColumnIndexMetadata struct {
  90. Name string
  91. Type string
  92. Options map[string]interface{}
  93. }
  94. type ColumnKind int
  95. const (
  96. ColumnUnkownKind ColumnKind = iota
  97. ColumnPartitionKey
  98. ColumnClusteringKey
  99. ColumnRegular
  100. ColumnCompact
  101. ColumnStatic
  102. )
  103. func (c ColumnKind) String() string {
  104. switch c {
  105. case ColumnPartitionKey:
  106. return "partition_key"
  107. case ColumnClusteringKey:
  108. return "clustering_key"
  109. case ColumnRegular:
  110. return "regular"
  111. case ColumnCompact:
  112. return "compact"
  113. case ColumnStatic:
  114. return "static"
  115. default:
  116. return fmt.Sprintf("unknown_column_%d", c)
  117. }
  118. }
  119. func (c *ColumnKind) UnmarshalCQL(typ TypeInfo, p []byte) error {
  120. if typ.Type() != TypeVarchar {
  121. return unmarshalErrorf("unable to marshall %s into ColumnKind, expected Varchar", typ)
  122. }
  123. kind, err := columnKindFromSchema(string(p))
  124. if err != nil {
  125. return err
  126. }
  127. *c = kind
  128. return nil
  129. }
  130. func columnKindFromSchema(kind string) (ColumnKind, error) {
  131. switch kind {
  132. case "partition_key":
  133. return ColumnPartitionKey, nil
  134. case "clustering_key", "clustering":
  135. return ColumnClusteringKey, nil
  136. case "regular":
  137. return ColumnRegular, nil
  138. case "compact_value":
  139. return ColumnCompact, nil
  140. case "static":
  141. return ColumnStatic, nil
  142. default:
  143. return -1, fmt.Errorf("unknown column kind: %q", kind)
  144. }
  145. }
  146. // default alias values
  147. const (
  148. DEFAULT_KEY_ALIAS = "key"
  149. DEFAULT_COLUMN_ALIAS = "column"
  150. DEFAULT_VALUE_ALIAS = "value"
  151. )
  152. // queries the cluster for schema information for a specific keyspace
  153. type schemaDescriber struct {
  154. session *Session
  155. mu sync.Mutex
  156. cache map[string]*KeyspaceMetadata
  157. }
  158. // creates a session bound schema describer which will query and cache
  159. // keyspace metadata
  160. func newSchemaDescriber(session *Session) *schemaDescriber {
  161. return &schemaDescriber{
  162. session: session,
  163. cache: map[string]*KeyspaceMetadata{},
  164. }
  165. }
  166. // returns the cached KeyspaceMetadata held by the describer for the named
  167. // keyspace.
  168. func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
  169. s.mu.Lock()
  170. defer s.mu.Unlock()
  171. metadata, found := s.cache[keyspaceName]
  172. if !found {
  173. // refresh the cache for this keyspace
  174. err := s.refreshSchema(keyspaceName)
  175. if err != nil {
  176. return nil, err
  177. }
  178. metadata = s.cache[keyspaceName]
  179. }
  180. return metadata, nil
  181. }
  182. // clears the already cached keyspace metadata
  183. func (s *schemaDescriber) clearSchema(keyspaceName string) {
  184. s.mu.Lock()
  185. defer s.mu.Unlock()
  186. delete(s.cache, keyspaceName)
  187. }
  188. // forcibly updates the current KeyspaceMetadata held by the schema describer
  189. // for a given named keyspace.
  190. func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
  191. var err error
  192. // query the system keyspace for schema data
  193. // TODO retrieve concurrently
  194. keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
  195. if err != nil {
  196. return err
  197. }
  198. tables, err := getTableMetadata(s.session, keyspaceName)
  199. if err != nil {
  200. return err
  201. }
  202. columns, err := getColumnMetadata(s.session, keyspaceName)
  203. if err != nil {
  204. return err
  205. }
  206. functions, err := getFunctionsMetadata(s.session, keyspaceName)
  207. if err != nil {
  208. return err
  209. }
  210. aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
  211. if err != nil {
  212. return err
  213. }
  214. views, err := getViewsMetadata(s.session, keyspaceName)
  215. if err != nil {
  216. return err
  217. }
  218. // organize the schema data
  219. compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates, views)
  220. // update the cache
  221. s.cache[keyspaceName] = keyspace
  222. return nil
  223. }
  224. // "compiles" derived information about keyspace, table, and column metadata
  225. // for a keyspace from the basic queried metadata objects returned by
  226. // getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
  227. // Links the metadata objects together and derives the column composition of
  228. // the partition key and clustering key for a table.
  229. func compileMetadata(
  230. protoVersion int,
  231. keyspace *KeyspaceMetadata,
  232. tables []TableMetadata,
  233. columns []ColumnMetadata,
  234. functions []FunctionMetadata,
  235. aggregates []AggregateMetadata,
  236. views []ViewMetadata,
  237. ) {
  238. keyspace.Tables = make(map[string]*TableMetadata)
  239. for i := range tables {
  240. tables[i].Columns = make(map[string]*ColumnMetadata)
  241. keyspace.Tables[tables[i].Name] = &tables[i]
  242. }
  243. keyspace.Functions = make(map[string]*FunctionMetadata, len(functions))
  244. for i := range functions {
  245. keyspace.Functions[functions[i].Name] = &functions[i]
  246. }
  247. keyspace.Aggregates = make(map[string]*AggregateMetadata, len(aggregates))
  248. for _, aggregate := range aggregates {
  249. aggregate.FinalFunc = *keyspace.Functions[aggregate.finalFunc]
  250. aggregate.StateFunc = *keyspace.Functions[aggregate.stateFunc]
  251. keyspace.Aggregates[aggregate.Name] = &aggregate
  252. }
  253. keyspace.Views = make(map[string]*ViewMetadata, len(views))
  254. for i := range views {
  255. keyspace.Views[views[i].Name] = &views[i]
  256. }
  257. // add columns from the schema data
  258. for i := range columns {
  259. col := &columns[i]
  260. // decode the validator for TypeInfo and order
  261. if col.ClusteringOrder != "" { // Cassandra 3.x+
  262. col.Type = getCassandraType(col.Validator)
  263. col.Order = ASC
  264. if col.ClusteringOrder == "desc" {
  265. col.Order = DESC
  266. }
  267. } else {
  268. validatorParsed := parseType(col.Validator)
  269. col.Type = validatorParsed.types[0]
  270. col.Order = ASC
  271. if validatorParsed.reversed[0] {
  272. col.Order = DESC
  273. }
  274. }
  275. table, ok := keyspace.Tables[col.Table]
  276. if !ok {
  277. // if the schema is being updated we will race between seeing
  278. // the metadata be complete. Potentially we should check for
  279. // schema versions before and after reading the metadata and
  280. // if they dont match try again.
  281. continue
  282. }
  283. table.Columns[col.Name] = col
  284. table.OrderedColumns = append(table.OrderedColumns, col.Name)
  285. }
  286. if protoVersion == protoVersion1 {
  287. compileV1Metadata(tables)
  288. } else {
  289. compileV2Metadata(tables)
  290. }
  291. }
  292. // Compiles derived information from TableMetadata which have had
  293. // ColumnMetadata added already. V1 protocol does not return as much
  294. // column metadata as V2+ (because V1 doesn't support the "type" column in the
  295. // system.schema_columns table) so determining PartitionKey and ClusterColumns
  296. // is more complex.
  297. func compileV1Metadata(tables []TableMetadata) {
  298. for i := range tables {
  299. table := &tables[i]
  300. // decode the key validator
  301. keyValidatorParsed := parseType(table.KeyValidator)
  302. // decode the comparator
  303. comparatorParsed := parseType(table.Comparator)
  304. // the partition key length is the same as the number of types in the
  305. // key validator
  306. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  307. // V1 protocol only returns "regular" columns from
  308. // system.schema_columns (there is no type field for columns)
  309. // so the alias information is used to
  310. // create the partition key and clustering columns
  311. // construct the partition key from the alias
  312. for i := range table.PartitionKey {
  313. var alias string
  314. if len(table.KeyAliases) > i {
  315. alias = table.KeyAliases[i]
  316. } else if i == 0 {
  317. alias = DEFAULT_KEY_ALIAS
  318. } else {
  319. alias = DEFAULT_KEY_ALIAS + strconv.Itoa(i+1)
  320. }
  321. column := &ColumnMetadata{
  322. Keyspace: table.Keyspace,
  323. Table: table.Name,
  324. Name: alias,
  325. Type: keyValidatorParsed.types[i],
  326. Kind: ColumnPartitionKey,
  327. ComponentIndex: i,
  328. }
  329. table.PartitionKey[i] = column
  330. table.Columns[alias] = column
  331. }
  332. // determine the number of clustering columns
  333. size := len(comparatorParsed.types)
  334. if comparatorParsed.isComposite {
  335. if len(comparatorParsed.collections) != 0 ||
  336. (len(table.ColumnAliases) == size-1 &&
  337. comparatorParsed.types[size-1].Type() == TypeVarchar) {
  338. size = size - 1
  339. }
  340. } else {
  341. if !(len(table.ColumnAliases) != 0 || len(table.Columns) == 0) {
  342. size = 0
  343. }
  344. }
  345. table.ClusteringColumns = make([]*ColumnMetadata, size)
  346. for i := range table.ClusteringColumns {
  347. var alias string
  348. if len(table.ColumnAliases) > i {
  349. alias = table.ColumnAliases[i]
  350. } else if i == 0 {
  351. alias = DEFAULT_COLUMN_ALIAS
  352. } else {
  353. alias = DEFAULT_COLUMN_ALIAS + strconv.Itoa(i+1)
  354. }
  355. order := ASC
  356. if comparatorParsed.reversed[i] {
  357. order = DESC
  358. }
  359. column := &ColumnMetadata{
  360. Keyspace: table.Keyspace,
  361. Table: table.Name,
  362. Name: alias,
  363. Type: comparatorParsed.types[i],
  364. Order: order,
  365. Kind: ColumnClusteringKey,
  366. ComponentIndex: i,
  367. }
  368. table.ClusteringColumns[i] = column
  369. table.Columns[alias] = column
  370. }
  371. if size != len(comparatorParsed.types)-1 {
  372. alias := DEFAULT_VALUE_ALIAS
  373. if len(table.ValueAlias) > 0 {
  374. alias = table.ValueAlias
  375. }
  376. // decode the default validator
  377. defaultValidatorParsed := parseType(table.DefaultValidator)
  378. column := &ColumnMetadata{
  379. Keyspace: table.Keyspace,
  380. Table: table.Name,
  381. Name: alias,
  382. Type: defaultValidatorParsed.types[0],
  383. Kind: ColumnRegular,
  384. }
  385. table.Columns[alias] = column
  386. }
  387. }
  388. }
  389. // The simpler compile case for V2+ protocol
  390. func compileV2Metadata(tables []TableMetadata) {
  391. for i := range tables {
  392. table := &tables[i]
  393. clusteringColumnCount := componentColumnCountOfType(table.Columns, ColumnClusteringKey)
  394. table.ClusteringColumns = make([]*ColumnMetadata, clusteringColumnCount)
  395. if table.KeyValidator != "" {
  396. keyValidatorParsed := parseType(table.KeyValidator)
  397. table.PartitionKey = make([]*ColumnMetadata, len(keyValidatorParsed.types))
  398. } else { // Cassandra 3.x+
  399. partitionKeyCount := componentColumnCountOfType(table.Columns, ColumnPartitionKey)
  400. table.PartitionKey = make([]*ColumnMetadata, partitionKeyCount)
  401. }
  402. for _, columnName := range table.OrderedColumns {
  403. column := table.Columns[columnName]
  404. if column.Kind == ColumnPartitionKey {
  405. table.PartitionKey[column.ComponentIndex] = column
  406. } else if column.Kind == ColumnClusteringKey {
  407. table.ClusteringColumns[column.ComponentIndex] = column
  408. }
  409. }
  410. }
  411. }
  412. // returns the count of coluns with the given "kind" value.
  413. func componentColumnCountOfType(columns map[string]*ColumnMetadata, kind ColumnKind) int {
  414. maxComponentIndex := -1
  415. for _, column := range columns {
  416. if column.Kind == kind && column.ComponentIndex > maxComponentIndex {
  417. maxComponentIndex = column.ComponentIndex
  418. }
  419. }
  420. return maxComponentIndex + 1
  421. }
  422. // query only for the keyspace metadata for the specified keyspace from system.schema_keyspace
  423. func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetadata, error) {
  424. keyspace := &KeyspaceMetadata{Name: keyspaceName}
  425. if session.useSystemSchema { // Cassandra 3.x+
  426. const stmt = `
  427. SELECT durable_writes, replication
  428. FROM system_schema.keyspaces
  429. WHERE keyspace_name = ?`
  430. var replication map[string]string
  431. iter := session.control.query(stmt, keyspaceName)
  432. if iter.NumRows() == 0 {
  433. return nil, ErrKeyspaceDoesNotExist
  434. }
  435. iter.Scan(&keyspace.DurableWrites, &replication)
  436. err := iter.Close()
  437. if err != nil {
  438. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  439. }
  440. keyspace.StrategyClass = replication["class"]
  441. delete(replication, "class")
  442. keyspace.StrategyOptions = make(map[string]interface{}, len(replication))
  443. for k, v := range replication {
  444. keyspace.StrategyOptions[k] = v
  445. }
  446. } else {
  447. const stmt = `
  448. SELECT durable_writes, strategy_class, strategy_options
  449. FROM system.schema_keyspaces
  450. WHERE keyspace_name = ?`
  451. var strategyOptionsJSON []byte
  452. iter := session.control.query(stmt, keyspaceName)
  453. if iter.NumRows() == 0 {
  454. return nil, ErrKeyspaceDoesNotExist
  455. }
  456. iter.Scan(&keyspace.DurableWrites, &keyspace.StrategyClass, &strategyOptionsJSON)
  457. err := iter.Close()
  458. if err != nil {
  459. return nil, fmt.Errorf("Error querying keyspace schema: %v", err)
  460. }
  461. err = json.Unmarshal(strategyOptionsJSON, &keyspace.StrategyOptions)
  462. if err != nil {
  463. return nil, fmt.Errorf(
  464. "Invalid JSON value '%s' as strategy_options for in keyspace '%s': %v",
  465. strategyOptionsJSON, keyspace.Name, err,
  466. )
  467. }
  468. }
  469. return keyspace, nil
  470. }
  471. // query for only the table metadata in the specified keyspace from system.schema_columnfamilies
  472. func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, error) {
  473. var (
  474. iter *Iter
  475. scan func(iter *Iter, table *TableMetadata) bool
  476. stmt string
  477. keyAliasesJSON []byte
  478. columnAliasesJSON []byte
  479. )
  480. if session.useSystemSchema { // Cassandra 3.x+
  481. stmt = `
  482. SELECT
  483. table_name
  484. FROM system_schema.tables
  485. WHERE keyspace_name = ?`
  486. switchIter := func() *Iter {
  487. iter.Close()
  488. stmt = `
  489. SELECT
  490. view_name
  491. FROM system_schema.views
  492. WHERE keyspace_name = ?`
  493. iter = session.control.query(stmt, keyspaceName)
  494. return iter
  495. }
  496. scan = func(iter *Iter, table *TableMetadata) bool {
  497. r := iter.Scan(
  498. &table.Name,
  499. )
  500. if !r {
  501. iter = switchIter()
  502. if iter != nil {
  503. switchIter = func() *Iter { return nil }
  504. r = iter.Scan(&table.Name)
  505. }
  506. }
  507. return r
  508. }
  509. } else if session.cfg.ProtoVersion == protoVersion1 {
  510. // we have key aliases
  511. stmt = `
  512. SELECT
  513. columnfamily_name,
  514. key_validator,
  515. comparator,
  516. default_validator,
  517. key_aliases,
  518. column_aliases,
  519. value_alias
  520. FROM system.schema_columnfamilies
  521. WHERE keyspace_name = ?`
  522. scan = func(iter *Iter, table *TableMetadata) bool {
  523. return iter.Scan(
  524. &table.Name,
  525. &table.KeyValidator,
  526. &table.Comparator,
  527. &table.DefaultValidator,
  528. &keyAliasesJSON,
  529. &columnAliasesJSON,
  530. &table.ValueAlias,
  531. )
  532. }
  533. } else {
  534. stmt = `
  535. SELECT
  536. columnfamily_name,
  537. key_validator,
  538. comparator,
  539. default_validator
  540. FROM system.schema_columnfamilies
  541. WHERE keyspace_name = ?`
  542. scan = func(iter *Iter, table *TableMetadata) bool {
  543. return iter.Scan(
  544. &table.Name,
  545. &table.KeyValidator,
  546. &table.Comparator,
  547. &table.DefaultValidator,
  548. )
  549. }
  550. }
  551. iter = session.control.query(stmt, keyspaceName)
  552. tables := []TableMetadata{}
  553. table := TableMetadata{Keyspace: keyspaceName}
  554. for scan(iter, &table) {
  555. var err error
  556. // decode the key aliases
  557. if keyAliasesJSON != nil {
  558. table.KeyAliases = []string{}
  559. err = json.Unmarshal(keyAliasesJSON, &table.KeyAliases)
  560. if err != nil {
  561. iter.Close()
  562. return nil, fmt.Errorf(
  563. "Invalid JSON value '%s' as key_aliases for in table '%s': %v",
  564. keyAliasesJSON, table.Name, err,
  565. )
  566. }
  567. }
  568. // decode the column aliases
  569. if columnAliasesJSON != nil {
  570. table.ColumnAliases = []string{}
  571. err = json.Unmarshal(columnAliasesJSON, &table.ColumnAliases)
  572. if err != nil {
  573. iter.Close()
  574. return nil, fmt.Errorf(
  575. "Invalid JSON value '%s' as column_aliases for in table '%s': %v",
  576. columnAliasesJSON, table.Name, err,
  577. )
  578. }
  579. }
  580. tables = append(tables, table)
  581. table = TableMetadata{Keyspace: keyspaceName}
  582. }
  583. err := iter.Close()
  584. if err != nil && err != ErrNotFound {
  585. return nil, fmt.Errorf("Error querying table schema: %v", err)
  586. }
  587. return tables, nil
  588. }
  589. func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
  590. // V1 does not support the type column, and all returned rows are
  591. // of kind "regular".
  592. const stmt = `
  593. SELECT
  594. columnfamily_name,
  595. column_name,
  596. component_index,
  597. validator,
  598. index_name,
  599. index_type,
  600. index_options
  601. FROM system.schema_columns
  602. WHERE keyspace_name = ?`
  603. var columns []ColumnMetadata
  604. rows := s.control.query(stmt, keyspace).Scanner()
  605. for rows.Next() {
  606. var (
  607. column = ColumnMetadata{Keyspace: keyspace}
  608. indexOptionsJSON []byte
  609. )
  610. // all columns returned by V1 are regular
  611. column.Kind = ColumnRegular
  612. err := rows.Scan(&column.Table,
  613. &column.Name,
  614. &column.ComponentIndex,
  615. &column.Validator,
  616. &column.Index.Name,
  617. &column.Index.Type,
  618. &indexOptionsJSON)
  619. if err != nil {
  620. return nil, err
  621. }
  622. if len(indexOptionsJSON) > 0 {
  623. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  624. if err != nil {
  625. return nil, fmt.Errorf(
  626. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  627. indexOptionsJSON,
  628. column.Name,
  629. column.Table,
  630. err)
  631. }
  632. }
  633. columns = append(columns, column)
  634. }
  635. if err := rows.Err(); err != nil {
  636. return nil, err
  637. }
  638. return columns, nil
  639. }
  640. func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error) {
  641. // V2+ supports the type column
  642. const stmt = `
  643. SELECT
  644. columnfamily_name,
  645. column_name,
  646. component_index,
  647. validator,
  648. index_name,
  649. index_type,
  650. index_options,
  651. type
  652. FROM system.schema_columns
  653. WHERE keyspace_name = ?`
  654. var columns []ColumnMetadata
  655. rows := s.control.query(stmt, keyspace).Scanner()
  656. for rows.Next() {
  657. var (
  658. column = ColumnMetadata{Keyspace: keyspace}
  659. indexOptionsJSON []byte
  660. )
  661. err := rows.Scan(&column.Table,
  662. &column.Name,
  663. &column.ComponentIndex,
  664. &column.Validator,
  665. &column.Index.Name,
  666. &column.Index.Type,
  667. &indexOptionsJSON,
  668. &column.Kind,
  669. )
  670. if err != nil {
  671. return nil, err
  672. }
  673. if len(indexOptionsJSON) > 0 {
  674. err := json.Unmarshal(indexOptionsJSON, &column.Index.Options)
  675. if err != nil {
  676. return nil, fmt.Errorf(
  677. "Invalid JSON value '%s' as index_options for column '%s' in table '%s': %v",
  678. indexOptionsJSON,
  679. column.Name,
  680. column.Table,
  681. err)
  682. }
  683. }
  684. columns = append(columns, column)
  685. }
  686. if err := rows.Err(); err != nil {
  687. return nil, err
  688. }
  689. return columns, nil
  690. }
  691. func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, error) {
  692. const stmt = `
  693. SELECT
  694. table_name,
  695. column_name,
  696. clustering_order,
  697. type,
  698. kind,
  699. position
  700. FROM system_schema.columns
  701. WHERE keyspace_name = ?`
  702. var columns []ColumnMetadata
  703. rows := s.control.query(stmt, keyspace).Scanner()
  704. for rows.Next() {
  705. column := ColumnMetadata{Keyspace: keyspace}
  706. err := rows.Scan(&column.Table,
  707. &column.Name,
  708. &column.ClusteringOrder,
  709. &column.Validator,
  710. &column.Kind,
  711. &column.ComponentIndex,
  712. )
  713. if err != nil {
  714. return nil, err
  715. }
  716. columns = append(columns, column)
  717. }
  718. if err := rows.Err(); err != nil {
  719. return nil, err
  720. }
  721. // TODO(zariel): get column index info from system_schema.indexes
  722. return columns, nil
  723. }
  724. // query for only the column metadata in the specified keyspace from system.schema_columns
  725. func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, error) {
  726. var (
  727. columns []ColumnMetadata
  728. err error
  729. )
  730. // Deal with differences in protocol versions
  731. if session.cfg.ProtoVersion == 1 {
  732. columns, err = session.scanColumnMetadataV1(keyspaceName)
  733. } else if session.useSystemSchema { // Cassandra 3.x+
  734. columns, err = session.scanColumnMetadataSystem(keyspaceName)
  735. } else {
  736. columns, err = session.scanColumnMetadataV2(keyspaceName)
  737. }
  738. if err != nil && err != ErrNotFound {
  739. return nil, fmt.Errorf("Error querying column schema: %v", err)
  740. }
  741. return columns, nil
  742. }
  743. func getTypeInfo(t string) TypeInfo {
  744. if strings.HasPrefix(t, apacheCassandraTypePrefix) {
  745. t = apacheToCassandraType(t)
  746. }
  747. return getCassandraType(t)
  748. }
  749. func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, error) {
  750. if session.cfg.ProtoVersion == protoVersion1 {
  751. return nil, nil
  752. }
  753. var tableName string
  754. if session.useSystemSchema {
  755. tableName = "system_schema.types"
  756. } else {
  757. tableName = "system.schema_usertypes"
  758. }
  759. stmt := fmt.Sprintf(`
  760. SELECT
  761. type_name,
  762. field_names,
  763. field_types
  764. FROM %s
  765. WHERE keyspace_name = ?`, tableName)
  766. var views []ViewMetadata
  767. rows := session.control.query(stmt, keyspaceName).Scanner()
  768. for rows.Next() {
  769. view := ViewMetadata{Keyspace: keyspaceName}
  770. var argumentTypes []string
  771. err := rows.Scan(&view.Name,
  772. &view.FieldNames,
  773. &argumentTypes,
  774. )
  775. if err != nil {
  776. return nil, err
  777. }
  778. view.FieldTypes = make([]TypeInfo, len(argumentTypes))
  779. for i, argumentType := range argumentTypes {
  780. view.FieldTypes[i] = getTypeInfo(argumentType)
  781. }
  782. views = append(views, view)
  783. }
  784. if err := rows.Err(); err != nil {
  785. return nil, err
  786. }
  787. return views, nil
  788. }
  789. func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) {
  790. if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
  791. return nil, nil
  792. }
  793. var tableName string
  794. if session.useSystemSchema {
  795. tableName = "system_schema.functions"
  796. } else {
  797. tableName = "system.schema_functions"
  798. }
  799. stmt := fmt.Sprintf(`
  800. SELECT
  801. function_name,
  802. argument_types,
  803. argument_names,
  804. body,
  805. called_on_null_input,
  806. language,
  807. return_type
  808. FROM %s
  809. WHERE keyspace_name = ?`, tableName)
  810. var functions []FunctionMetadata
  811. rows := session.control.query(stmt, keyspaceName).Scanner()
  812. for rows.Next() {
  813. function := FunctionMetadata{Keyspace: keyspaceName}
  814. var argumentTypes []string
  815. var returnType string
  816. err := rows.Scan(&function.Name,
  817. &argumentTypes,
  818. &function.ArgumentNames,
  819. &function.Body,
  820. &function.CalledOnNullInput,
  821. &function.Language,
  822. &returnType,
  823. )
  824. if err != nil {
  825. return nil, err
  826. }
  827. function.ReturnType = getTypeInfo(returnType)
  828. function.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
  829. for i, argumentType := range argumentTypes {
  830. function.ArgumentTypes[i] = getTypeInfo(argumentType)
  831. }
  832. functions = append(functions, function)
  833. }
  834. if err := rows.Err(); err != nil {
  835. return nil, err
  836. }
  837. return functions, nil
  838. }
  839. func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMetadata, error) {
  840. if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
  841. return nil, nil
  842. }
  843. var tableName string
  844. if session.useSystemSchema {
  845. tableName = "system_schema.aggregates"
  846. } else {
  847. tableName = "system.schema_aggregates"
  848. }
  849. stmt := fmt.Sprintf(`
  850. SELECT
  851. aggregate_name,
  852. argument_types,
  853. final_func,
  854. initcond,
  855. return_type,
  856. state_func,
  857. state_type
  858. FROM %s
  859. WHERE keyspace_name = ?`, tableName)
  860. var aggregates []AggregateMetadata
  861. rows := session.control.query(stmt, keyspaceName).Scanner()
  862. for rows.Next() {
  863. aggregate := AggregateMetadata{Keyspace: keyspaceName}
  864. var argumentTypes []string
  865. var returnType string
  866. var stateType string
  867. err := rows.Scan(&aggregate.Name,
  868. &argumentTypes,
  869. &aggregate.finalFunc,
  870. &aggregate.InitCond,
  871. &returnType,
  872. &aggregate.stateFunc,
  873. &stateType,
  874. )
  875. if err != nil {
  876. return nil, err
  877. }
  878. aggregate.ReturnType = getTypeInfo(returnType)
  879. aggregate.StateType = getTypeInfo(stateType)
  880. aggregate.ArgumentTypes = make([]TypeInfo, len(argumentTypes))
  881. for i, argumentType := range argumentTypes {
  882. aggregate.ArgumentTypes[i] = getTypeInfo(argumentType)
  883. }
  884. aggregates = append(aggregates, aggregate)
  885. }
  886. if err := rows.Err(); err != nil {
  887. return nil, err
  888. }
  889. return aggregates, nil
  890. }
  891. // type definition parser state
  892. type typeParser struct {
  893. input string
  894. index int
  895. }
  896. // the type definition parser result
  897. type typeParserResult struct {
  898. isComposite bool
  899. types []TypeInfo
  900. reversed []bool
  901. collections map[string]TypeInfo
  902. }
  903. // Parse the type definition used for validator and comparator schema data
  904. func parseType(def string) typeParserResult {
  905. parser := &typeParser{input: def}
  906. return parser.parse()
  907. }
  908. const (
  909. REVERSED_TYPE = "org.apache.cassandra.db.marshal.ReversedType"
  910. COMPOSITE_TYPE = "org.apache.cassandra.db.marshal.CompositeType"
  911. COLLECTION_TYPE = "org.apache.cassandra.db.marshal.ColumnToCollectionType"
  912. LIST_TYPE = "org.apache.cassandra.db.marshal.ListType"
  913. SET_TYPE = "org.apache.cassandra.db.marshal.SetType"
  914. MAP_TYPE = "org.apache.cassandra.db.marshal.MapType"
  915. )
  916. // represents a class specification in the type def AST
  917. type typeParserClassNode struct {
  918. name string
  919. params []typeParserParamNode
  920. // this is the segment of the input string that defined this node
  921. input string
  922. }
  923. // represents a class parameter in the type def AST
  924. type typeParserParamNode struct {
  925. name *string
  926. class typeParserClassNode
  927. }
  928. func (t *typeParser) parse() typeParserResult {
  929. // parse the AST
  930. ast, ok := t.parseClassNode()
  931. if !ok {
  932. // treat this is a custom type
  933. return typeParserResult{
  934. isComposite: false,
  935. types: []TypeInfo{
  936. NativeType{
  937. typ: TypeCustom,
  938. custom: t.input,
  939. },
  940. },
  941. reversed: []bool{false},
  942. collections: nil,
  943. }
  944. }
  945. // interpret the AST
  946. if strings.HasPrefix(ast.name, COMPOSITE_TYPE) {
  947. count := len(ast.params)
  948. // look for a collections param
  949. last := ast.params[count-1]
  950. collections := map[string]TypeInfo{}
  951. if strings.HasPrefix(last.class.name, COLLECTION_TYPE) {
  952. count--
  953. for _, param := range last.class.params {
  954. // decode the name
  955. var name string
  956. decoded, err := hex.DecodeString(*param.name)
  957. if err != nil {
  958. Logger.Printf(
  959. "Error parsing type '%s', contains collection name '%s' with an invalid format: %v",
  960. t.input,
  961. *param.name,
  962. err,
  963. )
  964. // just use the provided name
  965. name = *param.name
  966. } else {
  967. name = string(decoded)
  968. }
  969. collections[name] = param.class.asTypeInfo()
  970. }
  971. }
  972. types := make([]TypeInfo, count)
  973. reversed := make([]bool, count)
  974. for i, param := range ast.params[:count] {
  975. class := param.class
  976. reversed[i] = strings.HasPrefix(class.name, REVERSED_TYPE)
  977. if reversed[i] {
  978. class = class.params[0].class
  979. }
  980. types[i] = class.asTypeInfo()
  981. }
  982. return typeParserResult{
  983. isComposite: true,
  984. types: types,
  985. reversed: reversed,
  986. collections: collections,
  987. }
  988. } else {
  989. // not composite, so one type
  990. class := *ast
  991. reversed := strings.HasPrefix(class.name, REVERSED_TYPE)
  992. if reversed {
  993. class = class.params[0].class
  994. }
  995. typeInfo := class.asTypeInfo()
  996. return typeParserResult{
  997. isComposite: false,
  998. types: []TypeInfo{typeInfo},
  999. reversed: []bool{reversed},
  1000. }
  1001. }
  1002. }
  1003. func (class *typeParserClassNode) asTypeInfo() TypeInfo {
  1004. if strings.HasPrefix(class.name, LIST_TYPE) {
  1005. elem := class.params[0].class.asTypeInfo()
  1006. return CollectionType{
  1007. NativeType: NativeType{
  1008. typ: TypeList,
  1009. },
  1010. Elem: elem,
  1011. }
  1012. }
  1013. if strings.HasPrefix(class.name, SET_TYPE) {
  1014. elem := class.params[0].class.asTypeInfo()
  1015. return CollectionType{
  1016. NativeType: NativeType{
  1017. typ: TypeSet,
  1018. },
  1019. Elem: elem,
  1020. }
  1021. }
  1022. if strings.HasPrefix(class.name, MAP_TYPE) {
  1023. key := class.params[0].class.asTypeInfo()
  1024. elem := class.params[1].class.asTypeInfo()
  1025. return CollectionType{
  1026. NativeType: NativeType{
  1027. typ: TypeMap,
  1028. },
  1029. Key: key,
  1030. Elem: elem,
  1031. }
  1032. }
  1033. // must be a simple type or custom type
  1034. info := NativeType{typ: getApacheCassandraType(class.name)}
  1035. if info.typ == TypeCustom {
  1036. // add the entire class definition
  1037. info.custom = class.input
  1038. }
  1039. return info
  1040. }
  1041. // CLASS := ID [ PARAMS ]
  1042. func (t *typeParser) parseClassNode() (node *typeParserClassNode, ok bool) {
  1043. t.skipWhitespace()
  1044. startIndex := t.index
  1045. name, ok := t.nextIdentifier()
  1046. if !ok {
  1047. return nil, false
  1048. }
  1049. params, ok := t.parseParamNodes()
  1050. if !ok {
  1051. return nil, false
  1052. }
  1053. endIndex := t.index
  1054. node = &typeParserClassNode{
  1055. name: name,
  1056. params: params,
  1057. input: t.input[startIndex:endIndex],
  1058. }
  1059. return node, true
  1060. }
  1061. // PARAMS := "(" PARAM { "," PARAM } ")"
  1062. // PARAM := [ PARAM_NAME ":" ] CLASS
  1063. // PARAM_NAME := ID
  1064. func (t *typeParser) parseParamNodes() (params []typeParserParamNode, ok bool) {
  1065. t.skipWhitespace()
  1066. // the params are optional
  1067. if t.index == len(t.input) || t.input[t.index] != '(' {
  1068. return nil, true
  1069. }
  1070. params = []typeParserParamNode{}
  1071. // consume the '('
  1072. t.index++
  1073. t.skipWhitespace()
  1074. for t.input[t.index] != ')' {
  1075. // look for a named param, but if no colon, then we want to backup
  1076. backupIndex := t.index
  1077. // name will be a hex encoded version of a utf-8 string
  1078. name, ok := t.nextIdentifier()
  1079. if !ok {
  1080. return nil, false
  1081. }
  1082. hasName := true
  1083. // TODO handle '=>' used for DynamicCompositeType
  1084. t.skipWhitespace()
  1085. if t.input[t.index] == ':' {
  1086. // there is a name for this parameter
  1087. // consume the ':'
  1088. t.index++
  1089. t.skipWhitespace()
  1090. } else {
  1091. // no name, backup
  1092. hasName = false
  1093. t.index = backupIndex
  1094. }
  1095. // parse the next full parameter
  1096. classNode, ok := t.parseClassNode()
  1097. if !ok {
  1098. return nil, false
  1099. }
  1100. if hasName {
  1101. params = append(
  1102. params,
  1103. typeParserParamNode{name: &name, class: *classNode},
  1104. )
  1105. } else {
  1106. params = append(
  1107. params,
  1108. typeParserParamNode{class: *classNode},
  1109. )
  1110. }
  1111. t.skipWhitespace()
  1112. if t.input[t.index] == ',' {
  1113. // consume the comma
  1114. t.index++
  1115. t.skipWhitespace()
  1116. }
  1117. }
  1118. // consume the ')'
  1119. t.index++
  1120. return params, true
  1121. }
  1122. func (t *typeParser) skipWhitespace() {
  1123. for t.index < len(t.input) && isWhitespaceChar(t.input[t.index]) {
  1124. t.index++
  1125. }
  1126. }
  1127. func isWhitespaceChar(c byte) bool {
  1128. return c == ' ' || c == '\n' || c == '\t'
  1129. }
  1130. // ID := LETTER { LETTER }
  1131. // LETTER := "0"..."9" | "a"..."z" | "A"..."Z" | "-" | "+" | "." | "_" | "&"
  1132. func (t *typeParser) nextIdentifier() (id string, found bool) {
  1133. startIndex := t.index
  1134. for t.index < len(t.input) && isIdentifierChar(t.input[t.index]) {
  1135. t.index++
  1136. }
  1137. if startIndex == t.index {
  1138. return "", false
  1139. }
  1140. return t.input[startIndex:t.index], true
  1141. }
  1142. func isIdentifierChar(c byte) bool {
  1143. return (c >= '0' && c <= '9') ||
  1144. (c >= 'a' && c <= 'z') ||
  1145. (c >= 'A' && c <= 'Z') ||
  1146. c == '-' ||
  1147. c == '+' ||
  1148. c == '.' ||
  1149. c == '_' ||
  1150. c == '&'
  1151. }