conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "fmt"
  8. "net"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "unicode"
  14. )
  // Frame-level protocol constants.
const (
	// defaultFrameSize is the initial capacity of a frame that is assembled
	// by hand (executeBatch builds its BATCH request this way).
	defaultFrameSize = 4096

	// flagResponse is OR-ed into the version byte of every frame the server
	// sends back; recv and decodeFrame check for c.version|flagResponse.
	flagResponse = 0x80

	// maskVersion (0x7F) masks flagResponse (0x80) off a version byte.
	maskVersion = 0x7F
)
  18. type Cluster interface {
  19. HandleError(conn *Conn, err error, closed bool)
  20. }
  // Authenticator answers the authentication exchange during connection
// startup. On the first round Challenge receives the authenticator name the
// server announced; on later rounds it receives the server's challenge data.
// It returns the response to send and, optionally, the Authenticator that will
// handle the next challenge. Success is given the data of the server's final
// success message.
type Authenticator interface {
	Challenge(req []byte) (resp []byte, auth Authenticator, err error)
	Success(data []byte) error
}

// PasswordAuthenticator logs in with a username and password against
// Cassandra's org.apache.cassandra.auth.PasswordAuthenticator.
type PasswordAuthenticator struct {
	Username, Password string
}

// Challenge accepts only the PasswordAuthenticator class and replies with the
// NUL-separated credential message 0x00, username, 0x00, password. It returns
// no follow-up Authenticator, so a further challenge from the server is
// rejected by the caller.
func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
	const expected = "org.apache.cassandra.auth.PasswordAuthenticator"
	if string(req) != expected {
		return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
	}
	msg := make([]byte, 0, 2+len(p.Username)+len(p.Password))
	msg = append(msg, 0)
	msg = append(msg, p.Username...)
	msg = append(msg, 0)
	msg = append(msg, p.Password...)
	return msg, nil, nil
}

