conn.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "fmt"
  8. "net"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "unicode"
  14. "code.google.com/p/snappy-go/snappy"
  15. )
  16. const defaultFrameSize = 4096
  17. const flagResponse = 0x80
  18. const maskVersion = 0x7F
  19. type Cluster interface {
  20. HandleError(conn *Conn, err error, closed bool)
  21. HandleKeyspace(conn *Conn, keyspace string)
  22. }
  23. type Authenticator interface {
  24. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  25. Success(data []byte) error
  26. }
  27. type PasswordAuthenticator struct {
  28. Username string
  29. Password string
  30. }
  31. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  32. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  33. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  34. }
  35. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  36. resp[0] = 0
  37. copy(resp[1:], p.Username)
  38. resp[len(p.Username)+1] = 0
  39. copy(resp[2+len(p.Username):], p.Password)
  40. return resp, nil, nil
  41. }
  42. func (p PasswordAuthenticator) Success(data []byte) error {
  43. return nil
  44. }
  45. type ConnConfig struct {
  46. ProtoVersion int
  47. CQLVersion string
  48. Timeout time.Duration
  49. NumStreams int
  50. Compressor Compressor
  51. Authenticator Authenticator
  52. }
  53. // Conn is a single connection to a Cassandra node. It can be used to execute
  54. // queries, but users are usually advised to use a more reliable, higher
  55. // level API.
  56. type Conn struct {
  57. conn net.Conn
  58. r *bufio.Reader
  59. timeout time.Duration
  60. uniq chan uint8
  61. calls []callReq
  62. nwait int32
  63. prepMu sync.Mutex
  64. prep map[string]*inflightPrepare
  65. cluster Cluster
  66. compressor Compressor
  67. auth Authenticator
  68. addr string
  69. version uint8
  70. }
  71. // Connect establishes a connection to a Cassandra node.
  72. // You must also call the Serve method before you can execute any queries.
  73. func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
  74. conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
  75. if err != nil {
  76. return nil, err
  77. }
  78. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  79. cfg.NumStreams = 128
  80. }
  81. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  82. cfg.ProtoVersion = 2
  83. }
  84. c := &Conn{
  85. conn: conn,
  86. r: bufio.NewReader(conn),
  87. uniq: make(chan uint8, cfg.NumStreams),
  88. calls: make([]callReq, cfg.NumStreams),
  89. prep: make(map[string]*inflightPrepare),
  90. timeout: cfg.Timeout,
  91. version: uint8(cfg.ProtoVersion),
  92. addr: conn.RemoteAddr().String(),
  93. cluster: cluster,
  94. compressor: cfg.Compressor,
  95. auth: cfg.Authenticator,
  96. }
  97. for i := 0; i < cap(c.uniq); i++ {
  98. c.uniq <- uint8(i)
  99. }
  100. if err := c.startup(&cfg); err != nil {
  101. return nil, err
  102. }
  103. go c.serve()
  104. return c, nil
  105. }
  106. func (c *Conn) startup(cfg *ConnConfig) error {
  107. compression := ""
  108. if c.compressor != nil {
  109. compression = c.compressor.Name()
  110. }
  111. var req operation = &startupFrame{
  112. CQLVersion: cfg.CQLVersion,
  113. Compression: compression,
  114. }
  115. var challenger Authenticator
  116. for {
  117. resp, err := c.execSimple(req)
  118. if err != nil {
  119. return err
  120. }
  121. switch x := resp.(type) {
  122. case readyFrame:
  123. return nil
  124. case error:
  125. return x
  126. case authenticateFrame:
  127. if c.auth == nil {
  128. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  129. }
  130. var resp []byte
  131. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  132. if err != nil {
  133. return err
  134. }
  135. req = &authResponseFrame{resp}
  136. case authChallengeFrame:
  137. if challenger == nil {
  138. return fmt.Errorf("authentication error (invalid challenge)")
  139. }
  140. var resp []byte
  141. resp, challenger, err = challenger.Challenge(x.Data)
  142. if err != nil {
  143. return err
  144. }
  145. req = &authResponseFrame{resp}
  146. case authSuccessFrame:
  147. if challenger != nil {
  148. return challenger.Success(x.Data)
  149. }
  150. return nil
  151. default:
  152. return ErrProtocol
  153. }
  154. }
  155. }
  156. // Serve starts the stream multiplexer for this connection, which is required
  157. // to execute any queries. This method runs as long as the connection is
  158. // open and is therefore usually called in a separate goroutine.
  159. func (c *Conn) serve() {
  160. for {
  161. resp, err := c.recv()
  162. if err != nil {
  163. break
  164. }
  165. c.dispatch(resp)
  166. }
  167. c.conn.Close()
  168. for id := 0; id < len(c.calls); id++ {
  169. req := &c.calls[id]
  170. if atomic.LoadInt32(&req.active) == 1 {
  171. req.resp <- callResp{nil, ErrProtocol}
  172. }
  173. }
  174. c.cluster.HandleError(c, ErrProtocol, true)
  175. }
  176. func (c *Conn) recv() (frame, error) {
  177. resp := make(frame, headerSize, headerSize+512)
  178. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  179. n, last, pinged := 0, 0, false
  180. for n < len(resp) {
  181. nn, err := c.r.Read(resp[n:])
  182. n += nn
  183. if err != nil {
  184. if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
  185. if n > last {
  186. // we hit the deadline but we made progress.
  187. // simply extend the deadline
  188. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  189. last = n
  190. } else if n == 0 && !pinged {
  191. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  192. if atomic.LoadInt32(&c.nwait) > 0 {
  193. go c.ping()
  194. pinged = true
  195. }
  196. } else {
  197. return nil, err
  198. }
  199. } else {
  200. return nil, err
  201. }
  202. }
  203. if n == headerSize && len(resp) == headerSize {
  204. if resp[0] != c.version|flagResponse {
  205. return nil, ErrProtocol
  206. }
  207. resp.grow(resp.Length())
  208. }
  209. }
  210. return resp, nil
  211. }
  212. func (c *Conn) execSimple(op operation) (interface{}, error) {
  213. f, err := op.encodeFrame(c.version, nil)
  214. f.setLength(len(f) - headerSize)
  215. if _, err := c.conn.Write([]byte(f)); err != nil {
  216. c.conn.Close()
  217. return nil, err
  218. }
  219. if f, err = c.recv(); err != nil {
  220. return nil, err
  221. }
  222. return c.decodeFrame(f, nil)
  223. }
  224. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  225. req, err := op.encodeFrame(c.version, nil)
  226. if err != nil {
  227. return nil, err
  228. }
  229. if trace != nil {
  230. req[1] |= flagTrace
  231. }
  232. if len(req) > headerSize && c.compressor != nil {
  233. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  234. if err != nil {
  235. return nil, err
  236. }
  237. req = append(req[:headerSize], frame(body)...)
  238. req[1] |= flagCompress
  239. }
  240. req.setLength(len(req) - headerSize)
  241. id := <-c.uniq
  242. req[2] = id
  243. call := &c.calls[id]
  244. call.resp = make(chan callResp, 1)
  245. atomic.AddInt32(&c.nwait, 1)
  246. atomic.StoreInt32(&call.active, 1)
  247. if n, err := c.conn.Write(req); err != nil {
  248. c.conn.Close()
  249. c.uniq <- id
  250. if n > 0 {
  251. return nil, ErrProtocol
  252. }
  253. return nil, ErrUnavailable
  254. }
  255. reply := <-call.resp
  256. call.resp = nil
  257. c.uniq <- id
  258. if reply.err != nil {
  259. return nil, reply.err
  260. }
  261. return c.decodeFrame(reply.buf, trace)
  262. }
  263. func (c *Conn) dispatch(resp frame) {
  264. id := int(resp[2])
  265. if id >= len(c.calls) {
  266. return
  267. }
  268. call := &c.calls[id]
  269. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  270. return
  271. }
  272. atomic.AddInt32(&c.nwait, -1)
  273. call.resp <- callResp{resp, nil}
  274. }
  275. func (c *Conn) ping() error {
  276. _, err := c.exec(&optionsFrame{}, nil)
  277. return err
  278. }
  279. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
  280. c.prepMu.Lock()
  281. flight := c.prep[stmt]
  282. if flight != nil {
  283. c.prepMu.Unlock()
  284. flight.wg.Wait()
  285. return flight.info, flight.err
  286. }
  287. flight = new(inflightPrepare)
  288. flight.wg.Add(1)
  289. c.prep[stmt] = flight
  290. c.prepMu.Unlock()
  291. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  292. if err != nil {
  293. flight.err = err
  294. } else {
  295. switch x := resp.(type) {
  296. case resultPreparedFrame:
  297. flight.info = &queryInfo{
  298. id: x.PreparedId,
  299. args: x.Values,
  300. }
  301. case error:
  302. flight.err = x
  303. default:
  304. flight.err = ErrProtocol
  305. }
  306. }
  307. flight.wg.Done()
  308. if err != nil {
  309. c.prepMu.Lock()
  310. delete(c.prep, stmt)
  311. c.prepMu.Unlock()
  312. }
  313. return flight.info, flight.err
  314. }
  315. func (c *Conn) executeQuery(qry *Query) *Iter {
  316. op := &queryFrame{
  317. Stmt: strings.TrimSpace(qry.stmt),
  318. Cons: qry.cons,
  319. PageSize: qry.pageSize,
  320. PageState: qry.pageState,
  321. }
  322. stmtType := op.Stmt
  323. if n := strings.IndexFunc(stmtType, unicode.IsSpace); n >= 0 {
  324. stmtType = strings.ToLower(stmtType[:n])
  325. switch stmtType {
  326. case "begin":
  327. stmtTail := strings.TrimSpace(op.Stmt[n:])
  328. if n := strings.IndexFunc(stmtTail, unicode.IsSpace); n >= 0 {
  329. stmtType = stmtType + " " + strings.ToLower(stmtTail[:n])
  330. }
  331. }
  332. }
  333. switch stmtType {
  334. case "select", "insert", "update", "delete", "begin batch":
  335. // Prepare all DML queries. Other queries can not be prepared.
  336. info, err := c.prepareStatement(qry.stmt, qry.trace)
  337. if err != nil {
  338. return &Iter{err: err}
  339. }
  340. op.Prepared = info.id
  341. op.Values = make([][]byte, len(qry.values))
  342. for i := 0; i < len(qry.values); i++ {
  343. val, err := Marshal(info.args[i].TypeInfo, qry.values[i])
  344. if err != nil {
  345. return &Iter{err: err}
  346. }
  347. op.Values[i] = val
  348. }
  349. }
  350. resp, err := c.exec(op, qry.trace)
  351. if err != nil {
  352. return &Iter{err: err}
  353. }
  354. switch x := resp.(type) {
  355. case resultVoidFrame:
  356. return &Iter{}
  357. case resultRowsFrame:
  358. iter := &Iter{columns: x.Columns, rows: x.Rows}
  359. if len(x.PagingState) > 0 {
  360. iter.next = &nextIter{
  361. qry: *qry,
  362. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  363. }
  364. iter.next.qry.pageState = x.PagingState
  365. if iter.next.pos < 1 {
  366. iter.next.pos = 1
  367. }
  368. }
  369. return iter
  370. case resultKeyspaceFrame:
  371. c.cluster.HandleKeyspace(c, x.Keyspace)
  372. return &Iter{}
  373. case errorFrame:
  374. if x.Code == errUnprepared && len(qry.values) > 0 {
  375. c.prepMu.Lock()
  376. if val, ok := c.prep[qry.stmt]; ok && val != nil {
  377. delete(c.prep, qry.stmt)
  378. c.prepMu.Unlock()
  379. return c.executeQuery(qry)
  380. }
  381. c.prepMu.Unlock()
  382. return &Iter{err: x}
  383. } else {
  384. return &Iter{err: x}
  385. }
  386. case error:
  387. return &Iter{err: x}
  388. default:
  389. return &Iter{err: ErrProtocol}
  390. }
  391. }
  392. func (c *Conn) Pick(qry *Query) *Conn {
  393. return c
  394. }
  395. func (c *Conn) Close() {
  396. c.conn.Close()
  397. }
  398. func (c *Conn) Address() string {
  399. return c.addr
  400. }
  401. func (c *Conn) UseKeyspace(keyspace string) error {
  402. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  403. if err != nil {
  404. return err
  405. }
  406. switch x := resp.(type) {
  407. case resultKeyspaceFrame:
  408. case error:
  409. return x
  410. default:
  411. return ErrProtocol
  412. }
  413. return nil
  414. }
  415. func (c *Conn) executeBatch(batch *Batch) error {
  416. if c.version == 1 {
  417. return ErrUnsupported
  418. }
  419. f := make(frame, headerSize, defaultFrameSize)
  420. f.setHeader(c.version, 0, 0, opBatch)
  421. f.writeByte(byte(batch.Type))
  422. f.writeShort(uint16(len(batch.Entries)))
  423. for i := 0; i < len(batch.Entries); i++ {
  424. entry := &batch.Entries[i]
  425. var info *queryInfo
  426. if len(entry.Args) > 0 {
  427. var err error
  428. info, err = c.prepareStatement(entry.Stmt, nil)
  429. if err != nil {
  430. return err
  431. }
  432. f.writeByte(1)
  433. f.writeShortBytes(info.id)
  434. } else {
  435. f.writeByte(0)
  436. f.writeLongString(entry.Stmt)
  437. }
  438. f.writeShort(uint16(len(entry.Args)))
  439. for j := 0; j < len(entry.Args); j++ {
  440. val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
  441. if err != nil {
  442. return err
  443. }
  444. f.writeBytes(val)
  445. }
  446. }
  447. f.writeConsistency(batch.Cons)
  448. resp, err := c.exec(f, nil)
  449. if err != nil {
  450. return err
  451. }
  452. switch x := resp.(type) {
  453. case resultVoidFrame:
  454. return nil
  455. case error:
  456. return x
  457. default:
  458. return ErrProtocol
  459. }
  460. }
  461. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  462. defer func() {
  463. if r := recover(); r != nil {
  464. if e, ok := r.(error); ok && e == ErrProtocol {
  465. err = e
  466. return
  467. }
  468. panic(r)
  469. }
  470. }()
  471. if len(f) < headerSize || (f[0] != c.version|flagResponse) {
  472. return nil, ErrProtocol
  473. }
  474. flags, op, f := f[1], f[3], f[headerSize:]
  475. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  476. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  477. return nil, err
  478. } else {
  479. f = frame(buf)
  480. }
  481. }
  482. if flags&flagTrace != 0 {
  483. if len(f) < 16 {
  484. return nil, ErrProtocol
  485. }
  486. traceId := []byte(f[:16])
  487. f = f[16:]
  488. trace.Trace(traceId)
  489. }
  490. switch op {
  491. case opReady:
  492. return readyFrame{}, nil
  493. case opResult:
  494. switch kind := f.readInt(); kind {
  495. case resultKindVoid:
  496. return resultVoidFrame{}, nil
  497. case resultKindRows:
  498. columns, pageState := f.readMetaData()
  499. numRows := f.readInt()
  500. values := make([][]byte, numRows*len(columns))
  501. for i := 0; i < len(values); i++ {
  502. values[i] = f.readBytes()
  503. }
  504. rows := make([][][]byte, numRows)
  505. for i := 0; i < numRows; i++ {
  506. rows[i], values = values[:len(columns)], values[len(columns):]
  507. }
  508. return resultRowsFrame{columns, rows, pageState}, nil
  509. case resultKindKeyspace:
  510. keyspace := f.readString()
  511. return resultKeyspaceFrame{keyspace}, nil
  512. case resultKindPrepared:
  513. id := f.readShortBytes()
  514. values, _ := f.readMetaData()
  515. return resultPreparedFrame{id, values}, nil
  516. case resultKindSchemaChanged:
  517. return resultVoidFrame{}, nil
  518. default:
  519. return nil, ErrProtocol
  520. }
  521. case opAuthenticate:
  522. return authenticateFrame{f.readString()}, nil
  523. case opAuthChallenge:
  524. return authChallengeFrame{f.readBytes()}, nil
  525. case opAuthSuccess:
  526. return authSuccessFrame{f.readBytes()}, nil
  527. case opSupported:
  528. return supportedFrame{}, nil
  529. case opError:
  530. code := f.readInt()
  531. msg := f.readString()
  532. return errorFrame{code, msg}, nil
  533. default:
  534. return nil, ErrProtocol
  535. }
  536. }
  537. type queryInfo struct {
  538. id []byte
  539. args []ColumnInfo
  540. rval []ColumnInfo
  541. }
  542. type callReq struct {
  543. active int32
  544. resp chan callResp
  545. }
  546. type callResp struct {
  547. buf frame
  548. err error
  549. }
  550. type Compressor interface {
  551. Name() string
  552. Encode(data []byte) ([]byte, error)
  553. Decode(data []byte) ([]byte, error)
  554. }
  555. type inflightPrepare struct {
  556. info *queryInfo
  557. err error
  558. wg sync.WaitGroup
  559. }
  560. // SnappyCompressor implements the Compressor interface and can be used to
  561. // compress incoming and outgoing frames. The snappy compression algorithm
  562. // aims for very high speeds and reasonable compression.
  563. type SnappyCompressor struct{}
  564. func (s SnappyCompressor) Name() string {
  565. return "snappy"
  566. }
  567. func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
  568. return snappy.Encode(nil, data)
  569. }
  570. func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
  571. return snappy.Decode(nil, data)
  572. }