conn.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
import (
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"code.google.com/p/snappy-go/snappy"
)
  12. const defaultFrameSize = 4096
  13. const flagResponse = 0x80
  14. const maskVersion = 0x7F
  15. type Cluster interface {
  16. //HandleAuth(addr, method string) ([]byte, Challenger, error)
  17. HandleError(conn *Conn, err error, closed bool)
  18. HandleKeyspace(conn *Conn, keyspace string)
  19. // Authenticate(addr string)
  20. }
  21. /* type Challenger interface {
  22. Challenge(data []byte) ([]byte, error)
  23. } */
  24. type ConnConfig struct {
  25. ProtoVersion int
  26. CQLVersion string
  27. Timeout time.Duration
  28. NumStreams int
  29. Compressor Compressor
  30. }
  31. // Conn is a single connection to a Cassandra node. It can be used to execute
  32. // queries, but users are usually advised to use a more reliable, higher
  33. // level API.
  34. type Conn struct {
  35. conn net.Conn
  36. timeout time.Duration
  37. uniq chan uint8
  38. calls []callReq
  39. nwait int32
  40. prepMu sync.Mutex
  41. prep map[string]*queryInfo
  42. cluster Cluster
  43. compressor Compressor
  44. addr string
  45. version uint8
  46. }
  47. // Connect establishes a connection to a Cassandra node.
  48. // You must also call the Serve method before you can execute any queries.
  49. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  50. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  51. if err != nil {
  52. return nil, err
  53. }
  54. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  55. cfg.NumStreams = 128
  56. }
  57. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  58. cfg.ProtoVersion = 2
  59. }
  60. c := &Conn{
  61. conn: conn,
  62. uniq: make(chan uint8, cfg.NumStreams),
  63. calls: make([]callReq, cfg.NumStreams),
  64. prep: make(map[string]*queryInfo),
  65. timeout: cfg.Timeout,
  66. version: uint8(cfg.ProtoVersion),
  67. addr: conn.RemoteAddr().String(),
  68. cluster: cluster,
  69. compressor: cfg.Compressor,
  70. }
  71. for i := 0; i < cap(c.uniq); i++ {
  72. c.uniq <- uint8(i)
  73. }
  74. if err := c.startup(&cfg); err != nil {
  75. return nil, err
  76. }
  77. go c.serve()
  78. return c, nil
  79. }
  80. func (c *Conn) startup(cfg *ConnConfig) error {
  81. req := make(frame, headerSize, defaultFrameSize)
  82. req.setHeader(c.version, 0, 0, opStartup)
  83. m := map[string]string{
  84. "CQL_VERSION": cfg.CQLVersion,
  85. }
  86. if c.compressor != nil {
  87. m["COMPRESSION"] = c.compressor.Name()
  88. }
  89. req.writeStringMap(m)
  90. resp, err := c.callSimple(req)
  91. if err != nil {
  92. return err
  93. }
  94. switch x := resp.(type) {
  95. case readyFrame:
  96. case error:
  97. return x
  98. default:
  99. return ErrProtocol
  100. }
  101. return nil
  102. }
  103. // Serve starts the stream multiplexer for this connection, which is required
  104. // to execute any queries. This method runs as long as the connection is
  105. // open and is therefore usually called in a separate goroutine.
  106. func (c *Conn) serve() {
  107. for {
  108. resp, err := c.recv()
  109. if err != nil {
  110. break
  111. }
  112. c.dispatch(resp)
  113. }
  114. c.conn.Close()
  115. for id := 0; id < len(c.calls); id++ {
  116. req := &c.calls[id]
  117. if atomic.LoadInt32(&req.active) == 1 {
  118. req.resp <- callResp{nil, ErrProtocol}
  119. }
  120. }
  121. c.cluster.HandleError(c, ErrProtocol, true)
  122. }
  123. func (c *Conn) recv() (frame, error) {
  124. resp := make(frame, headerSize, headerSize+512)
  125. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  126. n, last, pinged := 0, 0, false
  127. for n < len(resp) {
  128. nn, err := c.conn.Read(resp[n:])
  129. n += nn
  130. if err != nil {
  131. if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
  132. if n > last {
  133. // we hit the deadline but we made progress.
  134. // simply extend the deadline
  135. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  136. last = n
  137. } else if n == 0 && !pinged {
  138. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  139. if atomic.LoadInt32(&c.nwait) > 0 {
  140. go c.ping()
  141. pinged = true
  142. }
  143. } else {
  144. return nil, err
  145. }
  146. } else {
  147. return nil, err
  148. }
  149. }
  150. if n == headerSize && len(resp) == headerSize {
  151. if resp[0] != c.version|flagResponse {
  152. return nil, ErrProtocol
  153. }
  154. resp.grow(resp.Length())
  155. }
  156. }
  157. return resp, nil
  158. }
  159. func (c *Conn) callSimple(req frame) (interface{}, error) {
  160. req.setLength(len(req) - headerSize)
  161. if _, err := c.conn.Write(req); err != nil {
  162. c.conn.Close()
  163. return nil, err
  164. }
  165. buf, err := c.recv()
  166. if err != nil {
  167. return nil, err
  168. }
  169. return c.decodeFrame(buf)
  170. }
  171. func (c *Conn) call(req frame) (interface{}, error) {
  172. id := <-c.uniq
  173. req[2] = id
  174. call := &c.calls[id]
  175. call.resp = make(chan callResp, 1)
  176. atomic.AddInt32(&c.nwait, 1)
  177. atomic.StoreInt32(&call.active, 1)
  178. req.setLength(len(req) - headerSize)
  179. if len(req) > headerSize && c.compressor != nil {
  180. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  181. if err != nil {
  182. return nil, err
  183. }
  184. req = append(req[:headerSize], frame(body)...)
  185. req[1] |= flagCompress
  186. req.setLength(len(req) - headerSize)
  187. }
  188. if n, err := c.conn.Write(req); err != nil {
  189. c.conn.Close()
  190. if n > 0 {
  191. return nil, ErrProtocol
  192. }
  193. return nil, ErrUnavailable
  194. }
  195. reply := <-call.resp
  196. call.resp = nil
  197. c.uniq <- id
  198. if reply.err != nil {
  199. return nil, reply.err
  200. }
  201. return c.decodeFrame(reply.buf)
  202. }
  203. func (c *Conn) dispatch(resp frame) {
  204. id := int(resp[2])
  205. if id >= len(c.calls) {
  206. return
  207. }
  208. call := &c.calls[id]
  209. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  210. return
  211. }
  212. atomic.AddInt32(&c.nwait, -1)
  213. call.resp <- callResp{resp, nil}
  214. }
  215. func (c *Conn) ping() error {
  216. req := make(frame, headerSize)
  217. req.setHeader(c.version, 0, 0, opOptions)
  218. _, err := c.call(req)
  219. return err
  220. }
  221. func (c *Conn) prepareStatement(stmt string) (*queryInfo, error) {
  222. c.prepMu.Lock()
  223. info := c.prep[stmt]
  224. if info != nil {
  225. c.prepMu.Unlock()
  226. info.wg.Wait()
  227. return info, nil
  228. }
  229. info = new(queryInfo)
  230. info.wg.Add(1)
  231. c.prep[stmt] = info
  232. c.prepMu.Unlock()
  233. frame := make(frame, headerSize, defaultFrameSize)
  234. frame.setHeader(c.version, 0, 0, opPrepare)
  235. frame.writeLongString(stmt)
  236. frame.setLength(len(frame) - headerSize)
  237. resp, err := c.call(frame)
  238. if err != nil {
  239. return nil, err
  240. }
  241. switch x := resp.(type) {
  242. case resultPreparedFrame:
  243. info.id = x.PreparedId
  244. info.args = x.Values
  245. info.wg.Done()
  246. case error:
  247. return nil, x
  248. default:
  249. return nil, ErrProtocol
  250. }
  251. return info, nil
  252. }
  253. func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
  254. var info *queryInfo
  255. if len(qry.Args) > 0 {
  256. var err error
  257. info, err = c.prepareStatement(qry.Stmt)
  258. if err != nil {
  259. return nil, err
  260. }
  261. }
  262. req := make(frame, headerSize, defaultFrameSize)
  263. if info == nil {
  264. req.setHeader(c.version, 0, 0, opQuery)
  265. req.writeLongString(qry.Stmt)
  266. req.writeConsistency(qry.Cons)
  267. if c.version > 1 {
  268. req.writeByte(0)
  269. }
  270. } else {
  271. req.setHeader(c.version, 0, 0, opExecute)
  272. req.writeShortBytes(info.id)
  273. if c.version == 1 {
  274. req.writeShort(uint16(len(qry.Args)))
  275. } else {
  276. req.writeConsistency(qry.Cons)
  277. flags := uint8(0)
  278. if len(qry.Args) > 0 {
  279. flags |= flagQueryValues
  280. }
  281. req.writeByte(flags)
  282. if flags&flagQueryValues != 0 {
  283. req.writeShort(uint16(len(qry.Args)))
  284. }
  285. }
  286. for i := 0; i < len(qry.Args); i++ {
  287. val, err := Marshal(info.args[i].TypeInfo, qry.Args[i])
  288. if err != nil {
  289. return nil, err
  290. }
  291. req.writeBytes(val)
  292. }
  293. if c.version == 1 {
  294. req.writeConsistency(qry.Cons)
  295. }
  296. }
  297. resp, err := c.call(req)
  298. if err != nil {
  299. return nil, err
  300. }
  301. switch x := resp.(type) {
  302. case resultVoidFrame:
  303. return &Iter{}, nil
  304. case resultRowsFrame:
  305. return &Iter{columns: x.Columns, rows: x.Rows}, nil
  306. case resultKeyspaceFrame:
  307. c.cluster.HandleKeyspace(c, x.Keyspace)
  308. return &Iter{}, nil
  309. case error:
  310. return &Iter{err: x}, nil
  311. }
  312. return nil, ErrProtocol
  313. }
  314. func (c *Conn) ExecuteBatch(batch *Batch) error {
  315. if c.version == 1 {
  316. return ErrProtocol
  317. }
  318. frame := make(frame, headerSize, defaultFrameSize)
  319. frame.setHeader(c.version, 0, 0, opBatch)
  320. frame.writeByte(byte(batch.Type))
  321. frame.writeShort(uint16(len(batch.Entries)))
  322. for i := 0; i < len(batch.Entries); i++ {
  323. entry := &batch.Entries[i]
  324. var info *queryInfo
  325. if len(entry.Args) > 0 {
  326. var err error
  327. info, err = c.prepareStatement(entry.Stmt)
  328. if err != nil {
  329. return err
  330. }
  331. frame.writeByte(1)
  332. frame.writeShortBytes(info.id)
  333. } else {
  334. frame.writeByte(0)
  335. frame.writeLongString(entry.Stmt)
  336. }
  337. frame.writeShort(uint16(len(entry.Args)))
  338. for j := 0; j < len(entry.Args); j++ {
  339. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  340. if err != nil {
  341. return err
  342. }
  343. frame.writeBytes(val)
  344. }
  345. }
  346. frame.writeConsistency(batch.Cons)
  347. resp, err := c.call(frame)
  348. if err != nil {
  349. return err
  350. }
  351. switch x := resp.(type) {
  352. case resultVoidFrame:
  353. case error:
  354. return x
  355. default:
  356. return ErrProtocol
  357. }
  358. return nil
  359. }
  360. func (c *Conn) Close() {
  361. c.conn.Close()
  362. }
  363. func (c *Conn) Address() string {
  364. return c.addr
  365. }
  366. func (c *Conn) UseKeyspace(keyspace string) error {
  367. frame := make(frame, headerSize, defaultFrameSize)
  368. frame.setHeader(c.version, 0, 0, opQuery)
  369. frame.writeLongString("USE " + keyspace)
  370. frame.writeConsistency(1)
  371. frame.writeByte(0)
  372. resp, err := c.call(frame)
  373. if err != nil {
  374. return err
  375. }
  376. switch x := resp.(type) {
  377. case resultKeyspaceFrame:
  378. case error:
  379. return x
  380. default:
  381. return ErrProtocol
  382. }
  383. return nil
  384. }
  385. func (c *Conn) decodeFrame(f frame) (rval interface{}, err error) {
  386. defer func() {
  387. if r := recover(); r != nil {
  388. if e, ok := r.(error); ok && e == ErrProtocol {
  389. err = e
  390. return
  391. }
  392. panic(r)
  393. }
  394. }()
  395. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  396. return nil, ErrProtocol
  397. }
  398. flags, op, f := f[1], f[3], f[headerSize:]
  399. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  400. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  401. return nil, err
  402. } else {
  403. f = frame(buf)
  404. }
  405. }
  406. switch op {
  407. case opReady:
  408. return readyFrame{}, nil
  409. case opResult:
  410. switch kind := f.readInt(); kind {
  411. case resultKindVoid:
  412. return resultVoidFrame{}, nil
  413. case resultKindRows:
  414. columns := f.readMetaData()
  415. numRows := f.readInt()
  416. values := make([][]byte, numRows*len(columns))
  417. for i := 0; i < len(values); i++ {
  418. values[i] = f.readBytes()
  419. }
  420. rows := make([][][]byte, numRows)
  421. for i := 0; i < len(values); i += len(columns) {
  422. rows[i] = values[i : i+len(columns)]
  423. }
  424. return resultRowsFrame{columns, rows, nil}, nil
  425. case resultKindKeyspace:
  426. keyspace := f.readString()
  427. return resultKeyspaceFrame{keyspace}, nil
  428. case resultKindPrepared:
  429. id := f.readShortBytes()
  430. values := f.readMetaData()
  431. return resultPreparedFrame{id, values}, nil
  432. case resultKindSchemaChanged:
  433. return resultVoidFrame{}, nil
  434. default:
  435. return nil, ErrProtocol
  436. }
  437. case opError:
  438. code := f.readInt()
  439. msg := f.readString()
  440. return errorFrame{code, msg}, nil
  441. default:
  442. return nil, ErrProtocol
  443. }
  444. }
  445. type queryInfo struct {
  446. id []byte
  447. args []ColumnInfo
  448. rval []ColumnInfo
  449. wg sync.WaitGroup
  450. }
  451. type callReq struct {
  452. active int32
  453. resp chan callResp
  454. }
  455. type callResp struct {
  456. buf frame
  457. err error
  458. }
  459. type Compressor interface {
  460. Name() string
  461. Encode(data []byte) ([]byte, error)
  462. Decode(data []byte) ([]byte, error)
  463. }
  464. type SnappyCompressor struct{}
  465. func (s SnappyCompressor) Name() string {
  466. return "snappy"
  467. }
  468. func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
  469. return snappy.Encode(nil, data)
  470. }
  471. func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
  472. return snappy.Decode(nil, data)
  473. }