// Success ignores the server's success data; nothing needs verifying for
// password authentication.
func (p PasswordAuthenticator) Success(data []byte) error {
	return nil
}
  43. type ConnConfig struct {
  44. ProtoVersion int
  45. CQLVersion string
  46. Timeout time.Duration
  47. NumStreams int
  48. Compressor Compressor
  49. Authenticator Authenticator
  50. Keepalive time.Duration
  51. }
  52. // Conn is a single connection to a Cassandra node. It can be used to execute
  53. // queries, but users are usually advised to use a more reliable, higher
  54. // level API.
  55. type Conn struct {
  56. conn net.Conn
  57. r *bufio.Reader
  58. timeout time.Duration
  59. uniq chan uint8
  60. calls []callReq
  61. nwait int32
  62. prepMu sync.Mutex
  63. prep map[string]*inflightPrepare
  64. cluster Cluster
  65. compressor Compressor
  66. auth Authenticator
  67. addr string
  68. version uint8
  69. closedMu sync.RWMutex
  70. isClosed bool
  71. }
  72. // Connect establishes a connection to a Cassandra node.
  73. // You must also call the Serve method before you can execute any queries.
  74. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  75. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  76. if err != nil {
  77. return nil, err
  78. }
  79. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  80. cfg.NumStreams = 128
  81. }
  82. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  83. cfg.ProtoVersion = 2
  84. }
  85. c := &Conn{
  86. conn: conn,
  87. r: bufio.NewReader(conn),
  88. uniq: make(chan uint8, cfg.NumStreams),
  89. calls: make([]callReq, cfg.NumStreams),
  90. prep: make(map[string]*inflightPrepare),
  91. timeout: cfg.Timeout,
  92. version: uint8(cfg.ProtoVersion),
  93. addr: conn.RemoteAddr().String(),
  94. cluster: cluster,
  95. compressor: cfg.Compressor,
  96. auth: cfg.Authenticator,
  97. }
  98. if cfg.Keepalive > 0 {
  99. c.setKeepalive(cfg.Keepalive)
  100. }
  101. for i := 0; i < cap(c.uniq); i++ {
  102. c.uniq <- uint8(i)
  103. }
  104. if err := c.startup(&cfg); err != nil {
  105. conn.Close()
  106. return nil, err
  107. }
  108. go c.serve()
  109. return c, nil
  110. }
  111. func (c *Conn) startup(cfg *ConnConfig) error {
  112. compression := ""
  113. if c.compressor != nil {
  114. compression = c.compressor.Name()
  115. }
  116. var req operation = &startupFrame{
  117. CQLVersion: cfg.CQLVersion,
  118. Compression: compression,
  119. }
  120. var challenger Authenticator
  121. for {
  122. resp, err := c.execSimple(req)
  123. if err != nil {
  124. return err
  125. }
  126. switch x := resp.(type) {
  127. case readyFrame:
  128. return nil
  129. case error:
  130. return x
  131. case authenticateFrame:
  132. if c.auth == nil {
  133. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  134. }
  135. var resp []byte
  136. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  137. if err != nil {
  138. return err
  139. }
  140. req = &authResponseFrame{resp}
  141. case authChallengeFrame:
  142. if challenger == nil {
  143. return fmt.Errorf("authentication error (invalid challenge)")
  144. }
  145. var resp []byte
  146. resp, challenger, err = challenger.Challenge(x.Data)
  147. if err != nil {
  148. return err
  149. }
  150. req = &authResponseFrame{resp}
  151. case authSuccessFrame:
  152. if challenger != nil {
  153. return challenger.Success(x.Data)
  154. }
  155. return nil
  156. default:
  157. return ErrProtocol
  158. }
  159. }
  160. }
  161. // Serve starts the stream multiplexer for this connection, which is required
  162. // to execute any queries. This method runs as long as the connection is
  163. // open and is therefore usually called in a separate goroutine.
  164. func (c *Conn) serve() {
  165. var (
  166. err error
  167. resp frame
  168. )
  169. for {
  170. resp, err = c.recv()
  171. if err != nil {
  172. break
  173. }
  174. c.dispatch(resp)
  175. }
  176. c.Close()
  177. for id := 0; id < len(c.calls); id++ {
  178. req := &c.calls[id]
  179. if atomic.LoadInt32(&req.active) == 1 {
  180. req.resp <- callResp{nil, err}
  181. }
  182. }
  183. c.cluster.HandleError(c, err, true)
  184. }
  185. func (c *Conn) recv() (frame, error) {
  186. resp := make(frame, headerSize, headerSize+512)
  187. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  188. n, last, pinged := 0, 0, false
  189. for n < len(resp) {
  190. nn, err := c.r.Read(resp[n:])
  191. n += nn
  192. if err != nil {
  193. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  194. if n > last {
  195. // we hit the deadline but we made progress.
  196. // simply extend the deadline
  197. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  198. last = n
  199. } else if n == 0 && !pinged {
  200. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  201. if atomic.LoadInt32(&c.nwait) > 0 {
  202. go c.ping()
  203. pinged = true
  204. }
  205. } else {
  206. return nil, err
  207. }
  208. } else {
  209. return nil, err
  210. }
  211. }
  212. if n == headerSize && len(resp) == headerSize {
  213. if resp[0] != c.version|flagResponse {
  214. return nil, ErrProtocol
  215. }
  216. resp.grow(resp.Length())
  217. }
  218. }
  219. return resp, nil
  220. }
  221. func (c *Conn) execSimple(op operation) (interface{}, error) {
  222. f, err := op.encodeFrame(c.version, nil)
  223. f.setLength(len(f) - headerSize)
  224. if _, err := c.conn.Write([]byte(f)); err != nil {
  225. c.Close()
  226. return nil, err
  227. }
  228. if f, err = c.recv(); err != nil {
  229. return nil, err
  230. }
  231. return c.decodeFrame(f, nil)
  232. }
  233. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  234. req, err := op.encodeFrame(c.version, nil)
  235. if err != nil {
  236. return nil, err
  237. }
  238. if trace != nil {
  239. req[1] |= flagTrace
  240. }
  241. if len(req) > headerSize && c.compressor != nil {
  242. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  243. if err != nil {
  244. return nil, err
  245. }
  246. req = append(req[:headerSize], frame(body)...)
  247. req[1] |= flagCompress
  248. }
  249. req.setLength(len(req) - headerSize)
  250. id := <-c.uniq
  251. req[2] = id
  252. call := &c.calls[id]
  253. call.resp = make(chan callResp, 1)
  254. atomic.AddInt32(&c.nwait, 1)
  255. atomic.StoreInt32(&call.active, 1)
  256. if _, err := c.conn.Write(req); err != nil {
  257. c.uniq <- id
  258. c.Close()
  259. return nil, err
  260. }
  261. reply := <-call.resp
  262. call.resp = nil
  263. c.uniq <- id
  264. if reply.err != nil {
  265. return nil, reply.err
  266. }
  267. return c.decodeFrame(reply.buf, trace)
  268. }
  269. func (c *Conn) dispatch(resp frame) {
  270. id := int(resp[2])
  271. if id >= len(c.calls) {
  272. return
  273. }
  274. call := &c.calls[id]
  275. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  276. return
  277. }
  278. atomic.AddInt32(&c.nwait, -1)
  279. call.resp <- callResp{resp, nil}
  280. }
  281. func (c *Conn) ping() error {
  282. _, err := c.exec(&optionsFrame{}, nil)
  283. return err
  284. }
  285. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  286. c.prepMu.Lock()
  287. flight := c.prep[stmt]
  288. if flight != nil {
  289. c.prepMu.Unlock()
  290. flight.wg.Wait()
  291. return flight.info, flight.err
  292. }
  293. flight = new(inflightPrepare)
  294. flight.wg.Add(1)
  295. c.prep[stmt] = flight
  296. c.prepMu.Unlock()
  297. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  298. if err != nil {
  299. flight.err = err
  300. } else {
  301. switch x := resp.(type) {
  302. case resultPreparedFrame:
  303. flight.info = &queryInfo{
  304. id: x.PreparedId,
  305. args: x.Values,
  306. }
  307. case error:
  308. flight.err = x
  309. default:
  310. flight.err = ErrProtocol
  311. }
  312. }
  313. flight.wg.Done()
  314. if err != nil {
  315. c.prepMu.Lock()
  316. delete(c.prep, stmt)
  317. c.prepMu.Unlock()
  318. }
  319. return flight.info, flight.err
  320. }
  // shouldPrepare reports whether stmt is a DML statement (SELECT, INSERT,
