|
|
@@ -579,14 +579,27 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
|
|
|
return framer, nil
|
|
|
}
|
|
|
|
|
|
-func (c *Conn) prepareStatement(stmt string, tracer Tracer) (*QueryInfo, error) {
|
|
|
+type preparedStatment struct {
|
|
|
+ id []byte
|
|
|
+ request preparedMetadata
|
|
|
+ response resultMetadata
|
|
|
+}
|
|
|
+
|
|
|
+type inflightPrepare struct {
|
|
|
+ wg sync.WaitGroup
|
|
|
+ err error
|
|
|
+
|
|
|
+ preparedStatment *preparedStatment
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Conn) prepareStatement(stmt string, tracer Tracer) (*preparedStatment, error) {
|
|
|
c.session.stmtsLRU.Lock()
|
|
|
stmtCacheKey := c.addr + c.currentKeyspace + stmt
|
|
|
if val, ok := c.session.stmtsLRU.lru.Get(stmtCacheKey); ok {
|
|
|
c.session.stmtsLRU.Unlock()
|
|
|
flight := val.(*inflightPrepare)
|
|
|
flight.wg.Wait()
|
|
|
- return &flight.info, flight.err
|
|
|
+ return flight.preparedStatment, flight.err
|
|
|
}
|
|
|
|
|
|
flight := new(inflightPrepare)
|
|
|
@@ -620,14 +633,15 @@ func (c *Conn) prepareStatement(stmt string, tracer Tracer) (*QueryInfo, error)
|
|
|
|
|
|
switch x := frame.(type) {
|
|
|
case *resultPreparedFrame:
|
|
|
- // defensivly copy as we will recycle the underlying buffer after we
|
|
|
- // return.
|
|
|
- flight.info.Id = copyBytes(x.preparedID)
|
|
|
- // the type info's should _not_ have a reference to the framers read buffer,
|
|
|
- // therefore we can just copy them directly.
|
|
|
- flight.info.Args = x.reqMeta.columns
|
|
|
- flight.info.PKeyColumns = x.reqMeta.pkeyColumns
|
|
|
- flight.info.Rval = x.respMeta.columns
|
|
|
+ flight.preparedStatment = &preparedStatment{
|
|
|
+ // defensivly copy as we will recycle the underlying buffer after we
|
|
|
+ // return.
|
|
|
+ id: copyBytes(x.preparedID),
|
|
|
+ // the type info's should _not_ have a reference to the framers read buffer,
|
|
|
+ // therefore we can just copy them directly.
|
|
|
+ request: x.reqMeta,
|
|
|
+ response: x.respMeta,
|
|
|
+ }
|
|
|
case error:
|
|
|
flight.err = x
|
|
|
default:
|
|
|
@@ -643,7 +657,7 @@ func (c *Conn) prepareStatement(stmt string, tracer Tracer) (*QueryInfo, error)
|
|
|
|
|
|
framerPool.Put(framer)
|
|
|
|
|
|
- return &flight.info, flight.err
|
|
|
+ return flight.preparedStatment, flight.err
|
|
|
}
|
|
|
|
|
|
func (c *Conn) executeQuery(qry *Query) *Iter {
|
|
|
@@ -664,7 +678,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
|
|
|
|
|
|
var (
|
|
|
frame frameWriter
|
|
|
- info *QueryInfo
|
|
|
+ info *preparedStatment
|
|
|
)
|
|
|
|
|
|
if qry.shouldPrepare() {
|
|
|
@@ -680,19 +694,25 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
|
|
|
if qry.binding == nil {
|
|
|
values = qry.values
|
|
|
} else {
|
|
|
- values, err = qry.binding(info)
|
|
|
+ values, err = qry.binding(&QueryInfo{
|
|
|
+ Id: info.id,
|
|
|
+ Args: info.request.columns,
|
|
|
+ Rval: info.response.columns,
|
|
|
+ PKeyColumns: info.request.pkeyColumns,
|
|
|
+ })
|
|
|
+
|
|
|
if err != nil {
|
|
|
return &Iter{err: err}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if len(values) != len(info.Args) {
|
|
|
- return &Iter{err: ErrQueryArgLength}
|
|
|
+ if len(values) != info.request.actualColCount {
|
|
|
+ return &Iter{err: fmt.Errorf("gocql: expected %d values send got %d", info.request.actualColCount, len(values))}
|
|
|
}
|
|
|
|
|
|
params.values = make([]queryValues, len(values))
|
|
|
for i := 0; i < len(values); i++ {
|
|
|
- val, err := Marshal(info.Args[i].TypeInfo, values[i])
|
|
|
+ val, err := Marshal(info.request.columns[i].TypeInfo, values[i])
|
|
|
if err != nil {
|
|
|
return &Iter{err: err}
|
|
|
}
|
|
|
@@ -705,7 +725,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
|
|
|
params.skipMeta = !qry.isCAS
|
|
|
|
|
|
frame = &writeExecuteFrame{
|
|
|
- preparedID: info.Id,
|
|
|
+ preparedID: info.id,
|
|
|
params: params,
|
|
|
}
|
|
|
} else {
|
|
|
@@ -734,17 +754,19 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
|
|
|
return &Iter{framer: framer}
|
|
|
case *resultRowsFrame:
|
|
|
iter := &Iter{
|
|
|
- meta: x.meta,
|
|
|
rows: x.rows,
|
|
|
framer: framer,
|
|
|
}
|
|
|
|
|
|
if params.skipMeta {
|
|
|
if info != nil {
|
|
|
- iter.meta.columns = info.Rval
|
|
|
+ iter.meta = info.response
|
|
|
+ iter.meta.pagingState = x.meta.pagingState
|
|
|
} else {
|
|
|
return &Iter{framer: framer, err: errors.New("gocql: did not receive metadata but prepared info is nil")}
|
|
|
}
|
|
|
+ } else {
|
|
|
+ iter.meta = x.meta
|
|
|
}
|
|
|
|
|
|
if len(x.meta.pagingState) > 0 && !qry.disableAutoPage {
|
|
|
@@ -753,7 +775,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
|
|
|
pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
|
|
|
}
|
|
|
|
|
|
- iter.next.qry.pageState = x.meta.pagingState
|
|
|
+ iter.next.qry.pageState = copyBytes(x.meta.pagingState)
|
|
|
if iter.next.pos < 1 {
|
|
|
iter.next.pos = 1
|
|
|
}
|
|
|
@@ -863,27 +885,32 @@ func (c *Conn) executeBatch(batch *Batch) *Iter {
|
|
|
return &Iter{err: err}
|
|
|
}
|
|
|
|
|
|
- var args []interface{}
|
|
|
+ var values []interface{}
|
|
|
if entry.binding == nil {
|
|
|
- args = entry.Args
|
|
|
+ values = entry.Args
|
|
|
} else {
|
|
|
- args, err = entry.binding(info)
|
|
|
+ values, err = entry.binding(&QueryInfo{
|
|
|
+ Id: info.id,
|
|
|
+ Args: info.request.columns,
|
|
|
+ Rval: info.response.columns,
|
|
|
+ PKeyColumns: info.request.pkeyColumns,
|
|
|
+ })
|
|
|
if err != nil {
|
|
|
return &Iter{err: err}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if len(args) != len(info.Args) {
|
|
|
- return &Iter{err: ErrQueryArgLength}
|
|
|
+ if len(values) != info.request.actualColCount {
|
|
|
+ return &Iter{err: fmt.Errorf("gocql: batch statment %d expected %d values send got %d", i, info.request.actualColCount, len(values))}
|
|
|
}
|
|
|
|
|
|
- b.preparedID = info.Id
|
|
|
- stmts[string(info.Id)] = entry.Stmt
|
|
|
+ b.preparedID = info.id
|
|
|
+ stmts[string(info.id)] = entry.Stmt
|
|
|
|
|
|
- b.values = make([]queryValues, len(info.Args))
|
|
|
+ b.values = make([]queryValues, info.request.actualColCount)
|
|
|
|
|
|
- for j := 0; j < len(info.Args); j++ {
|
|
|
- val, err := Marshal(info.Args[j].TypeInfo, args[j])
|
|
|
+ for j := 0; j < info.request.actualColCount; j++ {
|
|
|
+ val, err := Marshal(info.request.columns[j].TypeInfo, values[j])
|
|
|
if err != nil {
|
|
|
return &Iter{err: err}
|
|
|
}
|
|
|
@@ -1015,12 +1042,6 @@ func (c *Conn) awaitSchemaAgreement() (err error) {
|
|
|
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas)
|
|
|
}
|
|
|
|
|
|
-type inflightPrepare struct {
|
|
|
- info QueryInfo
|
|
|
- err error
|
|
|
- wg sync.WaitGroup
|
|
|
-}
|
|
|
-
|
|
|
var (
|
|
|
ErrQueryArgLength = errors.New("gocql: query argument length mismatch")
|
|
|
ErrTimeoutNoResponse = errors.New("gocql: no response received from cassandra within timeout period")
|