conn.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bufio"
  7. "crypto/tls"
  8. "crypto/x509"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "net"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. )
  19. const defaultFrameSize = 4096
  20. const flagResponse = 0x80
  21. const maskVersion = 0x7F
  22. //JoinHostPort is a utility to return a address string that can be used
  23. //gocql.Conn to form a connection with a host.
  24. func JoinHostPort(addr string, port int) string {
  25. addr = strings.TrimSpace(addr)
  26. if _, _, err := net.SplitHostPort(addr); err != nil {
  27. addr = net.JoinHostPort(addr, strconv.Itoa(port))
  28. }
  29. return addr
  30. }
  31. type Authenticator interface {
  32. Challenge(req []byte) (resp []byte, auth Authenticator, err error)
  33. Success(data []byte) error
  34. }
  35. type PasswordAuthenticator struct {
  36. Username string
  37. Password string
  38. }
  39. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
  40. if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
  41. return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
  42. }
  43. resp := make([]byte, 2+len(p.Username)+len(p.Password))
  44. resp[0] = 0
  45. copy(resp[1:], p.Username)
  46. resp[len(p.Username)+1] = 0
  47. copy(resp[2+len(p.Username):], p.Password)
  48. return resp, nil, nil
  49. }
  50. func (p PasswordAuthenticator) Success(data []byte) error {
  51. return nil
  52. }
  53. type SslOptions struct {
  54. CertPath string
  55. KeyPath string
  56. CaPath string //optional depending on server config
  57. // If you want to verify the hostname and server cert (like a wildcard for cass cluster) then you should turn this on
  58. // This option is basically the inverse of InSecureSkipVerify
  59. // See InSecureSkipVerify in http://golang.org/pkg/crypto/tls/ for more info
  60. EnableHostVerification bool
  61. }
  62. type ConnConfig struct {
  63. ProtoVersion int
  64. CQLVersion string
  65. Timeout time.Duration
  66. NumStreams int
  67. Compressor Compressor
  68. Authenticator Authenticator
  69. Keepalive time.Duration
  70. SslOpts *SslOptions
  71. }
  72. // Conn is a single connection to a Cassandra node. It can be used to execute
  73. // queries, but users are usually advised to use a more reliable, higher
  74. // level API.
  75. type Conn struct {
  76. conn net.Conn
  77. r *bufio.Reader
  78. timeout time.Duration
  79. uniq chan uint8
  80. calls []callReq
  81. nwait int32
  82. pool ConnectionPool
  83. compressor Compressor
  84. auth Authenticator
  85. addr string
  86. version uint8
  87. currentKeyspace string
  88. closedMu sync.RWMutex
  89. isClosed bool
  90. }
  91. // Connect establishes a connection to a Cassandra node.
  92. // You must also call the Serve method before you can execute any queries.
  93. func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
  94. var (
  95. err error
  96. conn net.Conn
  97. )
  98. if cfg.SslOpts != nil {
  99. certPool := x509.NewCertPool()
  100. //ca cert is optional
  101. if cfg.SslOpts.CaPath != "" {
  102. pem, err := ioutil.ReadFile(cfg.SslOpts.CaPath)
  103. if err != nil {
  104. return nil, err
  105. }
  106. if !certPool.AppendCertsFromPEM(pem) {
  107. return nil, errors.New("Failed parsing or appending certs")
  108. }
  109. }
  110. mycert, err := tls.LoadX509KeyPair(cfg.SslOpts.CertPath, cfg.SslOpts.KeyPath)
  111. if err != nil {
  112. return nil, err
  113. }
  114. config := tls.Config{
  115. Certificates: []tls.Certificate{mycert},
  116. RootCAs: certPool,
  117. }
  118. config.InsecureSkipVerify = !cfg.SslOpts.EnableHostVerification
  119. if conn, err = tls.Dial("tcp", addr, &config); err != nil {
  120. return nil, err
  121. }
  122. } else if conn, err = net.DialTimeout("tcp", addr, cfg.Timeout); err != nil {
  123. return nil, err
  124. }
  125. if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
  126. cfg.NumStreams = 128
  127. }
  128. if cfg.ProtoVersion != 1 && cfg.ProtoVersion != 2 {
  129. cfg.ProtoVersion = 2
  130. }
  131. c := &Conn{
  132. conn: conn,
  133. r: bufio.NewReader(conn),
  134. uniq: make(chan uint8, cfg.NumStreams),
  135. calls: make([]callReq, cfg.NumStreams),
  136. timeout: cfg.Timeout,
  137. version: uint8(cfg.ProtoVersion),
  138. addr: conn.RemoteAddr().String(),
  139. pool: pool,
  140. compressor: cfg.Compressor,
  141. auth: cfg.Authenticator,
  142. }
  143. if cfg.Keepalive > 0 {
  144. c.setKeepalive(cfg.Keepalive)
  145. }
  146. for i := 0; i < cap(c.uniq); i++ {
  147. c.uniq <- uint8(i)
  148. }
  149. if err := c.startup(&cfg); err != nil {
  150. conn.Close()
  151. return nil, err
  152. }
  153. go c.serve()
  154. return c, nil
  155. }
  156. func (c *Conn) startup(cfg *ConnConfig) error {
  157. compression := ""
  158. if c.compressor != nil {
  159. compression = c.compressor.Name()
  160. }
  161. var req operation = &startupFrame{
  162. CQLVersion: cfg.CQLVersion,
  163. Compression: compression,
  164. }
  165. var challenger Authenticator
  166. for {
  167. resp, err := c.execSimple(req)
  168. if err != nil {
  169. return err
  170. }
  171. switch x := resp.(type) {
  172. case readyFrame:
  173. return nil
  174. case error:
  175. return x
  176. case authenticateFrame:
  177. if c.auth == nil {
  178. return fmt.Errorf("authentication required (using %q)", x.Authenticator)
  179. }
  180. var resp []byte
  181. resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
  182. if err != nil {
  183. return err
  184. }
  185. req = &authResponseFrame{resp}
  186. case authChallengeFrame:
  187. if challenger == nil {
  188. return fmt.Errorf("authentication error (invalid challenge)")
  189. }
  190. var resp []byte
  191. resp, challenger, err = challenger.Challenge(x.Data)
  192. if err != nil {
  193. return err
  194. }
  195. req = &authResponseFrame{resp}
  196. case authSuccessFrame:
  197. if challenger != nil {
  198. return challenger.Success(x.Data)
  199. }
  200. return nil
  201. default:
  202. return NewErrProtocol("Unknown type of response to startup frame: %s", x)
  203. }
  204. }
  205. }
  206. // Serve starts the stream multiplexer for this connection, which is required
  207. // to execute any queries. This method runs as long as the connection is
  208. // open and is therefore usually called in a separate goroutine.
  209. func (c *Conn) serve() {
  210. var (
  211. err error
  212. resp frame
  213. )
  214. for {
  215. resp, err = c.recv()
  216. if err != nil {
  217. break
  218. }
  219. c.dispatch(resp)
  220. }
  221. c.Close()
  222. for id := 0; id < len(c.calls); id++ {
  223. req := &c.calls[id]
  224. if atomic.LoadInt32(&req.active) == 1 {
  225. req.resp <- callResp{nil, err}
  226. }
  227. }
  228. c.pool.HandleError(c, err, true)
  229. }
  230. func (c *Conn) recv() (frame, error) {
  231. resp := make(frame, headerSize, headerSize+512)
  232. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  233. n, last, pinged := 0, 0, false
  234. for n < len(resp) {
  235. nn, err := c.r.Read(resp[n:])
  236. n += nn
  237. if err != nil {
  238. if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
  239. if n > last {
  240. // we hit the deadline but we made progress.
  241. // simply extend the deadline
  242. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  243. last = n
  244. } else if n == 0 && !pinged {
  245. c.conn.SetReadDeadline(time.Now().Add(c.timeout))
  246. if atomic.LoadInt32(&c.nwait) > 0 {
  247. go c.ping()
  248. pinged = true
  249. }
  250. } else {
  251. return nil, err
  252. }
  253. } else {
  254. return nil, err
  255. }
  256. }
  257. if n == headerSize && len(resp) == headerSize {
  258. if resp[0] != c.version|flagResponse {
  259. return nil, NewErrProtocol("recv: Response protocol version does not match connection protocol version (%d != %d)", resp[0], c.version|flagResponse)
  260. }
  261. resp.grow(resp.Length())
  262. }
  263. }
  264. return resp, nil
  265. }
  266. func (c *Conn) execSimple(op operation) (interface{}, error) {
  267. f, err := op.encodeFrame(c.version, nil)
  268. f.setLength(len(f) - headerSize)
  269. if _, err := c.conn.Write([]byte(f)); err != nil {
  270. c.Close()
  271. return nil, err
  272. }
  273. if f, err = c.recv(); err != nil {
  274. return nil, err
  275. }
  276. return c.decodeFrame(f, nil)
  277. }
  278. func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
  279. req, err := op.encodeFrame(c.version, nil)
  280. if err != nil {
  281. return nil, err
  282. }
  283. if trace != nil {
  284. req[1] |= flagTrace
  285. }
  286. if len(req) > headerSize && c.compressor != nil {
  287. body, err := c.compressor.Encode([]byte(req[headerSize:]))
  288. if err != nil {
  289. return nil, err
  290. }
  291. req = append(req[:headerSize], frame(body)...)
  292. req[1] |= flagCompress
  293. }
  294. req.setLength(len(req) - headerSize)
  295. id := <-c.uniq
  296. req[2] = id
  297. call := &c.calls[id]
  298. call.resp = make(chan callResp, 1)
  299. atomic.AddInt32(&c.nwait, 1)
  300. atomic.StoreInt32(&call.active, 1)
  301. if _, err := c.conn.Write(req); err != nil {
  302. c.uniq <- id
  303. c.Close()
  304. return nil, err
  305. }
  306. reply := <-call.resp
  307. call.resp = nil
  308. c.uniq <- id
  309. if reply.err != nil {
  310. return nil, reply.err
  311. }
  312. return c.decodeFrame(reply.buf, trace)
  313. }
  314. func (c *Conn) dispatch(resp frame) {
  315. id := int(resp[2])
  316. if id >= len(c.calls) {
  317. return
  318. }
  319. call := &c.calls[id]
  320. if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
  321. return
  322. }
  323. atomic.AddInt32(&c.nwait, -1)
  324. call.resp <- callResp{resp, nil}
  325. }
  326. func (c *Conn) ping() error {
  327. _, err := c.exec(&optionsFrame{}, nil)
  328. return err
  329. }
  330. func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
  331. stmtsLRU.mu.Lock()
  332. stmtCacheKey := c.addr + c.currentKeyspace + stmt
  333. if val, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  334. flight := val.(*inflightPrepare)
  335. stmtsLRU.mu.Unlock()
  336. flight.wg.Wait()
  337. return flight.info, flight.err
  338. }
  339. flight := new(inflightPrepare)
  340. flight.wg.Add(1)
  341. stmtsLRU.lru.Add(stmtCacheKey, flight)
  342. stmtsLRU.mu.Unlock()
  343. resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
  344. if err != nil {
  345. flight.err = err
  346. } else {
  347. switch x := resp.(type) {
  348. case resultPreparedFrame:
  349. flight.info = &QueryInfo{
  350. Id: x.PreparedId,
  351. Args: x.Arguments,
  352. Rval: x.ReturnValues,
  353. }
  354. case error:
  355. flight.err = x
  356. default:
  357. flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
  358. }
  359. err = flight.err
  360. }
  361. flight.wg.Done()
  362. if err != nil {
  363. stmtsLRU.mu.Lock()
  364. stmtsLRU.lru.Remove(stmtCacheKey)
  365. stmtsLRU.mu.Unlock()
  366. }
  367. return flight.info, flight.err
  368. }
  369. func (c *Conn) executeQuery(qry *Query) *Iter {
  370. op := &queryFrame{
  371. Stmt: qry.stmt,
  372. Cons: qry.cons,
  373. PageSize: qry.pageSize,
  374. PageState: qry.pageState,
  375. }
  376. if qry.shouldPrepare() {
  377. // Prepare all DML queries. Other queries can not be prepared.
  378. info, err := c.prepareStatement(qry.stmt, qry.trace)
  379. if err != nil {
  380. return &Iter{err: err}
  381. }
  382. var values []interface{}
  383. if qry.binding == nil {
  384. values = qry.values
  385. } else {
  386. values, err = qry.binding(info)
  387. if err != nil {
  388. return &Iter{err: err}
  389. }
  390. }
  391. if len(values) != len(info.Args) {
  392. return &Iter{err: ErrQueryArgLength}
  393. }
  394. op.Prepared = info.Id
  395. op.Values = make([][]byte, len(values))
  396. for i := 0; i < len(values); i++ {
  397. val, err := Marshal(info.Args[i].TypeInfo, values[i])
  398. if err != nil {
  399. return &Iter{err: err}
  400. }
  401. op.Values[i] = val
  402. }
  403. }
  404. resp, err := c.exec(op, qry.trace)
  405. if err != nil {
  406. return &Iter{err: err}
  407. }
  408. switch x := resp.(type) {
  409. case resultVoidFrame:
  410. return &Iter{}
  411. case resultRowsFrame:
  412. iter := &Iter{columns: x.Columns, rows: x.Rows}
  413. if len(x.PagingState) > 0 {
  414. iter.next = &nextIter{
  415. qry: *qry,
  416. pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
  417. }
  418. iter.next.qry.pageState = x.PagingState
  419. if iter.next.pos < 1 {
  420. iter.next.pos = 1
  421. }
  422. }
  423. return iter
  424. case resultKeyspaceFrame:
  425. return &Iter{}
  426. case RequestErrUnprepared:
  427. stmtsLRU.mu.Lock()
  428. stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
  429. if _, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
  430. stmtsLRU.lru.Remove(stmtCacheKey)
  431. stmtsLRU.mu.Unlock()
  432. return c.executeQuery(qry)
  433. }
  434. stmtsLRU.mu.Unlock()
  435. return &Iter{err: x}
  436. case error:
  437. return &Iter{err: x}
  438. default:
  439. return &Iter{err: NewErrProtocol("Unknown type in response to execute query: %s", x)}
  440. }
  441. }
  442. func (c *Conn) Pick(qry *Query) *Conn {
  443. if c.Closed() {
  444. return nil
  445. }
  446. return c
  447. }
  448. func (c *Conn) Closed() bool {
  449. c.closedMu.RLock()
  450. closed := c.isClosed
  451. c.closedMu.RUnlock()
  452. return closed
  453. }
  454. func (c *Conn) Close() {
  455. c.closedMu.Lock()
  456. if c.isClosed {
  457. c.closedMu.Unlock()
  458. return
  459. }
  460. c.isClosed = true
  461. c.closedMu.Unlock()
  462. c.conn.Close()
  463. }
  464. func (c *Conn) Address() string {
  465. return c.addr
  466. }
  467. func (c *Conn) AvailableStreams() int {
  468. return len(c.uniq)
  469. }
  470. func (c *Conn) UseKeyspace(keyspace string) error {
  471. resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
  472. if err != nil {
  473. return err
  474. }
  475. switch x := resp.(type) {
  476. case resultKeyspaceFrame:
  477. case error:
  478. return x
  479. default:
  480. return NewErrProtocol("Unknown type in response to USE: %s", x)
  481. }
  482. c.currentKeyspace = keyspace
  483. return nil
  484. }
  485. func (c *Conn) executeBatch(batch *Batch) error {
  486. if c.version == 1 {
  487. return ErrUnsupported
  488. }
  489. f := make(frame, headerSize, defaultFrameSize)
  490. f.setHeader(c.version, 0, 0, opBatch)
  491. f.writeByte(byte(batch.Type))
  492. f.writeShort(uint16(len(batch.Entries)))
  493. stmts := make(map[string]string)
  494. for i := 0; i < len(batch.Entries); i++ {
  495. entry := &batch.Entries[i]
  496. var info *QueryInfo
  497. var args []interface{}
  498. if len(entry.Args) > 0 || entry.binding != nil {
  499. var err error
  500. info, err = c.prepareStatement(entry.Stmt, nil)
  501. if err != nil {
  502. return err
  503. }
  504. if entry.binding == nil {
  505. args = entry.Args
  506. } else {
  507. args, err = entry.binding(info)
  508. if err != nil {
  509. return err
  510. }
  511. }
  512. if len(args) != len(info.Args) {
  513. return ErrQueryArgLength
  514. }
  515. stmts[string(info.Id)] = entry.Stmt
  516. f.writeByte(1)
  517. f.writeShortBytes(info.Id)
  518. } else {
  519. f.writeByte(0)
  520. f.writeLongString(entry.Stmt)
  521. }
  522. f.writeShort(uint16(len(args)))
  523. for j := 0; j < len(args); j++ {
  524. val, err := Marshal(info.Args[j].TypeInfo, args[j])
  525. if err != nil {
  526. return err
  527. }
  528. f.writeBytes(val)
  529. }
  530. }
  531. f.writeConsistency(batch.Cons)
  532. resp, err := c.exec(f, nil)
  533. if err != nil {
  534. return err
  535. }
  536. switch x := resp.(type) {
  537. case resultVoidFrame:
  538. return nil
  539. case RequestErrUnprepared:
  540. stmt, found := stmts[string(x.StatementId)]
  541. if found {
  542. stmtsLRU.mu.Lock()
  543. stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
  544. stmtsLRU.mu.Unlock()
  545. }
  546. if found {
  547. return c.executeBatch(batch)
  548. } else {
  549. return x
  550. }
  551. case error:
  552. return x
  553. default:
  554. return NewErrProtocol("Unknown type in response to batch statement: %s", x)
  555. }
  556. }
  557. func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
  558. defer func() {
  559. if r := recover(); r != nil {
  560. if e, ok := r.(ErrProtocol); ok {
  561. err = e
  562. return
  563. }
  564. panic(r)
  565. }
  566. }()
  567. if len(f) < headerSize {
  568. return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
  569. } else if f[0] != c.version|flagResponse {
  570. return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
  571. }
  572. flags, op, f := f[1], f[3], f[headerSize:]
  573. if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
  574. if buf, err := c.compressor.Decode([]byte(f)); err != nil {
  575. return nil, err
  576. } else {
  577. f = frame(buf)
  578. }
  579. }
  580. if flags&flagTrace != 0 {
  581. if len(f) < 16 {
  582. return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
  583. }
  584. traceId := []byte(f[:16])
  585. f = f[16:]
  586. trace.Trace(traceId)
  587. }
  588. switch op {
  589. case opReady:
  590. return readyFrame{}, nil
  591. case opResult:
  592. switch kind := f.readInt(); kind {
  593. case resultKindVoid:
  594. return resultVoidFrame{}, nil
  595. case resultKindRows:
  596. columns, pageState := f.readMetaData()
  597. numRows := f.readInt()
  598. values := make([][]byte, numRows*len(columns))
  599. for i := 0; i < len(values); i++ {
  600. values[i] = f.readBytes()
  601. }
  602. rows := make([][][]byte, numRows)
  603. for i := 0; i < numRows; i++ {
  604. rows[i], values = values[:len(columns)], values[len(columns):]
  605. }
  606. return resultRowsFrame{columns, rows, pageState}, nil
  607. case resultKindKeyspace:
  608. keyspace := f.readString()
  609. return resultKeyspaceFrame{keyspace}, nil
  610. case resultKindPrepared:
  611. id := f.readShortBytes()
  612. args, _ := f.readMetaData()
  613. if c.version < 2 {
  614. return resultPreparedFrame{PreparedId: id, Arguments: args}, nil
  615. }
  616. rvals, _ := f.readMetaData()
  617. return resultPreparedFrame{PreparedId: id, Arguments: args, ReturnValues: rvals}, nil
  618. case resultKindSchemaChanged:
  619. return resultVoidFrame{}, nil
  620. default:
  621. return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
  622. }
  623. case opAuthenticate:
  624. return authenticateFrame{f.readString()}, nil
  625. case opAuthChallenge:
  626. return authChallengeFrame{f.readBytes()}, nil
  627. case opAuthSuccess:
  628. return authSuccessFrame{f.readBytes()}, nil
  629. case opSupported:
  630. return supportedFrame{}, nil
  631. case opError:
  632. return f.readError(), nil
  633. default:
  634. return nil, NewErrProtocol("Decoding frame: unknown op", op)
  635. }
  636. }
  637. func (c *Conn) setKeepalive(d time.Duration) error {
  638. if tc, ok := c.conn.(*net.TCPConn); ok {
  639. err := tc.SetKeepAlivePeriod(d)
  640. if err != nil {
  641. return err
  642. }
  643. return tc.SetKeepAlive(true)
  644. }
  645. return nil
  646. }
  647. // QueryInfo represents the meta data associated with a prepared CQL statement.
  648. type QueryInfo struct {
  649. Id []byte
  650. Args []ColumnInfo
  651. Rval []ColumnInfo
  652. }
  653. type callReq struct {
  654. active int32
  655. resp chan callResp
  656. }
  657. type callResp struct {
  658. buf frame
  659. err error
  660. }
  661. type inflightPrepare struct {
  662. info *QueryInfo
  663. err error
  664. wg sync.WaitGroup
  665. }
  666. var (
  667. ErrQueryArgLength = errors.New("query argument length mismatch")
  668. )