session.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. )
  8. // Session is the interface used by users to interact with the database.
  9. //
  10. // It extends the Node interface by adding a convinient query builder and
  11. // automatically sets a default consinstency level on all operations
  12. // that do not have a consistency level set.
  13. type Session struct {
  14. Node Node
  15. Cons Consistency
  16. }
  17. // NewSession wraps an existing Node.
  18. func NewSession(node Node) *Session {
  19. return &Session{Node: node, Cons: Quorum}
  20. }
  21. // Query can be used to build new queries that should be executed on this
  22. // session.
  23. func (s *Session) Query(stmt string, args ...interface{}) QueryBuilder {
  24. return QueryBuilder{NewQuery(stmt, args...), s}
  25. }
  26. // Do can be used to modify a copy of an existing query before it is
  27. // executed on this session.
  28. func (s *Session) Do(qry *Query) QueryBuilder {
  29. q := *qry
  30. return QueryBuilder{&q, s}
  31. }
  32. // Close closes all connections. The session is unuseable after this
  33. // operation.
  34. func (s *Session) Close() {
  35. s.Node.Close()
  36. }
  37. // ExecuteBatch executes a Batch on the underlying Node.
  38. func (s *Session) ExecuteBatch(batch *Batch) error {
  39. /*
  40. if batch.Cons == 0 {
  41. batch.Cons = s.Cons
  42. }
  43. return s.Node.ExecuteBatch(batch)
  44. */
  45. return nil
  46. }
  47. // ExecuteQuery executes a Query on the underlying Node.
  48. func (s *Session) ExecuteQuery(qry *Query) *Iter {
  49. return s.executeQuery(qry, nil)
  50. }
  51. func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
  52. if qry.Cons == 0 {
  53. qry.Cons = s.Cons
  54. }
  55. conn := s.Node.Pick(qry)
  56. if conn == nil {
  57. return &Iter{err: ErrUnavailable}
  58. }
  59. op := &queryFrame{
  60. Stmt: qry.Stmt,
  61. Cons: qry.Cons,
  62. PageSize: qry.PageSize,
  63. PageState: pageState,
  64. }
  65. if len(qry.Args) > 0 {
  66. info, err := conn.prepareStatement(qry.Stmt)
  67. if err != nil {
  68. return &Iter{err: err}
  69. }
  70. op.Prepared = info.id
  71. op.Values = make([][]byte, len(qry.Args))
  72. for i := 0; i < len(qry.Args); i++ {
  73. val, err := Marshal(info.args[i].TypeInfo, qry.Args[i])
  74. if err != nil {
  75. return &Iter{err: err}
  76. }
  77. op.Values[i] = val
  78. }
  79. }
  80. resp, err := conn.exec(op)
  81. if err != nil {
  82. return &Iter{err: err}
  83. }
  84. switch x := resp.(type) {
  85. case resultVoidFrame:
  86. return &Iter{}
  87. case resultRowsFrame:
  88. iter := &Iter{columns: x.Columns, rows: x.Rows}
  89. if len(x.PagingState) > 0 {
  90. iter.session = s
  91. iter.qry = qry
  92. iter.pageState = x.PagingState
  93. }
  94. return iter
  95. case resultKeyspaceFrame:
  96. conn.cluster.HandleKeyspace(conn, x.Keyspace)
  97. return &Iter{}
  98. case error:
  99. return &Iter{err: x}
  100. default:
  101. return &Iter{err: ErrProtocol}
  102. }
  103. }
  104. type Query struct {
  105. Stmt string
  106. Args []interface{}
  107. Cons Consistency
  108. Token string
  109. PageSize int
  110. Trace bool
  111. }
  112. func NewQuery(stmt string, args ...interface{}) *Query {
  113. return &Query{Stmt: stmt, Args: args}
  114. }
  115. type QueryBuilder struct {
  116. qry *Query
  117. session *Session
  118. }
  119. // Args specifies the query parameters.
  120. func (b QueryBuilder) Args(args ...interface{}) {
  121. b.qry.Args = args
  122. }
  123. // Consistency sets the consistency level for this query. If no consistency
  124. // level have been set, the default consistency level of the cluster
  125. // is used.
  126. func (b QueryBuilder) Consistency(cons Consistency) QueryBuilder {
  127. b.qry.Cons = cons
  128. return b
  129. }
  130. func (b QueryBuilder) Token(token string) QueryBuilder {
  131. b.qry.Token = token
  132. return b
  133. }
  134. func (b QueryBuilder) Trace(trace bool) QueryBuilder {
  135. b.qry.Trace = trace
  136. return b
  137. }
  138. func (b QueryBuilder) PageSize(size int) QueryBuilder {
  139. b.qry.PageSize = size
  140. return b
  141. }
  142. func (b QueryBuilder) Exec() error {
  143. iter := b.session.ExecuteQuery(b.qry)
  144. return iter.err
  145. }
  146. func (b QueryBuilder) Iter() *Iter {
  147. return b.session.ExecuteQuery(b.qry)
  148. }
  149. func (b QueryBuilder) Scan(values ...interface{}) error {
  150. iter := b.Iter()
  151. iter.Scan(values...)
  152. return iter.Close()
  153. }
  154. type Iter struct {
  155. err error
  156. pos int
  157. rows [][][]byte
  158. columns []ColumnInfo
  159. qry *Query
  160. session *Session
  161. pageState []byte
  162. }
  163. func (iter *Iter) Columns() []ColumnInfo {
  164. return iter.columns
  165. }
  166. func (iter *Iter) Scan(values ...interface{}) bool {
  167. if iter.err != nil {
  168. return false
  169. }
  170. if iter.pos >= len(iter.rows) {
  171. if len(iter.pageState) > 0 {
  172. *iter = *iter.session.executeQuery(iter.qry, iter.pageState)
  173. return iter.Scan(values...)
  174. }
  175. return false
  176. }
  177. if len(values) != len(iter.columns) {
  178. iter.err = errors.New("count mismatch")
  179. return false
  180. }
  181. for i := 0; i < len(iter.columns); i++ {
  182. err := Unmarshal(iter.columns[i].TypeInfo, iter.rows[iter.pos][i], values[i])
  183. if err != nil {
  184. iter.err = err
  185. return false
  186. }
  187. }
  188. iter.pos++
  189. return true
  190. }
  191. func (iter *Iter) Close() error {
  192. return iter.err
  193. }
  194. type Batch struct {
  195. Type BatchType
  196. Entries []BatchEntry
  197. Cons Consistency
  198. }
  199. func NewBatch(typ BatchType) *Batch {
  200. return &Batch{Type: typ}
  201. }
  202. func (b *Batch) Query(stmt string, args ...interface{}) {
  203. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
  204. }
  205. type BatchType int
  206. const (
  207. LoggedBatch BatchType = 0
  208. UnloggedBatch BatchType = 1
  209. CounterBatch BatchType = 2
  210. )
  211. type BatchEntry struct {
  212. Stmt string
  213. Args []interface{}
  214. }
  215. type Consistency int
  216. const (
  217. Any Consistency = 1 + iota
  218. One
  219. Two
  220. Three
  221. Quorum
  222. All
  223. LocalQuorum
  224. EachQuorum
  225. Serial
  226. LocalSerial
  227. )
  228. var consinstencyNames = []string{
  229. 0: "default",
  230. Any: "any",
  231. One: "one",
  232. Two: "two",
  233. Three: "three",
  234. Quorum: "quorum",
  235. All: "all",
  236. LocalQuorum: "localquorum",
  237. EachQuorum: "eachquorum",
  238. Serial: "serial",
  239. LocalSerial: "localserial",
  240. }
  241. func (c Consistency) String() string {
  242. return consinstencyNames[c]
  243. }
  244. type ColumnInfo struct {
  245. Keyspace string
  246. Table string
  247. Name string
  248. TypeInfo *TypeInfo
  249. }
  250. type Error struct {
  251. Code int
  252. Message string
  253. }
  254. func (e Error) Error() string {
  255. return e.Message
  256. }
  257. var (
  258. ErrNotFound = errors.New("not found")
  259. ErrUnavailable = errors.New("unavailable")
  260. ErrProtocol = errors.New("protocol error")
  261. ErrUnsupported = errors.New("feature not supported")
  262. )