Pārlūkot izejas kodu

Merge pull request #493 from Zariel/reduce-gc-copy

Remove []byte copy in framer
Chris Bannister 10 gadi atpakaļ
vecāks
revīzija
1b22545087
7 mainītis faili ar 191 papildinājumiem un 91 dzēšanām
  1. 15 18
      cassandra_test.go
  2. 100 55
      conn.go
  3. 3 10
      frame.go
  4. 48 0
      framer_bench_test.go
  5. 6 0
      helpers.go
  6. 19 8
      session.go
  7. BIN
      testdata/frames/bench_parse_result.gz

+ 15 - 18
cassandra_test.go

@@ -1156,21 +1156,18 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
 	stmtsLRU.Lock()
 	stmtsLRU.Lock()
 	stmtsLRU.lru.Add(conn.addr+stmt, flight)
 	stmtsLRU.lru.Add(conn.addr+stmt, flight)
 	stmtsLRU.Unlock()
 	stmtsLRU.Unlock()
-	flight.info = &resultPreparedFrame{
-		preparedID: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
-		reqMeta: preparedMetadata{
-			resultMetadata: resultMetadata{
-				columns: []ColumnInfo{
-					{
-						Keyspace: "gocql_test",
-						Table:    table,
-						Name:     "foo",
-						TypeInfo: NativeType{
-							typ: TypeVarchar,
-						},
-					},
+	flight.info = QueryInfo{
+		Id: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
+		Args: []ColumnInfo{
+			{
+				Keyspace: "gocql_test",
+				Table:    table,
+				Name:     "foo",
+				TypeInfo: NativeType{
+					typ: TypeVarchar,
 				},
 				},
-			}},
+			},
+		},
 	}
 	}
 	return stmt, conn
 	return stmt, conn
 }
 }
@@ -1233,13 +1230,13 @@ func TestQueryInfo(t *testing.T) {
 		t.Fatalf("Failed to execute query for preparing statement: %v", err)
 		t.Fatalf("Failed to execute query for preparing statement: %v", err)
 	}
 	}
 
 
-	if len(info.reqMeta.columns) != 1 {
-		t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, len(info.reqMeta.columns))
+	if x := len(info.Args); x != 1 {
+		t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, x)
 	}
 	}
 
 
 	if *flagProto > 1 {
 	if *flagProto > 1 {
-		if len(info.respMeta.columns) != 2 {
-			t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, len(info.respMeta.columns))
+		if x := len(info.Rval); x != 2 {
+			t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, x)
 		}
 		}
 	}
 	}
 }
 }

+ 100 - 55
conn.go

@@ -240,7 +240,12 @@ func (c *Conn) startup(cfg *ConnConfig) error {
 		m["COMPRESSION"] = c.compressor.Name()
 		m["COMPRESSION"] = c.compressor.Name()
 	}
 	}
 
 
-	frame, err := c.exec(&writeStartupFrame{opts: m}, nil)
+	framer, err := c.exec(&writeStartupFrame{opts: m}, nil)
+	if err != nil {
+		return err
+	}
+
+	frame, err := framer.parseFrame()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -270,7 +275,12 @@ func (c *Conn) authenticateHandshake(authFrame *authenticateFrame) error {
 	req := &writeAuthResponseFrame{data: resp}
 	req := &writeAuthResponseFrame{data: resp}
 
 
 	for {
 	for {
-		frame, err := c.exec(req, nil)
+		framer, err := c.exec(req, nil)
+		if err != nil {
+			return err
+		}
+
+		frame, err := framer.parseFrame()
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -295,6 +305,8 @@ func (c *Conn) authenticateHandshake(authFrame *authenticateFrame) error {
 		default:
 		default:
 			return fmt.Errorf("unknown frame response during authentication: %v", v)
 			return fmt.Errorf("unknown frame response during authentication: %v", v)
 		}
 		}
+
+		framerPool.Put(framer)
 	}
 	}
 }
 }
 
 
