conn.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "fmt"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. const defaultFrameSize = 4096
  14. const flagResponse = 0x80
  15. const maskVersion = 0x7F
// Cluster is the owner of a connection that wants to be told when the
// connection fails.
type Cluster interface {
	// HandleError is called by a connection's read loop when it stops because
	// of err. In that call the connection has already been closed and closed
	// is true.
	HandleError(conn *Conn, err error, closed bool)
}
// Authenticator drives the authentication exchange that may take place while
// a connection is starting up.
type Authenticator interface {
	// Challenge receives the server's request (the authenticator name for the
	// initial authenticate frame, the challenge payload for later rounds) and
	// returns the response to send together with the Authenticator that will
	// handle the next round. A nil auth makes any further challenge from the
	// server fail the handshake.
	Challenge(req []byte) (resp []byte, auth Authenticator, err error)
	// Success receives the payload of the server's auth-success frame. It is
	// only called on an Authenticator returned by a previous Challenge.
	Success(data []byte) error
}
// PasswordAuthenticator is an Authenticator that answers the server's
// org.apache.cassandra.auth.PasswordAuthenticator with a username and
// password.
type PasswordAuthenticator struct {
	Username string // user name sent to the server
	Password string // password sent to the server
}
  27. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  28. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  29. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  30. }
  31. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  32. resp[0] = 0
  33. copy(resp[1:], p.Username)
  34. resp[len(p.Username)+1] = 0
  35. copy(resp[2+len(p.Username):], p.Password)
  36. return resp, nil, nil
  37. }
