session.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "time"
  10. )
  11. // Session is the interface used by users to interact with the database.
  12. //
  13. // It extends the Node interface by adding a convinient query builder and
  14. // automatically sets a default consinstency level on all operations
  15. // that do not have a consistency level set.
  16. type Session struct {
  17. Node Node
  18. Cons Consistency
  19. }
  20. // NewSession wraps an existing Node.
  21. func NewSession(node Node) *Session {
  22. return &Session{Node: node, Cons: Quorum}
  23. }
  24. // Query can be used to build new queries that should be executed on this
  25. // session.
  26. func (s *Session) Query(stmt string, args ...interface{}) QueryBuilder {
  27. return QueryBuilder{NewQuery(stmt, args...), s}
  28. }
  29. // Do can be used to modify a copy of an existing query before it is
  30. // executed on this session.
  31. func (s *Session) Do(qry *Query) QueryBuilder {
  32. q := *qry
  33. return QueryBuilder{&q, s}
  34. }
  35. // Close closes all connections. The session is unuseable after this
  36. // operation.
  37. func (s *Session) Close() {
  38. s.Node.Close()
  39. }
  40. // ExecuteQuery executes a Query on the underlying Node.
  41. func (s *Session) ExecuteQuery(qry *Query) *Iter {
  42. return s.executeQuery(qry, nil)
  43. }
  44. func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
  45. if qry.Cons == 0 {
  46. qry.Cons = s.Cons
  47. }
  48. conn := s.Node.Pick(qry)
  49. if conn == nil {
  50. return &Iter{err: ErrUnavailable}
  51. }
  52. iter := conn.executeQuery(qry, pageState)
  53. if len(iter.pageState) > 0 {
  54. iter.qry = qry
  55. iter.session = s
  56. }
  57. return iter
  58. }
  59. func (s *Session) ExecuteBatch(batch *Batch) error {
  60. conn := s.Node.Pick(nil)
  61. if conn == nil {
  62. return ErrUnavailable
  63. }
  64. return conn.executeBatch(batch)
  65. }
  66. type Query struct {
  67. Stmt string
  68. Args []interface{}
  69. Cons Consistency
  70. Token string
  71. PageSize int
  72. Trace Tracer
  73. }
  74. func NewQuery(stmt string, args ...interface{}) *Query {
  75. return &Query{Stmt: stmt, Args: args}
  76. }
  77. type QueryBuilder struct {
  78. qry *Query
  79. session *Session
  80. }
  81. // Args specifies the query parameters.
  82. func (b QueryBuilder) Args(args ...interface{}) {
  83. b.qry.Args = args
  84. }
  85. // Consistency sets the consistency level for this query. If no consistency
  86. // level have been set, the default consistency level of the cluster
  87. // is used.
  88. func (b QueryBuilder) Consistency(cons Consistency) QueryBuilder {
  89. b.qry.Cons = cons
  90. return b
  91. }
  92. func (b QueryBuilder) Token(token string) QueryBuilder {
  93. b.qry.Token = token
  94. return b
  95. }
  96. func (b QueryBuilder) Trace(trace Tracer) QueryBuilder {
  97. b.qry.Trace = trace
  98. return b
  99. }
  100. func (b QueryBuilder) PageSize(size int) QueryBuilder {
  101. b.qry.PageSize = size
  102. return b
  103. }
  104. func (b QueryBuilder) Exec() error {
  105. iter := b.session.ExecuteQuery(b.qry)
  106. return iter.err
  107. }
  108. func (b QueryBuilder) Iter() *Iter {
  109. return b.session.ExecuteQuery(b.qry)
  110. }
  111. func (b QueryBuilder) Scan(values ...interface{}) error {
  112. iter := b.Iter()
  113. iter.Scan(values...)
  114. return iter.Close()
  115. }
  116. type Iter struct {
  117. err error
  118. pos int
  119. rows [][][]byte
  120. columns []ColumnInfo
  121. qry *Query
  122. session *Session
  123. pageState []byte
  124. }
  125. func (iter *Iter) Columns() []ColumnInfo {
  126. return iter.columns
  127. }
  128. func (iter *Iter) Scan(values ...interface{}) bool {
  129. if iter.err != nil {
  130. return false
  131. }
  132. if iter.pos >= len(iter.rows) {
  133. if len(iter.pageState) > 0 {
  134. *iter = *iter.session.executeQuery(iter.qry, iter.pageState)
  135. return iter.Scan(values...)
  136. }
  137. return false
  138. }
  139. if len(values) != len(iter.columns) {
  140. iter.err = errors.New("count mismatch")
  141. return false
  142. }
  143. for i := 0; i < len(iter.columns); i++ {
  144. err := Unmarshal(iter.columns[i].TypeInfo, iter.rows[iter.pos][i], values[i])
  145. if err != nil {
  146. iter.err = err
  147. return false
  148. }
  149. }
  150. iter.pos++
  151. return true
  152. }
  153. func (iter *Iter) Close() error {
  154. return iter.err
  155. }
  156. type Batch struct {
  157. Type BatchType
  158. Entries []BatchEntry
  159. Cons Consistency
  160. }
  161. func NewBatch(typ BatchType) *Batch {
  162. return &Batch{Type: typ}
  163. }
  164. func (b *Batch) Query(stmt string, args ...interface{}) {
  165. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
  166. }
  167. type BatchType int
  168. const (
  169. LoggedBatch BatchType = 0
  170. UnloggedBatch BatchType = 1
  171. CounterBatch BatchType = 2
  172. )
  173. type BatchEntry struct {
  174. Stmt string
  175. Args []interface{}
  176. }
  177. type Consistency int
  178. const (
  179. Any Consistency = 1 + iota
  180. One
  181. Two
  182. Three
  183. Quorum
  184. All
  185. LocalQuorum
  186. EachQuorum
  187. Serial
  188. LocalSerial
  189. )
  190. var consinstencyNames = []string{
  191. 0: "default",
  192. Any: "any",
  193. One: "one",
  194. Two: "two",
  195. Three: "three",
  196. Quorum: "quorum",
  197. All: "all",
  198. LocalQuorum: "localquorum",
  199. EachQuorum: "eachquorum",
  200. Serial: "serial",
  201. LocalSerial: "localserial",
  202. }
  203. func (c Consistency) String() string {
  204. return consinstencyNames[c]
  205. }
  206. type ColumnInfo struct {
  207. Keyspace string
  208. Table string
  209. Name string
  210. TypeInfo *TypeInfo
  211. }
  212. type Tracer interface {
  213. Trace(time time.Time, activity string, source string, elapsed int)
  214. }
  215. type traceWriter struct {
  216. w io.Writer
  217. }
  218. func NewTraceWriter(w io.Writer) Tracer {
  219. return traceWriter{w}
  220. }
  221. func (t traceWriter) Trace(time time.Time, activity string, source string, elapsed int) {
  222. fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n", time, activity, source, elapsed)
  223. }
  224. type Error struct {
  225. Code int
  226. Message string
  227. }
  228. func (e Error) Error() string {
  229. return e.Message
  230. }
  231. var (
  232. ErrNotFound = errors.New("not found")
  233. ErrUnavailable = errors.New("unavailable")
  234. ErrProtocol = errors.New("protocol error")
  235. ErrUnsupported = errors.New("feature not supported")
  236. )