|
|
@@ -70,8 +70,6 @@ const (
|
|
|
PARTITION_KEY = "partition_key"
|
|
|
CLUSTERING_KEY = "clustering_key"
|
|
|
REGULAR = "regular"
|
|
|
- COMPACT_VALUE = "compact_value"
|
|
|
- STATIC = "static"
|
|
|
)
|
|
|
|
|
|
// default alias values
|
|
|
@@ -83,47 +81,53 @@ const (
|
|
|
|
|
|
// queries the cluster for schema information for a specific keyspace
|
|
|
type schemaDescriber struct {
|
|
|
- session *Session
|
|
|
- mu sync.Mutex
|
|
|
- lazyInit sync.Once
|
|
|
+ session *Session
|
|
|
+ mu sync.Mutex
|
|
|
|
|
|
- current *KeyspaceMetadata
|
|
|
- err error
|
|
|
+ cache map[string]*KeyspaceMetadata
|
|
|
}
|
|
|
|
|
|
-func (s *schemaDescriber) init() {
|
|
|
- s.err = s.refreshSchema()
|
|
|
+func newSchemaDescriber(session *Session) *schemaDescriber {
|
|
|
+ return &schemaDescriber{
|
|
|
+ session: session,
|
|
|
+ cache: map[string]*KeyspaceMetadata{},
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (s *schemaDescriber) getSchema() (*KeyspaceMetadata, error) {
|
|
|
- // lazily-initialize the schema the first time
|
|
|
- s.lazyInit.Do(s.init)
|
|
|
+func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
|
|
|
+ s.mu.Lock()
|
|
|
+ defer s.mu.Unlock()
|
|
|
|
|
|
// TODO handle schema change events
|
|
|
|
|
|
- s.mu.Lock()
|
|
|
- defer s.mu.Unlock()
|
|
|
+ metadata, found := s.cache[keyspaceName]
|
|
|
+ if !found {
|
|
|
+ // refresh the cache for this keyspace
|
|
|
+ err := s.refreshSchema(keyspaceName)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
|
|
|
- return s.current, s.err
|
|
|
-}
|
|
|
+ metadata = s.cache[keyspaceName]
|
|
|
+ }
|
|
|
|
|
|
-func (s *schemaDescriber) refreshSchema() error {
|
|
|
- s.mu.Lock()
|
|
|
- defer s.mu.Unlock()
|
|
|
+ return metadata, nil
|
|
|
+}
|
|
|
|
|
|
+func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
|
|
|
var err error
|
|
|
|
|
|
// query the system keyspace for schema data
|
|
|
// TODO retrieve concurrently
|
|
|
- keyspace, err := getKeyspaceMetadata(s.session)
|
|
|
+ keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- tables, err := getTableMetadata(s.session)
|
|
|
+ tables, err := getTableMetadata(s.session, keyspaceName)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- columns, err := getColumnMetadata(s.session)
|
|
|
+ columns, err := getColumnMetadata(s.session, keyspaceName)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -131,9 +135,8 @@ func (s *schemaDescriber) refreshSchema() error {
|
|
|
// organize the schema data
|
|
|
compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns)
|
|
|
|
|
|
- // update the current
|
|
|
- s.current = keyspace
|
|
|
- s.err = nil
|
|
|
+ // update the cache
|
|
|
+ s.cache[keyspaceName] = keyspace
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
@@ -315,9 +318,10 @@ func countColumnsOfKind(columns map[string]*ColumnMetadata, kind string) int {
|
|
|
return count
|
|
|
}
|
|
|
|
|
|
-// query only for the keyspace metadata for the session's keyspace
|
|
|
+// query only for the keyspace metadata for the specified keyspace
|
|
|
func getKeyspaceMetadata(
|
|
|
session *Session,
|
|
|
+ keyspaceName string,
|
|
|
) (*KeyspaceMetadata, error) {
|
|
|
query := session.Query(
|
|
|
`
|
|
|
@@ -325,10 +329,10 @@ func getKeyspaceMetadata(
|
|
|
FROM system.schema_keyspaces
|
|
|
WHERE keyspace_name = ?
|
|
|
`,
|
|
|
- session.cfg.Keyspace,
|
|
|
+ keyspaceName,
|
|
|
)
|
|
|
|
|
|
- keyspace := &KeyspaceMetadata{Name: session.cfg.Keyspace}
|
|
|
+ keyspace := &KeyspaceMetadata{Name: keyspaceName}
|
|
|
var strategyOptionsJSON []byte
|
|
|
|
|
|
err := query.Scan(
|
|
|
@@ -351,9 +355,10 @@ func getKeyspaceMetadata(
|
|
|
return keyspace, nil
|
|
|
}
|
|
|
|
|
|
-// query for only the table metadata in the session's keyspace
|
|
|
+// query for only the table metadata in the specified keyspace
|
|
|
func getTableMetadata(
|
|
|
session *Session,
|
|
|
+ keyspaceName string,
|
|
|
) ([]TableMetadata, error) {
|
|
|
query := session.Query(
|
|
|
`
|
|
|
@@ -368,12 +373,12 @@ func getTableMetadata(
|
|
|
FROM system.schema_columnfamilies
|
|
|
WHERE keyspace_name = ?
|
|
|
`,
|
|
|
- session.cfg.Keyspace,
|
|
|
+ keyspaceName,
|
|
|
)
|
|
|
iter := query.Iter()
|
|
|
|
|
|
tables := []TableMetadata{}
|
|
|
- table := TableMetadata{Keyspace: session.cfg.Keyspace}
|
|
|
+ table := TableMetadata{Keyspace: keyspaceName}
|
|
|
|
|
|
var keyAliasesJSON []byte
|
|
|
var columnAliasesJSON []byte
|
|
|
@@ -415,7 +420,7 @@ func getTableMetadata(
|
|
|
}
|
|
|
|
|
|
tables = append(tables, table)
|
|
|
- table = TableMetadata{Keyspace: session.cfg.Keyspace}
|
|
|
+ table = TableMetadata{Keyspace: keyspaceName}
|
|
|
}
|
|
|
|
|
|
err := iter.Close()
|
|
|
@@ -426,8 +431,11 @@ func getTableMetadata(
|
|
|
return tables, nil
|
|
|
}
|
|
|
|
|
|
-// query for only the table metadata in the session's keyspace
|
|
|
-func getColumnMetadata(session *Session) ([]ColumnMetadata, error) {
|
|
|
+// query for only the table metadata in the specified keyspace
|
|
|
+func getColumnMetadata(
|
|
|
+ session *Session,
|
|
|
+ keyspaceName string,
|
|
|
+) ([]ColumnMetadata, error) {
|
|
|
// Deal with differences in protocol versions
|
|
|
var stmt string
|
|
|
var scan func(*Iter, *ColumnMetadata, *[]byte) bool
|
|
|
@@ -498,11 +506,11 @@ func getColumnMetadata(session *Session) ([]ColumnMetadata, error) {
|
|
|
|
|
|
// get the columns metadata
|
|
|
columns := []ColumnMetadata{}
|
|
|
- column := ColumnMetadata{Keyspace: session.cfg.Keyspace}
|
|
|
+ column := ColumnMetadata{Keyspace: keyspaceName}
|
|
|
|
|
|
var indexOptionsJSON []byte
|
|
|
|
|
|
- query := session.Query(stmt, session.cfg.Keyspace)
|
|
|
+ query := session.Query(stmt, keyspaceName)
|
|
|
iter := query.Iter()
|
|
|
|
|
|
for scan(iter, &column, &indexOptionsJSON) {
|
|
|
@@ -524,7 +532,7 @@ func getColumnMetadata(session *Session) ([]ColumnMetadata, error) {
|
|
|
}
|
|
|
|
|
|
columns = append(columns, column)
|
|
|
- column = ColumnMetadata{Keyspace: session.cfg.Keyspace}
|
|
|
+ column = ColumnMetadata{Keyspace: keyspaceName}
|
|
|
}
|
|
|
|
|
|
err := iter.Close()
|