// Success accepts any auth-success payload; data is ignored and the result
// is always nil.
func (p PasswordAuthenticator) Success(data []byte) error {
	return nil
}
// ConnConfig holds the settings used by Connect to establish a connection.
type ConnConfig struct {
	// ProtoVersion is the protocol version to speak. Connect uses 2 for any
	// value other than 1 or 2.
	ProtoVersion int
	// CQLVersion is sent to the server in the startup frame.
	CQLVersion string
	// Timeout bounds the TCP dial and is used as the read deadline interval.
	Timeout time.Duration
	// NumStreams is the number of concurrent request streams. Connect uses
	// 128 for any value outside 1..128.
	NumStreams int
	// Compressor, when non-nil, is announced in the startup frame and used to
	// compress request bodies and decompress responses.
	Compressor Compressor
	// Authenticator, when non-nil, answers the server's authentication
	// request during startup.
	Authenticator Authenticator
	// Keepalive, when positive, is passed to the connection's setKeepalive.
	Keepalive time.Duration
}
// Conn is a single connection to a Cassandra node. It can be used to execute
// queries, but users are usually advised to use a more reliable, higher
// level API.
type Conn struct {
	conn    net.Conn      // underlying network connection
	r       *bufio.Reader // buffered reader over conn, used to read frames
	timeout time.Duration // interval used for read deadlines

	uniq  chan uint8 // pool of free stream ids; exec takes one per request
	calls []callReq  // per-stream request state, indexed by stream id
	nwait int32      // number of requests awaiting a response; accessed atomically

	prepMu sync.Mutex                  // guards prep
	prep   map[string]*inflightPrepare // prepared (or in-flight) statements keyed by statement text

	cluster    Cluster       // notified when the read loop stops
	compressor Compressor    // optional body compressor; nil disables compression
	auth       Authenticator // optional authenticator used during startup
	addr       string        // remote address of conn, as a string
	version    uint8         // protocol version used in frame headers

	closedMu sync.RWMutex // guards isClosed
	isClosed bool         // set once Close has been called
}
  70. // Connect establishes a connection to a Cassandra node.
  71. // You must also call the Serve method before you can execute any queries.
  72. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  73. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  74. if err != nil {
  75. return nil, err
  76. }
  77. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  78. cfg.NumStreams = 128
  79. }
  80. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  81. cfg.ProtoVersion = 2
  82. }
  83. c := &Conn{
  84. conn: conn,
  85. r: bufio.NewReader(conn),
  86. uniq: make(chan uint8, cfg.NumStreams),
  87. calls: make([]callReq, cfg.NumStreams),
  88. prep: make(map[string]*inflightPrepare),
  89. timeout: cfg.Timeout,
  90. version: uint8(cfg.ProtoVersion),
  91. addr: conn.RemoteAddr().String(),
  92. cluster: cluster,
  93. compressor: cfg.Compressor,
  94. auth: cfg.Authenticator,
  95. }
  96. if cfg.Keepalive > 0 {
  97. c.setKeepalive(cfg.Keepalive)
  98. }
  99. for i := 0; i < cap(c.uniq); i++ {
  100. c.uniq <- uint8(i)
  101. }
  102. if err := c.startup(&cfg); err != nil {
  103. conn.Close()
  104. return nil, err
  105. }
  106. go c.serve()
  107. return c, nil
  108. }
  109. func (c *Conn) startup(cfg *ConnConfig) error {
  110. compression := ""
  111. if c.compressor != nil {
  112. compression = c.compressor.Name()
  113. }
  114. var req operation = &startupFrame{
  115. CQLVersion: cfg.CQLVersion,
  116. Compression: compression,
  117. }
  118. var challenger Authenticator
  119. for {
  120. resp, err := c.execSimple(req)
  121. if err != nil {
  122. return err
  123. }
  124. switch x := resp.(type) {
  125. case readyFrame:
  126. return nil
  127. case error:
  128. return x
  129. case authenticateFrame:
  130. if c.auth == nil {
  131. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  132. }
  133. var resp []byte
  134. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  135. if err != nil {
  136. return err
  137. }
  138. req = &authResponseFrame{resp}
  139. case authChallengeFrame:
  140. if challenger == nil {
  141. return fmt.Errorf("authentication error (invalid challenge)")
  142. }
  143. var resp []byte
  144. resp, challenger, err = challenger.Challenge(x.Data)
  145. if err != nil {
  146. return err
  147. }
  148. req = &authResponseFrame{resp}
  149. case authSuccessFrame:
  150. if challenger != nil {
  151. return challenger.Success(x.Data)
  152. }
  153. return nil
  154. default:
  155. return ErrProtocol
  156. }
  157. }
  158. }
  159. // Serve starts the stream multiplexer for this connection, which is required
  160. // to execute any queries. This method runs as long as the connection is
  161. // open and is therefore usually called in a separate goroutine.
  162. func (c *Conn) serve() {
  163. var (
  164. err error
  165. resp frame
  166. )
  167. for {
  168. resp, err = c.recv()
  169. if err != nil {
  170. break
  171. }
  172. c.dispatch(resp)
  173. }
  174. c.Close()
  175. for id := 0; id < len(c.calls); id++ {
  176. req := &c.calls[id]
  177. if atomic.LoadInt32(&req.active) == 1 {
  178. req.resp <- callResp{nil, err}
  179. }
  180. }
  181. c.cluster.HandleError(c, err, true)
  182. }
// recv reads one complete frame from the connection and returns it,
// header included.
//
// The read deadline is c.timeout from now. When a read fails with a
// temporary net.Error the loop keeps going as long as progress was made
// since the previous failure; if nothing has been read at all, it extends
// the deadline once more (and, if requests are outstanding, sends a ping in
// the background). Any other error is returned. ErrProtocol is returned if
// the first header byte is not c.version|flagResponse.
func (c *Conn) recv() (frame, error) {
	// Start with room for just the header; the body size is known only after
	// the header has been read.
	resp := make(frame, headerSize, headerSize+512)
	c.conn.SetReadDeadline(time.Now().Add(c.timeout))
	// n: bytes read so far; last: value of n at the previous temporary error;
	// pinged: whether a keep-alive ping has already been started.
	n, last, pinged := 0, 0, false
	for n < len(resp) {
		nn, err := c.r.Read(resp[n:])
		n += nn
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				if n > last {
					// we hit the deadline but we made progress.
					// simply extend the deadline
					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
					last = n
				} else if n == 0 && !pinged {
					// Idle connection: give it another interval, and probe
					// the server if somebody is waiting for a response.
					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
					if atomic.LoadInt32(&c.nwait) > 0 {
						go c.ping()
						pinged = true
					}
				} else {
					return nil, err
				}
			} else {
				return nil, err
			}
		}
		// The header has just been completed and resp has not been grown yet.
		if n == headerSize && len(resp) == headerSize {
			if resp[0] != c.version|flagResponse {
				return nil, ErrProtocol
			}
			// NOTE(review): presumably extends resp by the body length from
			// the header so the loop continues reading the body — confirm
			// against frame.grow.
			resp.grow(resp.Length())
		}
	}
	return resp, nil
}
  219. func (c *Conn) execSimple(op operation) (interface{}, error) {
  220. f, err := op.encodeFrame(c.version, nil)
  221. f.setLength(len(f) - headerSize)
  222. if _, err := c.conn.Write([]byte(f)); err != nil {
  223. c.Close()
  224. return nil, err
  225. }
  226. if f, err = c.recv(); err != nil {
  227. return nil, err
  228. }
  229. return c.decodeFrame(f, nil)
  230. }
