conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "fmt"
  8. "net"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "unicode"
  14. "code.google.com/p/snappy-go/snappy"
  15. )
  // Frame-level constants.
const (
	// defaultFrameSize is the initial capacity, in bytes, of frames that are
	// assembled by hand (see executeBatch).
	defaultFrameSize = 4096
	// flagResponse is set in the version byte of frames sent by the server;
	// incoming frames are expected to start with version|flagResponse.
	flagResponse = 0x80
	// maskVersion masks the protocol version out of a frame's version byte.
	// NOTE(review): not referenced in the visible part of this file.
	maskVersion = 0x7F
)
  19. type Cluster interface {
  20. HandleError(conn *Conn, err error, closed bool)
  21. HandleKeyspace(conn *Conn, keyspace string)
  22. }
  23. type Authenticator interface {
  24. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  25. Success(data []byte) error
  26. }
  27. type PasswordAuthenticator struct {
  28. Username string
  29. Password string
  30. }
  31. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  32. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  33. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  34. }
  35. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  36. resp[0] = 0
  37. copy(resp[1:], p.Username)
  38. resp[len(p.Username)+1] = 0
  39. copy(resp[2+len(p.Username):], p.Password)
  40. return resp, nil, nil
  41. }
  42. func (p PasswordAuthenticator) Success(data []byte) error {
  43. return nil
  44. }
  45. type ConnConfig struct {
  46. ProtoVersion int
  47. CQLVersion string
  48. Timeout time.Duration
  49. NumStreams int
  50. Compressor Compressor
  51. Authenticator Authenticator
  52. Keepalive time.Duration
  53. }
  54. // Conn is a single connection to a Cassandra node. It can be used to execute
  55. // queries, but users are usually advised to use a more reliable, higher
  56. // level API.
  57. type Conn struct {
  58. conn net.Conn
  59. r *bufio.Reader
  60. timeout time.Duration
  61. uniq chan uint8
  62. calls []callReq
  63. nwait int32
  64. prepMu sync.Mutex
  65. prep map[string]*inflightPrepare
  66. cluster Cluster
  67. compressor Compressor
  68. auth Authenticator
  69. addr string
  70. version uint8
  71. closedMu sync.RWMutex
  72. isClosed bool
  73. }
  74. // Connect establishes a connection to a Cassandra node.
  75. // You must also call the Serve method before you can execute any queries.
  76. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  77. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  78. if err != nil {
  79. return nil, err
  80. }
  81. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  82. cfg.NumStreams = 128
  83. }
  84. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  85. cfg.ProtoVersion = 2
  86. }
  87. c := &Conn{
  88. conn: conn,
  89. r: bufio.NewReader(conn),
  90. uniq: make(chan uint8, cfg.NumStreams),
  91. calls: make([]callReq, cfg.NumStreams),
  92. prep: make(map[string]*inflightPrepare),
  93. timeout: cfg.Timeout,
  94. version: uint8(cfg.ProtoVersion),
  95. addr: conn.RemoteAddr().String(),
  96. cluster: cluster,
  97. compressor: cfg.Compressor,
  98. auth: cfg.Authenticator,
  99. }
  100. if cfg.Keepalive > 0 {
  101. c.setKeepalive(cfg.Keepalive)
  102. }
  103. for i := 0; i < cap(c.uniq); i++ {
  104. c.uniq <- uint8(i)
  105. }
  106. if err := c.startup(&cfg); err != nil {
  107. conn.Close()
  108. return nil, err
  109. }
  110. go c.serve()
  111. return c, nil
  112. }
  113. func (c *Conn) startup(cfg *ConnConfig) error {
  114. compression := ""
  115. if c.compressor != nil {
  116. compression = c.compressor.Name()
  117. }
  118. var req operation = &startupFrame{
  119. CQLVersion: cfg.CQLVersion,
  120. Compression: compression,
  121. }
  122. var challenger Authenticator
  123. for {
  124. resp, err := c.execSimple(req)
  125. if err != nil {
  126. return err
  127. }
  128. switch x := resp.(type) {
  129. case readyFrame:
  130. return nil
  131. case error:
  132. return x
  133. case authenticateFrame:
  134. if c.auth == nil {
  135. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  136. }
  137. var resp []byte
  138. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  139. if err != nil {
  140. return err
  141. }
  142. req = &authResponseFrame{resp}
  143. case authChallengeFrame:
  144. if challenger == nil {
  145. return fmt.Errorf("authentication error (invalid challenge)")
  146. }
  147. var resp []byte
  148. resp, challenger, err = challenger.Challenge(x.Data)
  149. if err != nil {
  150. return err
  151. }
  152. req = &authResponseFrame{resp}
  153. case authSuccessFrame:
  154. if challenger != nil {
  155. return challenger.Success(x.Data)
  156. }
  157. return nil
  158. default:
  159. return ErrProtocol
  160. }
  161. }
  162. }
  163. // Serve starts the stream multiplexer for this connection, which is required
  164. // to execute any queries. This method runs as long as the connection is
  165. // open and is therefore usually called in a separate goroutine.
  166. func (c *Conn) serve() {
  167. var (
  168. err error
  169. resp frame
  170. )
  171. for {
  172. resp, err = c.recv()
  173. if err != nil {
  174. break
  175. }
  176. c.dispatch(resp)
  177. }
  178. c.Close()
  179. for id := 0; id < len(c.calls); id++ {
  180. req := &c.calls[id]
  181. if atomic.LoadInt32(&req.active) == 1 {
  182. req.resp <- callResp{nil, err}
  183. }
  184. }
  185. c.cluster.HandleError(c, err, true)
  186. }
  187. func (c *Conn) recv() (frame, error) {
  188. resp := make(frame, headerSize, headerSize+512)
  189. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  190. n, last, pinged := 0, 0, false
  191. for n < len(resp) {
  192. nn, err := c.r.Read(resp[n:])
  193. n += nn
  194. if err != nil {
  195. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  196. if n > last {
  197. // we hit the deadline but we made progress.
  198. // simply extend the deadline
  199. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  200. last = n
  201. } else if n == 0 && !pinged {
  202. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  203. if atomic.LoadInt32(&c.nwait) > 0 {
  204. go c.ping()
  205. pinged = true
  206. }
  207. } else {
  208. return nil, err
  209. }
  210. } else {
  211. return nil, err
  212. }
  213. }
  214. if n == headerSize && len(resp) == headerSize {
  215. if resp[0] != c.version|flagResponse {
  216. return nil, ErrProtocol
  217. }
  218. resp.grow(resp.Length())
  219. }
  220. }
  221. return resp, nil
  222. }
  223. func (c *Conn) execSimple(op operation) (interface{}, error) {
  224. f, err := op.encodeFrame(c.version, nil)
  225. f.setLength(len(f) - headerSize)
  226. if _, err := c.conn.Write([]byte(f)); err != nil {
  227. c.Close()
  228. return nil, err
  229. }
  230. if f, err = c.recv(); err != nil {
  231. return nil, err
  232. }
  233. return c.decodeFrame(f, nil)
  234. }
  235. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  236. req, err := op.encodeFrame(c.version, nil)
  237. if err != nil {
  238. return nil, err
  239. }
  240. if trace != nil {
  241. req[1] |= flagTrace
  242. }
  243. if len(req) > headerSize && c.compressor != nil {
  244. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  245. if err != nil {
  246. return nil, err
  247. }
  248. req = append(req[:headerSize], frame(body)...)
  249. req[1] |= flagCompress
  250. }
  251. req.setLength(len(req) - headerSize)
  252. id := <-c.uniq
  253. req[2] = id
  254. call := &c.calls[id]
  255. call.resp = make(chan callResp, 1)
  256. atomic.AddInt32(&c.nwait, 1)
  257. atomic.StoreInt32(&call.active, 1)
  258. if _, err := c.conn.Write(req); err != nil {
  259. c.uniq <- id
  260. c.Close()
  261. return nil, err
  262. }
  263. reply := <-call.resp
  264. call.resp = nil
  265. c.uniq <- id
  266. if reply.err != nil {
  267. return nil, reply.err
  268. }
  269. return c.decodeFrame(reply.buf, trace)
  270. }
  271. func (c *Conn) dispatch(resp frame) {
  272. id := int(resp[2])
  273. if id >= len(c.calls) {
  274. return
  275. }
  276. call := &c.calls[id]
  277. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  278. return
  279. }
  280. atomic.AddInt32(&c.nwait, -1)
  281. call.resp <- callResp{resp, nil}
  282. }
  283. func (c *Conn) ping() error {
  284. _, err := c.exec(&optionsFrame{}, nil)
  285. return err
  286. }
  287. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  288. c.prepMu.Lock()
  289. flight := c.prep[stmt]
  290. if flight != nil {
  291. c.prepMu.Unlock()
  292. flight.wg.Wait()
  293. return flight.info, flight.err
  294. }
  295. flight = new(inflightPrepare)
  296. flight.wg.Add(1)
  297. c.prep[stmt] = flight
  298. c.prepMu.Unlock()
  299. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  300. if err != nil {
  301. flight.err = err
  302. } else {
  303. switch x := resp.(type) {
  304. case resultPreparedFrame:
  305. flight.info = &queryInfo{
  306. id: x.PreparedId,
  307. args: x.Values,
  308. }
  309. case error:
  310. flight.err = x
  311. default:
  312. flight.err = ErrProtocol
  313. }
  314. }
  315. flight.wg.Done()
  316. if err != nil {
  317. c.prepMu.Lock()
  318. delete(c.prep, stmt)
  319. c.prepMu.Unlock()
  320. }
  321. return flight.info, flight.err
  322. }
  // shouldPrepare reports whether stmt is a DML statement that can be prepared
