conn.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  // Frame-level constants used by the connection code.
  const (
  	// defaultFrameSize is the initial capacity, in bytes, reserved for frames
  	// that are assembled in this file (see executeBatch).
  	defaultFrameSize = 4096

  	// flagResponse is the bit of the version byte that marks a frame as a
  	// response; recv and decodeFrame expect c.version|flagResponse.
  	flagResponse = 0x80

  	// maskVersion covers the remaining seven bits of the version byte.
  	// NOTE(review): not referenced in this file; presumably used to strip
  	// flagResponse from a version byte -- confirm against callers.
  	maskVersion = 0x7F
  )
  // Authenticator drives the client side of the authentication exchange that
  // may follow the startup request.
  type Authenticator interface {
  	// Challenge answers a server challenge. It returns the response to send
  	// and the Authenticator that should handle the next challenge; the
  	// startup loop treats a nil Authenticator as "no further challenge is
  	// expected".
  	Challenge(req []byte) (resp []byte, auth Authenticator, err error)

  	// Success is invoked with the payload of the server's final success
  	// message.
  	Success(data []byte) error
  }

  // PasswordAuthenticator is an Authenticator that answers the challenge of
  // Cassandra's built-in PasswordAuthenticator with a user name and password.
  type PasswordAuthenticator struct {
  	// Username is the login name placed in the response token.
  	Username string
  	// Password is the secret placed in the response token.
  	Password string
  }

  // Challenge checks that req names Cassandra's PasswordAuthenticator and
  // returns the token "\x00" + Username + "\x00" + Password. The returned
  // Authenticator is always nil. Any other value of req yields an error.
  func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  	if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  		return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  	}

  	token := make([]byte, 0, 2+len(p.Username)+len(p.Password))
  	token = append(token, 0)
  	token = append(token, p.Username...)
  	token = append(token, 0)
  	token = append(token, p.Password...)
  	return token, nil, nil
  }

  // Success accepts any success payload; there is nothing to verify.
  func (PasswordAuthenticator) Success(data []byte) error {
  	return nil
  }
  39. type ConnConfig struct {
  40. ProtoVersion int
  41. CQLVersion string
  42. Timeout time.Duration
  43. NumStreams int
  44. Compressor Compressor
  45. Authenticator Authenticator
  46. Keepalive time.Duration
  47. }
  48. // Conn is a single connection to a Cassandra node. It can be used to execute
  49. // queries, but users are usually advised to use a more reliable, higher
  50. // level API.
  51. type Conn struct {
  52. conn net.Conn
  53. r *bufio.Reader
  54. timeout time.Duration
  55. uniq chan uint8
  56. calls []callReq
  57. nwait int32
  58. pool ConnectionPool
  59. compressor Compressor
  60. auth Authenticator
  61. addr string
  62. version uint8
  63. currentKeyspace string
  64. closedMu sync.RWMutex
  65. isClosed bool
  66. }
  67. // Connect establishes a connection to a Cassandra node.
  68. // You must also call the Serve method before you can execute any queries.
  69. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  70. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  71. if err != nil {
  72. return nil, err
  73. }
  74. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  75. cfg.NumStreams = 128
  76. }
  77. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  78. cfg.ProtoVersion = 2
  79. }
  80. c := &Conn{
  81. conn: conn,
  82. r: bufio.NewReader(conn),
  83. uniq: make(chan uint8, cfg.NumStreams),
  84. calls: make([]callReq, cfg.NumStreams),
  85. timeout: cfg.Timeout,
  86. version: uint8(cfg.ProtoVersion),
  87. addr: conn.RemoteAddr().String(),
  88. pool: pool,
  89. compressor: cfg.Compressor,
  90. auth: cfg.Authenticator,
  91. }
  92. if cfg.Keepalive > 0 {
  93. c.setKeepalive(cfg.Keepalive)
  94. }
  95. for i := 0; i < cap(c.uniq); i++ {
  96. c.uniq <- uint8(i)
  97. }
  98. if err := c.startup(&cfg); err != nil {
  99. conn.Close()
  100. return nil, err
  101. }
  102. go c.serve()
  103. return c, nil
  104. }
  105. func (c *Conn) startup(cfg *ConnConfig) error {
  106. compression := ""
  107. if c.compressor != nil {
  108. compression = c.compressor.Name()
  109. }
  110. var req operation = &startupFrame{
  111. CQLVersion: cfg.CQLVersion,
  112. Compression: compression,
  113. }
  114. var challenger Authenticator
  115. for {
  116. resp, err := c.execSimple(req)
  117. if err != nil {
  118. return err
  119. }
  120. switch x := resp.(type) {
  121. case readyFrame:
  122. return nil
  123. case error:
  124. return x
  125. case authenticateFrame:
  126. if c.auth == nil {
  127. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  128. }
  129. var resp []byte
  130. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  131. if err != nil {
  132. return err
  133. }
  134. req = &authResponseFrame{resp}
  135. case authChallengeFrame:
  136. if challenger == nil {
  137. return fmt.Errorf("authentication error (invalid challenge)")
  138. }
  139. var resp []byte
  140. resp, challenger, err = challenger.Challenge(x.Data)
  141. if err != nil {
  142. return err
  143. }
  144. req = &authResponseFrame{resp}
  145. case authSuccessFrame:
  146. if challenger != nil {
  147. return challenger.Success(x.Data)
  148. }
  149. return nil
  150. default:
  151. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  152. }
  153. }
  154. }
  155. // Serve starts the stream multiplexer for this connection, which is required
  156. // to execute any queries. This method runs as long as the connection is
  157. // open and is therefore usually called in a separate goroutine.
  158. func (c *Conn) serve() {
  159. var (
  160. err error
  161. resp frame
  162. )
  163. for {
  164. resp, err = c.recv()
  165. if err != nil {
  166. break
  167. }
  168. c.dispatch(resp)
  169. }
  170. c.Close()
  171. for id := 0; id < len(c.calls); id++ {
  172. req := &c.calls[id]
  173. if atomic.LoadInt32(&req.active) == 1 {
  174. req.resp <- callResp{nil, err}
  175. }
  176. }
  177. c.pool.HandleError(c, err, true)
  178. }
  179. func (c *Conn) recv() (frame, error) {
  180. resp := make(frame, headerSize, headerSize+512)
  181. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  182. n, last, pinged := 0, 0, false
  183. for n < len(resp) {
  184. nn, err := c.r.Read(resp[n:])
  185. n += nn
  186. if err != nil {
  187. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  188. if n > last {
  189. // we hit the deadline but we made progress.
  190. // simply extend the deadline
  191. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  192. last = n
  193. } else if n == 0 && !pinged {
  194. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  195. if atomic.LoadInt32(&c.nwait) > 0 {
  196. go c.ping()
  197. pinged = true
  198. }
  199. } else {
  200. return nil, err
  201. }
  202. } else {
  203. return nil, err
  204. }
  205. }
  206. if n == headerSize && len(resp) == headerSize {
  207. if resp[0] != c.version|flagResponse {
  208. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  209. }
  210. resp.grow(resp.Length())
  211. }
  212. }
  213. return resp, nil
  214. }
  215. func (c *Conn) execSimple(op operation) (interface{}, error) {
  216. f, err := op.encodeFrame(c.version, nil)
  217. f.setLength(len(f) - headerSize)
  218. if _, err := c.conn.Write([]byte(f)); err != nil {
  219. c.Close()
  220. return nil, err
  221. }
  222. if f, err = c.recv(); err != nil {
  223. return nil, err
  224. }
  225. return c.decodeFrame(f, nil)
  226. }
  227. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  228. req, err := op.encodeFrame(c.version, nil)
  229. if err != nil {
  230. return nil, err
  231. }
  232. if trace != nil {
  233. req[1] |= flagTrace
  234. }
  235. if len(req) > headerSize && c.compressor != nil {
  236. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  237. if err != nil {
  238. return nil, err
  239. }
  240. req = append(req[:headerSize], frame(body)...)
  241. req[1] |= flagCompress
  242. }
  243. req.setLength(len(req) - headerSize)
  244. id := <-c.uniq
  245. req[2] = id
  246. call := &c.calls[id]
  247. call.resp = make(chan callResp, 1)
  248. atomic.AddInt32(&c.nwait, 1)
  249. atomic.StoreInt32(&call.active, 1)
  250. if _, err := c.conn.Write(req); err != nil {
  251. c.uniq <- id
  252. c.Close()
  253. return nil, err
  254. }
  255. reply := <-call.resp
  256. call.resp = nil
  257. c.uniq <- id
  258. if reply.err != nil {
  259. return nil, reply.err
  260. }
  261. return c.decodeFrame(reply.buf, trace)
  262. }
  263. func (c *Conn) dispatch(resp frame) {
  264. id := int(resp[2])
  265. if id >= len(c.calls) {
  266. return
  267. }
  268. call := &c.calls[id]
  269. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  270. return
  271. }
  272. atomic.AddInt32(&c.nwait, -1)
  273. call.resp <- callResp{resp, nil}
  274. }
  275. func (c *Conn) ping() error {
  276. _, err := c.exec(&optionsFrame{}, nil)
  277. return err
  278. }
  279. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
  280. stmtsLRU.mu.Lock()
  281. stmtCacheKey := c.addr + c.currentKeyspace + stmt
  282. if val, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  283. flight := val.(*inflightPrepare)
  284. stmtsLRU.mu.Unlock()
  285. flight.wg.Wait()
  286. return flight.info, flight.err
  287. }
  288. flight := new(inflightPrepare)
  289. flight.wg.Add(1)
  290. stmtsLRU.lru.Add(stmtCacheKey, flight)
  291. stmtsLRU.mu.Unlock()
  292. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  293. if err != nil {
  294. flight.err = err
  295. } else {
  296. switch x := resp.(type) {
  297. case resultPreparedFrame:
  298. flight.info = &QueryInfo{
  299. Id: x.PreparedId,
  300. Args: x.Arguments,
  301. Rval: x.ReturnValues,
  302. }
  303. case error:
  304. flight.err = x
  305. default:
  306. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  307. }
  308. err = flight.err
  309. }
  310. flight.wg.Done()
  311. if err != nil {
  312. stmtsLRU.mu.Lock()
  313. stmtsLRU.lru.Remove(stmtCacheKey)
  314. stmtsLRU.mu.Unlock()
  315. }
  316. return flight.info, flight.err
  317. }
  318. func (c *Conn) executeQuery(qry *Query) *Iter {
  319. op := &queryFrame{
  320. Stmt: qry.stmt,
  321. Cons: qry.cons,
  322. PageSize: qry.pageSize,
  323. PageState: qry.pageState,
  324. }
  325. if qry.shouldPrepare() {
  326. // Prepare all DML queries. Other queries can not be prepared.
  327. info, err := c.prepareStatement(qry.stmt, qry.trace)
  328. if err != nil {
  329. return &Iter{err: err}
  330. }
  331. var values []interface{}
  332. if qry.binding == nil {
  333. values = qry.values
  334. } else {
  335. values, err = qry.binding(info)
  336. if err != nil {
  337. return &Iter{err: err}
  338. }
  339. }
  340. if len(values) != len(info.Args) {
  341. return &Iter{err: ErrQueryArgLength}
  342. }
  343. op.Prepared = info.Id
  344. op.Values = make([][]byte, len(values))
  345. for i := 0; i < len(values); i++ {
  346. val, err := Marshal(info.Args[i].TypeInfo, values[i])
  347. if err != nil {
  348. return &Iter{err: err}
  349. }
  350. op.Values[i] = val
  351. }
  352. }
  353. resp, err := c.exec(op, qry.trace)
  354. if err != nil {
  355. return &Iter{err: err}
  356. }
  357. switch x := resp.(type) {
  358. case resultVoidFrame:
  359. return &Iter{}
  360. case resultRowsFrame:
  361. iter := &Iter{columns: x.Columns, rows: x.Rows}
  362. if len(x.PagingState) > 0 {
  363. iter.next = &nextIter{
  364. qry: *qry,
  365. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  366. }
  367. iter.next.qry.pageState = x.PagingState
  368. if iter.next.pos < 1 {
  369. iter.next.pos = 1
  370. }
  371. }
  372. return iter
  373. case resultKeyspaceFrame:
  374. return &Iter{}
  375. case RequestErrUnprepared:
  376. stmtsLRU.mu.Lock()
  377. stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
  378. if _, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  379. stmtsLRU.lru.Remove(stmtCacheKey)
  380. stmtsLRU.mu.Unlock()
  381. return c.executeQuery(qry)
  382. }
  383. stmtsLRU.mu.Unlock()
  384. return &Iter{err: x}
  385. case error:
  386. return &Iter{err: x}
  387. default:
  388. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  389. }
  390. }
  391. func (c *Conn) Pick(qry *Query) *Conn {
  392. if c.Closed() {
  393. return nil
  394. }
  395. return c
  396. }
  397. func (c *Conn) Closed() bool {
  398. c.closedMu.RLock()
  399. closed := c.isClosed
  400. c.closedMu.RUnlock()
  401. return closed
  402. }
  403. func (c *Conn) Close() {
  404. c.closedMu.Lock()
  405. if c.isClosed {
  406. c.closedMu.Unlock()
  407. return
  408. }
  409. c.isClosed = true
  410. c.closedMu.Unlock()
  411. c.conn.Close()
  412. }
  413. func (c *Conn) Address() string {
  414. return c.addr
  415. }
  416. func (c *Conn) UseKeyspace(keyspace string) error {
  417. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  418. if err != nil {
  419. return err
  420. }
  421. switch x := resp.(type) {
  422. case resultKeyspaceFrame:
  423. case error:
  424. return x
  425. default:
  426. return NewErrProtocol("Unknown type in response to USE: %s", x)
  427. }
  428. c.currentKeyspace = keyspace
  429. return nil
  430. }
  431. func (c *Conn) executeBatch(batch *Batch) error {
  432. if c.version == 1 {
  433. return ErrUnsupported
  434. }
  435. f := make(frame, headerSize, defaultFrameSize)
  436. f.setHeader(c.version, 0, 0, opBatch)
  437. f.writeByte(byte(batch.Type))
  438. f.writeShort(uint16(len(batch.Entries)))
  439. stmts := make(map[string]string)
  440. for i := 0; i < len(batch.Entries); i++ {
  441. entry := &batch.Entries[i]
  442. var info *QueryInfo
  443. var args []interface{}
  444. if len(entry.Args) > 0 || entry.binding != nil {
  445. var err error
  446. info, err = c.prepareStatement(entry.Stmt, nil)
  447. if err != nil {
  448. return err
  449. }
  450. if entry.binding == nil {
  451. args = entry.Args
  452. } else {
  453. args, err = entry.binding(info)
  454. if err != nil {
  455. return err
  456. }
  457. }
  458. if len(args) != len(info.Args) {
  459. return ErrQueryArgLength
  460. }
  461. stmts[string(info.Id)] = entry.Stmt
  462. f.writeByte(1)
  463. f.writeShortBytes(info.Id)
  464. } else {
  465. f.writeByte(0)
  466. f.writeLongString(entry.Stmt)
  467. }
  468. f.writeShort(uint16(len(args)))
  469. for j := 0; j < len(args); j++ {
  470. val, err := Marshal(info.Args[j].TypeInfo, args[j])
  471. if err != nil {
  472. return err
  473. }
  474. f.writeBytes(val)
  475. }
  476. }
  477. f.writeConsistency(batch.Cons)
  478. resp, err := c.exec(f, nil)
  479. if err != nil {
  480. return err
  481. }
  482. switch x := resp.(type) {
  483. case resultVoidFrame:
  484. return nil
  485. case RequestErrUnprepared:
  486. stmt, found := stmts[string(x.StatementId)]
  487. if found {
  488. stmtsLRU.mu.Lock()
  489. stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
  490. stmtsLRU.mu.Unlock()
  491. }
  492. if found {
  493. return c.executeBatch(batch)
  494. } else {
  495. return x
  496. }
  497. case error:
  498. return x
  499. default:
  500. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  501. }
  502. }
  503. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  504. defer func() {
  505. if r := recover(); r != nil {
  506. if e, ok := r.(ErrProtocol); ok {
  507. err = e
  508. return
  509. }
  510. panic(r)
  511. }
  512. }()
  513. if len(f) < headerSize {
  514. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  515. } else if f[0] != c.version|flagResponse {
  516. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  517. }
  518. flags, op, f := f[1], f[3], f[headerSize:]
  519. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  520. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  521. return nil, err
  522. } else {
  523. f = frame(buf)
  524. }
  525. }
  526. if flags&flagTrace != 0 {
  527. if len(f) < 16 {
  528. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  529. }
  530. traceId := []byte(f[:16])
  531. f = f[16:]
  532. trace.Trace(traceId)
  533. }
  534. switch op {
  535. case opReady:
  536. return readyFrame{}, nil
  537. case opResult:
  538. switch kind := f.readInt(); kind {
  539. case resultKindVoid:
  540. return resultVoidFrame{}, nil
  541. case resultKindRows:
  542. columns, pageState := f.readMetaData()
  543. numRows := f.readInt()
  544. values := make([][]byte, numRows*len(columns))
  545. for i := 0; i < len(values); i++ {
  546. values[i] = f.readBytes()
  547. }
  548. rows := make([][][]byte, numRows)
  549. for i := 0; i < numRows; i++ {
  550. rows[i], values = values[:len(columns)], values[len(columns):]
  551. }
  552. return resultRowsFrame{columns, rows, pageState}, nil
  553. case resultKindKeyspace:
  554. keyspace := f.readString()
  555. return resultKeyspaceFrame{keyspace}, nil
  556. case resultKindPrepared:
  557. id := f.readShortBytes()
  558. args, _ := f.readMetaData()
  559. if c.version < 2 {
  560. return resultPreparedFrame{PreparedId: id, Arguments: args}, nil
  561. }
  562. rvals, _ := f.readMetaData()
  563. return resultPreparedFrame{PreparedId: id, Arguments: args, ReturnValues: rvals}, nil
  564. case resultKindSchemaChanged:
  565. return resultVoidFrame{}, nil
  566. default:
  567. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  568. }
  569. case opAuthenticate:
  570. return authenticateFrame{f.readString()}, nil
  571. case opAuthChallenge:
  572. return authChallengeFrame{f.readBytes()}, nil
  573. case opAuthSuccess:
  574. return authSuccessFrame{f.readBytes()}, nil
  575. case opSupported:
  576. return supportedFrame{}, nil
  577. case opError:
  578. return f.readError(), nil
  579. default:
  580. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  581. }
  582. }
  583. func (c *Conn) setKeepalive(d time.Duration) error {
  584. if tc, ok := c.conn.(*net.TCPConn); ok {
  585. err := tc.SetKeepAlivePeriod(d)
  586. if err != nil {
  587. return err
  588. }
  589. return tc.SetKeepAlive(true)
  590. }
  591. return nil
  592. }
  593. // QueryInfo represents the meta data associated with a prepared CQL statement.
  594. type QueryInfo struct {
  595. Id []byte
  596. Args []ColumnInfo
  597. Rval []ColumnInfo
  598. }
  599. type callReq struct {
  600. active int32
  601. resp chan callResp
  602. }
  603. type callResp struct {
  604. buf frame
  605. err error
  606. }
  607. type inflightPrepare struct {
  608. info *QueryInfo
  609. err error
  610. wg sync.WaitGroup
  611. }
  // ErrQueryArgLength is returned when the number of values supplied for a
  // statement differs from the number of arguments the prepared statement
  // expects.
  var ErrQueryArgLength = errors.New("query argument length mismatch")