@@ -385,6 +397,7 @@ func (c *Conn) recv() error {
 		if err := framer.readFrame(&head); err != nil {
 		if err := framer.readFrame(&head); err != nil {
 			return err
 			return err
 		}
 		}
+		defer framerPool.Put(framer)
 
 
 		frame, err := framer.parseFrame()
 		frame, err := framer.parseFrame()
 		if err != nil {
 		if err != nil {
@@ -435,7 +448,6 @@ type callReq struct {
 
 
 func (c *Conn) releaseStream(stream int) {
 func (c *Conn) releaseStream(stream int) {
 	call := &c.calls[stream]
 	call := &c.calls[stream]
-	framerPool.Put(call.framer)
 	call.framer = nil
 	call.framer = nil
 
 
 	select {
 	select {
@@ -450,7 +462,7 @@ func (c *Conn) handleTimeout() {
 	}
 	}
 }
 }
 
 
-func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
+func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
 	// TODO: move tracer onto conn
 	// TODO: move tracer onto conn
 	var stream int
 	var stream int
 	select {
 	select {
@@ -512,19 +524,10 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
 		return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
 		return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
 	}
 	}
 
 
-	frame, err := framer.parseFrame()
-	if err != nil {
-		return nil, err
-	}
-
-	if len(framer.traceID) > 0 {
-		tracer.Trace(framer.traceID)
-	}
-
-	return frame, nil
+	return framer, nil
 }
 }
 
 
-func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame, error) {
+func (c *Conn) prepareStatement(stmt string, tracer Tracer) (*QueryInfo, error) {
 	stmtsLRU.Lock()
 	stmtsLRU.Lock()
 	if stmtsLRU.lru == nil {
 	if stmtsLRU.lru == nil {
 		initStmtsLRU(defaultMaxPreparedStmts)
 		initStmtsLRU(defaultMaxPreparedStmts)
@@ -536,7 +539,7 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame
 		stmtsLRU.Unlock()
 		stmtsLRU.Unlock()
 		flight := val.(*inflightPrepare)
 		flight := val.(*inflightPrepare)
 		flight.wg.Wait()
 		flight.wg.Wait()
-		return flight.info, flight.err
+		return &flight.info, flight.err
 	}
 	}
 
 
 	flight := new(inflightPrepare)
 	flight := new(inflightPrepare)
@@ -548,16 +551,36 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame
 		statement: stmt,
 		statement: stmt,
 	}
 	}
 
 
-	resp, err := c.exec(prep, trace)
+	framer, err := c.exec(prep, tracer)
 	if err != nil {
 	if err != nil {
 		flight.err = err
 		flight.err = err
 		flight.wg.Done()
 		flight.wg.Done()
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	switch x := resp.(type) {
+	frame, err := framer.parseFrame()
+	if err != nil {
+		flight.err = err
+		flight.wg.Done()
+		return nil, err
+	}
+
+	// TODO(zariel): tidy this up, simplify handling of frame parsing so its not duplicated
+	// everytime we need to parse a frame.
+	if len(framer.traceID) > 0 {
+		tracer.Trace(framer.traceID)
+	}
+
+	switch x := frame.(type) {
 	case *resultPreparedFrame:
 	case *resultPreparedFrame:
-		flight.info = x
+		// defensivly copy as we will recycle the underlying buffer after we
+		// return.
+		flight.info.Id = copyBytes(x.preparedID)
+		// the type info's should _not_ have a reference to the framers read buffer,
+		// therefore we can just copy them directly.
+		flight.info.Args = x.reqMeta.columns
+		flight.info.PKeyColumns = x.reqMeta.pkeyColumns
+		flight.info.Rval = x.respMeta.columns
 	case error:
 	case error:
 		flight.err = x
 		flight.err = x
 	default:
 	default:
@@ -571,7 +594,9 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame
 		stmtsLRU.Unlock()
 		stmtsLRU.Unlock()
 	}
 	}
 
 
-	return flight.info, flight.err
+	framerPool.Put(framer)
+
+	return &flight.info, flight.err
 }
 }
 
 
 func (c *Conn) executeQuery(qry *Query) *Iter {
 func (c *Conn) executeQuery(qry *Query) *Iter {
@@ -603,24 +628,19 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		if qry.binding == nil {
 		if qry.binding == nil {
 			values = qry.values
 			values = qry.values
 		} else {
 		} else {
-			binding := &QueryInfo{
-				Id:   info.preparedID,
-				Args: info.reqMeta.columns,
-				Rval: info.respMeta.columns,
-			}
-
-			values, err = qry.binding(binding)
+			values, err = qry.binding(info)
 			if err != nil {
 			if err != nil {
 				return &Iter{err: err}
 				return &Iter{err: err}
 			}
 			}
 		}
 		}
 
 
-		if len(values) != len(info.reqMeta.columns) {
+		if len(values) != len(info.Args) {
 			return &Iter{err: ErrQueryArgLength}
 			return &Iter{err: ErrQueryArgLength}
 		}
 		}
+
 		params.values = make([]queryValues, len(values))
 		params.values = make([]queryValues, len(values))
 		for i := 0; i < len(values); i++ {
 		for i := 0; i < len(values); i++ {
-			val, err := Marshal(info.reqMeta.columns[i].TypeInfo, values[i])
+			val, err := Marshal(info.Args[i].TypeInfo, values[i])
 			if err != nil {
 			if err != nil {
 				return &Iter{err: err}
 				return &Iter{err: err}
 			}
 			}
@@ -631,7 +651,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		}
 		}
 
 
 		frame = &writeExecuteFrame{
 		frame = &writeExecuteFrame{
-			preparedID: info.preparedID,
+			preparedID: info.Id,
 			params:     params,
 			params:     params,
 		}
 		}
 	} else {
 	} else {
@@ -641,18 +661,28 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		}
 		}
 	}
 	}
 
 
