conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "fmt"
  8. "net"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "unicode"
  14. "code.google.com/p/snappy-go/snappy"
  15. )
  16. const defaultFrameSize = 4096
  17. const flagResponse = 0x80
  18. const maskVersion = 0x7F
  19. type Cluster interface {
  20. HandleError(conn *Conn, err error, closed bool)
  21. }
  22. type Authenticator interface {
  23. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  24. Success(data []byte) error
  25. }
  26. type PasswordAuthenticator struct {
  27. Username string
  28. Password string
  29. }
  30. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  31. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  32. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  33. }
  34. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  35. resp[0] = 0
  36. copy(resp[1:], p.Username)
  37. resp[len(p.Username)+1] = 0
  38. copy(resp[2+len(p.Username):], p.Password)
  39. return resp, nil, nil
  40. }
  41. func (p PasswordAuthenticator) Success(data []byte) error {
  42. return nil
  43. }
  44. type ConnConfig struct {
  45. ProtoVersion int
  46. CQLVersion string
  47. Timeout time.Duration
  48. NumStreams int
  49. Compressor Compressor
  50. Authenticator Authenticator
  51. Keepalive time.Duration
  52. }
  53. // Conn is a single connection to a Cassandra node. It can be used to execute
  54. // queries, but users are usually advised to use a more reliable, higher
  55. // level API.
  56. type Conn struct {
  57. conn net.Conn
  58. r *bufio.Reader
  59. timeout time.Duration
  60. uniq chan uint8
  61. calls []callReq
  62. nwait int32
  63. prepMu sync.Mutex
  64. prep map[string]*inflightPrepare
  65. cluster Cluster
  66. compressor Compressor
  67. auth Authenticator
  68. addr string
  69. version uint8
  70. isClosed bool
  71. }
  72. // Connect establishes a connection to a Cassandra node.
  73. // You must also call the Serve method before you can execute any queries.
  74. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  75. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  76. if err != nil {
  77. return nil, err
  78. }
  79. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  80. cfg.NumStreams = 128
  81. }
  82. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  83. cfg.ProtoVersion = 2
  84. }
  85. c := &Conn{
  86. conn: conn,
  87. r: bufio.NewReader(conn),
  88. uniq: make(chan uint8, cfg.NumStreams),
  89. calls: make([]callReq, cfg.NumStreams),
  90. prep: make(map[string]*inflightPrepare),
  91. timeout: cfg.Timeout,
  92. version: uint8(cfg.ProtoVersion),
  93. addr: conn.RemoteAddr().String(),
  94. cluster: cluster,
  95. compressor: cfg.Compressor,
  96. isClosed: false,
  97. auth: cfg.Authenticator,
  98. }
  99. if cfg.Keepalive > 0 {
  100. c.setKeepalive(cfg.Keepalive)
  101. }
  102. for i := 0; i < cap(c.uniq); i++ {
  103. c.uniq <- uint8(i)
  104. }
  105. if err := c.startup(&cfg); err != nil {
  106. conn.Close()
  107. return nil, err
  108. }
  109. go c.serve()
  110. return c, nil
  111. }
  112. func (c *Conn) startup(cfg *ConnConfig) error {
  113. compression := ""
  114. if c.compressor != nil {
  115. compression = c.compressor.Name()
  116. }
  117. var req operation = &startupFrame{
  118. CQLVersion: cfg.CQLVersion,
  119. Compression: compression,
  120. }
  121. var challenger Authenticator
  122. for {
  123. resp, err := c.execSimple(req)
  124. if err != nil {
  125. return err
  126. }
  127. switch x := resp.(type) {
  128. case readyFrame:
  129. return nil
  130. case error:
  131. return x
  132. case authenticateFrame:
  133. if c.auth == nil {
  134. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  135. }
  136. var resp []byte
  137. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  138. if err != nil {
  139. return err
  140. }
  141. req = &authResponseFrame{resp}
  142. case authChallengeFrame:
  143. if challenger == nil {
  144. return fmt.Errorf("authentication error (invalid challenge)")
  145. }
  146. var resp []byte
  147. resp, challenger, err = challenger.Challenge(x.Data)
  148. if err != nil {
  149. return err
  150. }
  151. req = &authResponseFrame{resp}
  152. case authSuccessFrame:
  153. if challenger != nil {
  154. return challenger.Success(x.Data)
  155. }
  156. return nil
  157. default:
  158. return ErrProtocol
  159. }
  160. }
  161. }
  162. // Serve starts the stream multiplexer for this connection, which is required
  163. // to execute any queries. This method runs as long as the connection is
  164. // open and is therefore usually called in a separate goroutine.
  165. func (c *Conn) serve() {
  166. var (
  167. err error
  168. resp frame
  169. )
  170. for {
  171. resp, err = c.recv()
  172. if err != nil {
  173. break
  174. }
  175. c.dispatch(resp)
  176. }
  177. c.Close()
  178. for id := 0; id < len(c.calls); id++ {
  179. req := &c.calls[id]
  180. if atomic.LoadInt32(&req.active) == 1 {
  181. req.resp <- callResp{nil, err}
  182. }
  183. }
  184. c.cluster.HandleError(c, err, true)
  185. }
  186. func (c *Conn) recv() (frame, error) {
  187. resp := make(frame, headerSize, headerSize+512)
  188. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  189. n, last, pinged := 0, 0, false
  190. for n < len(resp) {
  191. nn, err := c.r.Read(resp[n:])
  192. n += nn
  193. if err != nil {
  194. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  195. if n > last {
  196. // we hit the deadline but we made progress.
  197. // simply extend the deadline
  198. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  199. last = n
  200. } else if n == 0 && !pinged {
  201. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  202. if atomic.LoadInt32(&c.nwait) > 0 {
  203. go c.ping()
  204. pinged = true
  205. }
  206. } else {
  207. return nil, err
  208. }
  209. } else {
  210. return nil, err
  211. }
  212. }
  213. if n == headerSize && len(resp) == headerSize {
  214. if resp[0] != c.version|flagResponse {
  215. return nil, ErrProtocol
  216. }
  217. resp.grow(resp.Length())
  218. }
  219. }
  220. return resp, nil
  221. }
  222. func (c *Conn) execSimple(op operation) (interface{}, error) {
  223. f, err := op.encodeFrame(c.version, nil)
  224. f.setLength(len(f) - headerSize)
  225. if _, err := c.conn.Write([]byte(f)); err != nil {
  226. c.Close()
  227. return nil, err
  228. }
  229. if f, err = c.recv(); err != nil {
  230. return nil, err
  231. }
  232. return c.decodeFrame(f, nil)
  233. }
  234. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  235. req, err := op.encodeFrame(c.version, nil)
  236. if err != nil {
  237. return nil, err
  238. }
  239. if trace != nil {
  240. req[1] |= flagTrace
  241. }
  242. if len(req) > headerSize && c.compressor != nil {
  243. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  244. if err != nil {
  245. return nil, err
  246. }
  247. req = append(req[:headerSize], frame(body)...)
  248. req[1] |= flagCompress
  249. }
  250. req.setLength(len(req) - headerSize)
  251. id := <-c.uniq
  252. req[2] = id
  253. call := &c.calls[id]
  254. call.resp = make(chan callResp, 1)
  255. atomic.AddInt32(&c.nwait, 1)
  256. atomic.StoreInt32(&call.active, 1)
  257. if _, err := c.conn.Write(req); err != nil {
  258. c.uniq <- id
  259. c.Close()
  260. return nil, err
  261. }
  262. reply := <-call.resp
  263. call.resp = nil
  264. c.uniq <- id
  265. if reply.err != nil {
  266. return nil, reply.err
  267. }
  268. return c.decodeFrame(reply.buf, trace)
  269. }
  270. func (c *Conn) dispatch(resp frame) {
  271. id := int(resp[2])
  272. if id >= len(c.calls) {
  273. return
  274. }
  275. call := &c.calls[id]
  276. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  277. return
  278. }
  279. atomic.AddInt32(&c.nwait, -1)
  280. call.resp <- callResp{resp, nil}
  281. }
  282. func (c *Conn) ping() error {
  283. _, err := c.exec(&optionsFrame{}, nil)
  284. return err
  285. }
  286. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  287. c.prepMu.Lock()
  288. flight := c.prep[stmt]
  289. if flight != nil {
  290. c.prepMu.Unlock()
  291. flight.wg.Wait()
  292. return flight.info, flight.err
  293. }
  294. flight = new(inflightPrepare)
  295. flight.wg.Add(1)
  296. c.prep[stmt] = flight
  297. c.prepMu.Unlock()
  298. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  299. if err != nil {
  300. flight.err = err
  301. } else {
  302. switch x := resp.(type) {
  303. case resultPreparedFrame:
  304. flight.info = &queryInfo{
  305. id: x.PreparedId,
  306. args: x.Values,
  307. }
  308. case error:
  309. flight.err = x
  310. default:
  311. flight.err = ErrProtocol
  312. }
  313. }
  314. flight.wg.Done()
  315. if err != nil {
  316. c.prepMu.Lock()
  317. delete(c.prep, stmt)
  318. c.prepMu.Unlock()
  319. }
  320. return flight.info, flight.err
  321. }
  322. func shouldPrepare(stmt string) bool {
  323. stmt = strings.TrimLeftFunc(strings.TrimRightFunc(stmt, func(r rune) bool {
  324. return unicode.IsSpace(r) || r == ';'
  325. }), unicode.IsSpace)
  326. var stmtType string
  327. if n := strings.IndexFunc(stmt, unicode.IsSpace); n >= 0 {
  328. stmtType = strings.ToLower(stmt[:n])
  329. }
  330. if stmtType == "begin" {
  331. if n := strings.LastIndexFunc(stmt, unicode.IsSpace); n >= 0 {
  332. stmtType = strings.ToLower(stmt[n+1:])
  333. }
  334. }
  335. switch stmtType {
  336. case "select", "insert", "update", "delete", "batch":
  337. return true
  338. }
  339. return false
  340. }
  341. func (c *Conn) executeQuery(qry *Query) *Iter {
  342. op := &queryFrame{
  343. Stmt: qry.stmt,
  344. Cons: qry.cons,
  345. PageSize: qry.pageSize,
  346. PageState: qry.pageState,
  347. }
  348. if shouldPrepare(op.Stmt) {
  349. // Prepare all DML queries. Other queries can not be prepared.
  350. info, err := c.prepareStatement(qry.stmt, qry.trace)
  351. if err != nil {
  352. return &Iter{err: err}
  353. }
  354. op.Prepared = info.id
  355. op.Values = make([][]byte, len(qry.values))
  356. for i := 0; i < len(qry.values); i++ {
  357. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  358. if err != nil {
  359. return &Iter{err: err}
  360. }
  361. op.Values[i] = val
  362. }
  363. }
  364. resp, err := c.exec(op, qry.trace)
  365. if err != nil {
  366. return &Iter{err: err}
  367. }
  368. switch x := resp.(type) {
  369. case resultVoidFrame:
  370. return &Iter{}
  371. case resultRowsFrame:
  372. iter := &Iter{columns: x.Columns, rows: x.Rows}
  373. if len(x.PagingState) > 0 {
  374. iter.next = &nextIter{
  375. qry: *qry,
  376. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  377. }
  378. iter.next.qry.pageState = x.PagingState
  379. if iter.next.pos < 1 {
  380. iter.next.pos = 1
  381. }
  382. }
  383. return iter
  384. case resultKeyspaceFrame:
  385. return &Iter{}
  386. case errorFrame:
  387. if x.Code == errUnprepared && len(qry.values) > 0 {
  388. c.prepMu.Lock()
  389. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  390. delete(c.prep, qry.stmt)
  391. c.prepMu.Unlock()
  392. return c.executeQuery(qry)
  393. }
  394. c.prepMu.Unlock()
  395. return &Iter{err: x}
  396. } else {
  397. return &Iter{err: x}
  398. }
  399. case error:
  400. return &Iter{err: x}
  401. default:
  402. return &Iter{err: ErrProtocol}
  403. }
  404. }
  405. func (c *Conn) Pick(qry *Query) *Conn {
  406. if c.isClosed || len(c.uniq) == 0 {
  407. return nil
  408. }
  409. return c
  410. }
  411. func (c *Conn) Close() {
  412. c.isClosed = true
  413. c.conn.Close()
  414. }
  415. func (c *Conn) Address() string {
  416. return c.addr
  417. }
  418. func (c *Conn) UseKeyspace(keyspace string) error {
  419. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  420. if err != nil {
  421. return err
  422. }
  423. switch x := resp.(type) {
  424. case resultKeyspaceFrame:
  425. case error:
  426. return x
  427. default:
  428. return ErrProtocol
  429. }
  430. return nil
  431. }
  432. func (c *Conn) executeBatch(batch *Batch) error {
  433. if c.version == 1 {
  434. return ErrUnsupported
  435. }
  436. f := make(frame, headerSize, defaultFrameSize)
  437. f.setHeader(c.version, 0, 0, opBatch)
  438. f.writeByte(byte(batch.Type))
  439. f.writeShort(uint16(len(batch.Entries)))
  440. for i := 0; i < len(batch.Entries); i++ {
  441. entry := &batch.Entries[i]
  442. var info *queryInfo
  443. if len(entry.Args) > 0 {
  444. var err error
  445. info, err = c.prepareStatement(entry.Stmt, nil)
  446. if err != nil {
  447. return err
  448. }
  449. f.writeByte(1)
  450. f.writeShortBytes(info.id)
  451. } else {
  452. f.writeByte(0)
  453. f.writeLongString(entry.Stmt)
  454. }
  455. f.writeShort(uint16(len(entry.Args)))
  456. for j := 0; j < len(entry.Args); j++ {
  457. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  458. if err != nil {
  459. return err
  460. }
  461. f.writeBytes(val)
  462. }
  463. }
  464. f.writeConsistency(batch.Cons)
  465. resp, err := c.exec(f, nil)
  466. if err != nil {
  467. return err
  468. }
  469. switch x := resp.(type) {
  470. case resultVoidFrame:
  471. return nil
  472. case error:
  473. return x
  474. default:
  475. return ErrProtocol
  476. }
  477. }
  478. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  479. defer func() {
  480. if r := recover(); r != nil {
  481. if e, ok := r.(error); ok && e == ErrProtocol {
  482. err = e
  483. return
  484. }
  485. panic(r)
  486. }
  487. }()
  488. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  489. return nil, ErrProtocol
  490. }
  491. flags, op, f := f[1], f[3], f[headerSize:]
  492. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  493. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  494. return nil, err
  495. } else {
  496. f = frame(buf)
  497. }
  498. }
  499. if flags&flagTrace != 0 {
  500. if len(f) < 16 {
  501. return nil, ErrProtocol
  502. }
  503. traceId := []byte(f[:16])
  504. f = f[16:]
  505. trace.Trace(traceId)
  506. }
  507. switch op {
  508. case opReady:
  509. return readyFrame{}, nil
  510. case opResult:
  511. switch kind := f.readInt(); kind {
  512. case resultKindVoid:
  513. return resultVoidFrame{}, nil
  514. case resultKindRows:
  515. columns, pageState := f.readMetaData()
  516. numRows := f.readInt()
  517. values := make([][]byte, numRows*len(columns))
  518. for i := 0; i < len(values); i++ {
  519. values[i] = f.readBytes()
  520. }
  521. rows := make([][][]byte, numRows)
  522. for i := 0; i < numRows; i++ {
  523. rows[i], values = values[:len(columns)], values[len(columns):]
  524. }
  525. return resultRowsFrame{columns, rows, pageState}, nil
  526. case resultKindKeyspace:
  527. keyspace := f.readString()
  528. return resultKeyspaceFrame{keyspace}, nil
  529. case resultKindPrepared:
  530. id := f.readShortBytes()
  531. values, _ := f.readMetaData()
  532. return resultPreparedFrame{id, values}, nil
  533. case resultKindSchemaChanged:
  534. return resultVoidFrame{}, nil
  535. default:
  536. return nil, ErrProtocol
  537. }
  538. case opAuthenticate:
  539. return authenticateFrame{f.readString()}, nil
  540. case opAuthChallenge:
  541. return authChallengeFrame{f.readBytes()}, nil
  542. case opAuthSuccess:
  543. return authSuccessFrame{f.readBytes()}, nil
  544. case opSupported:
  545. return supportedFrame{}, nil
  546. case opError:
  547. code := f.readInt()
  548. msg := f.readString()
  549. return errorFrame{code, msg}, nil
  550. default:
  551. return nil, ErrProtocol
  552. }
  553. }
  554. type queryInfo struct {
  555. id []byte
  556. args []ColumnInfo
  557. rval []ColumnInfo
  558. }
  559. type callReq struct {
  560. active int32
  561. resp chan callResp
  562. }
  563. type callResp struct {
  564. buf frame
  565. err error
  566. }
  567. type Compressor interface {
  568. Name() string
  569. Encode(data []byte) ([]byte, error)
  570. Decode(data []byte) ([]byte, error)
  571. }
  572. type inflightPrepare struct {
  573. info *queryInfo
  574. err error
  575. wg sync.WaitGroup
  576. }
  577. // SnappyCompressor implements the Compressor interface and can be used to
  578. // compress incoming and outgoing frames. The snappy compression algorithm
  579. // aims for very high speeds and reasonable compression.
  580. type SnappyCompressor struct{}
  581. func (s SnappyCompressor) Name() string {
  582. return "snappy"
  583. }
  584. func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
  585. return snappy.Encode(nil, data)
  586. }
  587. func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
  588. return snappy.Decode(nil, data)
  589. }