conn.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "crypto/tls"
  8. "crypto/x509"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. )
  18. const defaultFrameSize = 4096
  19. const flagResponse = 0x80
  20. const maskVersion = 0x7F
  21. type Authenticator interface {
  22. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  23. Success(data []byte) error
  24. }
  25. type PasswordAuthenticator struct {
  26. Username string
  27. Password string
  28. }
  29. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  30. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  31. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  32. }
  33. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  34. resp[0] = 0
  35. copy(resp[1:], p.Username)
  36. resp[len(p.Username)+1] = 0
  37. copy(resp[2+len(p.Username):], p.Password)
  38. return resp, nil, nil
  39. }
  40. func (p PasswordAuthenticator) Success(data []byte) error {
  41. return nil
  42. }
  43. type SslOptions struct {
  44. CertPath string
  45. KeyPath string
  46. CaPath string //optional depending on server config
  47. // If you want to verify the hostname and server cert (like a wildcard for cass cluster) then you should turn this on
  48. // This option is basically the inverse of InSecureSkipVerify
  49. // See InSecureSkipVerify in http://golang.org/pkg/crypto/tls/ for more info
  50. EnableHostVerification bool
  51. }
  52. type ConnConfig struct {
  53. ProtoVersion int
  54. CQLVersion string
  55. Timeout time.Duration
  56. NumStreams int
  57. Compressor Compressor
  58. Authenticator Authenticator
  59. Keepalive time.Duration
  60. SslOpts *SslOptions
  61. }
  62. // Conn is a single connection to a Cassandra node. It can be used to execute
  63. // queries, but users are usually advised to use a more reliable, higher
  64. // level API.
  65. type Conn struct {
  66. conn net.Conn
  67. r *bufio.Reader
  68. timeout time.Duration
  69. uniq chan uint8
  70. calls []callReq
  71. nwait int32
  72. pool ConnectionPool
  73. compressor Compressor
  74. auth Authenticator
  75. addr string
  76. version uint8
  77. currentKeyspace string
  78. closedMu sync.RWMutex
  79. isClosed bool
  80. }
  81. // Connect establishes a connection to a Cassandra node.
  82. // You must also call the Serve method before you can execute any queries.
  83. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  84. var (
  85. err error
  86. conn net.Conn
  87. )
  88. address := addr[:strings.LastIndex(addr, ":")]
  89. port := addr[strings.LastIndex(addr, ":")+1:]
  90. if strings.Count(address, ":") > 1 && strings.Index(address, "[") < 0 {
  91. addr = fmt.Sprintf("[%s]:%s", address, port)
  92. }
  93. if cfg.SslOpts != nil {
  94. certPool := x509.NewCertPool()
  95. //ca cert is optional
  96. if cfg.SslOpts.CaPath != "" {
  97. pem, err := ioutil.ReadFile(cfg.SslOpts.CaPath)
  98. if err != nil {
  99. return nil, err
  100. }
  101. if !certPool.AppendCertsFromPEM(pem) {
  102. return nil, errors.New("Failed parsing or appending certs")
  103. }
  104. }
  105. mycert, err := tls.LoadX509KeyPair(cfg.SslOpts.CertPath, cfg.SslOpts.KeyPath)
  106. if err != nil {
  107. return nil, err
  108. }
  109. config := tls.Config{
  110. Certificates: []tls.Certificate{mycert},
  111. RootCAs: certPool,
  112. }
  113. config.InsecureSkipVerify = !cfg.SslOpts.EnableHostVerification
  114. if conn, err = tls.Dial("tcp", addr, &config); err != nil {
  115. return nil, err
  116. }
  117. } else if conn, err = net.DialTimeout("tcp", addr, cfg.Timeout); err != nil {
  118. return nil, err
  119. }
  120. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  121. cfg.NumStreams = 128
  122. }
  123. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  124. cfg.ProtoVersion = 2
  125. }
  126. c := &Conn{
  127. conn: conn,
  128. r: bufio.NewReader(conn),
  129. uniq: make(chan uint8, cfg.NumStreams),
  130. calls: make([]callReq, cfg.NumStreams),
  131. timeout: cfg.Timeout,
  132. version: uint8(cfg.ProtoVersion),
  133. addr: conn.RemoteAddr().String(),
  134. pool: pool,
  135. compressor: cfg.Compressor,
  136. auth: cfg.Authenticator,
  137. }
  138. if cfg.Keepalive > 0 {
  139. c.setKeepalive(cfg.Keepalive)
  140. }
  141. for i := 0; i < cap(c.uniq); i++ {
  142. c.uniq <- uint8(i)
  143. }
  144. if err := c.startup(&cfg); err != nil {
  145. conn.Close()
  146. return nil, err
  147. }
  148. go c.serve()
  149. return c, nil
  150. }
  151. func (c *Conn) startup(cfg *ConnConfig) error {
  152. compression := ""
  153. if c.compressor != nil {
  154. compression = c.compressor.Name()
  155. }
  156. var req operation = &startupFrame{
  157. CQLVersion: cfg.CQLVersion,
  158. Compression: compression,
  159. }
  160. var challenger Authenticator
  161. for {
  162. resp, err := c.execSimple(req)
  163. if err != nil {
  164. return err
  165. }
  166. switch x := resp.(type) {
  167. case readyFrame:
  168. return nil
  169. case error:
  170. return x
  171. case authenticateFrame:
  172. if c.auth == nil {
  173. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  174. }
  175. var resp []byte
  176. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  177. if err != nil {
  178. return err
  179. }
  180. req = &authResponseFrame{resp}
  181. case authChallengeFrame:
  182. if challenger == nil {
  183. return fmt.Errorf("authentication error (invalid challenge)")
  184. }
  185. var resp []byte
  186. resp, challenger, err = challenger.Challenge(x.Data)
  187. if err != nil {
  188. return err
  189. }
  190. req = &authResponseFrame{resp}
  191. case authSuccessFrame:
  192. if challenger != nil {
  193. return challenger.Success(x.Data)
  194. }
  195. return nil
  196. default:
  197. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  198. }
  199. }
  200. }
  201. // Serve starts the stream multiplexer for this connection, which is required
  202. // to execute any queries. This method runs as long as the connection is
  203. // open and is therefore usually called in a separate goroutine.
  204. func (c *Conn) serve() {
  205. var (
  206. err error
  207. resp frame
  208. )
  209. for {
  210. resp, err = c.recv()
  211. if err != nil {
  212. break
  213. }
  214. c.dispatch(resp)
  215. }
  216. c.Close()
  217. for id := 0; id < len(c.calls); id++ {
  218. req := &c.calls[id]
  219. if atomic.LoadInt32(&req.active) == 1 {
  220. req.resp <- callResp{nil, err}
  221. }
  222. }
  223. c.pool.HandleError(c, err, true)
  224. }
  225. func (c *Conn) recv() (frame, error) {
  226. resp := make(frame, headerSize, headerSize+512)
  227. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  228. n, last, pinged := 0, 0, false
  229. for n < len(resp) {
  230. nn, err := c.r.Read(resp[n:])
  231. n += nn
  232. if err != nil {
  233. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  234. if n > last {
  235. // we hit the deadline but we made progress.
  236. // simply extend the deadline
  237. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  238. last = n
  239. } else if n == 0 && !pinged {
  240. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  241. if atomic.LoadInt32(&c.nwait) > 0 {
  242. go c.ping()
  243. pinged = true
  244. }
  245. } else {
  246. return nil, err
  247. }
  248. } else {
  249. return nil, err
  250. }
  251. }
  252. if n == headerSize && len(resp) == headerSize {
  253. if resp[0] != c.version|flagResponse {
  254. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  255. }
  256. resp.grow(resp.Length())
  257. }
  258. }
  259. return resp, nil
  260. }
  261. func (c *Conn) execSimple(op operation) (interface{}, error) {
  262. f, err := op.encodeFrame(c.version, nil)
  263. f.setLength(len(f) - headerSize)
  264. if _, err := c.conn.Write([]byte(f)); err != nil {
  265. c.Close()
  266. return nil, err
  267. }
  268. if f, err = c.recv(); err != nil {
  269. return nil, err
  270. }
  271. return c.decodeFrame(f, nil)
  272. }
  273. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  274. req, err := op.encodeFrame(c.version, nil)
  275. if err != nil {
  276. return nil, err
  277. }
  278. if trace != nil {
  279. req[1] |= flagTrace
  280. }
  281. if len(req) > headerSize && c.compressor != nil {
  282. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  283. if err != nil {
  284. return nil, err
  285. }
  286. req = append(req[:headerSize], frame(body)...)
  287. req[1] |= flagCompress
  288. }
  289. req.setLength(len(req) - headerSize)
  290. id := <-c.uniq
  291. req[2] = id
  292. call := &c.calls[id]
  293. call.resp = make(chan callResp, 1)
  294. atomic.AddInt32(&c.nwait, 1)
  295. atomic.StoreInt32(&call.active, 1)
  296. if _, err := c.conn.Write(req); err != nil {
  297. c.uniq <- id
  298. c.Close()
  299. return nil, err
  300. }
  301. reply := <-call.resp
  302. call.resp = nil
  303. c.uniq <- id
  304. if reply.err != nil {
  305. return nil, reply.err
  306. }
  307. return c.decodeFrame(reply.buf, trace)
  308. }
  309. func (c *Conn) dispatch(resp frame) {
  310. id := int(resp[2])
  311. if id >= len(c.calls) {
  312. return
  313. }
  314. call := &c.calls[id]
  315. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  316. return
  317. }
  318. atomic.AddInt32(&c.nwait, -1)
  319. call.resp <- callResp{resp, nil}
  320. }
  321. func (c *Conn) ping() error {
  322. _, err := c.exec(&optionsFrame{}, nil)
  323. return err
  324. }
  325. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
  326. stmtsLRU.mu.Lock()
  327. stmtCacheKey := c.addr + c.currentKeyspace + stmt
  328. if val, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  329. flight := val.(*inflightPrepare)
  330. stmtsLRU.mu.Unlock()
  331. flight.wg.Wait()
  332. return flight.info, flight.err
  333. }
  334. flight := new(inflightPrepare)
  335. flight.wg.Add(1)
  336. stmtsLRU.lru.Add(stmtCacheKey, flight)
  337. stmtsLRU.mu.Unlock()
  338. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  339. if err != nil {
  340. flight.err = err
  341. } else {
  342. switch x := resp.(type) {
  343. case resultPreparedFrame:
  344. flight.info = &QueryInfo{
  345. Id: x.PreparedId,
  346. Args: x.Arguments,
  347. Rval: x.ReturnValues,
  348. }
  349. case error:
  350. flight.err = x
  351. default:
  352. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  353. }
  354. err = flight.err
  355. }
  356. flight.wg.Done()
  357. if err != nil {
  358. stmtsLRU.mu.Lock()
  359. stmtsLRU.lru.Remove(stmtCacheKey)
  360. stmtsLRU.mu.Unlock()
  361. }
  362. return flight.info, flight.err
  363. }
  364. func (c *Conn) executeQuery(qry *Query) *Iter {
  365. op := &queryFrame{
  366. Stmt: qry.stmt,
  367. Cons: qry.cons,
  368. PageSize: qry.pageSize,
  369. PageState: qry.pageState,
  370. }
  371. if qry.shouldPrepare() {
  372. // Prepare all DML queries. Other queries can not be prepared.
  373. info, err := c.prepareStatement(qry.stmt, qry.trace)
  374. if err != nil {
  375. return &Iter{err: err}
  376. }
  377. var values []interface{}
  378. if qry.binding == nil {
  379. values = qry.values
  380. } else {
  381. values, err = qry.binding(info)
  382. if err != nil {
  383. return &Iter{err: err}
  384. }
  385. }
  386. if len(values) != len(info.Args) {
  387. return &Iter{err: ErrQueryArgLength}
  388. }
  389. op.Prepared = info.Id
  390. op.Values = make([][]byte, len(values))
  391. for i := 0; i < len(values); i++ {
  392. val, err := Marshal(info.Args[i].TypeInfo, values[i])
  393. if err != nil {
  394. return &Iter{err: err}
  395. }
  396. op.Values[i] = val
  397. }
  398. }
  399. resp, err := c.exec(op, qry.trace)
  400. if err != nil {
  401. return &Iter{err: err}
  402. }
  403. switch x := resp.(type) {
  404. case resultVoidFrame:
  405. return &Iter{}
  406. case resultRowsFrame:
  407. iter := &Iter{columns: x.Columns, rows: x.Rows}
  408. if len(x.PagingState) > 0 {
  409. iter.next = &nextIter{
  410. qry: *qry,
  411. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  412. }
  413. iter.next.qry.pageState = x.PagingState
  414. if iter.next.pos < 1 {
  415. iter.next.pos = 1
  416. }
  417. }
  418. return iter
  419. case resultKeyspaceFrame:
  420. return &Iter{}
  421. case RequestErrUnprepared:
  422. stmtsLRU.mu.Lock()
  423. stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
  424. if _, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  425. stmtsLRU.lru.Remove(stmtCacheKey)
  426. stmtsLRU.mu.Unlock()
  427. return c.executeQuery(qry)
  428. }
  429. stmtsLRU.mu.Unlock()
  430. return &Iter{err: x}
  431. case error:
  432. return &Iter{err: x}
  433. default:
  434. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  435. }
  436. }
  437. func (c *Conn) Pick(qry *Query) *Conn {
  438. if c.Closed() {
  439. return nil
  440. }
  441. return c
  442. }
  443. func (c *Conn) Closed() bool {
  444. c.closedMu.RLock()
  445. closed := c.isClosed
  446. c.closedMu.RUnlock()
  447. return closed
  448. }
  449. func (c *Conn) Close() {
  450. c.closedMu.Lock()
  451. if c.isClosed {
  452. c.closedMu.Unlock()
  453. return
  454. }
  455. c.isClosed = true
  456. c.closedMu.Unlock()
  457. c.conn.Close()
  458. }
  459. func (c *Conn) Address() string {
  460. return c.addr
  461. }
  462. func (c *Conn) AvailableStreams() int {
  463. return len(c.uniq)
  464. }
  465. func (c *Conn) UseKeyspace(keyspace string) error {
  466. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  467. if err != nil {
  468. return err
  469. }
  470. switch x := resp.(type) {
  471. case resultKeyspaceFrame:
  472. case error:
  473. return x
  474. default:
  475. return NewErrProtocol("Unknown type in response to USE: %s", x)
  476. }
  477. c.currentKeyspace = keyspace
  478. return nil
  479. }
  480. func (c *Conn) executeBatch(batch *Batch) error {
  481. if c.version == 1 {
  482. return ErrUnsupported
  483. }
  484. f := make(frame, headerSize, defaultFrameSize)
  485. f.setHeader(c.version, 0, 0, opBatch)
  486. f.writeByte(byte(batch.Type))
  487. f.writeShort(uint16(len(batch.Entries)))
  488. stmts := make(map[string]string)
  489. for i := 0; i < len(batch.Entries); i++ {
  490. entry := &batch.Entries[i]
  491. var info *QueryInfo
  492. var args []interface{}
  493. if len(entry.Args) > 0 || entry.binding != nil {
  494. var err error
  495. info, err = c.prepareStatement(entry.Stmt, nil)
  496. if err != nil {
  497. return err
  498. }
  499. if entry.binding == nil {
  500. args = entry.Args
  501. } else {
  502. args, err = entry.binding(info)
  503. if err != nil {
  504. return err
  505. }
  506. }
  507. if len(args) != len(info.Args) {
  508. return ErrQueryArgLength
  509. }
  510. stmts[string(info.Id)] = entry.Stmt
  511. f.writeByte(1)
  512. f.writeShortBytes(info.Id)
  513. } else {
  514. f.writeByte(0)
  515. f.writeLongString(entry.Stmt)
  516. }
  517. f.writeShort(uint16(len(args)))
  518. for j := 0; j < len(args); j++ {
  519. val, err := Marshal(info.Args[j].TypeInfo, args[j])
  520. if err != nil {
  521. return err
  522. }
  523. f.writeBytes(val)
  524. }
  525. }
  526. f.writeConsistency(batch.Cons)
  527. resp, err := c.exec(f, nil)
  528. if err != nil {
  529. return err
  530. }
  531. switch x := resp.(type) {
  532. case resultVoidFrame:
  533. return nil
  534. case RequestErrUnprepared:
  535. stmt, found := stmts[string(x.StatementId)]
  536. if found {
  537. stmtsLRU.mu.Lock()
  538. stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
  539. stmtsLRU.mu.Unlock()
  540. }
  541. if found {
  542. return c.executeBatch(batch)
  543. } else {
  544. return x
  545. }
  546. case error:
  547. return x
  548. default:
  549. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  550. }
  551. }
  552. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  553. defer func() {
  554. if r := recover(); r != nil {
  555. if e, ok := r.(ErrProtocol); ok {
  556. err = e
  557. return
  558. }
  559. panic(r)
  560. }
  561. }()
  562. if len(f) < headerSize {
  563. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  564. } else if f[0] != c.version|flagResponse {
  565. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  566. }
  567. flags, op, f := f[1], f[3], f[headerSize:]
  568. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  569. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  570. return nil, err
  571. } else {
  572. f = frame(buf)
  573. }
  574. }
  575. if flags&flagTrace != 0 {
  576. if len(f) < 16 {
  577. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  578. }
  579. traceId := []byte(f[:16])
  580. f = f[16:]
  581. trace.Trace(traceId)
  582. }
  583. switch op {
  584. case opReady:
  585. return readyFrame{}, nil
  586. case opResult:
  587. switch kind := f.readInt(); kind {
  588. case resultKindVoid:
  589. return resultVoidFrame{}, nil
  590. case resultKindRows:
  591. columns, pageState := f.readMetaData()
  592. numRows := f.readInt()
  593. values := make([][]byte, numRows*len(columns))
  594. for i := 0; i < len(values); i++ {
  595. values[i] = f.readBytes()
  596. }
  597. rows := make([][][]byte, numRows)
  598. for i := 0; i < numRows; i++ {
  599. rows[i], values = values[:len(columns)], values[len(columns):]
  600. }
  601. return resultRowsFrame{columns, rows, pageState}, nil
  602. case resultKindKeyspace:
  603. keyspace := f.readString()
  604. return resultKeyspaceFrame{keyspace}, nil
  605. case resultKindPrepared:
  606. id := f.readShortBytes()
  607. args, _ := f.readMetaData()
  608. if c.version < 2 {
  609. return resultPreparedFrame{PreparedId: id, Arguments: args}, nil
  610. }
  611. rvals, _ := f.readMetaData()
  612. return resultPreparedFrame{PreparedId: id, Arguments: args, ReturnValues: rvals}, nil
  613. case resultKindSchemaChanged:
  614. return resultVoidFrame{}, nil
  615. default:
  616. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  617. }
  618. case opAuthenticate:
  619. return authenticateFrame{f.readString()}, nil
  620. case opAuthChallenge:
  621. return authChallengeFrame{f.readBytes()}, nil
  622. case opAuthSuccess:
  623. return authSuccessFrame{f.readBytes()}, nil
  624. case opSupported:
  625. return supportedFrame{}, nil
  626. case opError:
  627. return f.readError(), nil
  628. default:
  629. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  630. }
  631. }
  632. func (c *Conn) setKeepalive(d time.Duration) error {
  633. if tc, ok := c.conn.(*net.TCPConn); ok {
  634. err := tc.SetKeepAlivePeriod(d)
  635. if err != nil {
  636. return err
  637. }
  638. return tc.SetKeepAlive(true)
  639. }
  640. return nil
  641. }
  642. // QueryInfo represents the meta data associated with a prepared CQL statement.
  643. type QueryInfo struct {
  644. Id []byte
  645. Args []ColumnInfo
  646. Rval []ColumnInfo
  647. }
  648. type callReq struct {
  649. active int32
  650. resp chan callResp
  651. }
  652. type callResp struct {
  653. buf frame
  654. err error
  655. }
  656. type inflightPrepare struct {
  657. info *QueryInfo
  658. err error
  659. wg sync.WaitGroup
  660. }
  661. var (
  662. ErrQueryArgLength = errors.New("query argument length mismatch")
  663. )