|
|
@@ -27,44 +27,25 @@ func NewSession(node Node) *Session {
|
|
|
return &Session{Node: node, Cons: Quorum}
|
|
|
}
|
|
|
|
|
|
-// Query can be used to build new queries that should be executed on this
|
|
|
-// session.
|
|
|
-func (s *Session) Query(stmt string, args ...interface{}) QueryBuilder {
|
|
|
- return QueryBuilder{NewQuery(stmt, args...), s}
|
|
|
+// Query generates a new query object for interacting with the database.
|
|
|
+// Further details of the query may be tweaked using the resulting query
|
|
|
+// value before the query is executed.
|
|
|
+func (s *Session) Query(stmt string, values ...interface{}) *Query {
|
|
|
+ return &Query{stmt: stmt, values: values, cons: s.Cons, session: s}
|
|
|
}
|
|
|
|
|
|
-// Do can be used to modify a copy of an existing query before it is
|
|
|
-// executed on this session.
|
|
|
-func (s *Session) Do(qry *Query) QueryBuilder {
|
|
|
- q := *qry
|
|
|
- return QueryBuilder{&q, s}
|
|
|
-}
|
|
|
-
|
|
|
-// Close closes all connections. The session is unuseable after this
|
|
|
+// Close closes all connections. The session is unusable after this
|
|
|
// operation.
|
|
|
func (s *Session) Close() {
|
|
|
s.Node.Close()
|
|
|
}
|
|
|
|
|
|
-// ExecuteQuery executes a Query on the underlying Node.
|
|
|
-func (s *Session) ExecuteQuery(qry *Query) *Iter {
|
|
|
- return s.executeQuery(qry, nil)
|
|
|
-}
|
|
|
-
|
|
|
-func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
|
|
|
- if qry.Cons == 0 {
|
|
|
- qry.Cons = s.Cons
|
|
|
- }
|
|
|
- conn := s.Node.Pick(qry)
|
|
|
+func (s *Session) executeQuery(qry *Query) *Iter {
|
|
|
+ conn := s.Node.Pick(nil)
|
|
|
if conn == nil {
|
|
|
- return &Iter{err: ErrUnavailable}
|
|
|
- }
|
|
|
- iter := conn.executeQuery(qry, pageState)
|
|
|
- if len(iter.pageState) > 0 {
|
|
|
- iter.qry = qry
|
|
|
- iter.session = s
|
|
|
+ return &Iter{qry: qry, err: ErrUnavailable}
|
|
|
}
|
|
|
- return iter
|
|
|
+ return conn.executeQuery(qry)
|
|
|
}
|
|
|
|
|
|
func (s *Session) ExecuteBatch(batch *Batch) error {
|
|
|
@@ -75,66 +56,59 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
|
|
|
return conn.executeBatch(batch)
|
|
|
}
|
|
|
|
|
|
+// Query represents a CQL statement that can be executed.
|
|
|
type Query struct {
|
|
|
- Stmt string
|
|
|
- Args []interface{}
|
|
|
- Cons Consistency
|
|
|
- Token string
|
|
|
- PageSize int
|
|
|
- Trace Tracer
|
|
|
-}
|
|
|
-
|
|
|
-func NewQuery(stmt string, args ...interface{}) *Query {
|
|
|
- return &Query{Stmt: stmt, Args: args}
|
|
|
-}
|
|
|
-
|
|
|
-type QueryBuilder struct {
|
|
|
- qry *Query
|
|
|
- session *Session
|
|
|
-}
|
|
|
-
|
|
|
-// Args specifies the query parameters.
|
|
|
-func (b QueryBuilder) Args(args ...interface{}) {
|
|
|
- b.qry.Args = args
|
|
|
+ stmt string
|
|
|
+ values []interface{}
|
|
|
+ cons Consistency
|
|
|
+ pageSize int
|
|
|
+ pageState []byte
|
|
|
+ trace Tracer
|
|
|
+ session *Session
|
|
|
}
|
|
|
|
|
|
// Consistency sets the consistency level for this query. If no consistency
|
|
|
// level have been set, the default consistency level of the cluster
|
|
|
// is used.
|
|
|
-func (b QueryBuilder) Consistency(cons Consistency) QueryBuilder {
|
|
|
- b.qry.Cons = cons
|
|
|
- return b
|
|
|
-}
|
|
|
-
|
|
|
-func (b QueryBuilder) Token(token string) QueryBuilder {
|
|
|
- b.qry.Token = token
|
|
|
- return b
|
|
|
+func (q *Query) Consistency(c Consistency) *Query {
|
|
|
+ q.cons = c
|
|
|
+ return q
|
|
|
}
|
|
|
|
|
|
// Trace enables tracing of this query. Look at the documentation of the
|
|
|
// Tracer interface to learn more about tracing.
|
|
|
-func (b QueryBuilder) Trace(trace Tracer) QueryBuilder {
|
|
|
- b.qry.Trace = trace
|
|
|
- return b
|
|
|
+func (q *Query) Trace(trace Tracer) *Query {
|
|
|
+ q.trace = trace
|
|
|
+ return q
|
|
|
}
|
|
|
|
|
|
-func (b QueryBuilder) PageSize(size int) QueryBuilder {
|
|
|
- b.qry.PageSize = size
|
|
|
- return b
|
|
|
+// PageSize will tell the iterator to fetch the result in pages of size n.
|
|
|
+// This is useful for iterating over large result sets, but setting the
|
|
|
+// page size to low might decrease the performance. This feature is only
|
|
|
+// available in Cassandra 2 and onwards.
|
|
|
+func (q *Query) PageSize(n int) *Query {
|
|
|
+ q.pageSize = n
|
|
|
+ return q
|
|
|
}
|
|
|
|
|
|
-func (b QueryBuilder) Exec() error {
|
|
|
- iter := b.session.ExecuteQuery(b.qry)
|
|
|
+// Exec executes the query without returning any rows.
|
|
|
+func (q *Query) Exec() error {
|
|
|
+ iter := q.session.executeQuery(q)
|
|
|
return iter.err
|
|
|
}
|
|
|
|
|
|
-func (b QueryBuilder) Iter() *Iter {
|
|
|
- return b.session.ExecuteQuery(b.qry)
|
|
|
+// Iter executes the query and returns an iterator capable of iterating
|
|
|
+// over all results.
|
|
|
+func (q *Query) Iter() *Iter {
|
|
|
+ return q.session.executeQuery(q)
|
|
|
}
|
|
|
|
|
|
-func (b QueryBuilder) Scan(values ...interface{}) error {
|
|
|
- iter := b.Iter()
|
|
|
- iter.Scan(values...)
|
|
|
+// Scan executes the query, copies the columns of the first selected
|
|
|
+// row into the values pointed at by dest and discards the rest. If no rows
|
|
|
+// were selected, ErrNotFound is returned.
|
|
|
+func (q *Query) Scan(dest ...interface{}) error {
|
|
|
+ iter := q.Iter()
|
|
|
+ iter.Scan(dest...)
|
|
|
return iter.Close()
|
|
|
}
|
|
|
|
|
|
@@ -144,7 +118,6 @@ type Iter struct {
|
|
|
rows [][][]byte
|
|
|
columns []ColumnInfo
|
|
|
qry *Query
|
|
|
- session *Session
|
|
|
pageState []byte
|
|
|
}
|
|
|
|
|
|
@@ -152,23 +125,25 @@ func (iter *Iter) Columns() []ColumnInfo {
|
|
|
return iter.columns
|
|
|
}
|
|
|
|
|
|
-func (iter *Iter) Scan(values ...interface{}) bool {
|
|
|
+func (iter *Iter) Scan(dest ...interface{}) bool {
|
|
|
if iter.err != nil {
|
|
|
return false
|
|
|
}
|
|
|
if iter.pos >= len(iter.rows) {
|
|
|
if len(iter.pageState) > 0 {
|
|
|
- *iter = *iter.session.executeQuery(iter.qry, iter.pageState)
|
|
|
- return iter.Scan(values...)
|
|
|
+ qry := *iter.qry
|
|
|
+ qry.pageState = iter.pageState
|
|
|
+ *iter = *iter.qry.session.executeQuery(&qry)
|
|
|
+ return iter.Scan(dest...)
|
|
|
}
|
|
|
return false
|
|
|
}
|
|
|
- if len(values) != len(iter.columns) {
|
|
|
+ if len(dest) != len(iter.columns) {
|
|
|
iter.err = errors.New("count mismatch")
|
|
|
return false
|
|
|
}
|
|
|
for i := 0; i < len(iter.columns); i++ {
|
|
|
- err := Unmarshal(iter.columns[i].TypeInfo, iter.rows[iter.pos][i], values[i])
|
|
|
+ err := Unmarshal(iter.columns[i].TypeInfo, iter.rows[iter.pos][i], dest[i])
|
|
|
if err != nil {
|
|
|
iter.err = err
|
|
|
return false
|
|
|
@@ -250,45 +225,40 @@ type ColumnInfo struct {
|
|
|
}
|
|
|
|
|
|
// Tracer is the interface implemented by query tracers. Tracers have the
|
|
|
-// ability to obtain a detailed event log of all events that happend during
|
|
|
+// ability to obtain a detailed event log of all events that happened during
|
|
|
// the execution of a query from Cassandra. Gathering this information might
|
|
|
// be essential for debugging and optimizing queries, but this feature should
|
|
|
// not be used on production systems with very high load.
|
|
|
type Tracer interface {
|
|
|
- Trace(conn *Conn, traceId []byte)
|
|
|
+ Trace(traceId []byte)
|
|
|
}
|
|
|
|
|
|
type traceWriter struct {
|
|
|
- w io.Writer
|
|
|
- mu sync.Mutex
|
|
|
+ session *Session
|
|
|
+ w io.Writer
|
|
|
+ mu sync.Mutex
|
|
|
}
|
|
|
|
|
|
// NewTraceWriter returns a simple Tracer implementation that outputs
|
|
|
// the event log in a textual format.
|
|
|
-func NewTraceWriter(w io.Writer) Tracer {
|
|
|
- return traceWriter{w: w}
|
|
|
+func NewTraceWriter(session *Session, w io.Writer) Tracer {
|
|
|
+ return traceWriter{session: session, w: w}
|
|
|
}
|
|
|
|
|
|
-func (t traceWriter) Trace(conn *Conn, traceId []byte) {
|
|
|
+func (t traceWriter) Trace(traceId []byte) {
|
|
|
var (
|
|
|
coordinator string
|
|
|
duration int
|
|
|
)
|
|
|
- conn.executeQuery(&Query{
|
|
|
- Stmt: `SELECT coordinator, duration
|
|
|
+ t.session.Query(`SELECT coordinator, duration
|
|
|
FROM system_traces.sessions
|
|
|
- WHERE session_id = ?`,
|
|
|
- Args: []interface{}{traceId},
|
|
|
- Cons: One,
|
|
|
- }, nil).Scan(&coordinator, &duration)
|
|
|
+ WHERE session_id = ?`, traceId).
|
|
|
+ Consistency(One).Scan(&coordinator, &duration)
|
|
|
|
|
|
- iter := conn.executeQuery(&Query{
|
|
|
- Stmt: `SELECT event_id, activity, source, source_elapsed
|
|
|
+ iter := t.session.Query(`SELECT event_id, activity, source, source_elapsed
|
|
|
FROM system_traces.events
|
|
|
- WHERE session_id = ?`,
|
|
|
- Args: []interface{}{traceId},
|
|
|
- Cons: One,
|
|
|
- }, nil)
|
|
|
+ WHERE session_id = ?`, traceId).
|
|
|
+ Consistency(One).Iter()
|
|
|
var (
|
|
|
timestamp time.Time
|
|
|
activity string
|