Преглед на файлове

Added routing key computation (with override) to Query

- Added routingKeyInfo(string) function to Session which is a cached calculation of the query
value indexes to routing key indexes with type info
- Added RoutingKey([]byte) function to Query which sets the routing key bytes for the query
- Added GetRoutingKey() []byte which returns the routing key for the query; if the routing
key is not set explicitly with RoutingKey([]byte) then the routing key is constructed using
the routing key info retrieved from Session
Justin Corpron преди 10 години
родител
ревизия
d1eb33572d
променени са 4 файла, в които са добавени 300 реда и са изтрити 37 реда
  1. 62 0
      cassandra_test.go
  2. 31 29
      cluster.go
  3. 9 0
      metadata.go
  4. 198 8
      session.go

+ 62 - 0
cassandra_test.go

@@ -1704,3 +1704,65 @@ func TestKeyspaceMetadata(t *testing.T) {
 		t.Errorf("Expected column index named 'index_metadata' but was '%s'", thirdColumn.Index.Name)
 	}
 }
+
+// TestRoutingKey checks the routing key info computed for a table with a
+// single-column partition key and for a table with a composite partition key,
+// the routing key bytes built from bound values, and the growth of the
+// session's routing key info cache.
+func TestRoutingKey(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+
+	// first table: partition key is first_id only (second_id is a clustering column)
+	if err := createTable(session, "CREATE TABLE test_single_routing_key (first_id int, second_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
+		t.Fatalf("failed to create table with error '%v'", err)
+	}
+	// second table: partition key is the composite (first_id, second_id)
+	if err := createTable(session, "CREATE TABLE test_composite_routing_key (first_id int, second_id int, PRIMARY KEY ((first_id,second_id)))"); err != nil {
+		t.Fatalf("failed to create table with error '%v'", err)
+	}
+
+	// The statement binds second_id at position 0 and first_id at position 1,
+	// so the single partition key column is expected to map to index 1.
+	routingKeyInfo := session.routingKeyInfo("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?")
+	if routingKeyInfo == nil {
+		t.Fatal("Expected routing key info, but was nil")
+	}
+	if len(routingKeyInfo.indexes) != 1 {
+		t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
+	}
+	if routingKeyInfo.indexes[0] != 1 {
+		t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
+	}
+	// The value bound at index 1 is 2; a single-column routing key is the
+	// marshaled value on its own, with no length prefix or terminator.
+	query := session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2)
+	routingKey := query.GetRoutingKey()
+	expectedRoutingKey := []byte{0, 0, 0, 2}
+	if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
+		t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
+	}
+
+	// verify the cache is working
+	// (the same statement was already looked up above, so the size must stay 1;
+	// note the cache length is read here without taking the cache mutex)
+	session.routingKeyInfo("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?")
+	cacheSize := session.routingKeyInfoCache.lru.Len()
+	if cacheSize != 1 {
+		t.Errorf("Expected cache size to be 1 but was %d", cacheSize)
+	}
+
+	// Composite partition key: indexes follow partition key column order
+	// (first_id, second_id), so they are expected to be [1, 0] for this statement.
+	routingKeyInfo = session.routingKeyInfo("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?")
+	if routingKeyInfo == nil {
+		t.Fatal("Expected routing key info, but was nil")
+	}
+	if len(routingKeyInfo.indexes) != 2 {
+		t.Fatalf("Expected routing key indexes length to be 2 but was %d", len(routingKeyInfo.indexes))
+	}
+	if routingKeyInfo.indexes[0] != 1 {
+		t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
+	}
+	if routingKeyInfo.indexes[1] != 0 {
+		t.Errorf("Expected routing key index[1] to be 0 but was %d", routingKeyInfo.indexes[1])
+	}
+	// Each composite component is encoded as a 2-byte big-endian length, the
+	// marshaled value, and a trailing zero byte:
+	//   {0, 4} {0, 0, 0, 2} {0}   first_id  = 2
+	//   {0, 4} {0, 0, 0, 1} {0}   second_id = 1
+	query = session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2)
+	routingKey = query.GetRoutingKey()
+	expectedRoutingKey = []byte{0, 4, 0, 0, 0, 2, 0, 0, 4, 0, 0, 0, 1, 0}
+	if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
+		t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
+	}
+
+	// verify the cache is working
+	// (the composite statement adds a second, distinct cache entry)
+	cacheSize = session.routingKeyInfoCache.lru.Len()
+	if cacheSize != 2 {
+		t.Errorf("Expected cache size to be 2 but was %d", cacheSize)
+	}
+}

