conn.go 16 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
import (
	"bufio"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"io/ioutil"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)
  17. const defaultFrameSize = 4096
  18. const flagResponse = 0x80
  19. const maskVersion = 0x7F
  20. type Authenticator interface {
  21. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  22. Success(data []byte) error
  23. }
  24. type PasswordAuthenticator struct {
  25. Username string
  26. Password string
  27. }
  28. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  29. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  30. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  31. }
  32. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  33. resp[0] = 0
  34. copy(resp[1:], p.Username)
  35. resp[len(p.Username)+1] = 0
  36. copy(resp[2+len(p.Username):], p.Password)
  37. return resp, nil, nil
  38. }
  39. func (p PasswordAuthenticator) Success(data []byte) error {
  40. return nil
  41. }
  42. type SslOptions struct {
  43. CertPath string
  44. KeyPath string
  45. CaPath string
  46. EnableHostVerification bool //most of the time people will want to not verify host they are connecting to
  47. }
// ConnConfig holds the settings Connect uses to open a single connection.
type ConnConfig struct {
	ProtoVersion  int           // native protocol version; Connect uses 2 unless this is 1 or 2
	CQLVersion    string        // CQL version advertised in the STARTUP frame
	Timeout       time.Duration // timeout used when dialing (see Connect) and read deadline for responses
	NumStreams    int           // concurrent stream ids; values outside 1..128 are replaced by 128
	Compressor    Compressor    // optional body compression; its Name is advertised in STARTUP
	Authenticator Authenticator // answers server auth requests; if nil, startup fails when auth is required
	Keepalive     time.Duration // TCP keepalive period; only applied when > 0
	SslOpts       *SslOptions   // when non-nil, the connection is made over TLS with these options
}
// Conn is a single connection to a Cassandra node. It can be used to execute
// queries, but users are usually advised to use a more reliable, higher
// level API.
//
// Requests are multiplexed over the socket by protocol stream id. exec takes
// a free id from uniq and arms the slot calls[id]. The serve goroutine then
// hands each response frame to the slot named by the id in its header.
type Conn struct {
	conn            net.Conn       // underlying socket: plain TCP, or TLS when SslOpts was set
	r               *bufio.Reader  // buffered reader over conn, used by recv
	timeout         time.Duration  // read deadline applied by recv
	uniq            chan uint8     // pool of free stream ids; capacity is the configured NumStreams
	calls           []callReq      // per-stream call slots, indexed by stream id
	nwait           int32          // number of calls awaiting a response; accessed atomically
	pool            ConnectionPool // notified through HandleError when serve exits
	compressor      Compressor     // optional frame body compressor
	auth            Authenticator  // used by startup when the server requests authentication
	addr            string         // remote address; part of the prepared-statement cache key
	version         uint8          // negotiated native protocol version (1 or 2)
	currentKeyspace string         // set by UseKeyspace; part of the prepared-statement cache key
	closedMu        sync.RWMutex   // guards isClosed
	isClosed        bool           // set once by Close
}
  77. // Connect establishes a connection to a Cassandra node.
  78. // You must also call the Serve method before you can execute any queries.
  79. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  80. var (
  81. err error
  82. conn net.Conn
  83. )
  84. if cfg.SslOpts != nil {
  85. pem, err := ioutil.ReadFile(cfg.SslOpts.CaPath)
  86. certPool := x509.NewCertPool()
  87. if !certPool.AppendCertsFromPEM(pem) {
  88. panic("Failed parsing or appending certs")
  89. }
  90. mycert, err := tls.LoadX509KeyPair(cfg.SslOpts.CertPath, cfg.SslOpts.KeyPath)
  91. if err != nil {
  92. panic(err)
  93. }
  94. config := tls.Config{
  95. Certificates: []tls.Certificate{mycert},
  96. RootCAs: certPool,
  97. }
  98. config.InsecureSkipVerify = !cfg.SslOpts.EnableHostVerification
  99. if conn, err = tls.Dial("tcp", addr, &config); err != nil {
  100. return nil, err
  101. }
  102. } else if conn, err = net.DialTimeout("tcp", addr, cfg.Timeout); err != nil {
  103. return nil, err
  104. }
  105. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  106. cfg.NumStreams = 128
  107. }
  108. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  109. cfg.ProtoVersion = 2
  110. }
  111. c := &Conn{
  112. conn: conn,
  113. r: bufio.NewReader(conn),
  114. uniq: make(chan uint8, cfg.NumStreams),
  115. calls: make([]callReq, cfg.NumStreams),
  116. timeout: cfg.Timeout,
  117. version: uint8(cfg.ProtoVersion),
  118. addr: conn.RemoteAddr().String(),
  119. pool: pool,
  120. compressor: cfg.Compressor,
  121. auth: cfg.Authenticator,
  122. }
  123. if cfg.Keepalive > 0 {
  124. c.setKeepalive(cfg.Keepalive)
  125. }
  126. for i := 0; i < cap(c.uniq); i++ {
  127. c.uniq <- uint8(i)
  128. }
  129. if err := c.startup(&cfg); err != nil {
  130. conn.Close()
  131. return nil, err
  132. }
  133. go c.serve()
  134. return c, nil
  135. }
  136. func (c *Conn) startup(cfg *ConnConfig) error {
  137. compression := ""
  138. if c.compressor != nil {
  139. compression = c.compressor.Name()
  140. }
  141. var req operation = &startupFrame{
  142. CQLVersion: cfg.CQLVersion,
  143. Compression: compression,
  144. }
  145. var challenger Authenticator
  146. for {
  147. resp, err := c.execSimple(req)
  148. if err != nil {
  149. return err
  150. }
  151. switch x := resp.(type) {
  152. case readyFrame:
  153. return nil
  154. case error:
  155. return x
  156. case authenticateFrame:
  157. if c.auth == nil {
  158. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  159. }
  160. var resp []byte
  161. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  162. if err != nil {
  163. return err
  164. }
  165. req = &authResponseFrame{resp}
  166. case authChallengeFrame:
  167. if challenger == nil {
  168. return fmt.Errorf("authentication error (invalid challenge)")
  169. }
  170. var resp []byte
  171. resp, challenger, err = challenger.Challenge(x.Data)
  172. if err != nil {
  173. return err
  174. }
  175. req = &authResponseFrame{resp}
  176. case authSuccessFrame:
  177. if challenger != nil {
  178. return challenger.Success(x.Data)
  179. }
  180. return nil
  181. default:
  182. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  183. }
  184. }
  185. }
  186. // Serve starts the stream multiplexer for this connection, which is required
  187. // to execute any queries. This method runs as long as the connection is
  188. // open and is therefore usually called in a separate goroutine.
  189. func (c *Conn) serve() {
  190. var (
  191. err error
  192. resp frame
  193. )
  194. for {
  195. resp, err = c.recv()
  196. if err != nil {
  197. break
  198. }
  199. c.dispatch(resp)
  200. }
  201. c.Close()
  202. for id := 0; id < len(c.calls); id++ {
  203. req := &c.calls[id]
  204. if atomic.LoadInt32(&req.active) == 1 {
  205. req.resp <- callResp{nil, err}
  206. }
  207. }
  208. c.pool.HandleError(c, err, true)
  209. }