-	resp, err := c.exec(frame, qry.trace)
+	framer, err := c.exec(frame, qry.trace)
 	if err != nil {
 	if err != nil {
 		return &Iter{err: err}
 		return &Iter{err: err}
 	}
 	}
 
 
+	resp, err := framer.parseFrame()
+	if err != nil {
+		return &Iter{err: err}
+	}
+
+	if len(framer.traceID) > 0 {
+		qry.trace.Trace(framer.traceID)
+	}
+
 	switch x := resp.(type) {
 	switch x := resp.(type) {
 	case *resultVoidFrame:
 	case *resultVoidFrame:
-		return &Iter{}
+		return &Iter{framer: framer}
 	case *resultRowsFrame:
 	case *resultRowsFrame:
 		iter := &Iter{
 		iter := &Iter{
-			meta: x.meta,
-			rows: x.rows,
+			meta:   x.meta,
+			rows:   x.rows,
+			framer: framer,
 		}
 		}
 
 
 		if len(x.meta.pagingState) > 0 && !qry.disableAutoPage {
 		if len(x.meta.pagingState) > 0 && !qry.disableAutoPage {
@@ -669,7 +699,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 
 
 		return iter
 		return iter
 	case *resultKeyspaceFrame, *resultSchemaChangeFrame, *schemaChangeKeyspace, *schemaChangeTable:
 	case *resultKeyspaceFrame, *resultSchemaChangeFrame, *schemaChangeKeyspace, *schemaChangeTable:
-		return &Iter{}
+		return &Iter{framer: framer}
 	case *RequestErrUnprepared:
 	case *RequestErrUnprepared:
 		stmtsLRU.Lock()
 		stmtsLRU.Lock()
 		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
 		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
@@ -679,11 +709,14 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 			return c.executeQuery(qry)
 			return c.executeQuery(qry)
 		}
 		}
 		stmtsLRU.Unlock()
 		stmtsLRU.Unlock()
-		return &Iter{err: x}
+		return &Iter{err: x, framer: framer}
 	case error:
 	case error:
-		return &Iter{err: x}
+		return &Iter{err: x, framer: framer}
 	default:
 	default:
-		return &Iter{err: NewErrProtocol("Unknown type in response to execute query (%T): %s", x, x)}
+		return &Iter{
+			err:    NewErrProtocol("Unknown type in response to execute query (%T): %s", x, x),
+			framer: framer,
+		}
 	}
 	}
 }
 }
 
 
