소스 검색

support for prefetching and lighter Iter struct

Christoph Hack 12 년 전
부모
커밋
1a181ce13a
2개의 변경된 파일73개의 추가작업 그리고 26개의 파일을 삭제
  1. 17 6
      conn.go
  2. 56 20
      session.go

+ 17 - 6
conn.go

@@ -301,20 +301,31 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 	}
 	resp, err := c.exec(op, qry.trace)
 	if err != nil {
-		return &Iter{qry: qry, err: err}
+		return &Iter{err: err}
 	}
 	switch x := resp.(type) {
 	case resultVoidFrame:
-		return &Iter{qry: qry}
+		return &Iter{}
 	case resultRowsFrame:
-		return &Iter{qry: qry, columns: x.Columns, rows: x.Rows, pageState: x.PagingState}
+		iter := &Iter{columns: x.Columns, rows: x.Rows}
+		if len(x.PagingState) > 0 {
+			iter.next = &nextIter{
+				qry: *qry,
+				pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
+			}
+			iter.next.qry.pageState = x.PagingState
+			if iter.next.pos < 1 {
+				iter.next.pos = 1
+			}
+		}
+		return iter
 	case resultKeyspaceFrame:
 		c.cluster.HandleKeyspace(c, x.Keyspace)
-		return &Iter{qry: qry}
+		return &Iter{}
 	case error:
-		return &Iter{qry: qry, err: x}
+		return &Iter{err: x}
 	default:
-		return &Iter{qry: qry, err: ErrProtocol}
+		return &Iter{err: ErrProtocol}
 	}
 }
 

+ 56 - 20
session.go

@@ -21,34 +21,46 @@ type Session struct {
 	Node     Node
 	cons     Consistency
 	pageSize int
+	prefetch float64
 	trace    Tracer
 	mu       sync.RWMutex
 }
 
 // NewSession wraps an existing Node.
 func NewSession(node Node) *Session {
-	return &Session{Node: node, cons: Quorum}
+	return &Session{Node: node, cons: Quorum, prefetch: 0.25}
 }
 
-// Consistency sets the default consistency level for this session. This
-// setting can also be changed on a per-query basis.
-func (s *Session) Consistency(cons Consistency) {
+// SetConsistency sets the default consistency level for this session. This
+// setting can also be changed on a per-query basis and the default value
+// is Quorum.
+func (s *Session) SetConsistency(cons Consistency) {
 	s.mu.Lock()
 	s.cons = cons
 	s.mu.Unlock()
 }
 
-// PageSize sets the default page size for this session. A value <= 0 will
+// SetPageSize sets the default page size for this session. A value <= 0 will
 // disable paging. This setting can also be changed on a per-query basis.
-func (s *Session) PageSize(n int) {
+func (s *Session) SetPageSize(n int) {
 	s.mu.Lock()
 	s.pageSize = n
 	s.mu.Unlock()
 }
 
-// Trace sets the default tracer for this session. This setting can also
+// SetPrefetch sets the default threshold for pre-fetching new pages. If
+// there are only p*pageSize rows remaining, the next page will be requested
+// automatically. This value can also be changed on a per-query basis and
+// the default value is 0.25.
+func (s *Session) SetPrefetch(p float64) {
+	s.mu.Lock()
+	s.prefetch = p
+	s.mu.Unlock()
+}
+
+// SetTrace sets the default tracer for this session. This setting can also
 // be changed on a per-query basis.
-func (s *Session) Trace(trace Tracer) {
+func (s *Session) SetTrace(trace Tracer) {
 	s.mu.Lock()
 	s.trace = trace
 	s.mu.Unlock()
@@ -60,7 +72,8 @@ func (s *Session) Trace(trace Tracer) {
 func (s *Session) Query(stmt string, values ...interface{}) *Query {
 	s.mu.RLock()
 	qry := &Query{stmt: stmt, values: values, cons: s.cons,
-		session: s, pageSize: s.pageSize, trace: s.trace}
+		session: s, pageSize: s.pageSize, trace: s.trace,
+		prefetch: s.prefetch}
 	s.mu.RUnlock()
 	return qry
 }
@@ -74,7 +87,7 @@ func (s *Session) Close() {
 func (s *Session) executeQuery(qry *Query) *Iter {
 	conn := s.Node.Pick(nil)
 	if conn == nil {
-		return &Iter{qry: qry, err: ErrUnavailable}
+		return &Iter{err: ErrUnavailable}
 	}
 	return conn.executeQuery(qry)
 }
@@ -94,6 +107,7 @@ type Query struct {
 	cons      Consistency
 	pageSize  int
 	pageState []byte
+	prefetch  float64
 	trace     Tracer
 	session   *Session
 }
@@ -122,6 +136,14 @@ func (q *Query) PageSize(n int) *Query {
 	return q
 }
 
+// SetPrefetch sets the default threshold for pre-fetching new pages. If
+// there are only p*pageSize rows remaining, the next page will be requested
+// automatically.
+func (q *Query) Prefetch(p float64) *Query {
+	q.prefetch = p
+	return q
+}
+
 // Exec executes the query without returning any rows.
 func (q *Query) Exec() error {
 	iter := q.session.executeQuery(q)
@@ -147,12 +169,11 @@ func (q *Query) Scan(dest ...interface{}) error {
 // were returned by a query. The iterator might send additional queries to the
 // database during the iteration if paging was enabled.
 type Iter struct {
-	err       error
-	pos       int
-	rows      [][][]byte
-	columns   []ColumnInfo
-	qry       *Query
-	pageState []byte
+	err     error
+	pos     int
+	rows    [][][]byte
+	columns []ColumnInfo
+	next    *nextIter
 }
 
 // Columns returns the name and type of the selected columns.
@@ -173,14 +194,15 @@ func (iter *Iter) Scan(dest ...interface{}) bool {
 		return false
 	}
 	if iter.pos >= len(iter.rows) {
-		if len(iter.pageState) > 0 {
-			qry := *iter.qry
-			qry.pageState = iter.pageState
-			*iter = *iter.qry.session.executeQuery(&qry)
+		if iter.next != nil {
+			*iter = *iter.next.fetch()
 			return iter.Scan(dest...)
 		}
 		return false
 	}
+	if iter.next != nil && iter.pos == iter.next.pos {
+		go iter.next.fetch()
+	}
 	if len(dest) != len(iter.columns) {
 		iter.err = errors.New("count mismatch")
 		return false
@@ -202,6 +224,20 @@ func (iter *Iter) Close() error {
 	return iter.err
 }
 
+type nextIter struct {
+	qry  Query
+	pos  int
+	once sync.Once
+	next *Iter
+}
+
+func (n *nextIter) fetch() *Iter {
+	n.once.Do(func() {
+		n.next = n.qry.session.executeQuery(&n.qry)
+	})
+	return n.next
+}
+
 type Batch struct {
 	Type    BatchType
 	Entries []BatchEntry