|
|
@@ -196,9 +196,10 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
|
|
|
}
|
|
|
|
|
|
// returns routing key indexes and type info
|
|
|
-func (s *Session) routingKeyInfo(stmt string) *routingKeyInfo {
|
|
|
+func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
|
|
|
s.routingKeyInfoCache.mu.Lock()
|
|
|
cacheKey := s.cfg.Keyspace + stmt
|
|
|
+
|
|
|
entry, cached := s.routingKeyInfoCache.lru.Get(cacheKey)
|
|
|
if cached {
|
|
|
// done accessing the cache
|
|
|
@@ -206,15 +207,15 @@ func (s *Session) routingKeyInfo(stmt string) *routingKeyInfo {
|
|
|
// the entry is an inflight struct similiar to that used by
|
|
|
// Conn to prepare statements
|
|
|
inflight := entry.(*inflightCachedEntry)
|
|
|
+
|
|
|
// wait for any inflight work
|
|
|
inflight.wg.Wait()
|
|
|
|
|
|
if inflight.err != nil {
|
|
|
- // return nil for any error
|
|
|
- return nil
|
|
|
+ return nil, inflight.err
|
|
|
}
|
|
|
|
|
|
- return inflight.value.(*routingKeyInfo)
|
|
|
+ return inflight.value.(*routingKeyInfo), nil
|
|
|
}
|
|
|
|
|
|
// create a new inflight entry while the data is created
|
|
|
@@ -229,68 +230,78 @@ func (s *Session) routingKeyInfo(stmt string) *routingKeyInfo {
|
|
|
|
|
|
// get the query info for the statement
|
|
|
conn := s.Pool.Pick(nil)
|
|
|
- if conn != nil {
|
|
|
- queryInfo, inflight.err = conn.prepareStatement(stmt, s.trace)
|
|
|
- if inflight.err == nil {
|
|
|
- if len(queryInfo.Args) == 0 {
|
|
|
- // no arguments, no routing key, and no error
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- // get the table metadata
|
|
|
- table := queryInfo.Args[0].Table
|
|
|
- var keyspaceMetadata *KeyspaceMetadata
|
|
|
- keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
|
|
|
- if inflight.err == nil {
|
|
|
- tableMetadata, found := keyspaceMetadata.Tables[table]
|
|
|
- if !found {
|
|
|
- inflight.err = ErrNoMetadata
|
|
|
- }
|
|
|
-
|
|
|
- partitionKey = tableMetadata.PartitionKey
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
+ if conn == nil {
|
|
|
// no connections
|
|
|
inflight.err = ErrNoConnections
|
|
|
+ // don't cache this error
|
|
|
+ s.routingKeyInfoCache.Remove(cacheKey)
|
|
|
+ return nil, inflight.err
|
|
|
}
|
|
|
|
|
|
+ queryInfo, inflight.err = conn.prepareStatement(stmt, nil)
|
|
|
if inflight.err != nil {
|
|
|
- // remove from the cache
|
|
|
- s.routingKeyInfoCache.mu.Lock()
|
|
|
- s.routingKeyInfoCache.lru.Remove(cacheKey)
|
|
|
- s.routingKeyInfoCache.mu.Unlock()
|
|
|
- return nil
|
|
|
+ // don't cache this error
|
|
|
+ s.routingKeyInfoCache.Remove(cacheKey)
|
|
|
+ return nil, inflight.err
|
|
|
+ }
|
|
|
+ if len(queryInfo.Args) == 0 {
|
|
|
+ // no arguments, no routing key, and no error
|
|
|
+ return nil, nil
|
|
|
}
|
|
|
|
|
|
+ // get the table metadata
|
|
|
+ table := queryInfo.Args[0].Table
|
|
|
+ var keyspaceMetadata *KeyspaceMetadata
|
|
|
+ keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
|
|
|
+ if inflight.err != nil {
|
|
|
+ // don't cache this error
|
|
|
+ s.routingKeyInfoCache.Remove(cacheKey)
|
|
|
+ return nil, inflight.err
|
|
|
+ }
|
|
|
+
|
|
|
+ tableMetadata, found := keyspaceMetadata.Tables[table]
|
|
|
+ if !found {
|
|
|
+ // unlikely that the statement could be prepared and the metadata for
|
|
|
+ // the table couldn't be found, but this may indicate either a bug
|
|
|
+ // in the metadata code, or that the table was just dropped.
|
|
|
+ inflight.err = ErrNoMetadata
|
|
|
+ // don't cache this error
|
|
|
+ s.routingKeyInfoCache.Remove(cacheKey)
|
|
|
+ return nil, inflight.err
|
|
|
+ }
|
|
|
+
|
|
|
+ partitionKey = tableMetadata.PartitionKey
|
|
|
+
|
|
|
size := len(partitionKey)
|
|
|
routingKeyInfo := &routingKeyInfo{
|
|
|
indexes: make([]int, size),
|
|
|
types: make([]*TypeInfo, size),
|
|
|
}
|
|
|
- for i, keyColumn := range partitionKey {
|
|
|
- routingKeyInfo.indexes[i] = -1
|
|
|
+ for keyIndex, keyColumn := range partitionKey {
|
|
|
+ // set an indicator for checking if the mapping is missing
|
|
|
+ routingKeyInfo.indexes[keyIndex] = -1
|
|
|
+
|
|
|
// find the column in the query info
|
|
|
- for j, boundColumn := range queryInfo.Args {
|
|
|
+ for argIndex, boundColumn := range queryInfo.Args {
|
|
|
if keyColumn.Name == boundColumn.Name {
|
|
|
- // there may be many such columns, pick the first
|
|
|
- routingKeyInfo.indexes[i] = j
|
|
|
- routingKeyInfo.types[i] = boundColumn.TypeInfo
|
|
|
+ // there may be many such bound columns, pick the first
|
|
|
+ routingKeyInfo.indexes[keyIndex] = argIndex
|
|
|
+ routingKeyInfo.types[keyIndex] = boundColumn.TypeInfo
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if routingKeyInfo.indexes[i] == -1 {
|
|
|
+ if routingKeyInfo.indexes[keyIndex] == -1 {
|
|
|
// missing a routing key column mapping
|
|
|
- // no error, but cache a nil result
|
|
|
- return nil
|
|
|
+ // no routing key, and no error
|
|
|
+ return nil, nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// cache this result
|
|
|
inflight.value = routingKeyInfo
|
|
|
|
|
|
- return routingKeyInfo
|
|
|
+ return routingKeyInfo, nil
|
|
|
}
|
|
|
|
|
|
// ExecuteBatch executes a batch operation and returns nil if successful
|
|
|
@@ -406,16 +417,21 @@ func (q *Query) RoutingKey(routingKey []byte) *Query {
|
|
|
// GetRoutingKey gets the routing key to use for routing this query. If
|
|
|
// a routing key has not been explicitly set, then the routing key will
|
|
|
// be constructed if possible using the keyspace's schema and the query
|
|
|
-// info for this query statement.
|
|
|
-func (q *Query) GetRoutingKey() []byte {
|
|
|
+// info for this query statement. If the routing key cannot be determined
|
|
|
+// then nil will be returned with no error. On any error condition,
|
|
|
+// an error description will be returned.
|
|
|
+func (q *Query) GetRoutingKey() ([]byte, error) {
|
|
|
if q.routingKey != nil {
|
|
|
- return q.routingKey
|
|
|
+ return q.routingKey, nil
|
|
|
}
|
|
|
|
|
|
// try to determine the routing key
|
|
|
- routingKeyInfo := q.session.routingKeyInfo(q.stmt)
|
|
|
+ routingKeyInfo, err := q.session.routingKeyInfo(q.stmt)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
if routingKeyInfo == nil {
|
|
|
- return nil
|
|
|
+ return nil, nil
|
|
|
}
|
|
|
|
|
|
if len(routingKeyInfo.indexes) == 1 {
|
|
|
@@ -425,9 +441,9 @@ func (q *Query) GetRoutingKey() []byte {
|
|
|
q.values[routingKeyInfo.indexes[0]],
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil
|
|
|
+ return nil, err
|
|
|
}
|
|
|
- return routingKey
|
|
|
+ return routingKey, nil
|
|
|
}
|
|
|
|
|
|
// composite routing key
|
|
|
@@ -438,14 +454,14 @@ func (q *Query) GetRoutingKey() []byte {
|
|
|
q.values[routingKeyInfo.indexes[i]],
|
|
|
)
|
|
|
if err != nil {
|
|
|
- return nil
|
|
|
+ return nil, err
|
|
|
}
|
|
|
binary.Write(buf, binary.BigEndian, int16(len(encoded)))
|
|
|
buf.Write(encoded)
|
|
|
buf.WriteByte(0x00)
|
|
|
}
|
|
|
routingKey := buf.Bytes()
|
|
|
- return routingKey
|
|
|
+ return routingKey, nil
|
|
|
}
|
|
|
|
|
|
func (q *Query) shouldPrepare() bool {
|
|
|
@@ -782,15 +798,21 @@ type routingKeyInfo struct {
|
|
|
types []*TypeInfo
|
|
|
}
|
|
|
|
|
|
+func (r *routingKeyInfoLRU) Remove(key string) {
|
|
|
+ r.mu.Lock()
|
|
|
+ r.lru.Remove(key)
|
|
|
+ r.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
//Max adjusts the maximum size of the cache and cleans up the oldest records if
|
|
|
//the new max is lower than the previous value. Not concurrency safe.
|
|
|
-func (q *routingKeyInfoLRU) Max(max int) {
|
|
|
- q.mu.Lock()
|
|
|
- for q.lru.Len() > max {
|
|
|
- q.lru.RemoveOldest()
|
|
|
+func (r *routingKeyInfoLRU) Max(max int) {
|
|
|
+ r.mu.Lock()
|
|
|
+ for r.lru.Len() > max {
|
|
|
+ r.lru.RemoveOldest()
|
|
|
}
|
|
|
- q.lru.MaxEntries = max
|
|
|
- q.mu.Unlock()
|
|
|
+ r.lru.MaxEntries = max
|
|
|
+ r.mu.Unlock()
|
|
|
}
|
|
|
|
|
|
type inflightCachedEntry struct {
|