conn.go 17 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "crypto/tls"
  8. "crypto/x509"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. )
  19. const defaultFrameSize = 4096
  20. const flagResponse = 0x80
  21. const maskVersion = 0x7F
  22. //JoinHostPort is a utility to return a address string that can be used
  23. //gocql.Conn to form a connection with a host.
  24. func JoinHostPort(addr string, port int) string {
  25. addr = strings.TrimSpace(addr)
  26. if _, _, err := net.SplitHostPort(addr); err != nil {
  27. addr = net.JoinHostPort(addr, strconv.Itoa(port))
  28. }
  29. return addr
  30. }
  31. type Authenticator interface {
  32. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  33. Success(data []byte) error
  34. }
  35. type PasswordAuthenticator struct {
  36. Username string
  37. Password string
  38. }
  39. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  40. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  41. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  42. }
  43. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  44. resp[0] = 0
  45. copy(resp[1:], p.Username)
  46. resp[len(p.Username)+1] = 0
  47. copy(resp[2+len(p.Username):], p.Password)
  48. return resp, nil, nil
  49. }
  50. func (p PasswordAuthenticator) Success(data []byte) error {
  51. return nil
  52. }
// SslOptions configures the TLS connection made by Connect when
// ConnConfig.SslOpts is set.
type SslOptions struct {
	// CertPath and KeyPath name the PEM files holding the client certificate
	// and its private key; Connect loads them with tls.LoadX509KeyPair.
	CertPath string
	KeyPath  string
	// CaPath names a PEM file with CA certificates to trust. It is optional,
	// depending on the server configuration.
	CaPath string
	// EnableHostVerification turns on verification of the server certificate
	// and host name (for example against a wildcard certificate for the
	// Cassandra cluster). It is the inverse of InsecureSkipVerify; see
	// InsecureSkipVerify in http://golang.org/pkg/crypto/tls/ for details.
	EnableHostVerification bool
}
// ConnConfig holds the settings Connect uses to establish a connection.
type ConnConfig struct {
	// ProtoVersion is the native protocol version. Connect replaces any
	// value other than 1 or 2 with 2.
	ProtoVersion int
	// CQLVersion is sent to the server in the startup frame.
	CQLVersion string
	// Timeout is used when dialing and as the window for the read deadline
	// set while receiving frames.
	Timeout time.Duration
	// NumStreams is the number of concurrent streams on the connection.
	// Connect replaces values <= 0 or > 128 with 128.
	NumStreams int
	// Compressor, when non-nil, is announced in the startup frame and used
	// to compress request bodies and decompress responses.
	Compressor Compressor
	// Authenticator answers the server's authentication request. If it is
	// nil and the server asks for authentication, startup fails.
	Authenticator Authenticator
	// Keepalive, when > 0, enables TCP keep-alive with this period.
	Keepalive time.Duration
	// SslOpts, when non-nil, makes Connect dial with TLS.
	SslOpts *SslOptions
}
// Conn is a single connection to a Cassandra node. It can be used to execute
// queries, but users are usually advised to use a more reliable, higher
// level API.
type Conn struct {
	conn    net.Conn      // underlying network connection
	r       *bufio.Reader // buffered reader over conn, used by recv
	timeout time.Duration // window applied to each read deadline

	uniq  chan uint8 // pool of free stream ids; exec takes one per request
	calls []callReq  // per-stream call slots, indexed by stream id
	nwait int32      // number of requests awaiting a response; accessed atomically

	pool       ConnectionPool // notified through HandleError when serve exits
	compressor Compressor     // optional body compressor
	auth       Authenticator  // optional authenticator used during startup
	addr       string         // remote address, as reported by conn.RemoteAddr
	version    uint8          // protocol version in use

	currentKeyspace string // last keyspace set through UseKeyspace

	closedMu sync.RWMutex // guards isClosed
	isClosed bool         // set once Close has run
}
  91. // Connect establishes a connection to a Cassandra node.
  92. // You must also call the Serve method before you can execute any queries.
  93. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  94. var (
  95. err error
  96. conn net.Conn
  97. )
  98. if cfg.SslOpts != nil {
  99. certPool := x509.NewCertPool()
  100. //ca cert is optional
  101. if cfg.SslOpts.CaPath != "" {
  102. pem, err := ioutil.ReadFile(cfg.SslOpts.CaPath)
  103. if err != nil {
  104. return nil, err
  105. }
  106. if !certPool.AppendCertsFromPEM(pem) {
  107. return nil, errors.New("Failed parsing or appending certs")
  108. }
  109. }
  110. mycert, err := tls.LoadX509KeyPair(cfg.SslOpts.CertPath, cfg.SslOpts.KeyPath)
  111. if err != nil {
  112. return nil, err
  113. }
  114. config := tls.Config{
  115. Certificates: []tls.Certificate{mycert},
  116. RootCAs: certPool,
  117. }
  118. config.InsecureSkipVerify = !cfg.SslOpts.EnableHostVerification
  119. if conn, err = tls.Dial("tcp", addr, &config); err != nil {
  120. return nil, err
  121. }
  122. } else if conn, err = net.DialTimeout("tcp", addr, cfg.Timeout); err != nil {
  123. return nil, err
  124. }
  125. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  126. cfg.NumStreams = 128
  127. }
  128. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  129. cfg.ProtoVersion = 2
  130. }
  131. c := &Conn{
  132. conn: conn,
  133. r: bufio.NewReader(conn),
  134. uniq: make(chan uint8, cfg.NumStreams),
  135. calls: make([]callReq, cfg.NumStreams),
  136. timeout: cfg.Timeout,
  137. version: uint8(cfg.ProtoVersion),
  138. addr: conn.RemoteAddr().String(),
  139. pool: pool,
  140. compressor: cfg.Compressor,
  141. auth: cfg.Authenticator,
  142. }
  143. if cfg.Keepalive > 0 {
  144. c.setKeepalive(cfg.Keepalive)
  145. }
  146. for i := 0; i < cap(c.uniq); i++ {
  147. c.uniq <- uint8(i)
  148. }
  149. if err := c.startup(&cfg); err != nil {
  150. conn.Close()
  151. return nil, err
  152. }
  153. go c.serve()
  154. return c, nil
  155. }
  156. func (c *Conn) startup(cfg *ConnConfig) error {
  157. compression := ""
  158. if c.compressor != nil {
  159. compression = c.compressor.Name()
  160. }
  161. var req operation = &startupFrame{
  162. CQLVersion: cfg.CQLVersion,
  163. Compression: compression,
  164. }
  165. var challenger Authenticator
  166. for {
  167. resp, err := c.execSimple(req)
  168. if err != nil {
  169. return err
  170. }
  171. switch x := resp.(type) {
  172. case readyFrame:
  173. return nil
  174. case error:
  175. return x
  176. case authenticateFrame:
  177. if c.auth == nil {
  178. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  179. }
  180. var resp []byte
  181. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  182. if err != nil {
  183. return err
  184. }
  185. req = &authResponseFrame{resp}
  186. case authChallengeFrame:
  187. if challenger == nil {
  188. return fmt.Errorf("authentication error (invalid challenge)")
  189. }
  190. var resp []byte
  191. resp, challenger, err = challenger.Challenge(x.Data)
  192. if err != nil {
  193. return err
  194. }
  195. req = &authResponseFrame{resp}
  196. case authSuccessFrame:
  197. if challenger != nil {
  198. return challenger.Success(x.Data)
  199. }
  200. return nil
  201. default:
  202. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  203. }
  204. }
  205. }
  206. // Serve starts the stream multiplexer for this connection, which is required
  207. // to execute any queries. This method runs as long as the connection is
  208. // open and is therefore usually called in a separate goroutine.
  209. func (c *Conn) serve() {
  210. var (
  211. err error
  212. resp frame
  213. )
  214. for {
  215. resp, err = c.recv()
  216. if err != nil {
  217. break
  218. }
  219. c.dispatch(resp)
  220. }
  221. c.Close()
  222. for id := 0; id < len(c.calls); id++ {
  223. req := &c.calls[id]
  224. if atomic.LoadInt32(&req.active) == 1 {
  225. req.resp <- callResp{nil, err}
  226. }
  227. }
  228. c.pool.HandleError(c, err, true)
  229. }