// exec sends op on a free stream and blocks until the read loop delivers
// the matching response (or an error), then decodes and returns it.
//
// If trace is non-nil the trace flag is set on the request. If a compressor
// is configured and the request has a body, the body is compressed and the
// compress flag is set. A write failure closes the connection.
func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
	req, err := op.encodeFrame(c.version, nil)
	if err != nil {
		return nil, err
	}
	if trace != nil {
		req[1] |= flagTrace
	}
	if len(req) > headerSize && c.compressor != nil {
		body, err := c.compressor.Encode([]byte(req[headerSize:]))
		if err != nil {
			return nil, err
		}
		// Replace the plain body with the compressed one, keeping the header.
		req = append(req[:headerSize], frame(body)...)
		req[1] |= flagCompress
	}
	req.setLength(len(req) - headerSize)
	// Blocks until a stream id is free.
	id := <-c.uniq
	req[2] = id
	call := &c.calls[id]
	// Buffered so that the sender of the response never blocks on us.
	call.resp = make(chan callResp, 1)
	atomic.AddInt32(&c.nwait, 1)
	// The call must be marked active before the request is written, so a
	// response arriving immediately is not dropped by dispatch.
	atomic.StoreInt32(&call.active, 1)
	if _, err := c.conn.Write(req); err != nil {
		c.uniq <- id
		c.Close()
		return nil, err
	}
	reply := <-call.resp
	call.resp = nil
	c.uniq <- id
	if reply.err != nil {
		return nil, reply.err
	}
	return c.decodeFrame(reply.buf, trace)
}
  267. func (c *Conn) dispatch(resp frame) {
  268. id := int(resp[2])
  269. if id >= len(c.calls) {
  270. return
  271. }
  272. call := &c.calls[id]
  273. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  274. return
  275. }
  276. atomic.AddInt32(&c.nwait, -1)
  277. call.resp <- callResp{resp, nil}
  278. }
  279. func (c *Conn) ping() error {
  280. _, err := c.exec(&optionsFrame{}, nil)
  281. return err
  282. }
  283. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  284. c.prepMu.Lock()
  285. flight := c.prep[stmt]
  286. if flight != nil {
  287. c.prepMu.Unlock()
  288. flight.wg.Wait()
  289. return flight.info, flight.err
  290. }
  291. flight = new(inflightPrepare)
  292. flight.wg.Add(1)
  293. c.prep[stmt] = flight
  294. c.prepMu.Unlock()
  295. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  296. if err != nil {
  297. flight.err = err
  298. } else {
  299. switch x := resp.(type) {
  300. case resultPreparedFrame:
  301. flight.info = &queryInfo{
  302. id: x.PreparedId,
  303. args: x.Values,
  304. }
  305. case error:
  306. flight.err = x
  307. default:
  308. flight.err = ErrProtocol
  309. }
  310. }
  311. flight.wg.Done()
  312. if err != nil {
  313. c.prepMu.Lock()
  314. delete(c.prep, stmt)
  315. c.prepMu.Unlock()
  316. }
  317. return flight.info, flight.err
  318. }
  319. func (c *Conn) executeQuery(qry *Query) *Iter {
  320. op := &queryFrame{
  321. Stmt: qry.stmt,
  322. Cons: qry.cons,
  323. PageSize: qry.pageSize,
  324. PageState: qry.pageState,
  325. }
  326. if qry.ShouldPrepare() {
  327. // Prepare all DML queries. Other queries can not be prepared.
  328. info, err := c.prepareStatement(qry.stmt, qry.trace)
  329. if err != nil {
  330. return &Iter{err: err}
  331. }
  332. op.Prepared = info.id
  333. op.Values = make([][]byte, len(qry.values))
  334. for i := 0; i < len(qry.values); i++ {
  335. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  336. if err != nil {
  337. return &Iter{err: err}
  338. }
  339. op.Values[i] = val
  340. }
  341. }
  342. resp, err := c.exec(op, qry.trace)
  343. if err != nil {
  344. return &Iter{err: err}
  345. }
  346. switch x := resp.(type) {
  347. case resultVoidFrame:
  348. return &Iter{}
  349. case resultRowsFrame:
  350. iter := &Iter{columns: x.Columns, rows: x.Rows}
  351. if len(x.PagingState) > 0 {
  352. iter.next = &nextIter{
  353. qry: *qry,
  354. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  355. }
  356. iter.next.qry.pageState = x.PagingState
  357. if iter.next.pos < 1 {
  358. iter.next.pos = 1
  359. }
  360. }
  361. return iter
  362. case resultKeyspaceFrame:
  363. return &Iter{}
  364. case errorFrame:
  365. if x.Code == errUnprepared && len(qry.values) > 0 {
  366. c.prepMu.Lock()
  367. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  368. delete(c.prep, qry.stmt)
  369. c.prepMu.Unlock()
  370. return c.executeQuery(qry)
  371. }
  372. c.prepMu.Unlock()
  373. return &Iter{err: x}
  374. } else {
  375. return &Iter{err: x}
  376. }
  377. case error:
  378. return &Iter{err: x}
  379. default:
  380. return &Iter{err: ErrProtocol}
  381. }
  382. }
  383. func (c *Conn) Pick(qry *Query) *Conn {
  384. if c.Closed() || len(c.uniq) == 0 {
  385. return nil
  386. }
  387. return c
  388. }
  389. func (c *Conn) Closed() bool {
  390. c.closedMu.RLock()
  391. closed := c.isClosed
  392. c.closedMu.RUnlock()
  393. return closed
  394. }
  395. func (c *Conn) Close() {
  396. c.closedMu.Lock()
  397. if c.isClosed {
  398. c.closedMu.Unlock()
  399. return
  400. }
  401. c.isClosed = true
  402. c.closedMu.Unlock()
  403. c.conn.Close()
  404. }
