session.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. )
// Session is the interface used by users to interact with the database.
//
// It extends the Node interface by adding a convenient query builder and
// automatically sets a default consistency level on all operations
// that do not have a consistency level set.
type Session struct {
	Node Node        // underlying node; connections are obtained from it via Pick
	Cons Consistency // default consistency level, applied when a query has none set
}
  17. // NewSession wraps an existing Node.
  18. func NewSession(node Node) *Session {
  19. return &Session{Node: node, Cons: Quorum}
  20. }
  21. // Query can be used to build new queries that should be executed on this
  22. // session.
  23. func (s *Session) Query(stmt string, args ...interface{}) QueryBuilder {
  24. return QueryBuilder{NewQuery(stmt, args...), s}
  25. }
  26. // Do can be used to modify a copy of an existing query before it is
  27. // executed on this session.
  28. func (s *Session) Do(qry *Query) QueryBuilder {
  29. q := *qry
  30. return QueryBuilder{&q, s}
  31. }
  32. // Close closes all connections. The session is unuseable after this
  33. // operation.
  34. func (s *Session) Close() {
  35. s.Node.Close()
  36. }
  37. // ExecuteQuery executes a Query on the underlying Node.
  38. func (s *Session) ExecuteQuery(qry *Query) *Iter {
  39. return s.executeQuery(qry, nil)
  40. }
  41. func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
  42. if qry.Cons == 0 {
  43. qry.Cons = s.Cons
  44. }
  45. conn := s.Node.Pick(qry)
  46. if conn == nil {
  47. return &Iter{err: ErrUnavailable}
  48. }
  49. op := &queryFrame{
  50. Stmt: qry.Stmt,
  51. Cons: qry.Cons,
  52. PageSize: qry.PageSize,
  53. PageState: pageState,
  54. }
  55. if len(qry.Args) > 0 {
  56. info, err := conn.prepareStatement(qry.Stmt)
  57. if err != nil {
  58. return &Iter{err: err}
  59. }
  60. op.Prepared = info.id
  61. op.Values = make([][]byte, len(qry.Args))
  62. for i := 0; i < len(qry.Args); i++ {
  63. val, err := Marshal(info.args[i].TypeInfo, qry.Args[i])
  64. if err != nil {
  65. return &Iter{err: err}
  66. }
  67. op.Values[i] = val
  68. }
  69. }
  70. resp, err := conn.exec(op)
  71. if err != nil {
  72. return &Iter{err: err}
  73. }
  74. switch x := resp.(type) {
  75. case resultVoidFrame:
  76. return &Iter{}
  77. case resultRowsFrame:
  78. iter := &Iter{columns: x.Columns, rows: x.Rows}
  79. if len(x.PagingState) > 0 {
  80. iter.session = s
  81. iter.qry = qry
  82. iter.pageState = x.PagingState
  83. }
  84. return iter
  85. case resultKeyspaceFrame:
  86. conn.cluster.HandleKeyspace(conn, x.Keyspace)
  87. return &Iter{}
  88. case error:
  89. return &Iter{err: x}
  90. default:
  91. return &Iter{err: ErrProtocol}
  92. }
  93. }
  94. func (s *Session) ExecuteBatch(batch *Batch) error {
  95. conn := s.Node.Pick(nil)
  96. if conn == nil {
  97. return ErrUnavailable
  98. }
  99. return conn.executeBatch(batch)
  100. }
// Query describes a single statement together with its execution options.
type Query struct {
	Stmt     string        // statement text sent to the server
	Args     []interface{} // bind values; a non-empty list causes the statement to be prepared
	Cons     Consistency   // consistency level; zero means "use the session default"
	Token    string        // NOTE(review): not read in this file; presumably used when picking a connection, confirm
	PageSize int           // copied into the query frame as the page size
	Trace    bool          // NOTE(review): not read in this file; presumably enables tracing, confirm
}
  109. func NewQuery(stmt string, args ...interface{}) *Query {
  110. return &Query{Stmt: stmt, Args: args}
  111. }
// QueryBuilder couples a Query with the Session it will be executed on. Its
// setter methods modify the query through the qry pointer, so copies of a
// builder share the same underlying Query.
type QueryBuilder struct {
	qry     *Query   // query that is being configured
	session *Session // session used by Exec, Iter and Scan
}
  116. // Args specifies the query parameters.
  117. func (b QueryBuilder) Args(args ...interface{}) {
  118. b.qry.Args = args
  119. }
  120. // Consistency sets the consistency level for this query. If no consistency
  121. // level have been set, the default consistency level of the cluster
  122. // is used.
  123. func (b QueryBuilder) Consistency(cons Consistency) QueryBuilder {
  124. b.qry.Cons = cons
  125. return b
  126. }
  127. func (b QueryBuilder) Token(token string) QueryBuilder {
  128. b.qry.Token = token
  129. return b
  130. }
  131. func (b QueryBuilder) Trace(trace bool) QueryBuilder {
  132. b.qry.Trace = trace
  133. return b
  134. }
  135. func (b QueryBuilder) PageSize(size int) QueryBuilder {
  136. b.qry.PageSize = size
  137. return b
  138. }
  139. func (b QueryBuilder) Exec() error {
  140. iter := b.session.ExecuteQuery(b.qry)
  141. return iter.err
  142. }
  143. func (b QueryBuilder) Iter() *Iter {
  144. return b.session.ExecuteQuery(b.qry)
  145. }
  146. func (b QueryBuilder) Scan(values ...interface{}) error {
  147. iter := b.Iter()
  148. iter.Scan(values...)
  149. return iter.Close()
  150. }