// recv reads one complete frame from the connection and returns it.
//
// The buffer starts out at headerSize bytes. Once exactly the header has
// been read, its first byte is checked against the connection's protocol
// version with the response flag set, and the buffer is grown according to
// the length reported by the header so the loop continues with the body.
//
// A read deadline of c.timeout is set before reading. When a read fails with
// a temporary network error:
//   - if bytes arrived since the previous such failure, the deadline is
//     extended;
//   - otherwise, if nothing at all has been read and no ping has been sent
//     yet, the deadline is extended and, when requests are waiting for a
//     response, a ping is sent in the background (at most once per call);
//   - otherwise the error is returned.
//
// Any other error is returned immediately.
func (c *Conn) recv() (frame, error) {
	resp := make(frame, headerSize, headerSize+512)
	c.conn.SetReadDeadline(time.Now().Add(c.timeout))
	// n is the number of bytes read so far, last is the value of n at the
	// previous temporary error, pinged records whether a ping was sent.
	n, last, pinged := 0, 0, false
	for n < len(resp) {
		nn, err := c.r.Read(resp[n:])
		n += nn
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				if n > last {
					// We hit the deadline but made progress since the last
					// time: simply extend the deadline.
					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
					last = n
				} else if n == 0 && !pinged {
					// Nothing received yet. Keep waiting; if callers are
					// waiting for responses, ping the server once. With no
					// waiting requests pinged stays false, so an idle
					// connection keeps extending its deadline.
					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
					if atomic.LoadInt32(&c.nwait) > 0 {
						go c.ping()
						pinged = true
					}
				} else {
					return nil, err
				}
			} else {
				return nil, err
			}
		}
		// Header just completed (and the buffer has not been grown yet):
		// validate it and make room for the body.
		if n == headerSize && len(resp) == headerSize {
			if resp[0] != c.version|flagResponse {
				return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
			}
			// NOTE(review): grow presumably extends len(resp) by the body
			// length; confirm against the frame type.
			resp.grow(resp.Length())
		}
	}
	return resp, nil
}
  266. func (c *Conn) execSimple(op operation) (interface{}, error) {
  267. f, err := op.encodeFrame(c.version, nil)
  268. f.setLength(len(f) - headerSize)
  269. if _, err := c.conn.Write([]byte(f)); err != nil {
  270. c.Close()
  271. return nil, err
  272. }
  273. if f, err = c.recv(); err != nil {
  274. return nil, err
  275. }
  276. return c.decodeFrame(f, nil)
  277. }
  278. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  279. req, err := op.encodeFrame(c.version, nil)
  280. if err != nil {
  281. return nil, err
  282. }
  283. if trace != nil {
  284. req[1] |= flagTrace
  285. }
  286. if len(req) > headerSize && c.compressor != nil {
  287. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  288. if err != nil {
  289. return nil, err
  290. }
  291. req = append(req[:headerSize], frame(body)...)
  292. req[1] |= flagCompress
  293. }
  294. req.setLength(len(req) - headerSize)
  295. id := <-c.uniq
  296. req[2] = id
  297. call := &c.calls[id]
  298. call.resp = make(chan callResp, 1)
  299. atomic.AddInt32(&c.nwait, 1)
  300. atomic.StoreInt32(&call.active, 1)
  301. if _, err := c.conn.Write(req); err != nil {
  302. c.uniq <- id
  303. c.Close()
  304. return nil, err
  305. }
  306. reply := <-call.resp
  307. call.resp = nil
  308. c.uniq <- id
  309. if reply.err != nil {
  310. return nil, reply.err
  311. }
  312. return c.decodeFrame(reply.buf, trace)
  313. }
  314. func (c *Conn) dispatch(resp frame) {
  315. id := int(resp[2])
  316. if id >= len(c.calls) {
  317. return
  318. }
  319. call := &c.calls[id]
  320. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  321. return
  322. }
  323. atomic.AddInt32(&c.nwait, -1)
  324. call.resp <- callResp{resp, nil}
  325. }
  326. func (c *Conn) ping() error {
  327. _, err := c.exec(&optionsFrame{}, nil)
  328. return err
  329. }
