conn.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  import (
	"bufio"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"code.google.com/p/snappy-go/snappy"
)
  // Frame-level constants used by the connection code.
const (
	// defaultFrameSize is the initial capacity reserved when executeBatch
	// builds a BATCH frame.
	defaultFrameSize = 4096
	// flagResponse is OR'd into the protocol version byte of frames sent by
	// the server; recv and decodeFrame require byte 0 == version|flagResponse.
	flagResponse = 0x80
	// maskVersion selects the protocol version bits of the first header byte.
	// NOTE(review): not referenced in this part of the file — presumably used
	// elsewhere in the package.
	maskVersion = 0x7F
)
  16. type Cluster interface {
  17. //HandleAuth(addr, method string) ([]byte, Challenger, error)
  18. HandleError(conn *Conn, err error, closed bool)
  19. HandleKeyspace(conn *Conn, keyspace string)
  20. // Authenticate(addr string)
  21. }
  22. /* type Challenger interface {
  23. Challenge(data []byte) ([]byte, error)
  24. } */
  25. type ConnConfig struct {
  26. ProtoVersion int
  27. CQLVersion string
  28. Timeout time.Duration
  29. NumStreams int
  30. Compressor Compressor
  31. }
  32. // Conn is a single connection to a Cassandra node. It can be used to execute
  33. // queries, but users are usually advised to use a more reliable, higher
  34. // level API.
  35. type Conn struct {
  36. conn net.Conn
  37. r *bufio.Reader
  38. timeout time.Duration
  39. uniq chan uint8
  40. calls []callReq
  41. nwait int32
  42. prepMu sync.Mutex
  43. prep map[string]*inflightPrepare
  44. cluster Cluster
  45. compressor Compressor
  46. addr string
  47. version uint8
  48. }
  49. // Connect establishes a connection to a Cassandra node.
  50. // You must also call the Serve method before you can execute any queries.
  51. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  52. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  53. if err != nil {
  54. return nil, err
  55. }
  56. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  57. cfg.NumStreams = 128
  58. }
  59. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  60. cfg.ProtoVersion = 2
  61. }
  62. c := &Conn{
  63. conn: conn,
  64. r: bufio.NewReader(conn),
  65. uniq: make(chan uint8, cfg.NumStreams),
  66. calls: make([]callReq, cfg.NumStreams),
  67. prep: make(map[string]*inflightPrepare),
  68. timeout: cfg.Timeout,
  69. version: uint8(cfg.ProtoVersion),
  70. addr: conn.RemoteAddr().String(),
  71. cluster: cluster,
  72. compressor: cfg.Compressor,
  73. }
  74. for i := 0; i < cap(c.uniq); i++ {
  75. c.uniq <- uint8(i)
  76. }
  77. if err := c.startup(&cfg); err != nil {
  78. return nil, err
  79. }
  80. go c.serve()
  81. return c, nil
  82. }
  83. func (c *Conn) startup(cfg *ConnConfig) error {
  84. req := &startupFrame{
  85. CQLVersion: cfg.CQLVersion,
  86. }
  87. if c.compressor != nil {
  88. req.Compression = c.compressor.Name()
  89. }
  90. resp, err := c.execSimple(req)
  91. if err != nil {
  92. return err
  93. }
  94. switch x := resp.(type) {
  95. case readyFrame:
  96. case error:
  97. return x
  98. default:
  99. return ErrProtocol
  100. }
  101. return nil
  102. }
  103. // Serve starts the stream multiplexer for this connection, which is required
  104. // to execute any queries. This method runs as long as the connection is
  105. // open and is therefore usually called in a separate goroutine.
  106. func (c *Conn) serve() {
  107. for {
  108. resp, err := c.recv()
  109. if err != nil {
  110. break
  111. }
  112. c.dispatch(resp)
  113. }
  114. c.conn.Close()
  115. for id := 0; id < len(c.calls); id++ {
  116. req := &c.calls[id]
  117. if atomic.LoadInt32(&req.active) == 1 {
  118. req.resp <- callResp{nil, ErrProtocol}
  119. }
  120. }
  121. c.cluster.HandleError(c, ErrProtocol, true)
  122. }
  123. func (c *Conn) recv() (frame, error) {
  124. resp := make(frame, headerSize, headerSize+512)
  125. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  126. n, last, pinged := 0, 0, false
  127. for n < len(resp) {
  128. nn, err := c.r.Read(resp[n:])
  129. n += nn
  130. if err != nil {
  131. if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
  132. if n > last {
  133. // we hit the deadline but we made progress.
  134. // simply extend the deadline
  135. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  136. last = n
  137. } else if n == 0 && !pinged {
  138. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  139. if atomic.LoadInt32(&c.nwait) > 0 {
  140. go c.ping()
  141. pinged = true
  142. }
  143. } else {
  144. return nil, err
  145. }
  146. } else {
  147. return nil, err
  148. }
  149. }
  150. if n == headerSize && len(resp) == headerSize {
  151. if resp[0] != c.version|flagResponse {
  152. return nil, ErrProtocol
  153. }
  154. resp.grow(resp.Length())
  155. }
  156. }
  157. return resp, nil
  158. }
  159. func (c *Conn) execSimple(op operation) (interface{}, error) {
  160. f, err := op.encodeFrame(c.version, nil)
  161. f.setLength(len(f) - headerSize)
  162. if _, err := c.conn.Write([]byte(f)); err != nil {
  163. c.conn.Close()
  164. return nil, err
  165. }
  166. if f, err = c.recv(); err != nil {
  167. return nil, err
  168. }
  169. return c.decodeFrame(f, nil)
  170. }
  171. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  172. req, err := op.encodeFrame(c.version, nil)
  173. if err != nil {
  174. return nil, err
  175. }
  176. if trace != nil {
  177. req[1] |= flagTrace
  178. }
  179. if len(req) > headerSize && c.compressor != nil {
  180. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  181. if err != nil {
  182. return nil, err
  183. }
  184. req = append(req[:headerSize], frame(body)...)
  185. req[1] |= flagCompress
  186. }
  187. req.setLength(len(req) - headerSize)
  188. id := <-c.uniq
  189. req[2] = id
  190. call := &c.calls[id]
  191. call.resp = make(chan callResp, 1)
  192. atomic.AddInt32(&c.nwait, 1)
  193. atomic.StoreInt32(&call.active, 1)
  194. if n, err := c.conn.Write(req); err != nil {
  195. c.conn.Close()
  196. if n > 0 {
  197. return nil, ErrProtocol
  198. }
  199. return nil, ErrUnavailable
  200. }
  201. reply := <-call.resp
  202. call.resp = nil
  203. c.uniq <- id
  204. if reply.err != nil {
  205. return nil, reply.err
  206. }
  207. return c.decodeFrame(reply.buf, trace)
  208. }
  209. func (c *Conn) dispatch(resp frame) {
  210. id := int(resp[2])
  211. if id >= len(c.calls) {
  212. return
  213. }
  214. call := &c.calls[id]
  215. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  216. return
  217. }
  218. atomic.AddInt32(&c.nwait, -1)
  219. call.resp <- callResp{resp, nil}
  220. }
  221. func (c *Conn) ping() error {
  222. _, err := c.exec(&optionsFrame{}, nil)
  223. return err
  224. }
  225. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  226. c.prepMu.Lock()
  227. flight := c.prep[stmt]
  228. if flight != nil {
  229. c.prepMu.Unlock()
  230. flight.wg.Wait()
  231. return flight.info, flight.err
  232. }
  233. flight = new(inflightPrepare)
  234. flight.wg.Add(1)
  235. c.prep[stmt] = flight
  236. c.prepMu.Unlock()
  237. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  238. if err != nil {
  239. flight.err = err
  240. } else {
  241. switch x := resp.(type) {
  242. case resultPreparedFrame:
  243. flight.info = &queryInfo{
  244. id: x.PreparedId,
  245. args: x.Values,
  246. }
  247. case error:
  248. flight.err = x
  249. default:
  250. flight.err = ErrProtocol
  251. }
  252. }
  253. flight.wg.Done()
  254. if err != nil {
  255. c.prepMu.Lock()
  256. delete(c.prep, stmt)
  257. c.prepMu.Unlock()
  258. }
  259. return flight.info, flight.err
  260. }
  261. func (c *Conn) executeQuery(qry *Query) *Iter {
  262. op := &queryFrame{
  263. Stmt: qry.stmt,
  264. Cons: qry.cons,
  265. PageSize: qry.pageSize,
  266. PageState: qry.pageState,
  267. }
  268. if len(qry.values) > 0 {
  269. info, err := c.prepareStatement(qry.stmt, qry.trace)
  270. if err != nil {
  271. return &Iter{err: err}
  272. }
  273. op.Prepared = info.id
  274. op.Values = make([][]byte, len(qry.values))
  275. for i := 0; i < len(qry.values); i++ {
  276. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  277. if err != nil {
  278. return &Iter{err: err}
  279. }
  280. op.Values[i] = val
  281. }
  282. }
  283. resp, err := c.exec(op, qry.trace)
  284. if err != nil {
  285. return &Iter{err: err}
  286. }
  287. switch x := resp.(type) {
  288. case resultVoidFrame:
  289. return &Iter{}
  290. case resultRowsFrame:
  291. iter := &Iter{columns: x.Columns, rows: x.Rows}
  292. if len(x.PagingState) > 0 {
  293. iter.next = &nextIter{
  294. qry: *qry,
  295. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  296. }
  297. iter.next.qry.pageState = x.PagingState
  298. if iter.next.pos < 1 {
  299. iter.next.pos = 1
  300. }
  301. }
  302. return iter
  303. case resultKeyspaceFrame:
  304. c.cluster.HandleKeyspace(c, x.Keyspace)
  305. return &Iter{}
  306. case errorFrame:
  307. if x.Code == errUnprepared && len(qry.values) > 0 {
  308. c.prepMu.Lock()
  309. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  310. delete(c.prep, qry.stmt)
  311. c.prepMu.Unlock()
  312. return c.executeQuery(qry)
  313. }
  314. c.prepMu.Unlock()
  315. return &Iter{err: x}
  316. } else {
  317. return &Iter{err: x}
  318. }
  319. case error:
  320. return &Iter{err: x}
  321. default:
  322. return &Iter{err: ErrProtocol}
  323. }
  324. }
  325. func (c *Conn) Pick(qry *Query) *Conn {
  326. return c
  327. }
  328. func (c *Conn) Close() {
  329. c.conn.Close()
  330. }
  331. func (c *Conn) Address() string {
  332. return c.addr
  333. }
  334. func (c *Conn) UseKeyspace(keyspace string) error {
  335. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  336. if err != nil {
  337. return err
  338. }
  339. switch x := resp.(type) {
  340. case resultKeyspaceFrame:
  341. case error:
  342. return x
  343. default:
  344. return ErrProtocol
  345. }
  346. return nil
  347. }
  348. func (c *Conn) executeBatch(batch *Batch) error {
  349. if c.version == 1 {
  350. return ErrUnsupported
  351. }
  352. f := make(frame, headerSize, defaultFrameSize)
  353. f.setHeader(c.version, 0, 0, opBatch)
  354. f.writeByte(byte(batch.Type))
  355. f.writeShort(uint16(len(batch.Entries)))
  356. for i := 0; i < len(batch.Entries); i++ {
  357. entry := &batch.Entries[i]
  358. var info *queryInfo
  359. if len(entry.Args) > 0 {
  360. var err error
  361. info, err = c.prepareStatement(entry.Stmt, nil)
  362. if err != nil {
  363. return err
  364. }
  365. f.writeByte(1)
  366. f.writeShortBytes(info.id)
  367. } else {
  368. f.writeByte(0)
  369. f.writeLongString(entry.Stmt)
  370. }
  371. f.writeShort(uint16(len(entry.Args)))
  372. for j := 0; j < len(entry.Args); j++ {
  373. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  374. if err != nil {
  375. return err
  376. }
  377. f.writeBytes(val)
  378. }
  379. }
  380. f.writeConsistency(batch.Cons)
  381. resp, err := c.exec(f, nil)
  382. if err != nil {
  383. return err
  384. }
  385. switch x := resp.(type) {
  386. case resultVoidFrame:
  387. return nil
  388. case error:
  389. return x
  390. default:
  391. return ErrProtocol
  392. }
  393. }
  394. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  395. defer func() {
  396. if r := recover(); r != nil {
  397. if e, ok := r.(error); ok && e == ErrProtocol {
  398. err = e
  399. return
  400. }
  401. panic(r)
  402. }
  403. }()
  404. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  405. return nil, ErrProtocol
  406. }
  407. flags, op, f := f[1], f[3], f[headerSize:]
  408. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  409. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  410. return nil, err
  411. } else {
  412. f = frame(buf)
  413. }
  414. }
  415. if flags&flagTrace != 0 {
  416. if len(f) < 16 {
  417. return nil, ErrProtocol
  418. }
  419. traceId := []byte(f[:16])
  420. f = f[16:]
  421. trace.Trace(traceId)
  422. }
  423. switch op {
  424. case opReady:
  425. return readyFrame{}, nil
  426. case opResult:
  427. switch kind := f.readInt(); kind {
  428. case resultKindVoid:
  429. return resultVoidFrame{}, nil
  430. case resultKindRows:
  431. columns, pageState := f.readMetaData()
  432. numRows := f.readInt()
  433. values := make([][]byte, numRows*len(columns))
  434. for i := 0; i < len(values); i++ {
  435. values[i] = f.readBytes()
  436. }
  437. rows := make([][][]byte, numRows)
  438. for i := 0; i < numRows; i++ {
  439. rows[i], values = values[:len(columns)], values[len(columns):]
  440. }
  441. return resultRowsFrame{columns, rows, pageState}, nil
  442. case resultKindKeyspace:
  443. keyspace := f.readString()
  444. return resultKeyspaceFrame{keyspace}, nil
  445. case resultKindPrepared:
  446. id := f.readShortBytes()
  447. values, _ := f.readMetaData()
  448. return resultPreparedFrame{id, values}, nil
  449. case resultKindSchemaChanged:
  450. return resultVoidFrame{}, nil
  451. default:
  452. return nil, ErrProtocol
  453. }
  454. case opSupported:
  455. return supportedFrame{}, nil
  456. case opError:
  457. code := f.readInt()
  458. msg := f.readString()
  459. return errorFrame{code, msg}, nil
  460. default:
  461. return nil, ErrProtocol
  462. }
  463. }
  464. type queryInfo struct {
  465. id []byte
  466. args []ColumnInfo
  467. rval []ColumnInfo
  468. }
  469. type callReq struct {
  470. active int32
  471. resp chan callResp
  472. }
  473. type callResp struct {
  474. buf frame
  475. err error
  476. }
  // Compressor compresses and decompresses frame bodies. When set in
// ConnConfig.Compressor, its Name is announced in the STARTUP request,
// Encode is applied to outgoing request bodies, and Decode to incoming
// bodies that carry the compression flag.
type Compressor interface {
	// Name returns the algorithm name sent to the server during startup.
	Name() string
	// Encode returns the compressed form of data.
	Encode(data []byte) ([]byte, error)
	// Decode returns the decompressed form of data.
	Decode(data []byte) ([]byte, error)
}
  482. type inflightPrepare struct {
  483. info *queryInfo
  484. err error
  485. wg sync.WaitGroup
  486. }
  487. // SnappyCompressor implements the Compressor interface and can be used to
  488. // compress incoming and outgoing frames. The snappy compression algorithm
  489. // aims for very high speeds and reasonable compression.
  490. type SnappyCompressor struct{}
  491. func (s SnappyCompressor) Name() string {
  492. return "snappy"
  493. }
  494. func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
  495. return snappy.Encode(nil, data)
  496. }
  497. func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
  498. return snappy.Decode(nil, data)
  499. }