// UPDATE, DELETE or BATCH) and so should be prepared. Surrounding whitespace
// and trailing semicolons are ignored, and keywords are compared
// case-insensitively. For "BEGIN ..." statements the last word decides, so
// "BEGIN BATCH ... APPLY BATCH" counts as a batch. A single-word statement is
// never prepared.
func shouldPrepare(stmt string) bool {
	isTrailing := func(r rune) bool {
		return unicode.IsSpace(r) || r == ';'
	}
	trimmed := strings.TrimLeftFunc(strings.TrimRightFunc(stmt, isTrailing), unicode.IsSpace)

	// trimmed has no leading/trailing whitespace, so any whitespace in it
	// separates at least two words.
	words := strings.Fields(trimmed)
	if len(words) < 2 {
		return false
	}
	keyword := strings.ToLower(words[0])
	if keyword == "begin" {
		keyword = strings.ToLower(words[len(words)-1])
	}
	switch keyword {
	case "select", "insert", "update", "delete", "batch":
		return true
	default:
		return false
	}
}
  340. func (c *Conn) executeQuery(qry *Query) *Iter {
  341. op := &queryFrame{
  342. Stmt: qry.stmt,
  343. Cons: qry.cons,
  344. PageSize: qry.pageSize,
  345. PageState: qry.pageState,
  346. }
  347. if shouldPrepare(op.Stmt) {
  348. // Prepare all DML queries. Other queries can not be prepared.
  349. info, err := c.prepareStatement(qry.stmt, qry.trace)
  350. if err != nil {
  351. return &Iter{err: err}
  352. }
  353. op.Prepared = info.id
  354. op.Values = make([][]byte, len(qry.values))
  355. for i := 0; i < len(qry.values); i++ {
  356. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  357. if err != nil {
  358. return &Iter{err: err}
  359. }
  360. op.Values[i] = val
  361. }
  362. }
  363. resp, err := c.exec(op, qry.trace)
  364. if err != nil {
  365. return &Iter{err: err}
  366. }
  367. switch x := resp.(type) {
  368. case resultVoidFrame:
  369. return &Iter{}
  370. case resultRowsFrame:
  371. iter := &Iter{columns: x.Columns, rows: x.Rows}
  372. if len(x.PagingState) > 0 {
  373. iter.next = &nextIter{
  374. qry: *qry,
  375. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  376. }
  377. iter.next.qry.pageState = x.PagingState
  378. if iter.next.pos < 1 {
  379. iter.next.pos = 1
  380. }
  381. }
  382. return iter
  383. case resultKeyspaceFrame:
  384. return &Iter{}
  385. case errorFrame:
  386. if x.Code == errUnprepared && len(qry.values) > 0 {
  387. c.prepMu.Lock()
  388. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  389. delete(c.prep, qry.stmt)
  390. c.prepMu.Unlock()
  391. return c.executeQuery(qry)
  392. }
  393. c.prepMu.Unlock()
  394. return &Iter{err: x}
  395. } else {
  396. return &Iter{err: x}
  397. }
  398. case error:
  399. return &Iter{err: x}
  400. default:
  401. return &Iter{err: ErrProtocol}
  402. }
  403. }
  404. func (c *Conn) Pick(qry *Query) *Conn {
  405. if c.Closed() || len(c.uniq) == 0 {
  406. return nil
  407. }
  408. return c
  409. }
  410. func (c *Conn) Closed() bool {
  411. c.closedMu.RLock()
  412. closed := c.isClosed
  413. c.closedMu.RUnlock()
  414. return closed
  415. }
  416. func (c *Conn) Close() {
  417. c.closedMu.Lock()
  418. if c.isClosed {
  419. c.closedMu.Unlock()
  420. return
  421. }
  422. c.isClosed = true
  423. c.closedMu.Unlock()
  424. c.conn.Close()
  425. }
  426. func (c *Conn) Address() string {
  427. return c.addr
  428. }
  429. func (c *Conn) UseKeyspace(keyspace string) error {
  430. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  431. if err != nil {
  432. return err
  433. }
  434. switch x := resp.(type) {
  435. case resultKeyspaceFrame:
  436. case error:
  437. return x
  438. default:
  439. return ErrProtocol
  440. }
  441. return nil
  442. }
  443. func (c *Conn) executeBatch(batch *Batch) error {
  444. if c.version == 1 {
  445. return ErrUnsupported
  446. }
  447. f := make(frame, headerSize, defaultFrameSize)
  448. f.setHeader(c.version, 0, 0, opBatch)
  449. f.writeByte(byte(batch.Type))
  450. f.writeShort(uint16(len(batch.Entries)))
  451. for i := 0; i < len(batch.Entries); i++ {
  452. entry := &batch.Entries[i]
  453. var info *queryInfo
  454. if len(entry.Args) > 0 {
  455. var err error
  456. info, err = c.prepareStatement(entry.Stmt, nil)
  457. if err != nil {
  458. return err
  459. }
  460. f.writeByte(1)
  461. f.writeShortBytes(info.id)
  462. } else {
  463. f.writeByte(0)
  464. f.writeLongString(entry.Stmt)
  465. }
  466. f.writeShort(uint16(len(entry.Args)))
  467. for j := 0; j < len(entry.Args); j++ {
  468. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  469. if err != nil {
  470. return err
  471. }
  472. f.writeBytes(val)
  473. }
  474. }
  475. f.writeConsistency(batch.Cons)
  476. resp, err := c.exec(f, nil)
  477. if err != nil {
  478. return err
  479. }
  480. switch x := resp.(type) {
  481. case resultVoidFrame:
  482. return nil
  483. case error:
  484. return x
  485. default:
  486. return ErrProtocol
  487. }
  488. }
  489. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  490. defer func() {
  491. if r := recover(); r != nil {
  492. if e, ok := r.(error); ok && e == ErrProtocol {
  493. err = e
  494. return
  495. }
  496. panic(r)
  497. }
  498. }()
  499. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  500. return nil, ErrProtocol
  501. }
  502. flags, op, f := f[1], f[3], f[headerSize:]
  503. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  504. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  505. return nil, err
  506. } else {
  507. f = frame(buf)
  508. }
  509. }
  510. if flags&flagTrace != 0 {
  511. if len(f) < 16 {
  512. return nil, ErrProtocol
  513. }
  514. traceId := []byte(f[:16])
  515. f = f[16:]
  516. trace.Trace(traceId)
  517. }
  518. switch op {
  519. case opReady:
  520. return readyFrame{}, nil
  521. case opResult:
  522. switch kind := f.readInt(); kind {
  523. case resultKindVoid:
  524. return resultVoidFrame{}, nil
  525. case resultKindRows:
  526. columns, pageState := f.readMetaData()
  527. numRows := f.readInt()
  528. values := make([][]byte, numRows*len(columns))
  529. for i := 0; i < len(values); i++ {
  530. values[i] = f.readBytes()
  531. }
  532. rows := make([][][]byte, numRows)
  533. for i := 0; i < numRows; i++ {
  534. rows[i], values = values[:len(columns)], values[len(columns):]
  535. }
  536. return resultRowsFrame{columns, rows, pageState}, nil
  537. case resultKindKeyspace:
  538. keyspace := f.readString()
  539. return resultKeyspaceFrame{keyspace}, nil
  540. case resultKindPrepared:
  541. id := f.readShortBytes()
  542. values, _ := f.readMetaData()
  543. return resultPreparedFrame{id, values}, nil
  544. case resultKindSchemaChanged:
  545. return resultVoidFrame{}, nil
  546. default:
  547. return nil, ErrProtocol
  548. }
  549. case opAuthenticate:
  550. return authenticateFrame{f.readString()}, nil
  551. case opAuthChallenge:
  552. return authChallengeFrame{f.readBytes()}, nil
  553. case opAuthSuccess:
  554. return authSuccessFrame{f.readBytes()}, nil
  555. case opSupported:
  556. return supportedFrame{}, nil
  557. case opError:
  558. code := f.readInt()
  559. msg := f.readString()
  560. return errorFrame{code, msg}, nil
  561. default:
  562. return nil, ErrProtocol
  563. }
  564. }
  565. type queryInfo struct {
  566. id []byte
  567. args []ColumnInfo
  568. rval []ColumnInfo
  569. }
  570. type callReq struct {
  571. active int32
  572. resp chan callResp
  573. }
  574. type callResp struct {
  575. buf frame
  576. err error
  577. }
  578. type inflightPrepare struct {
  579. info *queryInfo
  580. err error
  581. wg sync.WaitGroup
  582. }