// prepareStatement returns the prepared-statement metadata for stmt on this
// connection, preparing the statement on the server when no cache entry
// exists for it.
//
// Entries live in the package-level stmtsLRU cache under the key
// connection address + current keyspace + statement text. The entry is added
// to the cache before the prepare request is sent, so concurrent callers for
// the same key find it, wait on its WaitGroup, and share the result. When
// preparing fails, the waiters are released first and the entry is then
// removed from the cache.
func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
	stmtsLRU.Lock()
	// Lazily create the cache on first use.
	if stmtsLRU.lru == nil {
		initStmtsLRU(defaultMaxPreparedStmts)
	}

	stmtCacheKey := c.addr + c.currentKeyspace + stmt

	if val, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
		flight := val.(*inflightPrepare)
		// Release the cache lock before waiting so other statements are not
		// blocked behind this one.
		stmtsLRU.Unlock()
		flight.wg.Wait()
		return flight.info, flight.err
	}

	// No entry yet: register one so later callers wait for this request.
	flight := new(inflightPrepare)
	flight.wg.Add(1)
	stmtsLRU.lru.Add(stmtCacheKey, flight)
	stmtsLRU.Unlock()

	resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
	if err != nil {
		flight.err = err
	} else {
		switch x := resp.(type) {
		case resultPreparedFrame:
			flight.info = &QueryInfo{
				Id:   x.PreparedId,
				Args: x.Arguments,
				Rval: x.ReturnValues,
			}
		case error:
			flight.err = x
		default:
			flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
		}
		err = flight.err
	}

	// The result fields are set; release anyone waiting on this entry.
	flight.wg.Done()

	// Do not keep failed preparations in the cache.
	if err != nil {
		stmtsLRU.Lock()
		stmtsLRU.lru.Remove(stmtCacheKey)
		stmtsLRU.Unlock()
	}

	return flight.info, flight.err
}
// executeQuery runs qry on this connection and returns an Iter over the
// result. Failures are reported through the returned Iter's err field, never
// as a nil Iter.
//
// When qry.shouldPrepare() is true the statement is prepared first (through
// the statement cache) and the query values are marshalled using the
// argument types reported by the server. If the server answers that the
// statement is not prepared, the stale cache entry is dropped and the query
// is executed again.
func (c *Conn) executeQuery(qry *Query) *Iter {
	op := &queryFrame{
		Stmt:      qry.stmt,
		Cons:      qry.cons,
		PageSize:  qry.pageSize,
		PageState: qry.pageState,
	}
	if qry.shouldPrepare() {
		// Prepare all DML queries. Other queries can not be prepared.
		info, err := c.prepareStatement(qry.stmt, qry.trace)
		if err != nil {
			return &Iter{err: err}
		}

		// Values come either straight from the query or from its binding
		// callback, which is given the prepared statement's metadata.
		var values []interface{}

		if qry.binding == nil {
			values = qry.values
		} else {
			values, err = qry.binding(info)
			if err != nil {
				return &Iter{err: err}
			}
		}

		if len(values) != len(info.Args) {
			return &Iter{err: ErrQueryArgLength}
		}
		op.Prepared = info.Id
		op.Values = make([][]byte, len(values))
		for i := 0; i < len(values); i++ {
			val, err := Marshal(info.Args[i].TypeInfo, values[i])
			if err != nil {
				return &Iter{err: err}
			}
			op.Values[i] = val
		}
	}

	resp, err := c.exec(op, qry.trace)
	if err != nil {
		return &Iter{err: err}
	}

	switch x := resp.(type) {
	case resultVoidFrame:
		return &Iter{}
	case resultRowsFrame:
		iter := &Iter{columns: x.Columns, rows: x.Rows}
		// A non-empty paging state means more pages are available: attach a
		// copy of the query that continues from that state. pos is the row
		// index derived from the prefetch fraction, and is at least 1.
		if len(x.PagingState) > 0 {
			iter.next = &nextIter{
				qry: *qry,
				pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
			}
			iter.next.qry.pageState = x.PagingState
			if iter.next.pos < 1 {
				iter.next.pos = 1
			}
		}
		return iter
	case resultKeyspaceFrame:
		return &Iter{}
	case RequestErrUnprepared:
		// The server does not know the prepared id. If the statement is
		// still in the cache, evict it and retry (which prepares it again);
		// otherwise report the error.
		stmtsLRU.Lock()
		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
		if _, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
			stmtsLRU.lru.Remove(stmtCacheKey)
			stmtsLRU.Unlock()
			return c.executeQuery(qry)
		}
		stmtsLRU.Unlock()
		return &Iter{err: x}
	case error:
		return &Iter{err: x}
	default:
		return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
	}
}
  445. func (c *Conn) Pick(qry *Query) *Conn {
  446. if c.Closed() {
  447. return nil
  448. }
  449. return c
  450. }
  451. func (c *Conn) Closed() bool {
  452. c.closedMu.RLock()
  453. closed := c.isClosed
  454. c.closedMu.RUnlock()
  455. return closed
  456. }
  457. func (c *Conn) Close() {
  458. c.closedMu.Lock()
  459. if c.isClosed {
  460. c.closedMu.Unlock()
  461. return
  462. }
  463. c.isClosed = true
  464. c.closedMu.Unlock()
  465. c.conn.Close()
  466. }