@@ -710,7 +743,12 @@ func (c *Conn) UseKeyspace(keyspace string) error {
 	q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
 	q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
 	q.params.consistency = Any
 	q.params.consistency = Any
 
 
-	resp, err := c.exec(q, nil)
+	framer, err := c.exec(q, nil)
+	if err != nil {
+		return err
+	}
+
+	resp, err := framer.parseFrame()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -757,28 +795,23 @@ func (c *Conn) executeBatch(batch *Batch) (*Iter, error) {
 			if entry.binding == nil {
 			if entry.binding == nil {
 				args = entry.Args
 				args = entry.Args
 			} else {
 			} else {
-				binding := &QueryInfo{
-					Id:   info.preparedID,
-					Args: info.reqMeta.columns,
-					Rval: info.respMeta.columns,
-				}
-				args, err = entry.binding(binding)
+				args, err = entry.binding(info)
 				if err != nil {
 				if err != nil {
 					return nil, err
 					return nil, err
 				}
 				}
 			}
 			}
 
 
-			if len(args) != len(info.reqMeta.columns) {
+			if len(args) != len(info.Args) {
 				return nil, ErrQueryArgLength
 				return nil, ErrQueryArgLength
 			}
 			}
 
 
-			b.preparedID = info.preparedID
-			stmts[string(info.preparedID)] = entry.Stmt
+			b.preparedID = info.Id
+			stmts[string(info.Id)] = entry.Stmt
 
 
-			b.values = make([]queryValues, len(info.reqMeta.columns))
+			b.values = make([]queryValues, len(info.Args))
 
 
-			for j := 0; j < len(info.reqMeta.columns); j++ {
-				val, err := Marshal(info.reqMeta.columns[j].TypeInfo, args[j])
+			for j := 0; j < len(info.Args); j++ {
+				val, err := Marshal(info.Args[j].TypeInfo, args[j])
 				if err != nil {
 				if err != nil {
 					return nil, err
 					return nil, err
 				}
 				}
@@ -792,13 +825,19 @@ func (c *Conn) executeBatch(batch *Batch) (*Iter, error) {
 	}
 	}
 
 
 	// TODO: should batch support tracing?
 	// TODO: should batch support tracing?
-	resp, err := c.exec(req, nil)
+	framer, err := c.exec(req, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := framer.parseFrame()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
 	switch x := resp.(type) {
 	switch x := resp.(type) {
 	case *resultVoidFrame:
 	case *resultVoidFrame:
+		framerPool.Put(framer)
 		return nil, nil
 		return nil, nil
 	case *RequestErrUnprepared:
 	case *RequestErrUnprepared:
 		stmt, found := stmts[string(x.StatementId)]
 		stmt, found := stmts[string(x.StatementId)]
@@ -807,6 +846,9 @@ func (c *Conn) executeBatch(batch *Batch) (*Iter, error) {
 			stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
 			stmtsLRU.lru.Remove(c.addr + c.currentKeyspace + stmt)
 			stmtsLRU.Unlock()
 			stmtsLRU.Unlock()
 		}
 		}
+
+		framerPool.Put(framer)
+
 		if found {
 		if found {
 			return c.executeBatch(batch)
 			return c.executeBatch(batch)
 		} else {
 		} else {
@@ -814,14 +856,17 @@ func (c *Conn) executeBatch(batch *Batch) (*Iter, error) {
 		}
 		}
 	case *resultRowsFrame:
 	case *resultRowsFrame:
 		iter := &Iter{
 		iter := &Iter{
-			meta: x.meta,
-			rows: x.rows,
+			meta:   x.meta,
+			rows:   x.rows,
+			framer: framer,
 		}
 		}
 
 
 		return iter, nil
 		return iter, nil
 	case error:
 	case error:
+		framerPool.Put(framer)
 		return nil, x
 		return nil, x
 	default:
 	default:
+		framerPool.Put(framer)
 		return nil, NewErrProtocol("Unknown type in response to batch statement: %s", x)
 		return nil, NewErrProtocol("Unknown type in response to batch statement: %s", x)
 	}
 	}
 }
 }
@@ -900,7 +945,7 @@ func (c *Conn) awaitSchemaAgreement() (err error) {
 }
 }
 
 
 type inflightPrepare struct {
 type inflightPrepare struct {
-	info *resultPreparedFrame
+	info QueryInfo
 	err  error
 	err  error
 	wg   sync.WaitGroup
 	wg   sync.WaitGroup
 }
 }

+ 3 - 10
frame.go

@@ -526,7 +526,7 @@ func (f *framer) parseErrorFrame() frame {
 		stmtId := f.readShortBytes()
 		stmtId := f.readShortBytes()
 		return &RequestErrUnprepared{
 		return &RequestErrUnprepared{
 			errorFrame:  errD,
 			errorFrame:  errD,
-			StatementId: stmtId,
+			StatementId: copyBytes(stmtId), // defensivly copy
 		}
 		}
 	case errReadFailure:
 	case errReadFailure:
 		res := &RequestErrReadFailure{
 		res := &RequestErrReadFailure{
@@ -1492,13 +1492,7 @@ func (f *framer) readBytes() []byte {
 		panic(fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf)))
 		panic(fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf)))
 	}
 	}
 
 
