conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "fmt"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. const defaultFrameSize = 4096
  14. const flagResponse = 0x80
  15. const maskVersion = 0x7F
  16. type Authenticator interface {
  17. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  18. Success(data []byte) error
  19. }
  20. type PasswordAuthenticator struct {
  21. Username string
  22. Password string
  23. }
  24. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  25. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  26. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  27. }
  28. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  29. resp[0] = 0
  30. copy(resp[1:], p.Username)
  31. resp[len(p.Username)+1] = 0
  32. copy(resp[2+len(p.Username):], p.Password)
  33. return resp, nil, nil
  34. }
  35. func (p PasswordAuthenticator) Success(data []byte) error {
  36. return nil
  37. }
  38. type ConnConfig struct {
  39. ProtoVersion int
  40. CQLVersion string
  41. Timeout time.Duration
  42. NumStreams int
  43. Compressor Compressor
  44. Authenticator Authenticator
  45. Keepalive time.Duration
  46. }
  47. // Conn is a single connection to a Cassandra node. It can be used to execute
  48. // queries, but users are usually advised to use a more reliable, higher
  49. // level API.
  50. type Conn struct {
  51. conn net.Conn
  52. r *bufio.Reader
  53. timeout time.Duration
  54. uniq chan uint8
  55. calls []callReq
  56. nwait int32
  57. pool ConnectionPool
  58. compressor Compressor
  59. auth Authenticator
  60. addr string
  61. version uint8
  62. closedMu sync.RWMutex
  63. isClosed bool
  64. }
  65. // Connect establishes a connection to a Cassandra node.
  66. // You must also call the Serve method before you can execute any queries.
  67. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  68. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  69. if err != nil {
  70. return nil, err
  71. }
  72. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  73. cfg.NumStreams = 128
  74. }
  75. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  76. cfg.ProtoVersion = 2
  77. }
  78. c := &Conn{
  79. conn: conn,
  80. r: bufio.NewReader(conn),
  81. uniq: make(chan uint8, cfg.NumStreams),
  82. calls: make([]callReq, cfg.NumStreams),
  83. timeout: cfg.Timeout,
  84. version: uint8(cfg.ProtoVersion),
  85. addr: conn.RemoteAddr().String(),
  86. pool: pool,
  87. compressor: cfg.Compressor,
  88. auth: cfg.Authenticator,
  89. }
  90. if cfg.Keepalive > 0 {
  91. c.setKeepalive(cfg.Keepalive)
  92. }
  93. for i := 0; i < cap(c.uniq); i++ {
  94. c.uniq <- uint8(i)
  95. }
  96. if err := c.startup(&cfg); err != nil {
  97. conn.Close()
  98. return nil, err
  99. }
  100. go c.serve()
  101. return c, nil
  102. }
  103. func (c *Conn) startup(cfg *ConnConfig) error {
  104. compression := ""
  105. if c.compressor != nil {
  106. compression = c.compressor.Name()
  107. }
  108. var req operation = &startupFrame{
  109. CQLVersion: cfg.CQLVersion,
  110. Compression: compression,
  111. }
  112. var challenger Authenticator
  113. for {
  114. resp, err := c.execSimple(req)
  115. if err != nil {
  116. return err
  117. }
  118. switch x := resp.(type) {
  119. case readyFrame:
  120. return nil
  121. case error:
  122. return x
  123. case authenticateFrame:
  124. if c.auth == nil {
  125. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  126. }
  127. var resp []byte
  128. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  129. if err != nil {
  130. return err
  131. }
  132. req = &authResponseFrame{resp}
  133. case authChallengeFrame:
  134. if challenger == nil {
  135. return fmt.Errorf("authentication error (invalid challenge)")
  136. }
  137. var resp []byte
  138. resp, challenger, err = challenger.Challenge(x.Data)
  139. if err != nil {
  140. return err
  141. }
  142. req = &authResponseFrame{resp}
  143. case authSuccessFrame:
  144. if challenger != nil {
  145. return challenger.Success(x.Data)
  146. }
  147. return nil
  148. default:
  149. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  150. }
  151. }
  152. }
  153. // Serve starts the stream multiplexer for this connection, which is required
  154. // to execute any queries. This method runs as long as the connection is
  155. // open and is therefore usually called in a separate goroutine.
  156. func (c *Conn) serve() {
  157. var (
  158. err error
  159. resp frame
  160. )
  161. for {
  162. resp, err = c.recv()
  163. if err != nil {
  164. break
  165. }
  166. c.dispatch(resp)
  167. }
  168. c.Close()
  169. for id := 0; id < len(c.calls); id++ {
  170. req := &c.calls[id]
  171. if atomic.LoadInt32(&req.active) == 1 {
  172. req.resp <- callResp{nil, err}
  173. }
  174. }
  175. c.pool.HandleError(c, err, true)
  176. }
  177. func (c *Conn) recv() (frame, error) {
  178. resp := make(frame, headerSize, headerSize+512)
  179. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  180. n, last, pinged := 0, 0, false
  181. for n < len(resp) {
  182. nn, err := c.r.Read(resp[n:])
  183. n += nn
  184. if err != nil {
  185. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  186. if n > last {
  187. // we hit the deadline but we made progress.
  188. // simply extend the deadline
  189. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  190. last = n
  191. } else if n == 0 && !pinged {
  192. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  193. if atomic.LoadInt32(&c.nwait) > 0 {
  194. go c.ping()
  195. pinged = true
  196. }
  197. } else {
  198. return nil, err
  199. }
  200. } else {
  201. return nil, err
  202. }
  203. }
  204. if n == headerSize && len(resp) == headerSize {
  205. if resp[0] != c.version|flagResponse {
  206. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  207. }
  208. resp.grow(resp.Length())
  209. }
  210. }
  211. return resp, nil
  212. }
  213. func (c *Conn) execSimple(op operation) (interface{}, error) {
  214. f, err := op.encodeFrame(c.version, nil)
  215. f.setLength(len(f) - headerSize)
  216. if _, err := c.conn.Write([]byte(f)); err != nil {
  217. c.Close()
  218. return nil, err
  219. }
  220. if f, err = c.recv(); err != nil {
  221. return nil, err
  222. }
  223. return c.decodeFrame(f, nil)
  224. }
  225. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  226. req, err := op.encodeFrame(c.version, nil)
  227. if err != nil {
  228. return nil, err
  229. }
  230. if trace != nil {
  231. req[1] |= flagTrace
  232. }
  233. if len(req) > headerSize && c.compressor != nil {
  234. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  235. if err != nil {
  236. return nil, err
  237. }
  238. req = append(req[:headerSize], frame(body)...)
  239. req[1] |= flagCompress
  240. }
  241. req.setLength(len(req) - headerSize)
  242. id := <-c.uniq
  243. req[2] = id
  244. call := &c.calls[id]
  245. call.resp = make(chan callResp, 1)
  246. atomic.AddInt32(&c.nwait, 1)
  247. atomic.StoreInt32(&call.active, 1)
  248. if _, err := c.conn.Write(req); err != nil {
  249. c.uniq <- id
  250. c.Close()
  251. return nil, err
  252. }
  253. reply := <-call.resp
  254. call.resp = nil
  255. c.uniq <- id
  256. if reply.err != nil {
  257. return nil, reply.err
  258. }
  259. return c.decodeFrame(reply.buf, trace)
  260. }
  261. func (c *Conn) dispatch(resp frame) {
  262. id := int(resp[2])
  263. if id >= len(c.calls) {
  264. return
  265. }
  266. call := &c.calls[id]
  267. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  268. return
  269. }
  270. atomic.AddInt32(&c.nwait, -1)
  271. call.resp <- callResp{resp, nil}
  272. }
  273. func (c *Conn) ping() error {
  274. _, err := c.exec(&optionsFrame{}, nil)
  275. return err
  276. }
  277. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  278. stmtsLRU.mu.Lock()
  279. if val, ok := stmtsLRU.lru.Get(c.addr + stmt); ok {
  280. flight := val.(*inflightPrepare)
  281. stmtsLRU.mu.Unlock()
  282. flight.wg.Wait()
  283. return flight.info, flight.err
  284. }
  285. flight := new(inflightPrepare)
  286. flight.wg.Add(1)
  287. stmtsLRU.lru.Add(c.addr+stmt, flight)
  288. stmtsLRU.mu.Unlock()
  289. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  290. if err != nil {
  291. flight.err = err
  292. } else {
  293. switch x := resp.(type) {
  294. case resultPreparedFrame:
  295. flight.info = &queryInfo{
  296. id: x.PreparedId,
  297. args: x.Values,
  298. }
  299. case error:
  300. flight.err = x
  301. default:
  302. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  303. }
  304. }
  305. flight.wg.Done()
  306. if err != nil {
  307. stmtsLRU.mu.Lock()
  308. stmtsLRU.lru.Remove(c.addr + stmt)
  309. stmtsLRU.mu.Unlock()
  310. }
  311. return flight.info, flight.err
  312. }
  313. func (c *Conn) executeQuery(qry *Query) *Iter {
  314. op := &queryFrame{
  315. Stmt: qry.stmt,
  316. Cons: qry.cons,
  317. PageSize: qry.pageSize,
  318. PageState: qry.pageState,
  319. }
  320. if qry.shouldPrepare() {
  321. // Prepare all DML queries. Other queries can not be prepared.
  322. info, err := c.prepareStatement(qry.stmt, qry.trace)
  323. if err != nil {
  324. return &Iter{err: err}
  325. }
  326. op.Prepared = info.id
  327. op.Values = make([][]byte, len(qry.values))
  328. for i := 0; i < len(qry.values); i++ {
  329. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  330. if err != nil {
  331. return &Iter{err: err}
  332. }
  333. op.Values[i] = val
  334. }
  335. }
  336. resp, err := c.exec(op, qry.trace)
  337. if err != nil {
  338. return &Iter{err: err}
  339. }
  340. switch x := resp.(type) {
  341. case resultVoidFrame:
  342. return &Iter{}
  343. case resultRowsFrame:
  344. iter := &Iter{columns: x.Columns, rows: x.Rows}
  345. if len(x.PagingState) > 0 {
  346. iter.next = &nextIter{
  347. qry: *qry,
  348. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  349. }
  350. iter.next.qry.pageState = x.PagingState
  351. if iter.next.pos < 1 {
  352. iter.next.pos = 1
  353. }
  354. }
  355. return iter
  356. case resultKeyspaceFrame:
  357. return &Iter{}
  358. case RequestErrUnprepared:
  359. stmtsLRU.mu.Lock()
  360. if _, ok := stmtsLRU.lru.Get(c.addr + qry.stmt); ok {
  361. stmtsLRU.lru.Remove(c.addr + qry.stmt)
  362. stmtsLRU.mu.Unlock()
  363. return c.executeQuery(qry)
  364. }
  365. stmtsLRU.mu.Unlock()
  366. return &Iter{err: x}
  367. case error:
  368. return &Iter{err: x}
  369. default:
  370. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  371. }
  372. }
  373. func (c *Conn) Pick(qry *Query) *Conn {
  374. if c.Closed() {
  375. return nil
  376. }
  377. return c
  378. }
  379. func (c *Conn) Closed() bool {
  380. c.closedMu.RLock()
  381. closed := c.isClosed
  382. c.closedMu.RUnlock()
  383. return closed
  384. }
  385. func (c *Conn) Close() {
  386. c.closedMu.Lock()
  387. if c.isClosed {
  388. c.closedMu.Unlock()
  389. return
  390. }
  391. c.isClosed = true
  392. c.closedMu.Unlock()
  393. c.conn.Close()
  394. }
  395. func (c *Conn) Address() string {
  396. return c.addr
  397. }
  398. func (c *Conn) UseKeyspace(keyspace string) error {
  399. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  400. if err != nil {
  401. return err
  402. }
  403. switch x := resp.(type) {
  404. case resultKeyspaceFrame:
  405. case error:
  406. return x
  407. default:
  408. return NewErrProtocol("Unknown type in response to USE: %s", x)
  409. }
  410. return nil
  411. }
  412. func (c *Conn) executeBatch(batch *Batch) error {
  413. if c.version == 1 {
  414. return ErrUnsupported
  415. }
  416. f := make(frame, headerSize, defaultFrameSize)
  417. f.setHeader(c.version, 0, 0, opBatch)
  418. f.writeByte(byte(batch.Type))
  419. f.writeShort(uint16(len(batch.Entries)))
  420. stmts := make(map[string]string)
  421. for i := 0; i < len(batch.Entries); i++ {
  422. entry := &batch.Entries[i]
  423. var info *queryInfo
  424. if len(entry.Args) > 0 {
  425. var err error
  426. info, err = c.prepareStatement(entry.Stmt, nil)
  427. stmts[string(info.id)] = entry.Stmt
  428. if err != nil {
  429. return err
  430. }
  431. f.writeByte(1)
  432. f.writeShortBytes(info.id)
  433. } else {
  434. f.writeByte(0)
  435. f.writeLongString(entry.Stmt)
  436. }
  437. f.writeShort(uint16(len(entry.Args)))
  438. for j := 0; j < len(entry.Args); j++ {
  439. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  440. if err != nil {
  441. return err
  442. }
  443. f.writeBytes(val)
  444. }
  445. }
  446. f.writeConsistency(batch.Cons)
  447. resp, err := c.exec(f, nil)
  448. if err != nil {
  449. return err
  450. }
  451. switch x := resp.(type) {
  452. case resultVoidFrame:
  453. return nil
  454. case RequestErrUnprepared:
  455. stmt, found := stmts[string(x.StatementId)]
  456. if found {
  457. stmtsLRU.mu.Lock()
  458. stmtsLRU.lru.Remove(c.addr + stmt)
  459. stmtsLRU.mu.Unlock()
  460. }
  461. if found {
  462. return c.executeBatch(batch)
  463. } else {
  464. return x
  465. }
  466. case error:
  467. return x
  468. default:
  469. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  470. }
  471. }
  472. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  473. defer func() {
  474. if r := recover(); r != nil {
  475. if e, ok := r.(ErrProtocol); ok {
  476. err = e
  477. return
  478. }
  479. panic(r)
  480. }
  481. }()
  482. if len(f) < headerSize {
  483. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  484. } else if f[0] != c.version|flagResponse {
  485. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  486. }
  487. flags, op, f := f[1], f[3], f[headerSize:]
  488. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  489. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  490. return nil, err
  491. } else {
  492. f = frame(buf)
  493. }
  494. }
  495. if flags&flagTrace != 0 {
  496. if len(f) < 16 {
  497. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  498. }
  499. traceId := []byte(f[:16])
  500. f = f[16:]
  501. trace.Trace(traceId)
  502. }
  503. switch op {
  504. case opReady:
  505. return readyFrame{}, nil
  506. case opResult:
  507. switch kind := f.readInt(); kind {
  508. case resultKindVoid:
  509. return resultVoidFrame{}, nil
  510. case resultKindRows:
  511. columns, pageState := f.readMetaData()
  512. numRows := f.readInt()
  513. values := make([][]byte, numRows*len(columns))
  514. for i := 0; i < len(values); i++ {
  515. values[i] = f.readBytes()
  516. }
  517. rows := make([][][]byte, numRows)
  518. for i := 0; i < numRows; i++ {
  519. rows[i], values = values[:len(columns)], values[len(columns):]
  520. }
  521. return resultRowsFrame{columns, rows, pageState}, nil
  522. case resultKindKeyspace:
  523. keyspace := f.readString()
  524. return resultKeyspaceFrame{keyspace}, nil
  525. case resultKindPrepared:
  526. id := f.readShortBytes()
  527. values, _ := f.readMetaData()
  528. return resultPreparedFrame{id, values}, nil
  529. case resultKindSchemaChanged:
  530. return resultVoidFrame{}, nil
  531. default:
  532. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  533. }
  534. case opAuthenticate:
  535. return authenticateFrame{f.readString()}, nil
  536. case opAuthChallenge:
  537. return authChallengeFrame{f.readBytes()}, nil
  538. case opAuthSuccess:
  539. return authSuccessFrame{f.readBytes()}, nil
  540. case opSupported:
  541. return supportedFrame{}, nil
  542. case opError:
  543. return f.readError(), nil
  544. default:
  545. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  546. }
  547. }
  548. type queryInfo struct {
  549. id []byte
  550. args []ColumnInfo
  551. rval []ColumnInfo
  552. }
  553. type callReq struct {
  554. active int32
  555. resp chan callResp
  556. }
  557. type callResp struct {
  558. buf frame
  559. err error
  560. }
  561. type inflightPrepare struct {
  562. info *queryInfo
  563. err error
  564. wg sync.WaitGroup
  565. }