// Address returns the remote address of the connection, as recorded when the
// connection was established.
func (c *Conn) Address() string {
	return c.addr
}
// AvailableStreams returns the number of stream ids that are currently free,
// i.e. not reserved by a request in progress.
func (c *Conn) AvailableStreams() int {
	return len(c.uniq)
}
  473. func (c *Conn) UseKeyspace(keyspace string) error {
  474. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  475. if err != nil {
  476. return err
  477. }
  478. switch x := resp.(type) {
  479. case resultKeyspaceFrame:
  480. case error:
  481. return x
  482. default:
  483. return NewErrProtocol("Unknown type in response to USE: %s", x)
  484. }
  485. c.currentKeyspace = keyspace
  486. return nil
  487. }
  488. func (c *Conn) executeBatch(batch *Batch) error {
  489. if c.version == 1 {
  490. return ErrUnsupported
  491. }
  492. f := make(frame, headerSize, defaultFrameSize)
  493. f.setHeader(c.version, 0, 0, opBatch)
  494. f.writeByte(byte(batch.Type))
  495. f.writeShort(uint16(len(batch.Entries)))
  496. stmts := make(map[string]string)
  497. for i := 0; i < len(batch.Entries); i++ {
  498. entry := &batch.Entries[i]
  499. var info *QueryInfo
  500. var args []interface{}
  501. if len(entry.Args) > 0 || entry.binding != nil {
  502. var err error
  503. info, err = c.prepareStatement(entry.Stmt, nil)
  504. if err != nil {
  505. return err
  506. }
  507. if entry.binding == nil {
  508. args = entry.Args
  509. } else {
  510. args, err = entry.binding(info)
  511. if err != nil {
  512. return err
  513. }
  514. }
  515. if len(args) != len(info.Args) {
  516. return ErrQueryArgLength
  517. }
  518. stmts[string(info.Id)] = entry.Stmt
  519. f.writeByte(1)
  520. f.writeShortBytes(info.Id)
  521. } else {
  522. f.writeByte(0)
  523. f.writeLongString(entry.Stmt)
  524. }
  525. f.writeShort(uint16(len(args)))
  526. for j := 0; j < len(args); j++ {
  527. val, err := Marshal(info.Args[j].TypeInfo, args[j])
  528. if err != nil {
  529. return err
  530. }
  531. f.writeBytes(val)
  532. }
  533. }
  534. f.writeConsistency(batch.Cons)
  535. resp, err := c.exec(f, nil)
  536. if err != nil {
  537. return err
  538. }
  539. switch x := resp.(type) {
  540. case resultVoidFrame:
  541. return nil
  542. case RequestErrUnprepared:
  543. stmt, found := stmts[string(x.StatementId)]
  544. if found {
  545. stmtsLRU.Lock()
  546. stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
  547. stmtsLRU.Unlock()
  548. }
  549. if found {
  550. return c.executeBatch(batch)
  551. } else {
  552. return x
  553. }
  554. case error:
  555. return x
  556. default:
  557. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  558. }
  559. }
  560. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  561. defer func() {
  562. if r := recover(); r != nil {
  563. if e, ok := r.(ErrProtocol); ok {
  564. err = e
  565. return
  566. }
  567. panic(r)
  568. }
  569. }()
  570. if len(f) < headerSize {
  571. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  572. } else if f[0] != c.version|flagResponse {
  573. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  574. }
  575. flags, op, f := f[1], f[3], f[headerSize:]
  576. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  577. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  578. return nil, err
  579. } else {
  580. f = frame(buf)
  581. }
  582. }
  583. if flags&flagTrace != 0 {
  584. if len(f) < 16 {
  585. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  586. }
  587. traceId := []byte(f[:16])
  588. f = f[16:]
  589. trace.Trace(traceId)
  590. }
  591. switch op {
  592. case opReady:
  593. return readyFrame{}, nil
  594. case opResult:
  595. switch kind := f.readInt(); kind {
  596. case resultKindVoid:
  597. return resultVoidFrame{}, nil
  598. case resultKindRows:
  599. columns, pageState := f.readMetaData()
  600. numRows := f.readInt()
  601. values := make([][]byte, numRows*len(columns))
  602. for i := 0; i < len(values); i++ {
  603. values[i] = f.readBytes()
  604. }
  605. rows := make([][][]byte, numRows)
  606. for i := 0; i < numRows; i++ {
  607. rows[i], values = values[:len(columns)], values[len(columns):]
  608. }
  609. return resultRowsFrame{columns, rows, pageState}, nil
  610. case resultKindKeyspace:
  611. keyspace := f.readString()
  612. return resultKeyspaceFrame{keyspace}, nil
  613. case resultKindPrepared:
  614. id := f.readShortBytes()
  615. args, _ := f.readMetaData()
  616. if c.version < 2 {
  617. return resultPreparedFrame{PreparedId: id, Arguments: args}, nil
  618. }
  619. rvals, _ := f.readMetaData()
  620. return resultPreparedFrame{PreparedId: id, Arguments: args, ReturnValues: rvals}, nil
  621. case resultKindSchemaChanged:
  622. return resultVoidFrame{}, nil
  623. default:
  624. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  625. }
  626. case opAuthenticate:
  627. return authenticateFrame{f.readString()}, nil
  628. case opAuthChallenge:
  629. return authChallengeFrame{f.readBytes()}, nil
  630. case opAuthSuccess:
  631. return authSuccessFrame{f.readBytes()}, nil
  632. case opSupported:
  633. return supportedFrame{}, nil
  634. case opError:
  635. return f.readError(), nil
  636. default:
  637. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  638. }
  639. }
  640. func (c *Conn) setKeepalive(d time.Duration) error {
  641. if tc, ok := c.conn.(*net.TCPConn); ok {
  642. err := tc.SetKeepAlivePeriod(d)
  643. if err != nil {
  644. return err
  645. }
  646. return tc.SetKeepAlive(true)
  647. }
  648. return nil
  649. }
// QueryInfo represents the meta data associated with a prepared CQL statement.
type QueryInfo struct {
	// Id is the prepared statement id returned by the server.
	Id []byte
	// Args describes the statement's bind arguments; values are marshalled
	// using the TypeInfo of the corresponding entry.
	Args []ColumnInfo
	// Rval holds the return value metadata from the prepare response.
	Rval []ColumnInfo
}
// callReq is the per-stream slot used to hand a response to the goroutine
// that issued the request on that stream.
type callReq struct {
	// active is 1 while a request on this stream awaits its response;
	// it is accessed atomically.
	active int32
	// resp receives the response (or the connection error) for the request.
	resp chan callResp
}
// callResp is what a waiting request receives: either the raw response
// frame or the error that ended the connection.
type callResp struct {
	buf frame
	err error
}
// inflightPrepare is the statement cache entry for a prepare request. info
// and err hold the outcome; wg is released once they have been set, so
// other callers for the same statement can wait for the result.
type inflightPrepare struct {
	info *QueryInfo
	err  error
	wg   sync.WaitGroup
}
  669. var (
  670. ErrQueryArgLength = errors.New("query argument length mismatch")
  671. )