// recv reads one complete response frame (header included) from the
// connection and returns it.
//
// It first reads headerSize bytes. Once the whole header has arrived, it
// checks that the version byte equals c.version|flagResponse and grows the
// buffer by resp.Length() (presumably the body length from the header —
// confirm against frame.Length). It then keeps reading until the buffer is
// full.
//
// Reads run under a read deadline of c.timeout. When a read fails with a
// temporary net.Error (the case the original comment calls hitting the
// deadline):
//   - if more bytes arrived since the deadline was last extended, the
//     deadline is extended and reading continues;
//   - if nothing has been read yet and no ping was sent, the deadline is
//     extended. If any calls are waiting for a response (nwait > 0), an
//     OPTIONS ping is also sent in the background to probe the connection.
//     With no waiting calls, an idle connection keeps waiting indefinitely;
//   - otherwise the error is returned.
// Any other error is returned immediately.
func (c *Conn) recv() (frame, error) {
	// Start with room for the header plus a small body; grow() extends it.
	resp := make(frame, headerSize, headerSize+512)
	c.conn.SetReadDeadline(time.Now().Add(c.timeout))
	// n: bytes read so far; last: n when the deadline was last extended.
	n, last, pinged := 0, 0, false
	for n < len(resp) {
		nn, err := c.r.Read(resp[n:])
		n += nn
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				if n > last {
					// we hit the deadline but we made progress.
					// simply extend the deadline
					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
					last = n
				} else if n == 0 && !pinged {
					// Idle: wait again, and probe the connection only if
					// someone is actually waiting for a response.
					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
					if atomic.LoadInt32(&c.nwait) > 0 {
						go c.ping()
						pinged = true
					}
				} else {
					return nil, err
				}
			} else {
				return nil, err
			}
		}
		// Header complete and body size not yet known: validate and size it.
		if n == headerSize && len(resp) == headerSize {
			if resp[0] != c.version|flagResponse {
				return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
			}
			resp.grow(resp.Length())
		}
	}
	return resp, nil
}
  246. func (c *Conn) execSimple(op operation) (interface{}, error) {
  247. f, err := op.encodeFrame(c.version, nil)
  248. f.setLength(len(f) - headerSize)
  249. if _, err := c.conn.Write([]byte(f)); err != nil {
  250. c.Close()
  251. return nil, err
  252. }
  253. if f, err = c.recv(); err != nil {
  254. return nil, err
  255. }
  256. return c.decodeFrame(f, nil)
  257. }
  258. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  259. req, err := op.encodeFrame(c.version, nil)
  260. if err != nil {
  261. return nil, err
  262. }
  263. if trace != nil {
  264. req[1] |= flagTrace
  265. }
  266. if len(req) > headerSize && c.compressor != nil {
  267. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  268. if err != nil {
  269. return nil, err
  270. }
  271. req = append(req[:headerSize], frame(body)...)
  272. req[1] |= flagCompress
  273. }
  274. req.setLength(len(req) - headerSize)
  275. id := <-c.uniq
  276. req[2] = id
  277. call := &c.calls[id]
  278. call.resp = make(chan callResp, 1)
  279. atomic.AddInt32(&c.nwait, 1)
  280. atomic.StoreInt32(&call.active, 1)
  281. if _, err := c.conn.Write(req); err != nil {
  282. c.uniq <- id
  283. c.Close()
  284. return nil, err
  285. }
  286. reply := <-call.resp
  287. call.resp = nil
  288. c.uniq <- id
  289. if reply.err != nil {
  290. return nil, reply.err
  291. }
  292. return c.decodeFrame(reply.buf, trace)
  293. }
  294. func (c *Conn) dispatch(resp frame) {
  295. id := int(resp[2])
  296. if id >= len(c.calls) {
  297. return
  298. }
  299. call := &c.calls[id]
  300. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  301. return
  302. }
  303. atomic.AddInt32(&c.nwait, -1)
  304. call.resp <- callResp{resp, nil}
  305. }
  306. func (c *Conn) ping() error {
  307. _, err := c.exec(&optionsFrame{}, nil)
  308. return err
  309. }
