瀏覽代碼

iteration over paged results

Christoph Hack 12 年之前
父節點
當前提交
83c5eba575
共有 7 個文件被更改,包括 313 次插入180 次删除
  1. 4 0
      README.md
  2. 2 12
      cluster.go
  3. 82 116
      conn.go
  4. 119 9
      frame.go
  5. 21 0
      gocql_test/main.go
  6. 82 23
      session.go
  7. 3 20
      topology.go

+ 4 - 0
README.md

@@ -4,6 +4,8 @@ gocql
 Package gocql implements a fast and robust Cassandra client for the
 Package gocql implements a fast and robust Cassandra client for the
 Go programming language.
 Go programming language.
 
 
+**Attention:** This package is currently actively developed and the API may change in the future.
+
 Installation
 Installation
 ------------
 ------------
 
 
@@ -25,6 +27,8 @@ Features
   * round robin distribution of queries to different hosts
   * round robin distribution of queries to different hosts
   * round robin distribution of queries to different connections on a host
   * round robin distribution of queries to different connections on a host
   * each connection can execute up to 128 concurrent queries
   * each connection can execute up to 128 concurrent queries
+* iteration over paged results with configurable page size
+* optional frame compression (using snappy)
 * automatic query preparation
 * automatic query preparation
 
 
 Example
 Example

+ 2 - 12
cluster.go

@@ -203,18 +203,8 @@ func (c *clusterImpl) HandleKeyspace(conn *Conn, keyspace string) {
 	}
 	}
 }
 }
 
 
