session.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. )
// Session is the type used by users to interact with the database.
//
// It wraps a Node, adding a convenient query builder, and automatically
// applies a default consistency level to queries that do not have a
// consistency level set.
type Session struct {
	// Node is the underlying node: connections are obtained from it via Pick
	// and it is closed by Close.
	Node Node
	// Cons is the default consistency level, applied to queries whose own
	// consistency level is unset (zero). NewSession initializes it to Quorum.
	Cons Consistency
}
  21. // NewSession wraps an existing Node.
  22. func NewSession(node Node) *Session {
  23. return &Session{Node: node, Cons: Quorum}
  24. }
  25. // Query can be used to build new queries that should be executed on this
  26. // session.
  27. func (s *Session) Query(stmt string, args ...interface{}) QueryBuilder {
  28. return QueryBuilder{NewQuery(stmt, args...), s}
  29. }
  30. // Do can be used to modify a copy of an existing query before it is
  31. // executed on this session.
  32. func (s *Session) Do(qry *Query) QueryBuilder {
  33. q := *qry
  34. return QueryBuilder{&q, s}
  35. }
  36. // Close closes all connections. The session is unuseable after this
  37. // operation.
  38. func (s *Session) Close() {
  39. s.Node.Close()
  40. }
  41. // ExecuteQuery executes a Query on the underlying Node.
  42. func (s *Session) ExecuteQuery(qry *Query) *Iter {
  43. return s.executeQuery(qry, nil)
  44. }
  45. func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
  46. if qry.Cons == 0 {
  47. qry.Cons = s.Cons
  48. }
  49. conn := s.Node.Pick(qry)
  50. if conn == nil {
  51. return &Iter{err: ErrUnavailable}
  52. }
  53. iter := conn.executeQuery(qry, pageState)
  54. if len(iter.pageState) > 0 {
  55. iter.qry = qry
  56. iter.session = s
  57. }
  58. return iter
  59. }
  60. func (s *Session) ExecuteBatch(batch *Batch) error {
  61. conn := s.Node.Pick(nil)
  62. if conn == nil {
  63. return ErrUnavailable
  64. }
  65. return conn.executeBatch(batch)
  66. }
// Query describes a single statement together with its execution options.
type Query struct {
	// Stmt is the statement text.
	Stmt string
	// Args holds the query parameters.
	Args []interface{}
	// Cons is the consistency level; zero means that the session default
	// is used when the query is executed through a Session.
	Cons Consistency
	// Token is set via QueryBuilder.Token.
	// NOTE(review): presumably used by Node.Pick for routing — confirm.
	Token string
	// PageSize is set via QueryBuilder.PageSize.
	// NOTE(review): presumably the number of rows per result page — confirm.
	PageSize int
	// Trace is the optional Tracer for this query.
	Trace Tracer
}
  75. func NewQuery(stmt string, args ...interface{}) *Query {
  76. return &Query{Stmt: stmt, Args: args}
  77. }
// QueryBuilder configures a Query through its methods and executes it on
// the Session it was created from. The builder is passed by value, but all
// copies share the same underlying Query.
type QueryBuilder struct {
	// qry is the query being configured; builder methods modify it in place.
	qry *Query
	// session is the Session used by Exec, Iter and Scan to run the query.
	session *Session
}
  82. // Args specifies the query parameters.
  83. func (b QueryBuilder) Args(args ...interface{}) {
  84. b.qry.Args = args
  85. }
  86. // Consistency sets the consistency level for this query. If no consistency
  87. // level have been set, the default consistency level of the cluster
  88. // is used.
  89. func (b QueryBuilder) Consistency(cons Consistency) QueryBuilder {
  90. b.qry.Cons = cons
  91. return b
  92. }
  93. func (b QueryBuilder) Token(token string) QueryBuilder {
  94. b.qry.Token = token
  95. return b
  96. }
  97. // Trace enables tracing of this query. Look at the documentation of the
  98. // Tracer interface to learn more about tracing.
  99. func (b QueryBuilder) Trace(trace Tracer) QueryBuilder {
  100. b.qry.Trace = trace
  101. return b
  102. }
  103. func (b QueryBuilder) PageSize(size int) QueryBuilder {
  104. b.qry.PageSize = size
  105. return b
  106. }
  107. func (b QueryBuilder) Exec() error {
  108. iter := b.session.ExecuteQuery(b.qry)
  109. return iter.err
  110. }
  111. func (b QueryBuilder) Iter() *Iter {
  112. return b.session.ExecuteQuery(b.qry)
  113. }
  114. func (b QueryBuilder) Scan(values ...interface{}) error {
  115. iter := b.Iter()
  116. iter.Scan(values...)
  117. return iter.Close()
  118. }
