Pārlūkot izejas kodu

Merge pull request #471 from Zariel/expose-page-state

Expose page state
Chris Bannister 10 gadi atpakaļ
vecāks
revīzija
bda24f1626
3 mainītis faili ar 69 papildinājumiem un 1 dzēšanām
  1. 51 0
      cassandra_test.go
  2. 1 1
      conn.go
  3. 17 0
      session.go

+ 51 - 0
cassandra_test.go

@@ -2075,3 +2075,54 @@ func TestNegativeStream(t *testing.T) {
 		t.Fatalf("expected to get nil frame got %+v", frame)
 	}
 }
+
+func TestManualQueryPaging(t *testing.T) {
+	const rowsToInsert = 5
+
+	session := createSession(t)
+	defer session.Close()
+
+	if err := createTable(session, "CREATE TABLE testManualPaging (id int, count int, PRIMARY KEY (id))"); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < rowsToInsert; i++ {
+		err := session.Query("INSERT INTO testManualPaging(id, count) VALUES(?, ?)", i, i*i).Exec()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// disable auto paging, 1 page per iteration
+	query := session.Query("SELECT id, count FROM testManualPaging").PageState(nil).PageSize(2)
+	var id, count, fetched int
+
+	iter := query.Iter()
+	// NOTE: this isnt very indicitive of how it should be used, the idea is that
+	// the page state is returned to some client who will send it back to manually
+	// page through the results.
+	for {
+		for iter.Scan(&id, &count) {
+			if count != (id * id) {
+				t.Fatalf("got wrong value from iteration: got %d expected %d", count, id*id)
+			}
+
+			fetched++
+		}
+
+		if len(iter.PageState()) > 0 {
+			// more pages
+			iter = query.PageState(iter.PageState()).Iter()
+		} else {
+			break
+		}
+	}
+
+	if err := iter.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	if fetched != rowsToInsert {
+		t.Fatalf("expected to fetch %d rows got %d", fetched, rowsToInsert)
+	}
+}

+ 1 - 1
conn.go

@@ -640,7 +640,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 			rows: x.rows,
 		}
 
-		if len(x.meta.pagingState) > 0 {
+		if len(x.meta.pagingState) > 0 && !qry.disableAutoPage {
 			iter.next = &nextIter{
 				qry: *qry,
 				pos: int((1 - qry.prefetch) * float64(len(iter.rows))),

+ 17 - 0
session.go

@@ -432,6 +432,8 @@ type Query struct {
 	totalLatency     int64
 	serialCons       SerialConsistency
 	defaultTimestamp bool
+
+	disableAutoPage bool
 }
 
 // String implements the stringer interface.
@@ -611,6 +613,15 @@ func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
 	return q
 }
 
+// PageState sets the paging state for the query to resume paging from a specific
+// point in time. Setting this will disable to query paging for this query, and
+// must be used for all subsequent pages.
+func (q *Query) PageState(state []byte) *Query {
+	q.pageState = state
+	q.disableAutoPage = true
+	return q
+}
+
 // Exec executes the query without returning any rows.
 func (q *Query) Exec() error {
 	iter := q.Iter()
@@ -791,6 +802,12 @@ func (iter *Iter) checkErrAndNotFound() error {
 	return nil
 }
 
+// PageState return the current paging state for a query which can be used for
+// subsequent quries to resume paging this point.
+func (iter *Iter) PageState() []byte {
+	return iter.meta.pagingState
+}
+
 type nextIter struct {
 	qry  Query
 	pos  int