// Address returns the remote address of the connection, as recorded when
// the connection was established.
func (c *Conn) Address() string {
	return c.addr
}
  408. func (c *Conn) UseKeyspace(keyspace string) error {
  409. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  410. if err != nil {
  411. return err
  412. }
  413. switch x := resp.(type) {
  414. case resultKeyspaceFrame:
  415. case error:
  416. return x
  417. default:
  418. return ErrProtocol
  419. }
  420. return nil
  421. }
  422. func (c *Conn) executeBatch(batch *Batch) error {
  423. if c.version == 1 {
  424. return ErrUnsupported
  425. }
  426. f := make(frame, headerSize, defaultFrameSize)
  427. f.setHeader(c.version, 0, 0, opBatch)
  428. f.writeByte(byte(batch.Type))
  429. f.writeShort(uint16(len(batch.Entries)))
  430. for i := 0; i < len(batch.Entries); i++ {
  431. entry := &batch.Entries[i]
  432. var info *queryInfo
  433. if len(entry.Args) > 0 {
  434. var err error
  435. info, err = c.prepareStatement(entry.Stmt, nil)
  436. if err != nil {
  437. return err
  438. }
  439. f.writeByte(1)
  440. f.writeShortBytes(info.id)
  441. } else {
  442. f.writeByte(0)
  443. f.writeLongString(entry.Stmt)
  444. }
  445. f.writeShort(uint16(len(entry.Args)))
  446. for j := 0; j < len(entry.Args); j++ {
  447. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  448. if err != nil {
  449. return err
  450. }
  451. f.writeBytes(val)
  452. }
  453. }
  454. f.writeConsistency(batch.Cons)
  455. resp, err := c.exec(f, nil)
  456. if err != nil {
  457. return err
  458. }
  459. switch x := resp.(type) {
  460. case resultVoidFrame:
  461. return nil
  462. case error:
  463. return x
  464. default:
  465. return ErrProtocol
  466. }
  467. }
  468. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  469. defer func() {
  470. if r := recover(); r != nil {
  471. if e, ok := r.(error); ok && e == ErrProtocol {
  472. err = e
  473. return
  474. }
  475. panic(r)
  476. }
  477. }()
  478. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  479. return nil, ErrProtocol
  480. }
  481. flags, op, f := f[1], f[3], f[headerSize:]
  482. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  483. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  484. return nil, err
  485. } else {
  486. f = frame(buf)
  487. }
  488. }
  489. if flags&flagTrace != 0 {
  490. if len(f) < 16 {
  491. return nil, ErrProtocol
  492. }
  493. traceId := []byte(f[:16])
  494. f = f[16:]
  495. trace.Trace(traceId)
  496. }
  497. switch op {
  498. case opReady:
  499. return readyFrame{}, nil
  500. case opResult:
  501. switch kind := f.readInt(); kind {
  502. case resultKindVoid:
  503. return resultVoidFrame{}, nil
  504. case resultKindRows:
  505. columns, pageState := f.readMetaData()
  506. numRows := f.readInt()
  507. values := make([][]byte, numRows*len(columns))
  508. for i := 0; i < len(values); i++ {
  509. values[i] = f.readBytes()
  510. }
  511. rows := make([][][]byte, numRows)
  512. for i := 0; i < numRows; i++ {
  513. rows[i], values = values[:len(columns)], values[len(columns):]
  514. }
  515. return resultRowsFrame{columns, rows, pageState}, nil
  516. case resultKindKeyspace:
  517. keyspace := f.readString()
  518. return resultKeyspaceFrame{keyspace}, nil
  519. case resultKindPrepared:
  520. id := f.readShortBytes()
  521. values, _ := f.readMetaData()
  522. return resultPreparedFrame{id, values}, nil
  523. case resultKindSchemaChanged:
  524. return resultVoidFrame{}, nil
  525. default:
  526. return nil, ErrProtocol
  527. }
  528. case opAuthenticate:
  529. return authenticateFrame{f.readString()}, nil
  530. case opAuthChallenge:
  531. return authChallengeFrame{f.readBytes()}, nil
  532. case opAuthSuccess:
  533. return authSuccessFrame{f.readBytes()}, nil
  534. case opSupported:
  535. return supportedFrame{}, nil
  536. case opError:
  537. code := f.readInt()
  538. msg := f.readString()
  539. return errorFrame{code, msg}, nil
  540. default:
  541. return nil, ErrProtocol
  542. }
  543. }
// queryInfo describes a statement that has been prepared on the server.
type queryInfo struct {
	id   []byte       // prepared statement id returned by the server
	args []ColumnInfo // metadata of the statement's bind values, used to marshal arguments
	rval []ColumnInfo // NOTE(review): not populated anywhere in this file — presumably result column metadata
}
// callReq is the state of one request stream of a connection.
type callReq struct {
	active int32         // 1 while a request on this stream awaits its response; accessed atomically
	resp   chan callResp // channel on which the response (or error) for the current request is delivered
}
// callResp is the outcome delivered to a waiting request: either the raw
// response frame or the error that ended the connection's read loop.
type callResp struct {
	buf frame // raw response frame; nil when err is set
	err error // read-loop error; nil when buf is set
}
// inflightPrepare is the shared state of one prepare request. Callers that
// find an existing entry wait on wg and then read info and err.
type inflightPrepare struct {
	info *queryInfo     // result of a successful prepare
	err  error          // failure of the prepare, if any
	wg   sync.WaitGroup // released once info/err have been set
}
  561. }