// Iter iterates over the rows of a query result. Rows are consumed with
// Scan; Close reports the first error that occurred.
type Iter struct {
	// err is the error recorded for this iterator; once set, Scan returns
	// false and Close returns it.
	err error
	// pos is the index in rows of the next row handed out by Scan.
	pos int
	// rows holds the rows of the current page; each row is a slice of raw
	// column values that Scan unmarshals.
	rows [][][]byte
	// columns describes the columns of every row.
	columns []ColumnInfo
	// qry and session are set by Session.executeQuery only when pageState
	// is non-empty; Scan uses them to fetch the next page.
	qry     *Query
	session *Session
	// pageState, when non-empty, is passed to the next executeQuery call
	// to continue after the current page.
	pageState []byte
}
  128. func (iter *Iter) Columns() []ColumnInfo {
  129. return iter.columns
  130. }
  131. func (iter *Iter) Scan(values ...interface{}) bool {
  132. if iter.err != nil {
  133. return false
  134. }
  135. if iter.pos >= len(iter.rows) {
  136. if len(iter.pageState) > 0 {
  137. *iter = *iter.session.executeQuery(iter.qry, iter.pageState)
  138. return iter.Scan(values...)
  139. }
  140. return false
  141. }
  142. if len(values) != len(iter.columns) {
  143. iter.err = errors.New("count mismatch")
  144. return false
  145. }
  146. for i := 0; i < len(iter.columns); i++ {
  147. err := Unmarshal(iter.columns[i].TypeInfo, iter.rows[iter.pos][i], values[i])
  148. if err != nil {
  149. iter.err = err
  150. return false
  151. }
  152. }
  153. iter.pos++
  154. return true
  155. }
  156. func (iter *Iter) Close() error {
  157. return iter.err
  158. }
// Batch is a group of statements that is executed as one operation via
// Session.ExecuteBatch.
type Batch struct {
	// Type is the kind of batch (LoggedBatch, UnloggedBatch or CounterBatch).
	Type BatchType
	// Entries are the statements of the batch, in the order they were added.
	Entries []BatchEntry
	// Cons is the consistency level of the batch.
	Cons Consistency
}
  164. func NewBatch(typ BatchType) *Batch {
  165. return &Batch{Type: typ}
  166. }
  167. func (b *Batch) Query(stmt string, args ...interface{}) {
  168. b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
  169. }
  170. type BatchType int
  171. const (
  172. LoggedBatch BatchType = 0
  173. UnloggedBatch BatchType = 1
  174. CounterBatch BatchType = 2
  175. )
// BatchEntry is a single statement of a Batch, as added by Batch.Query.
type BatchEntry struct {
	// Stmt is the statement text.
	Stmt string
	// Args holds the arguments given for the statement.
	Args []interface{}
}
  180. type Consistency int
  181. const (
  182. Any Consistency = 1 + iota
  183. One
  184. Two
  185. Three
  186. Quorum
  187. All
  188. LocalQuorum
  189. EachQuorum
  190. Serial
  191. LocalSerial
  192. )
  193. var consinstencyNames = []string{
  194. 0: "default",
  195. Any: "any",
  196. One: "one",
  197. Two: "two",
  198. Three: "three",
  199. Quorum: "quorum",
  200. All: "all",
  201. LocalQuorum: "localquorum",
  202. EachQuorum: "eachquorum",
  203. Serial: "serial",
  204. LocalSerial: "localserial",
  205. }
  206. func (c Consistency) String() string {
  207. return consinstencyNames[c]
  208. }
