gocql.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "strings"
  9. "time"
  10. )
  11. type Config struct {
  12. Nodes []string
  13. CQLVersion string
  14. Keyspace string
  15. Consistency Consistency
  16. DefaultPort int
  17. Timeout time.Duration
  18. NodePicker NodePicker
  19. Reconnector Reconnector
  20. }
  21. func (c *Config) normalize() {
  22. if c.CQLVersion == "" {
  23. c.CQLVersion = "3.0.0"
  24. }
  25. if c.DefaultPort == 0 {
  26. c.DefaultPort = 9042
  27. }
  28. if c.Timeout <= 0 {
  29. c.Timeout = 200 * time.Millisecond
  30. }
  31. if c.NodePicker == nil {
  32. c.NodePicker = NewRoundRobinPicker()
  33. }
  34. if c.Reconnector == nil {
  35. c.Reconnector = NewExponentialReconnector(1*time.Second, 10*time.Minute)
  36. }
  37. for i := 0; i < len(c.Nodes); i++ {
  38. c.Nodes[i] = strings.TrimSpace(c.Nodes[i])
  39. if strings.IndexByte(c.Nodes[i], ':') < 0 {
  40. c.Nodes[i] = fmt.Sprintf("%s:%d", c.Nodes[i], c.DefaultPort)
  41. }
  42. }
  43. }
  44. type Session struct {
  45. cfg *Config
  46. pool NodePicker
  47. reconnector Reconnector
  48. keyspace string
  49. nohosts chan bool
  50. }
  51. func NewSession(cfg Config) *Session {
  52. cfg.normalize()
  53. s := &Session{
  54. cfg: &cfg,
  55. nohosts: make(chan bool),
  56. reconnector: cfg.Reconnector,
  57. pool: cfg.NodePicker,
  58. }
  59. for _, address := range cfg.Nodes {
  60. go s.reconnector.Reconnect(s, address)
  61. }
  62. return s
  63. }
  64. func (s *Session) Query(stmt string, args ...interface{}) *Query {
  65. return &Query{
  66. stmt: stmt,
  67. args: args,
  68. cons: s.cfg.Consistency,
  69. ctx: s,
  70. }
  71. }
  72. func (s *Session) Do(query *Query) *Query {
  73. q := *query
  74. q.ctx = s
  75. return &q
  76. }
  77. func (s *Session) Close() {
  78. return
  79. }
  80. func (s *Session) executeQuery(query *Query) (frame, error) {
  81. node := s.pool.Pick(query)
  82. if node == nil {
  83. <-time.After(s.cfg.Timeout)
  84. node = s.pool.Pick(query)
  85. }
  86. if node == nil {
  87. return nil, ErrNoHostAvailable
  88. }
  89. return node.conn.executeQuery(query)
  90. }
  91. type Node struct {
  92. conn *Conn
  93. }
  94. type Query struct {
  95. stmt string
  96. args []interface{}
  97. cons Consistency
  98. ctx queryContext
  99. }
  100. func NewQuery(stmt string) *Query {
  101. return &Query{stmt: stmt, cons: ConQuorum}
  102. }
  103. func (q *Query) Exec() error {
  104. if q.ctx == nil {
  105. return ErrQueryUnbound
  106. }
  107. frame, err := q.ctx.executeQuery(q)
  108. if err != nil {
  109. return err
  110. } else if frame[3] == opError {
  111. return frame.readErrorFrame()
  112. } else if frame[3] != opResult {
  113. return ErrProtocol
  114. }
  115. return nil
  116. }
  117. func (q *Query) Iter() *Iter {
  118. if q.ctx == nil {
  119. return &Iter{err: ErrQueryUnbound}
  120. }
  121. frame, err := q.ctx.executeQuery(q)
  122. if err != nil {
  123. return &Iter{err: err}
  124. } else if frame[3] == opError {
  125. return &Iter{err: frame.readErrorFrame()}
  126. } else if frame[3] != opResult {
  127. return &Iter{err: ErrProtocol}
  128. }
  129. iter := new(Iter)
  130. iter.readFrame(frame)
  131. return iter
  132. }
  133. func (q *Query) Scan(values ...interface{}) error {
  134. found := false
  135. iter := q.Iter()
  136. if iter.Scan(values...) {
  137. found = true
  138. }
  139. if err := iter.Close(); err != nil {
  140. return err
  141. } else if !found {
  142. return ErrNotFound
  143. }
  144. return nil
  145. }
  146. func (q *Query) Consistency(cons Consistency) *Query {
  147. q.cons = cons
  148. return q
  149. }
  150. type Iter struct {
  151. err error
  152. pos int
  153. values [][]byte
  154. info []columnInfo
  155. }
  156. func (iter *Iter) readFrame(frame frame) {
  157. defer func() {
  158. if r := recover(); r != nil {
  159. if e, ok := r.(error); ok && e == ErrProtocol {
  160. iter.err = e
  161. return
  162. }
  163. panic(r)
  164. }
  165. }()
  166. frame.skipHeader()
  167. iter.pos = 0
  168. iter.err = nil
  169. iter.values = nil
  170. if frame.readInt() != resultKindRows {
  171. return
  172. }
  173. iter.info = frame.readMetaData()
  174. numRows := frame.readInt()
  175. iter.values = make([][]byte, numRows*len(iter.info))
  176. for i := 0; i < len(iter.values); i++ {
  177. iter.values[i] = frame.readBytes()
  178. }
  179. }
  180. func (iter *Iter) Scan(values ...interface{}) bool {
  181. if iter.err != nil || iter.pos >= len(iter.values) {
  182. return false
  183. }
  184. if len(values) != len(iter.info) {
  185. iter.err = errors.New("count mismatch")
  186. return false
  187. }
  188. for i := 0; i < len(values); i++ {
  189. err := Unmarshal(iter.info[i].TypeInfo, iter.values[i+iter.pos], values[i])
  190. if err != nil {
  191. iter.err = err
  192. return false
  193. }
  194. }
  195. iter.pos += len(values)
  196. return true
  197. }
  198. func (iter *Iter) Close() error {
  199. return iter.err
  200. }
  201. type queryContext interface {
  202. executeQuery(query *Query) (frame, error)
  203. }
  204. type columnInfo struct {
  205. Keyspace string
  206. Table string
  207. Name string
  208. TypeInfo *TypeInfo
  209. }
  210. type Consistency uint16
  211. const (
  212. ConAny Consistency = 0x0000
  213. ConOne Consistency = 0x0001
  214. ConTwo Consistency = 0x0002
  215. ConThree Consistency = 0x0003
  216. ConQuorum Consistency = 0x0004
  217. ConAll Consistency = 0x0005
  218. ConLocalQuorum Consistency = 0x0006
  219. ConEachQuorum Consistency = 0x0007
  220. ConSerial Consistency = 0x0008
  221. ConLocalSerial Consistency = 0x0009
  222. )
  223. type Error struct {
  224. Code int
  225. Message string
  226. }
  227. func (e Error) Error() string {
  228. return e.Message
  229. }
  230. var (
  231. ErrNotFound = errors.New("not found")
  232. ErrNoHostAvailable = errors.New("no host available")
  233. ErrQueryUnbound = errors.New("can not execute unbound query")
  234. ErrProtocol = errors.New("protocol error")
  235. )
  236. type node struct {
  237. conn *Conn
  238. }