conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "fmt"
  8. "github.com/golang/groupcache/lru"
  9. "net"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. const defaultFrameSize = 4096
  15. const flagResponse = 0x80
  16. const maskVersion = 0x7F
// stmtsLRU is the package-global LRU cache of prepared statements. It is
// shared by every connection; entries are keyed by the connection address
// concatenated with the statement text.
var stmtsLRU *preparedLRU
// init houses code to initialize components related to connections, such as
// the LRU cache for prepared statements (created here with room for 10
// entries).
func init() {
	stmtsLRU = &preparedLRU{lru: lru.New(10)}
}
// Authenticator drives the authentication exchange performed while a
// connection starts up.
type Authenticator interface {
	// Challenge is given the data sent by the server (the authenticator
	// name for the initial request, the challenge payload afterwards) and
	// returns the response to send back, together with the Authenticator
	// that should handle the next server message.
	Challenge(req []byte) (resp []byte, auth Authenticator, err error)
	// Success is given the payload of the server's final "auth success"
	// message; a non-nil error fails the connection startup.
	Success(data []byte) error
}
// PasswordAuthenticator is an Authenticator that answers the server's
// password authenticator with a fixed username and password.
type PasswordAuthenticator struct {
	Username string
	Password string
}
  31. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  32. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  33. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  34. }
  35. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  36. resp[0] = 0
  37. copy(resp[1:], p.Username)
  38. resp[len(p.Username)+1] = 0
  39. copy(resp[2+len(p.Username):], p.Password)
  40. return resp, nil, nil
  41. }