-	// we cant make assumptions about the length of the life of the supplied byte
-	// slice so we defensivly copy it out of the underlying buffer. This has the
-	// downside of increasing allocs per read but will provide much greater memory
-	// safety. The allocs can hopefully be improved in the future.
-	// TODO: dont copy into a new slice
-	l := make([]byte, size)
-	copy(l, f.rbuf[:size])
+	l := f.rbuf[:size]
 	f.rbuf = f.rbuf[size:]
 	f.rbuf = f.rbuf[size:]
 
 
 	return l
 	return l
@@ -1510,8 +1504,7 @@ func (f *framer) readShortBytes() []byte {
 		panic(fmt.Errorf("not enough bytes in buffer to read short bytes: require %d got %d", size, len(f.rbuf)))
 		panic(fmt.Errorf("not enough bytes in buffer to read short bytes: require %d got %d", size, len(f.rbuf)))
 	}
 	}
 
 
-	l := make([]byte, size)
-	copy(l, f.rbuf[:size])
+	l := f.rbuf[:size]
 	f.rbuf = f.rbuf[size:]
 	f.rbuf = f.rbuf[size:]
 
 
 	return l
 	return l

+ 48 - 0
framer_bench_test.go

@@ -0,0 +1,48 @@
+package gocql
+
+import (
+	"compress/gzip"
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+func readGzipData(path string) ([]byte, error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	r, err := gzip.NewReader(f)
+	if err != nil {
+		return nil, err
+	}
+	defer r.Close()
+
+	return ioutil.ReadAll(r)
+}
+
+func BenchmarkParseRowsFrame(b *testing.B) {
+	data, err := readGzipData("testdata/frames/bench_parse_result.gz")
+	if err != nil {
+		b.Fatal(err)
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		framer := &framer{
+			header: &frameHeader{
+				version: protoVersion4 | 0x80,
+				op:      opResult,
+				length:  len(data),
+			},
+			rbuf: data,
+		}
+
+		_, err = framer.parseFrame()
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}

+ 6 - 0
helpers.go

@@ -180,3 +180,9 @@ func (iter *Iter) MapScan(m map[string]interface{}) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+func copyBytes(p []byte) []byte {
+	b := make([]byte, len(p))
+	copy(b, p)
+	return b
+}

+ 19 - 8
session.go

@@ -151,9 +151,10 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
 }
 }
 
 
 type QueryInfo struct {
 type QueryInfo struct {
-	Id   []byte
-	Args []ColumnInfo
-	Rval []ColumnInfo
+	Id          []byte
+	Args        []ColumnInfo
+	Rval        []ColumnInfo
+	PKeyColumns []int
 }
 }
 
 
 // Bind generates a new query object based on the query statement passed in.
 // Bind generates a new query object based on the query statement passed in.
