gocql.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. // Copyright (c) 2012 by Christoph Hack <christoph@tux21b.org>
  2. // All rights reserved. Distributed under the Simplified BSD License.
  3. // The gocql package provides a database/sql driver for CQL, the Cassandra
  4. // query language.
  5. //
  6. // This package requires a recent version of Cassandra (≥ 1.2) that supports
  7. // CQL 3.0 and the new native protocol. The native protocol is still considered
  8. // beta and must be enabled manually in Cassandra 1.2 by setting
  9. // "start_native_transport" to true in conf/cassandra.yaml.
  10. //
  11. // Example Usage:
  12. //
  13. // db, err := sql.Open("gocql", "localhost:8000 keyspace=system")
  14. // // ...
  15. // rows, err := db.Query("SELECT keyspace_name FROM schema_keyspaces")
  16. // // ...
  17. // for rows.Next() {
  18. // var keyspace string
  19. // err = rows.Scan(&keyspace)
  20. // // ...
  21. // fmt.Println(keyspace)
  22. // }
  23. // if err := rows.Err(); err != nil {
  24. // // ...
  25. // }
  26. //
  27. package gocql
  28. import (
  29. "database/sql"
  30. "database/sql/driver"
  31. "encoding/binary"
  32. "fmt"
  33. "io"
  34. "net"
  35. "strings"
  36. )
  37. const (
  38. protoRequest byte = 0x01
  39. protoResponse byte = 0x81
  40. opError byte = 0x00
  41. opStartup byte = 0x01
  42. opReady byte = 0x02
  43. opAuthenticate byte = 0x03
  44. opCredentials byte = 0x04
  45. opOptions byte = 0x05
  46. opSupported byte = 0x06
  47. opQuery byte = 0x07
  48. opResult byte = 0x08
  49. opPrepare byte = 0x09
  50. opExecute byte = 0x0A
  51. flagCompressed byte = 0x01
  52. )
  53. type drv struct{}
  54. func (d drv) Open(name string) (driver.Conn, error) {
  55. return Open(name)
  56. }
  57. type connection struct {
  58. c net.Conn
  59. }
  60. func Open(name string) (*connection, error) {
  61. parts := strings.Split(name, " ")
  62. address := ""
  63. if len(parts) >= 1 {
  64. address = parts[0]
  65. }
  66. c, err := net.Dial("tcp", address)
  67. if err != nil {
  68. return nil, err
  69. }
  70. cn := &connection{c: c}
  71. version := []byte("3.0.0")
  72. body := make([]byte, 4+len(version))
  73. binary.BigEndian.PutUint16(body[0:2], uint16(len(version)))
  74. copy(body[2:len(body)-2], version)
  75. binary.BigEndian.PutUint16(body[len(body)-2:], 0)
  76. if err := cn.send(opStartup, body); err != nil {
  77. return nil, err
  78. }
  79. opcode, body, err := cn.recv()
  80. if err != nil {
  81. return nil, err
  82. }
  83. if opcode != opReady {
  84. return nil, fmt.Errorf("connection not ready")
  85. }
  86. keyspace := ""
  87. for i := 1; i < len(parts); i++ {
  88. switch {
  89. case parts[i] == "":
  90. continue
  91. case strings.HasPrefix(parts[i], "keyspace="):
  92. keyspace = parts[i][9:]
  93. default:
  94. return nil, fmt.Errorf("unsupported option %q", parts[i])
  95. }
  96. }
  97. if keyspace != "" {
  98. st, err := cn.Prepare(fmt.Sprintf("USE %s", keyspace))
  99. if err != nil {
  100. return nil, err
  101. }
  102. if _, err = st.Exec([]driver.Value{}); err != nil {
  103. return nil, err
  104. }
  105. }
  106. return cn, nil
  107. }
  108. func (cn *connection) send(opcode byte, body []byte) error {
  109. frame := make([]byte, len(body)+8)
  110. frame[0] = protoRequest
  111. frame[1] = 0
  112. frame[2] = 0
  113. frame[3] = opcode
  114. binary.BigEndian.PutUint32(frame[4:8], uint32(len(body)))
  115. copy(frame[8:], body)
  116. if _, err := cn.c.Write(frame); err != nil {
  117. return err
  118. }
  119. return nil
  120. }
  121. func (cn *connection) recv() (byte, []byte, error) {
  122. header := make([]byte, 8)
  123. if _, err := cn.c.Read(header); err != nil {
  124. return 0, nil, err
  125. }
  126. opcode := header[3]
  127. length := binary.BigEndian.Uint32(header[4:8])
  128. var body []byte
  129. if length > 0 {
  130. body = make([]byte, length)
  131. if _, err := cn.c.Read(body); err != nil {
  132. return 0, nil, err
  133. }
  134. }
  135. if opcode == opError {
  136. code := binary.BigEndian.Uint32(body[0:4])
  137. msglen := binary.BigEndian.Uint16(body[4:6])
  138. msg := string(body[6 : 6+msglen])
  139. return opcode, body, Error{Code: int(code), Msg: msg}
  140. }
  141. return opcode, body, nil
  142. }
  143. func (cn *connection) Begin() (driver.Tx, error) {
  144. return cn, nil
  145. }
  146. func (cn *connection) Commit() error {
  147. return nil
  148. }
  149. func (cn *connection) Close() error {
  150. return cn.c.Close()
  151. }
  152. func (cn *connection) Rollback() error {
  153. return nil
  154. }
  155. func (cn *connection) Prepare(query string) (driver.Stmt, error) {
  156. body := make([]byte, len(query)+4)
  157. binary.BigEndian.PutUint32(body[0:4], uint32(len(query)))
  158. copy(body[4:], []byte(query))
  159. if err := cn.send(opPrepare, body); err != nil {
  160. return nil, err
  161. }
  162. opcode, body, err := cn.recv()
  163. if err != nil {
  164. return nil, err
  165. }
  166. if opcode != opResult || binary.BigEndian.Uint32(body) != 4 {
  167. return nil, fmt.Errorf("expected prepared result")
  168. }
  169. prepared := int(binary.BigEndian.Uint32(body[4:]))
  170. columns, meta, _ := parseMeta(body[8:])
  171. return &statement{cn: cn, query: query,
  172. prepared: prepared, columns: columns, meta: meta}, nil
  173. }
  174. type statement struct {
  175. cn *connection
  176. query string
  177. prepared int
  178. columns []string
  179. meta []uint16
  180. }
  181. func (s *statement) Close() error {
  182. return nil
  183. }
  184. func (st *statement) ColumnConverter(idx int) driver.ValueConverter {
  185. return (&columnEncoder{st.meta}).ColumnConverter(idx)
  186. }
  187. func (st *statement) NumInput() int {
  188. return len(st.columns)
  189. }
  190. func parseMeta(body []byte) ([]string, []uint16, int) {
  191. flags := binary.BigEndian.Uint32(body)
  192. globalTableSpec := flags&1 == 1
  193. columnCount := int(binary.BigEndian.Uint32(body[4:]))
  194. i := 8
  195. if globalTableSpec {
  196. l := int(binary.BigEndian.Uint16(body[i:]))
  197. keyspace := string(body[i+2 : i+2+l])
  198. i += 2 + l
  199. l = int(binary.BigEndian.Uint16(body[i:]))
  200. tablename := string(body[i+2 : i+2+l])
  201. i += 2 + l
  202. _, _ = keyspace, tablename
  203. }
  204. columns := make([]string, columnCount)
  205. meta := make([]uint16, columnCount)
  206. for c := 0; c < columnCount; c++ {
  207. l := int(binary.BigEndian.Uint16(body[i:]))
  208. columns[c] = string(body[i+2 : i+2+l])
  209. i += 2 + l
  210. meta[c] = binary.BigEndian.Uint16(body[i:])
  211. i += 2
  212. }
  213. return columns, meta, i
  214. }
  215. func (st *statement) exec(v []driver.Value) error {
  216. sz := 8
  217. for i := range v {
  218. if b, ok := v[i].([]byte); ok {
  219. sz += len(b) + 4
  220. }
  221. }
  222. body, p := make([]byte, sz), 6
  223. binary.BigEndian.PutUint32(body, uint32(st.prepared))
  224. binary.BigEndian.PutUint16(body[4:], uint16(len(v)))
  225. for i := range v {
  226. b, ok := v[i].([]byte)
  227. if !ok {
  228. return fmt.Errorf("unsupported type %T at column %d", v[i], i)
  229. }
  230. binary.BigEndian.PutUint32(body[p:], uint32(len(b)))
  231. copy(body[p+4:], b)
  232. p += 4 + len(b)
  233. }
  234. if err := st.cn.send(opExecute, body); err != nil {
  235. return err
  236. }
  237. return nil
  238. }
  239. func (st *statement) Exec(v []driver.Value) (driver.Result, error) {
  240. if err := st.exec(v); err != nil {
  241. return nil, err
  242. }
  243. opcode, body, err := st.cn.recv()
  244. if err != nil {
  245. return nil, err
  246. }
  247. _, _ = opcode, body
  248. return nil, nil
  249. }
  250. func (st *statement) Query(v []driver.Value) (driver.Rows, error) {
  251. if err := st.exec(v); err != nil {
  252. return nil, err
  253. }
  254. opcode, body, err := st.cn.recv()
  255. if err != nil {
  256. return nil, err
  257. }
  258. kind := binary.BigEndian.Uint32(body[0:4])
  259. if opcode != opResult || kind != 2 {
  260. return nil, fmt.Errorf("expected rows as result")
  261. }
  262. columns, meta, n := parseMeta(body[4:])
  263. i := n + 4
  264. rows := &rows{
  265. columns: columns,
  266. meta: meta,
  267. numRows: int(binary.BigEndian.Uint32(body[i:])),
  268. }
  269. i += 4
  270. rows.body = body[i:]
  271. return rows, nil
  272. }
  273. type rows struct {
  274. columns []string
  275. meta []uint16
  276. body []byte
  277. row int
  278. numRows int
  279. }
  280. func (r *rows) Close() error {
  281. return nil
  282. }
  283. func (r *rows) Columns() []string {
  284. return r.columns
  285. }
  286. func (r *rows) Next(values []driver.Value) error {
  287. if r.row >= r.numRows {
  288. return io.EOF
  289. }
  290. for column := 0; column < len(r.columns); column++ {
  291. n := int(binary.BigEndian.Uint32(r.body))
  292. r.body = r.body[4:]
  293. if n >= 0 {
  294. values[column] = decode(r.body[:n], r.meta[column])
  295. r.body = r.body[n:]
  296. } else {
  297. fmt.Println(column, n)
  298. values[column] = nil
  299. }
  300. }
  301. r.row++
  302. return nil
  303. }
  304. type Error struct {
  305. Code int
  306. Msg string
  307. }
  308. func (e Error) Error() string {
  309. return e.Msg
  310. }
  311. func init() {
  312. sql.Register("gocql", &drv{})
  313. }