|
|
@@ -68,6 +68,8 @@ type Session struct {
|
|
|
|
|
|
closeMu sync.RWMutex
|
|
|
isClosed bool
|
|
|
+
|
|
|
+ queryPool *sync.Pool
|
|
|
}
|
|
|
|
|
|
func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) {
|
|
|
@@ -98,11 +100,12 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
|
|
|
}
|
|
|
|
|
|
s := &Session{
|
|
|
- cons: cfg.Consistency,
|
|
|
- prefetch: 0.25,
|
|
|
- cfg: cfg,
|
|
|
- pageSize: cfg.PageSize,
|
|
|
- stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
|
|
|
+ cons: cfg.Consistency,
|
|
|
+ prefetch: 0.25,
|
|
|
+ cfg: cfg,
|
|
|
+ pageSize: cfg.PageSize,
|
|
|
+ stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
|
|
|
+ queryPool: &sync.Pool{New: func() interface{} { return new(Query) }},
|
|
|
}
|
|
|
|
|
|
connCfg, err := connConfig(s)
|
|
|
@@ -259,11 +262,17 @@ func (s *Session) SetTrace(trace Tracer) {
|
|
|
// if it has not previously been executed.
|
|
|
func (s *Session) Query(stmt string, values ...interface{}) *Query {
|
|
|
s.mu.RLock()
|
|
|
- qry := &Query{stmt: stmt, values: values, cons: s.cons,
|
|
|
- session: s, pageSize: s.pageSize, trace: s.trace,
|
|
|
- prefetch: s.prefetch, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency,
|
|
|
- defaultTimestamp: s.cfg.DefaultTimestamp,
|
|
|
- }
|
|
|
+ qry := s.queryPool.Get().(*Query)
|
|
|
+ qry.stmt = stmt
|
|
|
+ qry.values = values
|
|
|
+ qry.cons = s.cons
|
|
|
+ qry.session = s
|
|
|
+ qry.pageSize = s.pageSize
|
|
|
+ qry.trace = s.trace
|
|
|
+ qry.prefetch = s.prefetch
|
|
|
+ qry.rt = s.cfg.RetryPolicy
|
|
|
+ qry.serialCons = s.cfg.SerialConsistency
|
|
|
+ qry.defaultTimestamp = s.cfg.DefaultTimestamp
|
|
|
s.mu.RUnlock()
|
|
|
return qry
|
|
|
}
|
|
|
@@ -947,6 +956,43 @@ func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error
|
|
|
return applied, iter.Close()
|
|
|
}
|
|
|
|
|
|
+// Release releases a query back into a pool of queries. Released Queries
|
|
|
+// cannot be reused.
|
|
|
+//
|
|
|
+// Example:
|
|
|
+// qry := session.Query("SELECT * FROM my_table")
|
|
|
+// qry.Exec()
|
|
|
+// qry.Release()
|
|
|
+func (q *Query) Release() {
|
|
|
+ if q == nil || q.session == nil || q.session.queryPool == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ pool := q.session.queryPool
|
|
|
+ pool.Put(q)
|
|
|
+}
|
|
|
+
|
|
|
+// reset zeroes out all fields of a query so that it can be safely pooled.
|
|
|
+func (q *Query) reset() {
|
|
|
+ q.stmt = ""
|
|
|
+ q.values = nil
|
|
|
+ q.cons = 0
|
|
|
+ q.pageSize = 0
|
|
|
+ q.routingKey = nil
|
|
|
+ q.routingKeyBuffer = nil
|
|
|
+ q.pageState = nil
|
|
|
+ q.prefetch = 0
|
|
|
+ q.trace = nil
|
|
|
+ q.session = nil
|
|
|
+ q.rt = nil
|
|
|
+ q.binding = nil
|
|
|
+ q.attempts = 0
|
|
|
+ q.totalLatency = 0
|
|
|
+ q.serialCons = 0
|
|
|
+ q.defaultTimestamp = false
|
|
|
+ q.disableSkipMetadata = false
|
|
|
+ q.disableAutoPage = false
|
|
|
+}
|
|
|
+
|
|
|
// Iter represents an iterator that can be used to iterate over all rows that
|
|
|
// were returned by a query. The iterator might send additional queries to the
|
|
|
// database during the iteration if paging was enabled.
|