conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "bytes"
  8. "fmt"
  9. "net"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  // Frame-level constants shared by the connection code.
  const (
  	// defaultFrameSize is the initial capacity, in bytes, reserved for
  	// frames that are assembled by hand (see executeBatch).
  	defaultFrameSize = 4096

  	// flagResponse is OR-ed with the protocol version to form the version
  	// byte that every response frame is expected to start with.
  	flagResponse = 0x80

  	// maskVersion covers the seven low bits of the version byte, i.e.
  	// everything except flagResponse.
  	maskVersion = 0x7F
  )
  // Authenticator drives the authentication exchange that may follow the
  // startup request on a new connection.
  type Authenticator interface {
  	// Challenge answers one step of the exchange. req is either the
  	// authenticator name announced by the server or the payload of a
  	// subsequent challenge. resp is sent back to the server, and auth is
  	// the Authenticator that will handle the next challenge and the final
  	// Success call (it may be nil). A non-nil err aborts the startup.
  	Challenge(req []byte) (resp []byte, auth Authenticator, err error)

  	// Success is called with the payload of the server's final "auth
  	// success" message. A non-nil error fails the startup.
  	Success(data []byte) error
  }
  // PasswordAuthenticator is an Authenticator that answers the server's
  // password authenticator with a fixed username and password.
  type PasswordAuthenticator struct {
  	// Username is the login name sent to the server.
  	Username string
  	// Password is sent to the server in clear text inside the response.
  	Password string
  }
  25. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  26. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  27. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  28. }
  29. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  30. resp[0] = 0
  31. copy(resp[1:], p.Username)
  32. resp[len(p.Username)+1] = 0
  33. copy(resp[2+len(p.Username):], p.Password)
  34. return resp, nil, nil
  35. }
  36. func (p PasswordAuthenticator) Success(data []byte) error {
  37. return nil
  38. }
  39. type ConnConfig struct {
  40. ProtoVersion int
  41. CQLVersion string
  42. Timeout time.Duration
  43. NumStreams int
  44. Compressor Compressor
  45. Authenticator Authenticator
  46. Keepalive time.Duration
  47. }
  48. // Conn is a single connection to a Cassandra node. It can be used to execute
  49. // queries, but users are usually advised to use a more reliable, higher
  50. // level API.
  51. type Conn struct {
  52. conn net.Conn
  53. r *bufio.Reader
  54. timeout time.Duration
  55. uniq chan uint8
  56. calls []callReq
  57. nwait int32
  58. prepMu sync.Mutex
  59. prep map[string]*inflightPrepare
  60. pool ConnectionPool
  61. compressor Compressor
  62. auth Authenticator
  63. addr string
  64. version uint8
  65. closedMu sync.RWMutex
  66. isClosed bool
  67. }
  68. // Connect establishes a connection to a Cassandra node.
  69. // You must also call the Serve method before you can execute any queries.
  70. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  71. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  72. if err != nil {
  73. return nil, err
  74. }
  75. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  76. cfg.NumStreams = 128
  77. }
  78. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  79. cfg.ProtoVersion = 2
  80. }
  81. c := &Conn{
  82. conn: conn,
  83. r: bufio.NewReader(conn),
  84. uniq: make(chan uint8, cfg.NumStreams),
  85. calls: make([]callReq, cfg.NumStreams),
  86. prep: make(map[string]*inflightPrepare),
  87. timeout: cfg.Timeout,
  88. version: uint8(cfg.ProtoVersion),
  89. addr: conn.RemoteAddr().String(),
  90. pool: pool,
  91. compressor: cfg.Compressor,
  92. auth: cfg.Authenticator,
  93. }
  94. if cfg.Keepalive > 0 {
  95. c.setKeepalive(cfg.Keepalive)
  96. }
  97. for i := 0; i < cap(c.uniq); i++ {
  98. c.uniq <- uint8(i)
  99. }
  100. if err := c.startup(&cfg); err != nil {
  101. conn.Close()
  102. return nil, err
  103. }
  104. go c.serve()
  105. return c, nil
  106. }
  107. func (c *Conn) startup(cfg *ConnConfig) error {
  108. compression := ""
  109. if c.compressor != nil {
  110. compression = c.compressor.Name()
  111. }
  112. var req operation = &startupFrame{
  113. CQLVersion: cfg.CQLVersion,
  114. Compression: compression,
  115. }
  116. var challenger Authenticator
  117. for {
  118. resp, err := c.execSimple(req)
  119. if err != nil {
  120. return err
  121. }
  122. switch x := resp.(type) {
  123. case readyFrame:
  124. return nil
  125. case error:
  126. return x
  127. case authenticateFrame:
  128. if c.auth == nil {
  129. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  130. }
  131. var resp []byte
  132. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  133. if err != nil {
  134. return err
  135. }
  136. req = &authResponseFrame{resp}
  137. case authChallengeFrame:
  138. if challenger == nil {
  139. return fmt.Errorf("authentication error (invalid challenge)")
  140. }
  141. var resp []byte
  142. resp, challenger, err = challenger.Challenge(x.Data)
  143. if err != nil {
  144. return err
  145. }
  146. req = &authResponseFrame{resp}
  147. case authSuccessFrame:
  148. if challenger != nil {
  149. return challenger.Success(x.Data)
  150. }
  151. return nil
  152. default:
  153. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  154. }
  155. }
  156. }
  157. // Serve starts the stream multiplexer for this connection, which is required
  158. // to execute any queries. This method runs as long as the connection is
  159. // open and is therefore usually called in a separate goroutine.
  160. func (c *Conn) serve() {
  161. var (
  162. err error
  163. resp frame
  164. )
  165. for {
  166. resp, err = c.recv()
  167. if err != nil {
  168. break
  169. }
  170. c.dispatch(resp)
  171. }
  172. c.Close()
  173. for id := 0; id < len(c.calls); id++ {
  174. req := &c.calls[id]
  175. if atomic.LoadInt32(&req.active) == 1 {
  176. req.resp <- callResp{nil, err}
  177. }
  178. }
  179. c.pool.HandleError(c, err, true)
  180. }
  181. func (c *Conn) recv() (frame, error) {
  182. resp := make(frame, headerSize, headerSize+512)
  183. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  184. n, last, pinged := 0, 0, false
  185. for n < len(resp) {
  186. nn, err := c.r.Read(resp[n:])
  187. n += nn
  188. if err != nil {
  189. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  190. if n > last {
  191. // we hit the deadline but we made progress.
  192. // simply extend the deadline
  193. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  194. last = n
  195. } else if n == 0 && !pinged {
  196. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  197. if atomic.LoadInt32(&c.nwait) > 0 {
  198. go c.ping()
  199. pinged = true
  200. }
  201. } else {
  202. return nil, err
  203. }
  204. } else {
  205. return nil, err
  206. }
  207. }
  208. if n == headerSize && len(resp) == headerSize {
  209. if resp[0] != c.version|flagResponse {
  210. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  211. }
  212. resp.grow(resp.Length())
  213. }
  214. }
  215. return resp, nil
  216. }
  217. func (c *Conn) execSimple(op operation) (interface{}, error) {
  218. f, err := op.encodeFrame(c.version, nil)
  219. f.setLength(len(f) - headerSize)
  220. if _, err := c.conn.Write([]byte(f)); err != nil {
  221. c.Close()
  222. return nil, err
  223. }
  224. if f, err = c.recv(); err != nil {
  225. return nil, err
  226. }
  227. return c.decodeFrame(f, nil)
  228. }
  229. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  230. req, err := op.encodeFrame(c.version, nil)
  231. if err != nil {
  232. return nil, err
  233. }
  234. if trace != nil {
  235. req[1] |= flagTrace
  236. }
  237. if len(req) > headerSize && c.compressor != nil {
  238. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  239. if err != nil {
  240. return nil, err
  241. }
  242. req = append(req[:headerSize], frame(body)...)
  243. req[1] |= flagCompress
  244. }
  245. req.setLength(len(req) - headerSize)
  246. id := <-c.uniq
  247. req[2] = id
  248. call := &c.calls[id]
  249. call.resp = make(chan callResp, 1)
  250. atomic.AddInt32(&c.nwait, 1)
  251. atomic.StoreInt32(&call.active, 1)
  252. if _, err := c.conn.Write(req); err != nil {
  253. c.uniq <- id
  254. c.Close()
  255. return nil, err
  256. }
  257. reply := <-call.resp
  258. call.resp = nil
  259. c.uniq <- id
  260. if reply.err != nil {
  261. return nil, reply.err
  262. }
  263. return c.decodeFrame(reply.buf, trace)
  264. }
  265. func (c *Conn) dispatch(resp frame) {
  266. id := int(resp[2])
  267. if id >= len(c.calls) {
  268. return
  269. }
  270. call := &c.calls[id]
  271. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  272. return
  273. }
  274. atomic.AddInt32(&c.nwait, -1)
  275. call.resp <- callResp{resp, nil}
  276. }
  277. func (c *Conn) ping() error {
  278. _, err := c.exec(&optionsFrame{}, nil)
  279. return err
  280. }
  281. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  282. c.prepMu.Lock()
  283. flight := c.prep[stmt]
  284. if flight != nil {
  285. c.prepMu.Unlock()
  286. flight.wg.Wait()
  287. return flight.info, flight.err
  288. }
  289. flight = new(inflightPrepare)
  290. flight.wg.Add(1)
  291. c.prep[stmt] = flight
  292. c.prepMu.Unlock()
  293. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  294. if err != nil {
  295. flight.err = err
  296. } else {
  297. switch x := resp.(type) {
  298. case resultPreparedFrame:
  299. flight.info = &queryInfo{
  300. id: x.PreparedId,
  301. args: x.Values,
  302. }
  303. case error:
  304. flight.err = x
  305. default:
  306. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  307. }
  308. }
  309. flight.wg.Done()
  310. if err != nil {
  311. c.prepMu.Lock()
  312. delete(c.prep, stmt)
  313. c.prepMu.Unlock()
  314. }
  315. return flight.info, flight.err
  316. }
  317. func (c *Conn) executeQuery(qry *Query) *Iter {
  318. op := &queryFrame{
  319. Stmt: qry.stmt,
  320. Cons: qry.cons,
  321. PageSize: qry.pageSize,
  322. PageState: qry.pageState,
  323. }
  324. if qry.shouldPrepare() {
  325. // Prepare all DML queries. Other queries can not be prepared.
  326. info, err := c.prepareStatement(qry.stmt, qry.trace)
  327. if err != nil {
  328. return &Iter{err: err}
  329. }
  330. op.Prepared = info.id
  331. op.Values = make([][]byte, len(qry.values))
  332. for i := 0; i < len(qry.values); i++ {
  333. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  334. if err != nil {
  335. return &Iter{err: err}
  336. }
  337. op.Values[i] = val
  338. }
  339. }
  340. resp, err := c.exec(op, qry.trace)
  341. if err != nil {
  342. return &Iter{err: err}
  343. }
  344. switch x := resp.(type) {
  345. case resultVoidFrame:
  346. return &Iter{}
  347. case resultRowsFrame:
  348. iter := &Iter{columns: x.Columns, rows: x.Rows}
  349. if len(x.PagingState) > 0 {
  350. iter.next = &nextIter{
  351. qry: *qry,
  352. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  353. }
  354. iter.next.qry.pageState = x.PagingState
  355. if iter.next.pos < 1 {
  356. iter.next.pos = 1
  357. }
  358. }
  359. return iter
  360. case resultKeyspaceFrame:
  361. return &Iter{}
  362. case RequestErrUnprepared:
  363. c.prepMu.Lock()
  364. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  365. delete(c.prep, qry.stmt)
  366. c.prepMu.Unlock()
  367. return c.executeQuery(qry)
  368. }
  369. c.prepMu.Unlock()
  370. return &Iter{err: x}
  371. case error:
  372. return &Iter{err: x}
  373. default:
  374. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  375. }
  376. }
  377. func (c *Conn) Pick(qry *Query) *Conn {
  378. if c.Closed() {
  379. return nil
  380. }
  381. return c
  382. }
  383. func (c *Conn) Closed() bool {
  384. c.closedMu.RLock()
  385. closed := c.isClosed
  386. c.closedMu.RUnlock()
  387. return closed
  388. }
  389. func (c *Conn) Close() {
  390. c.closedMu.Lock()
  391. if c.isClosed {
  392. c.closedMu.Unlock()
  393. return
  394. }
  395. c.isClosed = true
  396. c.closedMu.Unlock()
  397. c.conn.Close()
  398. }
  399. func (c *Conn) Address() string {
  400. return c.addr
  401. }
  402. func (c *Conn) UseKeyspace(keyspace string) error {
  403. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  404. if err != nil {
  405. return err
  406. }
  407. switch x := resp.(type) {
  408. case resultKeyspaceFrame:
  409. case error:
  410. return x
  411. default:
  412. return NewErrProtocol("Unknown type in response to USE: %s", x)
  413. }
  414. return nil
  415. }
  416. func (c *Conn) executeBatch(batch *Batch) error {
  417. if c.version == 1 {
  418. return ErrUnsupported
  419. }
  420. f := make(frame, headerSize, defaultFrameSize)
  421. f.setHeader(c.version, 0, 0, opBatch)
  422. f.writeByte(byte(batch.Type))
  423. f.writeShort(uint16(len(batch.Entries)))
  424. for i := 0; i < len(batch.Entries); i++ {
  425. entry := &batch.Entries[i]
  426. var info *queryInfo
  427. if len(entry.Args) > 0 {
  428. var err error
  429. info, err = c.prepareStatement(entry.Stmt, nil)
  430. if err != nil {
  431. return err
  432. }
  433. f.writeByte(1)
  434. f.writeShortBytes(info.id)
  435. } else {
  436. f.writeByte(0)
  437. f.writeLongString(entry.Stmt)
  438. }
  439. f.writeShort(uint16(len(entry.Args)))
  440. for j := 0; j < len(entry.Args); j++ {
  441. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  442. if err != nil {
  443. return err
  444. }
  445. f.writeBytes(val)
  446. }
  447. }
  448. f.writeConsistency(batch.Cons)
  449. resp, err := c.exec(f, nil)
  450. if err != nil {
  451. return err
  452. }
  453. switch x := resp.(type) {
  454. case resultVoidFrame:
  455. return nil
  456. case RequestErrUnprepared:
  457. c.prepMu.Lock()
  458. found := false
  459. for stmt, flight := range c.prep {
  460. if flight == nil || flight.info == nil {
  461. continue
  462. }
  463. if bytes.Equal(flight.info.id, x.StatementId) {
  464. found = true
  465. delete(c.prep, stmt)
  466. break
  467. }
  468. }
  469. c.prepMu.Unlock()
  470. if found {
  471. return c.executeBatch(batch)
  472. } else {
  473. return x
  474. }
  475. case error:
  476. return x
  477. default:
  478. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  479. }
  480. }
  481. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  482. defer func() {
  483. if r := recover(); r != nil {
  484. if e, ok := r.(ErrProtocol); ok {
  485. err = e
  486. return
  487. }
  488. panic(r)
  489. }
  490. }()
  491. if len(f) < headerSize {
  492. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  493. } else if f[0] != c.version|flagResponse {
  494. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  495. }
  496. flags, op, f := f[1], f[3], f[headerSize:]
  497. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  498. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  499. return nil, err
  500. } else {
  501. f = frame(buf)
  502. }
  503. }
  504. if flags&flagTrace != 0 {
  505. if len(f) < 16 {
  506. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  507. }
  508. traceId := []byte(f[:16])
  509. f = f[16:]
  510. trace.Trace(traceId)
  511. }
  512. switch op {
  513. case opReady:
  514. return readyFrame{}, nil
  515. case opResult:
  516. switch kind := f.readInt(); kind {
  517. case resultKindVoid:
  518. return resultVoidFrame{}, nil
  519. case resultKindRows:
  520. columns, pageState := f.readMetaData()
  521. numRows := f.readInt()
  522. values := make([][]byte, numRows*len(columns))
  523. for i := 0; i < len(values); i++ {
  524. values[i] = f.readBytes()
  525. }
  526. rows := make([][][]byte, numRows)
  527. for i := 0; i < numRows; i++ {
  528. rows[i], values = values[:len(columns)], values[len(columns):]
  529. }
  530. return resultRowsFrame{columns, rows, pageState}, nil
  531. case resultKindKeyspace:
  532. keyspace := f.readString()
  533. return resultKeyspaceFrame{keyspace}, nil
  534. case resultKindPrepared:
  535. id := f.readShortBytes()
  536. values, _ := f.readMetaData()
  537. return resultPreparedFrame{id, values}, nil
  538. case resultKindSchemaChanged:
  539. return resultVoidFrame{}, nil
  540. default:
  541. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  542. }
  543. case opAuthenticate:
  544. return authenticateFrame{f.readString()}, nil
  545. case opAuthChallenge:
  546. return authChallengeFrame{f.readBytes()}, nil
  547. case opAuthSuccess:
  548. return authSuccessFrame{f.readBytes()}, nil
  549. case opSupported:
  550. return supportedFrame{}, nil
  551. case opError:
  552. return f.readError(), nil
  553. default:
  554. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  555. }
  556. }
  557. type queryInfo struct {
  558. id []byte
  559. args []ColumnInfo
  560. rval []ColumnInfo
  561. }
  562. type callReq struct {
  563. active int32
  564. resp chan callResp
  565. }
  566. type callResp struct {
  567. buf frame
  568. err error
  569. }
  570. type inflightPrepare struct {
  571. info *queryInfo
  572. err error
  573. wg sync.WaitGroup
  574. }