// Iter holds the result of an executed query and lets the caller read it
// row by row with Scan.
type Iter struct {
	err     error        // first error encountered; once set, Scan returns false
	pos     int          // index into rows of the next row that Scan will read
	rows    [][][]byte   // rows[r][c] is the raw value of column c in row r
	columns []ColumnInfo // column metadata, one entry per column of a row

	// The following fields are only set when the result carried a paging
	// state; Scan uses them to fetch the next page once rows is exhausted.
	qry       *Query
	session   *Session
	pageState []byte
}
  160. func (iter *Iter) Columns() []ColumnInfo {
  161. return iter.columns
  162. }
  163. func (iter *Iter) Scan(values ...interface{}) bool {
  164. if iter.err != nil {
  165. return false
  166. }
  167. if iter.pos >= len(iter.rows) {
  168. if len(iter.pageState) > 0 {
  169. *iter = *iter.session.executeQuery(iter.qry, iter.pageState)
  170. return iter.Scan(values...)
  171. }
  172. return false
  173. }
  174. if len(values) != len(iter.columns) {
  175. iter.err = errors.New("count mismatch")
  176. return false
  177. }
  178. for i := 0; i < len(iter.columns); i++ {
  179. err := Unmarshal(iter.columns[i].TypeInfo, iter.rows[iter.pos][i], values[i])
  180. if err != nil {
  181. iter.err = err
  182. return false
  183. }
  184. }
  185. iter.pos++
  186. return true
  187. }
  188. func (iter *Iter) Close() error {
  189. return iter.err
  190. }
// Batch is a list of statements that are executed together through
// Session.ExecuteBatch.
type Batch struct {
	Type    BatchType    // kind of batch, see the BatchType constants
	Entries []BatchEntry // statements of the batch, in the order they were added
	Cons    Consistency  // consistency level of the batch
}
  196. func NewBatch(typ BatchType) *Batch {
  197. return &Batch{Type: typ}
  198. }
  199. func (b *Batch) Query(stmt string, args ...interface{}) {
  200. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
  201. }
// BatchType identifies the kind of a Batch.
type BatchType int

// Available batch types.
// NOTE(review): the numeric values presumably match the codes used on the
// wire; confirm before changing them.
const (
	LoggedBatch   BatchType = 0
	UnloggedBatch BatchType = 1
	CounterBatch  BatchType = 2
)
// BatchEntry is a single statement of a Batch together with its arguments.
type BatchEntry struct {
	Stmt string        // statement text
	Args []interface{} // arguments of the statement
}
  212. type Consistency int
  213. const (
  214. Any Consistency = 1 + iota
  215. One
  216. Two
  217. Three
  218. Quorum
  219. All
  220. LocalQuorum
  221. EachQuorum
  222. Serial
  223. LocalSerial
  224. )
  225. var consinstencyNames = []string{
  226. 0: "default",
  227. Any: "any",
  228. One: "one",
  229. Two: "two",
  230. Three: "three",
  231. Quorum: "quorum",
  232. All: "all",
  233. LocalQuorum: "localquorum",
  234. EachQuorum: "eachquorum",
  235. Serial: "serial",
  236. LocalSerial: "localserial",
  237. }
  238. func (c Consistency) String() string {
  239. return consinstencyNames[c]
  240. }
// ColumnInfo describes a single column of a result.
type ColumnInfo struct {
	Keyspace string    // keyspace the column belongs to
	Table    string    // table the column belongs to
	Name     string    // name of the column
	TypeInfo *TypeInfo // type description, passed to Unmarshal when a row is scanned
}
  247. type Error struct {
  248. Code int
  249. Message string
  250. }
  251. func (e Error) Error() string {
  252. return e.Message
  253. }
// Sentinel errors of the package.
var (
	// ErrNotFound is not returned by any code in this file.
	// NOTE(review): presumably meant for lookups that yield no row; confirm.
	ErrNotFound = errors.New("not found")
	// ErrUnavailable is reported when the Node cannot provide a connection.
	ErrUnavailable = errors.New("unavailable")
	// ErrProtocol is reported when a response of an unexpected type is
	// received for a query.
	ErrProtocol = errors.New("protocol error")
	// ErrUnsupported is not returned by any code in this file.
	// NOTE(review): presumably used for features the server or protocol
	// version lacks; confirm.
	ErrUnsupported = errors.New("feature not supported")
)
  259. )