// prepareStatement returns the prepared-statement metadata for stmt. It sends
// a PREPARE only if no other caller has already prepared, or is currently
// preparing, the same statement.
//
// Results are shared through the package-level stmtsLRU cache, keyed by
// c.addr + c.currentKeyspace + stmt. The first caller for a key inserts an
// inflightPrepare and executes the PREPARE. Later callers find that entry,
// wait on its WaitGroup, and return the same info/error. A failed prepare is
// removed from the cache so the next caller tries again.
//
// NOTE(review): the key parts are concatenated without a separator, and
// currentKeyspace is read without synchronization — confirm both are
// acceptable for all callers.
func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
	stmtsLRU.mu.Lock()
	stmtCacheKey := c.addr + c.currentKeyspace + stmt
	if val, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
		// Already prepared or in progress: wait for the preparing caller.
		flight := val.(*inflightPrepare)
		stmtsLRU.mu.Unlock()
		flight.wg.Wait()
		return flight.info, flight.err
	}
	// Publish an in-flight entry before releasing the lock so concurrent
	// callers wait for this PREPARE instead of issuing their own.
	flight := new(inflightPrepare)
	flight.wg.Add(1)
	stmtsLRU.lru.Add(stmtCacheKey, flight)
	stmtsLRU.mu.Unlock()
	resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
	if err != nil {
		flight.err = err
	} else {
		switch x := resp.(type) {
		case resultPreparedFrame:
			flight.info = &QueryInfo{
				Id:   x.PreparedId,
				Args: x.Arguments,
				Rval: x.ReturnValues,
			}
		case error:
			flight.err = x
		default:
			flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
		}
		err = flight.err
	}
	// Release waiters; info/err are fully written at this point.
	flight.wg.Done()
	if err != nil {
		// Do not cache failures.
		stmtsLRU.mu.Lock()
		stmtsLRU.lru.Remove(stmtCacheKey)
		stmtsLRU.mu.Unlock()
	}
	return flight.info, flight.err
}
// executeQuery runs qry on this connection and returns an Iter over the
// result. Errors are reported through the returned Iter's err field rather
// than a separate return value.
//
// When qry.shouldPrepare() is true, the statement is prepared (or taken from
// the statement cache) and executed by id. Its values are marshalled using the
// prepared argument metadata. Otherwise it is sent as a plain query string.
//
// A rows result that carries a paging state gets a nextIter holding a copy of
// qry that resumes from that state. Its pos is derived from qry.prefetch and
// is at least 1 (presumably the row at which the next page is fetched —
// confirm against Iter).
//
// If the server reports RequestErrUnprepared and the statement is still
// cached, the cache entry is evicted and the query is retried, which
// re-prepares it. NOTE(review): that retry has no bound.
func (c *Conn) executeQuery(qry *Query) *Iter {
	op := &queryFrame{
		Stmt:      qry.stmt,
		Cons:      qry.cons,
		PageSize:  qry.pageSize,
		PageState: qry.pageState,
	}
	if qry.shouldPrepare() {
		// Prepare all DML queries. Other queries can not be prepared.
		info, err := c.prepareStatement(qry.stmt, qry.trace)
		if err != nil {
			return &Iter{err: err}
		}
		// Values come from the query itself or from its binding callback,
		// which is given the prepared metadata.
		var values []interface{}
		if qry.binding == nil {
			values = qry.values
		} else {
			values, err = qry.binding(info)
			if err != nil {
				return &Iter{err: err}
			}
		}
		if len(values) != len(info.Args) {
			return &Iter{err: ErrQueryArgLength}
		}
		op.Prepared = info.Id
		op.Values = make([][]byte, len(values))
		for i := 0; i < len(values); i++ {
			val, err := Marshal(info.Args[i].TypeInfo, values[i])
			if err != nil {
				return &Iter{err: err}
			}
			op.Values[i] = val
		}
	}
	resp, err := c.exec(op, qry.trace)
	if err != nil {
		return &Iter{err: err}
	}
	switch x := resp.(type) {
	case resultVoidFrame:
		return &Iter{}
	case resultRowsFrame:
		iter := &Iter{columns: x.Columns, rows: x.Rows}
		if len(x.PagingState) > 0 {
			// More pages remain: remember a copy of the query positioned at
			// the returned paging state.
			iter.next = &nextIter{
				qry: *qry,
				pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
			}
			iter.next.qry.pageState = x.PagingState
			if iter.next.pos < 1 {
				iter.next.pos = 1
			}
		}
		return iter
	case resultKeyspaceFrame:
		return &Iter{}
	case RequestErrUnprepared:
		// Evict the stale cache entry (if still present) and retry, which
		// prepares the statement again.
		stmtsLRU.mu.Lock()
		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
		if _, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
			stmtsLRU.lru.Remove(stmtCacheKey)
			stmtsLRU.mu.Unlock()
			return c.executeQuery(qry)
		}
		stmtsLRU.mu.Unlock()
		return &Iter{err: x}
	case error:
		return &Iter{err: x}
	default:
		return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
	}
}
  422. func (c *Conn) Pick(qry *Query) *Conn {
  423. if c.Closed() {
  424. return nil
  425. }
  426. return c
  427. }
  428. func (c *Conn) Closed() bool {
  429. c.closedMu.RLock()
  430. closed := c.isClosed
  431. c.closedMu.RUnlock()
  432. return closed
  433. }
  434. func (c *Conn) Close() {
  435. c.closedMu.Lock()
  436. if c.isClosed {
  437. c.closedMu.Unlock()
  438. return
  439. }
  440. c.isClosed = true
  441. c.closedMu.Unlock()
  442. c.conn.Close()
  443. }
  444. func (c *Conn) Address() string {
  445. return c.addr
  446. }
  447. func (c *Conn) UseKeyspace(keyspace string) error {
  448. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  449. if err != nil {
  450. return err
  451. }
  452. switch x := resp.(type) {
  453. case resultKeyspaceFrame:
  454. case error:
  455. return x
  456. default:
  457. return NewErrProtocol("Unknown type in response to USE: %s", x)
  458. }
  459. c.currentKeyspace = keyspace
  460. return nil
  461. }
  462. func (c *Conn) executeBatch(batch *Batch) error {
  463. if c.version == 1 {
  464. return ErrUnsupported
  465. }
  466. f := make(frame, headerSize, defaultFrameSize)
  467. f.setHeader(c.version, 0, 0, opBatch)
  468. f.writeByte(byte(batch.Type))
  469. f.writeShort(uint16(len(batch.Entries)))
  470. stmts := make(map[string]string)
  471. for i := 0; i < len(batch.Entries); i++ {
  472. entry := &batch.Entries[i]
  473. var info *QueryInfo
  474. var args []interface{}
  475. if len(entry.Args) > 0 || entry.binding != nil {
  476. var err error
  477. info, err = c.prepareStatement(entry.Stmt, nil)
  478. if err != nil {
  479. return err
  480. }
  481. if entry.binding == nil {
  482. args = entry.Args
  483. } else {
  484. args, err = entry.binding(info)
  485. if err != nil {
  486. return err
  487. }
  488. }
  489. if len(args) != len(info.Args) {
  490. return ErrQueryArgLength
  491. }
  492. stmts[string(info.Id)] = entry.Stmt
  493. f.writeByte(1)
  494. f.writeShortBytes(info.Id)
  495. } else {
  496. f.writeByte(0)
  497. f.writeLongString(entry.Stmt)
  498. }
  499. f.writeShort(uint16(len(args)))
  500. for j := 0; j < len(args); j++ {
  501. val, err := Marshal(info.Args[j].TypeInfo, args[j])
  502. if err != nil {
  503. return err
  504. }
  505. f.writeBytes(val)
  506. }
  507. }
  508. f.writeConsistency(batch.Cons)
  509. resp, err := c.exec(f, nil)
  510. if err != nil {
  511. return err
  512. }
  513. switch x := resp.(type) {
  514. case resultVoidFrame:
  515. return nil
  516. case RequestErrUnprepared:
  517. stmt, found := stmts[string(x.StatementId)]
  518. if found {
  519. stmtsLRU.mu.Lock()
  520. stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
  521. stmtsLRU.mu.Unlock()
  522. }
  523. if found {
  524. return c.executeBatch(batch)
  525. } else {
  526. return x
  527. }
  528. case error:
  529. return x
  530. default:
  531. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  532. }
  533. }
  534. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  535. defer func() {
  536. if r := recover(); r != nil {
  537. if e, ok := r.(ErrProtocol); ok {
  538. err = e
  539. return
  540. }
  541. panic(r)
  542. }
  543. }()
  544. if len(f) < headerSize {
  545. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  546. } else if f[0] != c.version|flagResponse {
  547. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  548. }
  549. flags, op, f := f[1], f[3], f[headerSize:]
  550. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  551. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  552. return nil, err
  553. } else {
  554. f = frame(buf)
  555. }
  556. }
  557. if flags&flagTrace != 0 {
  558. if len(f) < 16 {
  559. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  560. }
  561. traceId := []byte(f[:16])
  562. f = f[16:]
  563. trace.Trace(traceId)
  564. }
  565. switch op {
  566. case opReady:
  567. return readyFrame{}, nil
  568. case opResult:
  569. switch kind := f.readInt(); kind {
  570. case resultKindVoid:
  571. return resultVoidFrame{}, nil
  572. case resultKindRows:
  573. columns, pageState := f.readMetaData()
  574. numRows := f.readInt()
  575. values := make([][]byte, numRows*len(columns))
  576. for i := 0; i < len(values); i++ {
  577. values[i] = f.readBytes()
  578. }
  579. rows := make([][][]byte, numRows)
  580. for i := 0; i < numRows; i++ {
  581. rows[i], values = values[:len(columns)], values[len(columns):]
  582. }
  583. return resultRowsFrame{columns, rows, pageState}, nil
  584. case resultKindKeyspace:
  585. keyspace := f.readString()
  586. return resultKeyspaceFrame{keyspace}, nil
  587. case resultKindPrepared:
  588. id := f.readShortBytes()
  589. args, _ := f.readMetaData()
  590. if c.version < 2 {
  591. return resultPreparedFrame{PreparedId: id, Arguments: args}, nil
  592. }
  593. rvals, _ := f.readMetaData()
  594. return resultPreparedFrame{PreparedId: id, Arguments: args, ReturnValues: rvals}, nil
  595. case resultKindSchemaChanged:
  596. return resultVoidFrame{}, nil
  597. default:
  598. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  599. }
  600. case opAuthenticate:
  601. return authenticateFrame{f.readString()}, nil
  602. case opAuthChallenge:
  603. return authChallengeFrame{f.readBytes()}, nil
  604. case opAuthSuccess:
  605. return authSuccessFrame{f.readBytes()}, nil
  606. case opSupported:
  607. return supportedFrame{}, nil
  608. case opError:
  609. return f.readError(), nil
  610. default:
  611. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  612. }
  613. }
  614. func (c *Conn) setKeepalive(d time.Duration) error {
  615. if tc, ok := c.conn.(*net.TCPConn); ok {
  616. err := tc.SetKeepAlivePeriod(d)
  617. if err != nil {
  618. return err
  619. }
  620. return tc.SetKeepAlive(true)
  621. }
  622. return nil
  623. }
// QueryInfo represents the meta data associated with a prepared CQL statement.
type QueryInfo struct {
	Id   []byte       // prepared statement id returned by the server
	Args []ColumnInfo // metadata of the bind markers; values are marshalled with Args[i].TypeInfo
	Rval []ColumnInfo // result column metadata; only decoded on protocol v2+
}
// callReq is the slot for the request in flight on one stream id.
type callReq struct {
	active int32         // 1 while a caller waits on this stream; accessed atomically
	resp   chan callResp // receives the response frame or the connection error
}

// callResp is what exec receives for a call: either the raw response frame,
// or the error that ended the connection.
type callResp struct {
	buf frame
	err error
}

// inflightPrepare is the statement-cache entry for a PREPARE request. Callers
// that find an existing entry wait on wg until the preparing caller has filled
// in info or err.
type inflightPrepare struct {
	info *QueryInfo
	err  error
	wg   sync.WaitGroup
}
  643. var (
  644. ErrQueryArgLength = errors.New("query argument length mismatch")
  645. )