session.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. )
  12. // Session is the interface used by users to interact with the database.
  13. //
  14. // It extends the Node interface by adding a convinient query builder and
  15. // automatically sets a default consinstency level on all operations
  16. // that do not have a consistency level set.
  17. type Session struct {
  18. Node Node
  19. Cons Consistency
  20. }
  21. // NewSession wraps an existing Node.
  22. func NewSession(node Node) *Session {
  23. return &Session{Node: node, Cons: Quorum}
  24. }
  25. // Query can be used to build new queries that should be executed on this
  26. // session.
  27. func (s *Session) Query(stmt string, args ...interface{}) QueryBuilder {
  28. return QueryBuilder{NewQuery(stmt, args...), s}
  29. }
  30. // Do can be used to modify a copy of an existing query before it is
  31. // executed on this session.
  32. func (s *Session) Do(qry *Query) QueryBuilder {
  33. q := *qry
  34. return QueryBuilder{&q, s}
  35. }
  36. // Close closes all connections. The session is unuseable after this
  37. // operation.
  38. func (s *Session) Close() {
  39. s.Node.Close()
  40. }
  41. // ExecuteQuery executes a Query on the underlying Node.
  42. func (s *Session) ExecuteQuery(qry *Query) *Iter {
  43. return s.executeQuery(qry, nil)
  44. }
  45. func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
  46. if qry.Cons == 0 {
  47. qry.Cons = s.Cons
  48. }
  49. conn := s.Node.Pick(qry)
  50. if conn == nil {
  51. return &Iter{err: ErrUnavailable}
  52. }
  53. iter := conn.executeQuery(qry, pageState)
  54. if len(iter.pageState) > 0 {
  55. iter.qry = qry
  56. iter.session = s
  57. }
  58. return iter
  59. }
  60. func (s *Session) ExecuteBatch(batch *Batch) error {
  61. conn := s.Node.Pick(nil)
  62. if conn == nil {
  63. return ErrUnavailable
  64. }
  65. return conn.executeBatch(batch)
  66. }
  67. type Query struct {
  68. Stmt string
  69. Args []interface{}
  70. Cons Consistency
  71. Token string
  72. PageSize int
  73. Trace Tracer
  74. }
  75. func NewQuery(stmt string, args ...interface{}) *Query {
  76. return &Query{Stmt: stmt, Args: args}
  77. }
  78. type QueryBuilder struct {
  79. qry *Query
  80. session *Session
  81. }
  82. // Args specifies the query parameters.
  83. func (b QueryBuilder) Args(args ...interface{}) {
  84. b.qry.Args = args
  85. }
  86. // Consistency sets the consistency level for this query. If no consistency
  87. // level have been set, the default consistency level of the cluster
  88. // is used.
  89. func (b QueryBuilder) Consistency(cons Consistency) QueryBuilder {
  90. b.qry.Cons = cons
  91. return b
  92. }
  93. func (b QueryBuilder) Token(token string) QueryBuilder {
  94. b.qry.Token = token
  95. return b
  96. }
  97. func (b QueryBuilder) Trace(trace Tracer) QueryBuilder {
  98. b.qry.Trace = trace
  99. return b
  100. }
  101. func (b QueryBuilder) PageSize(size int) QueryBuilder {
  102. b.qry.PageSize = size
  103. return b
  104. }
  105. func (b QueryBuilder) Exec() error {
  106. iter := b.session.ExecuteQuery(b.qry)
  107. return iter.err
  108. }
  109. func (b QueryBuilder) Iter() *Iter {
  110. return b.session.ExecuteQuery(b.qry)
  111. }
  112. func (b QueryBuilder) Scan(values ...interface{}) error {
  113. iter := b.Iter()
  114. iter.Scan(values...)
  115. return iter.Close()
  116. }
  117. type Iter struct {
  118. err error
  119. pos int
  120. rows [][][]byte
  121. columns []ColumnInfo
  122. qry *Query
  123. session *Session
  124. pageState []byte
  125. }
  126. func (iter *Iter) Columns() []ColumnInfo {
  127. return iter.columns
  128. }
  129. func (iter *Iter) Scan(values ...interface{}) bool {
  130. if iter.err != nil {
  131. return false
  132. }
  133. if iter.pos >= len(iter.rows) {
  134. if len(iter.pageState) > 0 {
  135. *iter = *iter.session.executeQuery(iter.qry, iter.pageState)
  136. return iter.Scan(values...)
  137. }
  138. return false
  139. }
  140. if len(values) != len(iter.columns) {
  141. iter.err = errors.New("count mismatch")
  142. return false
  143. }
  144. for i := 0; i < len(iter.columns); i++ {
  145. err := Unmarshal(iter.columns[i].TypeInfo, iter.rows[iter.pos][i], values[i])
  146. if err != nil {
  147. iter.err = err
  148. return false
  149. }
  150. }
  151. iter.pos++
  152. return true
  153. }
  154. func (iter *Iter) Close() error {
  155. return iter.err
  156. }
  157. type Batch struct {
  158. Type BatchType
  159. Entries []BatchEntry
  160. Cons Consistency
  161. }
  162. func NewBatch(typ BatchType) *Batch {
  163. return &Batch{Type: typ}
  164. }
  165. func (b *Batch) Query(stmt string, args ...interface{}) {
  166. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
  167. }
  168. type BatchType int
  169. const (
  170. LoggedBatch BatchType = 0
  171. UnloggedBatch BatchType = 1
  172. CounterBatch BatchType = 2
  173. )
  174. type BatchEntry struct {
  175. Stmt string
  176. Args []interface{}
  177. }
  178. type Consistency int
  179. const (
  180. Any Consistency = 1 + iota
  181. One
  182. Two
  183. Three
  184. Quorum
  185. All
  186. LocalQuorum
  187. EachQuorum
  188. Serial
  189. LocalSerial
  190. )
  191. var consinstencyNames = []string{
  192. 0: "default",
  193. Any: "any",
  194. One: "one",
  195. Two: "two",
  196. Three: "three",
  197. Quorum: "quorum",
  198. All: "all",
  199. LocalQuorum: "localquorum",
  200. EachQuorum: "eachquorum",
  201. Serial: "serial",
  202. LocalSerial: "localserial",
  203. }
  204. func (c Consistency) String() string {
  205. return consinstencyNames[c]
  206. }
  207. type ColumnInfo struct {
  208. Keyspace string
  209. Table string
  210. Name string
  211. TypeInfo *TypeInfo
  212. }
  213. type Tracer interface {
  214. Trace(conn *Conn, traceId []byte)
  215. }
  216. type traceWriter struct {
  217. w io.Writer
  218. mu sync.Mutex
  219. }
  220. func NewTraceWriter(w io.Writer) Tracer {
  221. return traceWriter{w: w}
  222. }
  223. func (t traceWriter) Trace(conn *Conn, traceId []byte) {
  224. var (
  225. coordinator string
  226. duration int
  227. )
  228. conn.executeQuery(&Query{
  229. Stmt: `SELECT coordinator, duration
  230. FROM system_traces.sessions
  231. WHERE session_id = ?`,
  232. Args: []interface{}{traceId},
  233. Cons: One,
  234. }, nil).Scan(&coordinator, &duration)
  235. iter := conn.executeQuery(&Query{
  236. Stmt: `SELECT event_id, activity, source, source_elapsed
  237. FROM system_traces.events
  238. WHERE session_id = ?`,
  239. Args: []interface{}{traceId},
  240. Cons: One,
  241. }, nil)
  242. var (
  243. timestamp time.Time
  244. activity string
  245. source string
  246. elapsed int
  247. )
  248. t.mu.Lock()
  249. defer t.mu.Unlock()
  250. fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n",
  251. traceId, coordinator, time.Duration(duration)*time.Microsecond)
  252. for iter.Scan(&timestamp, &activity, &source, &elapsed) {
  253. fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n",
  254. timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed)
  255. }
  256. if err := iter.Close(); err != nil {
  257. fmt.Fprintln(t.w, "Error:", err)
  258. }
  259. }
  260. type Error struct {
  261. Code int
  262. Message string
  263. }
  264. func (e Error) Error() string {
  265. return e.Message
  266. }
  267. var (
  268. ErrNotFound = errors.New("not found")
  269. ErrUnavailable = errors.New("unavailable")
  270. ErrProtocol = errors.New("protocol error")
  271. ErrUnsupported = errors.New("feature not supported")
  272. )