Forráskód Böngészése

Dont read row bytes from framer until scan

Delay reading the column bytes until scanning so we dont allocate a
large number of byte slices upfront, this has the effect of reducing GC
pressure, which under testing makes reads around 20% faster.
Chris Bannister 9 éve
szülő
commit
ddda62507c
3 módosított fájl, 52 hozzáadás és 41 törlés
  1. 7 6
      conn.go
  2. 20 23
      frame.go
  3. 25 12
      session.go

+ 7 - 6
conn.go

@@ -754,8 +754,9 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		return &Iter{framer: framer}
 	case *resultRowsFrame:
 		iter := &Iter{
-			rows:   x.rows,
-			framer: framer,
+			meta:    x.meta,
+			framer:  framer,
+			numRows: x.numRows,
 		}
 
 		if params.skipMeta {
@@ -772,7 +773,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		if len(x.meta.pagingState) > 0 && !qry.disableAutoPage {
 			iter.next = &nextIter{
 				qry: *qry,
-				pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
+				pos: int((1 - qry.prefetch) * float64(x.numRows)),
 			}
 
 			iter.next.qry.pageState = copyBytes(x.meta.pagingState)
@@ -958,9 +959,9 @@ func (c *Conn) executeBatch(batch *Batch) *Iter {
 		}
 	case *resultRowsFrame:
 		iter := &Iter{
-			meta:   x.meta,
-			rows:   x.rows,
-			framer: framer,
+			meta:    x.meta,
+			framer:  framer,
+			numRows: x.numRows,
 		}
 
 		return iter

+ 20 - 23
frame.go

@@ -935,7 +935,8 @@ type resultRowsFrame struct {
 	frameHeader
 
 	meta resultMetadata
-	rows [][][]byte
+	// dont parse the rows here as we only need to do it once
+	numRows int
 }
 
 func (f *resultRowsFrame) String() string {
@@ -943,28 +944,15 @@ func (f *resultRowsFrame) String() string {
 }
 
 func (f *framer) parseResultRows() frame {
-	meta := f.parseResultMetadata()
+	result := &resultRowsFrame{}
+	result.meta = f.parseResultMetadata()
 
-	numRows := f.readInt()
-	if numRows < 0 {
-		panic(fmt.Errorf("invalid row_count in result frame: %d", numRows))
+	result.numRows = f.readInt()
+	if result.numRows < 0 {
+		panic(fmt.Errorf("invalid row_count in result frame: %d", result.numRows))
 	}
 
-	colCount := meta.colCount
-
-	rows := make([][][]byte, numRows)
-	for i := 0; i < numRows; i++ {
-		rows[i] = make([][]byte, colCount)
-		for j := 0; j < colCount; j++ {
-			rows[i][j] = f.readBytes()
-		}
-	}
-
-	return &resultRowsFrame{
-		frameHeader: *f.header,
-		meta:        meta,
-		rows:        rows,
-	}
+	return result
 }
 
 type resultKeyspaceFrame struct {
@@ -1563,19 +1551,28 @@ func (f *framer) readStringList() []string {
 	return l
 }
 
-func (f *framer) readBytes() []byte {
+func (f *framer) readBytesInternal() ([]byte, error) {
 	size := f.readInt()
 	if size < 0 {
-		return nil
+		return nil, nil
 	}
 
 	if len(f.rbuf) < size {
-		panic(fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf)))
+		return nil, fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf))
 	}
 
 	l := f.rbuf[:size]
 	f.rbuf = f.rbuf[size:]
 
+	return l, nil
+}
+
+func (f *framer) readBytes() []byte {
+	l, err := f.readBytesInternal()
+	if err != nil {
+		panic(err)
+	}
+
 	return l
 }
 

+ 25 - 12
session.go

@@ -865,12 +865,12 @@ func (q *Query) MapScanCAS(dest map[string]interface{}) (applied bool, err error
 // were returned by a query. The iterator might send additional queries to the
 // database during the iteration if paging was enabled.
 type Iter struct {
-	err  error
-	pos  int
-	rows [][][]byte
-	meta resultMetadata
-	next *nextIter
-	host *HostInfo
+	err     error
+	pos     int
+	meta    resultMetadata
+	numRows int
+	next    *nextIter
+	host    *HostInfo
 
 	framer *framer
 	once   sync.Once
@@ -886,6 +886,10 @@ func (iter *Iter) Columns() []ColumnInfo {
 	return iter.meta.columns
 }
 
+func (iter *Iter) readColumn() ([]byte, error) {
+	return iter.framer.readBytesInternal()
+}
+
 // Scan consumes the next row of the iterator and copies the columns of the
 // current row into the values pointed at by dest. Use nil as a dest value
 // to skip the corresponding column. Scan might send additional queries
@@ -898,13 +902,15 @@ func (iter *Iter) Scan(dest ...interface{}) bool {
 	if iter.err != nil {
 		return false
 	}
-	if iter.pos >= len(iter.rows) {
+
+	if iter.pos >= iter.numRows {
 		if iter.next != nil {
 			*iter = *iter.next.fetch()
 			return iter.Scan(dest...)
 		}
 		return false
 	}
+
 	if iter.next != nil && iter.pos == iter.next.pos {
 		go iter.next.fetch()
 	}
@@ -919,7 +925,14 @@ func (iter *Iter) Scan(dest ...interface{}) bool {
 	// i is the current position in dest, could posible replace it and just use
 	// slices of dest
 	i := 0
-	for c, col := range iter.meta.columns {
+	for c := range iter.meta.columns {
+		col := &iter.meta.columns[c]
+		colBytes, err := iter.readColumn()
+		if err != nil {
+			iter.err = err
+			return false
+		}
+
 		if dest[i] == nil {
 			i++
 			continue
@@ -933,10 +946,10 @@ func (iter *Iter) Scan(dest ...interface{}) bool {
 			count := len(tuple.Elems)
 			// here we pass in a slice of the struct which has the number number of
 			// values as elements in the tuple
-			iter.err = Unmarshal(col.TypeInfo, iter.rows[iter.pos][c], dest[i:i+count])
+			iter.err = Unmarshal(col.TypeInfo, colBytes, dest[i:i+count])
 			i += count
 		default:
-			iter.err = Unmarshal(col.TypeInfo, iter.rows[iter.pos][c], dest[i])
+			iter.err = Unmarshal(col.TypeInfo, colBytes, dest[i])
 			i++
 		}
 
@@ -965,14 +978,14 @@ func (iter *Iter) Close() error {
 // WillSwitchPage detects if iterator reached end of current page
 // and the next page is available.
 func (iter *Iter) WillSwitchPage() bool {
-	return iter.pos >= len(iter.rows) && iter.next != nil
+	return iter.pos >= iter.numRows && iter.next != nil
 }
 
 // checkErrAndNotFound handle error and NotFound in one method.
 func (iter *Iter) checkErrAndNotFound() error {
 	if iter.err != nil {
 		return iter.err
-	} else if len(iter.rows) == 0 {
+	} else if iter.numRows == 0 {
 		return ErrNotFound
 	}
 	return nil