conn.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "code.google.com/p/snappy-go/snappy"
  11. )
  12. const defaultFrameSize = 4096
  13. const flagResponse = 0x80
  14. const maskVersion = 0x7F
  15. type Cluster interface {
  16. //HandleAuth(addr, method string) ([]byte, Challenger, error)
  17. HandleError(conn *Conn, err error, closed bool)
  18. HandleKeyspace(conn *Conn, keyspace string)
  19. // Authenticate(addr string)
  20. }
  21. /* type Challenger interface {
  22. Challenge(data []byte) ([]byte, error)
  23. } */
  24. type ConnConfig struct {
  25. ProtoVersion int
  26. CQLVersion string
  27. Timeout time.Duration
  28. NumStreams int
  29. Compressor Compressor
  30. }
  31. // Conn is a single connection to a Cassandra node. It can be used to execute
  32. // queries, but users are usually advised to use a more reliable, higher
  33. // level API.
  34. type Conn struct {
  35. conn net.Conn
  36. timeout time.Duration
  37. uniq chan uint8
  38. calls []callReq
  39. nwait int32
  40. prepMu sync.Mutex
  41. prep map[string]*queryInfo
  42. cluster Cluster
  43. compressor Compressor
  44. addr string
  45. version uint8
  46. }
  47. // Connect establishes a connection to a Cassandra node.
  48. // You must also call the Serve method before you can execute any queries.
  49. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  50. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  51. if err != nil {
  52. return nil, err
  53. }
  54. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  55. cfg.NumStreams = 128
  56. }
  57. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  58. cfg.ProtoVersion = 2
  59. }
  60. c := &Conn{
  61. conn: conn,
  62. uniq: make(chan uint8, cfg.NumStreams),
  63. calls: make([]callReq, cfg.NumStreams),
  64. prep: make(map[string]*queryInfo),
  65. timeout: cfg.Timeout,
  66. version: uint8(cfg.ProtoVersion),
  67. addr: conn.RemoteAddr().String(),
  68. cluster: cluster,
  69. compressor: cfg.Compressor,
  70. }
  71. for i := 0; i < cap(c.uniq); i++ {
  72. c.uniq <- uint8(i)
  73. }
  74. if err := c.startup(&cfg); err != nil {
  75. return nil, err
  76. }
  77. go c.serve()
  78. return c, nil
  79. }
  80. func (c *Conn) startup(cfg *ConnConfig) error {
  81. req := &startupFrame{
  82. CQLVersion: cfg.CQLVersion,
  83. }
  84. if c.compressor != nil {
  85. req.Compression = c.compressor.Name()
  86. }
  87. resp, err := c.execSimple(req)
  88. if err != nil {
  89. return err
  90. }
  91. switch x := resp.(type) {
  92. case readyFrame:
  93. case error:
  94. return x
  95. default:
  96. return ErrProtocol
  97. }
  98. return nil
  99. }
  100. // Serve starts the stream multiplexer for this connection, which is required
  101. // to execute any queries. This method runs as long as the connection is
  102. // open and is therefore usually called in a separate goroutine.
  103. func (c *Conn) serve() {
  104. var (
  105. err error
  106. resp frame
  107. )
  108. for {
  109. resp, err = c.recv()
  110. if err != nil {
  111. break
  112. }
  113. c.dispatch(resp)
  114. }
  115. c.conn.Close()
  116. for id := 0; id < len(c.calls); id++ {
  117. req := &c.calls[id]
  118. if atomic.LoadInt32(&req.active) == 1 {
  119. req.resp <- callResp{nil, err}
  120. }
  121. }
  122. c.cluster.HandleError(c, err, true)
  123. }
  124. func (c *Conn) recv() (frame, error) {
  125. resp := make(frame, headerSize, headerSize+512)
  126. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  127. n, last, pinged := 0, 0, false
  128. for n < len(resp) {
  129. nn, err := c.conn.Read(resp[n:])
  130. n += nn
  131. if err != nil {
  132. if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
  133. if n > last {
  134. // we hit the deadline but we made progress.
  135. // simply extend the deadline
  136. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  137. last = n
  138. } else if n == 0 && !pinged {
  139. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  140. if atomic.LoadInt32(&c.nwait) > 0 {
  141. go c.ping()
  142. pinged = true
  143. }
  144. } else {
  145. return nil, err
  146. }
  147. } else {
  148. return nil, err
  149. }
  150. }
  151. if n == headerSize && len(resp) == headerSize {
  152. if resp[0] != c.version|flagResponse {
  153. return nil, ErrProtocol
  154. }
  155. resp.grow(resp.Length())
  156. }
  157. }
  158. return resp, nil
  159. }
  160. func (c *Conn) execSimple(op operation) (interface{}, error) {
  161. f, err := op.encodeFrame(c.version, nil)
  162. f.setLength(len(f) - headerSize)
  163. if _, err := c.conn.Write([]byte(f)); err != nil {
  164. c.conn.Close()
  165. return nil, err
  166. }
  167. if f, err = c.recv(); err != nil {
  168. return nil, err
  169. }
  170. return c.decodeFrame(f, nil)
  171. }
  172. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  173. req, err := op.encodeFrame(c.version, nil)
  174. if err != nil {
  175. return nil, err
  176. }
  177. if trace != nil {
  178. req[1] |= flagTrace
  179. }
  180. if len(req) > headerSize && c.compressor != nil {
  181. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  182. if err != nil {
  183. return nil, err
  184. }
  185. req = append(req[:headerSize], frame(body)...)
  186. req[1] |= flagCompress
  187. }
  188. req.setLength(len(req) - headerSize)
  189. id := <-c.uniq
  190. req[2] = id
  191. call := &c.calls[id]
  192. call.resp = make(chan callResp, 1)
  193. atomic.AddInt32(&c.nwait, 1)
  194. atomic.StoreInt32(&call.active, 1)
  195. if n, err := c.conn.Write(req); err != nil {
  196. c.conn.Close()
  197. if n > 0 {
  198. return nil, ErrProtocol
  199. }
  200. return nil, ErrUnavailable
  201. }
  202. reply := <-call.resp
  203. call.resp = nil
  204. c.uniq <- id
  205. if reply.err != nil {
  206. return nil, reply.err
  207. }
  208. return c.decodeFrame(reply.buf, trace)
  209. }
  210. func (c *Conn) dispatch(resp frame) {
  211. id := int(resp[2])
  212. if id >= len(c.calls) {
  213. return
  214. }
  215. call := &c.calls[id]
  216. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  217. return
  218. }
  219. atomic.AddInt32(&c.nwait, -1)
  220. call.resp <- callResp{resp, nil}
  221. }
  222. func (c *Conn) ping() error {
  223. _, err := c.exec(&optionsFrame{}, nil)
  224. return err
  225. }
  226. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  227. c.prepMu.Lock()
  228. info := c.prep[stmt]
  229. if info != nil {
  230. c.prepMu.Unlock()
  231. info.wg.Wait()
  232. return info, nil
  233. }
  234. info = new(queryInfo)
  235. info.wg.Add(1)
  236. c.prep[stmt] = info
  237. c.prepMu.Unlock()
  238. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  239. if err != nil {
  240. return nil, err
  241. }
  242. switch x := resp.(type) {
  243. case resultPreparedFrame:
  244. info.id = x.PreparedId
  245. info.args = x.Values
  246. info.wg.Done()
  247. case error:
  248. return nil, x
  249. default:
  250. return nil, ErrProtocol
  251. }
  252. return info, nil
  253. }
  254. func (c *Conn) executeQuery(qry *Query) *Iter {
  255. op := &queryFrame{
  256. Stmt: qry.stmt,
  257. Cons: qry.cons,
  258. PageSize: qry.pageSize,
  259. PageState: qry.pageState,
  260. }
  261. if len(qry.values) > 0 {
  262. info, err := c.prepareStatement(qry.stmt, qry.trace)
  263. if err != nil {
  264. return &Iter{err: err}
  265. }
  266. op.Prepared = info.id
  267. op.Values = make([][]byte, len(qry.values))
  268. for i := 0; i < len(qry.values); i++ {
  269. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  270. if err != nil {
  271. return &Iter{err: err}
  272. }
  273. op.Values[i] = val
  274. }
  275. }
  276. resp, err := c.exec(op, qry.trace)
  277. if err != nil {
  278. return &Iter{err: err}
  279. }
  280. switch x := resp.(type) {
  281. case resultVoidFrame:
  282. return &Iter{}
  283. case resultRowsFrame:
  284. iter := &Iter{columns: x.Columns, rows: x.Rows}
  285. if len(x.PagingState) > 0 {
  286. iter.next = &nextIter{
  287. qry: *qry,
  288. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  289. }
  290. iter.next.qry.pageState = x.PagingState
  291. if iter.next.pos < 1 {
  292. iter.next.pos = 1
  293. }
  294. }
  295. return iter
  296. case resultKeyspaceFrame:
  297. c.cluster.HandleKeyspace(c, x.Keyspace)
  298. return &Iter{}
  299. case errorFrame:
  300. if x.Code == errUnprepared && len(qry.values) > 0 {
  301. c.prepMu.Lock()
  302. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  303. delete(c.prep, qry.stmt)
  304. c.prepMu.Unlock()
  305. return c.executeQuery(qry)
  306. }
  307. c.prepMu.Unlock()
  308. return &Iter{err: x}
  309. } else {
  310. return &Iter{err: x}
  311. }
  312. case error:
  313. return &Iter{err: x}
  314. default:
  315. return &Iter{err: ErrProtocol}
  316. }
  317. }
  318. func (c *Conn) Pick(qry *Query) *Conn {
  319. return c
  320. }
  321. func (c *Conn) Close() {
  322. c.conn.Close()
  323. }
  324. func (c *Conn) Address() string {
  325. return c.addr
  326. }
  327. func (c *Conn) UseKeyspace(keyspace string) error {
  328. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  329. if err != nil {
  330. return err
  331. }
  332. switch x := resp.(type) {
  333. case resultKeyspaceFrame:
  334. case error:
  335. return x
  336. default:
  337. return ErrProtocol
  338. }
  339. return nil
  340. }
  341. func (c *Conn) executeBatch(batch *Batch) error {
  342. if c.version == 1 {
  343. return ErrUnsupported
  344. }
  345. f := make(frame, headerSize, defaultFrameSize)
  346. f.setHeader(c.version, 0, 0, opBatch)
  347. f.writeByte(byte(batch.Type))
  348. f.writeShort(uint16(len(batch.Entries)))
  349. for i := 0; i < len(batch.Entries); i++ {
  350. entry := &batch.Entries[i]
  351. var info *queryInfo
  352. if len(entry.Args) > 0 {
  353. var err error
  354. info, err = c.prepareStatement(entry.Stmt, nil)
  355. if err != nil {
  356. return err
  357. }
  358. f.writeByte(1)
  359. f.writeShortBytes(info.id)
  360. } else {
  361. f.writeByte(0)
  362. f.writeLongString(entry.Stmt)
  363. }
  364. f.writeShort(uint16(len(entry.Args)))
  365. for j := 0; j < len(entry.Args); j++ {
  366. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  367. if err != nil {
  368. return err
  369. }
  370. f.writeBytes(val)
  371. }
  372. }
  373. f.writeConsistency(batch.Cons)
  374. resp, err := c.exec(f, nil)
  375. if err != nil {
  376. return err
  377. }
  378. switch x := resp.(type) {
  379. case resultVoidFrame:
  380. return nil
  381. case error:
  382. return x
  383. default:
  384. return ErrProtocol
  385. }
  386. }
  387. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  388. defer func() {
  389. if r := recover(); r != nil {
  390. if e, ok := r.(error); ok && e == ErrProtocol {
  391. err = e
  392. return
  393. }
  394. panic(r)
  395. }
  396. }()
  397. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  398. return nil, ErrProtocol
  399. }
  400. flags, op, f := f[1], f[3], f[headerSize:]
  401. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  402. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  403. return nil, err
  404. } else {
  405. f = frame(buf)
  406. }
  407. }
  408. if flags&flagTrace != 0 {
  409. if len(f) < 16 {
  410. return nil, ErrProtocol
  411. }
  412. traceId := []byte(f[:16])
  413. f = f[16:]
  414. trace.Trace(traceId)
  415. }
  416. switch op {
  417. case opReady:
  418. return readyFrame{}, nil
  419. case opResult:
  420. switch kind := f.readInt(); kind {
  421. case resultKindVoid:
  422. return resultVoidFrame{}, nil
  423. case resultKindRows:
  424. columns, pageState := f.readMetaData()
  425. numRows := f.readInt()
  426. values := make([][]byte, numRows*len(columns))
  427. for i := 0; i < len(values); i++ {
  428. values[i] = f.readBytes()
  429. }
  430. rows := make([][][]byte, numRows)
  431. for i := 0; i < numRows; i++ {
  432. rows[i], values = values[:len(columns)], values[len(columns):]
  433. }
  434. return resultRowsFrame{columns, rows, pageState}, nil
  435. case resultKindKeyspace:
  436. keyspace := f.readString()
  437. return resultKeyspaceFrame{keyspace}, nil
  438. case resultKindPrepared:
  439. id := f.readShortBytes()
  440. values, _ := f.readMetaData()
  441. return resultPreparedFrame{id, values}, nil
  442. case resultKindSchemaChanged:
  443. return resultVoidFrame{}, nil
  444. default:
  445. return nil, ErrProtocol
  446. }
  447. case opSupported:
  448. return supportedFrame{}, nil
  449. case opError:
  450. code := f.readInt()
  451. msg := f.readString()
  452. return errorFrame{code, msg}, nil
  453. default:
  454. return nil, ErrProtocol
  455. }
  456. }
  457. type queryInfo struct {
  458. id []byte
  459. args []ColumnInfo
  460. rval []ColumnInfo
  461. wg sync.WaitGroup
  462. }
  463. type callReq struct {
  464. active int32
  465. resp chan callResp
  466. }
  467. type callResp struct {
  468. buf frame
  469. err error
  470. }
  471. type Compressor interface {
  472. Name() string
  473. Encode(data []byte) ([]byte, error)
  474. Decode(data []byte) ([]byte, error)
  475. }
  476. // SnappyCompressor implements the Compressor interface and can be used to
  477. // compress incoming and outgoing frames. The snappy compression algorithm
  478. // aims for very high speeds and reasonable compression.
  479. type SnappyCompressor struct{}
  480. func (s SnappyCompressor) Name() string {
  481. return "snappy"
  482. }
  483. func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
  484. return snappy.Encode(nil, data)
  485. }
  486. func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
  487. return snappy.Decode(nil, data)
  488. }