// Success is part of the Authenticator interface. It ignores data and always
// returns nil.
func (p PasswordAuthenticator) Success(data []byte) error {
	return nil
}
// ConnConfig holds the settings used by Connect to establish a connection.
type ConnConfig struct {
	// ProtoVersion is the native protocol version; Connect uses 2 unless
	// this is 1 or 2.
	ProtoVersion int
	// CQLVersion is sent to the server in the startup frame.
	CQLVersion string
	// Timeout bounds the TCP dial and is used for the read deadline of the
	// connection.
	Timeout time.Duration
	// NumStreams is the number of concurrent streams; Connect uses 128
	// unless this is between 1 and 128.
	NumStreams int
	// Compressor, when non-nil, is announced by name in the startup frame
	// and used to compress request bodies and decompress responses.
	Compressor Compressor
	// Authenticator answers the server's authentication request; startup
	// fails if the server requires authentication and this is nil.
	Authenticator Authenticator
	// Keepalive, when positive, is passed to the connection's setKeepalive.
	Keepalive time.Duration
}
// Conn is a single connection to a Cassandra node. It can be used to execute
// queries, but users are usually advised to use a more reliable, higher
// level API.
type Conn struct {
	conn    net.Conn      // underlying network connection
	r       *bufio.Reader // buffered reader over conn, used to read frames
	timeout time.Duration // read deadline applied while receiving frames

	uniq  chan uint8 // pool of stream ids that are currently free
	calls []callReq  // one slot per stream id, indexed by that id
	nwait int32      // number of calls awaiting a response; accessed atomically

	pool       ConnectionPool // notified through HandleError when serve ends
	compressor Compressor     // optional frame body compressor (may be nil)
	auth       Authenticator  // optional authenticator used during startup (may be nil)
	addr       string         // remote address, recorded by Connect
	version    uint8          // protocol version used on this connection

	closedMu sync.RWMutex // guards isClosed
	isClosed bool         // set once Close has been called
}
  72. // Connect establishes a connection to a Cassandra node.
  73. // You must also call the Serve method before you can execute any queries.
  74. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  75. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  76. if err != nil {
  77. return nil, err
  78. }
  79. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  80. cfg.NumStreams = 128
  81. }
  82. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  83. cfg.ProtoVersion = 2
  84. }
  85. c := &Conn{
  86. conn: conn,
  87. r: bufio.NewReader(conn),
  88. uniq: make(chan uint8, cfg.NumStreams),
  89. calls: make([]callReq, cfg.NumStreams),
  90. timeout: cfg.Timeout,
  91. version: uint8(cfg.ProtoVersion),
  92. addr: conn.RemoteAddr().String(),
  93. pool: pool,
  94. compressor: cfg.Compressor,
  95. auth: cfg.Authenticator,
  96. }
  97. if cfg.Keepalive > 0 {
  98. c.setKeepalive(cfg.Keepalive)
  99. }
  100. for i := 0; i < cap(c.uniq); i++ {
  101. c.uniq <- uint8(i)
  102. }
  103. if err := c.startup(&cfg); err != nil {
  104. conn.Close()
  105. return nil, err
  106. }
  107. go c.serve()
  108. return c, nil
  109. }
  110. func (c *Conn) startup(cfg *ConnConfig) error {
  111. compression := ""
  112. if c.compressor != nil {
  113. compression = c.compressor.Name()
  114. }
  115. var req operation = &startupFrame{
  116. CQLVersion: cfg.CQLVersion,
  117. Compression: compression,
  118. }
  119. var challenger Authenticator
  120. for {
  121. resp, err := c.execSimple(req)
  122. if err != nil {
  123. return err
  124. }
  125. switch x := resp.(type) {
  126. case readyFrame:
  127. return nil
  128. case error:
  129. return x
  130. case authenticateFrame:
  131. if c.auth == nil {
  132. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  133. }
  134. var resp []byte
  135. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  136. if err != nil {
  137. return err
  138. }
  139. req = &authResponseFrame{resp}
  140. case authChallengeFrame:
  141. if challenger == nil {
  142. return fmt.Errorf("authentication error (invalid challenge)")
  143. }
  144. var resp []byte
  145. resp, challenger, err = challenger.Challenge(x.Data)
  146. if err != nil {
  147. return err
  148. }
  149. req = &authResponseFrame{resp}
  150. case authSuccessFrame:
  151. if challenger != nil {
  152. return challenger.Success(x.Data)
  153. }
  154. return nil
  155. default:
  156. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  157. }
  158. }
  159. }
  160. // Serve starts the stream multiplexer for this connection, which is required
  161. // to execute any queries. This method runs as long as the connection is
  162. // open and is therefore usually called in a separate goroutine.
  163. func (c *Conn) serve() {
  164. var (
  165. err error
  166. resp frame
  167. )
  168. for {
  169. resp, err = c.recv()
  170. if err != nil {
  171. break
  172. }
  173. c.dispatch(resp)
  174. }
  175. c.Close()
  176. for id := 0; id < len(c.calls); id++ {
  177. req := &c.calls[id]
  178. if atomic.LoadInt32(&req.active) == 1 {
  179. req.resp <- callResp{nil, err}
  180. }
  181. }
  182. c.pool.HandleError(c, err, true)
  183. }
  184. func (c *Conn) recv() (frame, error) {
  185. resp := make(frame, headerSize, headerSize+512)
  186. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  187. n, last, pinged := 0, 0, false
  188. for n < len(resp) {
  189. nn, err := c.r.Read(resp[n:])
  190. n += nn
  191. if err != nil {
  192. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  193. if n > last {
  194. // we hit the deadline but we made progress.
  195. // simply extend the deadline
  196. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  197. last = n
  198. } else if n == 0 && !pinged {
  199. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  200. if atomic.LoadInt32(&c.nwait) > 0 {
  201. go c.ping()
  202. pinged = true
  203. }
  204. } else {
  205. return nil, err
  206. }
  207. } else {
  208. return nil, err
  209. }
  210. }
  211. if n == headerSize && len(resp) == headerSize {
  212. if resp[0] != c.version|flagResponse {
  213. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  214. }
  215. resp.grow(resp.Length())
  216. }
  217. }
  218. return resp, nil
  219. }
  220. func (c *Conn) execSimple(op operation) (interface{}, error) {
  221. f, err := op.encodeFrame(c.version, nil)
  222. f.setLength(len(f) - headerSize)
  223. if _, err := c.conn.Write([]byte(f)); err != nil {
  224. c.Close()
  225. return nil, err
  226. }
  227. if f, err = c.recv(); err != nil {
  228. return nil, err
  229. }
  230. return c.decodeFrame(f, nil)
  231. }
  232. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  233. req, err := op.encodeFrame(c.version, nil)
  234. if err != nil {
  235. return nil, err
  236. }
  237. if trace != nil {
  238. req[1] |= flagTrace
  239. }
  240. if len(req) > headerSize && c.compressor != nil {
  241. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  242. if err != nil {
  243. return nil, err
  244. }
  245. req = append(req[:headerSize], frame(body)...)
  246. req[1] |= flagCompress
  247. }
  248. req.setLength(len(req) - headerSize)
  249. id := <-c.uniq
  250. req[2] = id
  251. call := &c.calls[id]
  252. call.resp = make(chan callResp, 1)
  253. atomic.AddInt32(&c.nwait, 1)
  254. atomic.StoreInt32(&call.active, 1)
  255. if _, err := c.conn.Write(req); err != nil {
  256. c.uniq <- id
  257. c.Close()
  258. return nil, err
  259. }
  260. reply := <-call.resp
  261. call.resp = nil
  262. c.uniq <- id
  263. if reply.err != nil {
  264. return nil, reply.err
  265. }
  266. return c.decodeFrame(reply.buf, trace)
  267. }
  268. func (c *Conn) dispatch(resp frame) {
  269. id := int(resp[2])
  270. if id >= len(c.calls) {
  271. return
  272. }
  273. call := &c.calls[id]
  274. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  275. return
  276. }
  277. atomic.AddInt32(&c.nwait, -1)
  278. call.resp <- callResp{resp, nil}
  279. }
  280. func (c *Conn) ping() error {
  281. _, err := c.exec(&optionsFrame{}, nil)
  282. return err
  283. }
  284. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  285. stmtsLRU.mu.Lock()
  286. if val, ok := stmtsLRU.lru.Get(c.addr + stmt); ok {
  287. flight := val.(*inflightPrepare)
  288. stmtsLRU.mu.Unlock()
  289. flight.wg.Wait()
  290. return flight.info, flight.err
  291. }
  292. flight := new(inflightPrepare)
  293. flight.wg.Add(1)
  294. stmtsLRU.lru.Add(c.addr+stmt, flight)
  295. stmtsLRU.mu.Unlock()
  296. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  297. if err != nil {
  298. flight.err = err
  299. } else {
  300. switch x := resp.(type) {
  301. case resultPreparedFrame:
  302. flight.info = &queryInfo{
  303. id: x.PreparedId,
  304. args: x.Values,
  305. }
  306. case error:
  307. flight.err = x
  308. default:
  309. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  310. }
  311. }
  312. flight.wg.Done()
  313. if err != nil {
  314. stmtsLRU.mu.Lock()
  315. stmtsLRU.lru.Remove(c.addr + stmt)
  316. stmtsLRU.mu.Unlock()
  317. }
  318. return flight.info, flight.err
  319. }
  320. func (c *Conn) executeQuery(qry *Query) *Iter {
  321. op := &queryFrame{
  322. Stmt: qry.stmt,
  323. Cons: qry.cons,
  324. PageSize: qry.pageSize,
  325. PageState: qry.pageState,
  326. }
  327. if qry.shouldPrepare() {
  328. // Prepare all DML queries. Other queries can not be prepared.
  329. info, err := c.prepareStatement(qry.stmt, qry.trace)
  330. if err != nil {
  331. return &Iter{err: err}
  332. }
  333. op.Prepared = info.id
  334. op.Values = make([][]byte, len(qry.values))
  335. for i := 0; i < len(qry.values); i++ {
  336. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  337. if err != nil {
  338. return &Iter{err: err}
  339. }
  340. op.Values[i] = val
  341. }
  342. }
  343. resp, err := c.exec(op, qry.trace)
  344. if err != nil {
  345. return &Iter{err: err}
  346. }
  347. switch x := resp.(type) {
  348. case resultVoidFrame:
  349. return &Iter{}
  350. case resultRowsFrame:
  351. iter := &Iter{columns: x.Columns, rows: x.Rows}
  352. if len(x.PagingState) > 0 {
  353. iter.next = &nextIter{
  354. qry: *qry,
  355. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  356. }
  357. iter.next.qry.pageState = x.PagingState
  358. if iter.next.pos < 1 {
  359. iter.next.pos = 1
  360. }
  361. }
  362. return iter
  363. case resultKeyspaceFrame:
  364. return &Iter{}
  365. case RequestErrUnprepared:
  366. stmtsLRU.mu.Lock()
  367. if _, ok := stmtsLRU.lru.Get(c.addr + qry.stmt); ok {
  368. stmtsLRU.lru.Remove(c.addr + qry.stmt)
  369. stmtsLRU.mu.Unlock()
  370. return c.executeQuery(qry)
  371. }
  372. stmtsLRU.mu.Unlock()
  373. return &Iter{err: x}
  374. case error:
  375. return &Iter{err: x}
  376. default:
  377. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  378. }
  379. }
  380. func (c *Conn) Pick(qry *Query) *Conn {
  381. if c.Closed() {
  382. return nil
  383. }
  384. return c
  385. }
  386. func (c *Conn) Closed() bool {
  387. c.closedMu.RLock()
  388. closed := c.isClosed
  389. c.closedMu.RUnlock()
  390. return closed
  391. }
  392. func (c *Conn) Close() {
  393. c.closedMu.Lock()
  394. if c.isClosed {
  395. c.closedMu.Unlock()
  396. return
  397. }
  398. c.isClosed = true
  399. c.closedMu.Unlock()
  400. c.conn.Close()
  401. }