// (SELECT, INSERT, UPDATE, DELETE or BATCH). Surrounding whitespace and
// trailing semicolons are ignored, and a statement with no interior
// whitespace is never prepared. A statement whose first word is BEGIN is
// classified by its last word instead, e.g. "BEGIN BATCH ... APPLY BATCH".
func shouldPrepare(stmt string) bool {
	trimmed := strings.TrimRightFunc(stmt, func(r rune) bool {
		return unicode.IsSpace(r) || r == ';'
	})
	trimmed = strings.TrimLeftFunc(trimmed, unicode.IsSpace)

	end := strings.IndexFunc(trimmed, unicode.IsSpace)
	if end < 0 {
		return false
	}
	keyword := strings.ToLower(trimmed[:end])
	if keyword == "begin" {
		// A space exists (IndexFunc found one), so LastIndexFunc is >= 0.
		last := strings.LastIndexFunc(trimmed, unicode.IsSpace)
		keyword = strings.ToLower(trimmed[last+1:])
	}

	switch keyword {
	case "select", "insert", "update", "delete", "batch":
		return true
	default:
		return false
	}
}
  342. func (c *Conn) executeQuery(qry *Query) *Iter {
  343. op := &queryFrame{
  344. Stmt: qry.stmt,
  345. Cons: qry.cons,
  346. PageSize: qry.pageSize,
  347. PageState: qry.pageState,
  348. }
  349. if shouldPrepare(op.Stmt) {
  350. // Prepare all DML queries. Other queries can not be prepared.
  351. info, err := c.prepareStatement(qry.stmt, qry.trace)
  352. if err != nil {
  353. return &Iter{err: err}
  354. }
  355. op.Prepared = info.id
  356. op.Values = make([][]byte, len(qry.values))
  357. for i := 0; i < len(qry.values); i++ {
  358. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  359. if err != nil {
  360. return &Iter{err: err}
  361. }
  362. op.Values[i] = val
  363. }
  364. }
  365. resp, err := c.exec(op, qry.trace)
  366. if err != nil {
  367. return &Iter{err: err}
  368. }
  369. switch x := resp.(type) {
  370. case resultVoidFrame:
  371. return &Iter{}
  372. case resultRowsFrame:
  373. iter := &Iter{columns: x.Columns, rows: x.Rows}
  374. if len(x.PagingState) > 0 {
  375. iter.next = &nextIter{
  376. qry: *qry,
  377. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  378. }
  379. iter.next.qry.pageState = x.PagingState
  380. if iter.next.pos < 1 {
  381. iter.next.pos = 1
  382. }
  383. }
  384. return iter
  385. case resultKeyspaceFrame:
  386. c.cluster.HandleKeyspace(c, x.Keyspace)
  387. return &Iter{}
  388. case errorFrame:
  389. if x.Code == errUnprepared && len(qry.values) > 0 {
  390. c.prepMu.Lock()
  391. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  392. delete(c.prep, qry.stmt)
  393. c.prepMu.Unlock()
  394. return c.executeQuery(qry)
  395. }
  396. c.prepMu.Unlock()
  397. return &Iter{err: x}
  398. } else {
  399. return &Iter{err: x}
  400. }
  401. case error:
  402. return &Iter{err: x}
  403. default:
  404. return &Iter{err: ErrProtocol}
  405. }
  406. }
  407. func (c *Conn) Pick(qry *Query) *Conn {
  408. if c.Closed() || len(c.uniq) == 0 {
  409. return nil
  410. }
  411. return c
  412. }
  413. func (c *Conn) Closed() bool {
  414. c.closedMu.RLock()
  415. closed := c.isClosed
  416. c.closedMu.RUnlock()
  417. return closed
  418. }
  419. func (c *Conn) Close() {
  420. c.closedMu.Lock()
  421. if c.isClosed {
  422. c.closedMu.Unlock()
  423. return
  424. }
  425. c.isClosed = true
  426. c.closedMu.Unlock()
  427. c.conn.Close()
  428. }
  429. func (c *Conn) Address() string {
  430. return c.addr
  431. }
  432. func (c *Conn) UseKeyspace(keyspace string) error {
  433. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  434. if err != nil {
  435. return err
  436. }
  437. switch x := resp.(type) {
  438. case resultKeyspaceFrame:
  439. case error:
  440. return x
  441. default:
  442. return ErrProtocol
  443. }
  444. return nil
  445. }
  446. func (c *Conn) executeBatch(batch *Batch) error {
  447. if c.version == 1 {
  448. return ErrUnsupported
  449. }
  450. f := make(frame, headerSize, defaultFrameSize)
  451. f.setHeader(c.version, 0, 0, opBatch)
  452. f.writeByte(byte(batch.Type))
  453. f.writeShort(uint16(len(batch.Entries)))
  454. for i := 0; i < len(batch.Entries); i++ {
  455. entry := &batch.Entries[i]
  456. var info *queryInfo
  457. if len(entry.Args) > 0 {
  458. var err error
  459. info, err = c.prepareStatement(entry.Stmt, nil)
  460. if err != nil {
  461. return err
  462. }
  463. f.writeByte(1)
  464. f.writeShortBytes(info.id)
  465. } else {
  466. f.writeByte(0)
  467. f.writeLongString(entry.Stmt)
  468. }
  469. f.writeShort(uint16(len(entry.Args)))
  470. for j := 0; j < len(entry.Args); j++ {
  471. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  472. if err != nil {
  473. return err
  474. }
  475. f.writeBytes(val)
  476. }
  477. }
  478. f.writeConsistency(batch.Cons)
  479. resp, err := c.exec(f, nil)
  480. if err != nil {
  481. return err
  482. }
  483. switch x := resp.(type) {
  484. case resultVoidFrame:
  485. return nil
  486. case error:
  487. return x
  488. default:
  489. return ErrProtocol
  490. }
  491. }
  492. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  493. defer func() {
  494. if r := recover(); r != nil {
  495. if e, ok := r.(error); ok && e == ErrProtocol {
  496. err = e
  497. return
  498. }
  499. panic(r)
  500. }
  501. }()
  502. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  503. return nil, ErrProtocol
  504. }
  505. flags, op, f := f[1], f[3], f[headerSize:]
  506. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  507. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  508. return nil, err
  509. } else {
  510. f = frame(buf)
  511. }
  512. }
  513. if flags&flagTrace != 0 {
  514. if len(f) < 16 {
  515. return nil, ErrProtocol
  516. }
  517. traceId := []byte(f[:16])
  518. f = f[16:]
  519. trace.Trace(traceId)
  520. }
  521. switch op {
  522. case opReady:
  523. return readyFrame{}, nil
  524. case opResult:
  525. switch kind := f.readInt(); kind {
  526. case resultKindVoid:
  527. return resultVoidFrame{}, nil
  528. case resultKindRows:
  529. columns, pageState := f.readMetaData()
  530. numRows := f.readInt()
  531. values := make([][]byte, numRows*len(columns))
  532. for i := 0; i < len(values); i++ {
  533. values[i] = f.readBytes()
  534. }
  535. rows := make([][][]byte, numRows)
  536. for i := 0; i < numRows; i++ {
  537. rows[i], values = values[:len(columns)], values[len(columns):]
  538. }
  539. return resultRowsFrame{columns, rows, pageState}, nil
  540. case resultKindKeyspace:
  541. keyspace := f.readString()
  542. return resultKeyspaceFrame{keyspace}, nil
  543. case resultKindPrepared:
  544. id := f.readShortBytes()
  545. values, _ := f.readMetaData()
  546. return resultPreparedFrame{id, values}, nil
  547. case resultKindSchemaChanged:
  548. return resultVoidFrame{}, nil
  549. default:
  550. return nil, ErrProtocol
  551. }
  552. case opAuthenticate:
  553. return authenticateFrame{f.readString()}, nil
  554. case opAuthChallenge:
  555. return authChallengeFrame{f.readBytes()}, nil
  556. case opAuthSuccess:
  557. return authSuccessFrame{f.readBytes()}, nil
  558. case opSupported:
  559. return supportedFrame{}, nil
  560. case opError:
  561. code := f.readInt()
  562. msg := f.readString()
  563. return errorFrame{code, msg}, nil
  564. default:
  565. return nil, ErrProtocol
  566. }
  567. }
  568. type queryInfo struct {
  569. id []byte
  570. args []ColumnInfo
  571. rval []ColumnInfo
  572. }
  573. type callReq struct {
  574. active int32
  575. resp chan callResp
  576. }
  577. type callResp struct {
  578. buf frame
  579. err error
  580. }
  // Compressor compresses request bodies and decompresses response bodies.
type Compressor interface {
	// Name is the algorithm name announced to the server at startup.
	Name() string
	// Encode returns the compressed form of data.
	Encode(data []byte) ([]byte, error)
	// Decode returns the decompressed form of data.
	Decode(data []byte) ([]byte, error)
}
  586. type inflightPrepare struct {
  587. info *queryInfo
  588. err error
  589. wg sync.WaitGroup
  590. }
  591. // SnappyCompressor implements the Compressor interface and can be used to
  592. // compress incoming and outgoing frames. The snappy compression algorithm
  593. // aims for very high speeds and reasonable compression.
  594. type SnappyCompressor struct{}
  595. func (s SnappyCompressor) Name() string {
  596. return "snappy"
  597. }
  598. func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
  599. return snappy.Encode(nil, data)
  600. }
  601. func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
  602. return snappy.Decode(nil, data)
  603. }