// ColumnInfo describes one column of a query result, as returned by
// Iter.Columns.
type ColumnInfo struct {
	// Keyspace, Table and Name identify the column.
	Keyspace string
	Table    string
	Name     string
	// TypeInfo is the column's type; Iter.Scan passes it to Unmarshal to
	// decode the raw column value.
	TypeInfo *TypeInfo
}
// Tracer is the interface implemented by query tracers. Tracers have the
// ability to obtain a detailed event log of all events that happened during
// the execution of a query from Cassandra. Gathering this information might
// be essential for debugging and optimizing queries, but this feature should
// not be used on production systems with very high load.
type Tracer interface {
	// Trace receives a connection and the id of a trace session.
	// NOTE(review): presumably called by the connection once a traced query
	// has been executed — confirm against Conn.
	Trace(conn *Conn, traceId []byte)
}
  223. type traceWriter struct {
  224. w io.Writer
  225. mu sync.Mutex
  226. }
  227. // NewTraceWriter returns a simple Tracer implementation that outputs
  228. // the event log in a textual format.
  229. func NewTraceWriter(w io.Writer) Tracer {
  230. return traceWriter{w: w}
  231. }
  232. func (t traceWriter) Trace(conn *Conn, traceId []byte) {
  233. var (
  234. coordinator string
  235. duration int
  236. )
  237. conn.executeQuery(&Query{
  238. Stmt: `SELECT coordinator, duration
  239. FROM system_traces.sessions
  240. WHERE session_id = ?`,
  241. Args: []interface{}{traceId},
  242. Cons: One,
  243. }, nil).Scan(&coordinator, &duration)
  244. iter := conn.executeQuery(&Query{
  245. Stmt: `SELECT event_id, activity, source, source_elapsed
  246. FROM system_traces.events
  247. WHERE session_id = ?`,
  248. Args: []interface{}{traceId},
  249. Cons: One,
  250. }, nil)
  251. var (
  252. timestamp time.Time
  253. activity string
  254. source string
  255. elapsed int
  256. )
  257. t.mu.Lock()
  258. defer t.mu.Unlock()
  259. fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n",
  260. traceId, coordinator, time.Duration(duration)*time.Microsecond)
  261. for iter.Scan(&timestamp, &activity, &source, &elapsed) {
  262. fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n",
  263. timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed)
  264. }
  265. if err := iter.Close(); err != nil {
  266. fmt.Fprintln(t.w, "Error:", err)
  267. }
  268. }
  269. type Error struct {
  270. Code int
  271. Message string
  272. }
  273. func (e Error) Error() string {
  274. return e.Message
  275. }
// Sentinel errors of the package.
var (
	// ErrNotFound has the message "not found".
	// NOTE(review): presumably reported when a requested item does not
	// exist — confirm usage.
	ErrNotFound = errors.New("not found")
	// ErrUnavailable is reported by Session when the Node cannot provide a
	// connection (Pick returned nil).
	ErrUnavailable = errors.New("unavailable")
	// ErrProtocol has the message "protocol error".
	// NOTE(review): presumably reported for malformed or unexpected
	// protocol data — confirm usage.
	ErrProtocol = errors.New("protocol error")
	// ErrUnsupported has the message "feature not supported".
	// NOTE(review): presumably reported when the server or protocol version
	// lacks a requested feature — confirm usage.
	ErrUnsupported = errors.New("feature not supported")
)