-func (c *clusterImpl) ExecuteQuery(qry *Query) (*Iter, error) {
-	if qry.Cons == 0 {
-		qry.Cons = c.cfg.Consistency
-	}
-	return c.hostPool.ExecuteQuery(qry)
-}
-
-func (c *clusterImpl) ExecuteBatch(batch *Batch) error {
-	if batch.Cons == 0 {
-		batch.Cons = c.cfg.Consistency
-	}
-	return c.hostPool.ExecuteBatch(batch)
+func (c *clusterImpl) Pick(qry *Query) *Conn {
+	return c.hostPool.Pick(qry)
 }
 }
 
 
 func (c *clusterImpl) Close() {
 func (c *clusterImpl) Close() {

+ 82 - 116
conn.go

@@ -94,16 +94,13 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 }
 }
 
 
 func (c *Conn) startup(cfg *ConnConfig) error {
 func (c *Conn) startup(cfg *ConnConfig) error {
-	req := make(frame, headerSize, defaultFrameSize)
-	req.setHeader(c.version, 0, 0, opStartup)
-	m := map[string]string{
-		"CQL_VERSION": cfg.CQLVersion,
+	req := &startupFrame{
+		CQLVersion: cfg.CQLVersion,
 	}
 	}
 	if c.compressor != nil {
 	if c.compressor != nil {
-		m["COMPRESSION"] = c.compressor.Name()
+		req.Compression = c.compressor.Name()
 	}
 	}
-	req.writeStringMap(m)
-	resp, err := c.callSimple(req)
+	resp, err := c.execSimple(req)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -176,29 +173,25 @@ func (c *Conn) recv() (frame, error) {
 	return resp, nil
 	return resp, nil
 }
 }
 
 
-func (c *Conn) callSimple(req frame) (interface{}, error) {
-	req.setLength(len(req) - headerSize)
-	if _, err := c.conn.Write(req); err != nil {
+func (c *Conn) execSimple(op operation) (interface{}, error) {
+	f, err := op.encodeFrame(c.version, nil)
+	f.setLength(len(f) - headerSize)
+	if _, err := c.conn.Write([]byte(f)); err != nil {
 		c.conn.Close()
 		c.conn.Close()
 		return nil, err
 		return nil, err
 	}
 	}
-	buf, err := c.recv()
-	if err != nil {
+	if f, err = c.recv(); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	return c.decodeFrame(buf)
+	return c.decodeFrame(f)
 }
 }
 
 
-func (c *Conn) call(req frame) (interface{}, error) {
-	id := <-c.uniq
-	req[2] = id
-
-	call := &c.calls[id]
-	call.resp = make(chan callResp, 1)
-	atomic.AddInt32(&c.nwait, 1)
-	atomic.StoreInt32(&call.active, 1)
-
-	req.setLength(len(req) - headerSize)
+func (c *Conn) exec(op operation) (interface{}, error) {
+	//fmt.Printf("exec: %#v\n", op)
+	req, err := op.encodeFrame(c.version, nil)
+	if err != nil {
+		return nil, err
+	}
 	if len(req) > headerSize && c.compressor != nil {
 	if len(req) > headerSize && c.compressor != nil {
 		body, err := c.compressor.Encode([]byte(req[headerSize:]))
 		body, err := c.compressor.Encode([]byte(req[headerSize:]))
 		if err != nil {
 		if err != nil {
@@ -206,8 +199,16 @@ func (c *Conn) call(req frame) (interface{}, error) {
 		}
 		}
 		req = append(req[:headerSize], frame(body)...)
 		req = append(req[:headerSize], frame(body)...)
 		req[1] |= flagCompress
 		req[1] |= flagCompress
-		req.setLength(len(req) - headerSize)
 	}
 	}
+	req.setLength(len(req) - headerSize)
+
+	id := <-c.uniq
+	req[2] = id
+	call := &c.calls[id]
+	call.resp = make(chan callResp, 1)
+	atomic.AddInt32(&c.nwait, 1)
+	atomic.StoreInt32(&call.active, 1)
+
 	if n, err := c.conn.Write(req); err != nil {
 	if n, err := c.conn.Write(req); err != nil {
 		c.conn.Close()
 		c.conn.Close()
 		if n > 0 {
 		if n > 0 {
@@ -240,9 +241,7 @@ func (c *Conn) dispatch(resp frame) {
 }
 }
 
 
 func (c *Conn) ping() error {
 func (c *Conn) ping() error {
-	req := make(frame, headerSize)
-	req.setHeader(c.version, 0, 0, opOptions)
-	_, err := c.call(req)
+	_, err := c.exec(&optionsFrame{})
 	return err
 	return err
 }
 }
 
 
@@ -259,12 +258,7 @@ func (c *Conn) prepareStatement(stmt string) (*queryInfo, error) {
 	c.prep[stmt] = info
 	c.prep[stmt] = info
 	c.prepMu.Unlock()
 	c.prepMu.Unlock()
 
 
-	frame := make(frame, headerSize, defaultFrameSize)
-	frame.setHeader(c.version, 0, 0, opPrepare)
-	frame.writeLongString(stmt)
-	frame.setLength(len(frame) - headerSize)
-
-	resp, err := c.call(frame)
+	resp, err := c.exec(&prepareFrame{Stmt: stmt})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -282,50 +276,23 @@ func (c *Conn) prepareStatement(stmt string) (*queryInfo, error) {
 }
 }
 
 
 func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
 func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
-	var info *queryInfo
+	op := &queryFrame{Stmt: qry.Stmt, Cons: qry.Cons, PageSize: qry.PageSize}
 	if len(qry.Args) > 0 {
 	if len(qry.Args) > 0 {
-		var err error
-		info, err = c.prepareStatement(qry.Stmt)
+		info, err := c.prepareStatement(qry.Stmt)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-	}
-	req := make(frame, headerSize, defaultFrameSize)
-	if info == nil {
-		req.setHeader(c.version, 0, 0, opQuery)
-		req.writeLongString(qry.Stmt)
-		req.writeConsistency(qry.Cons)
-		if c.version > 1 {
-			req.writeByte(0)
-		}
-	} else {
-		req.setHeader(c.version, 0, 0, opExecute)
-		req.writeShortBytes(info.id)
-		if c.version == 1 {
-			req.writeShort(uint16(len(qry.Args)))
-		} else {
-			req.writeConsistency(qry.Cons)
-			flags := uint8(0)
-			if len(qry.Args) > 0 {
-				flags |= flagQueryValues
-			}
-			req.writeByte(flags)
-			if flags&flagQueryValues != 0 {
-				req.writeShort(uint16(len(qry.Args)))
-			}
-		}
+		op.Prepared = info.id
+		op.Values = make([][]byte, len(qry.Args))
 		for i := 0; i < len(qry.Args); i++ {
 		for i := 0; i < len(qry.Args); i++ {
 			val, err := Marshal(info.args[i].TypeInfo, qry.Args[i])
 			val, err := Marshal(info.args[i].TypeInfo, qry.Args[i])
 			if err != nil {
 			if err != nil {
 				return nil, err
 				return nil, err
 			}
 			}
-			req.writeBytes(val)
-		}
-		if c.version == 1 {
-			req.writeConsistency(qry.Cons)
+			op.Values[i] = val
 		}
 		}
 	}
 	}
-	resp, err := c.call(req)
+	resp, err := c.exec(op)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -344,53 +311,58 @@ func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
 }
 }
 
 
 func (c *Conn) ExecuteBatch(batch *Batch) error {
 func (c *Conn) ExecuteBatch(batch *Batch) error {
-	if c.version == 1 {
-		return ErrProtocol
-	}
-	frame := make(frame, headerSize, defaultFrameSize)
-	frame.setHeader(c.version, 0, 0, opBatch)
-	frame.writeByte(byte(batch.Type))
-	frame.writeShort(uint16(len(batch.Entries)))
-	for i := 0; i < len(batch.Entries); i++ {
-		entry := &batch.Entries[i]
-		var info *queryInfo
-		if len(entry.Args) > 0 {
-			var err error
-			info, err = c.prepareStatement(entry.Stmt)
-			if err != nil {
-				return err
-			}
-			frame.writeByte(1)
-			frame.writeShortBytes(info.id)
-		} else {
-			frame.writeByte(0)
-			frame.writeLongString(entry.Stmt)
+	/*
+		if c.version == 1 {
+			return ErrUnsupported
 		}
 		}
-		frame.writeShort(uint16(len(entry.Args)))
-		for j := 0; j < len(entry.Args); j++ {
-			val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
-			if err != nil {
-				return err
+		frame := make(frame, headerSize, defaultFrameSize)
+		frame.setHeader(c.version, 0, 0, opBatch)
+		frame.writeByte(byte(batch.Type))
+		frame.writeShort(uint16(len(batch.Entries)))
+		for i := 0; i < len(batch.Entries); i++ {
+			entry := &batch.Entries[i]
+			var info *queryInfo
+			if len(entry.Args) > 0 {
+				var err error
+				info, err = c.prepareStatement(entry.Stmt)
+				if err != nil {
+					return err
+				}
+				frame.writeByte(1)
+				frame.writeShortBytes(info.id)
+			} else {
+				frame.writeByte(0)
+				frame.writeLongString(entry.Stmt)
+			}
+			frame.writeShort(uint16(len(entry.Args)))
+			for j := 0; j < len(entry.Args); j++ {
+				val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
+				if err != nil {
+					return err
+				}
+				frame.writeBytes(val)
 			}
 			}
-			frame.writeBytes(val)
 		}
 		}
-	}
-	frame.writeConsistency(batch.Cons)
+		frame.writeConsistency(batch.Cons)
 
 
-	resp, err := c.call(frame)
-	if err != nil {
-		return err
-	}
-	switch x := resp.(type) {
-	case resultVoidFrame:
-	case error:
-		return x
-	default:
-		return ErrProtocol
-	}
+		resp, err := c.call(frame)
+		if err != nil {
+			return err
+		}
+		switch x := resp.(type) {
+		case resultVoidFrame:
+		case error:
+			return x
+		default:
+			return ErrProtocol
+		}*/
 	return nil
 	return nil
 }
 }
 
 
+func (c *Conn) Pick(qry *Query) *Conn {
+	return c
+}
+
 func (c *Conn) Close() {
 func (c *Conn) Close() {
 	c.conn.Close()
 	c.conn.Close()
 }
 }
@@ -400,13 +372,7 @@ func (c *Conn) Address() string {
 }
 }
 
 
 func (c *Conn) UseKeyspace(keyspace string) error {
 func (c *Conn) UseKeyspace(keyspace string) error {
-	frame := make(frame, headerSize, defaultFrameSize)
-	frame.setHeader(c.version, 0, 0, opQuery)
-	frame.writeLongString("USE " + keyspace)
-	frame.writeConsistency(1)
-	frame.writeByte(0)
-
-	resp, err := c.call(frame)
+	resp, err := c.exec(&queryFrame{Stmt: "USE " + keyspace, Cons: Any})
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -450,7 +416,7 @@ func (c *Conn) decodeFrame(f frame) (rval interface{}, err error) {
 		case resultKindVoid:
 		case resultKindVoid:
 			return resultVoidFrame{}, nil
 			return resultVoidFrame{}, nil
 		case resultKindRows:
 		case resultKindRows:
-			columns := f.readMetaData()
+			columns, pageState := f.readMetaData()
 			numRows := f.readInt()
 			numRows := f.readInt()
 			values := make([][]byte, numRows*len(columns))
 			values := make([][]byte, numRows*len(columns))
 			for i := 0; i < len(values); i++ {
 			for i := 0; i < len(values); i++ {
@@ -460,13 +426,13 @@ func (c *Conn) decodeFrame(f frame) (rval interface{}, err error) {
 			for i := 0; i < len(values); i += len(columns) {
 			for i := 0; i < len(values); i += len(columns) {
 				rows[i] = values[i : i+len(columns)]
 				rows[i] = values[i : i+len(columns)]
 			}
 			}
-			return resultRowsFrame{columns, rows, nil}, nil
+			return resultRowsFrame{columns, rows, pageState}, nil
 		case resultKindKeyspace:
 		case resultKindKeyspace:
 			keyspace := f.readString()
 			keyspace := f.readString()
 			return resultKeyspaceFrame{keyspace}, nil
 			return resultKeyspaceFrame{keyspace}, nil
 		case resultKindPrepared:
 		case resultKindPrepared:
 			id := f.readShortBytes()
 			id := f.readShortBytes()
-			values := f.readMetaData()
+			values, _ := f.readMetaData()
 			return resultPreparedFrame{id, values}, nil
 			return resultPreparedFrame{id, values}, nil
 		case resultKindSchemaChanged:
 		case resultKindSchemaChanged:
 			return resultVoidFrame{}, nil
 			return resultVoidFrame{}, nil

+ 119 - 9
frame.go

@@ -37,6 +37,9 @@ const (
 
 
 	flagQueryValues uint8 = 1
 	flagQueryValues uint8 = 1
 	flagCompress    uint8 = 1
 	flagCompress    uint8 = 1
+	flagPageSize    uint8 = 4
+	flagPageState   uint8 = 8
+	flagHasMore     uint8 = 2
 
 
 	headerSize = 8
 	headerSize = 8
 )
 )
@@ -232,27 +235,31 @@ func (f *frame) readTypeInfo() *TypeInfo {
 	return typ
 	return typ
 }
 }
 
 
-func (f *frame) readMetaData() []ColumnInfo {
+func (f *frame) readMetaData() ([]ColumnInfo, []byte) {
 	flags := f.readInt()
 	flags := f.readInt()
 	numColumns := f.readInt()
 	numColumns := f.readInt()
+	var pageState []byte
+	if flags&2 != 0 {
+		pageState = f.readBytes()
+	}
 	globalKeyspace := ""
 	globalKeyspace := ""
 	globalTable := ""
 	globalTable := ""
 	if flags&1 != 0 {
 	if flags&1 != 0 {
 		globalKeyspace = f.readString()
 		globalKeyspace = f.readString()
 		globalTable = f.readString()
 		globalTable = f.readString()
 	}
 	}
-	info := make([]ColumnInfo, numColumns)
+	columns := make([]ColumnInfo, numColumns)
 	for i := 0; i < numColumns; i++ {
 	for i := 0; i < numColumns; i++ {
-		info[i].Keyspace = globalKeyspace
-		info[i].Table = globalTable
+		columns[i].Keyspace = globalKeyspace
+		columns[i].Table = globalTable
 		if flags&1 == 0 {
 		if flags&1 == 0 {
-			info[i].Keyspace = f.readString()
-			info[i].Table = f.readString()
+			columns[i].Keyspace = f.readString()
+			columns[i].Table = f.readString()
 		}
 		}
-		info[i].Name = f.readString()
-		info[i].TypeInfo = f.readTypeInfo()
+		columns[i].Name = f.readString()
+		columns[i].TypeInfo = f.readTypeInfo()
 	}
 	}
-	return info
+	return columns, pageState
 }
 }
 
 
 func (f *frame) writeConsistency(c Consistency) {
 func (f *frame) writeConsistency(c Consistency) {
@@ -299,3 +306,106 @@ type errorFrame struct {
 func (e errorFrame) Error() string {
 func (e errorFrame) Error() string {
 	return e.Message
 	return e.Message
 }
 }
+
+type operation interface {
+	encodeFrame(version uint8, dst frame) (frame, error)
+}
+
+type startupFrame struct {
+	CQLVersion  string
+	Compression string
+}
+
+func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
+	if f == nil {
+		f = make(frame, headerSize, defaultFrameSize)
+	}
+	f.setHeader(version, 0, 0, opStartup)
+	f.writeShort(1)
+	f.writeString("CQL_VERSION")
+	f.writeString(op.CQLVersion)
+	if op.Compression != "" {
+		f[headerSize+1] += 1
+		f.writeString("COMPRESSION")
+		f.writeString(op.Compression)
+	}
+	return f, nil
+}
+
+type queryFrame struct {
+	Stmt      string
+	Prepared  []byte
+	Cons      Consistency
+	Values    [][]byte
+	PageSize  int
+	PageState []byte
+}
+
+func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
+	if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
+		(len(op.Values) > 0 && len(op.Prepared) == 0)) {
+		return nil, ErrUnsupported
+	}
+	if f == nil {
+		f = make(frame, headerSize, defaultFrameSize)
+	}
+	if len(op.Prepared) > 0 {
+		f.setHeader(version, 0, 0, opExecute)
+		f.writeShortBytes(op.Prepared)
+	} else {
+		f.setHeader(version, 0, 0, opQuery)
+		f.writeLongString(op.Stmt)
+	}
+	if version >= 2 {
+		f.writeConsistency(op.Cons)
+		flagPos := len(f)
+		f.writeByte(0)
+		if len(op.Values) > 0 {
+			f[flagPos] |= flagQueryValues
+			f.writeShort(uint16(len(op.Values)))
+			for _, value := range op.Values {
+				f.writeBytes(value)
+			}
+		}
+		if op.PageSize > 0 {
+			f[flagPos] |= flagPageSize
+			f.writeInt(int32(op.PageSize))
+		}
+		if len(op.PageState) > 0 {
+			f[flagPos] |= flagPageState
+			f.writeBytes(op.PageState)
+		}
+	} else if version == 1 {
+		if len(op.Prepared) > 0 {
+			f.writeShort(uint16(len(op.Values)))
+			for _, value := range op.Values {
+				f.writeBytes(value)
+			}
+		}
+		f.writeConsistency(op.Cons)
+	}
+	return f, nil
+}
+
+type prepareFrame struct {
+	Stmt string
+}
+
+func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
+	if f == nil {
+		f = make(frame, headerSize, defaultFrameSize)
+	}
+	f.setHeader(version, 0, 0, opPrepare)
+	f.writeLongString(op.Stmt)
+	return f, nil
+}
+
+type optionsFrame struct{}
+
+func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
+	if f == nil {
+		f = make(frame, headerSize, defaultFrameSize)
+	}
+	f.setHeader(version, 0, 0, opOptions)
+	return f, nil
+}

+ 21 - 0
gocql_test/main.go

@@ -165,6 +165,27 @@ func main() {
 		}
 		}
 	}
 	}
 
 
+	if err := session.Query("CREATE TABLE large (id int primary key)").Exec(); err != nil {
+		log.Fatal("create table", err)
+	}
+	for i := 0; i < 100; i++ {
+		if err := session.Query("INSERT INTO large (id) VALUES (?)", i).Exec(); err != nil {
+			log.Fatal("insert", err)
+		}
+	}
+	iter := session.Query("SELECT id FROM large").PageSize(10).Iter()
+	var id int
+	count = 0
+	for iter.Scan(&id) {
+		count++
+	}
+	if err := iter.Close(); err != nil {
+		log.Fatal("large iter:", err)
+	}
+	if count != 100 {
+		log.Fatalf("expected %d, got %d", 100, count)
+	}
+
 	for _, original := range pageTestData {
 	for _, original := range pageTestData {
 		if err := session.Query("DELETE FROM page WHERE title = ? AND revid = ?",
 		if err := session.Query("DELETE FROM page WHERE title = ? AND revid = ?",
 			original.Title, original.RevId).Exec(); err != nil {
 			original.Title, original.RevId).Exec(); err != nil {

+ 82 - 23
session.go

@@ -20,9 +20,6 @@ type Session struct {
 
 
 // NewSession wraps an existing Node.
 // NewSession wraps an existing Node.
 func NewSession(node Node) *Session {
 func NewSession(node Node) *Session {
-	if s, ok := node.(*Session); ok {
-		return &Session{Node: s.Node}
-	}
 	return &Session{Node: node, Cons: Quorum}
 	return &Session{Node: node, Cons: Quorum}
 }
 }
 
 
@@ -47,18 +44,73 @@ func (s *Session) Close() {
 
 
 // ExecuteBatch executes a Batch on the underlying Node.
 // ExecuteBatch executes a Batch on the underlying Node.
 func (s *Session) ExecuteBatch(batch *Batch) error {
 func (s *Session) ExecuteBatch(batch *Batch) error {
-	if batch.Cons == 0 {
-		batch.Cons = s.Cons
-	}
-	return s.Node.ExecuteBatch(batch)
+	/*
+		if batch.Cons == 0 {
+			batch.Cons = s.Cons
+		}
+		return s.Node.ExecuteBatch(batch)
+	*/
+	return nil
 }
 }
 
 
 // ExecuteQuery executes a Query on the underlying Node.
 // ExecuteQuery executes a Query on the underlying Node.
-func (s *Session) ExecuteQuery(qry *Query) (*Iter, error) {
+func (s *Session) ExecuteQuery(qry *Query) *Iter {
+	return s.executeQuery(qry, nil)
+}
+
+func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
 	if qry.Cons == 0 {
 	if qry.Cons == 0 {
 		qry.Cons = s.Cons
 		qry.Cons = s.Cons
 	}
 	}
-	return s.Node.ExecuteQuery(qry)
+
+	conn := s.Node.Pick(qry)
+	if conn == nil {
+		return &Iter{err: ErrUnavailable}
+	}
+	op := &queryFrame{
+		Stmt:      qry.Stmt,
+		Cons:      qry.Cons,
+		PageSize:  qry.PageSize,
+		PageState: pageState,
+	}
+	if len(qry.Args) > 0 {
+		info, err := conn.prepareStatement(qry.Stmt)
+		if err != nil {
+			return &Iter{err: err}
+		}
+		op.Prepared = info.id
+		op.Values = make([][]byte, len(qry.Args))
+		for i := 0; i < len(qry.Args); i++ {
+			val, err := Marshal(info.args[i].TypeInfo, qry.Args[i])
+			if err != nil {
+				return &Iter{err: err}
+			}
+			op.Values[i] = val
+		}
+	}
+	resp, err := conn.exec(op)
+	if err != nil {
+		return &Iter{err: err}
+	}
+	switch x := resp.(type) {
+	case resultVoidFrame:
+		return &Iter{}
+	case resultRowsFrame:
+		iter := &Iter{columns: x.Columns, rows: x.Rows}
+		if len(x.PagingState) > 0 {
+			iter.session = s
+			iter.qry = qry
+			iter.pageState = x.PagingState
+		}
+		return iter
+	case resultKeyspaceFrame:
+		conn.cluster.HandleKeyspace(conn, x.Keyspace)
+		return &Iter{}
+	case error:
+		return &Iter{err: x}
+	default:
+		return &Iter{err: ErrProtocol}
+	}
 }
 }
 
 
 type Query struct {
 type Query struct {
@@ -75,8 +127,8 @@ func NewQuery(stmt string, args ...interface{}) *Query {
 }
 }
 
 
 type QueryBuilder struct {
 type QueryBuilder struct {
-	qry *Query
-	ctx Node
+	qry     *Query
+	session *Session
 }
 }
 
 
 // Args specifies the query parameters.
 // Args specifies the query parameters.
@@ -108,16 +160,12 @@ func (b QueryBuilder) PageSize(size int) QueryBuilder {
 }
 }
 
 
 func (b QueryBuilder) Exec() error {
 func (b QueryBuilder) Exec() error {
-	_, err := b.ctx.ExecuteQuery(b.qry)
-	return err
+	iter := b.session.ExecuteQuery(b.qry)
+	return iter.err
 }
 }
 
 
 func (b QueryBuilder) Iter() *Iter {
 func (b QueryBuilder) Iter() *Iter {
-	iter, err := b.ctx.ExecuteQuery(b.qry)
-	if err != nil {
-		return &Iter{err: err}
-	}
-	return iter
+	return b.session.ExecuteQuery(b.qry)
 }
 }
 
 
 func (b QueryBuilder) Scan(values ...interface{}) error {
 func (b QueryBuilder) Scan(values ...interface{}) error {
@@ -127,10 +175,13 @@ func (b QueryBuilder) Scan(values ...interface{}) error {
 }
 }
 
 
 type Iter struct {
 type Iter struct {
-	err     error
-	pos     int
-	rows    [][][]byte
-	columns []ColumnInfo
+	err       error
+	pos       int
+	rows      [][][]byte
+	columns   []ColumnInfo
+	qry       *Query
+	session   *Session
+	pageState []byte
 }
 }
 
 
 func (iter *Iter) Columns() []ColumnInfo {
 func (iter *Iter) Columns() []ColumnInfo {
@@ -138,7 +189,14 @@ func (iter *Iter) Columns() []ColumnInfo {
 }
 }
 
 
 func (iter *Iter) Scan(values ...interface{}) bool {
 func (iter *Iter) Scan(values ...interface{}) bool {
-	if iter.err != nil || iter.pos >= len(iter.rows) {
+	if iter.err != nil {
+		return false
+	}
+	if iter.pos >= len(iter.rows) {
+		if len(iter.pageState) > 0 {
+			*iter = *iter.session.executeQuery(iter.qry, iter.pageState)
+			return iter.Scan(values...)
+		}
 		return false
 		return false
 	}
 	}
 	if len(values) != len(iter.columns) {
 	if len(values) != len(iter.columns) {
@@ -240,4 +298,5 @@ var (
 	ErrNotFound    = errors.New("not found")
 	ErrNotFound    = errors.New("not found")
 	ErrUnavailable = errors.New("unavailable")
 	ErrUnavailable = errors.New("unavailable")
 	ErrProtocol    = errors.New("protocol error")
 	ErrProtocol    = errors.New("protocol error")
+	ErrUnsupported = errors.New("feature not supported")
 )
 )

+ 3 - 20
topology.go

@@ -10,8 +10,7 @@ import (
 )
 )
 
 
 type Node interface {
 type Node interface {
-	ExecuteQuery(qry *Query) (*Iter, error)
-	ExecuteBatch(batch *Batch) error
+	Pick(qry *Query) *Conn
 	Close()
 	Close()
 }
 }
 
 
@@ -51,23 +50,7 @@ func (r *RoundRobin) Size() int {
 	return n
 	return n
 }
 }
 
 
-func (r *RoundRobin) ExecuteQuery(qry *Query) (*Iter, error) {
-	node := r.pick()
-	if node == nil {
-		return nil, ErrUnavailable
-	}
-	return node.ExecuteQuery(qry)
-}
-
-func (r *RoundRobin) ExecuteBatch(batch *Batch) error {
-	node := r.pick()
-	if node == nil {
-		return ErrUnavailable
-	}
-	return node.ExecuteBatch(batch)
-}
-
-func (r *RoundRobin) pick() Node {
+func (r *RoundRobin) Pick(qry *Query) *Conn {
 	pos := atomic.AddUint32(&r.pos, 1)
 	pos := atomic.AddUint32(&r.pos, 1)
 	var node Node
 	var node Node
 	r.mu.RLock()
 	r.mu.RLock()
@@ -75,7 +58,7 @@ func (r *RoundRobin) pick() Node {
 		node = r.pool[pos%uint32(len(r.pool))]
 		node = r.pool[pos%uint32(len(r.pool))]
 	}
 	}
 	r.mu.RUnlock()
 	r.mu.RUnlock()
-	return node
+	return node.Pick(qry)
 }
 }
 
 
 func (r *RoundRobin) Close() {
 func (r *RoundRobin) Close() {