+ 31 - 29
cluster.go

@@ -55,40 +55,42 @@ type DiscoveryConfig struct {
 // behavior to fit the most common use cases. Applications that require a
 // different setup must implement their own cluster.
 type ClusterConfig struct {
-	Hosts            []string      // addresses for the initial connections
-	CQLVersion       string        // CQL version (default: 3.0.0)
-	ProtoVersion     int           // version of the native protocol (default: 2)
-	Timeout          time.Duration // connection timeout (default: 600ms)
-	Port             int           // port (default: 9042)
-	Keyspace         string        // initial keyspace (optional)
-	NumConns         int           // number of connections per host (default: 2)
-	NumStreams       int           // number of streams per connection (default: max per protocol, either 128 or 32768)
-	Consistency      Consistency   // default consistency level (default: Quorum)
-	Compressor       Compressor    // compression algorithm (default: nil)
-	Authenticator    Authenticator // authenticator (default: nil)
-	RetryPolicy      RetryPolicy   // Default retry policy to use for queries (default: 0)
-	SocketKeepalive  time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
-	ConnPoolType     NewPoolFunc   // The function used to create the connection pool for the session (default: NewSimplePool)
-	DiscoverHosts    bool          // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
-	MaxPreparedStmts int           // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
-	PageSize         int           // Default page size to use for created sessions (default: 0)
-	Discovery        DiscoveryConfig
-	SslOpts          *SslOptions
+	Hosts             []string      // addresses for the initial connections
+	CQLVersion        string        // CQL version (default: 3.0.0)
+	ProtoVersion      int           // version of the native protocol (default: 2)
+	Timeout           time.Duration // connection timeout (default: 600ms)
+	Port              int           // port (default: 9042)
+	Keyspace          string        // initial keyspace (optional)
+	NumConns          int           // number of connections per host (default: 2)
+	NumStreams        int           // number of streams per connection (default: max per protocol, either 128 or 32768)
+	Consistency       Consistency   // default consistency level (default: Quorum)
+	Compressor        Compressor    // compression algorithm (default: nil)
+	Authenticator     Authenticator // authenticator (default: nil)
+	RetryPolicy       RetryPolicy   // Default retry policy to use for queries (default: 0)
+	SocketKeepalive   time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
+	ConnPoolType      NewPoolFunc   // The function used to create the connection pool for the session (default: NewSimplePool)
+	DiscoverHosts     bool          // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
+	MaxPreparedStmts  int           // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
+	MaxRoutingKeyInfo int           // Sets the maximum cache size for query info about statements for each session (default: 1000)
+	PageSize          int           // Default page size to use for created sessions (default: 0)
+	Discovery         DiscoveryConfig
+	SslOpts           *SslOptions
 }
 
 // NewCluster generates a new config for the default cluster implementation.
 func NewCluster(hosts ...string) *ClusterConfig {
 	cfg := &ClusterConfig{
-		Hosts:            hosts,
-		CQLVersion:       "3.0.0",
-		ProtoVersion:     2,
-		Timeout:          600 * time.Millisecond,
-		Port:             9042,
-		NumConns:         2,
-		Consistency:      Quorum,
-		ConnPoolType:     NewSimplePool,
-		DiscoverHosts:    false,
-		MaxPreparedStmts: defaultMaxPreparedStmts,
+		Hosts:             hosts,
+		CQLVersion:        "3.0.0",
+		ProtoVersion:      2,
+		Timeout:           600 * time.Millisecond,
+		Port:              9042,
+		NumConns:          2,
+		Consistency:       Quorum,
+		ConnPoolType:      NewSimplePool,
+		DiscoverHosts:     false,
+		MaxPreparedStmts:  defaultMaxPreparedStmts,
+		MaxRoutingKeyInfo: 1000,
 	}
 	return cfg
 }

+ 9 - 0
metadata.go

@@ -331,6 +331,9 @@ func getKeyspaceMetadata(
 		`,
 		keyspaceName,
 	)
+	// Set a routing key to avoid GetRoutingKey from computing the routing key
+	// TODO use a separate connection (pool) for system keyspace queries.
+	query.RoutingKey([]byte{})
 
 	keyspace := &KeyspaceMetadata{Name: keyspaceName}
 	var strategyOptionsJSON []byte
@@ -375,6 +378,9 @@ func getTableMetadata(
 		`,
 		keyspaceName,
 	)
+	// Set a routing key to avoid GetRoutingKey from computing the routing key
+	// TODO use a separate connection (pool) for system keyspace queries.
+	query.RoutingKey([]byte{})
 	iter := query.Iter()
 
 	tables := []TableMetadata{}
@@ -511,6 +517,9 @@ func getColumnMetadata(
 	var indexOptionsJSON []byte
 
 	query := session.Query(stmt, keyspaceName)
+	// Set a routing key to avoid GetRoutingKey from computing the routing key
+	// TODO use a separate connection (pool) for system keyspace queries.
+	query.RoutingKey([]byte{})
 	iter := query.Iter()
 
 	for scan(iter, &column, &indexOptionsJSON) {

+ 198 - 8
session.go

@@ -5,6 +5,8 @@
 package gocql
 
 import (
+	"bytes"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
@@ -12,6 +14,8 @@ import (
 	"sync"
 	"time"
 	"unicode"
+
+	"github.com/golang/groupcache/lru"
 )
 
 // Session is the interface used by users to interact with the database.
@@ -24,13 +28,14 @@ import (
 // and automatically sets a default consistency level on all operations
 // that do not have a consistency level set.
 type Session struct {
-	Pool            ConnectionPool
-	cons            Consistency
-	pageSize        int
-	prefetch        float64
-	schemaDescriber *schemaDescriber
-	trace           Tracer
-	mu              sync.RWMutex
+	Pool                ConnectionPool
+	cons                Consistency
+	pageSize            int
+	prefetch            float64
+	routingKeyInfoCache routingKeyInfoLRU
+	schemaDescriber     *schemaDescriber
+	trace               Tracer
+	mu                  sync.RWMutex
 
 	cfg ClusterConfig
 
@@ -40,7 +45,12 @@ type Session struct {
 
 // NewSession wraps an existing Node.
 func NewSession(p ConnectionPool, c ClusterConfig) *Session {
-	return &Session{Pool: p, cons: c.Consistency, prefetch: 0.25, cfg: c}
+	session := &Session{Pool: p, cons: c.Consistency, prefetch: 0.25, cfg: c}
+
+	// create the query info cache
+	session.routingKeyInfoCache.lru = lru.New(c.MaxRoutingKeyInfo)
+
+	return session
 }
 
 // SetConsistency sets the default consistency level for this session. This
@@ -185,6 +195,104 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
 	return s.schemaDescriber.getSchema(keyspace)
 }
 
+// routingKeyInfo returns the routing key indexes and type info for stmt, or
+// nil when they cannot be determined.
+//
+// Results are cached per statement, keyed by the configured keyspace plus the
+// statement text. While one caller is computing an entry, other callers for
+// the same key block on the entry's WaitGroup and then share its result.
+//
+// A nil result is returned when there are no connections, the statement cannot
+// be prepared, the metadata is unavailable, the statement binds no arguments,
+// or a partition key column is not bound by the statement. Entries that failed
+// with an error are removed from the cache; the error-free nil results stay
+// cached so the work is not repeated.
+func (s *Session) routingKeyInfo(stmt string) *routingKeyInfo {
+	s.routingKeyInfoCache.mu.Lock()
+	cacheKey := s.cfg.Keyspace + stmt
+	entry, cached := s.routingKeyInfoCache.lru.Get(cacheKey)
+	if cached {
+		// done accessing the cache
+		s.routingKeyInfoCache.mu.Unlock()
+		// the entry is an inflight struct similar to that used by
+		// Conn to prepare statements
+		inflight := entry.(*inflightCachedEntry)
+		// wait for any inflight work
+		inflight.wg.Wait()
+
+		if inflight.err != nil {
+			// return nil for any error
+			return nil
+		}
+
+		// value is left as a nil interface when "no routing key" was cached
+		// without an error; a single-value type assertion would panic on
+		// it, so use the two-value form and return the nil pointer instead.
+		info, _ := inflight.value.(*routingKeyInfo)
+		return info
+	}
+
+	// create a new inflight entry while the data is created
+	inflight := new(inflightCachedEntry)
+	inflight.wg.Add(1)
+	// release waiters on every return path, after err/value have been set
+	defer inflight.wg.Done()
+	s.routingKeyInfoCache.lru.Add(cacheKey, inflight)
+	s.routingKeyInfoCache.mu.Unlock()
+
+	var queryInfo *QueryInfo
+	var partitionKey []*ColumnMetadata
+
+	// get the query info for the statement
+	conn := s.Pool.Pick(nil)
+	if conn != nil {
+		queryInfo, inflight.err = conn.prepareStatement(stmt, s.trace)
+		if inflight.err == nil {
+			if len(queryInfo.Args) == 0 {
+				// no arguments, no routing key, and no error
+				return nil
+			}
+
+			// get the table metadata
+			table := queryInfo.Args[0].Table
+			var keyspaceMetadata *KeyspaceMetadata
+			keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
+			if inflight.err == nil {
+				tableMetadata, found := keyspaceMetadata.Tables[table]
+				if found {
+					partitionKey = tableMetadata.PartitionKey
+				} else {
+					// only read through tableMetadata when the lookup
+					// succeeded; on a miss it is the map's zero value
+					// (a nil dereference if Tables stores pointers)
+					inflight.err = ErrNoMetadata
+				}
+			}
+		}
+	} else {
+		// no connections
+		inflight.err = ErrNoConnections
+	}
+
+	if inflight.err != nil {
+		// remove from the cache
+		s.routingKeyInfoCache.mu.Lock()
+		s.routingKeyInfoCache.lru.Remove(cacheKey)
+		s.routingKeyInfoCache.mu.Unlock()
+		return nil
+	}
+
+	size := len(partitionKey)
+	info := &routingKeyInfo{
+		indexes: make([]int, size),
+		types:   make([]*TypeInfo, size),
+	}
+	for i, keyColumn := range partitionKey {
+		info.indexes[i] = -1
+		// find the column in the query info
+		for j, boundColumn := range queryInfo.Args {
+			if keyColumn.Name == boundColumn.Name {
+				// there may be many such columns, pick the first
+				info.indexes[i] = j
+				info.types[i] = boundColumn.TypeInfo
+				break
+			}
+		}
+
+		if info.indexes[i] == -1 {
+			// missing a routing key column mapping
+			// no error, but cache a nil result
+			return nil
+		}
+	}
+
+	// cache this result
+	inflight.value = info
+
+	return info
+}
+
 // ExecuteBatch executes a batch operation and returns nil if successful
 // otherwise an error is returned describing the failure.
 func (s *Session) ExecuteBatch(batch *Batch) error {
@@ -234,6 +342,7 @@ type Query struct {
 	values       []interface{}
 	cons         Consistency
 	pageSize     int
+	routingKey   []byte
 	pageState    []byte
 	prefetch     float64
 	trace        Tracer
@@ -287,6 +396,58 @@ func (q *Query) PageSize(n int) *Query {
 	return q
 }
 
+// RoutingKey sets the routing key to use when a token aware connection
+// pool is used to optimize the routing of this query.
+//
+// Any non-nil slice, including an empty one, is treated by GetRoutingKey as an
+// explicit key and suppresses the automatic computation; passing nil leaves
+// the key to be computed. The slice is stored as given, not copied. The query
+// itself is returned so calls can be chained.
+func (q *Query) RoutingKey(routingKey []byte) *Query {
+	q.routingKey = routingKey
+	return q
+}
+
+// GetRoutingKey gets the routing key to use for routing this query. If
+// a routing key has not been explicitly set, then the routing key will
+// be constructed if possible using the keyspace's schema and the query
+// info for this query statement.
+//
+// A single-column routing key is the marshaled bound value on its own. A
+// composite routing key concatenates, for every partition key column in
+// order, a 2-byte big-endian length, the marshaled value and a zero byte.
+//
+// nil is returned when the routing key cannot be determined: no routing key
+// info is available, a required value is not bound, a value fails to marshal,
+// or a composite component is too large for the 2-byte length prefix.
+func (q *Query) GetRoutingKey() []byte {
+	if q.routingKey != nil {
+		return q.routingKey
+	}
+
+	// try to determine the routing key
+	info := q.session.routingKeyInfo(q.stmt)
+	if info == nil {
+		return nil
+	}
+
+	// marshalPart encodes the bound value for the i-th partition key column.
+	// The index comes from the prepared statement, so it can exceed the
+	// number of values actually bound to this query; report failure rather
+	// than panic in that case.
+	marshalPart := func(i int) ([]byte, bool) {
+		idx := info.indexes[i]
+		if idx < 0 || idx >= len(q.values) {
+			return nil, false
+		}
+		encoded, err := Marshal(info.types[i], q.values[idx])
+		if err != nil {
+			return nil, false
+		}
+		return encoded, true
+	}
+
+	if len(info.indexes) == 1 {
+		// single column routing key
+		routingKey, ok := marshalPart(0)
+		if !ok {
+			return nil
+		}
+		return routingKey
+	}
+
+	// composite routing key
+	var buf bytes.Buffer
+	var size [2]byte
+	for i := range info.indexes {
+		encoded, ok := marshalPart(i)
+		if !ok {
+			return nil
+		}
+		if len(encoded) > 0xFFFF {
+			// the length would be truncated in the 2-byte prefix
+			return nil
+		}
+		binary.BigEndian.PutUint16(size[:], uint16(len(encoded)))
+		buf.Write(size[:])
+		buf.Write(encoded)
+		buf.WriteByte(0x00)
+	}
+	return buf.Bytes()
+}
+
 func (q *Query) shouldPrepare() bool {
 
 	stmt := strings.TrimLeftFunc(strings.TrimRightFunc(q.stmt, func(r rune) bool {
@@ -610,6 +771,34 @@ type ColumnInfo struct {
 	TypeInfo *TypeInfo
 }
 
+// routingKeyInfoLRU is the session's LRU cache of routing key info. Session.routingKeyInfo
+// stores *inflightCachedEntry values in it, keyed by the configured keyspace
+// concatenated with the statement text.
+type routingKeyInfoLRU struct {
+	// lru is the underlying cache; NewSession creates it with
+	// ClusterConfig.MaxRoutingKeyInfo as its size
+	lru *lru.Cache
+	// mu is held by Session.routingKeyInfo and Max while they access lru
+	mu  sync.Mutex
+}
+
+// routingKeyInfo describes how to build a routing key from a query's bound
+// values. Both slices have one element per partition key column, in partition
+// key order.
+type routingKeyInfo struct {
+	// indexes[i] is the position, among the statement's bound arguments, of
+	// the value for the i-th partition key column
+	indexes []int
+	// types[i] is the type info of that bound argument, used to marshal the value
+	types   []*TypeInfo
+}
+
+// Max adjusts the maximum size of the cache and removes the oldest entries
+// while the cache holds more than max of them. The cache mutex is held for
+// the whole call, so it serializes with other users of that mutex.
+//
+// The eviction loop also stops once the cache is empty, so a negative max
+// cannot spin forever. max is stored in the underlying cache's MaxEntries
+// unchanged.
+func (q *routingKeyInfoLRU) Max(max int) {
+	q.mu.Lock()
+	// deferred so the mutex is released even if eviction panics
+	defer q.mu.Unlock()
+
+	for q.lru.Len() > max && q.lru.Len() > 0 {
+		q.lru.RemoveOldest()
+	}
+	q.lru.MaxEntries = max
+}
+
+// inflightCachedEntry is a cache slot whose content may still be under
+// construction. The creator adds it to the cache before doing the work so
+// that concurrent callers for the same key wait on it rather than repeating
+// the work.
+type inflightCachedEntry struct {
+	// wg is released when the creator has finished; readers must Wait on it
+	// before looking at err or value
+	wg    sync.WaitGroup
+	// err is the failure, if any, that occurred while producing the entry
+	err   error
+	// value is the cached result; Session.routingKeyInfo stores a
+	// *routingKeyInfo here, or leaves it nil when there is no routing key
+	value interface{}
+}
+
 // Tracer is the interface implemented by query tracers. Tracers have the
 // ability to obtain a detailed event log of all events that happened during
 // the execution of a query from Cassandra. Gathering this information might
@@ -682,6 +871,7 @@ var (
 	ErrSessionClosed = errors.New("session has been closed")
 	ErrNoConnections = errors.New("no connections available")
 	ErrNoKeyspace    = errors.New("no keyspace provided")
+	ErrNoMetadata    = errors.New("no metadata available")
 )
 
 type ErrProtocol struct{ error }