conn.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. const defaultFrameSize = 4096
  15. const flagResponse = 0x80
  16. const maskVersion = 0x7F
  17. type Authenticator interface {
  18. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  19. Success(data []byte) error
  20. }
  21. type PasswordAuthenticator struct {
  22. Username string
  23. Password string
  24. }
  25. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  26. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  27. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  28. }
  29. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  30. resp[0] = 0
  31. copy(resp[1:], p.Username)
  32. resp[len(p.Username)+1] = 0
  33. copy(resp[2+len(p.Username):], p.Password)
  34. return resp, nil, nil
  35. }
  36. func (p PasswordAuthenticator) Success(data []byte) error {
  37. return nil
  38. }
  39. type ConnConfig struct {
  40. ProtoVersion int
  41. CQLVersion string
  42. Timeout time.Duration
  43. NumStreams int
  44. Compressor Compressor
  45. Authenticator Authenticator
  46. Keepalive time.Duration
  47. }
  48. // Conn is a single connection to a Cassandra node. It can be used to execute
  49. // queries, but users are usually advised to use a more reliable, higher
  50. // level API.
  51. type Conn struct {
  52. conn net.Conn
  53. r *bufio.Reader
  54. timeout time.Duration
  55. uniq chan uint8
  56. calls []callReq
  57. nwait int32
  58. pool ConnectionPool
  59. compressor Compressor
  60. auth Authenticator
  61. addr string
  62. version uint8
  63. closedMu sync.RWMutex
  64. isClosed bool
  65. }
  66. // Connect establishes a connection to a Cassandra node.
  67. // You must also call the Serve method before you can execute any queries.
  68. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  69. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  74. cfg.NumStreams = 128
  75. }
  76. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  77. cfg.ProtoVersion = 2
  78. }
  79. c := &Conn{
  80. conn: conn,
  81. r: bufio.NewReader(conn),
  82. uniq: make(chan uint8, cfg.NumStreams),
  83. calls: make([]callReq, cfg.NumStreams),
  84. timeout: cfg.Timeout,
  85. version: uint8(cfg.ProtoVersion),
  86. addr: conn.RemoteAddr().String(),
  87. pool: pool,
  88. compressor: cfg.Compressor,
  89. auth: cfg.Authenticator,
  90. }
  91. if cfg.Keepalive > 0 {
  92. c.setKeepalive(cfg.Keepalive)
  93. }
  94. for i := 0; i < cap(c.uniq); i++ {
  95. c.uniq <- uint8(i)
  96. }
  97. if err := c.startup(&cfg); err != nil {
  98. conn.Close()
  99. return nil, err
  100. }
  101. go c.serve()
  102. return c, nil
  103. }
  104. func (c *Conn) startup(cfg *ConnConfig) error {
  105. compression := ""
  106. if c.compressor != nil {
  107. compression = c.compressor.Name()
  108. }
  109. var req operation = &startupFrame{
  110. CQLVersion: cfg.CQLVersion,
  111. Compression: compression,
  112. }
  113. var challenger Authenticator
  114. for {
  115. resp, err := c.execSimple(req)
  116. if err != nil {
  117. return err
  118. }
  119. switch x := resp.(type) {
  120. case readyFrame:
  121. return nil
  122. case error:
  123. return x
  124. case authenticateFrame:
  125. if c.auth == nil {
  126. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  127. }
  128. var resp []byte
  129. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  130. if err != nil {
  131. return err
  132. }
  133. req = &authResponseFrame{resp}
  134. case authChallengeFrame:
  135. if challenger == nil {
  136. return fmt.Errorf("authentication error (invalid challenge)")
  137. }
  138. var resp []byte
  139. resp, challenger, err = challenger.Challenge(x.Data)
  140. if err != nil {
  141. return err
  142. }
  143. req = &authResponseFrame{resp}
  144. case authSuccessFrame:
  145. if challenger != nil {
  146. return challenger.Success(x.Data)
  147. }
  148. return nil
  149. default:
  150. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  151. }
  152. }
  153. }
  154. // Serve starts the stream multiplexer for this connection, which is required
  155. // to execute any queries. This method runs as long as the connection is
  156. // open and is therefore usually called in a separate goroutine.
  157. func (c *Conn) serve() {
  158. var (
  159. err error
  160. resp frame
  161. )
  162. for {
  163. resp, err = c.recv()
  164. if err != nil {
  165. break
  166. }
  167. c.dispatch(resp)
  168. }
  169. c.Close()
  170. for id := 0; id < len(c.calls); id++ {
  171. req := &c.calls[id]
  172. if atomic.LoadInt32(&req.active) == 1 {
  173. req.resp <- callResp{nil, err}
  174. }
  175. }
  176. c.pool.HandleError(c, err, true)
  177. }
  178. func (c *Conn) recv() (frame, error) {
  179. resp := make(frame, headerSize, headerSize+512)
  180. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  181. n, last, pinged := 0, 0, false
  182. for n < len(resp) {
  183. nn, err := c.r.Read(resp[n:])
  184. n += nn
  185. if err != nil {
  186. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  187. if n > last {
  188. // we hit the deadline but we made progress.
  189. // simply extend the deadline
  190. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  191. last = n
  192. } else if n == 0 && !pinged {
  193. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  194. if atomic.LoadInt32(&c.nwait) > 0 {
  195. go c.ping()
  196. pinged = true
  197. }
  198. } else {
  199. return nil, err
  200. }
  201. } else {
  202. return nil, err
  203. }
  204. }
  205. if n == headerSize && len(resp) == headerSize {
  206. if resp[0] != c.version|flagResponse {
  207. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  208. }
  209. resp.grow(resp.Length())
  210. }
  211. }
  212. return resp, nil
  213. }
  214. func (c *Conn) execSimple(op operation) (interface{}, error) {
  215. f, err := op.encodeFrame(c.version, nil)
  216. f.setLength(len(f) - headerSize)
  217. if _, err := c.conn.Write([]byte(f)); err != nil {
  218. c.Close()
  219. return nil, err
  220. }
  221. if f, err = c.recv(); err != nil {
  222. return nil, err
  223. }
  224. return c.decodeFrame(f, nil)
  225. }
  226. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  227. req, err := op.encodeFrame(c.version, nil)
  228. if err != nil {
  229. return nil, err
  230. }
  231. if trace != nil {
  232. req[1] |= flagTrace
  233. }
  234. if len(req) > headerSize && c.compressor != nil {
  235. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  236. if err != nil {
  237. return nil, err
  238. }
  239. req = append(req[:headerSize], frame(body)...)
  240. req[1] |= flagCompress
  241. }
  242. req.setLength(len(req) - headerSize)
  243. id := <-c.uniq
  244. req[2] = id
  245. call := &c.calls[id]
  246. call.resp = make(chan callResp, 1)
  247. atomic.AddInt32(&c.nwait, 1)
  248. atomic.StoreInt32(&call.active, 1)
  249. if _, err := c.conn.Write(req); err != nil {
  250. c.uniq <- id
  251. c.Close()
  252. return nil, err
  253. }
  254. reply := <-call.resp
  255. call.resp = nil
  256. c.uniq <- id
  257. if reply.err != nil {
  258. return nil, reply.err
  259. }
  260. return c.decodeFrame(reply.buf, trace)
  261. }
  262. func (c *Conn) dispatch(resp frame) {
  263. id := int(resp[2])
  264. if id >= len(c.calls) {
  265. return
  266. }
  267. call := &c.calls[id]
  268. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  269. return
  270. }
  271. atomic.AddInt32(&c.nwait, -1)
  272. call.resp <- callResp{resp, nil}
  273. }
  274. func (c *Conn) ping() error {
  275. _, err := c.exec(&optionsFrame{}, nil)
  276. return err
  277. }
  278. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
  279. stmtsLRU.mu.Lock()
  280. if val, ok := stmtsLRU.lru.Get(c.addr + stmt); ok {
  281. flight := val.(*inflightPrepare)
  282. stmtsLRU.mu.Unlock()
  283. flight.wg.Wait()
  284. return flight.info, flight.err
  285. }
  286. flight := new(inflightPrepare)
  287. flight.wg.Add(1)
  288. stmtsLRU.lru.Add(c.addr+stmt, flight)
  289. stmtsLRU.mu.Unlock()
  290. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  291. if err != nil {
  292. flight.err = err
  293. } else {
  294. switch x := resp.(type) {
  295. case resultPreparedFrame:
  296. flight.info = &QueryInfo{
  297. id: x.PreparedId,
  298. args: x.Values,
  299. }
  300. case error:
  301. flight.err = x
  302. default:
  303. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  304. }
  305. }
  306. flight.wg.Done()
  307. if err != nil {
  308. stmtsLRU.mu.Lock()
  309. stmtsLRU.lru.Remove(c.addr + stmt)
  310. stmtsLRU.mu.Unlock()
  311. }
  312. return flight.info, flight.err
  313. }
  314. func (c *Conn) executeQuery(qry *Query) *Iter {
  315. op := &queryFrame{
  316. Stmt: qry.stmt,
  317. Cons: qry.cons,
  318. PageSize: qry.pageSize,
  319. PageState: qry.pageState,
  320. }
  321. if qry.shouldPrepare() {
  322. // Prepare all DML queries. Other queries can not be prepared.
  323. info, err := c.prepareStatement(qry.stmt, qry.trace)
  324. if err != nil {
  325. return &Iter{err: err}
  326. }
  327. var values []interface{}
  328. if qry.binding == nil {
  329. values = qry.values
  330. } else {
  331. values, err = qry.binding(info)
  332. if err != nil {
  333. return &Iter{err: err}
  334. }
  335. }
  336. if len(values) != len(info.args) {
  337. return &Iter{err: ErrQueryArgLength}
  338. }
  339. op.Prepared = info.id
  340. op.Values = make([][]byte, len(values))
  341. for i := 0; i < len(values); i++ {
  342. val, err := Marshal(info.args[i].TypeInfo, values[i])
  343. if err != nil {
  344. return &Iter{err: err}
  345. }
  346. op.Values[i] = val
  347. }
  348. }
  349. resp, err := c.exec(op, qry.trace)
  350. if err != nil {
  351. return &Iter{err: err}
  352. }
  353. switch x := resp.(type) {
  354. case resultVoidFrame:
  355. return &Iter{}
  356. case resultRowsFrame:
  357. iter := &Iter{columns: x.Columns, rows: x.Rows}
  358. if len(x.PagingState) > 0 {
  359. iter.next = &nextIter{
  360. qry: *qry,
  361. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  362. }
  363. iter.next.qry.pageState = x.PagingState
  364. if iter.next.pos < 1 {
  365. iter.next.pos = 1
  366. }
  367. }
  368. return iter
  369. case resultKeyspaceFrame:
  370. return &Iter{}
  371. case RequestErrUnprepared:
  372. stmtsLRU.mu.Lock()
  373. if _, ok := stmtsLRU.lru.Get(c.addr + qry.stmt); ok {
  374. stmtsLRU.lru.Remove(c.addr + qry.stmt)
  375. stmtsLRU.mu.Unlock()
  376. return c.executeQuery(qry)
  377. }
  378. stmtsLRU.mu.Unlock()
  379. return &Iter{err: x}
  380. case error:
  381. return &Iter{err: x}
  382. default:
  383. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  384. }
  385. }
  386. func (c *Conn) Pick(qry *Query) *Conn {
  387. if c.Closed() {
  388. return nil
  389. }
  390. return c
  391. }
  392. func (c *Conn) Closed() bool {
  393. c.closedMu.RLock()
  394. closed := c.isClosed
  395. c.closedMu.RUnlock()
  396. return closed
  397. }
  398. func (c *Conn) Close() {
  399. c.closedMu.Lock()
  400. if c.isClosed {
  401. c.closedMu.Unlock()
  402. return
  403. }
  404. c.isClosed = true
  405. c.closedMu.Unlock()
  406. c.conn.Close()
  407. }
  408. func (c *Conn) Address() string {
  409. return c.addr
  410. }
  411. func (c *Conn) UseKeyspace(keyspace string) error {
  412. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  413. if err != nil {
  414. return err
  415. }
  416. switch x := resp.(type) {
  417. case resultKeyspaceFrame:
  418. case error:
  419. return x
  420. default:
  421. return NewErrProtocol("Unknown type in response to USE: %s", x)
  422. }
  423. return nil
  424. }
  425. func (c *Conn) executeBatch(batch *Batch) error {
  426. if c.version == 1 {
  427. return ErrUnsupported
  428. }
  429. f := make(frame, headerSize, defaultFrameSize)
  430. f.setHeader(c.version, 0, 0, opBatch)
  431. f.writeByte(byte(batch.Type))
  432. f.writeShort(uint16(len(batch.Entries)))
  433. stmts := make(map[string]string)
  434. for i := 0; i < len(batch.Entries); i++ {
  435. entry := &batch.Entries[i]
  436. var info *QueryInfo
  437. var args []interface{}
  438. if len(entry.Args) > 0 || entry.binding != nil {
  439. var err error
  440. info, err = c.prepareStatement(entry.Stmt, nil)
  441. if entry.binding == nil {
  442. args = entry.Args
  443. } else {
  444. args, err = entry.binding(info)
  445. if err != nil {
  446. return err
  447. }
  448. }
  449. if len(args) != len(info.args) {
  450. return ErrQueryArgLength
  451. }
  452. stmts[string(info.id)] = entry.Stmt
  453. if err != nil {
  454. return err
  455. }
  456. f.writeByte(1)
  457. f.writeShortBytes(info.id)
  458. } else {
  459. f.writeByte(0)
  460. f.writeLongString(entry.Stmt)
  461. }
  462. f.writeShort(uint16(len(args)))
  463. for j := 0; j < len(args); j++ {
  464. val, err := Marshal(info.args[j].TypeInfo, args[j])
  465. if err != nil {
  466. return err
  467. }
  468. f.writeBytes(val)
  469. }
  470. }
  471. f.writeConsistency(batch.Cons)
  472. resp, err := c.exec(f, nil)
  473. if err != nil {
  474. return err
  475. }
  476. switch x := resp.(type) {
  477. case resultVoidFrame:
  478. return nil
  479. case RequestErrUnprepared:
  480. stmt, found := stmts[string(x.StatementId)]
  481. if found {
  482. stmtsLRU.mu.Lock()
  483. stmtsLRU.lru.Remove(c.addr + stmt)
  484. stmtsLRU.mu.Unlock()
  485. }
  486. if found {
  487. return c.executeBatch(batch)
  488. } else {
  489. return x
  490. }
  491. case error:
  492. return x
  493. default:
  494. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  495. }
  496. }
  497. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  498. defer func() {
  499. if r := recover(); r != nil {
  500. if e, ok := r.(ErrProtocol); ok {
  501. err = e
  502. return
  503. }
  504. panic(r)
  505. }
  506. }()
  507. if len(f) < headerSize {
  508. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  509. } else if f[0] != c.version|flagResponse {
  510. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  511. }
  512. flags, op, f := f[1], f[3], f[headerSize:]
  513. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  514. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  515. return nil, err
  516. } else {
  517. f = frame(buf)
  518. }
  519. }
  520. if flags&flagTrace != 0 {
  521. if len(f) < 16 {
  522. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  523. }
  524. traceId := []byte(f[:16])
  525. f = f[16:]
  526. trace.Trace(traceId)
  527. }
  528. switch op {
  529. case opReady:
  530. return readyFrame{}, nil
  531. case opResult:
  532. switch kind := f.readInt(); kind {
  533. case resultKindVoid:
  534. return resultVoidFrame{}, nil
  535. case resultKindRows:
  536. columns, pageState := f.readMetaData()
  537. numRows := f.readInt()
  538. values := make([][]byte, numRows*len(columns))
  539. for i := 0; i < len(values); i++ {
  540. values[i] = f.readBytes()
  541. }
  542. rows := make([][][]byte, numRows)
  543. for i := 0; i < numRows; i++ {
  544. rows[i], values = values[:len(columns)], values[len(columns):]
  545. }
  546. return resultRowsFrame{columns, rows, pageState}, nil
  547. case resultKindKeyspace:
  548. keyspace := f.readString()
  549. return resultKeyspaceFrame{keyspace}, nil
  550. case resultKindPrepared:
  551. id := f.readShortBytes()
  552. values, _ := f.readMetaData()
  553. return resultPreparedFrame{id, values}, nil
  554. case resultKindSchemaChanged:
  555. return resultVoidFrame{}, nil
  556. default:
  557. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  558. }
  559. case opAuthenticate:
  560. return authenticateFrame{f.readString()}, nil
  561. case opAuthChallenge:
  562. return authChallengeFrame{f.readBytes()}, nil
  563. case opAuthSuccess:
  564. return authSuccessFrame{f.readBytes()}, nil
  565. case opSupported:
  566. return supportedFrame{}, nil
  567. case opError:
  568. return f.readError(), nil
  569. default:
  570. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  571. }
  572. }
  573. func (c *Conn) setKeepalive(d time.Duration) error {
  574. if tc, ok := c.conn.(*net.TCPConn); ok {
  575. err := tc.SetKeepAlivePeriod(d)
  576. if err != nil {
  577. return err
  578. }
  579. return tc.SetKeepAlive(true)
  580. }
  581. return nil
  582. }
  583. // QueryInfo represents the meta data associated with a prepared CQL statement.
  584. type QueryInfo struct {
  585. id []byte
  586. args []ColumnInfo
  587. rval []ColumnInfo
  588. }
  589. type callReq struct {
  590. active int32
  591. resp chan callResp
  592. }
  593. type callResp struct {
  594. buf frame
  595. err error
  596. }
  597. type inflightPrepare struct {
  598. info *QueryInfo
  599. err error
  600. wg sync.WaitGroup
  601. }
  602. var (
  603. ErrQueryArgLength = errors.New("query argument length mismatch")
  604. )