@@ -286,7 +287,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	s.routingKeyInfoCache.mu.Unlock()
 	s.routingKeyInfoCache.mu.Unlock()
 
 
 	var (
 	var (
-		prepared     *resultPreparedFrame
+		info         *QueryInfo
 		partitionKey []*ColumnMetadata
 		partitionKey []*ColumnMetadata
 	)
 	)
 
 
@@ -300,20 +301,20 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		return nil, inflight.err
 		return nil, inflight.err
 	}
 	}
 
 
-	prepared, inflight.err = conn.prepareStatement(stmt, nil)
+	info, inflight.err = conn.prepareStatement(stmt, nil)
 	if inflight.err != nil {
 	if inflight.err != nil {
 		// don't cache this error
 		// don't cache this error
 		s.routingKeyInfoCache.Remove(stmt)
 		s.routingKeyInfoCache.Remove(stmt)
 		return nil, inflight.err
 		return nil, inflight.err
 	}
 	}
 
 
-	if len(prepared.reqMeta.columns) == 0 {
+	if len(info.Args) == 0 {
 		// no arguments, no routing key, and no error
 		// no arguments, no routing key, and no error
 		return nil, nil
 		return nil, nil
 	}
 	}
 
 
 	// get the table metadata
 	// get the table metadata
-	table := prepared.reqMeta.columns[0].Table
+	table := info.Args[0].Table
 
 
 	var keyspaceMetadata *KeyspaceMetadata
 	var keyspaceMetadata *KeyspaceMetadata
 	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
 	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
@@ -346,7 +347,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		routingKeyInfo.indexes[keyIndex] = -1
 		routingKeyInfo.indexes[keyIndex] = -1
 
 
 		// find the column in the query info
 		// find the column in the query info
-		for argIndex, boundColumn := range prepared.reqMeta.columns {
+		for argIndex, boundColumn := range info.Args {
 			if keyColumn.Name == boundColumn.Name {
 			if keyColumn.Name == boundColumn.Name {
 				// there may be many such bound columns, pick the first
 				// there may be many such bound columns, pick the first
 				routingKeyInfo.indexes[keyIndex] = argIndex
 				routingKeyInfo.indexes[keyIndex] = argIndex
@@ -763,6 +764,9 @@ type Iter struct {
 	rows [][][]byte
 	rows [][][]byte
 	meta resultMetadata
 	meta resultMetadata
 	next *nextIter
 	next *nextIter
+
+	framer *framer
+	once   sync.Once
 }
 }
 
 
 // Columns returns the name and type of the selected columns.
 // Columns returns the name and type of the selected columns.
@@ -836,6 +840,13 @@ func (iter *Iter) Scan(dest ...interface{}) bool {
 // Close closes the iterator and returns any errors that happened during
 // Close closes the iterator and returns any errors that happened during
 // the query or the iteration.
 // the query or the iteration.
 func (iter *Iter) Close() error {
 func (iter *Iter) Close() error {
+	iter.once.Do(func() {
+		if iter.framer != nil {
+			framerPool.Put(iter.framer)
+			iter.framer = nil
+		}
+	})
+
 	return iter.err
 	return iter.err
 }
 }
 
 

BIN
testdata/frames/bench_parse_result.gz