// Address returns the remote address of the connection, as recorded when the
// connection was established.
func (c *Conn) Address() string {
	return c.addr
}
  405. func (c *Conn) UseKeyspace(keyspace string) error {
  406. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  407. if err != nil {
  408. return err
  409. }
  410. switch x := resp.(type) {
  411. case resultKeyspaceFrame:
  412. case error:
  413. return x
  414. default:
  415. return NewErrProtocol("Unknown type in response to USE: %s", x)
  416. }
  417. return nil
  418. }
  419. func (c *Conn) executeBatch(batch *Batch) error {
  420. if c.version == 1 {
  421. return ErrUnsupported
  422. }
  423. f := make(frame, headerSize, defaultFrameSize)
  424. f.setHeader(c.version, 0, 0, opBatch)
  425. f.writeByte(byte(batch.Type))
  426. f.writeShort(uint16(len(batch.Entries)))
  427. stmts := make(map[string]string)
  428. for i := 0; i < len(batch.Entries); i++ {
  429. entry := &batch.Entries[i]
  430. var info *queryInfo
  431. if len(entry.Args) > 0 {
  432. var err error
  433. info, err = c.prepareStatement(entry.Stmt, nil)
  434. stmts[string(info.id)] = entry.Stmt
  435. if err != nil {
  436. return err
  437. }
  438. f.writeByte(1)
  439. f.writeShortBytes(info.id)
  440. } else {
  441. f.writeByte(0)
  442. f.writeLongString(entry.Stmt)
  443. }
  444. f.writeShort(uint16(len(entry.Args)))
  445. for j := 0; j < len(entry.Args); j++ {
  446. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  447. if err != nil {
  448. return err
  449. }
  450. f.writeBytes(val)
  451. }
  452. }
  453. f.writeConsistency(batch.Cons)
  454. resp, err := c.exec(f, nil)
  455. if err != nil {
  456. return err
  457. }
  458. switch x := resp.(type) {
  459. case resultVoidFrame:
  460. return nil
  461. case RequestErrUnprepared:
  462. stmtsLRU.mu.Lock()
  463. stmt, found := stmts[string(x.StatementId)]
  464. if found {
  465. stmtsLRU.lru.Remove(c.addr + stmt)
  466. }
  467. stmtsLRU.mu.Unlock()
  468. if found {
  469. return c.executeBatch(batch)
  470. } else {
  471. return x
  472. }
  473. case error:
  474. return x
  475. default:
  476. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  477. }
  478. }
  479. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  480. defer func() {
  481. if r := recover(); r != nil {
  482. if e, ok := r.(ErrProtocol); ok {
  483. err = e
  484. return
  485. }
  486. panic(r)
  487. }
  488. }()
  489. if len(f) < headerSize {
  490. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  491. } else if f[0] != c.version|flagResponse {
  492. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  493. }
  494. flags, op, f := f[1], f[3], f[headerSize:]
  495. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  496. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  497. return nil, err
  498. } else {
  499. f = frame(buf)
  500. }
  501. }
  502. if flags&flagTrace != 0 {
  503. if len(f) < 16 {
  504. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  505. }
  506. traceId := []byte(f[:16])
  507. f = f[16:]
  508. trace.Trace(traceId)
  509. }
  510. switch op {
  511. case opReady:
  512. return readyFrame{}, nil
  513. case opResult:
  514. switch kind := f.readInt(); kind {
  515. case resultKindVoid:
  516. return resultVoidFrame{}, nil
  517. case resultKindRows:
  518. columns, pageState := f.readMetaData()
  519. numRows := f.readInt()
  520. values := make([][]byte, numRows*len(columns))
  521. for i := 0; i < len(values); i++ {
  522. values[i] = f.readBytes()
  523. }
  524. rows := make([][][]byte, numRows)
  525. for i := 0; i < numRows; i++ {
  526. rows[i], values = values[:len(columns)], values[len(columns):]
  527. }
  528. return resultRowsFrame{columns, rows, pageState}, nil
  529. case resultKindKeyspace:
  530. keyspace := f.readString()
  531. return resultKeyspaceFrame{keyspace}, nil
  532. case resultKindPrepared:
  533. id := f.readShortBytes()
  534. values, _ := f.readMetaData()
  535. return resultPreparedFrame{id, values}, nil
  536. case resultKindSchemaChanged:
  537. return resultVoidFrame{}, nil
  538. default:
  539. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  540. }
  541. case opAuthenticate:
  542. return authenticateFrame{f.readString()}, nil
  543. case opAuthChallenge:
  544. return authChallengeFrame{f.readBytes()}, nil
  545. case opAuthSuccess:
  546. return authSuccessFrame{f.readBytes()}, nil
  547. case opSupported:
  548. return supportedFrame{}, nil
  549. case opError:
  550. return f.readError(), nil
  551. default:
  552. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  553. }
  554. }
// queryInfo describes a statement that has been prepared on the server.
type queryInfo struct {
	id   []byte       // prepared statement id returned by the server
	args []ColumnInfo // metadata of the statement's bind values, used to marshal them
	rval []ColumnInfo // NOTE(review): never assigned in this file; presumably result column metadata
}
// callReq is the per-stream slot used to match a response to the call that
// is waiting for it.
type callReq struct {
	active int32         // 1 while a call is awaiting its response; accessed atomically
	resp   chan callResp // receives the response (or the connection error) for the call
}
// callResp is what a waiting call receives: either the raw response frame
// or the error that ended the connection.
type callResp struct {
	buf frame
	err error
}
// inflightPrepare is the value stored in the prepared-statement cache. It is
// inserted before the prepare request is sent; wg is released once info or
// err has been filled in, so other callers can wait for the result.
type inflightPrepare struct {
	info *queryInfo
	err  error
	wg   sync.WaitGroup
}
// preparedLRU couples the LRU cache of prepared statements with the mutex
// that callers hold while accessing it.
type preparedLRU struct {
	lru *lru.Cache
	mu  sync.Mutex
}